Hadoop MapReduce Programming: API Primer Series, Partitioning and Combining (Part 14)

  Without further ado, here is the code: a MapReduce job that uses a custom Partitioner to split the output by gender and a Combiner to pre-aggregate map output, computing the highest search index among male and among female stars.

Code

package zhouls.bigdata.myMapReduce.Star;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * @function Compute the highest search index for male stars and for female stars separately
 * @author 小讲
 */
/*
Sample input -- name, gender, search index:
李易峰 male 32670
朴信惠 female 13309
林心如 female 5242
黄海波 male 5505
成龙 male 7757
刘亦菲 female 14830
angelababy female 55083
王宝强 male 9472
郑爽 female 9279
周杰伦 male 42020
莫小棋 female 13978
朱一龙 male 10524
宋智孝 female 12494
吴京 male 6684
赵丽颖 female 24174
尹恩惠 female 5985
李金铭 female 5925
关之琳 female 7668
邓超 male 11532
钟汉良 male 8289
周润发 male 4808
甄子丹 male 5479
林妙可 female 5306
柳岩 female 8221
蔡琳 female 7320
张佳宁 female 6628
裴涩琪 female 5658
李晨 male 9559
周星驰 male 11483
杨紫 female 11094
全智贤 female 5336
张柏芝 female 9337
孙俪 female 7295
鲍蕾 female 5375
杨幂 female 20238
刘德华 male 19786
柯震东 male 6398
张国荣 male 5013
王阳 male 5169
李小龙 male 6859
林志颖 male 4512
林正英 male 5832
吴秀波 male 5668
陈伟霆 male 12817
陈奕迅 male 10472
赵又廷 male 5190
张馨予 female 35062
陈晓 male 17901
赵韩樱子 female 7077
乔振宇 male 8877
宋慧乔 female 5708
韩艺瑟 female 5426
张翰 male 7012
谢霆锋 male 6654
刘晓庆 female 5553
陈翔 male 7999
陈学冬 male 8829
秋瓷炫 female 6504
王祖蓝 male 6662
吴亦凡 male 16472
陈妍希 female 32590
倪妮 female 9278
高梓淇 male 7101
赵奕欢 female 7197
赵本山 male 12655
高圆圆 female 13688
陈赫 male 6820
鹿晗 male 32492
贾玲 female 5304
宋佳 female 6202
郭碧婷 female 5295
唐嫣 female 12055
杨蓉 female 10512
李钟硕 male 26278
郑秀晶 female 10479
熊黛林 female 26732
金秀贤 male 11370
古天乐 male 4954
黄晓明 male 10964
李敏镐 male 10512
王丽坤 female 5501
谢依霖 female 7000
陈冠希 male 9135
范冰冰 female 13734
姚笛 female 6953
彭于晏 male 14136
张学友 male 4578
谢娜 female 6886
胡歌 male 8015
古力娜扎 female 8858
黄渤 male 7825
周韦彤 female 7677
刘诗诗 female 16548
郭德纲 male 10307
郑恺 male 21145
赵薇 female 5339
李连杰 male 4621
宋茜 female 11164
任重 male 8383
李若彤 female 9968

Expected result:
angelababy female 55083
周杰伦 male 42020
*/
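// Overview of the data flow in this job (a summary of the code below, added for orientation):
//   Mapper     : (offset, "name gender hotIndex")   -> (gender, "name\thotIndex")
//   Combiner   : per map task, keeps only the locally highest hotIndex for each gender
//   Partitioner: "male" -> partition 0, "female" -> partition 1
//   Reducer    : (gender, list of "name\thotIndex") -> (name, "gender\tmaxHotIndex")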
public class Star extends Configured implements Tool{

    /**
     * @function Mapper: parse the star records
     * @input key = byte offset, value = one star record
     * @output key = gender, value = name + hotIndex
     */
    public static class ActorMapper extends Mapper<Object, Text, Text, Text>{
        // The Object key is produced by Hadoop's default input format and is the byte offset of the
        // line within the input split; the offsets are not needed for this job.
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
            // e.g. value = "周杰伦	male	42020" (name, gender, hotIndex)
            String[] tokens = value.toString().split("\t");// split the record on tab into tokens
            String gender = tokens[1].trim();              // gender; trim() removes surrounding whitespace
            // tokens[0]   tokens[1]   tokens[2]
            // 周杰伦       male        42020
            String nameHotIndex = tokens[0] + "\t" + tokens[2];// name and search index
            // emit key = gender, value = name + hotIndex
            context.write(new Text(gender), new Text(nameHotIndex));
        }
    }

    /**
     * @function Partitioner: choose the partition by gender
     */
    public static class ActorPartitioner extends Partitioner<Text, Text>{
        @Override
        public int getPartition(Text key, Text value, int numReduceTasks){
            String sex = key.toString();// partition by gender
            // with no reduce tasks, everything goes to partition 0
            if(numReduceTasks == 0)
                return 0;
            // "male" goes to partition 0
            if(sex.equals("male"))
                return 0;
            // "female" goes to partition 1
            if(sex.equals("female"))
                return 1 % numReduceTasks;
            // any other value goes to partition 2
            else
                return 2 % numReduceTasks;
        }
    }

    /**
     * @function Combiner: pre-aggregate the Mapper output
     */
    public static class ActorCombiner extends Reducer<Text, Text, Text, Text>{
        private Text text = new Text();
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
            int maxHotIndex = Integer.MIN_VALUE;
            int hotIndex = 0;
            String name = "";
            for (Text val : values){// iterate over all values for this gender
                String[] valTokens = val.toString().split("\\t");
                hotIndex = Integer.parseInt(valTokens[1]);
                if(hotIndex > maxHotIndex){
                    name = valTokens[0];
                    maxHotIndex = hotIndex;
                }
            }
            // keep only the locally highest search index for this gender
            text.set(name + "\t" + maxHotIndex);
            context.write(key, text);
        }
    }

    /**
     * @function Reducer: compute the highest search index for male and female stars
     * @input key = gender, value = name + hotIndex
     * @output key = name, value = gender + hotIndex (max)
     */
    public static class ActorReducer extends Reducer<Text, Text, Text, Text>{
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
            int maxHotIndex = Integer.MIN_VALUE;
            String name = " ";
            int hotIndex = 0;
            // iterate over the values for this key and find the highest search index
            for (Text val : values){
                String[] valTokens = val.toString().split("\\t");
                hotIndex = Integer.parseInt(valTokens[1]);
                if (hotIndex > maxHotIndex){
                    name = valTokens[0];
                    maxHotIndex = hotIndex;
                }
            }
            // emit key = name, value = gender + max hotIndex
            context.write(new Text(name), new Text(key + "\t" + maxHotIndex));
        }
    }

    /**
     * @function Job driver method
     * @param args
     * @return
     * @throws Exception
     */
    public int run(String[] args) throws Exception{
        Configuration conf = new Configuration();// read the configuration files, e.g. core-site.xml
        Path mypath = new Path(args[1]);// output path
        FileSystem hdfs = mypath.getFileSystem(conf);// file system for the output path
        if (hdfs.isDirectory(mypath)){
            hdfs.delete(mypath, true);// remove an existing output directory so the job can be rerun
        }

        Job job = new Job(conf, "star");// create a new job
        job.setJarByClass(Star.class);// main class
        job.setNumReduceTasks(2);// two reducers, one per partition
        job.setPartitionerClass(ActorPartitioner.class);// custom Partitioner
        job.setMapperClass(ActorMapper.class);// Mapper
        job.setMapOutputKeyClass(Text.class);// map output key type
        job.setMapOutputValueClass(Text.class);// map output value type
        job.setCombinerClass(ActorCombiner.class);// Combiner
        job.setReducerClass(ActorReducer.class);// Reducer
        job.setOutputKeyClass(Text.class);// final output key type
        job.setOutputValueClass(Text.class);// final output value type
        FileInputFormat.addInputPath(job, new Path(args[0]));// input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));// output path
        job.waitForCompletion(true);// submit the job and wait for completion
        return 0;
    }

    /**
     * @function main method
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception{
        // HDFS paths:
        // String[] args0 = { "hdfs://HadoopMaster:9000/star/star.txt",
        //                    "hdfs://HadoopMaster:9000/out/star/" };
        // local paths:
        String[] args0 = { "./data/star/star.txt",
                           "./out/star" };
        int ec = ToolRunner.run(new Configuration(), new Star(), args0);
        System.exit(ec);
    }
}
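
With job.setNumReduceTasks(2) and the ActorPartitioner above, the job writes two output files: part-r-00000 holds the male result (周杰伦 male 42020) and part-r-00001 the female result (angelababy female 55083), matching the expected result in the comment block. Below is a minimal sketch, not part of the original post, that calls ActorPartitioner directly to verify where each gender key is routed; PartitionerCheck is a hypothetical helper class and assumes the Star class above has been compiled in the same package.

package zhouls.bigdata.myMapReduce.Star;

import org.apache.hadoop.io.Text;

// Minimal check (hypothetical helper, not part of the original job):
// calls ActorPartitioner directly to see which reducer each gender key is routed to.
public class PartitionerCheck {
    public static void main(String[] args) {
        Star.ActorPartitioner partitioner = new Star.ActorPartitioner();
        int numReduceTasks = 2; // same value as job.setNumReduceTasks(2)

        // "male" keys go to partition 0, i.e. part-r-00000
        System.out.println(partitioner.getPartition(
                new Text("male"), new Text("周杰伦\t42020"), numReduceTasks));       // prints 0

        // "female" keys go to partition 1, i.e. part-r-00001
        System.out.println(partitioner.getPartition(
                new Text("female"), new Text("angelababy\t55083"), numReduceTasks)); // prints 1
    }
}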