hadoop2.2.0的WordCount程序

package com.my.hadoop.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * MapReduce中的WordCount
 * @author yao
 *
 */
public class WordCount {

/**
     * MapReduce中的map函数的泛型
     * KEYIN        map函数读取文件行内容的偏移量为key
     * VALUEIN         map函数读取文件行内容
     * KEYOUT        map函数处理后输出到reduce函数的key
     * VALUEOUT        map函数处理后输出到reduce函数的value
     * @author yao
     *
     */
    static class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{
        private static final LongWritable ONE = new LongWritable(1l);
        private Text word = new Text();
        public void map(LongWritable key, Text value, Context context) throws IOException ,InterruptedException {
            String[] words = value.toString().split(" ");
            for (String w : words) {
                word.set(w);
                context.write(word, ONE);
            }
        }
    }
    
    /**
     * MapReduce中的reduce函数的泛型
     * KEYIN        reduce函数读取map函数输出的key
     * VALUEIN        reduce函数读取map函数输出的value
     * KEYOUT        reduce函数处理后输出到hdfs上文件的key
     * VALUEOUT        reduce函数处理后输出到hdfs上文件的value
     * @author yao
     *
     */
    static class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
        public void reduce(Text key, Iterable<LongWritable> value, Context context) throws java.io.IOException ,InterruptedException {
            long count = 0;
            for (LongWritable i : value) {
                count += i.get();
            }
            context.write(key, new LongWritable(count));
        }
    }
    
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();                                            //new配置对象,默认读取顺序是default-site.xml<core-site.xml
        
        String[] paths = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (paths.length != 2) {
            System.err.println("Usage: " + WordCount.class.getName() + " <in> <out>");
            System.exit(2);
        }
        
        Job job = Job.getInstance(conf, WordCount.class.getSimpleName());                    //1.x是new Job,2.x为Job.getInstance
        job.setJarByClass(WordCount.class);                                                    //设置main方法所在的类
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));                                //设置当前作业的输入路径(可有多个输入路径)
        job.setMapperClass(WcMap.class);                                                    //指定自定义的map函数
        job.setMapOutputKeyClass(Text.class);                                                //指定自定义map函数的输出到reduce函数的key类型
        job.setMapOutputValueClass(LongWritable.class);                                        //指定自定义map函数的输出到reduce函数的value类型
        
        job.setCombinerClass(WcReduce.class);                                                //在map函数输出到reduce函数进行本地合并以减少网络传输的带宽资源(根据需求使用,并不适用所有业务)
        
        job.setReducerClass(WcReduce.class);                                                //指定自定义的reduce函数
        job.setOutputKeyClass(Text.class);                                                    //指定自定义的reduce函数输出到hdfs的key类型
        job.setOutputValueClass(LongWritable.class);                                        //指定自定义的reduce函数输出到hdfs的value类型
        FileOutputFormat.setOutputPath(job, new Path(args[1]));                                //设置当前作业的输出到hdfs的路径(只有一个输出路径且该路径必须不存在)
        
        int status = job.waitForCompletion(true) ? 0 : 1;                                    //提交作业:true是打印作业进度详情,false则是不打印
        System.exit(status);
    }

}

上一篇:ABC232


下一篇:Windows 8 动手实验系列教程 实验5:进程生命周期管理