A small MapReduce (MR) word count program

----------------------------------Main program entry----------------------------------
package com.demo01.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    /**
     * Main program entry point.
     * @param args command-line arguments, forwarded to run() by ToolRunner
     */
    public static void main(String[] args) throws Exception {

        // When this call finishes it returns the program's exit status code; 0 means success.
        // Passing the Configuration here sets it on the parent Configured class, so run()
        // can read it back via super.getConf(); ToolRunner also parses generic options
        // such as -D key=value into that Configuration.
        int run = ToolRunner.run(new Configuration(), new JobMain(), args);

        System.exit(run);

    }

    /**
     * The run method is the important one: it assembles the eight steps of a
     * MapReduce job, wiring them together through the Job object.
     * @param strings command-line arguments forwarded by ToolRunner
     * @return 0 on success, 1 on failure
     * @throws Exception
     */
    @Override
    public int run(String[] strings) throws Exception {
        // 1. Read the input files and parse them into key/value pairs.

        // First argument is the Configuration; the second sets the job's name.
        Job job = Job.getInstance(super.getConf(), "wordcount");

        // Set the driver class, used to locate this jar when run on a cluster.
        job.setJarByClass(JobMain.class);
        // Set the input format the job reads.
        job.setInputFormatClass(TextInputFormat.class);
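        // With TextInputFormat, k1 is each line's byte offset (LongWritable) and
        // v1 is the line's contents (Text); that is why WordCountMapper below is
        // declared as Mapper<LongWritable, Text, ...>.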

        // Set the input files to process.

        // Running on the HDFS cluster:
//        FileInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount"));
        // Local test:
        FileInputFormat.addInputPath(job,new Path("file:///D:\\dsj\\baishi课件\\hadoop\\3、大数据离线第三天\\3、大数据离线第三天\\wordcount\\input"));

        // 2. Set the custom mapper class.
        job.setMapperClass(WordCountMapper.class);
        // Set the classes of k2 and v2, the mapper's output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        /**
         * Steps 3 to 6:
         * Partition: values with the same key are sent to the same reducer;
         *            equal keys are merged and their values form one collection
         * Sort
         * Combine
         * Group
         */
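        // None of steps 3 to 6 are set explicitly here, so the defaults apply:
        // HashPartitioner for partitioning, the key's own comparator for sorting
        // and grouping, and no combiner.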

        // 7. Set the custom reduce logic.
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 8. Write the output files.
        // The output path must not already exist; if it does, the job fails with an error.
//        TextOutputFormat.setOutputPath(job,new Path("hdfs://node01/wordcountoutput"));
        // Local test:
        TextOutputFormat.setOutputPath(job,new Path("file:///D:\\dsj\\baishi课件\\hadoop\\3、大数据离线第三天\\3、大数据离线第三天\\wordcount\\output"));
        // Submit the job and wait for it to finish; true makes it print progress.
        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;

    }
}
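
The hard-coded file:/// paths above only work on the author's machine. A common variation, sketched here as an assumption rather than the original code, is to take both paths from the command-line arguments that ToolRunner forwards into run() as strings:

        // Hypothetical variant: inside run(), replace the hard-coded paths with
        FileInputFormat.addInputPath(job, new Path(strings[0]));
        TextOutputFormat.setOutputPath(job, new Path(strings[1]));

The job could then be launched as, for example:
hadoop jar wordcount.jar com.demo01.wordcount.JobMain /wordcount /wordcountoutput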

----------------------------Mapper program--------------------------------------

package com.demo01.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// In the generics Hadoop wraps the basic Java types in its own Writable types,
// which serialize efficiently for network transfer; the four parameters stand
// for k1, v1, k2, v2.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // Override the map method: the custom logic that turns each k1/v1 into k2/v2.

    /**
     *
     * @param key  k1, the line's byte offset
     * @param value  v1, the line's contents
     * @param context  the context object that connects this component to the
     *                 components before and after it in the pipeline
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Sample input line: hive,sqoop,flume,hello

        String[] split = value.toString().split(",");
        // Emit each word downstream as k2, with v2 = 1.
        for (String word : split) {
            Text k2 = new Text(word);
            LongWritable v2 = new LongWritable(1);
            context.write(k2, v2);
        }

    }
}
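
For the sample line hive,sqoop,flume,hello this mapper emits (hive,1), (sqoop,1), (flume,1), (hello,1). One common refinement, shown as a sketch rather than the original code: because context.write() serializes the pair immediately, the two Writable objects can be created once and reused across map() calls instead of being allocated for every word.

    // Hypothetical variant of WordCountMapper that reuses its Writables.
    private final Text k2 = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        for (String word : value.toString().split(",")) {
            k2.set(word);             // overwrite the buffer instead of allocating
            context.write(k2, one);   // safe: write() copies the data right away
        }
    }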

----------------------------Reducer program--------------------------------------

package com.demo01.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Generics: k2, v2 in; k3, v3 out.
public class WordCountReduce extends Reducer<Text, LongWritable, Text,LongWritable> {

    /**
     *
     * @param key  k2
     * @param values  a collection holding every v2 that shares this key
     * @param context  the context object, used to emit k3/v3
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Use a long accumulator to match LongWritable and avoid int overflow.
        long num = 0;
        for (LongWritable value : values) {
            // LongWritable defines no arithmetic; get() converts it to a Java long.
            num += value.get();
        }
        context.write(key, new LongWritable(num));
    }
}
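
Because summing counts is associative and commutative, this same reducer class can also serve as the combiner for the Combine step listed under steps 3 to 6, pre-aggregating on the map side to shrink shuffle traffic. An optional one-line addition to run(), not present in the original code:

        // Optional: run the same summing logic map-side as a combiner.
        job.setCombinerClass(WordCountReduce.class);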