MapReduce原理深入理解

1、MapReduce概述及原理

MapReduce是一种分布式计算模型

 MapReduce是分布式运行的,由两个阶段组成:Map和Reduce,Map阶段是一个独立的程序,有很多个节点同时运行,每个节点处理一部分数据。Reduce阶段是一个独立的程序,有很多个节点同时运行,每个节点处理一部分数据【在这先把reduce理解为一个单独的聚合程序即可】。 MapReduce框架都有默认实现,用户只需要覆盖map()和reduce()两个函数,即可实现分布式计算,非常简单。 这两个函数的形参和返回值都是<key、value>,使用的时候一定要注意构造<k,v>。

2、MapReduce在yarn上的执行流程

MapReduce原理深入理解

MapReduce原理深入理解

3、reduce join

MapReduce原理深入理解


/**
 * map读取两个文件,并对两个文件做拼接
 * map     1.判断数据来源
 *         2.根据数据来源打标记
 * reduce  3.循环获取数据
 *         4.判断数据标记,获取的是什么数
 *         5.根据不同的数据,做不同的处理
 */

public class MR07Join {
    public static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //通过InputSplit可获取block块地址
            //通过context获取切片(student.txt,score.txt)
            InputSplit inputSplit = context.getInputSplit();
            FileSplit fileSplit =(FileSplit) inputSplit;
            //获取路径,有两个(stu,sco)
            String path = fileSplit.getPath().toString();
            //判断路径
            if(path.contains("student")) {
                String id = value.toString().split(",")[0];
                String stu = "$"+value.toString();
                context.write(new Text(id),new Text(stu));
            }else{
                String id = value.toString().split(",")[0];
                String sco="#"+value.toString();
                context.write(new Text(id),new Text(sco));
            }

        }
    }
    //id:{$学生信息,#成绩,#成绩,#成绩}
    public static  class JoinReducer extends Reducer<Text,Text,Text,NullWritable>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String stu =null;
            ArrayList<Integer> scos = new ArrayList<Integer>();
            for (Text value : values) {
                String s = value.toString();
                if (s.startsWith("$")){//是学生信息
                    stu=s;
                }else{//是成绩
                    int scoce = Integer.parseInt(s.split(",")[2]);
                    scos.add(scoce);
                }
            }
            //先求和,再拼接
            long sum=0;
            for (Integer sco : scos) {
                sum+=sco;
            }
            stu=stu+","+sum;
            context.write(new Text(stu),NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJobName("join");
        job.setJarByClass(MR07Join.class);

        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        Path input1 = new Path("/student.txt");
        FileInputFormat.addInputPath(job,input1);
        Path input2 = new Path("/score.txt");
        FileInputFormat.addInputPath(job,input2);
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if(fs.exists(output)){
            fs.delete(output,true);
        }
        FileOutputFormat.setOutputPath(job,output);

        job.waitForCompletion(true);
        System.out.println("join");
    }
}

4、map join

在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。

Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。

​

public class MR08MapJoin {
    public static class MapJoinMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
        //使用HashMap存储小表student中的数据
        HashMap<String ,String > stus =new HashMap<String,String>();

        @Override
        //setup(在map task之前执行,每次都会执行,读小表)
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSystem fs = FileSystem.get(new Configuration());
            Path path = new Path("/student.txt");
            FSDataInputStream open = fs.open(path);
            BufferedReader br = new BufferedReader(new InputStreamReader(open));
            String line;
            if((line=br.readLine())!=null){
                stus.put(line.split(",")[0],line);
            }
            super.setup(context);
        }

        @Override
        //map(每一条数据都执行一次)
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            String id = line.split(",")[0];
            //join stu + score
            String stu = stus.get(id);
            String str =stu+","+split[2];
            context.write(new Text(str),NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception{
        Job job = Job.getInstance();
        job.setJobName("MapJoin");
        job.setJarByClass(MR08MapJoin.class);
        //reduce如果没有 必须通过参数设置为0
        //mapreduce中有一个默认的reduce代码,并且reduce task默认为1
        job.setNumReduceTasks(0);

        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        Path input = new Path("/score.txt");
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if(fs.exists(output)){
            fs.delete(output,true);
        }
        FileInputFormat.addInputPath(job,input);
        FileOutputFormat.setOutputPath(job,output);

        job.waitForCompletion(true);
    }
}

​

5、MapReduce的shuffle原理

1)block进行split切分

2)一个split切片对应一个maptask

3)每个map有一个环形内存缓冲区,用于存储map的输出。默认大小100MB,一旦达到阀值80%,数据写入到磁盘(数据落地)

4)写磁盘前,要partition,sort(分区排序)

5)等最后记录写完,合并全部文件为一个分区且排序的文件。

6)合并保存到磁盘(数据落地)

7)得到完整文件,通过reduce发送到hdfs

注意:shuffle过程中有两次数据落地

MapReduce原理深入理解

 

6、Combiner预聚合

combiner发生在map端的reduce操作。 作用是减少map端的输出,减少shuffle过程中网络传输的数据量,提高作业的执行效率。

combiner仅仅是单个map task的reduce,没有对全部map的输出做reduce。 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以,Combine适合于等幂操作,比如累加,最大值等。求平均数不适合


/**
 * combine预聚合
 * map之前reduce之后
 */


public class MR05Gender {
    public static class GenderMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            if("男".equals(split[3])){
                context.write(new Text("男"),new LongWritable(1));
            }
        }
    }
    public static  class CombineReducer extends Reducer <Text,LongWritable,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count=0;
            for (LongWritable value : values) {
                count+=value.get();
            }
            context.write(key,new LongWritable(count));
        }
    }
    public static class GenderReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count=0;
            for (LongWritable value : values) {
                count+=value.get();
            }
            context.write(key,new LongWritable(count));
        }
    }

    public static void main(String[] args) throws Exception{
        Job job = Job.getInstance();
        job.setJobName("combine预聚合");
        job.setJarByClass(MR05Gender.class);

        job.setMapperClass(GenderMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setCombinerClass(CombineReducer.class);

        job.setReducerClass(GenderReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        Path input = new Path("/student.txt");
        FileInputFormat.addInputPath(job,input);
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if(fs.exists(output)){
            fs.delete(output,true);
        }
        FileOutputFormat.setOutputPath(job,output);

        job.waitForCompletion(true);
        System.out.println("combine预聚合");
    }
}

7、MapReduce优化总结

1)通过修改map的切片大小控制map数据量(尽量和block大小保持一致) 

2)合并小文件。因为一个文件会至少生成一个map

3)避免数据倾斜,即key的分布不均(可手动给key加标记,时间戳)

4)combine操作(map端的预聚合)

5)mapjoin操作(map小表广播)

6)适当备份,因为备份多可以本地化生成map任务

上一篇:Mybatis学习笔记(四)


下一篇:06-MapReduce介绍