日志服务数据如何同步到 MaxCompute |学习笔记

开发者学堂课程【SaaS 模式云数据仓库系列课程 —— 2021 数仓必修课:日志服务数据如何同步到 MaxCompute】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/55/detail/1040


日志服务数据如何同步到 MaxCompute

目录:

实验目的

方案介绍

方案比较及场景应用

操作步骤

 

一.实验目的

日常工作中,企业需要将通过 ECS 、容器、移动端、开源软件、网站服务、 JS 等接入的实时日志数据进行日志实时查询与分析、采集与消费、数据清洗与流计算( ETL / Stream Processing )、数据仓库对接 ( Data Warehouse )等场景的使用。

通过本次实验,我们可以学习了解日志数据如何同步到 MaxCompute 。

 

二.方案介绍

方案一:

使用 Tunnol 命令上传日志数据到 MaxCompute 。

方案二:

通过 DataHub 投递数据到 MaxCompute 。 DataHub DataConnector 是把 DataHub 服务中的流式数据同步到其他云产品中的功能,目前支持将 Topic 中的数据实时/准实时同步到 MaxCompute 、 OSS 、 ElasticSearch 、 RDS Mysql 、 ADS 、 TableStore 中。用户只需要向 DataHub 中写入一次数据,并在 DataHub 服务中配置好同步功能,便可以在各个云产品中使用这份数据。

方案三:

通过 SLS 实时采集与消费( LogHub )投递数据到 MaxCompute 。

通过 DataWorks 的数据集成( Data Integration )功能投递至 MaxCompute 。

方案四:

通过 Kafka 订阅实时数据投递至 MaxCompute 。

 

三.方案比较及场景应用

Tunnel 

Tunnel 用于批量上传数据到离线表里,适用于离线计算的场景。对于特殊格式日志,我们一般建议作为一个字段上传,到 Maxcomopute 里在进行拆分。

DataHub

Datahub 用于实时上传数据的场景,主要用于流式计算的场景。数据上传后会保存到实时表里,后续会在几分钟内通过定时任务的形式同步到离线表里,供离线计算使用。

日志服务( SLS )

(实时采集与消费) LogHub 用途:数据清洗( ETL )、流计算 ( Stream compute )、监控与报警、机器学习与迭代计算等。

投递数仓( LogShipper ):稳定可靠的日志投递。将日志中枢数据投递存储类服务进行存储。支持压缩、自定义 Partition 、以及行列等各种存储方式。

(实时采集与消费) LogHub :实时性强,写入即可消费。 Logtail (采集 Agont )实时采集传输,秒内到服务端(99.9%情况)。写入即可查询分析。

海量数据:对数据量不设上限。

种类丰富:支持行、列、 TextFile 等各种存储格式。

配置灵活:支持用户自定义 Partition 等配置。

Kafka

Kafka 是一款分布式发布与订阅的消息中间件,有高性能、高吞量的特点,每秒能处理上百万的消息。 Kafka 适用于流式数据处理。

大数据领域:主要应用于用户行为跟踪、日志收集等场景。

数据集成:将消息导入 MaxCompute 、 OSS 、 RDS , Hadoop 、 HBase 等离线数据仓库。


四.操作步骤

方案一:

通过 Tunnel 上传日志数据到 MaxCompute 环境准备及步骤:

(1) 开通 MaxCompute 服务,安装 odpscmd 客户端。

(2)准备日志服务数据。

