作者介绍
占怀旻,花名心渡,阿里云数字产业产研部-工业大脑团队的大数据工程师,目前的工作方向是利用大数据与AI技术,为工业企业客户构建数据中台,支撑工业企业的数字化转型和智能制造落地,用大数据技术来普惠更多的中国制造企业。
随着2020年云栖大工业大脑3.0的发布,工业大脑已经经历了多年的发展。本文将为大家分享,在工业数据中台建设中使用DeltaLake的优秀实践,主要包括:
(1) 异地异构流消息的处理
(2) 流批融合的数据分析
(3) 对事务的处理和对算法的支持
- 1.异地异构流消息的处理
对工业企业来说,数据源往往分散于世界各地,集团级别的用户,往往希望以数据中台为中心获取数据,如下图所示:
其中DeltaLake与Structured Streaming结合作用,完成以下两件事情:
1、将各厂区的Kafka实时数据汇总后写入中台的Kafka,供实时类型数据应用使用
2、将各厂区的Kafka数据归档写入中台的HDFS,供离线分析类型数据应用使用
有很多大数据组件可以完成上面的任务,比如Flink和Flume等,但是以下理由让我们最终选择了DeltaLake:
1、支持使用正则消费多个Kafka Topic
使用SubscribePattern,可以使用正则实现同时消费多个Topic的数据,在一个园区有许多个Topic需要消费的场景下非常方便
2、对HDFS的支持和小文件合并的封装
在遇到“将Kafka的数据实时写入HDFS”的场景时,用DeltaLake也很方便,主要有2点原因:
1、天然对写HDFS的支持,可以免去使用Flink的时候需要编写HDFS Sinker,或者额外运维Flume集群带来的麻烦
2、每一个流式入库的场景,对于数据架构师来说都是一个性能与时效性的权衡取舍过程,不管是Flink、Flume还是SparkStreaming,都会有“滚动写入容量(或条数)阈值”和“滚动写入时间阈值”的设计,在实际的实施过程中,根据业务对于数据延迟和性能的需求不同,来权衡二者。例如对于延迟容忍度很低的场景,可以将容量或条数阈值设置的很小(甚至为1)来让新的数据快速滚动写入,但是这样带来的副作用是Sinker的频繁IO,比如在HDFS产生很多的小文件,影响数据读写或DataNode的性能;在延迟容忍度较高的场景下,交付工程师则往往选择将条数阈值和时间阈值加大,带来更好的IO性能,但牺牲数据延迟。这是一种通用的方法,但在实际生产过程中,你会发现,要为许多的流作业维护许多不同的配置,这项工作的成本依然不小。
使用DeltaLake来处理,则可以轻松很多,你可以将所有的流作业的滚动写入阈值设置成一样的(比如都比较小),这样所有的流作业都可以得到比较好的数据延迟,同时结合使用DeltaLake的特性功能Optimize和Vacuum,配置定时调度任务来周期执行,对小文件进行合并或删除,来保障HDFS的性能,这样可以使整个数据开发工作简单很多,也更好运维。
关于Optimize特性的参考
- 2.流批融合的数据分析
在生产制造环节,机器设备的稳定运行对于产成品质量至关重要,而判定设备是否稳定运行的最直观方法,就是查看某些传感器的历史长时间历史趋势,在实际项目实施过程中,交付工程师往往使用流作业,将Kafka中大量的传感器时序数据加工后写入OLAP存储(例如阿里云ADB、TSDB或HBase等),来支撑上层数据分析应用的高并发、低响应时间的实时查询需求。
但是实际情况往往比这复杂得多,由于工业企业的信息化和数字化水平普遍不高,不同行业的生产过程自动化程度也参次不齐,有许多的设备实时数据其实并不准确,它们需要在若干时间以后(数分钟或者数小时),经过人工干预或者重新计算较正后才能使用。
所以在实际实施过程中,往往采用一种“滚动覆盖”的模式来不断改写OLAP存储中的数据,将OLAP分为“实时增量区”和“周期覆盖区”,例如下图所示:
上图使一个OLAP存储,所有的数据被分为橙色和蓝色两部分,上层数据应用可以无差别地查询这两个区域的数据,唯一的差别是:橙色的最新数据,由流计算作业实时从Kafka获取,做加工后写入;而蓝色区域,则由历史数据周期性计算(加入矫正逻辑)后写入,对昨天或更久之前的实时数据进行订正,这样周期往复,在保障数据时效性的同时,对历史数据做订正覆盖,来保障数据的正确性。
在以往的做法中,往往使用一个流+批的Lambda架构,用两种不同的计算引擎来处理流与批,如下图所示:
Lambda架构的弊端也可由此可见,在两个不同的平台维护两台代码,还要保障它们两的计算逻辑完全一致,是比较费功夫的事情,在引入DeltaLake之后,事情变得相对简单,Spark天生的流批一体设计,就很好地解决了代码复用和跨平台逻辑统一的问题,结合DeltaLake的特性(例如ACID,OPTIMIZE等),可以更优雅地完成这项工作,如下图:
另外值得一提的是,流批一体并不是Spark的独有特性,但是阿里云EMR在SparkSQL和Spark Streaming之上又对SQL进行了一层封装,使得业务人员能够更低门槛地使用类似Flink SQL的语法来进行作业开发,使得流批场景下的代码复用和运维工作变得更加简单,这一点对于项目交付提效意义很大,点击此处可具体参考。
- 3.对事务的处理和对算法的支持
传统的数据仓库,很少会在建模过程中引入事务,由于数据仓库要反映数据的变化情况,所以往往使用缓慢变化维度等方法来记录数据的状态变化,而并不会用ACID来让数据仓库与业务系统保持一致。
但是在工业数据中台的实施过程中,事务有他独特的使用场景,例如排产排程,是每一个工业企业都关心的重大问题,排产,往往从集团级别进行,根据客户订单、物料库存和工厂产能等角度来对当期的生产需求进行合理的分解和编排,来达到产能合理分配;排程则往往更加微观,在工厂级别,根据工单、物料和实际的生产情况来实时动态调整生产计划,达到资源利用率最大。它们都是需要众多数据融合求解的规划问题,如下图所示:
排产排程算法所需要的原始数据,往往来自多个业务系统,例如ERP提供订单和计划数据,WMS提供物料数据,MES提供工单和工序数据,这些数据必须融合到一起(物理上和逻辑上),才能作为排产排程算法的有效输入,所以在实施过程中往往需要一个统一的存储来存放来自各系统的数据。同时排产排程算法对数据的实效性也有一定的要求,它需要输入的数据能够尽量与各个业务系统保持一致,这样才能真实地反映出当时的生产情况,以便更好的进行排程。
在以往,我们这么处理这种场景:
1)利用各个业务系统的CDC能力,或者单独编写程序来轮询,准实时地获取数据变化
2)写入关系型数据库,在此过程中处理数据Merge的逻辑,让关系型数据库中的数据与业务系统数据准实时地保持一致
3)排产排程引擎在被触发的时候,从RDB拉取数据进行运算
这种架构有一些显而易见的问题,主要有:
1)用RDB替代大数据存储,计算的时候把数据Query到内存中,对于数据量比较大的情况会很困难
2)如果用Hive引擎来替代中间的RDB,虽然在Hive3.X支持ACID,但是实时性和MapReduce编程框架对于算法(求解器)的支持都难以满足工程需求
目前我们正在尝试引入DeltaLake,结合Spark的特性来优化这个架构,如下图:
优化后的架构,有如下优点:
1)使用HDFS+Spark替代RDB作为中台存储,解决数据量大时候的存储问题
2)使用Spark Streaming+DeltaLake来对接原始数据,利用DeltaLake的ACID特性来处理数据进入中台存储时的Merge逻辑,同时在流式入库的时候同时对数据进行Merge+Optimize,保障读写性能
3)排产排程引擎不再从中台Query数据到内存计算,而是把算法任务封装成Spark作业,下发到计算平台完成计算,这样利用Spark ML编程框架对算法和Python的良好支持,以及Spark本身的分布式计算能力,对需要多轮迭代的规划算法进行分布式处理
4)利用DeltaLake的Time Travel特性,对数据版本进行管理或回滚,这对于算法模型的调试和评估是非常有利的
- 4.总结
1)DeltaLake的核心能力ACID对于数据实时性和准确性要求较高的应用很有帮助,尤其是算法应用,可以更有效地利用Spark对ML的天然支持
2)结合使用DeltaLake的Optimize+Vacuum和Streaming的流式入库能力,在大批量对接上游Kafka数据的时候会有更好的兼容性,同时可以有效的降低运维成本
3)利用阿里云EMR团队封装的Streaming SQL开发流作业,在大规模的数据中台项目实施过程中可以有效降低开发门槛和成本
目前,DeltaLake在工业大脑的应用尚在实验阶段,例如流式入库、排产排程引擎、流批融合等多个场景正在工业大脑多个项目中应用,同时这些场景也在逐渐沉淀为工业大脑的标准产品,后续结合工业大脑3.0的数据+算法场景的可视化编辑和复制能力,可以快速复制到离散制造、汽车、钢铁等多个行业的场景中,用AI能力普惠中国工业。
感兴趣的同学可以点击此处参考
了解更多相关技术信息,扫描下方钉群二维码