Hadoop MapReduce Programming API Primer Series: Mining Weather Data, Version 3 (Part 9)

Without further ado, straight to the good stuff!

First, version 1:

Hadoop MapReduce Programming API Primer Series: Mining Weather Data, Version 1 (Part 1)

And version 2:

Hadoop MapReduce Programming API Primer Series: Mining Weather Data, Version 2 (Part 9)

This post shows a different way to program the same task: a composite key with a custom partitioner, sort comparator, and grouping comparator.


The code:

package zhouls.bigdata.myMapReduce.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// The composite map-output key. Implementing WritableComparable means there is
// more to write than for a plain Writable: readFields() deserializes, write()
// serializes, and compareTo() defines the ordering.
public class MyKey implements WritableComparable<MyKey>{

    private int year;
    private int month;
    private double hot;

    // This block of getters and setters can be auto-generated in Eclipse via
    // right-click -> Source -> Generate Getters and Setters.
    public int getYear() {
        return year;
    }
    public void setYear(int year) {
        this.year = year;
    }
    public int getMonth() {
        return month;
    }
    public void setMonth(int month) {
        this.month = month;
    }
    public double getHot() {
        return hot;
    }
    public void setHot(double hot) {
        this.hot = hot;
    }

    // Deserialization: read the fields back in the same order they were written.
    public void readFields(DataInput arg0) throws IOException {
        this.year = arg0.readInt();
        this.month = arg0.readInt();
        this.hot = arg0.readDouble();
    }

    // Serialization: write the fields out.
    public void write(DataOutput arg0) throws IOException {
        arg0.writeInt(year);
        arg0.writeInt(month);
        arg0.writeDouble(hot);
    }

    // Decides whether two keys count as the same when this object is used as
    // the map-output key: compare year first, then month, then temperature.
    public int compareTo(MyKey o) {
        int r1 = Integer.compare(this.year, o.getYear()); // compare this year with the one passed in
        if (r1 == 0) {
            int r2 = Integer.compare(this.month, o.getMonth());
            if (r2 == 0) {
                return Double.compare(this.hot, o.getHot());
            } else {
                return r2;
            }
        } else {
            return r1;
        }
    }
}
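
Before wiring MyKey into a job, it is worth checking that write() and readFields() agree with each other. Below is a minimal standalone sketch (not from the original post; the class name MyKeyRoundTrip is made up for illustration) that round-trips a key through plain Java streams, which is essentially what Hadoop does when it moves keys between map and reduce:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class MyKeyRoundTrip {
    public static void main(String[] args) throws Exception {
        MyKey in = new MyKey();
        in.setYear(1950);
        in.setMonth(10);
        in.setHot(41.0);

        // Serialize, as the framework would on the map side.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // Deserialize, as the framework would on the reduce side.
        MyKey out = new MyKey();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(out.getYear() + "-" + out.getMonth() + " " + out.getHot()); // 1950-10 41.0
    }
}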
package zhouls.bigdata.myMapReduce.weather;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// The partitioner is part of the shuffle: it decides which reducer each map
// output record goes to. It runs for every record, so keep it fast.
public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable>{

    public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
        // 1949 is the smallest year in this data set, so with three reduce
        // tasks each year lands in its own partition.
        return (key.getYear() - 1949) % numReduceTasks;
    }
}

// Sample input (weather.txt), with a tab between the timestamp and the reading:
// 1949-10-01 14:21:02    34c
// 1949-10-02 14:01:02    36c
// 1950-01-01 11:21:02    32c
// 1950-10-01 12:21:02    37c
// 1951-12-01 12:21:02    23c
// 1950-10-02 12:21:02    41c
// 1950-10-03 12:21:02    27c
// 1951-07-01 12:21:02    45c
// 1951-07-02 12:21:02    46c
// 1951-07-03 12:21:03    47c
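
Since the sample data covers 1949 through 1951, (year - 1949) % numReduceTasks with three reduce tasks sends each year to its own reducer. A quick hypothetical check (PartitionCheck is just an illustrative name, not part of the project):

public class PartitionCheck {
    public static void main(String[] args) {
        int numReduceTasks = 3; // matches job.setNumReduceTasks(3) in RunJob below
        for (int year : new int[]{1949, 1950, 1951}) {
            System.out.println(year + " -> reducer " + (year - 1949) % numReduceTasks);
        }
        // 1949 -> reducer 0, 1950 -> reducer 1, 1951 -> reducer 2
    }
}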
package zhouls.bigdata.myMapReduce.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// The sort comparator: this is the heart of the secondary sort.
public class MySort extends WritableComparator{

    public MySort(){
        super(MyKey.class, true); // register MyKey so the comparator can instantiate it
    }

    public int compare(WritableComparable a, WritableComparable b) {
        MyKey k1 = (MyKey) a;
        MyKey k2 = (MyKey) b;
        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 == 0) { // same year
            int r2 = Integer.compare(k1.getMonth(), k2.getMonth());
            if (r2 == 0) { // same month: compare temperatures
                // the minus sign flips the order, so the hottest reading sorts first
                return -Double.compare(k1.getHot(), k2.getHot());
            } else {
                return r2;
            }
        } else {
            return r1;
        }
    }
}
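
The negated Double.compare is what puts the hottest reading first within a given year and month. A small standalone sketch (SortCheck is an illustrative name, not part of the project) that sorts three of the sample keys with MySort:

import java.util.ArrayList;
import java.util.List;

public class SortCheck {
    public static void main(String[] args) {
        List<MyKey> keys = new ArrayList<MyKey>();
        keys.add(make(1951, 7, 45));
        keys.add(make(1951, 7, 47));
        keys.add(make(1951, 7, 46));

        MySort sort = new MySort();
        keys.sort(sort::compare); // the same comparator the shuffle would use

        for (MyKey k : keys) {
            System.out.println(k.getYear() + "-" + k.getMonth() + "\t" + k.getHot());
        }
        // Prints 47.0, then 46.0, then 45.0: descending temperature.
    }

    static MyKey make(int year, int month, double hot) {
        MyKey k = new MyKey();
        k.setYear(year);
        k.setMonth(month);
        k.setHot(hot);
        return k;
    }
}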
package zhouls.bigdata.myMapReduce.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// The grouping comparator: this is the heart of the grouping. Temperature is
// ignored, so all readings with the same year and month form one reduce group.
public class MyGroup extends WritableComparator{

    public MyGroup(){
        super(MyKey.class, true); // register MyKey so the comparator can instantiate it
    }

    public int compare(WritableComparable a, WritableComparable b) {
        MyKey k1 = (MyKey) a;
        MyKey k2 = (MyKey) b;
        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 == 0) {
            return Integer.compare(k1.getMonth(), k2.getMonth());
        } else {
            return r1;
        }
    }
}
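
The difference from MySort is that MyGroup never looks at the temperature, so two keys from the same month compare as equal and all of that month's values are fed to a single reduce() call. A hypothetical check (GroupCheck is an illustrative name, not part of the project):

public class GroupCheck {
    public static void main(String[] args) {
        MyGroup group = new MyGroup();
        MyKey a = make(1951, 7, 45);
        MyKey b = make(1951, 7, 47);
        MyKey c = make(1951, 12, 23);
        System.out.println(group.compare(a, b)); // 0: same group despite different temperatures
        System.out.println(group.compare(a, c)); // negative: July sorts before December
    }

    static MyKey make(int year, int month, double hot) {
        MyKey k = new MyKey();
        k.setYear(year);
        k.setMonth(month);
        k.setHot(hot);
        return k;
    }
}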
package zhouls.bigdata.myMapReduce.weather;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {

    // Sample input again, annotated with where each step of the job happens:
    // 1949-10-01 14:21:02    34c    parsed in WeatherMapper
    // 1949-10-02 14:01:02    36c
    // 1950-01-01 11:21:02    32c    partitioned in MyPartitioner.java
    // 1950-10-01 12:21:02    37c
    // 1951-12-01 12:21:02    23c    sorted in MySort.java
    // 1950-10-02 12:21:02    41c
    // 1950-10-03 12:21:02    27c    grouped in MyGroup.java
    // 1951-07-01 12:21:02    45c
    // 1951-07-02 12:21:02    46c    then reduced in WeatherReducer
    // 1951-07-03 12:21:03    47c

    // With KeyValueTextInputFormat, everything left of the first separator on a
    // line (a tab by default) is the key, e.g. "1949-10-01 14:21:02", and what
    // is to its right is the value, e.g. "34c". The mapper turns the key into
    // the custom MyKey type, which then drives the shuffle.
    static class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable>{

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            try {
                Date date = sdf.parse(key.toString());
                // Calendar is abstract; the static getInstance() method returns an
                // object initialized to the current date and time, which setTime()
                // then overwrites with the parsed timestamp.
                Calendar c = Calendar.getInstance();
                c.setTime(date);
                int year = c.get(Calendar.YEAR);
                int month = c.get(Calendar.MONTH) + 1; // Calendar.MONTH is zero-based, so add 1
                // Strip the trailing "c" from readings like "34c".
                double hot = Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c")));
                MyKey k = new MyKey();
                k.setYear(year);
                k.setMonth(month);
                k.setHot(hot);
                context.write(k, new DoubleWritable(hot));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    static class WeatherReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{

        protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1, Context arg2) throws IOException, InterruptedException {
            int i = 0;
            for (DoubleWritable v : arg1) {
                i++;
                String msg = arg0.getYear() + "\t" + arg0.getMonth() + "\t" + v.get(); // "\t" is the tab character
                arg2.write(new Text(msg), NullWritable.get());
                if (i == 3) { // values arrive hottest first, so keep only the top three per month
                    break;
                }
            }
        }
    }

    public static void main(String[] args) {
        Configuration config = new Configuration();
//      config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");
//      config.set("yarn.resourcemanager.hostname", "HadoopMaster");
//      config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
//      config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");// the default key/value separator is the tab "\t"; set this to use another, e.g. ","
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("weather");
            job.setMapperClass(WeatherMapper.class);
            job.setReducerClass(WeatherReducer.class);
            job.setMapOutputKeyClass(MyKey.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setPartitionerClass(MyPartitioner.class);
            job.setSortComparatorClass(MySort.class);
            job.setGroupingComparatorClass(MyGroup.class);
            job.setNumReduceTasks(3); // one reducer per year: 1949, 1950, 1951
            job.setInputFormatClass(KeyValueTextInputFormat.class);

//          FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/weather.txt"));// HDFS input path holding weather.txt
//          Path outpath = new Path("hdfs://HadoopMaster:9000/out/weather");
            FileInputFormat.addInputPath(job, new Path("./data/weather.txt"));// local input path holding weather.txt
            Path outpath = new Path("./out/weather");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true); // clear any previous output
            }
            FileOutputFormat.setOutputPath(job, outpath);

            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job finished");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
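
With three reduce tasks there are three output files under ./out/weather. Worked through by hand from the sample data (and assuming the zero-based month is corrected as above), the result should look roughly like this; treat it as a sanity check rather than authoritative output:

part-r-00000 (1949):
1949	10	36.0
1949	10	34.0

part-r-00001 (1950):
1950	1	32.0
1950	10	41.0
1950	10	37.0
1950	10	27.0

part-r-00002 (1951):
1951	7	47.0
1951	7	46.0
1951	7	45.0
1951	12	23.0

Each month keeps at most its three hottest readings, hottest first.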