利用采样器实现mapreduce任务输出全排序

采样器是hadoop内自带的一个可以对目标文件部分数据进行提取的工具类,以方便我们对这些采样的数据做一些参考或者处理。hadoop提供了多种采样器供我们使用,以满足不同的需求。另外,采样器不同于普通mapreduce操作。它是直接在客户端机器上运行的。
常见采样器
IntervalSampler 以一定的间隔定期从划分中选择key,对有排序的数据来说更好
RandomSameler 以指定的采样率均匀的从数据集中选择样本
SplitSampler 只采样一个分片的前n条记录,所以不适合有排序的数据

最简单的使用采样器举例
例1:
JobConf conf = new JobConf(SamplerTest.class);
conf.setJobName("Sampler");

FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));

//设置采样率为0.1,最大样本数1000,最大分区数10(在最多10个分区中,按照约总条数*0.1的间隔取出一个样本,最多1000个样本,注意每条提取的顺序也是被打乱了的),采样器在客户端运行
//理论上RandomSampler<K, V>中的k,v和inputformat中的map传入值类型一致,但是这里好像有个bug,在后面的sampler.getSample(inf, conf)中返回的list使用的toArrays()方法,这样返回的就只能是Object数组了,所以k只能为Object,否则会抛类转换异常。另外v在采样时没什么用  
RandomSampler<Object, Text> sampler = new RandomSampler<Object, Text>(0.1, 1000, 10);
final InputFormat<Object,Text> inf = (InputFormat<Object,Text>) conf.getInputFormat();

//因为sampler.getSample(inf, conf)的返回类型由inf中的InputFormat中的K,V决定,所以这里也要转换成对应的K,V
//conf.getInputFormat()从配置中获得inputformat,若不配置,默认使用TextInputFormat(这里就没配置)
Object[] keys = sampler.getSample(inf, conf);
for(Object l:keys){
 LongWritable l2 = (LongWritable)l;
 System.out.println(l2);
}
return 0;
    上面的例子只是对采样器进行了简单的使用,从总条数*0.1的间隔取出一个样本,最多1000个样本,最大10个分区中取出样本。并将这些样本打印出来。注意,若这里面任意一个条件满足。则取样本操作结束。比如已经取到了1000个样本,或者已经取了10个分区时,这时就认为样本已提取完成。
这个例子可以看出两个问题:
1.该例子没有进行runJob,所以没有执行mapreduce任务。由此看来采样器的获取是在客户端中进行的
2.该例子得到的sample只能是key的sample,不能是value的sample。若要得到value的sample则需要自己重写该方法了。

利用样本来对输出内容进行全排序
使用hadoop对输出文件进行全排序有很多方法,他们各有优缺点,这里总结一下各个方法的优劣:
1.直接使用一个reduce,且输出一个文件。
    该操作的优点在于简单,无须进行任何排序操作,hadoop在进行reduce的时候就对key进行了内部排序。所以直接输出结果就醒了。但是它的缺点也是显而易见的,就是一旦数据量大了以后,采用一个reduce对数据进行处理显然没有达到分布式处理的效果,有违hadoop的操作宗旨。
2.自定义partitioner,按范围来区分多个reduce任务,输出多个文件,然后再将这些文件进行组合。
    该操作的优点在于能够有效的将大数据进行分布式处理,达到了一定程度的负载均衡。但是缺点是用户依赖较大。除了要自己写partitioner,用户还要要事先知道数据的取之范围,而且还要了解数据分布情况,若数据分布不均匀的时候也可能出现负载不均衡的情况。如一个包含了各地温度的数据。在-20~0度约有5%,0~20度约有80%,20~40度约有14%。若按照每20度来进行划分的话,显然不能达到均衡的效果。
3.采用hadoop提供的TotalOrderPartitioner分区工具进行全排序
    该操作实际上就是hadoop内部使用了采样器来对数据进行采样,然后内部按分布比例进行分区。整个动作已经帮开发者做好了,所以它的优点是能够最大限度的达到负载均衡。

方法1比较简单,下面分别对方法2和方法3做一个简要描述和举例:

利用方法2实现全排序:自定义partitioner的mapreduce任务来实现全排序

    采用hadoop提供的TotalOrderPartitioner分区工具实现全排序简单方便。但是有的时候我们可能需要增加一些自定义的东西来实现更通用的扩展。这个时候就有必要自己写一个类似的功能来实现全排序了。
全排序的思想无外乎就是先对数据进行采样,然后根据采样的数据进行分区。最后得到按指定分区好的并且已排序的数据。按照这个思想,我们就可以写mapreduce任务了。

例2:
1.编写mapper:mapper任务很简单,就是将获取到的数据直接派发给reduce,当然hadoop会合并这些相同的key。
public class MyMap extends MapReduceBase implements
  Mapper<LongWritable, Text, LongWritable, IntWritable> {
 @Override
 public void map(LongWritable key, Text val,
   OutputCollector<LongWritable, IntWritable> output, Reporter reporter)
   throws IOException {
  output.collect(key, new IntWritable(1));
 }

}
2.编写reducer:reducer的任务就是对已排好序的key进行处理并输出。
public class MyRed extends MapReduceBase implements
  Reducer<LongWritable, IntWritable, Text, NullWritable> {
 @Override
 public void reduce(LongWritable key, Iterator<IntWritable> values,
   OutputCollector<Text, NullWritable> output, Reporter reporter)
   throws IOException {
  while(values.hasNext()){
   values.next();
   output.collect(new Text(key.toString()), NullWritable.get());
  }
 }
}
3.编写partitioner:partitioner的作用就是将map任务的结果交给哪个reduce处理,这里需要用到已建立好缓存数据(该数据会在运行客户端主程序时)。该缓存数据告诉patitioner将哪些范围中的数据放到哪个reducer中去
public class MyPartitions implements Partitioner<LongWritable,IntWritable> {

