Real-Time Business Data Analysis

Business data pipeline:

MySQL ----> canal ----> Kafka ----> Spark Streaming (acting as ETL) ----> ES ----> Spring Boot API ----> page display

Daily order volume and revenue

1. Set up canal to monitor MySQL data in real time

canal installation and deployment: https://www.cnblogs.com/shengyang17/p/10834781.html

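2. Read canal data from a Java program

Create the gmall-canal module

  Monitoring MySQL in real time with canal: this requires the canal server to be running the whole time.

  Build the gmall-canal Maven module, which reads canal data from a Java program: ① CanalClient holds the interactive connection to canal; ② the handler carries out the business-specific processing; ③ the result is sent to Kafka.

Canal client: CanalClient.java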

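import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    public static void main(String[] args) {
        // Create the connector; "example" is the canal destination (instance) name
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop101", 11111), "example", "", "");
        // Connect and subscribe once, before the polling loop
        canalConnector.connect();
        canalConnector.subscribe("gmall.order_info");
        while (true) {
            // message: one batch that canal pulled from the binlog; a message can contain several SQL statements
            Message message = canalConnector.get(100);
            int size = message.getEntries().size();

            if (size == 0) {
                System.out.println("No data, sleeping for 5s");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // entry: roughly one SQL statement; a single statement may affect many rows
                for (CanalEntry.Entry entry : message.getEntries()) {
                    // ① entryType tells a data change (ROWDATA) apart from a transaction begin/end
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        // ② storeValue deserializes into a RowChange, which holds the changed values of many rows
                        //    ②.1 eventType: the kind of change (insert, update, delete, create, alter, drop)
                        //    ②.2 rowDatasList: the changed rows
                        CanalEntry.RowChange rowChange;
                        try {
                            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        } catch (InvalidProtocolBufferException e) {
                            e.printStackTrace();
                            continue; // skip this entry instead of dereferencing a null rowChange
                        }
                        String tableName = entry.getHeader().getTableName();    // ③ header.tableName: the table name
                        CanalEntry.EventType eventType = rowChange.getEventType(); // insert, update, delete
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); // the set of changed rows
                        CanalHandler.handler(tableName, eventType, rowDatasList);
                        // RowData: one changed row inside a RowChange; it exposes afterColumnsList and beforeColumnsList
                        // Column: each RowData holds several columns, each with a name and a value
                    }
                }
            }
        }
    }
}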

CanalHandler.java

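import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.common.base.CaseFormat;

import java.util.List;

// GmallConstant and MyKafkaSender are classes of this project; import them from wherever they live.
public class CanalHandler {
    public static void handler(String tableName, CanalEntry.EventType eventType, List<CanalEntry.RowData> rowDataList){
        // A new order: an INSERT into order_info
        if ("order_info".equals(tableName) && CanalEntry.EventType.INSERT == eventType){
            // Iterate over the changed rows
            for (CanalEntry.RowData rowData : rowDataList) {
                // For an INSERT, the new values are in the "after" columns
                List<CanalEntry.Column> columnsList = rowData.getAfterColumnsList();
                JSONObject jsonObject = new JSONObject();
                for (CanalEntry.Column column : columnsList) {
                    System.out.println(column.getName() + ":" + column.getValue());
                    // Convert the column name from snake_case to camelCase, e.g. consignee_tel -> consigneeTel
                    String propertyName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                    jsonObject.put(propertyName, column.getValue());
                }
                // Send one JSON message per row to the order topic
                MyKafkaSender.send(GmallConstant.KAFKA_TOPIC_ORDER, jsonObject.toJSONString());

            }
        }
    }
}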
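The handler renames every MySQL column from snake_case to camelCase before putting it into the JSON object. To make that renaming step concrete, here is a minimal, dependency-free sketch. `ColumnNames`, `toLowerCamel`, and `renameKeys` are hypothetical names introduced for illustration; the handler itself relies on Guava's `CaseFormat`, and this sketch only approximates it for names that are already lowercase.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ColumnNames {
    // Converts a lower_underscore name such as "consignee_tel" to lowerCamel ("consigneeTel").
    static String toLowerCamel(String columnName) {
        StringBuilder sb = new StringBuilder(columnName.length());
        boolean upperNext = false;
        for (char c : columnName.toCharArray()) {
            if (c == '_') {
                upperNext = true;          // drop the underscore, capitalize what follows
            } else if (upperNext) {
                sb.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Renames every key of a row; values are copied unchanged and order is preserved.
    static Map<String, String> renameKeys(Map<String, String> row) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : row.entrySet()) {
            out.put(toLowerCamel(e.getKey()), e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample row, for illustration only
        Map<String, String> row = new LinkedHashMap<>();
        row.put("consignee_tel", "13800000000");
        row.put("total_amount", "100.0");
        System.out.println(renameKeys(row));
    }
}
```

The camelCase keys matter downstream: they are what lets the JSON be parsed straight into an object whose fields use the same names, as the Spark Streaming job does with `OrderInfo`.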

MyKafkaSender.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyKafkaSender {

    // Shared producer, created lazily on the first call to send()
    public static KafkaProducer<String, String> kafkaProducer = null;

    public static KafkaProducer<String, String> createKafkaProducer(){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = null;

        try {
            producer = new KafkaProducer<String, String>(properties);
        } catch (Exception e) {
            // Note: if creation fails, null is returned and send() will then throw a NullPointerException
            e.printStackTrace();
        }
        return producer;
    }

    // Sends a message without a key, so the producer chooses the partition
    public static void send(String topic, String msg){
        if (kafkaProducer == null){
            kafkaProducer = createKafkaProducer();
        }
        kafkaProducer.send(new ProducerRecord<String, String>(topic, msg));
    }
}
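MyKafkaSender creates its producer lazily with an unsynchronized null check. That is fine for the single-threaded CanalClient loop, but if `send` were ever called from several threads, two producers could be created. One common alternative is the initialization-on-demand holder idiom, sketched below. `LazySender` and `FakeProducer` are hypothetical stand-ins so the example runs without Kafka on the classpath; this is not how the original class is written.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazySender {
    // Counts how many times the stand-in producer is constructed.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Stand-in for KafkaProducer (assumption: a real producer would be built here instead).
    static final class FakeProducer {
        FakeProducer() { CREATED.incrementAndGet(); }
        String send(String topic, String msg) { return topic + ":" + msg; }
    }

    // The JVM initializes Holder exactly once, on first access, even with concurrent callers.
    private static final class Holder {
        static final FakeProducer INSTANCE = new FakeProducer();
    }

    static String send(String topic, String msg) {
        return Holder.INSTANCE.send(topic, msg);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            final int n = i;
            threads[i] = new Thread(() -> send("demo_topic", "msg-" + n));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("producers created: " + CREATED.get());
    }
}
```

The holder class keeps the laziness of the original (nothing is created until the first `send`) while leaving thread safety to class initialization rather than explicit locking.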

3. Consume Kafka with Spark Streaming and save the data to ES

 

OrderApp.scala: reads the data from Kafka into Spark Streaming, processes it (masking the phone number), and writes it to ES:

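import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

// MyKafkaUtil, MyEsUtil, GmallConstant and OrderInfo are classes of this project.
object OrderApp {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("gmall").setMaster("local[*]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5)) // 5-second batches

    val inputDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(GmallConstant.KAFKA_TOPIC_ORDER, streamingContext)
/*    inputDStream.map(_.value()).foreachRDD{ rdd =>
      println(rdd.collect().mkString("\n")) // check that data is arriving
    }*/

    val orderInfoDStream: DStream[OrderInfo] = inputDStream.map {
      _.value()
    }.map { orderJson =>
      // Parse the JSON string into an OrderInfo object
      val orderInfo: OrderInfo = JSON.parseObject(orderJson, classOf[OrderInfo])
      // createTime looks like "2019-05-03 02:54:..."; split it into date and time
      val createTimeArr: Array[String] = orderInfo.createTime.split(" ")
      orderInfo.createDate = createTimeArr(0) // 2019-05-03
      val timeArr: Array[String] = createTimeArr(1).split(":")
      orderInfo.createHour = timeArr(0) // hour, e.g. 02
      orderInfo.createHourMinute = timeArr(0) + ":" + timeArr(1) // e.g. 02:54
      // Mask the consignee's phone number: splitAt(7) returns a tuple, and only the part after the first 7 characters is kept.
      // consigneeTel must be declared as a var in OrderInfo so it can be reassigned.
      orderInfo.consigneeTel = "*******" + orderInfo.consigneeTel.splitAt(7)._2
      orderInfo
    }
    // Save to ES, one bulk insert per partition
    orderInfoDStream.foreachRDD{rdd =>
      rdd.foreachPartition{orderItr: Iterator[OrderInfo] =>
        val list: List[OrderInfo] = orderItr.toList
        MyEsUtil.insertEsBatch(GmallConstant.ES_INDEX_ORDER, list)
      }
    }
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}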
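The per-record transformation in OrderApp does two things: it derives the date, hour, and hour:minute fields from the creation time, and it masks the first seven characters of the phone number. Those two steps can be sketched on their own in plain Java. `OrderFields`, `splitCreateTime`, and `maskTel` are hypothetical names, the sample values are made up, and the handling of null or short phone numbers is an assumption of this sketch rather than something the original code defines.

```java
final class OrderFields {
    // Splits "yyyy-MM-dd HH:mm:ss" into {date, hour, hour:minute}.
    static String[] splitCreateTime(String createTime) {
        String[] dateAndTime = createTime.split(" ");
        String[] hms = dateAndTime[1].split(":");
        return new String[] { dateAndTime[0], hms[0], hms[0] + ":" + hms[1] };
    }

    // Replaces the first 7 characters with '*' and keeps whatever follows them.
    // Assumption: null or short inputs are reduced to the mask alone.
    static String maskTel(String tel) {
        if (tel == null || tel.length() <= 7) {
            return "*******";
        }
        return "*******" + tel.substring(7);
    }

    public static void main(String[] args) {
        String[] parts = splitCreateTime("2019-05-03 02:54:11");
        System.out.println(parts[0] + " | " + parts[1] + " | " + parts[2]);
        System.out.println(maskTel("13812345678"));
    }
}
```

Precomputing `createDate` and `createHour` as separate keyword fields is what allows the later ES queries to filter by day and bucket by hour with plain `term` and `terms` clauses.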

 

Create the mapping for the index
PUT gmall_order 
{
  "mappings" : {
    "_doc" : {
      "properties" : {
        "provinceId" : {
          "type" : "keyword"
        },
        "consignee" : {
          "type" : "keyword",
          "index":false
        },
        "consigneeTel" : {
          "type" : "keyword",
          "index":false
        },
        "createDate" : {
          "type" : "keyword"
        },
        "createHour" : {
          "type" : "keyword"
        },
        "createHourMinute" : {
          "type" : "keyword"
        },
        "createTime" : {
          "type" : "keyword"
        },
        "deliveryAddress" : {
          "type" : "keyword"
        },
        "expireTime" : {
          "type" : "keyword"
        },
        "id" : {
          "type" : "keyword"
        },
        "imgUrl" : {
          "type" : "keyword",
          "index":false
        },
        "operateTime" : {
          "type" : "keyword"
        },
        "orderComment" : {
          "type" : "keyword",
          "index":false
        },
        "orderStatus" : {
          "type" : "keyword"
        },
        "outTradeNo" : {
          "type" : "keyword",
          "index":false 
        },
        "parentOrderId" : {
          "type" : "keyword" 
        },
        "paymentWay" : {
          "type" : "keyword"
        },
        "totalAmount" : {
          "type" : "double"
        },
        "trackingNo" : {
          "type" : "keyword"
        },
        "tradeBody" : {
          "type" : "keyword",
          "index":false
        },
        "userId" : {
          "type" : "keyword"
        }
      }
    }
  }
}

  =========>>>
  {
    "acknowledged" : true,
    "shards_acknowledged" : true,
    "index" : "gmall_order"
  }

4. Query the data from ES and publish it through an API

Query in Kibana

########## Query and aggregation ##########

## Total transaction amount for the day
GET gmall_order/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "createDate": "2019-05-12"
        }
      }
    }
  },
  "aggs": {
    "sum_totalAmount": {
      "sum": {
        "field": "totalAmount"
      }
    }
  }
}
==============>>>
 xxxxxxxxxxxx
  "aggregations" : {
    "sum_totalAmount" : {
      "value" : 15989.0
    }
  }

## Hourly transactions: group first, then aggregate
GET gmall_order/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "createDate": "2019-05-12"
        }
      }
    }
  },
  "aggs": {
    "groupby_createHour": {
      "terms": {
        "field": "createHour",
        "size": 24
      },
      "aggs": {
        "sum_totalAmount": {
          "sum": {
            "field": "totalAmount"
          }
        }
      }
    }
  }
}

========>>>
xxxx
 "aggregations" : {
    "groupby_createHour" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "11",
          "doc_count" : 5,
          "sum_totalAmount" : {
            "value" : 1658.0
          }
        },
        {
          "key" : "10",
          "doc_count" : 3,
          "sum_totalAmount" : {
            "value" : 1936.0
          }
        },
        {
          "key" : "12",
          "doc_count" : 3,
          "sum_totalAmount" : {
            "value" : 1199.0
          }
        },
        {
          "key" : "21",
          "doc_count" : 3,
          "sum_totalAmount" : {
            "value" : 2245.0
          }
        },
        xxxxx
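To make the semantics of the two queries above explicit, the same computations can be written over an in-memory list: a filter on `createDate` followed by a sum of `totalAmount`, and the same filter followed by a group-by on `createHour` with a sum per group. This is only a sketch of what the aggregations compute, not of how ES executes them. `OrderAggregations`, `Order`, and the sample rows are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class OrderAggregations {
    // Minimal stand-in for an indexed order document (only the fields the queries use).
    static final class Order {
        final String createDate;
        final String createHour;
        final double totalAmount;

        Order(String createDate, String createHour, double totalAmount) {
            this.createDate = createDate;
            this.createHour = createHour;
            this.totalAmount = totalAmount;
        }
    }

    // Mirrors the term filter on createDate plus the sum aggregation on totalAmount.
    static double sumForDate(List<Order> orders, String date) {
        return orders.stream()
                .filter((Order o) -> o.createDate.equals(date))
                .mapToDouble((Order o) -> o.totalAmount)
                .sum();
    }

    // Mirrors the terms aggregation on createHour with a nested sum per bucket.
    static Map<String, Double> sumByHour(List<Order> orders, String date) {
        return orders.stream()
                .filter((Order o) -> o.createDate.equals(date))
                .collect(Collectors.groupingBy(
                        (Order o) -> o.createHour,
                        TreeMap::new,
                        Collectors.summingDouble((Order o) -> o.totalAmount)));
    }

    public static void main(String[] args) {
        // Made-up sample rows, not taken from the index above
        List<Order> orders = Arrays.asList(
                new Order("2019-05-12", "10", 100.0),
                new Order("2019-05-12", "10", 50.0),
                new Order("2019-05-12", "11", 25.0),
                new Order("2019-05-11", "10", 999.0));
        System.out.println(sumForDate(orders, "2019-05-12"));
        System.out.println(sumByHour(orders, "2019-05-12"));
    }
}
```

One difference worth noting: the sketch returns hours in sorted order because it collects into a `TreeMap`, whereas the `terms` aggregation by default returns buckets ordered by `doc_count`, which is why the response above lists hour 11 before hour 10.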

Start the Spring Boot main class: com.atguigu.gmall.publisher.GmallPublisherApplication

Open in a browser:

New transaction amount: http://127.0.0.1:8070/realtime-total?date=2019-05-12

[{"name":"新增日活","id":"dau","value":0},{"name":"新增设备","id":"new_mid","value":233},{"name":"新增交易额","id":"order_amount","value":15989.0}]

Hourly figures, opened in a browser: http://127.0.0.1:8070/realtime-hour?id=order_amount&&date=2019-05-12

{"yesterday":
  {"00":951.0,"11":1594.0,"22":707.0,"01":2682.0,"12":577.0,"02":1519.0,"13":1478.0,"03":1125.0,"14":1064.0,"04":1487.0,"15":1126.0,"05":517.0,"16":298.0,"06":1785.0,"07":1575.0,"18":463.0,"09":463.0,"20":691.0,"10":2327.0,"21":2283.0},
"today":
  {"11":1658.0,"22":1867.0,"12":1199.0,"13":520.0,"03":233.0,"04":1306.0,"15":495.0,"16":1461.0,"05":829.0,"06":811.0,"07":122.0,"18":171.0,"19":1136.0,"10":1936.0,"21":2245.0}}
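In the response above, each map contains only the hours that actually had orders, and the keys are not in chronological order. If a consumer needs a complete 24-point series (for example, to plot a line per day), one option is to fill in the missing hours with zero, as sketched below. `HourSeries` and `fillHours` are hypothetical names; the source does not show whether the chart module does anything like this.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class HourSeries {
    // Returns 24 entries keyed "00".."23" in order; hours absent from the input get 0.0.
    static Map<String, Double> fillHours(Map<String, Double> sparse) {
        Map<String, Double> full = new LinkedHashMap<>();
        for (int h = 0; h < 24; h++) {
            String key = String.format("%02d", h);
            full.put(key, sparse.getOrDefault(key, 0.0));
        }
        return full;
    }

    public static void main(String[] args) {
        // A few of the "today" values from the response above
        Map<String, Double> today = new HashMap<>();
        today.put("11", 1658.0);
        today.put("03", 233.0);
        today.put("21", 2245.0);
        System.out.println(fillHours(today));
    }
}
```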

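5. Start the programs and view the page

Start the Spring Boot main class of gmall-publisher, com.atguigu.gmall.publisher.GmallPublisherApplication, which serves the API used by the chart.

Start the main class com.demo.DemoApplication of gmall--dw-chart, which calls the API and displays the data as it changes.

Start the gmall-canal client, com.atguigu.gmall.client.CanalClient, which monitors changes in MySQL in real time.

Start com.atguigu.gmall.realtime.app.OrderApp in gmall-realtime, which reads the data from Kafka and writes it to ES in real time.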