(3)创建 MaxCompute 表,用来储存日志数据。(4使用命令:

 tunnel u C :\ Users \ Desktop\weijing _ loghub _ demo . csv tunnel _ log .

(5)查询表数据是否导入成功。

注意事项:

(1)使用 Tunnel 命令行工具上传数据当前不支持通配符或正则表达式命令。

(2)对于特殊格式的日志数据,我们一股建议作为一个字段上传,到 MaxComopute 里在进行拆分。

方案二:

通过 DataHub 投递日志数据到 MaxCompute 环境准备及步骤:

(1) 登录阿里云 DataHub 控制台,创建 Project 。

(2) 进入 Project 列表-> Project 查看,创建 Topic .

(3)选择导入 MaxCompute 表结构、并勾选自动创建 DataConector .

注意:准备对应的 MaxCompute 表,该表字段类型、名称、顺序必须与 DataHub Topic 字段完全一致,如果三个条件中的任意个不满足,则归档 Connector 无法创建。

(4)创建好的 DataConnector 详细信息如下图所示。

(5)如果已经创建 Topic ,只需要在详情页的右上角点击+ DataConnector 。

+ Sbsaipdon + DdaCo Г mecdor

分区范围: SYSTEM _ TIME 、 EVENT _ TIME 、 USER _ DEFINE 三种模式, SystemTime 模式会使用写入时间转化为字符串进行分区, EventTime 模式会根据 topic 中固定的 event time 字段时间进行分区(需要在创建 Topic 时增加一个 TIMESTAMP 类型名称为 event time 的字段,并且写入数据时向这个字段写入其微妙时间),UserDefine模式将会直接使用用户自定义的分区字段字符串分区。

(6) 回到 DataHub 控制台,点击 Topic ,点击 DataConnector。

(7) 日志数据抽样。

( 8)测试日志数据是否投递成功。

注意事项:

(1)目前所有 DataConnector 均仅支持同- Rogion 的云服务之间同步数据,不支持同步数据到跨 Region 的服务。

(2) DataConnector 所配置的目标服务 Endpoint 需要填写相应的内网域名(经典网络),不支持使用公网域名同步。

(3)数据同步目前仅支持 at least once 语义,在网络服务异常等小概率场景下可能会导致目的端的数据产生重复但不会丢失,需要做去重处理。

(4) topic 默认20个,如果需要创建更多,需提交工单申请。

方案三:

通过 LogHub 投递日志数据到 MaxCompute 环境准备及步骤:

(1) 开通日志服务,登录日志服务控制台,创建新的 Projoct 或者单击已经创好的 Projoct 名称。

(2)创腱新的 Logstore 或者单击已经创建好的 Logstore 名称。

(3)单击对应的 LLogstore ,查询分析导入到 LogHub 的日志数据。

(4)选择需要投递的日志库名称并依次展开节点,日志库名称—>数据处理—>导出—> Maxompute 。单击开始投递。

(5)单击开启投递以进入 LogHub —>数据投递页面。

(6)配置投递规则,在 LogHub —>数据投递页面配置字段关联等相关内容。

 a )自定义一个投递名称

 b ) MaxCompute 表名称,请输入自定义的新建的 MaxCompute 表名称或者选择已有的 MaxCompute 表。

 c)胺序,左边填写与 MaxCompute 表数据列相映射的日志服务字段名称,右边填写或选择 MaxCompute 表的普通字段名称及字段类型。

 d )_ partition time _格式:将日志时间作为分区字段,通过日期来筛选数据是 MaxCompute 常见的过滤数据方法。 partition _ time 是根据日志_ time _值计算得到

(不是日志写入服务端时间,也不是日志投递时间),结合分区时间格式,向下取整。

(7)在投递管理页面,单击修改即可针对之前的配置信息进行编辑。其中如果想到新增列,可以在大数据计算服务MaxComout修改投递的数据表列信息,则单击修改后会加载最新的数据表信息。

(8)投递任务管理。

在启动股递功能后,日志服务后台会定期启动离线投递任务。用户可以在控制台上看到这些投递任务的状态和措误信息。当投递任务发生错误时,请查看错误信息,问题解决后可以通过云控制台中日志投递任务管理或 SDK 来重试失败任务。

(9)查看日志投递的运行状态.

(10)日志投送 MaxCompute 后,检查数据完整性。

a )通过控制台或 APV / SDK 判斯(推荐)

使用AP1、 SDK 或者控制台获取指定 Project / Logstore 投递任务列表。控制台会对该返回结果进行可视化展示。

 b )通过 MaxCompute 分区粗略估计

比如在 MaxCompute 中以半小时做一次分区,投递任务为每30分钟一次,当表中包含以下分区:

2019_10_25_10_00

2019_10_25_10_30

当发现分区2019_10_25_1100出现时,说明11:00之前分区数据已经完整。

