通过EMR Spark Streaming实时读取Tablestore数据

本文将介绍如何在E-MapReduce中实时流式的处理Tablestore中的数据。

场景设计

随着互联网的发展,企业中积累的数据越来越多,数据的背后隐藏着巨大的价值,在双十一这样的节日中,电子商务企业都会在大屏幕上实时显示订单总量,由于订单总量巨大,不可能每隔一秒就到数据库中进行一次SQL统计,此时就需要用到流计算,而传统的方法都是需要借助Kafka消息队列来做流式计算,数据订单需要写入数据库与Kafka中,Spark Streaming 消费来自Kafka中的订单信息。
而本文使用的Tablestore数据库可以直接利用它的通道服务功能,供Spark Streaming流式消费,进而计算订单的数量及金额,简化了整个流程,具体如下图所示
通过EMR Spark Streaming实时读取Tablestore数据

本文将介绍一个简单的demo,流式统计Tablestore数据表中字段出现的个数。

前提条件

确保将Tablestore实例部署在E-MapReduce集群相同的VPC环境下

准备工作

步骤一 创建Tablestore数据源表

详细开通步骤请参考官方文档,本文demo中所创建出来的表名为SmallTarget,表的Schema如下图所示,该表有PKString和PkInt两个主键,类型分别为String和Interger。
通过EMR Spark Streaming实时读取Tablestore数据

为表SmallTarget建立一个增量通道,如下图所示,通道列表里面会显示该通道的名字、ID以及类型。

通过EMR Spark Streaming实时读取Tablestore数据

技术注解:
通道服务(Tunnel Service)是基于Tablestore数据接口之上的全增量一体化服务,包含三种通道类型:

  • 全量:对数据表中历史存量数据消费处理
  • 增量:对数据表中新增数据消费处理
  • 全量加增量:先对数据表总历史存量数据消费,之后对新增数据消费

通道服务的详细介绍可查询> Tablestore官网文档

步骤二 获取相关jar包并上传到hadoop集群

  • 获取环境依赖的JAR包。
Jar包 获取方法
emr-tablestore-X.X.X.jar
X.X.X: Since 1.9.0+
Maven 库中下载:https://mvnrepository.com/artifact/com.aliyun.emr/emr-tablestore
tablestore-X.X.X-jar-with-dependencies.jar 下载 EMR SDK 相关的Tablestore依赖包。https://repo1.maven.org/maven2/com/aliyun/openservices/tablestore/5.3.0/tablestore-5.3.0-jar-with-dependencies.jar
  • 集群管理页面,单击已创建的Hadoop集群的集群ID ,进入集群与服务管理页面。
  • 在左侧导航树中选择主机列表,然后在右侧查看Hadoop集群中emr-header-1主机的IP信息。
  • 在SSH客户端中新建一个命令窗口,登录Hadoop集群的emr-header-1主机。
  • 上传所有JAR包到emr-header-1节点的某个目录下。

步骤三 运行Spark Streaming作业

  1. 以一个基于emr demo修改的WordCount为样例,编译生成JAR包,JAR包需要上传到Hadoop集群的emr-header-1主机中(参见步骤二),本例测试用的JAR包地址,完整的WordCount代码参见附录(后续会合到emr demo项目中)。
  2. 该样例以Tablestore表作为数据源,统计的是主键PkString。SmallTarget数据表中数据初始时为空,如下图所示。
    通过EMR Spark Streaming实时读取Tablestore数据

3. 启动spark streaming,开启一个统计SmallTarget表中通道zengliang(通道名)实时消费数据的一个监听程序。

spark-submit --class com.aliyun.emr.example.spark.sql.streaming.StructuredTableStoreWordCount  --jars emr-tablestore-X.X.X-SNAPSHOT.jar,tablestore-X.X.X-jar-with-dependencies.jar examples-X.X.X-shaded.jar <instance> <tableName> <tunnelId> <accessKeyId> <accessKeySecret> <endPoint> <maxOffsetsPerChannel>
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

各个参数的详细说明如下表所示。

