import java.io.*;
import java.util.*; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
public class Score_Process extends Configured implements Tool {
/**
* 程序说明:主要用来实现计算学生的平均成绩。
* 数据输入:文件形式输入,每一行包含学生姓名 学生成绩。一个学生有多门成绩则有多行。例如:张三 98
* 数据输出:张三 84 学生姓名 学生平均成绩
* 实现思路:在map阶段<张三,(98,68,……)>
* **/ public static class Map extends Mapper<LongWritable,Text,Text,IntWritable>{
public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
String line=value.toString();
System.out.println(line);//测试
StringTokenizer tokenizer=new StringTokenizer(line);
while(tokenizer.hasMoreTokens()){
String name=tokenizer.nextToken();
String strscore=tokenizer.nextToken();
int intscore=Integer.parseInt(strscore);
context.write(new Text(name), new IntWritable(intscore));
} } } public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
public void reduce(Text key,Iterable<IntWritable>values,Context context) throws IOException, InterruptedException{
int sun=0,count=0;
for(IntWritable val:values){
sun+=val.get();
count++;
}
int averscore=(int)sun/count;
context.write(key, new IntWritable(averscore));
} } public int run(String[] args) throws Exception{
Job job=new Job(getConf());
job.setJarByClass(Score_Process.class);
job.setJobName("Score_Process"); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class); job.setMapperClass(Map.class);
//job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success=job.waitForCompletion(true); return success?0:1;
} public static void main(String[] args)throws Exception{
int ret=ToolRunner.run(new Score_Process(), args);
System.exit(ret);
} }