MapReduce进阶

1 、文件格式 文件格式按面向的存储形式不同,分为面向行和面向列的两大类文件格式。 2、 压缩格式 压缩格式按其可切分计算性,分为可切分计算和不可切分计算两种。 3、配置文件 //确认压缩 -Dmapred.output.compress=true \ //指定压缩方式 -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \ 4、 Partition partition是在map阶段完成后执行的。然后将分好区的数据传输到reduce端,也就是由Partitioner来决定每条记录应该送往哪个reducer节点。mapreduce中默认的分区是HashPartition类;
           核心代码:    
public static class MyPartition extends Partitioner<Text,Text> {
    @Override
    public int getPartition(Text text, Text text2, int i) {
        if("001".equals(text.toString())||"002".equals(text.toString())){
            return 0;
        }
        else {
            return  1;
        }
    }
}

5、MapReduce 个数的确定

在 Job 提交后,任务正式开始计算之前即已经确定   Map 数量的确定:由输入数据文件的总大小、数据格式、块大小综合确定,待冲刺环节详解。 Reduce 数量确定:系统根据输入数据量的大小自动确定,有固定的计算公式,待冲刺环节详 解。另外,用户可以自定义设置,通过参数配置,由用户决定。 6 、自定义 reduce 数量 -Dmapred.reduce.tasks=2 \ 最终效果:

  [yanyufei@cluster2 ~]$ hdfs dfs -text /user/yanyufei/output17/part-r-00000.gz
001    张一    政治    85    
[yanyufei@cluster2 ~]$ hdfs dfs -text /user/yanyufei/output17/part-r-00001.gz
002    张二    Java程序设计    95    
003    张三    政治    80    
004    张四    政治    80    
[yanyufei@cluster2 ~]$ 
[yanyufei@cluster2 ~]$ hdfs dfs -ls /user/yanyufei/output17/
Found 3 items
-rw-r--r--   3 yanyufei job018          0 2021-10-12 10:45 /user/yanyufei/output17/_SUCCESS
-rw-r--r--   3 yanyufei job018         43 2021-10-12 10:45 /user/yanyufei/output17/part-r-00000.gz
-rw-r--r--   3 yanyufei job018         81 2021-10-12 10:44 /user/yanyufei/output17/part-r-00001.gz

7、读取外部配置文件-Configuration 传递

7.1、步骤分解   实现基于 input_secondsort 文件的一次排序   将本地文件 whitelist.txt 传给 Driver 类,读取到该文件内容 txtContent   将 txtContent 通过 Configuration 的 set 方法传递给 map 和 reduce 任务 在 map 任务中通过 Configuration 对象的 get 方法获取传递过来的值 txtContent   将 txtContent 解析成 Set 对象,对 map 任务重的 map 方法进行过滤输出   由于 map 端已经做了过滤,reduce 端将不需任何改变 实现完整代码:
public class WhiteList {
    //map阶段负责对输入文件进行切分处理,然后汇总再分组给reduce进行处理,以达到高效的分布式计算效率
    public static class WhiteListMap extends Mapper<Object, Text,Text,Text>{
        public Text reskey=new Text();
        public Text resValue=new Text();
        public List<String> whiteList;
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //map执行之前先调用这个方法,全局只会调用一次,在这可以进行初始化
            //获取配置文件
            Configuration conf=context.getConfiguration();
            String whilteStr=conf.get("whiteList");
            //初始化whiteList变量
            whiteList= Arrays.asList(whilteStr.split("\t"));
        }

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[]values=value.toString().split("\t");
            if(whiteList.contains(values[0])){
                //在名单里应该展示
                reskey.set(values[0]);
                resValue.set(values[1]);
                context.write(reskey,resValue);
            }
        }
    }

    public static class WhilteListReducer extends Reducer<Text, Text,Text,Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
           for(Text val:values){
               context.write(key,val);
           }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //创建Configuration
        Configuration conf=new Configuration();
        //参数解析器,将mapreduce需要的参数传给配置文件,其他参数传给remainingArgs
        //处理参数,把mapred参数放到conf里,会把其他参数提取出来
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        //获取输入的参数,设计输入参数
        //第一个参数 输入文件位置,用,隔开
        //第二个参数 输出文件位置
        //第三个参数 白名单地址
        String[] remainingArgs = optionParser.getRemainingArgs();

        //调用;把白名单文件内容获取到配置文件里
        fileread(conf,remainingArgs[2]);


        //创建任务Job
        Job job=Job.getInstance(conf,"白名单2");
        //指定Driver
        job.setJarByClass(WhiteList.class);
        //指定mapper
        job.setMapperClass(WhiteListMap.class);
        //设置partition
//        job.setPartitionerClass(Partition.MyPartition.class);
        //指定combine
//        job.setCombinerClass(CountAversge.CountAverageReduce.class);
        //指定reducer
        job.setReducerClass(WhilteListReducer.class);
        //输入输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //输入输出的文件
        for(String inputPath:remainingArgs[0].split(",")){
            FileInputFormat.addInputPath(job,new Path(inputPath));
        }
        FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1]));

        //执行
        System.exit(job.waitForCompletion(true)?0:1);
    }

    public static void fileread(Configuration conf,String filepath) throws IOException {
        //把白名单文件内容获取到配置文件里
        //IO流
        FileInputStream fis=new FileInputStream(filepath);
        //获取缓冲流
        BufferedReader br=new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));
        String line="";
        StringBuilder result=new StringBuilder();
        boolean isFirst=true;
        while ((line=br.readLine())!=null){
            if(!isFirst){
                result.append("\t");
            }else {
                isFirst=false;
            }
            result.append(line);
        }
        //在配置文件里定义变量,并且这个变量可以传递给么一个map
        conf.set("whiteList",result.toString());
    }
}
上一篇:04


下一篇:详细讲解MOSFET管驱动电路(转)