数据生产
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Generates random call records and appends them, one per line, to a CSV file.
 *
 * <p>Record layout: {@code caller,callee,buildTime,duration} — calling number, called number,
 * call set-up time ({@code yyyy-MM-dd HH:mm:ss}) and call duration in seconds, zero-padded to
 * four digits.
 */
public class ProductLog {
    /** Output file used when no path argument is given on the command line. */
    private static final String DEFAULT_LOG_PATH = "F:\\idea-workspace\\CT_BD\\data\\calllog.csv";
    /** Pause between two generated records, in milliseconds. */
    private static final long PRODUCE_INTERVAL_MILLIS = 500;
    /** Exclusive upper bound of the generated call duration, in seconds (30 minutes). */
    private static final int MAX_DURATION_SECONDS = 30 * 60;

    /** The 20 sample subscribers: {phone number, name}. Single source for both collections below. */
    private static final String[][] CONTACTS = {
        {"17078388295", "李雁"},
        {"13980337439", "卫艺"},
        {"14575535933", "仰莉"},
        {"19902496992", "陶欣悦"},
        {"18549641558", "施梅梅"},
        {"17005930322", "金虹霖"},
        {"18468618874", "魏明艳"},
        {"18576581848", "华贞"},
        {"15978226424", "华啟倩"},
        {"15542823911", "仲采绿"},
        {"17526304161", "卫丹"},
        {"15422018558", "戚丽红"},
        {"17269452013", "何翠柔"},
        {"17764278604", "钱溶艳"},
        {"15711910344", "钱琳"},
        {"15714728273", "缪静欣"},
        {"16061028454", "焦秋菊"},
        {"16264433631", "吕访琴"},
        {"17601615878", "沈丹"},
        {"15897468949", "褚美丽"},
    };

    // Phone numbers that callers and callees are drawn from (filled by initPhone()).
    private List<String> phoneList = new ArrayList<String>();
    // Phone number -> subscriber name (filled by initPhone(); not read elsewhere in this class).
    private Map<String, String> phoneNameMap = new HashMap<>();
    // Inclusive date range (yyyy-MM-dd) for the generated call set-up time.
    String startTime = "2020-01-01";
    String endTime = "2020-12-31";

    /**
     * Loads the sample phone numbers and their subscriber names.
     *
     * <p>Idempotent: existing entries are cleared first, so calling it twice does not create
     * duplicate numbers (which could otherwise make caller and callee identical).
     */
    public void initPhone() {
        phoneList.clear();
        phoneNameMap.clear();
        for (String[] contact : CONTACTS) {
            phoneList.add(contact[0]);
            phoneNameMap.put(contact[0], contact[1]);
        }
    }

    /**
     * Produces one random call record: {@code caller,callee,buildTime,duration}.
     *
     * @return the CSV line (without line terminator)
     * @throws IllegalStateException if fewer than two phone numbers are loaded
     *     (e.g. {@link #initPhone()} was not called)
     */
    public String product() {
        int size = phoneList.size();
        if (size < 2) {
            throw new IllegalStateException(
                    "At least two phone numbers are required; call initPhone() first");
        }
        ThreadLocalRandom random = ThreadLocalRandom.current();

        int callerIndex = random.nextInt(size);
        // Draw from the remaining size-1 slots and skip over the caller's slot, so the callee is
        // always a different number without an open-ended retry loop.
        int calleeIndex = random.nextInt(size - 1);
        if (calleeIndex >= callerIndex) {
            calleeIndex++;
        }
        String caller = phoneList.get(callerIndex);
        String callee = phoneList.get(calleeIndex);

        String buildTime = randomBuildTime(startTime, endTime);
        // Locale.ROOT keeps ASCII digits regardless of the JVM's default locale.
        String duration = String.format(Locale.ROOT, "%04d", random.nextInt(MAX_DURATION_SECONDS));

        return caller + "," + callee + "," + buildTime + "," + duration;
    }

