The code is as follows:
package cn.toto.bigdata.mr.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IndexCreateStepOne {

    public static class IndexCreateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        Text k = new Text();
        IntWritable v = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            // Get the name of the file this word comes from
            String fileName = inputSplit.getPath().getName();
            // Final output format -- key: word--filename, value: 1
            for (String word : words) {
                k.set(word + "--" + fileName);
                context.write(k, v);
            }
        }
    }

    public static class IndexCreateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable v = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the occurrences of each word--filename pair
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            v.set(count);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Tell the framework where the jar containing our program is
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(IndexCreateStepOne.class);

        // Tell the framework which mapper and reducer classes to use;
        // summing is associative, so the reducer also serves as the combiner
        job.setMapperClass(IndexCreateMapper.class);
        job.setReducerClass(IndexCreateReducer.class);
        job.setCombinerClass(IndexCreateReducer.class);

        // Tell the framework the output types of the mapper and the reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("E:/wordcount/inverindexinput/"));
        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("E:/wordcount/index-1/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

Prerequisites
1. The data files to be processed
b.txt contains lines of space-separated words; the other files, c.txt and d.txt, are similar.
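As an illustrative stand-in for the actual data (which is not reproduced here), b.txt might look like:

hello tom
hello jerry
hello jim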
After running this first job, the output lists the number of occurrences of each word in each file.
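Given the word--filename keys emitted by the mapper and the default tab-separated text output, the step-one result has the following shape (an illustrative sample built from the assumed b.txt above, not the actual output; entries for c.txt and d.txt follow the same pattern):

hello--b.txt	3
jerry--b.txt	1
jim--b.txt	1
tom--b.txt	1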
Next, the second step regroups this output: the word alone becomes the key, and the filename paired with its count becomes the value, so the reducer can collect, for each word, the list of files it appears in and the count per file. The example code is as follows:
package cn.toto.bigdata.mr.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IndexCreateStepTwo {

    public static class IndexCreateStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text k = new Text();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line from step one looks like: word--filename<TAB>count
            String line = value.toString();
            String[] fields = line.split("\t");
            String word_file = fields[0];
            String count = fields[1];
            String[] split = word_file.split("--");
            String word = split[0];
            String file = split[1];
            // Re-key by word alone; the value carries filename--count
            k.set(word);
            v.set(file + "--" + count);
            context.write(k, v);
        }
    }

    public static class IndexCreateStepTwoReducer extends Reducer<Text, Text, Text, Text> {
        Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all filename--count pairs for this word
            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value.toString()).append(" ");
            }
            v.set(sb.toString());
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Tell the framework where the jar containing our program is
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(IndexCreateStepTwo.class);

        // Tell the framework which mapper and reducer classes to use;
        // concatenating partial value lists still yields every filename--count
        // pair, so the reducer is reused as the combiner here as well
        job.setMapperClass(IndexCreateStepTwoMapper.class);
        job.setReducerClass(IndexCreateStepTwoReducer.class);
        job.setCombinerClass(IndexCreateStepTwoReducer.class);

        // Tell the framework the output types of the mapper and the reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // The input is the output directory of step one
        FileInputFormat.setInputPaths(job, new Path("E:/wordcount/index-1/"));
        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("E:/wordcount/index-2/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
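Because step two consumes step one's output directory, the two jobs can also be chained in a single driver that runs them sequentially and only starts step two if step one succeeded. The following is a minimal sketch using the same hard-coded paths; the class name IndexCreateDriver is an illustrative assumption, not part of the original code:

package cn.toto.bigdata.mr.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver chaining the two index-building jobs
public class IndexCreateDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Step one: count occurrences of word--filename pairs
        Job stepOne = Job.getInstance(conf, "index-step-one");
        stepOne.setJarByClass(IndexCreateStepOne.class);
        stepOne.setMapperClass(IndexCreateStepOne.IndexCreateMapper.class);
        stepOne.setReducerClass(IndexCreateStepOne.IndexCreateReducer.class);
        stepOne.setCombinerClass(IndexCreateStepOne.IndexCreateReducer.class);
        stepOne.setMapOutputKeyClass(Text.class);
        stepOne.setMapOutputValueClass(IntWritable.class);
        stepOne.setOutputKeyClass(Text.class);
        stepOne.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(stepOne, new Path("E:/wordcount/inverindexinput/"));
        FileOutputFormat.setOutputPath(stepOne, new Path("E:/wordcount/index-1/"));

        // Only start step two if step one succeeded
        if (!stepOne.waitForCompletion(true)) {
            System.exit(1);
        }

        // Step two: regroup by word, collecting filename--count pairs
        Job stepTwo = Job.getInstance(conf, "index-step-two");
        stepTwo.setJarByClass(IndexCreateStepTwo.class);
        stepTwo.setMapperClass(IndexCreateStepTwo.IndexCreateStepTwoMapper.class);
        stepTwo.setReducerClass(IndexCreateStepTwo.IndexCreateStepTwoReducer.class);
        stepTwo.setMapOutputKeyClass(Text.class);
        stepTwo.setMapOutputValueClass(Text.class);
        stepTwo.setOutputKeyClass(Text.class);
        stepTwo.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(stepTwo, new Path("E:/wordcount/index-1/"));
        FileOutputFormat.setOutputPath(stepTwo, new Path("E:/wordcount/index-2/"));

        System.exit(stepTwo.waitForCompletion(true) ? 0 : 1);
    }
}

In a production job the input and output paths would normally come from args rather than being hard-coded.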
The result of running the program is as follows:
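Based on the reducer's concatenation logic, the final inverted index takes this shape (an illustrative sample consistent with the assumed inputs above, not the actual output):

hello	b.txt--3 c.txt--2 d.txt--1
jerry	b.txt--1
jim	b.txt--1
tom	b.txt--1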