Developer Academy course "Hadoop Enterprise Optimization and Extension Case: TopN Case" — study notes that follow the course closely, helping users pick up the material quickly.
Course address: https://developer.aliyun.com/learning/course/96/detail/1575
TopN Case
1. Requirement:
Further process the output of requirement 2.4 and emit the information of the 10 users with the highest traffic usage.
(1) Input data: top10input.txt
(2) Output data: part-r-00000.txt
2. Requirement analysis
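In outline: every Mapper keeps only its local top 10 in a TreeMap keyed by FlowBean (whose compareTo() sorts totals in descending order) and emits it in cleanup(); the single Reducer then merges the partial lists into the global top 10 with the same TreeMap trick. Here is a minimal standalone sketch of that trick, using plain Java longs and hypothetical values rather than the course's Hadoop types (the case keeps 10 entries; the sketch keeps 3):

import java.util.TreeMap;

public class TopNSketch {
    public static void main(String[] args) {
        // Comparator reversed so larger totals sort first; lastKey() is then the smallest.
        TreeMap<Long, String> top = new TreeMap<>((a, b) -> Long.compare(b, a));
        long[] totals = {120, 90, 300, 45, 210};          // hypothetical traffic totals
        String[] phones = {"p1", "p2", "p3", "p4", "p5"}; // hypothetical users
        int n = 3;                                        // keep top 3 for the sketch
        for (int i = 0; i < totals.length; i++) {
            top.put(totals[i], phones[i]);
            if (top.size() > n) {
                top.remove(top.lastKey());                // evict the current smallest
            }
        }
        System.out.println(top); // {300=p3, 210=p5, 120=p1}
    }
}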
3. Implementation code
(1) Write the FlowBean class
package com.atguigu.mr.top;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public void set(long downFlow2, long upFlow2) {
        downFlow = downFlow2;
        upFlow = upFlow2;
        sumFlow = downFlow2 + upFlow2;
    }

    @Override
    public int compareTo(FlowBean bean) {
        int result;
        // Sort in descending order of total traffic
        if (this.sumFlow > bean.getSumFlow()) {
            result = -1;
        } else if (this.sumFlow < bean.getSumFlow()) {
            result = 1;
        } else {
            result = 0;
        }
        return result;
    }
}
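Because compareTo() returns -1 when this bean's sumFlow is larger, a TreeMap keyed by FlowBean iterates from the largest total to the smallest, which is what lets the Mapper and Reducer below evict lastKey(). A quick check of that ordering, with hypothetical values (this class is not part of the course code and assumes it sits in the same package as FlowBean):

import java.util.TreeMap;

import org.apache.hadoop.io.Text;

public class FlowBeanOrderCheck {
    public static void main(String[] args) {
        TreeMap<FlowBean, Text> m = new TreeMap<FlowBean, Text>();
        FlowBean big = new FlowBean();
        big.set(100, 50);   // sumFlow = 150
        FlowBean small = new FlowBean();
        small.set(10, 5);   // sumFlow = 15
        m.put(big, new Text("big"));
        m.put(small, new Text("small"));
        // compareTo() returns -1 for the larger sumFlow, so the map is descending:
        System.out.println(m.firstKey().getSumFlow()); // 150 (largest first)
        System.out.println(m.lastKey().getSumFlow());  // 15  (smallest last)
    }
}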
(2) Write the TopNMapper class
package com.atguigu.mr.top;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    // Define a TreeMap as the data container (naturally sorted by key)
    private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();
    private FlowBean kBean;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        kBean = new FlowBean();
        Text v = new Text();

        // 1 Read one line
        String line = value.toString();

        // 2 Split it
        String[] fields = line.split("\t");

        // 3 Wrap the data
        String phoneNum = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);
        long sumFlow = Long.parseLong(fields[3]);

        kBean.setDownFlow(downFlow);
        kBean.setUpFlow(upFlow);
        kBean.setSumFlow(sumFlow);

        v.set(phoneNum);

        // 4 Add the data to the TreeMap
        flowMap.put(kBean, v);

        // 5 Cap the TreeMap at 10 entries; beyond that, drop the entry with the least traffic
        if (flowMap.size() > 10) {
            // flowMap.remove(flowMap.firstKey());
            flowMap.remove(flowMap.lastKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

        // 6 Iterate over the TreeMap and emit the data
        Iterator<FlowBean> bean = flowMap.keySet().iterator();

        while (bean.hasNext()) {
            FlowBean k = bean.next();
            context.write(k, flowMap.get(k));
        }
    }
}
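Two details in the Mapper are worth spelling out (reasoning added here, not stated explicitly in the notes): lastKey() is the bean with the least traffic because the comparator is descending, so it is the right entry to evict; and a fresh FlowBean and Text are created on every map() call because the TreeMap stores references, so reusing a single object would silently overwrite earlier entries.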
(3) Write the TopNReducer class
package com.atguigu.mr.top;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    // Define a TreeMap as the data container (naturally sorted by key)
    TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        for (Text value : values) {

            FlowBean bean = new FlowBean();
            bean.set(key.getDownFlow(), key.getUpFlow());

            // 1 Add the data to the TreeMap
            flowMap.put(bean, new Text(value));

            // 2 Cap the TreeMap at 10 entries; beyond that, drop the entry with the least traffic
            if (flowMap.size() > 10) {
                // flowMap.remove(flowMap.firstKey());
                flowMap.remove(flowMap.lastKey());
            }
        }
    }

    @Override
    protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

        // 3 Iterate over the collection and emit the data
        Iterator<FlowBean> it = flowMap.keySet().iterator();

        while (it.hasNext()) {
            FlowBean v = it.next();
            context.write(new Text(flowMap.get(v)), v);
        }
    }
}
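One detail worth noting (a standard Hadoop caveat rather than something called out in the notes): the new Text(value) copy when filling the TreeMap matters because Hadoop reuses the same Text instance across the values iterator, so storing the reference directly would leave every entry pointing at the last value read.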
(4) Write the TopNDriver class
package com.atguigu.mr.top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopNDriver {

    public static void main(String[] args) throws Exception {

        args = new String[]{"e:/output1", "e:/output3"};

        // 1 Get the configuration information, i.e. a job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 6 Specify the local path where this program's jar lives
        job.setJarByClass(TopNDriver.class);

        // 2 Specify the Mapper/Reducer business classes this job uses
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);

        // 3 Specify the kv types of the mapper output
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 4 Specify the kv types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5 Specify the directories of the job's raw input files and its output
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the parameters configured on the job, plus the jar containing
        //   the java classes the job uses, to yarn to run
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
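The args line at the top of main() overrides the command-line arguments with local paths for testing. If the job were submitted to a cluster instead, that line would be removed and the jar launched along these lines (the jar name and HDFS paths are placeholders, not from the course):

hadoop jar topn.jar com.atguigu.mr.top.TopNDriver /input/top10input /output/top10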