MapReduce (3): Implementing an Inverted Index with MapReduce (Part 2)

Hadoop API reference:
http://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/mapreduce/Reducer.html
This time the requirement changes: the "document term-frequency list" for each word must be sorted, i.e. documents in which the word appears more often come first.

Approach: instead of emitting <word, docid> directly, the mapper first counts each word within its own document and emits a composite key, <word:num, docid>. A custom partitioner drops the count when computing the partition (it re-keys on word:docid before hashing), and the shuffle sorts the composite keys, so each reducer sees the word:num keys in key order. The reducer then folds consecutive keys that share the same word back into a single postings line, buffering the num:docid pairs across reduce() calls and flushing the last word in cleanup(), as sketched below.
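
To make the key design concrete, here is a small sketch of the intermediate data for two hypothetical input files (the file names and contents are mine for illustration, and a single reducer is assumed): suppose d1.txt contains "a b a" and d2.txt contains "a c". The mappers emit

a:2 -> d1.txt
b:1 -> d1.txt
a:1 -> d2.txt
c:1 -> d2.txt

The shuffle sorts these composite keys as plain text (a:1, a:2, b:1, c:1), and the reducer folds consecutive keys that share a word into one output line per word:

a    1:d2.txt;2:d1.txt;
b    1:d1.txt;
c    1:d2.txt;

Note that the default Text comparator orders the count part lexicographically and ascending; putting the highest counts strictly first would additionally require a custom sort comparator on the job.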

Code:
package proj;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;

public class InvertedIndexSortByFreq {

    // Emits one <word:num, docid> pair per distinct word in a document.
    public static class InvertedIndexMapper extends
            Mapper<Object, Text, Text, Text> {

        private Text keyInfo = new Text();
        private Text valInfo = new Text();
        private FileSplit split;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(" ");
            split = (FileSplit) context.getInputSplit();
            String docid = split.getPath().getName();
            // Count how often each word occurs in this document.
            Map<String, Integer> freq = new HashMap<String, Integer>();
            for (String token : tokens) {
                if (freq.containsKey(token)) {
                    freq.put(token, freq.get(token) + 1);
                } else {
                    freq.put(token, 1);
                }
            }
            // Emit the composite key word:num with the docid as the value.
            for (String k : freq.keySet()) {
                Integer num = freq.get(k);
                keyInfo.set(k + ":" + num);
                valInfo.set(docid);
                context.write(keyInfo, valInfo);
            }
        }
    }

    // Partitions on word:docid, i.e. the count is dropped from the key
    // before the hash is computed.
    public static class InvertedIndexPartitioner extends
            HashPartitioner<Text, Text> {

        private Text term = new Text();

        @Override
        public int getPartition(Text key, Text value, int numReduceTasks) {
            term.set(key.toString().split(":")[0] + ":" + value);
            return super.getPartition(term, value, numReduceTasks);
        }
    }

    // Stitches the sorted word:num keys back into one inverted-index line per word.
    public static class InvertedIndexReducer extends
            Reducer<Text, Text, Text, Text> {

        private Text keyInfo = new Text();
        private Text valInfo = new Text();
        private String tPrev = null;
        private StringBuffer buff = new StringBuffer();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String[] tokens = key.toString().split(":");
            String current = tokens[0];
            if (tPrev == null || tPrev.equals(current)) {
                // First key, or still the same word: keep buffering num:docid pairs.
                tPrev = current;
                for (Text val : values) {
                    buff.append(tokens[1] + ":" + val.toString() + ";");
                }
            } else {
                // A new word: flush the buffered line for the previous word first.
                keyInfo.set(tPrev);
                valInfo.set(buff.toString());
                context.write(keyInfo, valInfo);
                tPrev = current;
                buff = new StringBuffer();
                for (Text val : values) {
                    buff.append(tokens[1] + ":" + val.toString() + ";");
                }
            }
        }

        public void cleanup(Context context) throws IOException,
                InterruptedException {
            // Flush the last word (guard against an empty partition).
            if (tPrev != null) {
                keyInfo.set(tPrev);
                valInfo.set(buff.toString());
                context.write(keyInfo, valInfo);
            }
            super.cleanup(context);
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        Job job = new Job(conf, "InvertedIndex");
        job.setJarByClass(InvertedIndexSortByFreq.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(InvertedIndexPartitioner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
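
A minimal sketch of compiling and running the job against Hadoop 1.0.4 (the jar name and HDFS paths are assumptions for illustration):

$ mkdir classes
$ javac -classpath hadoop-core-1.0.4.jar -d classes InvertedIndexSortByFreq.java
$ jar cvf invertedindex.jar -C classes .
$ hadoop jar invertedindex.jar proj.InvertedIndexSortByFreq /user/hadoop/input /user/hadoop/output
$ hadoop fs -cat /user/hadoop/output/part-r-00000

The last command prints the reducer output; with more than one reduce task there is one part-r-* file per reducer.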