I: What serialization is
Serialization is the process of converting structured objects into a byte stream.
Deserialization is the reverse process: converting a byte stream back into structured objects.
Java's built-in serialization: java.io.Serializable.
II: Characteristics of Hadoop serialization
(1) Properties a serialization format should have:
Compact: makes efficient use of storage space.
Fast: little extra overhead for reading and writing data.
Extensible: can transparently read data written in an older format.
Interoperable: supports interaction across multiple languages.
(2) Hadoop's serialization format: the Writable interface.
III: What Hadoop serialization is used for
(1) Serialization plays two major roles in a distributed environment: inter-process communication and persistent storage.
(2) Hadoop uses it for communication between nodes.
IV: The Writable interface (classes that need serialization implement it)
(1) The Writable interface is a simple, efficient serialization protocol based on DataInput and DataOutput.
(2) Every key and value type used in MapReduce must implement Writable.
(3) Every key type used in MapReduce must also implement WritableComparable.
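To make the write/readFields contract concrete, here is a minimal round-trip sketch (my own illustration, not code from this post) using Hadoop's built-in IntWritable; the same pattern applies to any Writable, including the FlowBean below:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        IntWritable before = new IntWritable(163);

        // write(...) serializes the object into the underlying byte stream
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(bytes);
        before.write(dataOut);
        dataOut.close();

        // readFields(...) repopulates a fresh instance from the same bytes,
        // reading fields in exactly the order they were written
        IntWritable after = new IntWritable();
        DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        after.readFields(dataIn);
        dataIn.close();

        System.out.println(after.get()); // prints 163
    }
}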
1: Create a FlowBean entity class and implement serialization:

package com.flowSum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/***
 *
 * @author Administrator
 * 1: write() serializes each object to the output stream
 * 2: readFields() deserializes the object from the bytes of the input stream
 * 3: implement WritableComparable when the object needs to be compared as a key;
 *    for Java value objects you generally also override toString(), hashCode() and equals()
 *
 */
public class FlowBean implements Writable {

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // a parameterized constructor makes it convenient to initialize the object's data
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // during deserialization the framework instantiates the class reflectively,
    // so a no-arg constructor must also be provided
    public FlowBean() {
    }

    // override toString()
    @Override
    public String toString() {
        return "" + upFlow + "\t" + downFlow + "\t" + sumFlow + "";
    }

    // deserialize the object's data from the stream;
    // fields must be read in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // serialize the object's data into the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
}
Create a FlowSumMapper class that extends the Mapper class:

package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/***
 *
 * @author Administrator
 * 1: FlowBean is our own data type; to be shipped between Hadoop nodes it has to follow
 *    Hadoop's serialization rules, i.e. implement the corresponding Hadoop serialization interface
 * 2: Text can be thought of as the Writable equivalent of java.lang.String, backed by a UTF-8 byte sequence
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // take one line of the log, split it into fields and extract the ones we need:
    // phone number, upstream traffic, downstream traffic; then emit them as a key-value pair
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // get one line of data
        String line = value.toString();
        // split it into fields
        String[] fields = StringUtils.split(line, "\t");
        // column positions here assume the usual layout of this flow log:
        // phone number in column 1, up flow in column 8, down flow in column 9
        String phoneNumber = fields[1];
        // the upstream traffic field
        long up_flow = Long.parseLong(fields[8]);
        // the downstream traffic field
        long down_flow = Long.parseLong(fields[9]);
        // finally, wrap the data as a key-value pair and emit it
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
    }
}
Create a FlowSumReducer class that extends the Reducer class:

package com.flowSum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // the framework calls our reduce method once per group <phoneNumber, {flowbean, flowbean, flowbean...}>
    // the business logic in reduce is to iterate over the values, sum them up and write the result out
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // counters for upstream and downstream traffic
        long up_flow_counter = 0;
        long down_flow_counter = 0;
        // accumulate the upstream and downstream traffic
        for (FlowBean bean : values) {
            up_flow_counter += bean.getUpFlow();
            down_flow_counter += bean.getDownFlow();
        }
        // write out the result
        context.write(key, new FlowBean(key.toString(), up_flow_counter, down_flow_counter));
    }
}
Create a FlowSumRunner class that extends Configured and implements Tool, the standard pattern for describing and submitting a job:

package com.flowSum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// careful: this Text import is the wrong one; the ClassCastException it causes is discussed below
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

/***
 *
 * @author Administrator
 * 1: the standard pattern for describing and submitting a job
 */
public class FlowSumRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // create the configuration
        Configuration conf = new Configuration();
        // obtain a job
        Job job = Job.getInstance(conf);

        // tell the job which jar contains the classes it uses
        job.setJarByClass(FlowSumRunner.class);

        // the mapper and reducer classes used by this job
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // the key-value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // the key-value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // the path of the input data to process
        // FileInputFormat is the base class of all InputFormat implementations that read from files;
        // it keeps the list of files that make up the job's input and computes the input splits.
        // How individual records are read is left to subclasses such as TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // the path where the results are written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job to the cluster
        // job.waitForCompletion(true);
        // return 0 on success, otherwise 1
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // the standard way to invoke the tool
        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        // exit when finished
        System.exit(res);
    }
}
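Once the jar is built, the runner takes the input and output paths as its last two arguments; the invocation used later in this post looks like:

hadoop jar flow.jar com.flowSum.FlowSumRunner /flow/data /flow/output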
Then package the job and upload the jar to the VM together with the sample data (steps omitted). The sample data is pasted below:
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4
--7A--CC-0A:CMCC 120.196.100.99
5C-0E-8B-8B-B1-:CMCC 120.197.40.4
--AC-CD-E6-:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站
5C-0E-8B-8C-E8-:7DaysInn 120.197.40.4 122.72.52.12
C4--FE-BA-DE-D9:CMCC 120.196.100.99
5C-0E-8B-C7-BA-:CMCC 120.197.40.4 sug.so..cn 信息安全
-A1-B7---B1:CMCC-EASY 120.196.100.82
5C-0E-8B--5C-:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计
5C-0E-8B-C7-F7-:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎
E8--C4-4E--E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计
C4--FE-BA-DE-D9:CMCC 120.196.100.99
5C-0E-8B-C7-FC-:CMCC-EASY 120.197.40.4
5C-0E-8B-8B-B6-:CMCC 120.197.40.4 .flash2-http.qq.com 综合门户
-FD--A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn
5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户
--DB-4F--1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎
-1F--E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎
-FD--A4-7B-:CMCC 120.196.100.82
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
C4--FE-BA-DE-D9:CMCC 120.196.100.99
You can see that the jar and the sample data have been uploaded to the VM.
Next, upload the data to the HDFS cluster (a pseudo-distributed cluster here).
On the cluster, create an empty folder flow, create a data folder inside it to hold the data, and finally put the data file into the data folder.
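The exact commands are omitted in the post; assuming the sample file is saved locally as data.txt (a hypothetical name), the HDFS side would look roughly like this:

hadoop fs -mkdir -p /flow/data
hadoop fs -put data.txt /flow/data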
Then run the program; since it takes arguments, note that the last two items on the command line are the input and output paths.
It promptly threw an error like this, which left me completely baffled:
Error: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
at java.lang.Class.asSubclass(Class.java:3165)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:884)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:981)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:391)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:80)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:675)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:747)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Then, on the theory that whatever you are learning now has surely been learned by someone before you, and that some kind soul has already posted the same error online, I searched for it and solved the problem:
It turned out the wrong package had been imported for Text (be careful here, or you are in for a rough time).
Not: import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
But: import org.apache.hadoop.io.Text;
After repackaging, uploading to the VM and running again, you hit this error:
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://master:9000/flow/output already exists
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
at com.flowSum.FlowSumRunner.run(FlowSumRunner.java:55)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at com.flowSum.FlowSumRunner.main(FlowSumRunner.java:60)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
So delete the /flow/output directory, because the output directory is created automatically by the job.
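The delete itself is a one-liner; something along these lines removes the old output directory:

hadoop fs -rm -r /flow/output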
Then run the program again (it takes arguments, so again the last two items are the input and output paths).
This time it threw an array-index-out-of-bounds exception; I suspected the test data was not clean:
Error: java.lang.ArrayIndexOutOfBoundsException: 1
at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:29)
at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
So I made a data file by hand, shown below:
(Well, when I tested again later the original test data ran fine after all; in short, test a few more times, it is full of pitfalls!!!)
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
-FD--A4--B8:CMCC 120.196.100.82 i02.c.aliimg.com
Finally I changed String[] fields = StringUtils.split(line, "\t"); to String[] fields = StringUtils.split(line, " ");
(I tested again later and String[] fields = StringUtils.split(line, "\t"); also works. At first I thought the kind of whitespace was affecting the test data, but the code was fine; the problem was the test data itself.)
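The behavior behind this is worth pinning down: with commons-lang, StringUtils.split(line, "\t") splits only on tab characters and StringUtils.split(line, " ") splits only on spaces; consecutive separators are collapsed and never produce empty tokens. If the separator passed in does not match what the file actually contains, the whole line comes back as a single field and the very first indexed access blows up, which lines up with the ArrayIndexOutOfBoundsException: 1 above. A quick sketch (class name and sample values chosen here just for illustration; the numbers are taken from the job output shown later):

import org.apache.commons.lang.StringUtils;

public class SplitDemo {
    public static void main(String[] args) {
        String tabLine   = "13726230503\t2481\t24681";
        String spaceLine = "13726230503 2481 24681";

        // splitting on the separator the line actually uses gives 3 fields
        System.out.println(StringUtils.split(tabLine, "\t").length);  // 3
        System.out.println(StringUtils.split(spaceLine, " ").length); // 3

        // splitting on the wrong separator gives a single field,
        // so fields[1] would throw ArrayIndexOutOfBoundsException: 1
        System.out.println(StringUtils.split(tabLine, " ").length);   // 1
    }
}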
package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/***
 *
 * @author Administrator
 * 1: FlowBean is our own data type; to be shipped between Hadoop nodes it has to follow
 *    Hadoop's serialization rules, i.e. implement the corresponding Hadoop serialization interface
 * 2: Text can be thought of as the Writable equivalent of java.lang.String, backed by a UTF-8 byte sequence
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // take one line of the log, split it into fields and extract the ones we need:
    // phone number, upstream traffic, downstream traffic; then emit them as a key-value pair
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // get one line of data
        String line = value.toString();
        // split it into fields
        String[] fields = StringUtils.split(line, " ");
        // column positions here assume the usual layout of this flow log:
        // phone number in column 1, up flow in column 8, down flow in column 9
        String phoneNumber = fields[1];
        // the upstream traffic field
        long up_flow = Long.parseLong(fields[8]);
        // the downstream traffic field
        long down_flow = Long.parseLong(fields[9]);
        // finally, wrap the data as a key-value pair and emit it
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
    }
}

Package it, upload it to the VM and run it (a successful run looks like this):
[root@master hadoop]# hadoop jar flow.jar com.flowSum.FlowSumRunner /flow/data /flow/output
17/09/20 09:35:26 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/20 09:35:26 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/20 09:35:27 INFO input.FileInputFormat: Total input paths to process : 1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: number of splits:1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505814887677_0007
17/09/20 09:35:27 INFO impl.YarnClientImpl: Submitted application application_1505814887677_0007
17/09/20 09:35:27 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505814887677_0007/
17/09/20 09:35:27 INFO mapreduce.Job: Running job: job_1505814887677_0007
17/09/20 09:35:33 INFO mapreduce.Job: Job job_1505814887677_0007 running in uber mode : false
17/09/20 09:35:33 INFO mapreduce.Job: map 0% reduce 0%
17/09/20 09:35:37 INFO mapreduce.Job: map 100% reduce 0%
17/09/20 09:35:43 INFO mapreduce.Job: map 100% reduce 100%
17/09/20 09:35:43 INFO mapreduce.Job: Job job_1505814887677_0007 completed successfully
17/09/20 09:35:43 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=1179
FILE: Number of bytes written=187971
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2467
HDFS: Number of bytes written=279
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2691
Total time spent by all reduces in occupied slots (ms)=2582
Total time spent by all map tasks (ms)=2691
Total time spent by all reduce tasks (ms)=2582
Total vcore-seconds taken by all map tasks=2691
Total vcore-seconds taken by all reduce tasks=2582
Total megabyte-seconds taken by all map tasks=2755584
Total megabyte-seconds taken by all reduce tasks=2643968
Map-Reduce Framework
Map input records=23
Map output records=23
Map output bytes=1127
Map output materialized bytes=1179
Input split bytes=93
Combine input records=0
Combine output records=0
Reduce input groups=10
Reduce shuffle bytes=1179
Reduce input records=23
Reduce output records=10
Spilled Records=46
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=126
CPU time spent (ms)=1240
Physical memory (bytes) snapshot=218099712
Virtual memory (bytes) snapshot=726839296
Total committed heap usage (bytes)=137433088
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2374
File Output Format Counters
Bytes Written=279
[root@master hadoop]#
Then check the output of the job.
In short, when learning something new all kinds of errors are unavoidable; calm down and work through them.
2: Flow-sum sorting case study:
This time the Mapper and Reducer are written as static inner classes. (I ran into the same annoying problem as above: with String[] fields = StringUtils.split(line, "\t"); it simply would not run and kept throwing array-index-out-of-bounds exceptions; after switching to String[] fields = StringUtils.split(line, " "); it ran. Completely baffling.)
package com.flowSort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSortMapReduce {

    /***
     * mapper as a static inner class
     * @author Administrator
     *
     */
    public static class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

        // take one line, split out the fields, wrap them in a FlowBean and emit the bean as the key
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // get one line of data
            String line = value.toString();
            // split the line into fields
            String[] fields = StringUtils.split(line, " ");
            // pick the data out of the fields
            // (the input here is the output of the previous job: phone number, up flow, down flow, sum)
            String phoneNumber = fields[0];
            long up_flow = Long.parseLong(fields[1]);
            long down_flow = Long.parseLong(fields[2]);
            // wrap the data and pass it on to the reducer
            context.write(new FlowBean(phoneNumber, up_flow, down_flow), NullWritable.get());
        }
    }

    /***
     * reducer as a static inner class
     * @author Administrator
     *
     */
    public static class FlowSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            String phoneNumber = key.getPhoneNumber();
            context.write(new Text(phoneNumber), key);
        }
    }

    /***
     * main method
     * @param args
     * @throws InterruptedException
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        // create the configuration
        Configuration conf = new Configuration();
        // obtain a job
        Job job = Job.getInstance(conf);

        // tell the job which jar contains the classes it uses
        job.setJarByClass(FlowSortMapReduce.class);

        // the mapper and reducer classes used by this job
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // the key-value types of the mapper output
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // the key-value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // the path of the input data to process
        // FileInputFormat is the base class of all InputFormat implementations that read from files;
        // it keeps the list of files that make up the job's input and computes the input splits.
        // How individual records are read is left to subclasses such as TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // the path where the results are written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job to the cluster
        // job.waitForCompletion(true);
        // return 0 on success, otherwise 1
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Rework the entity class so the records can be sorted by total traffic:

package com.flowSort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/***
 *
 * @author Administrator
 * 1: write() serializes each object to the output stream
 * 2: readFields() deserializes the object from the bytes of the input stream
 * 3: implements WritableComparable;
 *    for Java value objects you generally also override toString(), hashCode() and equals()
 *
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // a parameterized constructor makes it convenient to initialize the object's data
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // during deserialization the framework instantiates the class reflectively,
    // so a no-arg constructor must also be provided
    public FlowBean() {
    }

    // override toString()
    @Override
    public String toString() {
        return "" + upFlow + "\t" + downFlow + "\t" + sumFlow + "";
    }

    // deserialize the object's data from the stream;
    // fields must be read in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // serialize the object's data into the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // the comparison method used to sort by traffic
    @Override
    public int compareTo(FlowBean o) {
        // return -1 when larger, 1 when smaller or equal, giving a descending sort
        return sumFlow > o.sumFlow ? -1 : 1;
    }
}
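One caveat about this compareTo: it never returns 0, so two beans with the same sumFlow each claim to be "greater" than the other. That happens to be convenient here, because it keeps records with equal totals in separate reduce groups (the output below has two phone numbers that both total 27162), but it technically violates the compareTo contract. A drop-in sketch of my own (not from the original post) that keeps that behavior while obeying the contract is to sort on the total and break ties on the phone number:

    @Override
    public int compareTo(FlowBean o) {
        // sort by total flow, descending
        int bySum = Long.compare(o.sumFlow, this.sumFlow);
        if (bySum != 0) {
            return bySum;
        }
        // break ties by phone number so records with equal totals are not merged into one reduce group
        return this.phoneNumber.compareTo(o.phoneNumber);
    }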
This is the result; in short, the problems never stopped coming:
[root@master hadoop]# hadoop jar flowsort.jar com.flowSort.FlowSortMapReduce /flow/output4 /flow/sortoutput
17/09/21 19:32:28 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/21 19:32:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/21 19:32:29 INFO input.FileInputFormat: Total input paths to process : 1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: number of splits:1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505991512603_0004
17/09/21 19:32:29 INFO impl.YarnClientImpl: Submitted application application_1505991512603_0004
17/09/21 19:32:29 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505991512603_0004/
17/09/21 19:32:29 INFO mapreduce.Job: Running job: job_1505991512603_0004
17/09/21 19:32:33 INFO mapreduce.Job: Job job_1505991512603_0004 running in uber mode : false
17/09/21 19:32:33 INFO mapreduce.Job: map 0% reduce 0%
17/09/21 19:32:38 INFO mapreduce.Job: map 100% reduce 0%
17/09/21 19:32:44 INFO mapreduce.Job: map 100% reduce 100%
17/09/21 19:32:44 INFO mapreduce.Job: Job job_1505991512603_0004 completed successfully
17/09/21 19:32:44 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=822
FILE: Number of bytes written=187379
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=635
HDFS: Number of bytes written=526
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2031
Total time spent by all reduces in occupied slots (ms)=2599
Total time spent by all map tasks (ms)=2031
Total time spent by all reduce tasks (ms)=2599
Total vcore-seconds taken by all map tasks=2031
Total vcore-seconds taken by all reduce tasks=2599
Total megabyte-seconds taken by all map tasks=2079744
Total megabyte-seconds taken by all reduce tasks=2661376
Map-Reduce Framework
Map input records=21
Map output records=21
Map output bytes=774
Map output materialized bytes=822
Input split bytes=109
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=822
Reduce input records=21
Reduce output records=21
Spilled Records=42
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=121
CPU time spent (ms)=700
Physical memory (bytes) snapshot=218284032
Virtual memory (bytes) snapshot=726839296
Total committed heap usage (bytes)=137433088
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=526
File Output Format Counters
Bytes Written=526
[root@master hadoop]# hadoop fs -ls /flow/sortoutput
Found 2 items
-rw-r--r-- 1 root supergroup 0 2017-09-21 19:32 /flow/sortoutput/_SUCCESS
-rw-r--r-- 1 root supergroup 526 2017-09-21 19:32 /flow/sortoutput/part-r-00000
[root@master hadoop]# hadoop fs -cat /flow/sortoutput/part-r-00000
13726238888 2481 24681 27162
13726230503 2481 24681 27162
13925057413 63 11058 11121
18320173382 18 9531 9549
13502468823 102 7335 7437
13660577991 9 6960 6969
13922314466 3008 3720 6728
13560439658 5892 400 6292
84138413 4116 1432 5548
15013685858 27 3659 3686
15920133257 20 3156 3176
13602846565 12 1938 1950
15989002119 3 1938 1941
13926435656 1512 200 1712
18211575961 12 1527 1539
13560436666 954 200 1154
13480253104 180 200 380
13760778710 120 200 320
13826544101 0 200 200
13926251106 0 200 200
13719199419 0 200 200
[root@master hadoop]#