hadoop系列之MR经典案例分享二

hadoop系列之MR经典案例分享二

 浪1234 浪尖聊大数据 

4、MapReduce的join(hive已经实现)


http://database.51cto.com/art/201410/454277.htm

hadoop系列之MR经典案例分享二

这三种join方式适用于不同的场景,其处理效率上的相差还是蛮大的,其中主要导致因素是网络传输。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,写分布式大数据处理程序的时最好要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据倾斜降到最低,使我们的代码倾向性更好


1)在Reudce端进行连接(最常见)

  • Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

  • Reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。


原理非常简单,下面来看一个实例:

自定义一个value返回类型:

hadoop系列之MR经典案例分享二

hadoop系列之MR经典案例分享二

MapReduce主体

hadoop系列之MR经典案例分享二

hadoop系列之MR经典案例分享二

hadoop系列之MR经典案例分享二


hadoop系列之MR经典案例分享二

hadoop系列之MR经典案例分享二



其中具体的分析以及数据的输出输入请看代码中的注释已经写得比较清楚了,这里主要分析一下reduce join的一些不足。之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。

2)Mapduanjoin

使用场景:一张表十分小、一张表很大

  • 先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。

  • 扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。

hadoop系列之MR经典案例分享二

将小表的数据读取出来,存放在内存中

右键 -> Source -> Override/Implement Methods ->setup+cleanup

hadoop系列之MR经典案例分享二


