TopN 案例|学习笔记

开发者学堂课程【Hadoop 企业优化及扩展案例:TopN 案例】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/96/detail/1575


TopN 案例

1.需求

对需求 2.4 输出结果进行加工,输出流量使用量在前 10 的用户信息。
(1)输入数据top10input.xt
(2)输出数据part-r-00000.txt


2.需求分析

TopN 案例|学习笔记

3、实现代码

(1) 编写 FlowBean 类

package com. atguigu.mr. top;.​​import java. io. DataInput;

import java. io. Dataoutput;+import java. io. IOException;.​​import org. apache . hadoop. io. WritableComparable;.​​public class FlowBean implements WritableComparable<FlowBean>​​{​​private long upFlow;

Private​​  ​​lon​​g  ​​downFlow;'​​private​​  ​​long sumFlow;,​​public​​  ​​FlowBean() 

super ()​​;

public FlowBean (long upFlow, long downFlow)​​ {

super ();.​​this.upFlow=upFlow;​​this.downFlow=downFlow;

}​​@override.

public void write(DataOutput out)throws​​ ​​IOException​​ {​​out. writeLong (upFlow);.​​out​​.​​ writeLong (downFlow);.​​out. writeLong (sumFlow);

}​​.​​@override,​​public void readFields(DataInput in)​​ ​​throws IOException ​​{​​upFlow = in.readLong ();​​downFlow = in.readLong();,​​sumFlow = in. readLong(​​)​​;

}

public long getUpFlow() ​​{

return upFlow;,​​public void setUpFlow (long upFlow) ​​{

this. upFlow = upFlow;.​​public long getDownFlow() ​​{

return downFlow;,​​public void setDownFlow (long downFlow) ​​{

this.down​​F​​loW = downF​​l​​oW;

}​​public long getSumFlow() 

return sumFlow;

}​​public void setSumFlow (long sumFlow) ​​{

this.sumFlow = sumFlow;·​​@override.​​public string tostring()​​{​​return upFlow + "​​\​​t" + downFlow + "​​\​​t" + sumFlow;​​public void set(long downFlow2, long upFlow2)​​{

downFlow = downFlow2;

upFlow = upFlow2;·​​sumFlow = downFlow2 + upFlow2;

}​​@override.​​public int compareTo (FlowBean ​​B​​ean) ​​{​​int​​  ​​result; ​​if (this.sumFlow>bean.getSumFlow())​​{​​result​​ ​​=​​ ​​-1 ;,​​}​​else if (this.sumFlow < bean.getSumFlow() )​​{

result = 1;​​}​​else ​​{

Return​​ ​​result; 

}

(2) 编写 TopNMapper 类
package com.atguigu.mr.top;.​​import java.io. IOException;.​​import​​  ​​java.util. Iterator;.

import java.util. TreeMap;​​import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.​​T​​ext;​​import org.​​a​​pache.hadoop.mapreduce.Mapper;​​public​​  ​​class​​  ​​TopNMapper​​ ​​extends​​ ​​Mapper <LongWritable,Text,​​FlowBean,Text>{
//定义一个 TreeMap 作为存储数据的容器(天然按 key 排序)

Private​​ ​​TreeMap<FlowBean,​​  ​​Text>​​  ​​flowMap​​  = ​​new​​TreeMap<FlowBean,Text>();​​private FlowBean kBean; ,​​@override.​​Protected​​  ​​void​​ ​​map(LongWritable​​ ​​key,Text​​ ​​value​​  ​​Context​​,​​context) throws IOException, InterruptedException​​ {

kBean =newFlowBean();​​Text V = newText();,
//  1 获取一行

String line = value.tostring();
//  2 切割
string []fields = line.split ("\t");

//  3 封装数据
String phoneNum = fields[O];,​​long upFlow = Long​​.​​parseLong (fields[1]);

Long​​  ​​downFlow = Long. parseLong(fields[2] ) ;

long sumFlow​​ ​​=​​  L​​ong.parseLong (fields[3]);​​kBean.set​​.​​DownFlow(downFlow);

kBean.set​​.​​UpFlow(upFlow);.​​kBean.set​​.​​SumFlow(sumFlow) ;,​​V.set(phoneNum);

//4向 TreeMap 中添加数据

flowMap.put (kBean,v) ;·
//5限制 TreeMap 的数据量,超过 10 条就删除掉流量最小的一条数据​​if (flowMap.size() >10)​​{
flowMap.remove(flowMap. firstKey());
flowMap.remove(flowMap. lastKey());

}

@override.protected void cleanup (context  context) throws IOException,Inter ruptedException {
//6遍历 treeMap 集合,输出数据.
Iterator<FlowBean> bean = flowMap.keySet().iterator();​​while (bean.hasNext())​​{​​FlowBean k = bean.next () ;​​context.write (k,flowMap.get(k));

}
(3)编写 TopNReducer 类 
package com.atguigu.mr.top;.​​import java.io.IOException;

importjava.util.Iterator;

importjava.util.TreeMap;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;.public class TopNReducer extends Reducer<FlowBean,Text,Text,FlowBean> 
//定义一个 TreeMap 作为存储数据的容器(天然按 key 排序)
TreeMap<FlowBean,Text>FlowMap=newTreeMap<FlowBean,Text>();,@override.protected void reduce (FlowBean key,Iterable<Text> values,Context context) throws IOException, InterruptedException {for  (Text value:values) {FlowBean bean = new FlowBean();bean.set (key.getDownFlow(),key.getUpFlow());
//1向 treeMap 集合中添加数据。
flowMap.put (bean, newText(value));.
//2限制 TreeMap 数据量,超过 10 条就删除掉流量最小的一条数据。​​if(flowMap.size () > 10) ​​{
/​​/ flowMap. remove (flowMap. firstKey());

flowMap. remove ( flowMap. lastKey() ) ;,@overrideprotected  void  cleanup (Reducer<FlowBean,Text,Text,FlowBean> . Context context )throws   IOException,InterruptedException{
//3遍历集合,输出数据
Iterator<FlowBean> it = flowMap.keySet().iterator();while (it.hasNext()) {context.write (new Text (flowMab.get (v) )
(4)编写 TopNDriver 类
package com. atguigu.mr.top;'​​import org.apache .hadoop . conf . Configuration;.​​import​​  ​​org.apache .hadoop. fs. Path;.​​import org.apache .hadoop. io. Text;.​​import org.apache .hadoop .mapreduce .Job;+​​import org.apache .hadoop. mapreduce.lib. input. File InputFormat;

Impor

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;.public class TopNDriver {public static void main(String[] args) throws Exception i.args=new string[]("e: /output1", "e: /output3")/
//1获取配置信息,或者 job 对象实例。
//6指定本程序的 jar 包所在的本地路径 job.setJarByClass (TopNDriver.class) ;
//2指定本业务 job 要使用的 mapper/Reducer 业务类。job. setMappe rClass (TopNMapper .class) ;job. setReducerClass (TopNReducer .class) ;
//3指定 mappe r: 输出数据的 kv 类型。
job. setMapOutputKeyClass (FlowBean.class);

job. setMapOutputValueClass (Text.class) ;.
//4指定最终输出的数据的 kv 类型
job. setoutputKeyClass (Text.class) ;
job. setoutputValueClass (FlowBean.class) ;
//5指定 job 的输入原始文件所在目录
FileInputFormat. setInputPaths (job,new Path(args[0]) ) ;.

FileOutputFormat. setoutputPath(job, newPath(args[1]) ) ;
//7将 job 中配置的相关参数,以及 job 所用的 java 类所在的 jar 包,
提交给 yarn 去运行
boolean result​​=​​job. waitForCompletion (true);​​System.exit (result ? 0 : 1) ;.​​Configuration configuration = new Configuration() ;

Job job = Job.getInsance(configuration);.

上一篇:三分钟帮你集成极光推送——和那些你可能不知道的事


下一篇:Android进阶之自定义View(1)实现可换行的TextView