通过在E-MapReduce集群中使用Spark SQL访问表格存储。对于流计算,基于通道服务,利用CDC(数据变更捕获)技术完成Spark的mini batch流式消费和计算,同时提供了at-least-once一致性语义。
前提条件
- 已创建E-MapReduce Hadoop集群。具体操作,请参见创建集群。
创建集群时,请确保打开挂载公网开关,将集群挂载到公网,用于Shell远程登录服务器。
说明:本文使用Shell命令演示,如果需要使用E-MapReduce的图形化页面进行数据开发。具体操作,请参见数据开发。
- 已上传emr-datasources_shaded_2.11-2.2.0-SNAPSHOT.jar包到EMR Header服务器。
快速开始
1. 在表格存储侧创建数据表和通道
1.创建Source表和Sink表。具体操作,请参见概述。
Source表名称为OrderSource,主键列分别为UserId(用户ID)和OrderId(订单ID),属性列分别为price(价格)和timestamp(订单时间),数据示例如下图所示。
2.Sink表名称为OrderStreamSink,主键列分别为begin(开始时间)、end(结束时间),属性列分别为count(订单数)和totalPrice(订单总金额)。其中开始时间和结束时间的格式为“yyyy-MM-dd HH:mm:ss”,例如2019-11-27 14:54:00。
3.在Source表上创建通道。具体操作,请参见快速入门。
通道列表中会显示该通道的通道名、通道ID、通道类型等信息。其中通道ID用于后续的流式处理。
2. 在EMR集群侧创建Spark外表
1.登录EMR Header服务器
2.执行如下命令启动spark-sql命令行,用于Spark外表创建和后续的SQL实战操作。
其中Spark的标准启动参数为--num-executors 32 --executor-memory 2g --executor-cores 2,可以根据具体的集群配置进行自定义调整。表示上传jar包的版本信息,请根据实际填写,例如2.1.0-SNAPSHOT。
```shell
spark-sql --jars emr-datasources_shaded_2.11-<Version>.jar --master yarn --num-executors 32 --executor-memory 2g --executor-cores 2
```
3.创建Source外表order_source(对应表格存储的OrderSource表)。
-
参数 参数 说明 endpoint 表格存储实例访问地址,EMR集群中使用VPC地址。 access.key.id 阿里云账号的AccessKey ID。 access.key.secret 阿里云账号的AccessKey Secret。 instance.name 表格存储实例访问地址,EMR集群中使用VPC地址。 table.name 表格存储的数据表名称。 catalog 数据表的Schema定义。 - 示例
DROP TABLE IF EXISTS order_source; CREATE TABLE order_source USING tablestore OPTIONS( endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com", access.key.id="", access.key.secret="", instance.name="vehicle-test", table.name="OrderSource", catalog='{"columns": {"UserId": {"type": "string"}, "OrderId": {"type": "string"},"price": {"type": "double"}, "timestamp": {"type": "long"}}}' );
3. 实时流计算
实时流计算将实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回表格存储的数据表中。
1.创建流计算的Sink外表order_stream_sink(对应表格存储的OrderStreamSink表)。创建Sink外表与创建Source外表的参数设置中只有catalog字段有差别,其他参数设置均相同。
2.在Source外表order_source上创建视图。创建视图时需要设置Source表上通道的通道ID。
3.在视图上运行Stream SQL作业进行实时聚合,且将聚合结果实时写回表格存储的OrderStreamSink表。
```sql
//创建Sink外表order_stream_sink(对应表格存储的OrderStreamSink表)。
DROP TABLE IF EXISTS order_stream_sink;
CREATE TABLE order_stream_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderStreamSink",
catalog='{"columns": {"begin": {"type": "string"},"end": {"type": "string"}, "count": {"type": "long"}, "totalPrice": {"type": "double"}}}'
);
//在order_source表上创建视图order_source_stream_view。
CREATE SCAN order_source_stream_view ON order_source USING STREAM
OPTIONS(
tunnel.id="4987845a-1321-4d36-9f4e-73d6db63bf0f",
maxoffsetsperchannel="10000");
//在视图order_source_stream_view上运行Stream SQL作业,如下样例会按30s粒度进行订单数和订单金额的聚合。
//将聚合结果实时写回表格存储OrderStreamSink表。
CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_stream_sink
SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, CAST(sum(price) AS Double) AS totalPrice FROM order_source_stream_view GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");
```
运行Stream SQL后,可以实时得到聚合结果,聚合结果样例如下图所示,聚合结果保存在OrderStreamSink表中。