案例


  1. package com.mr.mapSideJoin;


  2. import java.io.BufferedReader;  

  3. import java.io.FileReader;  

  4. import java.io.IOException;  

  5. import java.util.HashMap;  

  6. import org.apache.hadoop.conf.Configuration;  

  7. import org.apache.hadoop.conf.Configured;  

  8. import org.apache.hadoop.filecache.DistributedCache;  

  9. import org.apache.hadoop.fs.Path;  

  10. import org.apache.hadoop.io.Text;  

  11. import org.apache.hadoop.mapreduce.Job;  

  12. import org.apache.hadoop.mapreduce.Mapper;  

  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

  14. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  

  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

  16. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  

  17. import org.apache.hadoop.util.Tool;  

  18. import org.apache.hadoop.util.ToolRunner;  

  19. import org.slf4j.Logger;  

  20. import org.slf4j.LoggerFactory;


  21. /**  

  22. * @author zengzhaozheng  

  23. *  

  24. * 用途说明:  

  25. * Map side join中的left outer join  

  26. * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段  

  27. * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),  

  28. * 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|":  

  29. * id     name  orderid  city_code  is_show  

  30. * 0       其他        9999     9999         0  

  31. * 1       长春        1        901          1  

  32. * 2       吉林        2        902          1  

  33. * 3       四平        3        903          1  

  34. * 4       *        4        904          1  

  35. * 5       通化        5        905          1  

  36. * 6       辽源        6        906          1  

  37. * 7       白城        7        907          1  

  38. * 8       白山        8        908          1  

  39. * 9       延吉        9        909          1  

  40. * -------------------------风骚的分割线-------------------------------  

  41. * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)  

  42. * tb_user_profiles.dat文件内容,分隔符为"|":  

  43. * userID   network     flow    cityID  

  44. * 1           2G       123      1  

  45. * 2           3G       333      2  

  46. * 3           3G       555      1  

  47. * 4           2G       777      3  

  48. * 5           3G       666      4  

  49. * -------------------------风骚的分割线-------------------------------  

  50. *  结果:  

  51. *  1   长春  1   901 1   1   2G  123  

  52. *  1   长春  1   901 1   3   3G  555  

  53. *  2   吉林  2   902 1   2   3G  333  

  54. *  3   四平  3   903 1   4   2G  777  

  55. *  4   *  4   904 1   5   3G  666  

  56. */

  57. public class MapSideJoinMain extends Configured implements Tool{  

  58.    private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class); 

  59.  

  60.    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> {

  61.        private HashMap<String,String> city_info = new HashMap<String, String>();  

  62.        private Text outPutKey = new Text();  

  63.        private Text outPutValue = new Text();  

  64.        private String mapInputStr = null;  

  65.        private String mapInputSpit[] = null;  

  66.        private String city_secondPart = null; 

  67.  

  68.        /**  

  69.         * 此方法在每个task开始之前执行,这里主要用作从DistributedCache  

  70.         * 中取到tb_dim_city文件,并将里边记录取出放到内存中。  

  71.         */

  72.        @Override

  73.        protected void setup(Context context) throws IOException, InterruptedException {  

  74.            BufferedReader br = null;  

  75.            //获得当前作业的DistributedCache相关文件  

  76.            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());  

  77.            String cityInfo = null;  

  78.            for(Path p : distributePaths){  

  79.                if(p.toString().endsWith("tb_dim_city.dat")){  

  80.                    //读缓存文件,并放到mem中  

  81.                    br = new BufferedReader(new FileReader(p.toString()));  

  82.                    while(null!=(cityInfo=br.readLine())){  

  83.                        String[] cityPart = cityInfo.split("\\|",5);  

  84.                        if(cityPart.length ==5){  

  85.                            city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);  

  86.                        }  

  87.                    }  

  88.                }  

  89.            }  

  90.        }

  91.        /**  

  92.         * Map端的实现相当简单,直接判断tb_user_profiles.dat中的  

  93.         * cityID是否存在我的map中就ok了,这样就可以实现Map Join了  

  94.         */

  95.        @Override

  96.        protected void map(Object key, Text value, Context context)  

  97.                throws IOException, InterruptedException {  

  98.            //排掉空行  

  99.            if(value == null || value.toString().equals("")){  

  100.                return;  

  101.            }  

  102.            mapInputStr = value.toString();  

  103.            mapInputSpit = mapInputStr.split("\\|",4);  

  104.            //过滤非法记录  

  105.            if(mapInputSpit.length != 4){  

  106.                return;  

  107.            }  

  108.            //判断链接字段是否在map中存在  

  109.            city_secondPart = city_info.get(mapInputSpit[3]);  

  110.            if(city_secondPart != null){  

  111.                this.outPutKey.set(mapInputSpit[3]);  

  112.                this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);  

  113.                context.write(outPutKey, outPutValue);  

  114.            }  

  115.        }  

  116.    }  


  117.    @Override

  118.    public int run(String[] args) throws Exception {  

  119.            Configuration conf=getConf(); //获得配置文件对象  

  120.            DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件  

  121.            Job job=new Job(conf,"MapJoinMR");  

  122.            job.setNumReduceTasks(0);

  123.            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径  

  124.            FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径

  125.            job.setJarByClass(MapSideJoinMain.class);  

  126.            job.setMapperClass(LeftOutJoinMapper.class);

  127.            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式  

  128.            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

  129.            //设置map的输出key和value类型  

  130.            job.setMapOutputKeyClass(Text.class);

  131.            //设置reduce的输出key和value类型  

  132.            job.setOutputKeyClass(Text.class);  

  133.            job.setOutputValueClass(Text.class);  

  134.            job.waitForCompletion(true);  

  135.            return job.isSuccessful()?0:1;  

  136.    }  


  137.    public static void main(String[] args) throws IOException,  

  138.            ClassNotFoundException, InterruptedException {  

  139.        try {  

  140.            int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);  

  141.            System.exit(returnCode);  

  142.        } catch (Exception e) {  

  143.            // TODO Auto-generated catch block  

  144.            logger.error(e.getMessage());  

  145.        }  

  146.    }  

  147. }

DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外,关于DistributedCache和作业的关系,比如权限、存储路径区分、public和private等属性,接下来有用再整理研究一下写一篇blog,这里就不详细说了。

 

另外还有一种比较变态的Map Join方式,就是结合HBase来做Map Join操作。这种方式完全可以突破内存的控制,使你毫无忌惮的使用Map Join,而且效率也非常不错。

3)semi join

就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。

