
[Hadoop] Implementing MapReduce in Java

snoony 2024. 3. 6. 16:58

Downloading and running IntelliJ on Rocky Linux

Move the downloaded IntelliJ archive to root's home directory and extract it.
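Assuming the downloaded tarball is named ideaIC-2023.3.4.tar.gz (the exact file name depends on the version you downloaded), the extraction step looks roughly like this:

[root@localhost ~]# tar -xzf ideaIC-2023.3.4.tar.gz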

[root@localhost ~]# cd idea-IC-233.14475.28/bin
[root@localhost bin]# ./idea.sh

This launches IntelliJ.

Setting up a Java project for MapReduce
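Create a new Maven project in IntelliJ. Judging from the package name and jar file used later in this post, the coordinates are assumed to be groupId com.mycompany and artifactId wordcount; the command-line equivalent would be roughly:

mvn archetype:generate -DgroupId=com.mycompany -DartifactId=wordcount \
    -Dpackage=com.mycompany.wordcount \
    -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false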

 Add the dependencies to pom.xml (search for them on the Maven Repository site):

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.6</version>
        </dependency>
    </dependencies>

WordCountMapper.java

package com.mycompany.wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each input line on spaces and emit (word, 1) for every token.
        String line = value.toString();
        StringTokenizer st = new StringTokenizer(line, " ");
        while (st.hasMoreTokens()) {
            word.set(st.nextToken());
            context.write(word, one);
        }
    }
}

WordCountReducer.java

package com.mycompany.wordcount;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all counts received for this word and emit (word, total).
        int sum = 0;
        Iterator<IntWritable> valuesIt = values.iterator();
        while (valuesIt.hasNext()) {
            sum += valuesIt.next().get();
        }
        context.write(key, new IntWritable(sum));
    }
}

WordCount.java

package com.mycompany.wordcount;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCount(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordCount <input path> <output path>");
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        // Job.getInstance(getConf()) picks up any generic options passed through ToolRunner.
        Job job = Job.getInstance(getConf());
        job.setJarByClass(WordCount.class);
        job.setJobName("Wordcounter");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        int returnValue = job.waitForCompletion(true) ? 0 : 1;
        System.out.println("Job successful: " + job.isSuccessful());
        return returnValue;
    }
}

 

Build the project into a jar file
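The jar can be built from IntelliJ's Maven tool window (the package lifecycle goal) or from a terminal in the project directory. Assuming the artifactId above, the result lands under target/ with the name used later in this post:

mvn clean package
# produces target/wordcount-1.0-SNAPSHOT.jar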

 

The scp command

C:\Users\Playdata>scp [path to jar file] root@[Linux server IP]:/root/hadoop-3.3.6
scp [OPTIONS] [[user@]src_host:]file1 [[user@]dest_host:]file2

Use scp to copy the built jar file over to the Linux server.

The file can then be seen on the Rocky Linux server.

[root@localhost hadoop-3.3.6]# sbin/start-all.sh

This starts HDFS and YARN again.
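As a quick check that the daemons came back up, jps should list processes such as NameNode, DataNode, SecondaryNameNode, ResourceManager, and NodeManager:

[root@localhost hadoop-3.3.6]# jps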

Prepare the input directory on HDFS, removing any output directory left over from a previous run first:

[root@localhost ~]# cd hadoop-3.3.6/
[root@localhost hadoop-3.3.6]# hadoop fs -rm -r output/
[root@localhost hadoop-3.3.6]# hadoop fs -mkdir input
[root@localhost hadoop-3.3.6]# hadoop fs -put LICENSE.txt input
[root@localhost hadoop-3.3.6]# hadoop fs -ls input
Found 1 items
-rw-r--r--   1 root supergroup      15217 2024-03-07 10:16 input/LICENSE.txt

Run the jar file

[root@localhost hadoop-3.3.6]# hadoop jar wordcount-1.0-SNAPSHOT.jar com.mycompany.wordcount.WordCount input/LICENSE.txt output
[root@localhost hadoop-3.3.6]# hadoop fs -ls output
Found 2 items
-rw-r--r--   1 root supergroup          0 2024-03-07 10:23 output/_SUCCESS
-rw-r--r--   1 root supergroup       9894 2024-03-07 10:23 output/part-r-00000
[root@localhost hadoop-3.3.6]# hadoop fs -get output/part-r-00000 
[root@localhost hadoop-3.3.6]# hadoop fs -cat output/*

 

MapReduce memory settings and optimization

Edit etc/hadoop/mapred-site.xml under the Hadoop installation directory (/root/hadoop-3.3.6 here):

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.application.classpath</name>
        <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
    </property>
    <property>
        <name>mapreduce.job.maps</name>
        <value>100</value>
    </property>
    <property>
        <name>mapreduce.job.reduces</name>
        <value>50</value>
    </property>
    <property>
        <name>mapreduce.task.io.sort.mb</name>
        <value>200</value>
    </property>
    <property>
        <name>mapreduce.map.sort.spill.percent</name>
        <value>0.80</value>
    </property>
    <property>
        <name>mapreduce.task.io.sort.factor</name>
        <value>100</value>
    </property>
    <property>
        <name>mapreduce.map.output.compress</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.map.output.compress.codec</name>
        <value>org.apache.hadoop.io.compress.SnappyCodec</value>
    </property>
</configuration>
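The properties above mainly tune the sort buffer, spill threshold, and map-output compression. If you also want to cap the container memory itself, the commonly used properties look like the sketch below; the values are only placeholders, not part of this configuration, so adjust them to your cluster's YARN limits and add them inside the same <configuration> block:

    <property>
        <name>mapreduce.map.memory.mb</name>
        <value>1024</value>
    </property>
    <property>
        <name>mapreduce.reduce.memory.mb</name>
        <value>2048</value>
    </property>
    <property>
        <name>mapreduce.map.java.opts</name>
        <value>-Xmx819m</value>
    </property>
    <property>
        <name>mapreduce.reduce.java.opts</name>
        <value>-Xmx1638m</value>
    </property>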

Modify WordCount.java so the reducer also runs as a combiner, pre-aggregating map output before the shuffle:

// Run the reducer locally on each mapper's output as a combiner
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);

Run the jar file on Hadoop again.
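The rerun uses the same commands as before; rebuild and copy the jar again, and remove the existing output directory first, since MapReduce refuses to overwrite it:

[root@localhost hadoop-3.3.6]# hadoop fs -rm -r output/
[root@localhost hadoop-3.3.6]# hadoop jar wordcount-1.0-SNAPSHOT.jar com.mycompany.wordcount.WordCount input/LICENSE.txt output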