Learning Hadoop's Serialization Mechanism (While Utterly Baffled): A Flow-Sum MapReduce Case Study, Plus Sorting the Totals

I: Serialization concepts

Serialization is the process of converting a structured object into a byte stream.
Deserialization is the reverse process: converting a byte stream back into a structured object.
Java's built-in serialization goes through java.io.Serializable.

II: Characteristics of Hadoop serialization

(1) Desired properties of a serialization format:
  Compact: makes efficient use of storage space.
  Fast: low overhead when reading and writing data.
  Extensible: can transparently read data written in an older format.
  Interoperable: supports interaction between multiple languages.

(2) Hadoop's serialization format: the Writable interface.

III: What Hadoop serialization is used for

(1) The two main roles of serialization in a distributed environment: inter-process communication and persistent storage.
(2) Communication between Hadoop nodes.


IV: The Writable interface (classes that need serializing implement it)

(1) Writable is a simple, efficient serialization interface built on DataInput and DataOutput.

(2) Every key and value type used in MapReduce must implement Writable.

(3) Every key type used in MapReduce must additionally implement WritableComparable.
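
For reference, the two interfaces look roughly like this (a simplified sketch of the org.apache.hadoop.io types, with Javadoc omitted; each lives in its own file in the Hadoop source):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// org.apache.hadoop.io.Writable (simplified)
public interface Writable {
    void write(DataOutput out) throws IOException;     // serialize this object's fields to the stream
    void readFields(DataInput in) throws IOException;  // read the fields back, in the same order they were written
}

// org.apache.hadoop.io.WritableComparable (simplified)
// Keys also need an ordering for the shuffle sort, so they extend Comparable as well.
public interface WritableComparable<T> extends Writable, Comparable<T> {
}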


1: Create a FlowBean entity class that implements the serialization methods:

package com.flowSum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/***
 * @author Administrator
 * 1: write() serializes the object's fields to the output stream.
 * 2: readFields() deserializes the fields back from the input stream.
 * 3: for sorting later on, implement WritableComparable.
 * When comparing Java value objects you generally also override toString(), hashCode() and equals().
 */
public class FlowBean implements Writable {

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // a parameterized constructor, to make initializing the object convenient
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // during deserialization the framework instantiates the bean via reflection,
    // which requires a no-arg constructor, so define one explicitly
    public FlowBean() {
    }

    // override toString(); this is the text format the job writes out
    @Override
    public String toString() {
        return "" + upFlow + "\t" + downFlow + "\t" + sumFlow + "";
    }

    // deserialize the object's fields from the stream;
    // the fields must be read in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // serialize the object's fields to the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
}
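
A quick way to convince yourself that write() and readFields() are consistent is to round-trip a bean through an in-memory byte array. This is a hypothetical check of my own, not part of the original post; the sample values are taken from the job output shown further down:

package com.flowSum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Round-trip sketch: serialize a FlowBean and read it back, verifying that
// readFields() restores the same field values in the same order.
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        // serialize into an in-memory byte stream
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize from the same bytes
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // expected: 2481    24681    27162
    }
}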

Create a FlowSumMapper class that extends Mapper:

package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/***
 * @author Administrator
 * 1: FlowBean is our own data type. Because it has to be shipped between Hadoop nodes,
 *    it must follow Hadoop's serialization rules, i.e. implement the Writable interface.
 * 2: Text can be thought of as the Writable equivalent of java.lang.String, backed by a UTF-8 byte sequence.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Take one line of the log, split it into fields, and pull out the ones we need:
    // phone number, upstream traffic, downstream traffic. Emit them as a key-value pair.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // get one line of input
        String line = value.toString();
        // split the line into fields
        String[] fields = StringUtils.split(line, "\t");
        // the phone number field (index 1 in the log layout assumed here; adjust to your data)
        String phoneNumber = fields[1];
        // the upstream traffic field (column 8 in the assumed layout)
        long up_flow = Long.parseLong(fields[8]);
        // the downstream traffic field (column 9 in the assumed layout)
        long down_flow = Long.parseLong(fields[9]);

        // finally, wrap the data in a key-value pair and emit it
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
    }
}

Create a FlowSumReducer class that extends Reducer:

package com.flowSum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // The framework calls our reduce method once per group <phone number, {flowbean, flowbean, flowbean...}>.
    // The business logic is simply to iterate over the values, sum them up, and write the result out.
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // counters for upstream and downstream traffic
        long up_flow_counter = 0;
        long down_flow_counter = 0;

        // accumulate the upstream and downstream traffic
        for (FlowBean bean : values) {
            up_flow_counter += bean.getUpFlow();
            down_flow_counter += bean.getDownFlow();
        }

        // write out the result
        context.write(key, new FlowBean(key.toString(), up_flow_counter, down_flow_counter));
    }
}
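
Because the reduce logic is just an associative per-phone sum, the same class could in principle also be registered as a combiner to shrink the shuffle. This is an optional tweak that is not part of the original post; the single extra line would go into the driver (FlowSumRunner, shown next):

// Hypothetical optimization (not in the original job): also run the sum on the map side.
// The combiner's input/output types (Text, FlowBean) match the map output types, so this is legal here.
job.setCombinerClass(FlowSumReducer.class);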

Create a FlowSumRunner class that extends Configured and implements Tool; this is the standard, recommended way to describe and submit a job:

package com.flowSum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// NOTE: wrong Text import (it should be org.apache.hadoop.io.Text);
// this exact import causes the ClassCastException shown further down.
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

/***
 * @author Administrator
 * 1: the standard way to describe and submit a job
 */
public class FlowSumRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // create the configuration
        Configuration conf = new Configuration();
        // get a job instance
        Job job = Job.getInstance(conf);

        // tell the framework which jar contains the classes this job uses
        job.setJarByClass(FlowSumRunner.class);

        // the mapper and reducer classes used by this job
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // the key-value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // the key-value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // where the input data to process lives.
        // FileInputFormat is the base class of all InputFormat implementations that read from files;
        // it keeps the list of input files for the job and implements the logic for computing input splits.
        // How individual records are read is up to its subclasses, e.g. TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // where the results should be written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job to the cluster and wait for it to finish;
        // returns 0 on success, 1 otherwise
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // the standard ToolRunner invocation
        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        // exit with the job's status code
        System.exit(res);
    }
}
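
One detail worth pointing out (my own observation, not something the post mentions): run() builds a fresh Configuration instead of reusing the one that ToolRunner/GenericOptionsParser already populated via getConf(), so generic Hadoop command-line options are effectively ignored. That is most likely why the "Hadoop command-line option parsing not performed" warning shows up in the job log later on. A hedged variant of run() would be:

// Sketch (not the author's code): reuse the Configuration ToolRunner already parsed,
// instead of creating a new one, so generic -D options and the like are honored.
@Override
public int run(String[] args) throws Exception {
    Configuration conf = getConf();   // provided by Configured
    Job job = Job.getInstance(conf);
    // ... the rest of the job setup is unchanged ...
    return job.waitForCompletion(true) ? 0 : 1;
}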

Then package the jar and upload it to the virtual machine together with some sample data (steps omitted). The sample data is pasted below:

         -FD--A4--B8:CMCC    120.196.100.82    i02.c.aliimg.com
5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4
--7A--CC-0A:CMCC 120.196.100.99
5C-0E-8B-8B-B1-:CMCC 120.197.40.4
--AC-CD-E6-:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站
5C-0E-8B-8C-E8-:7DaysInn 120.197.40.4 122.72.52.12
C4--FE-BA-DE-D9:CMCC 120.196.100.99
5C-0E-8B-C7-BA-:CMCC 120.197.40.4 sug.so..cn 信息安全
-A1-B7---B1:CMCC-EASY 120.196.100.82
5C-0E-8B--5C-:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计
5C-0E-8B-C7-F7-:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎
E8--C4-4E--E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计
C4--FE-BA-DE-D9:CMCC 120.196.100.99
5C-0E-8B-C7-FC-:CMCC-EASY 120.197.40.4
5C-0E-8B-8B-B6-:CMCC 120.197.40.4 .flash2-http.qq.com 综合门户
-FD--A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn
5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户
--DB-4F--1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎
-1F--E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎
-FD--A4-7B-:CMCC 120.196.100.82
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
C4--FE-BA-DE-D9:CMCC 120.196.100.99

You can see that the packaged jar and the sample data have been uploaded to the virtual machine (screenshot omitted).


Then upload the data to the HDFS cluster (a pseudo-distributed cluster here):

First create an empty flow directory on the cluster, then create a data directory inside it, and finally put the data files into that data directory (for example with hadoop fs -mkdir -p /flow/data followed by hadoop fs -put; screenshot omitted).


Then run the program. Since it needs arguments, note that the last two items on the command line are the input and output paths (screenshot omitted).


Then it reported an error like this, and I was completely baffled:

Error: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
    at java.lang.Class.asSubclass(Class.java:3165)
    at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:884)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:981)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:391)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:80)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:675)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:747)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

Then, working on the theory that whatever you are learning now someone else has surely already learned, and that some kind soul has posted the same error online, I searched for it and fixed the problem:

It turned out the Text import pointed at the wrong package (better be careful with this, otherwise you are in for a rough time).

Wrong: import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

Right: import org.apache.hadoop.io.Text;
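
So the import section of FlowSumRunner should read as follows (only the Text line changes from the listing above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;   // the correct Text, from Hadoop's io package
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;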

Then repackage, upload to the virtual machine, and run it again, only to hit this error:

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://master:9000/flow/output already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
    at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
    at com.flowSum.FlowSumRunner.run(FlowSumRunner.java:55)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.flowSum.FlowSumRunner.main(FlowSumRunner.java:60)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

So delete that /flow/output directory (for example with hadoop fs -rm -r /flow/output); the output directory is created by the job itself and must not already exist.
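
If deleting it by hand gets tedious, a small addition to the driver can do it before each run. This is a convenience sketch of my own, not part of the original code, and it assumes the output path is args[1]:

import org.apache.hadoop.fs.FileSystem;

// ...inside FlowSumRunner.run(), before submitting the job:
FileSystem fs = FileSystem.get(conf);
Path outputPath = new Path(args[1]);
if (fs.exists(outputPath)) {
    // remove the old output directory recursively so the job can recreate it
    fs.delete(outputPath, true);
}
FileOutputFormat.setOutputPath(job, outputPath);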


Finally, run the program again (it takes arguments, so remember that the last two items are the input and output paths).

Then it threw an array index out of bounds exception; I figured the test data might not be clean:

Error: java.lang.ArrayIndexOutOfBoundsException: 1
    at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:29)
    at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

So I hand-crafted a data file, as shown below:

(Well, later the original test data ran fine again after all. In short, test a few more times; it is pitfalls everywhere!!!)

            -FD--A4--B8:CMCC      120.196.100.82      i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com

In the end I changed String[] fields = StringUtils.split(line, "\t"); to String[] fields = StringUtils.split(line, " ");

(I tested again later and String[] fields = StringUtils.split(line, "\t"); also works. At first I thought the kind of whitespace was affecting the test data, but the code was fine; the problem was the test data itself.)
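
Since the crash is really an ArrayIndexOutOfBoundsException on malformed lines, a defensive length check in map() makes the job tolerant of dirty records. This is a sketch of my own, not the author's fix, and the threshold assumes the column layout used above:

// Inside FlowSumMapper.map(), right after the split: skip records that are too short
// instead of letting one bad line kill the whole task.
String[] fields = StringUtils.split(line, "\t");   // or " ", whichever matches the data
if (fields == null || fields.length < 10) {
    return;   // malformed or truncated record; ignore it
}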

package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/***
 * @author Administrator
 * 1: FlowBean is our own data type. Because it has to be shipped between Hadoop nodes,
 *    it must follow Hadoop's serialization rules, i.e. implement the Writable interface.
 * 2: Text can be thought of as the Writable equivalent of java.lang.String, backed by a UTF-8 byte sequence.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Take one line of the log, split it into fields, and pull out the ones we need:
    // phone number, upstream traffic, downstream traffic. Emit them as a key-value pair.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // get one line of input
        String line = value.toString();
        // split the line into fields
        String[] fields = StringUtils.split(line, " ");
        // the phone number field (index 1 in the log layout assumed here; adjust to your data)
        String phoneNumber = fields[1];
        // the upstream traffic field (column 8 in the assumed layout)
        long up_flow = Long.parseLong(fields[8]);
        // the downstream traffic field (column 9 in the assumed layout)
        long down_flow = Long.parseLong(fields[9]);

        // finally, wrap the data in a key-value pair and emit it
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
    }
}

Package it, upload it to the virtual machine, and run it (a normal, successful run looks like this):

[root@master hadoop]# hadoop jar flow.jar com.flowSum.FlowSumRunner /flow/data /flow/output
17/09/20 09:35:26 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/20 09:35:26 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/20 09:35:27 INFO input.FileInputFormat: Total input paths to process : 1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: number of splits:1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505814887677_0007
17/09/20 09:35:27 INFO impl.YarnClientImpl: Submitted application application_1505814887677_0007
17/09/20 09:35:27 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505814887677_0007/
17/09/20 09:35:27 INFO mapreduce.Job: Running job: job_1505814887677_0007
17/09/20 09:35:33 INFO mapreduce.Job: Job job_1505814887677_0007 running in uber mode : false
17/09/20 09:35:33 INFO mapreduce.Job:  map 0% reduce 0%
17/09/20 09:35:37 INFO mapreduce.Job:  map 100% reduce 0%
17/09/20 09:35:43 INFO mapreduce.Job:  map 100% reduce 100%
17/09/20 09:35:43 INFO mapreduce.Job: Job job_1505814887677_0007 completed successfully
17/09/20 09:35:43 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1179
        FILE: Number of bytes written=187971
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=2467
        HDFS: Number of bytes written=279
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=2691
        Total time spent by all reduces in occupied slots (ms)=2582
        Total time spent by all map tasks (ms)=2691
        Total time spent by all reduce tasks (ms)=2582
        Total vcore-seconds taken by all map tasks=2691
        Total vcore-seconds taken by all reduce tasks=2582
        Total megabyte-seconds taken by all map tasks=2755584
        Total megabyte-seconds taken by all reduce tasks=2643968
    Map-Reduce Framework
        Map input records=23
        Map output records=23
        Map output bytes=1127
        Map output materialized bytes=1179
        Input split bytes=93
        Combine input records=0
        Combine output records=0
        Reduce input groups=10
        Reduce shuffle bytes=1179
        Reduce input records=23
        Reduce output records=10
        Spilled Records=46
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=126
        CPU time spent (ms)=1240
        Physical memory (bytes) snapshot=218099712
        Virtual memory (bytes) snapshot=726839296
        Total committed heap usage (bytes)=137433088
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=2374
    File Output Format Counters
        Bytes Written=279
[root@master hadoop]#

Then view the output, for example with hadoop fs -cat /flow/output/part-r-00000 (screenshot omitted).


In short, when learning something new all kinds of errors are unavoidable; calm down and work through them.


2: Sorting the flow-sum results in practice:

This time both the Mapper and the Reducer are written as static inner classes. (And I hit the same annoying problem as above: with String[] fields = StringUtils.split(line, "\t"); the job simply would not run and kept throwing array index out of bounds exceptions; after switching to String[] fields = StringUtils.split(line, " "); it ran. Utterly baffling.)

package com.flowSort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSortMapReduce {

    /***
     * Mapper as a static inner class
     * @author Administrator
     */
    public static class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

        // Take one line, split out the fields, wrap them in a FlowBean and emit the bean as the key
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // get one line of input
            String line = value.toString();
            // split the line into fields (the input is the first job's output: phone, up, down, sum)
            String[] fields = StringUtils.split(line, " ");

            // pull the values out of the fields
            String phoneNumber = fields[0];
            long up_flow = Long.parseLong(fields[1]);
            long down_flow = Long.parseLong(fields[2]);

            // wrap the data and pass it on to the reducer
            context.write(new FlowBean(phoneNumber, up_flow, down_flow), NullWritable.get());
        }
    }

    /***
     * Reducer as a static inner class
     * @author Administrator
     */
    public static class FlowSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            String phoneNumber = key.getPhoneNumber();
            context.write(new Text(phoneNumber), key);
        }
    }

    /***
     * main method
     * @param args
     * @throws InterruptedException
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        // create the configuration
        Configuration conf = new Configuration();
        // get a job instance
        Job job = Job.getInstance(conf);

        // tell the framework which jar contains the classes this job uses
        job.setJarByClass(FlowSortMapReduce.class);

        // the mapper and reducer classes used by this job
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // the key-value types of the mapper output
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // the key-value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // where the input data to process lives.
        // FileInputFormat is the base class of all InputFormat implementations that read from files;
        // it keeps the list of input files for the job and implements the logic for computing input splits.
        // How individual records are read is up to its subclasses, e.g. TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // where the results should be written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job to the cluster and wait for it to finish;
        // exit with 0 on success, 1 otherwise
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Rework the entity class so the records can be sorted by total flow:

package com.flowSort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/***
 * @author Administrator
 * 1: write() serializes the object's fields to the output stream.
 * 2: readFields() deserializes the fields back from the input stream.
 * 3: implement WritableComparable so the bean can be used as a sortable key.
 * When comparing Java value objects you generally also override toString(), hashCode() and equals().
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // a parameterized constructor, to make initializing the object convenient
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // during deserialization the framework instantiates the bean via reflection,
    // which requires a no-arg constructor, so define one explicitly
    public FlowBean() {
    }

    // override toString(); this is the text format the job writes out
    @Override
    public String toString() {
        return "" + upFlow + "\t" + downFlow + "\t" + sumFlow + "";
    }

    // deserialize the object's fields from the stream;
    // the fields must be read in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // serialize the object's fields to the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // comparison method used for sorting by total flow
    @Override
    public int compareTo(FlowBean o) {
        // return -1 when this sum is larger, 1 otherwise, giving a descending sort
        return sumFlow > o.sumFlow ? -1 : 1;
    }
}

The end result looks like this (problems every step of the way, in short):

[root@master hadoop]# hadoop jar flowsort.jar com.flowSort.FlowSortMapReduce /flow/output4 /flow/sortoutput
17/09/21 19:32:28 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/21 19:32:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/21 19:32:29 INFO input.FileInputFormat: Total input paths to process : 1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: number of splits:1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505991512603_0004
17/09/21 19:32:29 INFO impl.YarnClientImpl: Submitted application application_1505991512603_0004
17/09/21 19:32:29 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505991512603_0004/
17/09/21 19:32:29 INFO mapreduce.Job: Running job: job_1505991512603_0004
17/09/21 19:32:33 INFO mapreduce.Job: Job job_1505991512603_0004 running in uber mode : false
17/09/21 19:32:33 INFO mapreduce.Job:  map 0% reduce 0%
17/09/21 19:32:38 INFO mapreduce.Job:  map 100% reduce 0%
17/09/21 19:32:44 INFO mapreduce.Job:  map 100% reduce 100%
17/09/21 19:32:44 INFO mapreduce.Job: Job job_1505991512603_0004 completed successfully
17/09/21 19:32:44 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=822
        FILE: Number of bytes written=187379
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=635
        HDFS: Number of bytes written=526
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=2031
        Total time spent by all reduces in occupied slots (ms)=2599
        Total time spent by all map tasks (ms)=2031
        Total time spent by all reduce tasks (ms)=2599
        Total vcore-seconds taken by all map tasks=2031
        Total vcore-seconds taken by all reduce tasks=2599
        Total megabyte-seconds taken by all map tasks=2079744
        Total megabyte-seconds taken by all reduce tasks=2661376
    Map-Reduce Framework
        Map input records=21
        Map output records=21
        Map output bytes=774
        Map output materialized bytes=822
        Input split bytes=109
        Combine input records=0
        Combine output records=0
        Reduce input groups=21
        Reduce shuffle bytes=822
        Reduce input records=21
        Reduce output records=21
        Spilled Records=42
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=121
        CPU time spent (ms)=700
        Physical memory (bytes) snapshot=218284032
        Virtual memory (bytes) snapshot=726839296
        Total committed heap usage (bytes)=137433088
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=526
    File Output Format Counters
        Bytes Written=526
[root@master hadoop]# hadoop fs -ls /flow/sortoutput
Found 2 items
-rw-r--r--   1 root supergroup          0 2017-09-21 19:32 /flow/sortoutput/_SUCCESS
-rw-r--r--   1 root supergroup        526 2017-09-21 19:32 /flow/sortoutput/part-r-00000
[root@master hadoop]# hadoop fs -cat /flow/sortoutput/part-r-00000
13726238888    2481    24681    27162
13726230503    2481    24681    27162
13925057413    63    11058    11121
18320173382    18    9531    9549
13502468823    102    7335    7437
13660577991    9    6960    6969
13922314466    3008    3720    6728
13560439658    5892    400    6292
84138413    4116    1432    5548
15013685858    27    3659    3686
15920133257    20    3156    3176
13602846565    12    1938    1950
15989002119    3    1938    1941
13926435656    1512    200    1712
18211575961    12    1527    1539
13560436666    954    200    1154
13480253104    180    200    380
13760778710    120    200    320
13826544101    0    200    200
13926251106    0    200    200
13719199419    0    200    200
[root@master hadoop]#