该方法不依赖 AP ,判断方式简单但结果并不精确,仅用作粗略估计。

(11) 查询 MaxCompute 表中数据。

注意事项:

(1)数加控制台创建、修改投递配置必须由主账号完成,不支持子账号操作。

(2)不同 Logstore 的数据请勿导入到同一个 MaxCompute 表中,否则会造成分区冲突、丢失数据等后果。

(3) MaxCompute 表至少包含一个数据列、一个分区列。

(4) MaxCompute 单表有分区数目6万的限制,分区数超出后无法再写入数据,所以日志服务导入 MaxCompute 表至多持3个分区列。请谨慎选择自定义字段作为分区列,保证其值是可枚举的。

(5)日志服务数据的一个字段最多允许映射到一个 MaxCompute 表的列(数据列或分区列),不支持字段冗余,同一个学段名第二次使用时其投递的值为 nul ,如果 nul 出现在分区列会导致数据无法被投递。

(6)股递 MaxCompute 是批量任务,请谨慎设置分区列及其类型:保证一个同步任务内处理的数据分区数小于512个;用作分区列的字段值不能为空或包括/等 MaxCompute 保留字段。

(7)不支持海外 Region 的 MaxCompute 投递,海外 Region 的 MaxCompute 请使用 DataWorks 进行数据同步。

环境准备及步骤:

(1)登录阿里云 LogHub 控制台,创 Project ,

(2)登录 DataWorks 控制台,单击对应项目进入数据集成。

(3)进入同步资源管理—>数据源页面,单击右上角的新增数据源。

(4)选择数据源类型为 LogHub ,填写新增 LogHub 数据源对话框中的配置。单击测试连通性。测试连通性通过后,单击确定。

(5)配置同步任务

可选择向导模式,通过简单便捷的可视化页面完成任务配置;或者选择脚本模式,深度自定义配置您的同步任务。

新建业务流程—>数据集成—>新建数据集成节点—>数据同步进入数据同步任务配置页面。

日志开始时间:数据消费的开始时间位点,为 ywyMMddHHmmss 格式的时间字符串(比如20191025103000),左闭右开。

日志结束时间:数据消费的结束时间位点,为 yYwyMMddHHmmss 格式的时间字符串(比如20191025113000),左闭右开。

批量条数:一次读取的数据条数,默认为256。

 a )向导模式配置同步任务。

(一)配置数据源及数据去向。

(二)保存数据同步的配置,提交并运行。

(三)查询 MaxCompute 表中数据,确保日志服务数据已经成功同步到 MaxCompute 。

 b )脚本模式配置同步任务。

(一)导入模板,选择数据源和目标数据源。

(二)编辑脚本。

(三)保存脚本数据节点,提交并运行。

(四)查询 MaxCompute 表中数据,确保日志服务数据已经成功同步到 MaxCompute 。

方案四:通过 Kafka 投递日志数据到 Maxcompute 环境准备及步骤:

(1)搭建 Kafka 集群。

(2)在控制台创建 Topic 和 Consumer Group 。

(3) Flume 读取日志文件数据写入到 Kafka ,

 a )为 flume 构建 agent :先进去 lume 下的配文件夹里面编写构建 agent 的配置文件。

 b )启动 flume 的 agent

 bin / flume - ng agent - c conf - f 配置文件夹名/配置文件名- n a1- Dflume . root . logger = lNFO .console

 c )启动 kafka 的消费者 bin / kafka - console - consumer . sh -Z0okeeper主机名:2181-topic kafka _ odps

这样就开启了日志采集,文件会写入到 kafka 中。

(4)通过数据集成 DataWorks 同步数据到 Maxcompute 。

新建业务流程->新建数据同步节点->转换为脚本->配置脚本。

(5)配置脚本信息,运行脚本进行数据节点的同步。

(6)查询 Maxcompute 表中数据,确保日志服务数据已经成功同步到 MaxCompute 。

注意事项:

(1) 数据同步的脚本编写。 Reader 、Writer 的配置。

(2) Flume 采集日志数据中配置文件的配置。

上一篇:如何为K8S生产系统配置安全管理?


下一篇:DataWorks智能监控模块介绍|学习笔记