文章目录
- 4. Hadoop MapReduce
- 4.1 MapReduce 优点
- 4.2 MapReduce 核心思想
- 4.3 MapReduce 序列化类型
- 4.4 MapReduce 编程规范
- 4.5 MapReduce WordCount
- 4.6 MapReduce 序列化
- 4.7 MapReduce InputFormat
- 4.8 MapReduce 工作流程
- 4.9 MapReduce Shuffle机制
- 4.10 MapReduce Partitioner
- 4.11 MapReduce Sort
- 4.12 MapReduce Combiner
- 4.13 MapReduce OutputFormat
- 4.14 MapReduce MapTask工作机制
- 4.15 MapReduce ReduceTask工作机制
- 4.16 MapReduce ReduceJoin
- 4.17 MapReduce Mapjoin
- 4.18 MapReduce ETL
- 4.19 MapReduce 开发总结
- 4.20 MapReduce 压缩
4. Hadoop MapReduce
分布式计算引擎框架,离线计算,不擅长DAG计算
4.1 MapReduce 优点
- 易于编程,实现框架接口即可
- 良好的扩展性,动态加节点
- 高容错性,任务可以转移
- 适合海量数据计算
4.2 MapReduce 核心思想
一个MapTask 默认处理128M数据
以WordCount 的MapReduce程序为例
- Map阶段 MapTask并行工作
- 读数据,按行处理
- 按空格(或其他切割符)切分单词
- 形成KV键值对(word,1)
- 将所有KV键值对,按照指定的分区,溢写到磁盘
- Reduce阶段 ReduceTask并行工作
- 根据MapTask的分区数,开启对应数量的ReduceTask
- 一个ReduceTask只处理对应分区号的多个MapTask产生的结果
- 最终完成单词统计,并输出到结果文件
4.3 MapReduce 序列化类型
除了String 在Hadoop 类型中是Text外,其余都是在原Java类型后加上Writable后缀
4.4 MapReduce 编程规范
- Mapper阶段
- 继承Mapper父类,重写map()方法
- Mapper输入输出都是KV
- map()方法定义处理逻辑,对每个KV调用一次
- Reduce阶段
- 继承Reduce父类,重写reduce()方法
- Reduce输入输出也是KV
- reduce()方法定义处理逻辑,对每组KV(按K分组)调用一次
- Dirver阶段
启动MapReduce程序的客户端,提交任务到YARN集群
4.5 MapReduce WordCount
执行流程: Mapper : setup() -> run() -> for map() -> cleanup() ->Reducer: setup() ->run -> for reduce() ->cleanup()
- WordCountMapper
package com.ipinyou.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outK = new Text();
private IntWritable outV = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
outK.set(word);
context.write(outK,outV);
}
}
}
- WordCountReducer
package com.ipinyou.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
int times = value.get();
count += times;
}
outV.set(count);
context.write(key, outV);
}
}
- WordCountDriver
package com.ipinyou.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1.job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.jar class
job.setJarByClass(WordCountDriver.class);
// 3.mapper class reducer class
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4.map kv
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.out kv
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.path: input output
FileInputFormat.setInputPaths(job, new Path("E:\\HadoopCode\\InputText"));
FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopCode\\OutputText2"));
// 7.submit
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}
打成jar包,在集群上运行
hadoop jar wordcount.jar com.ipinyou.mapreduce.wordcount.WordCountDriver /input /output
4.6 MapReduce 序列化
轻量、紧凑、快速、互操作性
- 自定义序列化类步骤
- 实现Writable接口
- 反序列化需要空参构造函数
- 实现序列化write()和反序列化readFields(),字段顺序必须一致
如果自定义的bean要作为Map的输出Key,则必须实现Compare接口,shuffle需要对key进行排序
此处也可以用字符串,绝大部分数据都可以用字符串拼接得到!
4.7 MapReduce InputFormat
- InputFormat 接口
数据块:物理上的Block
数据切片:逻辑上对数据进行切分,默认切片大小=Block 大小,切片的对象是单个文件
- FileInputFormat 实现InputFormat 接口
- 找到数据输入目录
- 遍历目录的每一个文件
- 遍历每一个文件,获取文件大小,计算切片大小,默认splitSize=blockSize 弹性1.1
获取切片大小的公式:Math.max(minSize,Math.min(maxSize,blockSize))
minsize =1 maxsize =Long.MaxValue
- TextInputFormat 继承FileInputFormat类
按文件处理,按文件进行逐行处理
- CombineTextInputFormat 继承了CombineFileInputFormat类
针对大量小文件,通过设置虚拟文件大小,来进行小文件逻辑上的合并切片
例如 设置为4M 小文件 2M 5M 4M 产生的虚拟文件 2M 2.5M 2.5M 4M 切成两片(2+2.5)、(2.5+4)
代码中设置:
// 改变默认InputFormat 设置为4M
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
4.8 MapReduce 工作流程
- 准备待处理文件
- client submit 前,获取待处理数据的信息
- 提交信息(Job.split、xxx.jar、Job.xml)
- 计算MapTask数量,开启对应数量的MapTask
- 在MapperTask内部进行逻辑处理
- 写出数据到环形缓冲区(默认大小 100M 80%)
- 达到溢写比之后,根据设置的分区数进行分区,并且分区内部数据进行快速排序(meta排序)
- 反向溢写到磁盘文件,文件各分区进行Merge归并排序 ,此处对于如果相同key进行value的累加时,可以Combine 合并相同K,对V进行累加
- 默认等待所有的MapTask任务结束后,根据数据分区数,开启对应ReduceTask数量
- 对应的ReduceTask拉取到对应分区的数据,下载到本地的磁盘
- 合并文件,进行归并排序(原因:分区有序,但整体无序)
- 在ReduceTask内部进行逻辑处理之后,输出到磁盘文件
4.9 MapReduce Shuffle机制
Mapper任务之后,Reducer任务之前的洗牌过程
- mapper任务把数据写入环形缓冲区collector
- 达到溢写比0.8之后,反向写出数据到指定分区partitioner.getPartition(),并进行分区快排
- 溢写到磁盘后,各分区进行归并排序
- reducer任务把分区数据拉取到内存缓冲,不够溢出到磁盘
- 对数据进行归并排序,按相同K进行分组,执行reduce方法
4.10 MapReduce Partitioner
默认Partitioner分区是对K的hashcode值对ReduceTask个数取模得到,若没有设置ReduceTask数量,则使用Partition-1,即 1-1 =0 一个分区
- 自定义Partitioner步骤
- 定义类继承Partitioner类,实现getPartition()方法
- 在驱动类中设置自定义的Partitioner和对应数量的ReduceTask数量
如果ReduceTask > Partitioner 数量 ,则会产生空的分区文件
如果ReduceTask = Partitioner 数量,正好
如果ReduceTask < Partiioner && ReduceTask >1 ,则会抛出IO异常
如果ReduceTask = 1,则设置的分区方法不生效,产生一个分区文件
4.11 MapReduce Sort
Hadoop 程序默认都会按照Key进行排序,为了Reduce阶段提高效率,默认字典序,快排
- 自定义排序步骤
- 实现WritableComparable接口,重写CompareTo()方法
缺点:只能对Key进行排序,需要把待排序的对象放在Mapper的Key的位置,在Reducer再把对象放回到Value的位置,正序 >1 倒序> -1
4.12 MapReduce Combiner
Reducer的一个子类,用于每一个MapTask的输出进行局部汇总,且使用有限制,不能影响最终的业务逻辑,继而言之,只能用于求和,预聚合
- 自定义预聚合步骤
- 继承Reducer类,重写reduce方法
- 在驱动类中设置自定义的预聚合类
实际在驱动类中,直接设置使用Reducer类即可,不用再写一遍
4.13 MapReduce OutputFormat
默认使用TextOutputFormat
- 自定义OutputFormat
- 创建一个类继承FileOutputFormat,重写其方法
- 需要一个RecordWriter实现类,在此类方法中处理业务逻辑,默认不换行
- 在驱动类中指定自定义的OutputFormat
4.14 MapReduce MapTask工作机制
- Read阶段
- Map阶段
- Collect阶段
- 溢写阶段
- Merge阶段
MapTask 并行度:由切片数量决定
4.15 MapReduce ReduceTask工作机制
- Copy阶段
- Sort阶段
- Reduce阶段
ReduceTask 并行度 :
= 0 没有
默认= 1 一个
根据集群性能而定
MapTask和Reduce的工作流程 ,具体更细节的部分可以看源码
4.16 MapReduce ReduceJoin
针对两个不同类型数据的join操作,在reduce端进行join,导致reduce端处理的数据过多,容易导致数据倾斜
4.17 MapReduce Mapjoin
在map端进行join,缓存一张小表,即在mapper的setup方法中将小表数据缓存到集合,并且需要在驱动类中加载缓存,可以避免ReduceJoin的数据倾斜!
- 具体步骤
- Driver类加载缓存数据
- Mapper端在setup()读取缓存数据,加载到内存集合
- Mapper端在map() 处理业务逻辑,即join操作
- 由于不需要Reducer端,设置reduceTask数目为0
4.18 MapReduce ETL
ETL 数据清洗,只需要Mapper端,根据业务,梳理清洗规则,来清洗数据
4.19 MapReduce 开发总结
- InputFormat
- 默认使用TextInputFormat, KV K:偏移量 V:一行内容
- 处理小文件,使用CombineTextInputFormat 小文件合并切片
- Mapper
- 三个核心方法:setup() map() clearup()
- Partitioner 默认HashPartitioner ,按照K.hashcode()%numReduceTask 进行分区,不设置reduceTask数,进0分区;可以自定义分区
- Sort
- 部分排序 每个文件内部有序(框架做的)
- 全排序 一个reduce对全部数据进行全排序
- 二次排序 就是自定义排序,实现WritableCompare借口,实现compareTo方法
- Combine 预聚合,map端处理,只针对求和处理
- Reducer
- 三个核心方法: setup() map() clearup()
- OutputFormat
- 默认使用TextOutputFormat,按行输出到文件
- 自定义OutputFormat
4.20 MapReduce 压缩
减小磁盘IO,但对应也会带来CPU开销,对于IO型job
-
压缩方式考虑的因素
- 压解速率
- 压缩比
- 是否支持切片
-
压缩使用位置
- 输入端
- 数据量 < BlockSize 考虑Snappy、LZO
- 数据量非常大 考虑 LZO、Bzip2
- 中间端
- 考虑Snappy、 LZO
- 输出端
- 如果需要永久保存 考虑Bzip2 、Gzip
- 如果作为下一个MapReduce输入,则考虑输入端因素
Windows本地运行不支持Snappy压缩
- 输入端