Hadoop学习记录(4)|MapReduce原理|API操作使用

MapReduce概念

MapReduce是一种分布式计算模型,由谷歌提出,主要用于搜索领域,解决海量数据计算问题。

MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数实现分布式计算。

这两个函数的形参是key,value对,表示函数的输入信息。

MP执行流程

客户端提交给jobtracker,jobtracker分配给tasktracker。

trasktracker会对任务进行mapper和reducer操作。

MapReduce原理

一个map输入k1、v1,数据由输入文件中获取

map会把数据提交到每一个shuffle,最后输出到reducer任务。

reducer任务的数量跟mapper发送到shuffle的数量是一致的。

map任务处理

1.1、读取输入文件内容,解析成key,value对。对输入文件的每一个解析成key\value对。每个键值对调用一次map函数。

1.2、写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

1.3、对输出的key、value进行分区

1.4、对不同分区的数据按照key进行排序、分组。相同key的value放到一个集合中。

1.5、(可选)分组后对数据进行规约

reduce任务处理

2.1、对多个map任务的输出按照不同的分区通过网络拷贝到reduce节点。

2.2、对多个map任务输出进行合并、排序。写reduce函数自己的逻辑,对输入key、value处理,转换成新的key、value输出。

2.3、把输出的结果保存到HDFS中。

MapReduce执行过程

1.1.读取hdfs中文件,每个解析成<k,v>。每一个键值对调用一次map函数。

解析成两个<k,v>,分别<0,hello you><10,hello me>。调用map函数两次。

k是每行的开始位置,v则示每行的文本内容。

1.2.覆盖map()函数,接收1.1产生的<k,v>,进行处理,转换新的<k,v>输出。

1.3.对1.2输出的<k,v>进行分区

public void map(k,v,context){

String[] split = v.toString().split(“ ”);

for(String str : split){

context.write(str,1);

}

}

1.4.对不同分区中的数据进行排序(按照k)、分组,分别将key的value放到一个集合中。

map输出后的数据是:<hello,1]>,<you,1>,<hello,1>,<me,1>

排序后: <hello,1]>, <hello,1>,<you,1>,<me,1>

分组后:<hello,{1,1}>,<you,{1}>,<me,{1}>

1.5.(可选)对分组后的数据进行规约。

2.1.多个map任务的输出按照不同的分区,通过网络copy到不同的reduce节点上。

2.2.对多个map的输出进行合并,排序,覆盖reduce函数,接收的是分组的数据,实现自己的业务逻辑,处理后产生新的<k,v>输出。

reduce函数被调用3次,跟分组次数一致。

public void reduce(k,vs,context){

long sum =0L;

for(long num:vs){

sum +=num;

}

context.write(k,sum);

}

2.3.设置任务执行,对reduce输出的<k,v>保存到hdfs中。

job.waitForCompletion(true);

 

Hadoop学习记录(4)|MapReduce原理|API操作使用

整个流程我分了四步。简单些可以这样说,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。

MapReduce提交的源代码分析

waitForCompletion函数中的submit方法连接和提交到jobtracker。

在eclipse中写的代码如何提交到JobTracker中的哪?

答(1)eclipse中调用的job.waitForCompletion(true),实际调用的是JobClient中的提交方法。

contect()

info = jobClient.submitJobInternal(conf)

(2)在contect()中,实际创建了一个JobClient对象,在调用该对象的构造方法时,获得了JobTracker的客户端代理对象JobSubmissionProtocol

jobSubmissionProtocol实现类是JobTracker

(3)在jobClient.submitJobInternal(conf)方法中,调用了jobSubmissionProtocol.submitJob()

即,执行的是JobTracker.submitJob(..)

Hadoop基本类型

Hadoop数据类型必须实现Writable接口。

Long LongWritable

Boolean BooleanWritable

String Text

Integer IntWritable

Java类型转换为Hadoop基本类型:

直接调用hadoop类的构造方法,或者调用set()方法

new IntWritable(123)

Hadoop类型转换成Java类型:

text需要调用toString方法

其他类型调用get()方法

使用Hadoop自定义类型处理手机上网流量

1、自定义类

