MapReduce基本用法
一 MapReduce基本定义
1.简介
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)“和"Reduce (归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
2.核心流程
- MapReduce是一个编程模型
- 将大量数据分解为多个小数据,分由多台设备同时处理
- Map(映射):
- 将数据处理为键值对映射(以统计字母数量为例,键为字母,值为+1)
- 通常将处理结果以键的哈希值进行分组(HashCode%n)
- 将分组的数据发送到n台reduce设备上
- Reduce(规约):
- reduce一次处理一组数据(某个键的所有键值对)
- reduce传入数据的格式为该组同一的键为reduce的键,组内所有的值组成一个集合存储在迭代器中
- 在reduce中通过迭代器处理数据,并将结果输出
- main方法中需要声明:
- HDFS配置对象
- 创建工作对象,并起名(不知道什么用)
- mapper类和reducer类
- map类和reduce类的输出类型(如果两者输出类型相同,map的可以省略)
- 参与Reduce的设备数量(可不写,默认1)
- 要处理的文件路径
- 结果输出路径
job.waitForCompletion("true")
3.基本模型演示
例题演示:求每部电影最高的三个评分
- Map:
static class MovieGradeMap extends Mapper<LongWritable, Text,Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//切割数据
String s = value.toString();
String[] split = s.split("\\s+");
//获取电影名和评分
String movie = split[1];
int rate = Integer.parseInt(split[3]);
context.write(new Text(movie),new IntWritable(rate));
}
}
- Reduce:
static class MovieGradeReduce extends Reducer<Text,IntWritable,Text,Text> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//创建list集合接收评分
ArrayList<Integer> list = new ArrayList<>();
for (IntWritable value : values) {
list.add(value.get());
}
//进行排序
list.sort((Integer i1,Integer i2)->i2-i1);
//归纳输出结果
String s ="";
if (list.size()>=3)
{
s = "该电影最高的三个评分是:"+list.get(0)+"/"+list.get(1)+"/"+list.get(2);
}else {
for (Integer in : list) {
s+=in+"/";
}
s = "该电影最高的三个评分是:"+s;
}
context.write(key,new Text(s));
}
}
- main方法:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
public static void main(String[] args) throws Exception {
//获取工作对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, MovieGrade.class.getSimpleName());
job.setMapperClass(MovieGradeMap.class);
job.setReducerClass(MovieGradeReduce.class);
//设置map的输出格式(如相同,可省略)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置总的输出格式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job,new Path("D://duoyi/xxx/movieInfo.txt"));
FileOutputFormat.setOutputPath(job,new Path("D://duoyi/xxx/moviesresult"));
boolean b = job.waitForCompletion(true);
}
二 MapReduce及其组件
1.独特的Iterable
以下内容为现阶段个人理解,可能存在错误
- MapReduce中reduce里面传入的Iterable(reduce-Iterable)和java中的迭代器原理存在细微不同
- 当迭代对象类型数据时,reduce-Iterable中的next方法,获取的对象不是新建的
- 新建该Object对象是在Iterable类中完成的
- next方法中只是改变了object对象的参数,实际指向的一直是堆内存中的同一空间
- java中的迭代器与之不同
- 其next方法中可以看做每次都新建了一个Object对象供其使用
2.setup方法
-
Mapper和Reducer中,均包含一个setup方法
-
该方法都是在map和reduce方法执行前,执行一次
-
通常用来获取切块文件的文件名
//使用setup获取文件名 @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit fs = (FileSplit)context.getInputSplit(); fileName = fs.getPath().getName(); }
3.run方法
- 在Mapper和Reducer中,都有一个run方法用于主体运行
//以Reducer中的run方法为例
public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context);
try {
while(context.nextKey()) {
this.reduce(context.getCurrentKey(), context.getValues(), context);
Iterator<VALUEIN> iter = context.getValues().iterator();
if (iter instanceof ValueIterator) {
((ValueIterator)iter).resetBackupStore();
}
}
} finally {
this.cleanup(context);
}
}
- 循环运行,判断是否有下一个key,有则执行一遍reduce(map)
-
cleanup
方法:- 在
maptask
和reducetask
都有,在map任务
和reduce任务
执行完毕后,运行一次 - 可重写,可用来进行结果统计
- 在
4.POJO对象
-
POJO是
Plain Ordinary Java Objects
的缩写,是简单的Java对象。POJO实际就是普通JavaBeans; -
BeanUtils.copyProperties(新,旧);
用于对象的复制的方法 -
在
MapReduce
中使用自己编写的类时,要保证可以序列化 -
实现
Writable
接口,重写其中的两个方法:@Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(uid); dataOutput.writeUTF(name); dataOutput.writeInt(age); dataOutput.writeUTF(gender); dataOutput.writeUTF(friend); dataOutput.writeUTF(oid); dataOutput.writeUTF(dataType); } @Override public void readFields(DataInput dataInput) throws IOException { this.uid = dataInput.readUTF(); this.name = dataInput.readUTF(); this.age = dataInput.readInt(); this.gender = dataInput.readUTF(); this.friend = dataInput.readUTF(); this.oid = dataInput.readUTF(); this.dataType = dataInput.readUTF(); }
5.MapReduce原理加强
5.1 文件读取
- 关联类:
-
FileInputFormat
主要工作类,其中包含了setInputPaths(设置路径)
、getSplits(获取切片)
等方法 - 继承自
FileInputFormat
的TextInputFormat
等方法,则包含了RecordReader
等实际用于读写的方法
-
- 运行流程:
- 用户通过
FileInputFormat
的setinputpaths
方法,设置源文件路径 -
TextInputFormat
等工作类获取文件切片,并进行循环读取(RecordReader
方法) - 读取的数据以键值对的格式传递给Mapper
- 用户通过
5.2 Mapper
-
关联类:
- Mapper类:主要为run方法和map方法,用来处理数据
-
MapOutputBuffer
类是MapperTask
的成员内部类,包含:- 环形缓冲数组:
kvBuffer
-
collect
方法 - 溢出组件(Spiller),其中包含多个方法
- 合并组件(Merger),对写出数据进行合并和重排
- 环形缓冲数组:
-
运行流程:
- Mapper运行run方法,以
hasNextkey
为判断依据,循环执行重写的map
方法 - 通过
context
方法将处理结果传出,到环形缓冲数组kvBuffer
-
kvBuffer
是类MapOutputBuffer
中的变量 -
MapOutputBuffer
会调用其中的collect
方法,收集map输出的K、V - 并在
collect
方法中,会调用分区器HashPartitioner
的getPartition
方法,通过key,返回一个int
类型的p,用以实现分区的效果
-
- 环形缓冲数组一侧存储
collect
收集的K、V、P ,一侧存储每组kvp的源数据 - 缓冲数组容量为100M,当存到80%的时候,本组数据停止接收,通过快排方法进行区内排序
- 此时kv已经完成了及排序,本组数据分区溢出(Spiller)到磁盘中
- 在本组数据开始排序时,下一组数据会重新获取缓冲数组的赤道,利用剩余的20%空间,开始写入数据(若在本组数据溢出前,将20%的空间用完,开始阻塞,直到完全溢出)
- 当有下一组数据溢出到磁盘时,会调用合并组件(Merger),将两次或多次溢出的数据进行合并,并重新排序(快排)
- 最后,Mapper会提供一个HTTP下载服务,供Reducer拉取属于自己的数据
- Mapper运行run方法,以
5.3 Reducer及输出
- 运行流程:
- 通过抓取器(fetcher),从磁盘抓取属于自己的数据
- 通过合并组件(merger),对数据进行合并排序
- 调用分组器
GroupingComparetor
对数据进行分组- 使用
compareTo
方法进行冒泡比较的方式进行分组
- 使用
- 分组后将每组的键作为key,所有的值放到Iterator(迭代器)中,传到Reducer中
- 通过Reducer的run方法,循环处理数据
- 处理完的数据通过context传递,通过
FileOutputFormat
类写出到磁盘中
三 实践应用
1.求共同好友
package cn.doit.ab.day1118.demo02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
public class Friends {
public static void main(String[] args) throws Exception{
runJob1();
runJob2();
}
static class Friends1Mapper extends Mapper<LongWritable, Text,Text, Text>{
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String s = value.toString();
String[] split = s.split(":");
String own = split[0];
v.set(own);
String[] fs = split[1].split(",");
for (String f : fs) {
k.set(f);
context.write(k,v);
System.out.println(k+"---"+v);
}
}
}
static class Friends1Reducer extends Reducer<Text, Text,Text,Text>{
Text k = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
ArrayList<String> list = new ArrayList<>();
for (Text value : values) {
String s = value.toString();
list.add(s);
}
if (list.size()>1) {
for (int i = 0; i < list.size()-1; i++) {
for (int i1 = i+1 ; i1 < list.size() ; i1++) {
String s = list.get(i)+"和"+list.get(i1)+"的共同好友是:";
k.set(s);
context.write(k,key);
}
}
}
}
}
private static void runJob1() throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,"Friends1");
job.setMapperClass(Friends1Mapper.class);
job.setReducerClass(Friends1Reducer.class);
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//设置输出文件的编码格式为Sequence
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job,new Path("D:\\duoyi\\09_Hadoop\\MR案例\\friends\\input"));
FileOutputFormat.setOutputPath(job,new Path("D:\\duoyi\\09_Hadoop\\MR案例\\friends\\out1"));
boolean b = job.waitForCompletion(true);
}
static class Friends2Mapper extends Mapper<Text, Text,Text, Text>{
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
context.write(key,value);
}
}
static class Friends2Reducer extends Reducer<Text, Text,Text, Text>{
Text v= new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text value : values) {
sb.append(value+" ");
}
v.set(sb.toString());
context.write(key,v);
}
}
private static void runJob2() throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,"Friends2");
job.setMapperClass(Friends2Mapper.class);
job.setReducerClass(Friends2Reducer.class);
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//设置输入文件的编码格式为Sequence
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job,new Path("D:\\duoyi\\09_Hadoop\\MR案例\\friends\\out1"));
FileOutputFormat.setOutputPath(job,new Path("D:\\duoyi\\09_Hadoop\\MR案例\\friends\\outover"));
boolean b = job.waitForCompletion(true);
}
}
2.自定义分区器和分组器
-
POJO
- 由于要对其进行序列化和自定义比较器,直接实现WritableComparable<>
package cn.doit.ab.day1121.demo01; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class MovieBean implements WritableComparable<MovieBean> { private String movie; private double rate; private String timeStamp; private String uid; public MovieBean() { } public MovieBean(String movie, double rate, String timeStamp, String uid) { this.movie = movie; this.rate = rate; this.timeStamp = timeStamp; this.uid = uid; } public String getMovie() { return movie; } public void setMovie(String movie) { this.movie = movie; } public double getRate() { return rate; } public void setRate(double rate) { this.rate = rate; } public String getTimeStamp() { return timeStamp; } public void setTimeStamp(String timeStamp) { this.timeStamp = timeStamp; } public String getUid() { return uid; } public void setUid(String uid) { this.uid = uid; } @Override public String toString() { return "movie='" + movie + '\'' + ", rate=" + rate + ", timeStamp='" + timeStamp + '\'' + ", uid='" + uid + '\'' ; } @Override public int compareTo(MovieBean o) { if (o.getMovie().equals(this.movie)) { return Double.compare(o.rate,this.rate); }else { return o.getMovie().compareTo(this.movie); } } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(movie); dataOutput.writeDouble(rate); dataOutput.writeUTF(timeStamp); dataOutput.writeUTF(uid); } @Override public void readFields(DataInput dataInput) throws IOException { this.movie = dataInput.readUTF(); this.rate = dataInput.readDouble(); this.timeStamp = dataInput.readUTF(); this.uid = dataInput.readUTF(); } }