环境:
Hadoop1.x,CentOS6.5,三台虚拟机搭建的模拟分布式环境
数据:任意数量、格式的文本文件(我用的四个.java代码文件)
方案目标:
根据提供的文本文件,提取出每个单词在哪个文件中出现了几次,组成倒排索引,格式如下
Ant FaultyWordCount.java : 1 , WordCount.java : 1
思路:
因为这个程序需要用到三个变量:单词、文件名、出现的频率,因此需要自定义Writable类,以单词为key,将文件名和出现的频率打包。
1.先将每行文本的单词进行分割,以K/V=Word/Filename:1的格式分割。
2.利用Combiner类,将本Map一个文件的先进行一次计数,减少传输量
3.在Reduce中对Combiner中传输过来的同一个单词的在不同文件出现的频率数据进行组合。
难点:这个程序主要是用到了一个Combiner和自定义了Writable类。在实现的时候,需要注意的是Writable必须默认无参构造函数。
主调用Main类:
package ren.snail; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; public class Main extends Configured implements Tool { public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
int result = ToolRunner.run(new Configuration(), new Main(), args);
System.exit(result);
} @Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
Configuration configuration = getConf();
Job job = new Job(configuration, "InvertIndex");
job.setJarByClass(Main.class);
FileInputFormat.addInputPath(job, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job, new Path(arg0[1])); job.setMapperClass(InvertMapper.class);
job.setCombinerClass(Combinner.class); //设置Combiner类
job.setReducerClass(InvertReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FileFreqWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
return 0;
} }
自定义Writbale类
package ren.snail; import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable; public class FileFreqWritable implements Writable {
Text documentID;
IntWritable fequence; public FileFreqWritable() //必须提供无参构造函数
{
this.documentID = new Text();
this.fequence = new IntWritable();
}
public FileFreqWritable(Text id,IntWritable feq) {
// TODO Auto-generated constructor stub
this.documentID=id;
this.fequence =feq;
} public void set(String id,int feq)
{
this.documentID.set(id);
this.fequence.set(feq);
} @Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
documentID.readFields(in);
fequence.readFields(in); } @Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
documentID.write(out);
fequence.write(out);
} public Text getDocumentID() {
return documentID;
} public String toString()
{
return documentID.toString()+" : "+fequence.get();
}
public IntWritable getFequence() {
return fequence;
} }
Map
package ren.snail; import java.io.IOException; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class InvertMapper extends Mapper<LongWritable, Text, Text, FileFreqWritable>{
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
{
String data = value.toString().replaceAll("[^a-zA-Z0-9]+", " "); //将不需要的其他字符都设为空
String[] values = data.split(" ");
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String filename = fileSplit.getPath().getName();
for (String temp : values) {
FileFreqWritable obj = new FileFreqWritable(new Text(filename),new IntWritable(1));
context.write(new Text(temp), obj);
} }
}
Combiner
package ren.snail; import java.io.IOException; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; public class Combinner extends Reducer<Text, FileFreqWritable, Text, FileFreqWritable>{
public void reduce(Text key,Iterable<FileFreqWritable> values,Context context) throws IOException,InterruptedException
{
int count = 0 ;
String id = "";
for (FileFreqWritable temp : values) {
count++;
if(count == 1)
{
id=temp.getDocumentID().toString();
}
}
context.write(key,new FileFreqWritable(new Text(id), new IntWritable(count)));
}
}
Reduce
package ren.snail; import java.io.IOException; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; public class InvertReducer extends Reducer<Text, FileFreqWritable, Text, Text> { public void reduce(Text key,Iterable<FileFreqWritable> values,Context context) throws IOException,InterruptedException {
StringBuilder value = new StringBuilder();
for (FileFreqWritable fileFreqWritable : values) {
String temp = fileFreqWritable.toString();
value.append(temp+" , ");
}
context.write(key,new Text(value.toString()));
}
}
其实我的Reduce实现思路可能有点问题,不过大致是这样