一、概述
1.1MapReduce定义
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架。
MapReduce的核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
1.2MapReduce优缺点
优点
1.MapReduce易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。
2.良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
3.高容错性
MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参加,而完全是由Hadoop内部完成。
4.适合PB级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
缺点
1.不擅长实时计算
MapReduce无法像MySql一样,在毫秒级内返回结果。
2.不擅长流式计算
流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。
3.不擅长DAG计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
1.3 MapReduce的核心思想
- 分布式的运算程序往往需要分成至少2个阶段。
- 第一个阶段的MapTask并发实例,完全并行运行,互不相干。
- 第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一阶段的所有MapTask并发实例的输出。
- MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
1.4 MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类实例进程:
- MrAppMaster:负责整个程序的过程调度及状态协调。
- MapTask:负责Map阶段的整个数据处理流程。
- ReduceTask:负责Reduce阶段的整个数据处理流程。
1.5 MapReduce编程
常用数据序列化类型
Java类型 |
Hadoop Writable类型 |
boolean |
BooleanWritable |
byte |
ByteWritable |
int |
IntWritable |
float |
FloatWritable |
long |
LongWritable |
double |
DoubleWritable |
String |
Text |
map |
MapWritable |
array |
ArrayWritable |
用户编写的程序分成三个部分:Mapper、Reducer和Driver。
1.Mapper阶段
(1)用户自定义的Mapper要继承自己的父类
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper中的业务逻辑写在map()方法中
(4)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(5)map()方法(MapTask进程)对每个调用一次
2.Reducer阶段
(1)用户自定义的Reduce要继承自己的父类
(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(3)Reducer的业务逻辑写在reduce()方法中
(4)ReduceTask进程对每一组相同的k的组调用一次reduce()方法
3.Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。
二.WordCount案例实操
需求:在给定的文本文件中统计输出每一个单词出现的总次数
需求分析:按照MapReduce编程规范,分别编写Mapper、Reducer、Driver,如图
实操步骤:
创建maven工程
pom文件
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.1</version>
</dependency>
1.Map类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author JLoong
* @date 2022/1/6 21:06
*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
//1.获取一行
String line = value.toString();
//2.切割
String[] words =line.split(" ");
//3.输出
for (String word:words) {
k.set(word);
context.write(k,v);
}
}
}
2.Reduce类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author JLoong
* @date 2022/1/6 21:14
*/
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//1.累加求和
sum=0;
for (IntWritable count:values){
sum+=count.get();
}
//2.输出
v.set(sum);
context.write(key,v);
}
}
3.Driver类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author JLoong
* @date 2022/1/7 15:26
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1.获取配置信息以及封装任务
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//2.设置jar加载路径
job.setJarByClass(WordCountDriver.class);
//3.设置map和reduce类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4.设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//7.提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
设置程序参数
运行结果
4.打成jar包放在集群上运行
问题:运行超级慢,在Running job:卡住
稍微改了一下yarn-site.xml的参数如下:新增这几个,速度极大提升,说明还是分配的内存和cpu不够才导致慢的。
三.Hadoop序列化
3.1概述
什么是序列化?
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
为什么要序列化?
一般来说,“活的”对象只生存在内存里,关机断电后就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
为什么不用Java的序列化?
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
Hadoop序列化特点:
(1)紧凑:高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)可扩展:随着通信协议的升级而升级。
(4)互操作:支持多语言的交互。
3.2自定义bean对象实现序列化接口(Writable)
在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
实现bean对象序列化步骤:
- 必须实现Writable接口
- 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
- 重写序列化方法
- 重写反序列化方法
- 注意反序列化的顺序和序列化顺序完全一致
- 想要把结果显示在文件中,需要重写toString(),可用“\t”分开,方便后续用。
- 如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的shuffle过程要求对key必须能排序。
3.3序列化案例实操
1.需求:
统计每一个手机号耗费的总上行流量、下行流量、总流量
输入数据格式
7 13560436666 120.196.100.99 1116 954 200
id 手机号码 网络ip 上行流量 下行流量 网络状态码
期望输出数据格式:
13560436666 1116 954 2070
手机号码 上行流量 下行流量 总流量
2.需求分析
3.编写MapReduce程序
3.1 bean对象
/**
* @author JLoong
* @date 2022/1/8 17:34
*/
public class FlowBean implements Writable {
//1 实现Writable接口
private long upFlow;
private long downFlow;
private long sumFlow;
//2 反序列化时,需要反射调用空参构造函数
public FlowBean() {
}
public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
//3.写序列化方法
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
//4.写反序列化方法
//5.顺序要和写序列化方法一致
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
//6.toString方法
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void set(long downFlow, long upFlow) {
this.downFlow = downFlow;
this.upFlow = upFlow;
sumFlow = downFlow + upFlow;
}
}
3.2 Mapper类
/**
* @author JLoong
* @date 2022/1/8 18:00
*/
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
FlowBean v = new FlowBean();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.获取一行
String line = value.toString();
//2.切割字段
String[] fields = line.split("\t");
//3.封装对象
//取出手机号
String phoneNum = fields[1];
//取出上行流量和下行流量
long upFlow = Long.parseLong(fields[fields.length - 3]);
long downFlow = Long.parseLong(fields[fields.length - 2]);
k.set(phoneNum);
v.set(downFlow, upFlow);
//4.写出
context.write(k, v);
}
}
3.3 Reducer类
/**
* @author JLoong
* @date 2022/1/8 18:24
*/
public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long sumUpFlow =0;
long sumDownFlow =0;
//1.遍历每个bean,累加流量
for(FlowBean flowBean:values){
sumUpFlow +=flowBean.getUpFlow();
sumDownFlow+=flowBean.getDownFlow();
}
//2.分装对象
FlowBean resultBean = new FlowBean(sumUpFlow,sumDownFlow);
//3.写对象
context.write(key,resultBean);
}
}
3.4 Driver类
/**
* @author JLoong
* @date 2022/1/8 18:30
*/
public class FlowCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
args = new String[]{"d:/work/input", "d:/output"};
//1.获取job实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//2.设置jar加载路径
job.setJarByClass(FlowCountDriver.class);
//3.设置Map类和Reduce类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
//4.设置Map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//5.设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//6.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//7.提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}