大数据学习教程SD版—第四篇【Hadoop MapReduce】

文章目录

4. Hadoop MapReduce

分布式计算引擎框架,离线计算,不擅长DAG计算

4.1 MapReduce 优点

  1. 易于编程,实现框架接口即可
  2. 良好的扩展性,动态加节点
  3. 高容错性,任务可以转移
  4. 适合海量数据计算

4.2 MapReduce 核心思想

一个MapTask 默认处理128M数据

以WordCount 的MapReduce程序为例

  • Map阶段 MapTask并行工作
  1. 读数据,按行处理
  2. 按空格(或其他切割符)切分单词
  3. 形成KV键值对(word,1)
  4. 将所有KV键值对,按照指定的分区,溢写到磁盘
  • Reduce阶段 ReduceTask并行工作
  1. 根据MapTask的分区数,开启对应数量的ReduceTask
  2. 一个ReduceTask只处理对应分区号的多个MapTask产生的结果
  3. 最终完成单词统计,并输出到结果文件

4.3 MapReduce 序列化类型

除了String 在Hadoop 类型中是Text外,其余都是在原Java类型后加上Writable后缀

4.4 MapReduce 编程规范

  • Mapper阶段
  1. 继承Mapper父类,重写map()方法
  2. Mapper输入输出都是KV
  3. map()方法定义处理逻辑,对每个KV调用一次
  • Reduce阶段
  1. 继承Reduce父类,重写reduce()方法
  2. Reduce输入输出也是KV
  3. reduce()方法定义处理逻辑,对每组KV(按K分组)调用一次
  • Dirver阶段

启动MapReduce程序的客户端,提交任务到YARN集群

4.5 MapReduce WordCount

执行流程: Mapper : setup() -> run() -> for map() -> cleanup() ->Reducer: setup() ->run -> for reduce() ->cleanup()

  • WordCountMapper
package com.ipinyou.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outK = new Text();
    private IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            outK.set(word);
            context.write(outK,outV);
        }
    }
}
  • WordCountReducer
package com.ipinyou.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            int times = value.get();
            count += times;
        }
        outV.set(count);
        context.write(key, outV);
    }
}
  • WordCountDriver
