核心代码:
public static class MyPartition extends Partitioner<Text,Text> { @Override public int getPartition(Text text, Text text2, int i) { if("001".equals(text.toString())||"002".equals(text.toString())){ return 0; } else { return 1; } } }
5、MapReduce 个数的确定
在 Job 提交后,任务正式开始计算之前即已经确定 Map 数量的确定:由输入数据文件的总大小、数据格式、块大小综合确定,待冲刺环节详解。 Reduce 数量确定:系统根据输入数据量的大小自动确定,有固定的计算公式,待冲刺环节详 解。另外,用户可以自定义设置,通过参数配置,由用户决定。 6 、自定义 reduce 数量 -Dmapred.reduce.tasks=2 \ 最终效果: [yanyufei@cluster2 ~]$ hdfs dfs -text /user/yanyufei/output17/part-r-00000.gz
001 张一 政治 85
[yanyufei@cluster2 ~]$ hdfs dfs -text /user/yanyufei/output17/part-r-00001.gz
002 张二 Java程序设计 95
003 张三 政治 80
004 张四 政治 80
[yanyufei@cluster2 ~]$
[yanyufei@cluster2 ~]$ hdfs dfs -ls /user/yanyufei/output17/
Found 3 items
-rw-r--r-- 3 yanyufei job018 0 2021-10-12 10:45 /user/yanyufei/output17/_SUCCESS
-rw-r--r-- 3 yanyufei job018 43 2021-10-12 10:45 /user/yanyufei/output17/part-r-00000.gz
-rw-r--r-- 3 yanyufei job018 81 2021-10-12 10:44 /user/yanyufei/output17/part-r-00001.gz
7、读取外部配置文件-Configuration 传递
7.1、步骤分解 实现基于 input_secondsort 文件的一次排序 将本地文件 whitelist.txt 传给 Driver 类,读取到该文件内容 txtContent 将 txtContent 通过 Configuration 的 set 方法传递给 map 和 reduce 任务 在 map 任务中通过 Configuration 对象的 get 方法获取传递过来的值 txtContent 将 txtContent 解析成 Set 对象,对 map 任务重的 map 方法进行过滤输出 由于 map 端已经做了过滤,reduce 端将不需任何改变 实现完整代码:public class WhiteList { //map阶段负责对输入文件进行切分处理,然后汇总再分组给reduce进行处理,以达到高效的分布式计算效率 public static class WhiteListMap extends Mapper<Object, Text,Text,Text>{ public Text reskey=new Text(); public Text resValue=new Text(); public List<String> whiteList; @Override protected void setup(Context context) throws IOException, InterruptedException { //map执行之前先调用这个方法,全局只会调用一次,在这可以进行初始化 //获取配置文件 Configuration conf=context.getConfiguration(); String whilteStr=conf.get("whiteList"); //初始化whiteList变量 whiteList= Arrays.asList(whilteStr.split("\t")); } @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[]values=value.toString().split("\t"); if(whiteList.contains(values[0])){ //在名单里应该展示 reskey.set(values[0]); resValue.set(values[1]); context.write(reskey,resValue); } } } public static class WhilteListReducer extends Reducer<Text, Text,Text,Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for(Text val:values){ context.write(key,val); } } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //创建Configuration Configuration conf=new Configuration(); //参数解析器,将mapreduce需要的参数传给配置文件,其他参数传给remainingArgs //处理参数,把mapred参数放到conf里,会把其他参数提取出来 GenericOptionsParser optionParser = new GenericOptionsParser(conf, args); //获取输入的参数,设计输入参数 //第一个参数 输入文件位置,用,隔开 //第二个参数 输出文件位置 //第三个参数 白名单地址 String[] remainingArgs = optionParser.getRemainingArgs(); //调用;把白名单文件内容获取到配置文件里 fileread(conf,remainingArgs[2]); //创建任务Job Job job=Job.getInstance(conf,"白名单2"); //指定Driver job.setJarByClass(WhiteList.class); //指定mapper job.setMapperClass(WhiteListMap.class); //设置partition // job.setPartitionerClass(Partition.MyPartition.class); //指定combine // job.setCombinerClass(CountAversge.CountAverageReduce.class); //指定reducer job.setReducerClass(WhilteListReducer.class); //输入输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //输入输出的文件 for(String inputPath:remainingArgs[0].split(",")){ FileInputFormat.addInputPath(job,new Path(inputPath)); } FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1])); //执行 System.exit(job.waitForCompletion(true)?0:1); } public static void fileread(Configuration conf,String filepath) throws IOException { //把白名单文件内容获取到配置文件里 //IO流 FileInputStream fis=new FileInputStream(filepath); //获取缓冲流 BufferedReader br=new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8)); String line=""; StringBuilder result=new StringBuilder(); boolean isFirst=true; while ((line=br.readLine())!=null){ if(!isFirst){ result.append("\t"); }else { isFirst=false; } result.append(line); } //在配置文件里定义变量,并且这个变量可以传递给么一个map conf.set("whiteList",result.toString()); } }