 @Override
 public void configure(JobConf job) {
 }
 @Override
 public int getPartition(LongWritable key, IntWritable value,
   int numPartitions) {
  //PartitionsUtils.findPartitions的作用就是依据分布式缓存数据,按key进行分区,具体代码就不贴了,具体操作如下
  //获取分布式缓存文件中的数据,取得对应的数字,这些数字按从小到达顺序排列,返回其插值序数。比如:3,3,9。那么当key<3时返回0,key=3时返回1,3<key<=9时返回2,key>9时返回3。
  return PartitionsUtils.findPartitions(key,value,numPartitions);
 }
}
4.最后编写客户端主程序:运行mapreduce任务驱动程序,并产生分布式缓存数据
public class SamplerTest3 extends Configured implements Tool {

 public int run(String[] args) throws Exception {
  int reduceTasks = 4;
  JobConf conf = new JobConf(SamplerTest3.class);
  conf.setJobName("Sampler");

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  //MyTextInputFormat是自定义的一个InputFormat,该InputFormat与TextInputFormat的区别是key就是Long.parseLong(value.toString());
  conf.setInputFormat(MyTextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);

  conf.setOutputKeyClass(Text.class);
  //因为是返回原数据中已排好序的数字,而这些数据是来自key的,无须value,所以value类型为NullWritable即可
  conf.setOutputValueClass(NullWritable.class);
  
  conf.setMapOutputKeyClass(LongWritable.class);
  conf.setMapOutputValueClass(IntWritable.class);

  conf.setReducerClass(MyRed.class);
  conf.setMapperClass(MyMap.class);
  
  conf.setNumReduceTasks(reduceTasks);
  conf.setPartitionerClass(MyPartitions.class);
  
  //进行采样
  RandomSampler<Object, Text> sampler = new RandomSampler<Object, Text>(0.1, 1000, 10);
  final InputFormat<Object,Text> inf = (InputFormat<Object,Text>) conf.getInputFormat();
  Object[] keys = sampler.getSample(inf, conf);
  Path catchPath = new Path("/cache");
  URI partitionUri = new URI(catchPath.toString()+ "#myCache");

  FileSystem fs = FileSystem.get(conf);
  FSDataOutputStream out = fs.create(catchPath);
  //MakeCacheFile方法为写入分布式缓存文件内容。
  //内容如下:keys为取到的一系列的数字样本。这里将这些样本数据按从小到大进行排序,然后平分成n段,共reduceTasks段
  //取右端的数,存入分布式缓存中。最后一段不用存
  //比如采样的数字样本是1112|2222|2367|7788 分成了4段,取右端的数就是2,2,7,8。再去除最后一个数最终得2,2,7,存入分布式缓存中
  MakeCacheFile(out,keys,reduceTasks);
  IOUtils.closeStream(out);
  
  //添加到分布式缓存
  DistributedCache.addCacheFile(partitionUri, conf);
  DistributedCache.createSymlink(conf);
  
  JobClient.runJob(conf);
  return 0;
 }

 public static void main(String[] args) throws Exception {
  System.exit(ToolRunner.run(new SamplerTest3(), args));
 }

}
利用方法3实现全排序:采用hadoop提供的TotalOrderPartitioner分区工具来实现全排序

    采用hadoop的TotalOrderPartitioner简化了开发,原理实际上和上面的例子类似,都是先采样,然后进行分区。但是我们无需关心如何划分partition,并且也不用关心分布式缓存如何生成和使用。只要提供对应的key的样本即可。下面这个例子就是使用了hadoop提供的分区工具。

例3:
public class SamplerTest4 extends Configured implements Tool {

 public int run(String[] args) throws Exception {
  JobConf conf = new JobConf(SamplerTest4.class);
  conf.setJobName("Sampler");

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  //MyTextInputFormat和例2一样
  conf.setInputFormat(MyTextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(NullWritable.class);
  
  conf.setMapOutputKeyClass(LongWritable.class);
  conf.setMapOutputValueClass(IntWritable.class);

  //MyRed,MyMap和例2一样
  conf.setReducerClass(MyRed.class);
  conf.setMapperClass(MyMap.class);

  conf.setNumReduceTasks(4);
  conf.setPartitionerClass(TotalOrderPartitioner.class);

  
  RandomSampler<Object, Text> sampler = new RandomSampler<Object, Text>(0.1, 1000, 10);
  //告诉hadoop分布式缓存文件放在哪里好
  Path catchPath = new Path("/partitionFile");
  TotalOrderPartitioner.setPartitionFile(conf, catchPath);
  //自动生成缓存文件
  InputSampler.writePartitionFile(conf, sampler);

  URI partitionUri = new URI(catchPath.toString()+ "#_partitions");
  //添加到分布式缓存
  DistributedCache.addCacheFile(partitionUri, conf);
  DistributedCache.createSymlink(conf);
  JobClient.runJob(conf);
  return 0;
 }
 public static void main(String[] args) throws Exception {
  System.exit(ToolRunner.run(new SamplerTest4(), args));
 }
}
    上面只是对有关全排序进行一个针对性的描述。实际使用中还可以进一步优化,比如采用压缩机制。或者自定义优化对key的排序,这样会时mapredoce任务运行效率更高。

上一篇:对Map的按值排序


下一篇:某大型互联网企业用户上网行为日志分析系统——云计算项目实战