TopK的一个简单实现

转自:http://rangerwolf.iteye.com/blog/2119096

题外话:

《Hadoop in Action》 是一本非常不错的交Hadoop的入门书,而且建议看英文版。此书作者的英文表达非常简单易懂。相信有一定英文阅读能力的同学直接用英文版就能非常容易的上手~

进入正题。 这个题目是《Hadoop in Action》 上面的一道题目,求出Top K的值。

我自己随便弄了一个输入文件:

  1. g   445
  2. a   1117
  3. b   222
  4. c   333
  5. d   444
  6. e   123
  7. f   345
  8. h   456

讲讲我的思路:

对于Top K的问题,首先要在每个block/分片之中找到这部分的Top K。并且由于只能输出一次,所以输出的工作需要在cleanup方法之中进行。为了简单,使用的是java之中的TreeMap,因为这个数据结构天生就带有排序功能。 而Reducer的工作流程跟Map其实是完全一致的,只是光Map一步还不够,所以只能再加一个Reduce步骤。

最终输出的格式为如下:(K=2)

  1. 1117    a
  2. 456    g

所以需要使用map。 如果只需要输出大小的话,直接使用TreeSet会更高效一点。

下面是实现的代码:

  1. package hadoop_in_action_exersice;
  2. import java.io.IOException;
  3. import java.util.TreeMap;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. public class TopK {
  15. public static final int K = 2;
  16. public static class KMap extends Mapper<LongWritable, Text, IntWritable, Text> {
  17. TreeMap<Integer, String> map = new TreeMap<Integer, String>();
  18. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  19. String line = value.toString();
  20. if(line.trim().length() > 0 && line.indexOf("\t") != -1) {
  21. String[] arr = line.split("\t", 2);
  22. String name = arr[0];
  23. Integer num = Integer.parseInt(arr[1]);
  24. map.put(num, name);
  25. if(map.size() > K) {
  26. map.remove(map.firstKey());
  27. }
  28. }
  29. }
  30. @Override
  31. protected void cleanup(
  32. Mapper<LongWritable, Text, IntWritable, Text>.Context context)
  33. throws IOException, InterruptedException {
  34. for(Integer num : map.keySet()) {
  35. context.write(new IntWritable(num), new Text(map.get(num)));
  36. }
  37. }
  38. }
  39. public static class KReduce extends Reducer<IntWritable, Text, IntWritable, Text> {
  40. TreeMap<Integer, String> map = new TreeMap<Integer, String>();
  41. public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  42. map.put(key.get(), values.iterator().next().toString());
  43. if(map.size() > K) {
  44. map.remove(map.firstKey());
  45. }
  46. }
  47. @Override
  48. protected void cleanup(
  49. Reducer<IntWritable, Text, IntWritable, Text>.Context context)
  50. throws IOException, InterruptedException {
  51. for(Integer num : map.keySet()) {
  52. context.write(new IntWritable(num), new Text(map.get(num)));
  53. }
  54. }
  55. }
  56. public static void main(String[] args) {
  57. // TODO Auto-generated method stub
  58. Configuration conf = new Configuration();
  59. try {
  60. Job job = new Job(conf, "my own word count");
  61. job.setJarByClass(TopK.class);
  62. job.setMapperClass(KMap.class);
  63. job.setCombinerClass(KReduce.class);
  64. job.setReducerClass(KReduce.class);
  65. job.setOutputKeyClass(IntWritable.class);
  66. job.setOutputValueClass(Text.class);
  67. FileInputFormat.setInputPaths(job, new Path("/home/hadoop/DataSet/Hadoop/WordCount-Result"));
  68. FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/DataSet/Hadoop/TopK-output1"));
  69. System.out.println(job.waitForCompletion(true));
  70. } catch (IOException e) {
  71. // TODO Auto-generated catch block
  72. e.printStackTrace();
  73. } catch (ClassNotFoundException e) {
  74. // TODO Auto-generated catch block
  75. e.printStackTrace();
  76. } catch (InterruptedException e) {
  77. // TODO Auto-generated catch block
  78. e.printStackTrace();
  79. }
  80. }
  81. }
上一篇:Thinkphp 缓存微信jssdk相关认证参数


下一篇:一文详解面试常考的TopK问题