Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629
Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927
Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955
Hadoop读书笔记(四)HDFS体系结构 :http://blog.csdn.net/caicongyang/article/details/41322649
Hadoop读书笔记(五)MapReduce统计单词demo:http://blog.csdn.net/caicongyang/article/details/41453579
Hadoop读书笔记(六)MapReduce自定义数据类型demo:http://blog.csdn.net/caicongyang/article/details/41490379
1.说明
功能和上篇一样实现手机流量的统计(ps:可以与前面文章代码做对比)
2.代码:
KpiApp.java
package old; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapred.lib.HashPartitioner; /** * * <p> * Title: KpiApp.java * Package old * </p> * <p> * Description: hadoop版本1.x的包一般是mapreduce * hadoop版本0.x的包一般是mapred * <p> * @author Tom.Cai * @created 2014-11-25 下午10:23:47 * @version V1.0 * */ public class KpiApp { private static final String INPUT_PATH = "hdfs://192.168.80.100:9000/wlan"; private static final String OUT_PATH = "hdfs://192.168.80.100:9000/wlan_out"; /** * 改动: * 1.不再使用Job,而是使用JobConf * 2.类的包名不再使用mapreduce,而是使用mapred * 3.不再使用job.waitForCompletion(true)提交作业,而是使用JobClient.runJob(job); * */ public static void main(String[] args) throws Exception { FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), new Configuration()); Path outPath = new Path(OUT_PATH); if (fileSystem.exists(outPath)) { fileSystem.delete(outPath, true); } JobConf job = new JobConf(new Configuration(), KpiApp.class); FileInputFormat.setInputPaths(job, INPUT_PATH); job.setInputFormat(TextInputFormat.class); job.setMapperClass(KpiMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(KpiWite.class); job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(1); job.setReducerClass(KpiReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(KpiWite.class); FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); job.setOutputFormat(TextOutputFormat.class); JobClient.runJob(job); } /** * 新api:extends Mapper * 老api:extends MapRedcueBase implements Mapper */ static class KpiMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, KpiWite> { @Override public void map(LongWritable key, Text value, OutputCollector<Text, KpiWite> out, Reporter arg3) throws IOException { String[] splited = value.toString().split("\t"); String num = splited[1]; KpiWite kpi = new KpiWite(splited[6], splited[7], splited[8], splited[9]); out.collect(new Text(num), kpi); } } static class KpiReducer extends MapReduceBase implements Reducer<Text, KpiWite, Text, KpiWite> { @Override public void reduce(Text key, Iterator<KpiWite> value, OutputCollector<Text, KpiWite> out, Reporter arg3) throws IOException { long upPackNum = 0L; long downPackNum = 0L; long upPayLoad = 0L; long downPayLoad = 0L; while (value.hasNext()) { upPackNum += value.next().upPackNum; downPackNum += value.next().downPackNum; upPayLoad += value.next().upPayLoad; downPayLoad += value.next().downPayLoad; } out.collect(key, new KpiWite(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad))); } } } class KpiWite implements Writable { long upPackNum; long downPackNum; long upPayLoad; long downPayLoad; public KpiWite() { } public KpiWite(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad) { this.upPackNum = Long.parseLong(upPackNum); this.downPackNum = Long.parseLong(downPackNum); this.upPayLoad = Long.parseLong(upPayLoad); this.downPayLoad = Long.parseLong(downPayLoad); } @Override public void readFields(DataInput in) throws IOException { this.upPackNum = in.readLong(); this.downPackNum = in.readLong(); this.upPayLoad = in.readLong(); this.downPayLoad = in.readLong(); } @Override public void write(DataOutput out) throws IOException { out.writeLong(upPackNum); out.writeLong(downPackNum); out.writeLong(upPayLoad); out.writeLong(downPayLoad); } }
欢迎大家一起讨论学习!
有用的自己收!
记录与分享,让你我共成长!欢迎查看我的其他博客;我的博客地址:http://blog.csdn.net/caicongyang