mapreduce是一种计算模型,是google的一篇论文向全世界介绍了MapReduce。MapReduce其实可以可以用多种语言编写Map或Reduce程序,因为hadoop是java写的,所以通常情况下我们都是选择java编程语言。其实mr的编写格式或者说语法要求很简单,其实复杂的是我们要学会利用这个模型,将问题分解计算。
MapReduce计算模型
MapReduce Job
每个mr任务都被初始化成一个job,后续我们在编写自己的第一个mr任务的时候也会感受到。每个job分为Map阶段和reduce阶段,其实绝大部分情况我们还有combiner,这个combiner具体是什么在后续关于wordcount第一个mr任务的时候再介绍,这样比较好理解。Map函数接收一个<k1,v1>形式输入,输出<key,value>然后hadoop会将所有相同的中间key的value值进行合并,格式类似<key,list(values)>作为reduce的输入,大致过程如下:
INPUT =====> K1,V1 ======>(MAP) K2,V2 ======>(REDUCE) K3,V3 =====>OUTPUT
直接动手第一个程序 WordCount
wordcount任务是计算文件中每个单词出现的次数,类似:select count(1) as total,word from words group by word.
package com.wangke.hadoop.example; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException;
import java.util.StringTokenizer; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
} public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable(); public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int sum = 0 ;
for (IntWritable value : values) {
sum += value.get();
}
result.set(sum);
context.write(key,result);
}
} public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,"wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
接下来的步骤,maven打成jar包,拷贝到你部署hadoop的机器,执行以下相关命令(我用的是2.8.1)
-
$ bin/hdfs namenode -format
sbin/start-dfs.sh
$ bin/hdfs dfs -mkdir input
$ bin/hdfs dfs -put etc/hadoop/*.xml input- bin/hadoop jar study-1.0-SNAPSHOT.jar com.wangke.hadoop.example.WordCount input output
- bin/hdfs dfs -cat output/* (查看结果)
关于此小程序的一些介绍
- Mapper和Reduce都是直接继承抽象类 Mapper,,Reducer,这两个之所以是抽象类,因为我们自己写的Mapper/Reducer真正实现过程是抽象父类实现的
setup函数:called once at the start of the task
cleanup函数:called once at the end of the task
run函数:run函数的过程其实map或者reduce的执行过程,可以看源代码就知道,这能够帮助我们更好的去理解map或者reduce函数是怎么执行和得到结果的
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context); try {
while(context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
this.cleanup(context);
}
- InputSplite InputFormat OutputFormat
InputSplit是Hadoop中把输入数据传送给每个单独的Map,InputSplite存储的并非数据本身,而是一个分片长度和以及记录数据位置的数组。而生成InputSplit的方式时可以通过Inputformat()来设置,所以InputFormat()方法是用来生成可以提供Map处理的<key,value>的。常见的InputFormat子类:
BileyBorweinPlouffe.BbpInputFormat DBInputFormat ComposableInputFormat FileInputFormat(CombineFileInputFormat,KeyValueTextInputFormat,TextInputFormat)等等
OutputFormat是跟InputFormat相对应的,也就是mr结果的输出格式,我们示例中就是FileOutputFormat.
- Map ,Reduce数据流和控制流以及示例代码中的 job.setCombinerClass(IntSumReducer.class)中的Combiner是什么
这个是WordCount数据流程图,FileInputStream被处理形成两个InputSplit,然后输入到两个Map中(一般计算和存储都是在同一机器上,避免数据传输,也就是移动计算,避免移动存储),Map输出的结果是直接写在磁盘上的,而不是HDFS。而Reduce这时候会读取Map的输出数据,将结果写到HDFS。在Reduce过程中时无法避免数据传输的,这时候你应该会想到为了避免过多的数据传输,我们会将每个Map的结果先局部的做一个WordCount,这个过程就是我们的Combiner了,所以Combiner其实也就是一个reducer过程。
MapReduce的优化与性能调优
MapReduce计算模型的优化主要集中在两个方面:计算性能方面的优化,I/O操作方面的优化。
优化的几个方面:
- 任务调度:任务分配给空闲的机器;尽量将Map任务分配给InputSplit所在的机器,移动计算来减少网络I/O
- 数据预处理与InputSplit的大小:Hadoop擅长处理少量的大数据而不是处理大量的小数据。,在MaxCompute使用时,我们也会发现在执行task前 会有一步合并小文件的步骤。
- Map和Reduce任务的数量:调整相关参数,设置map和reduce的数量
- 上节中提到的Combine函数
- 压缩:对Map和最终的输出结果进行压缩,其中压缩算法是可以配置的
- 自定义comparator:在hadoop中可以自定义数据类型
- 过滤数据以及数据倾斜:通过数据过滤可以降低数据规模。在MaxCompute中我们知道用mapjoin来解决大表和小表关联的性能问题,在hadoop中我们用Bloom Filter来解决类似问题,关于BloomFilter我们在下一小节具体介绍
Bloom Filter:
Bloom Filter是由Howard Bloom提出的二进制向量数据结构。在保存所有集合元素特征的同时,他能在保证高效空间效率和一定出错率的前提下迅速检测一个元素是不是集合中的成员。怎样去利用Bloom Filter去解决大表和小表的内连接呢,也就是怎么实现Maxcompute的mapjoin。
先创建BloomFilter对象,将小表中所有连接列上的值都保存在Bloom Filter中(数据量小,直接加载在内存中),然后开始通过mr执行内连接。在map阶段,读小表的数据直接以连接列值为key,以数据未value的<key,value>;读大表数据时,在输出前先判断当前元祖的连接列值是否在BF中,如果不存在就不需要输出,如果存在 就采用与小表同样的方式输出,对于不在集合内的元素一定是判断正确,这样就可以过滤掉不需要的数据。最后在reduce阶段,针对每个连接列值连接两个表的元组并输出结果。
BF实现原理:他有两个重要接口add() ,membershipTest(),add负责保存集合元素的特征到位数组,membershipTest判断某个值是否是集合中的元素。BF利用k个相互独立的Hash函数将集合中的每个元素迎神到(1,2,..,M)个范围内,这时候就相当于是一个k维特征向量,如果判断一个元素是否存在(也就是该元素与小表中的某个元素相等),其实看他们的向量是否一样,因为hash的特性,A != B,但是对应的向量是一样的,这种情况很少见,所以会有少量错误率,但是性能大大提高。我们如果增加k和M时可以降低错误率的
Map ReduceJob中全局共享数据
- 读写HDFS文件:Map task和Reduce task甚至是不同的Job都可以通过读写HDFS中预定好的同一个文件共享全局变量
- 配置Job属性:在MapReduce执行过程中,task可以读取Job的属性,基于这个特性,大家可以在任务启动之初利用Cofiguration类中的set(name,value)将一些曲剧变量封装进去,然后通过get方法读取。
- 使用distributedCache:这个是MapReduce为应用提供缓存文件的只读工具。在使用时,用户可以在作业配置时使用本地或者HDFS文件的url来将其设置成共享缓存文件。task启动之前,MR框架会将缓存文件复制到执行任务节点的本地。优点是每个Job共享文件只会启动之后复制一次,且适用于大量的共享数据,缺点就是只读。
链接MapReduce Job
在日常数据处理过程中,并不是一个mr就能解决问题的,需要多个mr作业配合完成,其实实际中我们更多的是利用一个mapper多个reducer。
- 线性MapReduce Job流:就是多个Job按照一定顺序,第一个job的输出作为后面一个Job的输入。缺点:顺序结构,且流程不好控制
- 复杂MapReduce Job流:例如 Job3需要Job1 和Job2的结果,其实MR框架提供了ControlledJob和JobControl类控制,具体不介绍了。
本地测试
对于写好一个mrJob ,不能每次都通过打包放到集群测试,这样效率太低,Score_Process类继承与Configure的实现接口Toll,通过run方法可以实现对程序进行测试。
import com.wangke.hadoop.example.WordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; public class ScoreTest implements Tool{
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,"wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean success = job.waitForCompletion(true);
return success ? 0:1;
} public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(new ScoreTest(),args);
System.exit(ret);
} public void setConf(Configuration configuration) { } public Configuration getConf() {
return null;
}
}