说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。


  1. package com.mr.SemiJoin;  


  2. import java.io.BufferedReader;  

  3. import java.io.FileReader;  

  4. import java.io.IOException;  

  5. import java.util.ArrayList;  

  6. import java.util.HashSet;  

  7. import org.apache.hadoop.conf.Configuration;  

  8. import org.apache.hadoop.conf.Configured;  

  9. import org.apache.hadoop.filecache.DistributedCache;  

  10. import org.apache.hadoop.fs.Path;  

  11. import org.apache.hadoop.io.Text;  

  12. import org.apache.hadoop.mapreduce.Job;  

  13. import org.apache.hadoop.mapreduce.Mapper;  

  14. import org.apache.hadoop.mapreduce.Reducer;  

  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

  16. import org.apache.hadoop.mapreduce.lib.input.FileSplit;  

  17. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  

  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

  19. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  

  20. import org.apache.hadoop.util.Tool;  

  21. import org.apache.hadoop.util.ToolRunner;  

  22. import org.slf4j.Logger;  

  23. import org.slf4j.LoggerFactory;  


  24. /**  

  25. * @author zengzhaozheng  

  26. *  

  27. * 用途说明:  

  28. * reudce side join中的left outer join  

  29. * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段  

  30. * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)  

  31. * tb_dim_city.dat文件内容,分隔符为"|":  

  32. * id     name  orderid  city_code  is_show  

  33. * 0       其他        9999     9999         0  

  34. * 1       长春        1        901          1  

  35. * 2       吉林        2        902          1  

  36. * 3       四平        3        903          1  

  37. * 4       *        4        904          1  

  38. * 5       通化        5        905          1  

  39. * 6       辽源        6        906          1  

  40. * 7       白城        7        907          1  

  41. * 8       白山        8        908          1  

  42. * 9       延吉        9        909          1  

  43. * -------------------------风骚的分割线-------------------------------  

  44. * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)  

  45. * tb_user_profiles.dat文件内容,分隔符为"|":  

  46. * userID   network     flow    cityID  

  47. * 1           2G       123      1  

  48. * 2           3G       333      2  

  49. * 3           3G       555      1  

  50. * 4           2G       777      3  

  51. * 5           3G       666      4  

  52. * -------------------------风骚的分割线-------------------------------  

  53. * joinKey.dat内容:  

  54. * city_code  

  55. * 1  

  56. * 2  

  57. * 3  

  58. * 4  

  59. * -------------------------风骚的分割线-------------------------------  

  60. *  结果:  

  61. *  1   长春  1   901 1   1   2G  123  

  62. *  1   长春  1   901 1   3   3G  555  

  63. *  2   吉林  2   902 1   2   3G  333  

  64. *  3   四平  3   903 1   4   2G  777  

  65. *  4   *  4   904 1   5   3G  666  

  66. */

  67. public class SemiJoin extends Configured implements Tool{  

  68.    private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);  

  69.    public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> {  

  70.        private CombineValues combineValues = new CombineValues();  

  71.        private HashSet<String> joinKeySet = new HashSet<String>();  

  72.        private Text flag = new Text();  

  73.        private Text joinKey = new Text();  

  74.        private Text secondPart = new Text();  

  75.        /**  

  76.         * 将参加join的key从DistributedCache取出放到内存中,以便在map端将要参加join的key过滤出来。b  

  77.         */

  78.        @Override

  79.        protected void setup(Context context) throws IOException, InterruptedException {  

  80.            BufferedReader br = null;  

  81.            //获得当前作业的DistributedCache相关文件  

  82.            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());

  83.            String joinKeyStr = null;  

  84.            for(Path p : distributePaths){  

  85.                if(p.toString().endsWith("joinKey.dat")){  

  86.                    //读缓存文件,并放到mem中  

  87.                    br = new BufferedReader(new FileReader(p.toString()));  

  88.                    while(null!=(joinKeyStr=br.readLine())){  

  89.                        joinKeySet.add(joinKeyStr);  

  90.                    }  

  91.                }  

  92.            }  

  93.        }  


  94.        @Override

  95.        protected void map(Object key, Text value, Context context)  

  96.                throws IOException, InterruptedException {  

  97.            //获得文件输入路径  

  98.            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();  

  99.            //数据来自tb_dim_city.dat文件,标志即为"0"  

  100.            if(pathName.endsWith("tb_dim_city.dat")){  

  101.                String[] valueItems = value.toString().split("\\|");  

  102.                //过滤格式错误的记录  

  103.                if(valueItems.length != 5){  

  104.                    return;  

  105.                }  

  106.                //过滤掉不需要参加join的记录  

  107.                if(joinKeySet.contains(valueItems[0])){  

  108.                    flag.set("0");  

  109.                    joinKey.set(valueItems[0]);  

  110.                    secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);  

  111.                    combineValues.setFlag(flag);  

  112.                    combineValues.setJoinKey(joinKey);  

  113.                    combineValues.setSecondPart(secondPart);  

  114.                    context.write(combineValues.getJoinKey(), combineValues);  

  115.                }else{  

  116.                    return ;  

  117.                }  

  118.            }//数据来自于tb_user_profiles.dat,标志即为"1"  

  119.            else if(pathName.endsWith("tb_user_profiles.dat")){  

  120.                String[] valueItems = value.toString().split("\\|");  

  121.                //过滤格式错误的记录  

  122.                if(valueItems.length != 4){  

  123.                    return;  

  124.                }  

  125.                //过滤掉不需要参加join的记录  

  126.                if(joinKeySet.contains(valueItems[3])){  

  127.                    flag.set("1");  

  128.                    joinKey.set(valueItems[3]);  

  129.                    secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);  

  130.                    combineValues.setFlag(flag);  

  131.                    combineValues.setJoinKey(joinKey);  

  132.                    combineValues.setSecondPart(secondPart);  

  133.                    context.write(combineValues.getJoinKey(), combineValues);  

  134.                }else{  

  135.                    return ;  

  136.                }  

  137.            }  

  138.        }  

  139.    }  


  140.    public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> {  

  141.        //存储一个分组中的左表信息  

  142.        private ArrayList<Text> leftTable = new ArrayList<Text>();  

  143.        //存储一个分组中的右表信息  

  144.        private ArrayList<Text> rightTable = new ArrayList<Text>();  

  145.        private Text secondPar = null;  

  146.        private Text output = new Text();  


  147.        /**  

  148.         * 一个分组调用一次reduce函数  

  149.         */

  150.        @Override

  151.        protected void reduce(Text key, Iterable<CombineValues> value, Context context)  

  152.                throws IOException, InterruptedException {  

  153.            leftTable.clear();  

  154.            rightTable.clear();  

  155.            /**  

  156.             * 将分组中的元素按照文件分别进行存放  

  157.             * 这种方法要注意的问题:  

  158.             * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,  

  159.             * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最  

  160.             * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。  

  161.             */

  162.            for(CombineValues cv : value){  

  163.                secondPar = new Text(cv.getSecondPart().toString());  

  164.                //左表tb_dim_city  

  165.                if("0".equals(cv.getFlag().toString().trim())){  

  166.                    leftTable.add(secondPar);  

  167.                }  

  168.                //右表tb_user_profiles  

  169.                else if("1".equals(cv.getFlag().toString().trim())){  

  170.                    rightTable.add(secondPar);  

  171.                }  

  172.            }  

  173.            logger.info("tb_dim_city:"+leftTable.toString());  

  174.            logger.info("tb_user_profiles:"+rightTable.toString());  

  175.            for(Text leftPart : leftTable){  

  176.                for(Text rightPart : rightTable){  

  177.                    output.set(leftPart+ "\t" + rightPart);  

  178.                    context.write(key, output);  

  179.                }  

  180.            }  

  181.        }  

  182.    }  


  183.    @Override

  184.    public int run(String[] args) throws Exception {  

  185.            Configuration conf=getConf(); //获得配置文件对象  

  186.            DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);

  187.            Job job=new Job(conf,"LeftOutJoinMR");  

  188.            job.setJarByClass(SemiJoin.class);

  189.            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径  

  190.            FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径

  191.            job.setMapperClass(SemiJoinMapper.class);  

  192.            job.setReducerClass(SemiJoinReducer.class);

  193.            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式  

  194.            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

  195.            //设置map的输出key和value类型  

  196.            job.setMapOutputKeyClass(Text.class);  

  197.            job.setMapOutputValueClass(CombineValues.class);

  198.            //设置reduce的输出key和value类型  

  199.            job.setOutputKeyClass(Text.class);  

  200.            job.setOutputValueClass(Text.class);  

  201.            job.waitForCompletion(true);  

  202.            return job.isSuccessful()?0:1;  

  203.    }  

  204.    public static void main(String[] args) throws IOException,  

  205.            ClassNotFoundException, InterruptedException {  

  206.        try {  

  207.            int returnCode =  ToolRunner.run(new SemiJoin(),args);  

  208.            System.exit(returnCode);  

  209.        } catch (Exception e) {  

  210.            logger.error(e.getMessage());  

  211.        }  

  212.    }  

  213. }

    


上一篇:伪分布式yarn上运行mr程序


下一篇:filibuster an climate change