Here is a very simple example that uses MapReduce to count the words in a text file.


For Korean text you would need morphological analysis and several additional algorithms, so it could not stay a simple example. Instead, this example treats every whitespace-separated token as a word: whenever a token comes out of the tokenizer, it is counted as-is.
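To make that whitespace-only tokenization concrete, here is a small standalone snippet (not part of the job itself; the class name TokenizeDemo and the sample sentence are just for illustration). StringTokenizer with no explicit delimiters splits on whitespace, so consecutive spaces never produce empty tokens:

```java
import java.util.StringTokenizer;

public class TokenizeDemo {
    public static void main(String[] args) {
        // Default delimiters are whitespace characters (space, tab, newline),
        // which is exactly how the mapper below produces its "words".
        StringTokenizer iter = new StringTokenizer("the little prince  said");
        while (iter.hasMoreTokens()) {
            System.out.println(iter.nextToken());
        }
        // Note: the double space above still yields only 4 tokens.
    }
}
```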


Fundamentally, with MapReduce you implement just two classes and two methods, and Hadoop takes care of running everything else, which makes it a convenient way to process and extract information from large data sets.


Driver.java (registering the job class)

package hopeisagoodthing;

import org.apache.hadoop.util.ProgramDriver;

public class Driver {
        public static void main(String[] args) {                
                ProgramDriver pgd = new ProgramDriver();
                try {
                        pgd.addClass("counter",Counter.class,"");
                        pgd.driver(args);                       
                }
                catch(Throwable e) {
                        e.printStackTrace();
                }

                System.exit(0);
        }
}




Counter.java (the map and reduce implementation)

package hopeisagoodthing;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Counter {
        public static class CounterMapper extends Mapper<Object,Text,Text,IntWritable> {
                private final static IntWritable FOUND = new IntWritable(1);
                private Text word = new Text();
                public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                        StringTokenizer iter = new StringTokenizer(value.toString());
                        while ( iter.hasMoreTokens() ) {
                                word.set(iter.nextToken());
                                context.write(word,FOUND);
                        }
                }
        }
        public static class CounterReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
                private IntWritable count = new IntWritable();
                public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                        int sum = 0;
                        for ( IntWritable val : values ) {
                                sum += val.get();
                        }
                        count.set(sum);
                        context.write(key,count);
                }
        }
        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
                Job job = new Job(conf,"counter");
                job.setJarByClass(Counter.class);
                job.setMapperClass(CounterMapper.class);
                job.setReducerClass(CounterReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                job.setNumReduceTasks(2);
                FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
                FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
                System.exit(job.waitForCompletion(true) ? 0 : 1 );
        }
}
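To see what Hadoop does between the two methods above, here is a rough standalone sketch (not Hadoop code; the class name LocalWordCount and the sample input are made up for illustration) that simulates the whole pipeline in plain Java. A HashMap stands in for the shuffle/sort step that groups each word's emitted 1s before the reduce-side sum:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class LocalWordCount {
    public static void main(String[] args) {
        String input = "to be or not to be";
        Map<String, Integer> counts = new HashMap<>();
        // Map phase: tokenize and emit (word, 1). The merge() call plays both
        // the shuffle (group by key) and the reduce (sum the 1s) roles here.
        StringTokenizer iter = new StringTokenizer(input);
        while (iter.hasMoreTokens()) {
            counts.merge(iter.nextToken(), 1, Integer::sum);
        }
        System.out.println(counts.get("to"));  // prints 2
        System.out.println(counts.get("be"));  // prints 2
        System.out.println(counts.get("or"));  // prints 1
    }
}
```

In the real job, the grouping happens across machines: Hadoop sorts the map output by key and hands each reducer an Iterable of all values for one key, which is why CounterReducer only needs to sum.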

How to run

hadoop@ubuntu:~/work$ hadoop dfs -mkdir /data

hadoop@ubuntu:~/work$ hadoop dfs -put data/prince.txt /data/prince.txt

hadoop@ubuntu:~/work$ hadoop jar hopeisagoodthing.jar counter -jt local /data/prince.txt /data/prince_out


Execution log

12/11/08 07:47:31 INFO util.NativeCodeLoader: Loaded the native-hadoop library

12/11/08 07:47:31 INFO input.FileInputFormat: Total input paths to process : 1

12/11/08 07:47:31 WARN snappy.LoadSnappy: Snappy native library not loaded

12/11/08 07:47:31 INFO mapred.JobClient: Running job: job_local_0001

12/11/08 07:47:32 INFO util.ProcessTree: setsid exited with exit code 0

12/11/08 07:47:32 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1807ca8

12/11/08 07:47:32 INFO mapred.MapTask: io.sort.mb = 100

12/11/08 07:47:32 INFO mapred.JobClient:  map 0% reduce 0%

12/11/08 07:47:33 INFO mapred.MapTask: data buffer = 79691776/99614720

12/11/08 07:47:33 INFO mapred.MapTask: record buffer = 262144/327680

12/11/08 07:47:34 INFO mapred.MapTask: Starting flush of map output

12/11/08 07:47:34 INFO mapred.MapTask: Finished spill 0

12/11/08 07:47:34 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting

12/11/08 07:47:35 INFO mapred.LocalJobRunner:

12/11/08 07:47:35 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.

12/11/08 07:47:35 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@76fba0

12/11/08 07:47:35 INFO mapred.LocalJobRunner:

12/11/08 07:47:35 INFO mapred.Merger: Merging 1 sorted segments

12/11/08 07:47:35 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 190948 bytes

12/11/08 07:47:35 INFO mapred.LocalJobRunner:

12/11/08 07:47:35 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting

12/11/08 07:47:35 INFO mapred.LocalJobRunner:

12/11/08 07:47:35 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now

12/11/08 07:47:35 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to /data/prince_out

12/11/08 07:47:35 INFO mapred.JobClient:  map 100% reduce 0%

12/11/08 07:47:38 INFO mapred.LocalJobRunner: reduce > reduce

12/11/08 07:47:38 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.

12/11/08 07:47:38 INFO mapred.JobClient:  map 100% reduce 100%

12/11/08 07:47:38 INFO mapred.JobClient: Job complete: job_local_0001

12/11/08 07:47:38 INFO mapred.JobClient: Counters: 22

12/11/08 07:47:38 INFO mapred.JobClient:   File Output Format Counters

12/11/08 07:47:38 INFO mapred.JobClient:     Bytes Written=36074

12/11/08 07:47:39 INFO mapred.JobClient:   FileSystemCounters

12/11/08 07:47:39 INFO mapred.JobClient:     FILE_BYTES_READ=327856

12/11/08 07:47:39 INFO mapred.JobClient:     HDFS_BYTES_READ=184590

12/11/08 07:47:39 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=600714

12/11/08 07:47:39 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=36074

12/11/08 07:47:39 INFO mapred.JobClient:   File Input Format Counters

12/11/08 07:47:39 INFO mapred.JobClient:     Bytes Read=92295

12/11/08 07:47:39 INFO mapred.JobClient:   Map-Reduce Framework

12/11/08 07:47:39 INFO mapred.JobClient:     Map output materialized bytes=190952

12/11/08 07:47:39 INFO mapred.JobClient:     Map input records=1660

12/11/08 07:47:39 INFO mapred.JobClient:     Reduce shuffle bytes=0

12/11/08 07:47:39 INFO mapred.JobClient:     Spilled Records=33718

12/11/08 07:47:39 INFO mapred.JobClient:     Map output bytes=157228

12/11/08 07:47:39 INFO mapred.JobClient:     Total committed heap usage (bytes)=231874560

12/11/08 07:47:39 INFO mapred.JobClient:     CPU time spent (ms)=0

12/11/08 07:47:39 INFO mapred.JobClient:     SPLIT_RAW_BYTES=100

12/11/08 07:47:39 INFO mapred.JobClient:     Combine input records=0

12/11/08 07:47:39 INFO mapred.JobClient:     Reduce input records=16859

12/11/08 07:47:39 INFO mapred.JobClient:     Reduce input groups=3714

12/11/08 07:47:39 INFO mapred.JobClient:     Combine output records=0

12/11/08 07:47:39 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0

12/11/08 07:47:39 INFO mapred.JobClient:     Reduce output records=3714

12/11/08 07:47:39 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0

12/11/08 07:47:39 INFO mapred.JobClient:     Map output records=16859



Fetching the results

hadoop@ubuntu:~/work$ hadoop dfs -get /data/prince_out ./


Checking the results

hadoop@ubuntu:~/work/prince_out$ ls

part-r-00000  _SUCCESS