package com.ipinyou.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2.jar class
        job.setJarByClass(WordCountDriver.class);
        // 3.mapper class reducer class
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4.map kv
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5.out kv
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6.path: input output
        FileInputFormat.setInputPaths(job, new Path("E:\\HadoopCode\\InputText"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopCode\\OutputText2"));
        // 7.submit
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

打成jar包,在集群上运行

 hadoop jar wordcount.jar com.ipinyou.mapreduce.wordcount.WordCountDriver /input /output

4.6 MapReduce 序列化

轻量、紧凑、快速、互操作性

  • 自定义序列化类步骤
  1. 实现Writable接口
  2. 反序列化需要空参构造函数
  3. 实现序列化write()和反序列化readFields(),字段顺序必须一致

如果自定义的bean要作为Map的输出Key,则必须实现Compare接口,shuffle需要对key进行排序

此处也可以用字符串,绝大部分数据都可以用字符串拼接得到!

4.7 MapReduce InputFormat

  • InputFormat 接口

数据块:物理上的Block

数据切片:逻辑上对数据进行切分,默认切片大小=Block 大小,切片的对象是单个文件

  • FileInputFormat 实现InputFormat 接口
  1. 找到数据输入目录
  2. 遍历目录的每一个文件
  3. 遍历每一个文件,获取文件大小,计算切片大小,默认splitSize=blockSize 弹性1.1

获取切片大小的公式:Math.max(minSize,Math.min(maxSize,blockSize))

minsize =1 maxsize =Long.MaxValue

  • TextInputFormat 继承FileInputFormat类

按文件处理,按文件进行逐行处理

  • CombineTextInputFormat 继承了CombineFileInputFormat类

针对大量小文件,通过设置虚拟文件大小,来进行小文件逻辑上的合并切片

例如 设置为4M 小文件 2M 5M 4M 产生的虚拟文件 2M 2.5M 2.5M 4M 切成两片(2+2.5)、(2.5+4)

代码中设置:

        // 改变默认InputFormat 设置为4M
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

4.8 MapReduce 工作流程

  1. 准备待处理文件
  2. client submit 前,获取待处理数据的信息
  3. 提交信息(Job.split、xxx.jar、Job.xml)
  4. 计算MapTask数量,开启对应数量的MapTask
  5. 在MapperTask内部进行逻辑处理
  6. 写出数据到环形缓冲区(默认大小 100M 80%)
  7. 达到溢写比之后,根据设置的分区数进行分区,并且分区内部数据进行快速排序(meta排序)
  8. 反向溢写到磁盘文件,文件各分区进行Merge归并排序 ,此处对于如果相同key进行value的累加时,可以Combine 合并相同K,对V进行累加
  9. 默认等待所有的MapTask任务结束后,根据数据分区数,开启对应ReduceTask数量
  10. 对应的ReduceTask拉取到对应分区的数据,下载到本地的磁盘
  11. 合并文件,进行归并排序(原因:分区有序,但整体无序)
  12. 在ReduceTask内部进行逻辑处理之后,输出到磁盘文件

4.9 MapReduce Shuffle机制

Mapper任务之后,Reducer任务之前的洗牌过程

  1. mapper任务把数据写入环形缓冲区collector
  2. 达到溢写比0.8之后,反向写出数据到指定分区partitioner.getPartition(),并进行分区快排
  3. 溢写到磁盘后,各分区进行归并排序
  4. reducer任务把分区数据拉取到内存缓冲,不够溢出到磁盘
  5. 对数据进行归并排序,按相同K进行分组,执行reduce方法

4.10 MapReduce Partitioner

默认Partitioner分区是对K的hashcode值对ReduceTask个数取模得到,若没有设置ReduceTask数量,则使用Partition-1,即 1-1 =0 一个分区

  • 自定义Partitioner步骤
  1. 定义类继承Partitioner类,实现getPartition()方法
  2. 在驱动类中设置自定义的Partitioner和对应数量的ReduceTask数量

如果ReduceTask > Partitioner 数量 ,则会产生空的分区文件

如果ReduceTask = Partitioner 数量,正好

如果ReduceTask < Partiioner && ReduceTask >1 ,则会抛出IO异常

如果ReduceTask = 1,则设置的分区方法不生效,产生一个分区文件

4.11 MapReduce Sort

Hadoop 程序默认都会按照Key进行排序,为了Reduce阶段提高效率,默认字典序,快排

  • 自定义排序步骤
  1. 实现WritableComparable接口,重写CompareTo()方法

缺点:只能对Key进行排序,需要把待排序的对象放在Mapper的Key的位置,在Reducer再把对象放回到Value的位置,正序 >1 倒序> -1

4.12 MapReduce Combiner

Reducer的一个子类,用于每一个MapTask的输出进行局部汇总,且使用有限制,不能影响最终的业务逻辑,继而言之,只能用于求和,预聚合

  • 自定义预聚合步骤
  1. 继承Reducer类,重写reduce方法
  2. 在驱动类中设置自定义的预聚合类

实际在驱动类中,直接设置使用Reducer类即可,不用再写一遍

4.13 MapReduce OutputFormat

默认使用TextOutputFormat

  • 自定义OutputFormat
  1. 创建一个类继承FileOutputFormat,重写其方法
  2. 需要一个RecordWriter实现类,在此类方法中处理业务逻辑,默认不换行
  3. 在驱动类中指定自定义的OutputFormat

4.14 MapReduce MapTask工作机制

  1. Read阶段
  2. Map阶段
  3. Collect阶段
  4. 溢写阶段
  5. Merge阶段

MapTask 并行度:由切片数量决定

4.15 MapReduce ReduceTask工作机制

  1. Copy阶段
  2. Sort阶段
  3. Reduce阶段

ReduceTask 并行度 :

= 0 没有

默认= 1 一个

根据集群性能而定

MapTask和Reduce的工作流程 ,具体更细节的部分可以看源码

4.16 MapReduce ReduceJoin

针对两个不同类型数据的join操作,在reduce端进行join,导致reduce端处理的数据过多,容易导致数据倾斜

4.17 MapReduce Mapjoin

在map端进行join,缓存一张小表,即在mapper的setup方法中将小表数据缓存到集合,并且需要在驱动类中加载缓存,可以避免ReduceJoin的数据倾斜!

  • 具体步骤
  1. Driver类加载缓存数据
  2. Mapper端在setup()读取缓存数据,加载到内存集合
  3. Mapper端在map() 处理业务逻辑,即join操作
  4. 由于不需要Reducer端,设置reduceTask数目为0

4.18 MapReduce ETL

ETL 数据清洗,只需要Mapper端,根据业务,梳理清洗规则,来清洗数据

4.19 MapReduce 开发总结

  1. InputFormat
  • 默认使用TextInputFormat, KV K:偏移量 V:一行内容
  • 处理小文件,使用CombineTextInputFormat 小文件合并切片
  1. Mapper
  • 三个核心方法:setup() map() clearup()
  • Partitioner 默认HashPartitioner ,按照K.hashcode()%numReduceTask 进行分区,不设置reduceTask数,进0分区;可以自定义分区
  • Sort
    • 部分排序 每个文件内部有序(框架做的)
    • 全排序 一个reduce对全部数据进行全排序
    • 二次排序 就是自定义排序,实现WritableCompare借口,实现compareTo方法
  • Combine 预聚合,map端处理,只针对求和处理
  1. Reducer
  • 三个核心方法: setup() map() clearup()
  1. OutputFormat
  • 默认使用TextOutputFormat,按行输出到文件
  • 自定义OutputFormat

4.20 MapReduce 压缩

减小磁盘IO,但对应也会带来CPU开销,对于IO型job

  • 压缩方式考虑的因素

    1. 压解速率
    2. 压缩比
    3. 是否支持切片
  • 压缩使用位置

    1. 输入端
      • 数据量 < BlockSize 考虑Snappy、LZO
      • 数据量非常大 考虑 LZO、Bzip2
    2. 中间端
      • 考虑Snappy、 LZO
    3. 输出端
      • 如果需要永久保存 考虑Bzip2 、Gzip
      • 如果作为下一个MapReduce输入,则考虑输入端因素

    Windows本地运行不支持Snappy压缩

上一篇:Backup &recovery备份和还原


下一篇:55-02 平衡二叉树