1、数据样本,w1.csv到w5.csv,每个文件数据样本2000条,第一列是年份从1990到2000随机,第二例是数据从1-100随机,本例辅助排序目标是找出每年最大值,实际上结果每年最大就是100,但是这里通过mapreduce辅助排序方式来找。
1999,71 1994,57 1995,33 1993,44 1994,99 1994,83 1995,59 ... ...
2、核心概念:
这里先预先假设有海量的数据,所以按照hash算法将所有数据分区后,但是确保同一年的数据进入到同一个分区,也就是同一个reduce里,但是因为采用组合键的key,并不能保证同一年的键都到一个组里。 因为还有可能在这个分区里还有其他年份的数据。所以在分区后,需要再进行分组,确保即使一个分区里有多个年份数据,但是每个年份数据都在自己的分组里运行。 分区的目的就是增加并行计算,把各年份的数据分到不同的reduce 因为资源的问题,分区不能无限,但是年份会有很多,所以总会有的分区会多出一年的数据。 这个时候就需要分区内再额外分组,确保符合键(或者自定义对象)可以合并到一个分组里。
3、IntPair,本例把整行数据解析后,年份和数据都放入key,需要自定义一个IntPair对象,实际生产环境中可根据需求自定义各种类.
public class IntPair implements WritableComparable<IntPair> { private int first; private int second; public IntPair() { } public IntPair(int first, int second) { this.first = first; this.second = second; } public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public int getSecond() { return second; } public void setSecond(int second) { this.second = second; } @Override public int compareTo(IntPair o) { int result = Integer.valueOf(first).compareTo(o.getFirst()); if(result==0){ result = Integer.valueOf(second).compareTo(o.getSecond()); } return result; } public static int compare(int first1,int first2){ return Integer.valueOf(first1).compareTo(Integer.valueOf(first2)); } @Override public void write(DataOutput out) throws IOException { out.writeInt(first); out.writeInt(second); } @Override public void readFields(DataInput in) throws IOException { first = in.readInt(); second = in.readInt(); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; IntPair intPair = (IntPair) o; if (first != intPair.first) return false; return second == intPair.second; } @Override public int hashCode() { int result = first; result = 31 * result + second; return result; } @Override public String toString() { return first+"\t"+second; } }
4、RecordParser,记录解析器,用于解析数据,规避错误数据
public class RecordParser { private int year; private int data; private boolean valid; public int getYear() { return year; } public int getData() { return data; } public boolean isValid() { return valid; } public void parse(String value){ String[] sValue = value.split(","); try { year = Integer.parseInt(sValue[0]); data = Integer.parseInt(sValue[1]); valid = true; }catch (Exception e){ valid = false; } } }
5、分区器
/** * @Author: xu.dm * @Date: 2019/2/21 11:56 * @Description:根据key进行分区,确保同一个key.first进入相同的分区,泛型类型和mapper输出一致 */ public class FirstPartitioner extends Partitioner<IntPair,IntWritable> { /** * Get the partition number for a given key (hence record) given the total * number of partitions i.e. number of reduce-tasks for the job. * <p> * <p>Typically a hash function on a all or a subset of the key.</p> * * @param key the key to be partioned. * @param value the entry value. * @param numPartitions the total number of partitions. * @return the partition number for the <code>key</code>. */ @Override public int getPartition(IntPair key, IntWritable value, int numPartitions) { return Math.abs(key.getFirst() * 127) % numPartitions; } }
6、key比较器,map阶段的key排序使用,如果没有分组比较器,则key比较器也会应用在混洗和reduce阶段。
/** * @Author: xu.dm * @Date: 2019/2/21 11:59 * @Description: key比较器 * 对IntPair的first升序,second降序,在mapper排序的时候被应用 * 最终同样年份的数据第一条是最大的。 */ public class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class,true);//需要实例化 } @Override public int compare(WritableComparable a, WritableComparable b) { IntPair p1=(IntPair)a; IntPair p2=(IntPair)b; int result = IntPair.compare(p1.getFirst(),p2.getFirst()); if(result==0){ result = -IntPair.compare(p1.getSecond(),p2.getSecond()); //前面加一个减号求反 } return result; } }
7、分组比较器,这里最关键,看注释。
/** * @Author: xu.dm * @Date: 2019/2/21 12:16 * @Description: 分组比较器,确保同一个年份的数据在同一个组里 * 这个是本例最特殊的地方,之前key比较器使得key值中的年份升序,数据降序排列。 * 那么这个分组比较器只按年进行比较,意味着,[1990,100]和[1990,00]会被认为是相同的分组, * 而,reduce阶段,相同的KEY只取第一个,哦也,这个时候,reduce阶段后,年份中最大的数据就被保存下来,其他数据都被kickout * 所以,用这种方式变相的达到取最大值得效果。 */ public class GroupComparator extends WritableComparator { public GroupComparator() { super(IntPair.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { IntPair p1=(IntPair)a; IntPair p2=(IntPair)b; return IntPair.compare(p1.getFirst(),p2.getFirst()); } }
8、mapper,如果只取年份里的最大数据,Mapper<LongWritable,Text,IntPair,IntWritable> 的IntWritable可以用NullWritable,这里保留IntWritable是因为,程序稍加改动就可以输出所有年份数据的计数
public class DataMapper extends Mapper<LongWritable,Text,IntPair,IntWritable> { private RecordParser parser = new RecordParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value.toString()); if(parser.isValid()){ context.write(new IntPair(parser.getYear(),parser.getData()),new IntWritable(1)); context.getCounter("MapValidData","dataCounter").increment(1); //做一个计数,总的数据应该是10000条。 } } }
9、reducer
public class DataReducer extends Reducer<IntPair,IntWritable,IntPair,IntWritable> { @Override protected void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; //因为分组器,[1990,100]和[1990,00]会被认为是相同的分组 //这里的计数就会混淆。如果需要年份下各数据的正确的计数结果,则需要注销分组器 // for(IntWritable val:values){ // sum+=val.get(); // } context.write(key,new IntWritable(sum)); } }
10、job
public class DataSecondarySort extends Configured implements Tool { /** * Execute the command with the given arguments. * * @param args command specific arguments. * @return exit code. * @throws Exception */ @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = Job.getInstance(conf,"Secondary Sort"); // conf.set("mapreduce.job.ubertask.enable","true"); if(conf==null){ return -1; } job.setJarByClass(DataSecondarySort.class); job.setMapperClass(DataMapper.class); job.setPartitionerClass(FirstPartitioner.class); job.setSortComparatorClass(KeyComparator.class);
// 决定如何分组 job.setGroupingComparatorClass(GroupComparator.class); job.setReducerClass(DataReducer.class); // job.setNumReduceTasks(2);//如果数据海量,则可以根据情况设置reduce的数目,也是分区的数量,通过Tool类,也可以在命令行进行设置 job.setOutputKeyClass(IntPair.class); //如果只求最大数,前面的mapper,reducer和这里的输出都可以设置成NullWritable job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); Path outPath = new Path(args[1]); FileSystem fileSystem = outPath.getFileSystem(conf); //删除输出路径 if(fileSystem.exists(outPath)) { fileSystem.delete(outPath,true); } return job.waitForCompletion(true) ? 0:1; } public static void main(String[] args) throws Exception{ int exitCode = ToolRunner.run(new DataSecondarySort(),args); System.exit(exitCode); } }
11、如果只求最大值,结果会是这样:
[hadoop@bigdata-senior01 ~]$ hadoop fs -cat /output3/part-r-00000 | more 1990 100 0 1991 100 0 1992 100 0 1993 100 0 1994 100 0 1995 100 0 1996 100 0 1997 100 0 1998 100 0 1999 100 0 2000 100 0
如果要求最大值和计数则会列出所有数据,当然需要注销分组器的set代码,并打开reducer的sum
[hadoop@bigdata-senior01 ~]$ hadoop fs -cat /output/part-r-00000 | more 1990 100 10 1990 99 15 1990 98 10 1990 97 9 1990 96 6 1990 95 4 1990 94 12 1990 93 9 1990 92 12 1990 91 13 1990 90 8 1990 89 9 ... ...
多个分区可以使用job.setNumReduceTasks(n),或者在命令行上指定
[hadoop@bigdata-senior01 ~]$ hadoop jar DataSecondarySort.jar -D mapreduce.job.reduces=3 /sampler /output3
[hadoop@bigdata-senior01 ~]$ hadoop fs -ls /output3
Found 4 items
-rw-r--r-- 1 hadoop supergroup 0 2019-02-21 15:29 /output3/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 2868 2019-02-21 15:29 /output3/part-r-00000
-rw-r--r-- 1 hadoop supergroup 3860 2019-02-21 15:29 /output3/part-r-00001
-rw-r--r-- 1 hadoop supergroup 3850 2019-02-21 15:29 /output3/part-r-00002
总结一下:最容易混淆的概念就是分组,而排序和分组实际上就是MapReduce最核心的地方。