class KpiWritable implements Writable{

long upPackNum;

long downPackNum;

long upPayLoad;

long downPayLoad;

public KpiWritable() {}

public KpiWritable(String upPackNum,String downPackNum,String upPayLoad,String downPayLoad){

this.upPackNum = Long.parseLong(upPackNum);

this.downPackNum = Long.parseLong(downPackNum);

this.upPayLoad = Long.parseLong(upPayLoad);

this.downPayLoad = Long.parseLong(downPayLoad);

}

@Override

public void write(DataOutput out) throws IOException {

//序列化出去

out.writeLong(upPackNum);

out.writeLong(downPackNum);

out.writeLong(upPayLoad);

out.writeLong(downPayLoad);

}

@Override

public void readFields(DataInput in) throws IOException {

//顺序和写出去一样

this.upPackNum = in.readLong();

this.downPackNum = in.readLong();

this.upPayLoad = in.readLong();

this.downPayLoad = in.readLong();

}

@Override

public String toString() {

return upPackNum+"\t"+downPackNum+"\t"+upPayLoad+"\t"+downPayLoad;

}

2、自定义Map

static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{

protected void map(LongWritable k1, Text v1,

org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context)

throws IOException, InterruptedException {

//处理接收的数据

String[] splits = v1.toString().split("\t");

//获取手机号

String msisdn = splits[1];

Text k2 = new Text(msisdn);

KpiWritable v2 = new KpiWritable(splits[6],splits[7],splits[8],splits[9]);

//写入context中交过reduce执行

context.write(k2, v2);

}

}

3、自定义Reduce

static class MyReduce extends Reducer<Text, KpiWritable, Text, KpiWritable>{

protected void reduce(Text k2, Iterable<KpiWritable> v2s,org.apache.hadoop.mapreduce.Reducer<Text, KpiWritable, Text, KpiWritable>.Context context)

throws IOException, InterruptedException {

/**

* k2 表示不同的手机号

* v2s 表示该手机号不同时段流量集合

*/

//定义计数器

long upPackNum = 0L;

long downPackNum = 0L;

long upPayLoad = 0L;

long downPayLoad = 0L;

//遍历合并数据

for(KpiWritable kpi : v2s){

upPackNum += kpi.upPackNum;

downPackNum += kpi.downPackNum;

upPayLoad += kpi.upPayLoad;

downPayLoad += kpi.downPayLoad;

}

//封装到对象中

KpiWritable v3 = new KpiWritable(upPackNum+"", downPackNum+"", upPayLoad+"", downPayLoad+"");

//写入context中

context.write(k2, v3);

}

}

4、写驱动程序

static final String INPUT_PATH = "hdfs://h1:9000/wlan";

static final String OUT_PATH = "hdfs://h1:9000/wlan_out";

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

FileSystem fileSystem = FileSystem.get(new URI("hdfs://h1:9000/"), conf);

Path outPut = new Path(OUT_PATH);

if(fileSystem.exists(outPut)){

fileSystem.delete(outPut,true);

}

/**

* 1.1、指定输入文件路径

* 1.1.1. 指定那个类来格式化输入文

* 1.2、指定自定义的Mapper

* 1.2.1.指定输出<k2,v2>的类型

* 1.3、指定分区

* 1.4、排序分区(TODO)

* 1.5、(可选)合并

* 2.1、多个map任务的输出,通过网络copy到不同的reduce节点上,这个操作由hadoop自动完成

* 2.2、指定定义的reduce类

* 2.2.1.指定输出<k3,v3>类型

* 2.3、指定输出位置

* 2.3.1、设置输出文件的格式化类

*

* 最后吧代码提交到JobTracker执行

*/

//创建Job任务

Job job = new Job(conf,KpiApp.class.getSimpleName());

//设置输入路径

FileInputFormat.setInputPaths(job, INPUT_PATH);

//设置输入数据使用的格式化类

job.setInputFormatClass(TextInputFormat.class);

//设置自定义Map类

job.setMapperClass(MyMapper.class);

//设置Map类输出的key和value值类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(KpiWritable.class);

//设置分区类

job.setPartitionerClass(HashPartitioner.class);

//设置任务数

job.setNumReduceTasks(1);

//设置自定义Reduce类

job.setReducerClass(MyReduce.class);

//设置输入数据使用的格式化类

job.setInputFormatClass(TextInputFormat.class);

//设置Reduce输出的key和value值类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(KpiWritable.class);

//设置输出文件位置

FileOutputFormat.setOutputPath(job, outPut);

//设置将任务提交到JobTracker

job.waitForCompletion(true);

}

MapReduce 0.x API区别

hadoop版本0.x

1、包一般是mapred。

2使用是JobConf类创建Job任务。

3、使用JobClient.runJob(jobConf)提交任务。

4、自定义类需要继承MapReduceBase实现Mapper和Reducer接口。

hadoop版本1.x

1、包一般是mapreduce。

2、使用的Job类创建任务。

3、ob.waitForCompletion(true)提交任务。

4、自定义类只需要继承Mapper和Reducer类。

命令行运行指定参数

hadoop jar WordCount.jar hdfs://h1:9000/hello hdfs://h1:9000/cmd_out

跟eclipse直接运行的代码区别:

1、类需要继承org.apache.hadoop.conf.Configured,并实现org.apache.hadoop.util.Tool。

2、以前main方法中写的驱动程序卸载覆写的run方法中。
3、run()方法中

job.setJarByClass(CmdWordCount.class);

4、输入输出字符串定义为全局空字符串

5、main方法中使用org.apache.hadoop.util.ToolRunner的run方法,传入new CmdWordCount()和args。args是main方法接收的字符串数组。

