Business data (transactional) side:
MySQL ---> Canal ---> Kafka ---> Spark Streaming (acting as ETL) ---> ES ---> Spring Boot API ---> page display
Target metrics: daily order count and revenue
1. Set up Canal to monitor the MySQL data in real time
Canal installation and deployment: https://www.cnblogs.com/shengyang17/p/10834781.html (Canal works by posing as a MySQL replica and reading the binlog, so MySQL must have binlog enabled in ROW format.)
2. Fetch the Canal data with a Java program
Create the gmall-canal module.
Prerequisite for the real-time monitoring of MySQL through Canal: the Canal server must stay running.
Build the gmall-canal Maven module and consume the Canal data from Java: ① CanalClient keeps the connection open and pulls the changes; ② CanalHandler handles the concrete business logic; ③ the result is sent to Kafka.
Canal client: CanalClient.java
public class CanalClient {
    public static void main(String[] args) {
        // connect to the Canal server
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("hadoop101", 11111), "example", "", "");
        while (true) {
            canalConnector.connect();
            canalConnector.subscribe("gmall.order_info"); // subscribe to the order_info table
            // message: one batch Canal pulled from the binlog; a message can contain several SQL statements
            Message message = canalConnector.get(100);
            int size = message.getEntries().size();
            if (size == 0) {
                System.out.println("No data, sleeping for 5s");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                for (CanalEntry.Entry entry : message.getEntries()) {
                    // entry: roughly one SQL statement; a single statement may affect multiple rows
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        // ① the entry type distinguishes row-data changes from transaction events
                        CanalEntry.RowChange rowChange = null;
                        try {
                            // ② storeValue deserializes into a RowChange holding the changed rows
                            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        } catch (InvalidProtocolBufferException e) {
                            e.printStackTrace();
                        }
                        String tableName = entry.getHeader().getTableName();            // ③ header.tableName: table name
                        CanalEntry.EventType eventType = rowChange.getEventType();       // insert / update / delete / create / alter / drop
                        // rowDatasList: the changed rows of this RowChange; each RowData holds the columns of one row
                        // (afterColumnsList / beforeColumnsList), and each Column carries a name and a value
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        CanalHandler.handler(tableName, eventType, rowDatasList);
                    }
                }
            }
        }
    }
}
CanalHandler.java
public class CanalHandler {
    public static void handler(String tableName, CanalEntry.EventType eventType,
                               List<CanalEntry.RowData> rowDataList) {
        // new-order case: INSERT on the order_info table
        if ("order_info".equals(tableName) && CanalEntry.EventType.INSERT == eventType) {
            // iterate over the changed rows
            for (CanalEntry.RowData rowData : rowDataList) {
                List<CanalEntry.Column> columnsList = rowData.getAfterColumnsList();
                JSONObject jsonObject = new JSONObject();
                for (CanalEntry.Column column : columnsList) {
                    System.out.println(column.getName() + ":" + column.getValue());
                    // convert column names such as create_time into camelCase keys such as createTime
                    String propertyName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                    jsonObject.put(propertyName, column.getValue());
                }
                MyKafkaSender.send(GmallConstant.KAFKA_TOPIC_ORDER, jsonObject.toJSONString());
            }
        }
    }
}
MyKafkaSender.java
public class MyKafkaSender {
    public static KafkaProducer<String, String> kafkaProducer = null;

    public static KafkaProducer<String, String> createKafkaProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return producer;
    }

    public static void send(String topic, String msg) {
        if (kafkaProducer == null) {
            kafkaProducer = createKafkaProducer();
        }
        kafkaProducer.send(new ProducerRecord<String, String>(topic, msg));
    }
}
3. Spark Streaming consumes from Kafka and saves to ES
OrderApp.scala pulls the data from Kafka into Spark Streaming, processes it (masking the consignee's phone number), and writes it to ES:
object OrderApp {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("gmall").setMaster("local[*]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val inputDStream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream(GmallConstant.KAFKA_TOPIC_ORDER, streamingContext)

    /* sanity check: print what arrives from Kafka
    inputDStream.map(_.value()).foreachRDD { rdd =>
      println(rdd.collect().mkString("\n"))
    } */

    val orderInfoDStream: DStream[OrderInfo] = inputDStream.map(_.value()).map { orderJson =>
      val orderInfo: OrderInfo = JSON.parseObject(orderJson, classOf[OrderInfo]) // parse into an OrderInfo object
      val createTimeArr: Array[String] = orderInfo.createTime.split(" ")
      orderInfo.createDate = createTimeArr(0)                      // e.g. 2019-05-03
      val timeArr: Array[String] = createTimeArr(1).split(":")
      orderInfo.createHour = timeArr(0)                            // hour, e.g. 02
      orderInfo.createHourMinute = timeArr(0) + ":" + timeArr(1)   // e.g. 02:54
      // mask the first 7 digits of the consignee's phone number; consigneeTel must be a var in OrderInfo
      orderInfo.consigneeTel = "*******" + orderInfo.consigneeTel.splitAt(7)._2
      orderInfo
    }

    // save each batch to ES
    orderInfoDStream.foreachRDD { rdd =>
      rdd.foreachPartition { orderItr: Iterator[OrderInfo] =>
        val list: List[OrderInfo] = orderItr.toList
        MyEsUtil.insertEsBatch(GmallConstant.ES_INDEX_ORDER, list)
      }
    }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
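OrderApp relies on two helpers that this post does not show: the OrderInfo bean it parses each JSON message into, and MyKafkaUtil, which builds the Kafka direct stream. The following are minimal sketches under assumptions, not the project's exact code. The OrderInfo field list is inferred from the ES mapping below; it assumes fastjson can populate the case class through its single constructor, and it declares the derived fields and consigneeTel as vars so OrderApp can reassign them.

// Sketch only: the OrderInfo bean assumed by OrderApp (fields inferred from the ES mapping below).
case class OrderInfo(
    id: String,
    provinceId: String,
    consignee: String,
    var consigneeTel: String,             // reassigned after masking
    orderComment: String,
    orderStatus: String,
    paymentWay: String,
    userId: String,
    imgUrl: String,
    totalAmount: Double,
    expireTime: String,
    deliveryAddress: String,
    createTime: String,
    operateTime: String,
    trackingNo: String,
    parentOrderId: String,
    outTradeNo: String,
    tradeBody: String,
    var createDate: String = null,        // derived: yyyy-MM-dd
    var createHour: String = null,        // derived: HH
    var createHourMinute: String = null)  // derived: HH:mm

A sketch of MyKafkaUtil based on the spark-streaming-kafka-0-10 direct API; the broker list mirrors MyKafkaSender and the consumer group id is hypothetical:

// Sketch only: MyKafkaUtil returning a direct Kafka stream for one topic.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object MyKafkaUtil {
  private val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "hadoop101:9092,hadoop102:9092,hadoop103:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "gmall_consumer_group",                // hypothetical group id
    "enable.auto.commit" -> (true: java.lang.Boolean))

  def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams))
}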
Create the mapping for the gmall_order index:
PUT gmall_order
{
  "mappings": {
    "_doc": {
      "properties": {
        "provinceId":       { "type": "keyword" },
        "consignee":        { "type": "keyword", "index": false },
        "consigneeTel":     { "type": "keyword", "index": false },
        "createDate":       { "type": "keyword" },
        "createHour":       { "type": "keyword" },
        "createHourMinute": { "type": "keyword" },
        "createTime":       { "type": "keyword" },
        "deliveryAddress":  { "type": "keyword" },
        "expireTime":       { "type": "keyword" },
        "id":               { "type": "keyword" },
        "imgUrl":           { "type": "keyword", "index": false },
        "operateTime":      { "type": "keyword" },
        "orderComment":     { "type": "keyword", "index": false },
        "orderStatus":      { "type": "keyword" },
        "outTradeNo":       { "type": "keyword", "index": false },
        "parentOrderId":    { "type": "keyword" },
        "paymentWay":       { "type": "keyword" },
        "totalAmount":      { "type": "double" },
        "trackingNo":       { "type": "keyword" },
        "tradeBody":        { "type": "keyword", "index": false },
        "userId":           { "type": "keyword" }
      }
    }
  }
}
=========>>>
{
"acknowledged" : true,
"shards_acknowledged" : true,
"index" : "gmall_order"
}
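MyEsUtil.insertEsBatch, which OrderApp uses to write each partition into this index, is also not shown in the post. A minimal sketch built on the Jest client; the ES address and the connection settings are assumptions:

// Sketch only: MyEsUtil.insertEsBatch as a thin wrapper around the Jest bulk API.
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, Index}

object MyEsUtil {
  private val ES_SERVER = "http://hadoop101:9200"   // assumed ES address
  private var factory: JestClientFactory = _

  private def getClient: JestClient = {
    if (factory == null) {
      factory = new JestClientFactory
      factory.setHttpClientConfig(
        new HttpClientConfig.Builder(ES_SERVER).multiThreaded(true).maxTotalConnection(20).build())
    }
    factory.getObject
  }

  // Write one partition's worth of documents into the given index with a single bulk request.
  def insertEsBatch(indexName: String, docList: List[Any]): Unit = {
    if (docList.nonEmpty) {
      val client: JestClient = getClient
      val bulkBuilder = new Bulk.Builder
      for (doc <- docList) {
        bulkBuilder.addAction(new Index.Builder(doc).index(indexName).`type`("_doc").build())
      }
      client.execute(bulkBuilder.build())
      client.close()
    }
  }
}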
4. Query the data from ES and publish it through an interface
Query in Kibana:
########## Query / aggregation examples ##########

## Total transaction amount for the day
GET gmall_order/_search
{
  "query": {
    "bool": {
      "filter": { "term": { "createDate": "2019-05-12" } }
    }
  },
  "aggs": {
    "sum_totalAmount": { "sum": { "field": "totalAmount" } }
  }
}
==============>>>
...
"aggregations" : {
  "sum_totalAmount" : { "value" : 15989.0 }
}

## Amount per hour: group first, then aggregate
GET gmall_order/_search
{
  "query": {
    "bool": {
      "filter": { "term": { "createDate": "2019-05-12" } }
    }
  },
  "aggs": {
    "groupby_createHour": {
      "terms": { "field": "createHour", "size": 24 },
      "aggs": {
        "sum_totalAmount": { "sum": { "field": "totalAmount" } }
      }
    }
  }
}
========>>>
...
"aggregations" : {
  "groupby_createHour" : {
    "doc_count_error_upper_bound" : 0,
    "sum_other_doc_count" : 0,
    "buckets" : [
      { "key" : "11", "doc_count" : 5, "sum_totalAmount" : { "value" : 1658.0 } },
      { "key" : "10", "doc_count" : 3, "sum_totalAmount" : { "value" : 1936.0 } },
      { "key" : "12", "doc_count" : 3, "sum_totalAmount" : { "value" : 1199.0 } },
      { "key" : "21", "doc_count" : 3, "sum_totalAmount" : { "value" : 2245.0 } },
      ...
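The gmall-publisher query code itself is not shown in this post. Below is a minimal sketch of how the same two aggregations could be issued from Java through the Jest client; the class name EsService and its method names are hypothetical, and the query bodies simply reuse the DSL above.

// Sketch only: a gmall-publisher service running the two aggregations above via Jest.
import io.searchbox.client.JestClient;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.TermsAggregation;

import java.util.HashMap;
import java.util.Map;

public class EsService {

    private final JestClient jestClient;

    public EsService(JestClient jestClient) {
        this.jestClient = jestClient;
    }

    // Total transaction amount for one day (same DSL as the first Kibana query).
    public Double getOrderAmountTotal(String date) throws Exception {
        String query = "{"
                + "\"query\":{\"bool\":{\"filter\":{\"term\":{\"createDate\":\"" + date + "\"}}}},"
                + "\"aggs\":{\"sum_totalAmount\":{\"sum\":{\"field\":\"totalAmount\"}}}"
                + "}";
        Search search = new Search.Builder(query).addIndex("gmall_order").addType("_doc").build();
        SearchResult result = jestClient.execute(search);
        return result.getAggregations().getSumAggregation("sum_totalAmount").getSum();
    }

    // Hour -> amount map for one day (same DSL as the grouped Kibana query).
    public Map<String, Double> getOrderAmountHour(String date) throws Exception {
        String query = "{"
                + "\"query\":{\"bool\":{\"filter\":{\"term\":{\"createDate\":\"" + date + "\"}}}},"
                + "\"aggs\":{\"groupby_createHour\":{\"terms\":{\"field\":\"createHour\",\"size\":24},"
                + "\"aggs\":{\"sum_totalAmount\":{\"sum\":{\"field\":\"totalAmount\"}}}}}"
                + "}";
        Search search = new Search.Builder(query).addIndex("gmall_order").addType("_doc").build();
        SearchResult result = jestClient.execute(search);
        Map<String, Double> hourMap = new HashMap<>();
        for (TermsAggregation.Entry bucket :
                result.getAggregations().getTermsAggregation("groupby_createHour").getBuckets()) {
            hourMap.put(bucket.getKey(), bucket.getSumAggregation("sum_totalAmount").getSum());
        }
        return hourMap;
    }
}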
Start the Spring Boot main class: com.atguigu.gmall.publisher.GmallPublisherApplication
Visit in the browser:
New transaction amount: http://127.0.0.1:8070/realtime-total?date=2019-05-12
[{"name":"新增日活","id":"dau","value":0},{"name":"新增设备","id":"new_mid","value":233},{"name":"新增交易额","id":"order_amount","value":15989.0}]
Hourly breakdown, visited in the browser: http://127.0.0.1:8070/realtime-hour?id=order_amount&date=2019-05-12
{"yesterday":
{"00":951.0,"11":1594.0,"22":707.0,"01":2682.0,"12":577.0,"02":1519.0,"13":1478.0,"03":1125.0,"14":1064.0,"04":1487.0,"15":1126.0,"05":517.0,"16":298.0,"06":1785.0,"07":1575.0,"18":463.0,"09":463.0,"20":691.0,"10":2327.0,"21":2283.0},
"today":
{"11":1658.0,"22":1867.0,"12":1199.0,"13":520.0,"03":233.0,"04":1306.0,"15":495.0,"16":1461.0,"05":829.0,"06":811.0,"07":122.0,"18":171.0,"19":1136.0,"10":1936.0,"21":2245.0}}
5. Start the programs and view the page
Start gmall-publisher, the Spring Boot main class com.atguigu.gmall.publisher.GmallPublisherApplication, which provides the interfaces for the chart module.
Start gmall-dw-chart, main class com.demo.DemoApplication, which calls those interfaces and displays the data changing dynamically.
Start the gmall-canal client com.atguigu.gmall.client.CanalClient to monitor MySQL changes in real time.
Start gmall-realtime's com.atguigu.gmall.realtime.app.OrderApp to pull the data from Kafka and store it in ES in real time.