一、MapReduce思想
MapReduce的思想核心是分而治之
,适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。
Map负责“分
”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reduce(规约)负责“合
”,即对map阶段的结果进行全局汇总。
这两个阶段合起来正是MapReduce思想的体现。
还有一个比较形象的语言解释MapReduce:
我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。
现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。
二、Hadoop MapReduce设计构思
MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
既然是做计算的框架,那么表现形式就是有个输入(input),MapReduce操作这个输入(input),通过本身定义好的计算模型,得到一个输出(output)。
如何对付大数据处理:分而治之
构建抽象模型:Map和Reduce
MapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型。
Map: 对一组数据元素进行某种重复式的处理;
Reduce: 对Map的中间结果进行某种进一步的结果整理。
MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:
map: (k1; v1) → [(k2; v2)]
reduce: (k2; [v2]) → [(k3; v3)]
三、MapReduce编程规范及示例编写
1 编程规范
mapReduce编程模型的总结:
MapReduce的开发一共有八个步骤其中map阶段分为2个步骤,shuffle阶段4个步骤,reduce阶段分为2个步骤
2 Map阶段2个步骤
第一步:设置inputFormat类,将我们的数据切分成key,value对,输入到第二步
第二步:自定义map逻辑,处理我们第一步的输入数据,然后转换成新的key,value对进行输出
3 shuffle阶段4个步骤
第三步:对输出的key,value对进行分区
第四步:对不同分区的数据按照相同的key进行排序
第五步:对分组后的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)
第六步:对排序后的额数据进行分组,分组的过程中,将相同key的value放到一个集合当中
4 reduce阶段2个步骤
第七步:对多个map的任务进行合并,排序,写reduce函数自己的逻辑,对输入的key,value对进行处理,转换成新的key,value对进行输出
第八步:设置outputformat将输出的key,value对数据进行保存到文件中
四、WordCount实例
1 准备数据并上传
cd /opt/servers
vim wordcount.txt
hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop
hdfs dfs -mkdir /wordcount/input
hdfs dfs -put wordcount.txt /wordcount/input
2 测试官方案例
说明:该官方案例默认不是安装逗号分隔,数据结果可能不是很理想
hadoop jar /opt/servers/hadoop-2.7.7/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar wordcount /wordcount/input /wordcount/output
3 定义一个mapper类
package com.zhanlijuan.mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* KEYIN:行的偏移量 k1 ;
* VALUEIN:行的值 v1;
* KEYOUT:map输出的key (hello,1) k1 ;
* VALUEOUT:map输出的value v2
* 注意:数据是要在网络上传输,需要进行数据的序列化
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
/**
* description: 每一行调用一次map方法
*
* @param key: k1,行的偏移量
* @param value: v1 行的值
* @param context: 数据处理的上下文,主要用于数据的输出
* return: void
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//对于每一行数据进行处理,最后形成(hello,1)格式的数据输出
//hello,world,hadoop --->hello,1
String[] strs = value.toString().split(",");
for (String str : strs) {
//将数据输出给下游
context.write(new Text(str), new LongWritable(1));
}
}
}
4 定义一个reducer类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* KEYIN:k2 (hello,1);
* VALUEIN:v2;
* KEYOUT:k3,(hello,3) ;
* VALUEOUT:v3
* 注意:数据是要在网络上传输,需要进行数据的序列化
*/
public class WordCountReducer extends Reducer<Text, LongWritable, Text,LongWritable> {
/**
* description: 相同的key,value聚合到一起,进行数据的处理
* @param key: 同一个key
* @param values: 聚合的value (hello,(1,1,1,1))
* @param context:
* return: void
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count=0;
//获取每一个单词出现的次数迭代累加
for(LongWritable value:values){
count+=value.get();
}
//将数据输出
context.write(key,new LongWritable(count));
}
}
5 定义一个主类,并提交job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
* author :zhanlijuan
* date :Created in 2021/11/23 9:11
* description:
*/
public class WordCountMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//定义一个job对象,用于提交任务
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "WordCountMain");
//如果在服务器上运行,一定要加上,指定程序的main函数
job.setJarByClass(WordCountMain.class);
//1.定义TextInputFormat
job.setInputFormatClass(TextInputFormat.class);
//指明读取的数据,形成k1,v1
TextInputFormat.addInputPath(job, new Path("hdfs://hadoop01:8020/wordcount/input"));
//2.设定mapper类,指定k2,v2的输出类型
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//第三步,第四步,第五步,第六步,省略
//7.指定reduce类,设置我们reduce阶段完成之后的输出类型,指定k2,v3的输出类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//8.指定数据输出类型和输出路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop01:8020/wordcount/output2"));
//提交任务
job.waitForCompletion(true);
}
}
五、hadoop中分片
split,默认情况下一个块对应一个片。
400m ,128 128 44 128 1个map 128 1个map 44 1个map
130m , 128 2 130 1个map