6、在覆写的run方法中把接收到的args数组提取并赋值给INPUT和OUTPUT的路径

INPUT_PATH = agrs0[0];

OUTPUT_PATH = args0[1];

6、打包时一定要记得选择输出类。

Hadoop学习记录(4)|MapReduce原理|API操作使用

Hadoop计数器

File Input Format Counters

Bytes Read=19 读取文件字节数

Map-Reduce Framework

Map output materialized bytes=65

1#Map input records=2 读取记录行

Reduce shuffle bytes=65

Spilled Records=8

Map output bytes=51

Total committed heap usage (bytes)=115675136

5#Combine input records=0 Map合并/规约输入

SPLIT_RAW_BYTES=85

3# Reduce input records=4 reduce输入行

4#Reduce input groups=3 Reduce输入组数

Combine output records=0 Map合并/规约输出

Reduce output records=3 Reduce输出记录

2#Map output records=4 Map输出行

通过计数器数可以检查出Map还是Reduce出现问题。

舆情监督示例,使用自定义计数器监控出现次数。

//自定义计数器

Counter helloCounter = context.getCounter("Sensitive Words", "hello");

String line = v1.toString();

if(line.contains("hello")){

helloCounter.increment(1L);

}

Hadoop学习记录(4)|MapReduce原理|API操作使用

Combine操作

为什么使用Combine?

Combiner发生在Map端。对数据进行规约处理,数据量变小了,传送到reduce的数据量变小,传输时间变短,作业整体时间变短。

为什么Combine不作为MapReduce的标配,而是可选配置?

因为不是所有的算法都适合使用Combine处理,例如求平均数。

适用于求和

Combine本身已经执行了Reduce操作,为什么Reduce阶段还要执行reduce操作?

combine操作发送在map端,处理一个任务所接收的文件中的数据,不能跨map任务;只有reduce可以接收多个map任务处理数据。

设置参数

job.setCombinerClass(MyCombiner.class);

MyCombiner.class可以使用Reduce

Hadoop学习记录(4)|MapReduce原理|API操作使用

Partitioner编程

对输出的key,value进行分区。

指定自定义partition类,自定义类需要继承HashPartitioner类。覆盖getPartition方法。

分区的实例必须打成Jar包。

作用:

1、根据业务需要,产生多个输出文件。

2、多个reduce任务在运行,提高整体job的运行效率。

根据实际情况来使用,如果有5台机器,而分成100个分区来运行或出现延迟和整体效率低问题。因为需要排队运行!

主要代码:

设置成打包运行 job.setJarByClass(KpiApp.class);

设置分区job.setPartitionerClass(MyPartitioner.class);

设置Reduce任务数 job.setNumReduceTask(2);

自定义Partitioner类需要继承HashPartition类,泛型使用K2,V2的类型。覆写getPartition方法。

排序和分组

排序

在map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较。如果让v2也进行排序,需要将k2和v2组装成心的类,作为k2,才能参与比较。

新类需要实现WritableCompareble,覆写readFilds、write、compareTo、hasCode、equals方法。

在自定义map程序中将k2,v2封装到新类中,当做k2写入context。

编写驱动main方法时,设置map输出的类型(job.setMapOutputKeyClass(new.class))

分组

按照K2进行比较,这个k2是分装到自定义类的k2。

自定义分组类实现RewComparetor,覆写compare方法。

public int compare(NewK2 o1, NewK2 o2) {

return (int) (o1.k2 - o2.k2);

}

/**

* @param b1 表示第一个参与比较的字节数组

* @param s1 表示第一个参与比较的字节数组的起始位置

* @param l1 表示第一个参与比较的字节数组的偏移量

*

* @param b2 表示第二个参与比较的字节数组

* @param s2 表示第二个参与比较的字节数组的起始位置

* @param l2 表示第二个参与比较的字节数组的偏移量

*

*/

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,

int l2) {

return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);

}

job任务设置分组比较组 job.setGroupComparatorClass(MyGroup.class);

Shuffle

Hadoop学习记录(4)|MapReduce原理|API操作使用

MapReduce的核心,俗称洗牌、打乱。

shuffle在map任务传送到reduce任务之间。

Map端

1、每个map有一个环形内存缓冲区,用于存储任务的输出,默认100MB,如果达到法制80MB后台线程会把内容写到指定磁盘(mapred.local.dir)下的新建的溢出文件。

2、写入磁盘钱,要partition、sort。如果有combiner,combine后写入。

3、最后记录完成,合并全部溢写文件为一个分区且排序的文件。

Reduce端

1、Reduce通过Http方式得到输出文件的分区。

2、TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce开始复制输出。

3、排序阶段合并map输出,最后运行Reduce阶段。

MapReduce常见算法

单词计数

数据去重

排序

TopK

选择

投影

分组

多表连接

单边关联

上一篇:BZOJ 2337 XOR和路径(高斯消元)


下一篇:Hadoop 学习笔记 (十一) MapReduce 求平均成绩