环境搭建
hadoop2.7.7,可用:http://www.4k8k.xyz/article/weixin_42278880/102599472
WordCount
业务逻辑:
MapTask 阶段处理每个数据分块的单词统计分析,思路是将每一行文本拆分成一个个的单词,每遇到一个单词则把其转换成一个 key-value 对,比如单词 Car,就转换成<’Car’,1>发送给 ReduceTask 去汇总。
ReduceTask 阶段将接收 MapTask 的结果,按照 key 对 value 做汇总计数。
Map端
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* Map阶段的业务逻辑需写在自定义的map()方法中
* MapTask会对每一行输入数据调用一次我们自定义的map()方法
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//(1)将MapTask传给我们的一行文本内容先转换成String
String line = value.toString();
//(2)根据空格将这一行切分成单词
String[] words = line.split(" ");
//(3)将单词输出为<单词,1>
for (String word : words) {
//将单词作为key,将次数1作为value,以便后续的数据分发,可以根据单词分发,将相同单词分发到同一个ReduceTask中
context.write(new Text(word), new IntWritable(1));
}
}
}
Reduce端
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* 首先,和前面一样,Reducer类也有输入和输出,输入就是Map阶段的处理结果,输出就是Reduce最后的输出即KEYIN,VALUEIN 对应 Mapper 输出的 KEYOUT,VALUEOUT
* KEYOUT,VALUEOUT是自定义Reduce逻辑处理结果的输出数据类型(KEYOUT:单词 VALUEOUT:总次数)
*
* ReduceTask在调我们写的reduce方法,ReduceTask应该收到了前一阶段(Map阶段)中所有MapTask输出的数据中的一部分
* (数据的key.hashcode%ReduceTask数==本ReduceTask号),所以ReduceTask的输入类型必须和MapTask的输出类型一样
*
* ReduceTask将这些收到kv数据拿来处理时,是这样调用我们的reduce方法的: 先将自己收到的所有的kv对按照k分组(根据k是否相同)
* 将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中所有的v用一个迭代器传给reduce方法的变量values
*
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* <Deer,1><Deer,1><Deer,1><Deer,1><Deer,1>
* <Car,1><Car,1><Car,1><Car,1>
* 框架在Map处理完成之后,将所有key-value对缓存起来,进行分组,然后传递一个组<key,values{}>,调用一次reduce()方法
* <Deer,{1,1,1,1,1,1.....}>
* 入参key,是一组相同单词kv对的key
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
//做每个单词的结果汇总
int sum = 0;
for (IntWritable v : values) {
sum += v.get();
}
//写出最后的结果
context.write(key, new IntWritable(sum));
}
}
Reduce端
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
/**
* 该MR程序运行的入口,相当于yarn集群(分配运算资源)的客户端,需要在此封装MR程序的相关运行参数,指定jar包,最后提交给yarn
*
* 其中用一个Job类对象来管理程序运行时所需要的很多参数: 比如,指定哪个类作为map阶段的业务逻辑类,哪个类作为reduce阶段的业务逻辑类;
* 指定wordcount job程序的jar包所在路径...以及其他各种需要的参数。
*/
public static void main(String[] args) throws Exception {
// (1)创建配置文件对象
Configuration conf = new Configuration();
// (2)新建一个 job 任务
Job job = Job.getInstance(conf);
// (3)将 job 所用到的那些类(class)文件,打成jar包 (打成jar包在集群运行必须写)
job.setJarByClass(WordCount.class);
// (4)指定 mapper 类和 reducer 类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// (5)指定 MapTask 的输出key-value类型(可以省略)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// (6)指定 ReduceTask 的输出key-value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// (7)指定该 mapreduce 程序数据的输入和输出路径
Path inPath=new Path("/wordcount/input");
Path outpath=new Path("/wordcount/output");
// 获取 fs 对象
FileSystem fs=FileSystem.get(conf);
if(fs.exists(outpath)){
fs.delete(outpath,true);
}
FileInputFormat.setInputPaths(job,inPath);
FileOutputFormat.setOutputPath(job, outpath);
// (8)最后给YARN来运行,等着集群运行完成返回反馈信息,客户端退出
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1);
}
}