MapReduce学习+案例

环境搭建

hadoop2.7.7,可用:http://www.4k8k.xyz/article/weixin_42278880/102599472

WordCount

业务逻辑:

MapTask 阶段处理每个数据分块的单词统计分析,思路是将每一行文本拆分成一个个的单词,每遇到一个单词则把其转换成一个 key-value 对,比如单词 Car,就转换成<’Car’,1>发送给 ReduceTask 去汇总。
ReduceTask 阶段将接收 MapTask 的结果,按照 key 对 value 做汇总计数。

Map端

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	/**
	 * Map阶段的业务逻辑需写在自定义的map()方法中
	 * MapTask会对每一行输入数据调用一次我们自定义的map()方法
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		//(1)将MapTask传给我们的一行文本内容先转换成String
		String line = value.toString();  
		//(2)根据空格将这一行切分成单词
		String[] words = line.split(" ");
		//(3)将单词输出为<单词,1>
		for (String word : words) {
			//将单词作为key,将次数1作为value,以便后续的数据分发,可以根据单词分发,将相同单词分发到同一个ReduceTask中
			context.write(new Text(word), new IntWritable(1));
		}
	}
}

Reduce端

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

	/**
	 * 首先,和前面一样,Reducer类也有输入和输出,输入就是Map阶段的处理结果,输出就是Reduce最后的输出即KEYIN,VALUEIN 对应 Mapper 输出的 KEYOUT,VALUEOUT
	 * KEYOUT,VALUEOUT是自定义Reduce逻辑处理结果的输出数据类型(KEYOUT:单词 VALUEOUT:总次数)
	 * 
	 * ReduceTask在调我们写的reduce方法,ReduceTask应该收到了前一阶段(Map阶段)中所有MapTask输出的数据中的一部分
	 * (数据的key.hashcode%ReduceTask数==本ReduceTask号),所以ReduceTask的输入类型必须和MapTask的输出类型一样
	 * 
	 * ReduceTask将这些收到kv数据拿来处理时,是这样调用我们的reduce方法的: 先将自己收到的所有的kv对按照k分组(根据k是否相同)
	 * 将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中所有的v用一个迭代器传给reduce方法的变量values
	 *
	 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	/**
	 * <Deer,1><Deer,1><Deer,1><Deer,1><Deer,1>
	 * <Car,1><Car,1><Car,1><Car,1>
	 * 框架在Map处理完成之后,将所有key-value对缓存起来,进行分组,然后传递一个组<key,values{}>,调用一次reduce()方法
	 * <Deer,{1,1,1,1,1,1.....}>
	 * 入参key,是一组相同单词kv对的key
	 */
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		//做每个单词的结果汇总
		int sum = 0;
		for (IntWritable v : values) {
			sum += v.get();
		}
		//写出最后的结果
		context.write(key, new IntWritable(sum));
	}
}

Reduce端

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
	/**
	 * 该MR程序运行的入口,相当于yarn集群(分配运算资源)的客户端,需要在此封装MR程序的相关运行参数,指定jar包,最后提交给yarn
	 * 
	 * 其中用一个Job类对象来管理程序运行时所需要的很多参数: 比如,指定哪个类作为map阶段的业务逻辑类,哪个类作为reduce阶段的业务逻辑类;
	 * 指定wordcount job程序的jar包所在路径...以及其他各种需要的参数。
	 */
	public static void main(String[] args) throws Exception {
		// (1)创建配置文件对象
		Configuration conf = new Configuration();

		// (2)新建一个 job 任务
		Job job = Job.getInstance(conf);

		// (3)将 job 所用到的那些类(class)文件,打成jar包 (打成jar包在集群运行必须写)
		job.setJarByClass(WordCount.class);

		// (4)指定 mapper 类和 reducer 类
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		// (5)指定 MapTask 的输出key-value类型(可以省略)
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// (6)指定 ReduceTask 的输出key-value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// (7)指定该 mapreduce 程序数据的输入和输出路径
		Path inPath=new Path("/wordcount/input");
		Path outpath=new Path("/wordcount/output");
                // 获取 fs 对象
		FileSystem fs=FileSystem.get(conf);
		if(fs.exists(outpath)){
			fs.delete(outpath,true);
		}
		FileInputFormat.setInputPaths(job,inPath);
		FileOutputFormat.setOutputPath(job, outpath);

		// (8)最后给YARN来运行,等着集群运行完成返回反馈信息,客户端退出
		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion ? 0 : 1);
	}
}
上一篇:Java架构师的升级之路,腾讯T3手把手教你


下一篇:10.Mapreduce实例——MapReduce自定义输入格式小