mapReducer第一个例子WordCount

mapreducer第一个例子,主要是统计一个目录下各个文件中各个单词出现的次数。

mapper

package com.mapreduce.wordCount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; /*
* TextInputFormat中的recorder 每次读取 一个分片中的 一行文本
* 所以map 函数每次读取一行。规定:
* 输入:key: 行偏移量 value:一行的文本
* 输出: key: 一个词 value: 1
*
* map 做个映射。
*/ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ Text keyOut = new Text();
IntWritable valueOut = new IntWritable(); protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException { String line = value.toString();
String[] worlds = line.split(" ");
for( String w:worlds){
keyOut.set(w);
valueOut.set(1);
context.write(keyOut,valueOut);
}
} }

reudcer

package com.mapreduce.wordCount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* 输入: 对应maper 的输出 [key: values] {"love":[1,1,1,1,1,1]}
* 输出: 词和每个词的出现次数。
* 中间shuffle 阶段自动排序分区。 因为没有分区,所以输出到一个文件中 // 所以结果文件是按 key 排序的。
*
*/
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ protected void reduce(Text key, Iterable<IntWritable> value,Context context)
throws IOException, InterruptedException {
int count = ;
for( IntWritable v:value){
count += v.get();
}
context.write(key, new IntWritable(count)); }
}

job 驱动

package com.mapreduce.wordCount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class WordCountDemo { public static void main(String[] args) throws Exception { // 1 获取configuration
Configuration configuration = new Configuration(); // 2 job Job job = Job.getInstance(configuration); // 3 作业jar包 job.setJarByClass(WordCountDemo.class); // 4 map, reduce jar 包
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordReducer.class);
// 5 map 输出类型 job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class); // 6 最终 输出类型 (reducer) job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class); // 7 inputformatclass , outputformatclass 输入输出入文件类型 可能决定分片信息 job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class); // 8 输入输出文件路径 FileInputFormat.setInputPaths(job, new Path("d:/input"));
FileOutputFormat.setOutputPath(job, new Path("d:/output")); // 9 job提交 job.waitForCompletion(true); } }
上一篇:pinpoint客户端配置


下一篇:Parallel Python——一个简单的分布式计算系统