参数 参数说明
com.aliyun.emr.example.spark.sql.streaming.StructuredTableStoreWordCount 所要运行的主程序类
emr-tablestore-X.X.X-SNAPSHOT.jar 包含Tablestore source的jar包
tablestore-X.X.X-jar-with-dependencies.jar EMR SDK 相关的Tablestore依赖包
examples-X.X.X-shaded.jar 基于EMR demo修改的包(包含主程序类)
instance Tablestore实例名
tableName Tablestore表名
tunnelId Tablestore表的通道Id
accessKeyId Tablestore的accessKeyId
accessKeySecret Tablestore的秘钥
endPoint Tablestore实例的endPoint
maxOffsetsPerChannel Tablestore通道 Channel在每个Spark Batch周期内同步的最大数据条数,默认10000。
catalog 同步的列名,详见下文Catalog字段说明

向SmallTarget数据表插入数据,spark streaming会实时更新,如下面两张图所示。
通过EMR Spark Streaming实时读取Tablestore数据
通过EMR Spark Streaming实时读取Tablestore数据

Catalog字段说明

Catalog为一个Json字符串,"columns"里面指定的是需要读取的列的自定义配置,这些列的键值对最终会被TableStore Source所输出,如下图所示,会读取PkString, PkInt和col1三列,其类型分别为string, long和string。

{
  "columns": {
    "PkString": {
      "col": "PkString",
      "type": "string"
    },
    "PkInt": {
      "col": "PkInt",
      "type": "long"
    },
    "col1": {
      "col": "col1",
      "type": "string"
    }
  }
}

预定义列说明

除去Catalog字段中的用户自定义列之外,Tablestore Source的输出默认还会加上一些预定义列的值,以供下游系统灵活的使用,预定义字段说明如下表所示:

预定义列名 说明
__ots_record_type__ Tablestore行操作类型(PUT, UPDATE, DELETE)
__ots_record_timestamp__ Tablestore行的时间戳,单位为ms
__ots_column_type_<ColumnName> 某列的操作类型,其中ColumnName为Catalog字段中定义的列

附录

package com.aliyun.emr.example.spark.sql.streaming

import java.util.UUID

import org.apache.spark.sql.SparkSession

object StructuredTableStoreWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 7) {
      System.err.println(
        "Usage: StructuredTableStoreWordCount <ots-instanceName>" +
          "<ots-tableName> <ots-tunnelId> <access-key-id> <access-key-secret> <ots-endpoint>" +
          "<max-offsets-per-channel> [<checkpoint-location>]"
      )
    }

    val Array(
    instanceName,
    tableName,
    tunnelId,
    accessKeyId,
    accessKeySecret,
    endpoint,
    maxOffsetsPerChannel,
    _*
    ) = args

    System.out.println(args.toSeq.toString)

    val checkpointLocation =
      if (args.length > 7) args(7) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession.builder.appName("TableStoreWordCount").master("local[5]").getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._

    val lines = spark.readStream
      .format("tablestore")
      .option("instance.name", instanceName)
      .option("table.name", tableName)
      .option("tunnel.id", tunnelId)
      .option("endpoint", endpoint)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("maxOffsetsPerChannel", maxOffsetsPerChannel) // default 10000
      .option(
        "catalog",
        "{\"columns\":{\"PkString\":{\"col\":\"PkString\",\"type\":\"string\"},\"PkInt\":{\"col\":\"PkInt\",\"type\":\"long\"}," +
          "\"col1\":{\"col\":\"col1\",\"type\":\"string\"}}}"
      )
      .load()
      .selectExpr("PkString", "PkInt", "__ots_record_type__")
      .as[(String, Long, String)]

    val wordCounts = lines
      .flatMap(line => {
        System.out.println(s"wordCount line: ${line}")
        line._1.split(" ")
      })
      .groupBy("value")
      .count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", checkpointLocation)
      .start()

    query.awaitTermination()
  }
}

写在最后

本篇文章我们介绍了如何在E-MapReduce中实时流式的处理Tablestore中的数据,如果对基于Tablestore的大数据存储分析感兴趣的朋友可以加入我们的技术交流群(钉钉:23307953 或者11789671),来与我们一起探讨。
通过EMR Spark Streaming实时读取Tablestore数据

上一篇:spark 启动thrift server 支持 jdbc连接


下一篇:Sharepoint和Frontpage安全审计工具 – Sparty V0.1