X-Pack Spark对接阿里云日志服务LogHub

概述

X-Pack Spark分析引擎是基于Spark提供的复杂分析、流式处理、机器学习的能力。Spark分析引擎可以对接阿里云的多种数据源,例如:云HBase数据库、MongoDB、Phoenix等,同时也支持对接阿里云日志服务LogHub。阿里云日志服务(Log Service,简称LOG)是针对实时日志数据的一站式服务,提供日志类数据采集、消费、投递及查询分析功能,全面提升海量日志处理和分析能力。

场景介绍

某一款销售平台的APP,针对用户在APP中打开首页、搜索、商品详细页以及最终下订单购买商品等操作,操作所产生的事件均记录到阿里云日志系统中。现需要对APP的用户的行为数据做一些统计分析,每天、每周出详细的运营数据、以及给用户提供在线查询账单等。

如何实现

通过阿里云的日志服务+X-Pack Spark+云HBase完成这些诉求。先看下整理的数据流图:

X-Pack Spark对接阿里云日志服务LogHub


由上图可见数据流程为:用户通过LogHub对接APP的日志数->Spark Streming 对接LogHub同步数到HBase(Phoenix)->在线数据同步到Spark离线数仓->离线数仓批量计算输出运营数据等。
APP日中包含用户的使用APP所产生的事件信息,下面以一个简单的例子说明下每一个步骤的实现。

LogHub对接APP日志

假设APP的日志产生在某机器的目录文件中,通过LogHub可以对接机器的文件,读取解析日志。假设日志的字段信息如下:

event_time: long #事件产生的时间戳
user_id: string #用户ID,唯一值。
device_id: String #设备id,APP使用的设备。
event_name: String #事件名称,如:首页、搜索、明细页、购买
prod_id: String #商品ID。
stay_times: int #停留时间。

上述信息在APP的日志中使用逗号分隔符,所以在LogHub配置指定采集模式时选择逗号分隔。

SparkStreaming 对接APP

SparkStreaming 对接APP可以使用X-Pack Connectors中对接LogHub的插件(可参考:Spark对接LogHub快速入门)。SparkStreaming对接LogHub可以设置每个1分钟同步一次数据到Phoenix。
同步数据之前需要在Phoenix中创建一张表,如下:

CREATE TABLE IF NOT EXISTS user_event (
   event_time BIGINT NOT NULL,
   user_id VARCHAR NOT NULL,
   device_id VARCHAR,
   event_name VARCHAR,
   prod_id VARCHAR
   CONSTRAINT my_pk PRIMARY KEY (event_time, user_id)
  );

Phoenix表user_event使用user_event和user_id作为组合主键,主要是为了使用user_id进行运营明细查询,时间信息方便按照时间范围同步数据到Spark。
SparkStreaming同步LogHub数据到Phoenix的代码主要逻辑如下:

val loghubStream = LoghubUtils.createStream(
        ssc,
        loghubProject,
        logStore,
        loghubGroupName,
        endpoint,
        numReceiver,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK)

      loghubStream.foreachRDD { rdd =>
        rdd.foreachPartition { pt =>
          // 获取Phoenix的链接
          val phoenixConn = DriverManager.getConnection("jdbc:phoenix:" + zkAddress)
          val statment = phoenixConn.createStatement()
          var i = 0
          while (pt.hasNext) {
            val value = pt.next()
            //获取的LogHub的数据是json格式的,需要进行转换
            val valueFormatted = JSON.parseObject(new String(value))
            //构造phonenix 插入语句
            val insetSql = s"upsert into $phoenixTableName values(" +
              s"${valueFormatted.getLong("event_time")}," +
              s"'${valueFormatted.getString("user_id").trim}'," +
              s"'${valueFormatted.getString("device_id").trim}'," +
              s"'${valueFormatted.getString("event_name").trim}'," +
              s"'${valueFormatted.getString("prod_id").trim}')"
            statment.execute(insetSql)
            i = i + 1
            // 每隔batchSize行提交一次commit到Phoenix。
            if (i % batchSize == 0) {
              phoenixConn.commit()
              println(s"====finish upsert $i rows====")
            }
          }
          phoenixConn.commit()
          println(s"==last==finish upsert $i rows====")
          phoenixConn.close()
          }
      }

SparkStreaming同步数据到Phoenix后,可以对Phoenix数据库进行用户明细查询。例如:

# 查询用户user_id_1006所有浏览明细。
select * from user_event where user_id = 'user_id_1006';

同步到Spark离线数仓

Phoenix在线数据库适合明细查询,如果需要进行统计、离线计算需要用到Spark数仓。Phoenix同步数据到Spark数仓实质就是在Spark上创建表,然后把数据同步一份到Spark表中。
归档数据到Spark请参考:批量归档。 本文用Sql表示下同步的逻辑,这里假设数据每天同步一次到Spark 。
Spark 中建表、同步的方法如下:

#在Spark中创建Parquet格式表:user_event_parquet,使用dt作为分区字段。
create table user_event_parquet(
    event_time long,
    user_id string,
    device_id string,
    event_name string,
    prod_id string, 
    dt string
) using parquet
partitioned by(dt);

#  在Spark中创建表user_event_phoenix映射Phoenix数据库的表。
CREATE TABLE user_event_phoenix USING org.apache.phoenix.spark
OPTIONS (
'zkUrl' 'hb-xx-master3-001.hbase.rds.aliyuncs.com:2181,hb-xx-master1-001.hbase.rds.aliyuncs.com:2181,hb-xx-master2-001.hbase.rds.aliyuncs.com:2181',
'table' 'user_event'
);

# 向Parquet表:user_event_parquet插入一天:2019-01-01的数据
insert into user_event_parquet select EVENT_TIME,USER_ID,DEVICE_ID,EVENT_NAME,PROD_ID,'2019-01-01' from user_event_phoenix where EVENT_TIME >=1546272000 and EVENT_TIME < 1546358400

离线数仓批量计算

数据同步到Spark可以对Spark数据做统计分析预算,例如:

#统计每天的访问数
select dt, count(*) from user_event_parquet group by dt
#统计前十的访问
select dt, count(*) total from user_event_parquet group by dt order by total desc limit 10
#统计前100个用户的访问数
select dt,user_id, count(*) total  from user_event_parquet group by dt,user_id order by total desc limit 100

计算的结果可以回写到业务数据库,供业务查询、出报表等。

小结

本文简单介绍了Spark如何对接LogHub以及如何同步数据等常用的操作。参考链接如下:

上一篇:云HBase Spark分析引擎对接云数据库POLARDB


下一篇:mica v2.0.0 强化基础工具集