    /**
     * Returns a random timestamp ({@code yyyy-MM-dd HH:mm:ss}) inside the given day range,
     * where both {@code startTime} and {@code endTime} days are included.
     *
     * @return the timestamp, or {@code null} if a date cannot be parsed or end is before start
     */
    private String randomBuildTime(String startTime, String endTime) {
        SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
        Date startDate;
        Date endDate;
        try {
            startDate = dayFormat.parse(startTime);
            endDate = dayFormat.parse(endTime);
        } catch (ParseException e) {
            e.printStackTrace();
            return null;
        }
        if (endDate.before(startDate)) {
            return null;
        }

        // Parsed dates are midnight; extend to the start of the following day so the whole
        // end day is covered: [start 00:00, (end + 1 day) 00:00).
        Calendar endExclusive = Calendar.getInstance();
        endExclusive.setTime(endDate);
        endExclusive.add(Calendar.DAY_OF_MONTH, 1);

        long randomMillis = ThreadLocalRandom.current()
                .nextLong(startDate.getTime(), endExclusive.getTimeInMillis());
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(randomMillis));
    }

    /**
     * Appends a new record to {@code filePath} (UTF-8) every {@value #PRODUCE_INTERVAL_MILLIS} ms
     * and echoes it to stdout, until the current thread is interrupted.
     *
     * <p>A failed write is reported and generation continues. Failing to open the file ends the
     * method. The file is always closed on exit.
     */
    public void writeLog(String filePath) {
        try (Writer writer = new OutputStreamWriter(
                new FileOutputStream(filePath, true), StandardCharsets.UTF_8)) {
            while (true) {
                Thread.sleep(PRODUCE_INTERVAL_MILLIS);
                String log = product();
                System.out.println(log);
                try {
                    writer.write(log + "\n");
                    // Flush per record so a tailing reader (e.g. `tail -F`) sees it immediately.
                    writer.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe why generation stopped.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            // Opening or closing the output file failed.
            e.printStackTrace();
        }
    }

    /**
     * Entry point.
     *
     * @param args optional; {@code args[0]} is the CSV file to append to. Defaults to
     *     {@link #DEFAULT_LOG_PATH} when absent.
     */
    public static void main(String[] args) {
        String filePath = args.length > 0 ? args[0] : DEFAULT_LOG_PATH;
        ProductLog productLog = new ProductLog();
        productLog.initPhone();
        productLog.writeLog(filePath);
    }
}
producer.sh
#!/bin/bash
# Start the call-record generator.
# Passes the output CSV path to ProductLog as its first argument; the script's own first
# argument overrides the default, which is the file tailed by the Flume exec source.
LOG_FILE="${1:-/root/temp/calllog.csv}"
# exec replaces this shell so signals sent to the script's PID reach the JVM directly.
exec java -cp /root/temp/CT_producer-1.0-SNAPSHOT.jar ProductLog "$LOG_FILE"
数据消费
- Flume用于监控目标文件的变化,并把信息传递到Kafka
Flume配置
# Agent a1: exec source (tail the call-log CSV) -> memory channel -> Kafka sink.
# Names of the agent's source, channel and sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: print the CSV from its first byte (-c +0) and keep following it by name (-F)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /root/temp/calllog.csv
a1.sources.r1.shell = /bin/bash -c
# Bind the source to the channel (required, otherwise no events flow)
a1.sources.r1.channels = c1

# Channel: in-memory buffer
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink: publish each line to Kafka.
# brokerList/topic/batchSize are the Flume 1.6 property names; Flume 1.7+ still accepts them
# but prefers kafka.bootstrap.servers and kafka.topic.
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = bigdata111:9092
# Must match the topic created with kafka-topics.sh
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
# Bind the sink to the channel (note: singular "channel" for sinks)
a1.sinks.k1.channel = c1
- 启动Kafka服务(broker):bin/kafka-server-start.sh config/server.properties &
- 创建主题:bin/kafka-topics.sh --create --zookeeper bigdata111:2181 --replication-factor 1 --partitions 3 --topic calllog(Kafka 2.2+ 可用 --bootstrap-server bigdata111:9092 代替 --zookeeper;Kafka 3.0 起已移除 --zookeeper 参数)
- 启动Kafka消费者验证数据:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic calllog --from-beginning(主题需与创建的主题及 Flume sink 配置的 topic 一致)
- 启动flume:bin/flume-ng agent -c conf/ -n a1 -f /root/temp/flume-kafka.conf
- 生产数据:sh producer.sh
数据存储
- 将产生的数据实时存储在HBase中
- 编写调用HBaseAPI的相关方法,将从Kafka中读取出来的数据写入HBase中
- HBase的描述器:命名空间描述器、表描述器、列族描述器
- 协处理器:程序将主叫数据插入 f1 后,由协处理器自动将对应的被叫数据插入 f2。需要同时修改程序中和虚拟机中的 hbase-site.xml。
数据分析
- 逻辑简单,代码量大
- 按照时间范围(年月日),统计出所属时间范围内所有手机号码的通话次数总和及通话时长总和
- 维度:某个视角,如按时间维度,统计2018年全年的通话记录,表示为2018年*月*日
- 通过Mapper将数据按照不同维度聚合给Reducer
- 通过Reducer拿到按各个维度聚合过来的数据,汇总输出
- 将Reducer输出通过outputformat输出到Mysql表
- 表结构设计
- contacts:存放手机号,联系人姓名
- call:存放某个时间维度下通话次数及通话时长总和
- dimension_data:存放时间
- 数据形式:联系人维度,时间维度
电话号码:123456 Chen
年:2020
月:12
日:31
- HBase-->Mysql
- Sqoop
- 自定义输出:map,reducer,outputformat,runner
- 在 IDE 中安装 Lombok 插件,并在 Maven 的 pom.xml 中添加 lombok 依赖
数据展示
参考
flume
https://www.freesion.com/article/4812259552/
电信项目