本质上,Kafka提供的是消息队列的能力,为消息从生产者流向消费中提供稳定、高效、可靠的渠道。但Kafka本身并不提供海量数据存储的能力,这意味着重读kafka中历史数据将不可能。同时,Kafka没有提供开箱即用的数据处理工具(尽管你可以采用kafka streams或者flink等,但这需要你自己写代码逻辑),使得对原始数据进行加工处理成本较高。
我们知道,阿里云OSS提供了灵活、海量、高性价比的存储方式,用户可以根据自己的需要设置合适的存储策略。同时,日志服务SLS则提供了一站式的数据采集、加工、分析、告警可视化与投递功能。本文将会介绍一种基于SLS将Kafka数据入湖OSS的整体方案,为Kafka中的数据增加查询分析和海量存储的能力。
数据接入
SLS提供了主动接入Kafka数据的服务,用户只需配置几项必要的信息,即可将Kafka数据接入到SLS中。通常情况下,只需提供Kafka集群的bootstrap servers、对应的topic以及消费位置信息。但针对阿里云VPC下部署的Kafka集群,为了避免公网访问,则还需提供相关的VPC和ECS实例信息。下面将会具体介绍Kafka集群常见的几种部署形式以及如何通过SLS控制台快速配置Kafka接入的过程。
集群部署方式
不同的Kafka集群部署方式,接入SLS之前,需要进行不同的准备。Kafka集群的部署形式可以分为以下3种情况:
- 基于阿里云ECS实例自建集群
- 阿里云Kafka产品
- 其他场景(非阿里云上部署)
下面将分别介绍以上不同部署形式接入SLS的准备工作以及特点。
基于阿里云ECS实例自建集群
针对这种场景,SLS服务能够打通VPC以内网连接的形式拉取Kafka中的数据,从而提供更安全可靠的数据读取方式。当然,这要求Kafka中的数据必须接入到与VPC region相同的SLS project中,否则只能按照第三种部署形式进行接入。
为了能够打通对用户VPC的访问,用户需要额外提供Kafka集群所在的每个ECS实例的VPC ID和IP信息。如果Kafka broker之间是基于内网域名进行通信,还需提供ECS实例对应的内部域名。总结下来,需要额外提供如下类似的VPC相关信息,其中v1和v2用来标记不同broker对应的ECS实例信息(这里假设只部署了2个Kafka broker)。
{
"config.vpc.vpc_id.v1":"vpc-bp1949587myedyj8s1bqw",
"config.vpc.instance_ip.v1":"192.168.26.34",
"config.vpc.instance_port.v1":"9092",
"config.vpc.instance_host.v1":"kafka-host1",
"config.vpc.vpc_id.v2":"vpc-bp1949587myedyj8s1bqw",
"config.vpc.instance_ip.v2":"192.168.26.35",
"config.vpc.instance_port.v2":"9092",
"config.vpc.instance_host.v2":"kafka-host2"
}
阿里云Kafka产品
阿里云提供了消息队列kafka版,方便用户可以快速地使用Kafka服务。本质上,Kafka云产品也是基于VPC进行集群部署的,例如下图中显示的是Kafka云产品某个实例的具体接入点和VPC ID信息。只不过Kafka云产品提供了相应的控制台,可以协助用户以可视化的形式对Kafka集群进行管理(如果自建集群的话,则一般通过Kafka自带的命令行工具进行管理)。
图 1 阿里云Kafka产品接入信息
需要说明的是,默认情况下Kafka云产品没有开启消费组自动创建,因此在将Kafka云产品中的数据接入SLS之前,必须手动创建好消费组并填写到SLS导入配置中,否则客户端会得到错误信息,如librdkafka返回"JoinGroup failed: Broker: Group authorization failed"。通过对应Kafka云产品实例下的Group管理,可以手动创建新的consumer group,具体如下图所示。
图 2 阿里云Kafka产品手动创建消费组
因此,针对Kafka云产品的场景,除了配置VPC相关的信息外,还需要配置使用到的consumer group。结合图 1和图 2中实例,需要的额外信息如下所示:
{
"group.id":"will-test",
"config.vpc.vpc_id.v1":"vpc-xxxxxxxxxxj8s1bqw",
"config.vpc.instance_ip.v1":"192.168.26.34",
"config.vpc.instance_port.v1":"9092",
"config.vpc.vpc_id.v2":"vpc-xxxxxxxxxxj8s1bqw",
"config.vpc.instance_ip.v2":"192.168.26.35",
"config.vpc.instance_port.v2":"9092",
"config.vpc.vpc_id.v3":"vpc-xxxxxxxxxxj8s1bqw",
"config.vpc.instance_ip.v3":"192.168.26.36",
"config.vpc.instance_port.v3":"9092"
}
其他场景(其他非阿里云上部署)
在这种场景下,只有Kafka集群提供公网访问能力后,SLS服务才能从Kafka broker读取到数据。在SLS控制台,除了提供Kafka集群的公网bootstrap servers、需要接入的topic以及消费位置外,并不需要额外的其他配置信息。然后大多情况下,Kafka集群只提供了内网访问能力,如果需要同时提供公网访问的能力,则需要对Kafka集群进行特定的配置,具体配置可以参考我之前写的文章《Kafka集群如何同时支持内网和外网访问》。
数据接入SLS的步骤
1)登陆SLS控制台,在数据接入/抓取数据下,选择Kafka
图 3 控制台选择Kafka数据接入
2)进入选择日志空间步骤,根据Kafka集群所在的region,选择/创建相应region下的SLS Project和Logstore
图 4 选择Kafka数据接入的Project和Logstore
3)配置Kafka集群相关的数据源信息(这里以Kafka云产品数据接入为例),并点击预览以确保配置无误
图 5 预览Kafka数据
4)根据需要配置日志时间字段,然后在调度间隔选择立即执行
图 6 配置数据的时间字段
5)去对应的Logstore查看Kafka数据是否正确接入到SLS (需要等待几分钟)
图 7 查看数据接入情况
数据加工
Kafka中的原始数据可能存在冗余、不完整、不规整等情况,如果直接用于业务消费,则需要做额外的加工处理,从而增加了业务逻辑实现的复杂性和不必要的成本。而SLS提供了丰富的DSL算子来协助用户对数据进行加工处理,可以方便地实现数据的规整、富化、流转、脱敏和过滤,更多细节可以参考SLS数据加工有关的文档。
下图展示了SLS数据加工的一种使用场景,即将非结构化的syslog日志过滤、规整为用户登陆相关的结构化数据。规整化后的数据,更加便于后续的数据分析,比如查询某个用户在特定时间段内登陆的次数。SLS内置的200多个开箱即用的算子,大大方便了对原始Kafka数据的处理,进而让用户更加专注于业务相关的逻辑。
图 8 数据规整 (来源SLS数据加工)
数据分析
SLS提供了强大的SQL查询能力,能够让用户快速地对海量信息进行过滤和分析,具体可以参考SLS查询和分析相关的官方文档。同时,结合SLS提供的丰富仪表盘,能将分析后的数据以多样化的形式呈现出来,从而提供更为直观的数据洞察能力。比如,下图是基于SLS的SQL查询和仪表盘得到的结果,可以非常直观地看到Top 10用户每天通过主动接入服务导入的数据总量。
图 9 SQL查询
数据入湖
尽管日志服务SLS提供了强大的数据分析、加工等能力,但它会引入额外的存储成本。除了保存原始数据外,它还需要构建倒排索引、列式存储等元数据。因此,开启查询分析后,存入SLS的数据会有额外的开销。对于不常用的数据(如较早的历史数据),SLS支持直接将这些数据以压缩的形式入湖OSS进行存储。另外,OSS不仅支持海量的数据存储,而且还提供了丰富的冷热存储方式(如标准、低频、归档、冷归档等),用户可以根据自己的需要进行选择。
数据投递,是SLS数据入湖到OSS的优选方式,更多细节可以参考文章《SLS投递OSS功能升级:打造更顺畅的日志入湖体验》。通过SLS投递功能,可以将Kafka中不常用的数据以冷存储的形式保存到OSS中。与此同时,SLS还提供了将OSS中的数据导入回SLS的能力。结合SLS数据入湖OSS和OSS数据导入SLS这两个功能,用户可以非常灵活地控制哪些数据需入湖以低成本的形式存储,哪些数据进行查询分析以提供业务价值,具体流程如下图所示。
图 10 Kafka数据入湖整体流程
总结
本文主要介绍了Kafka数据基于SLS入湖OSS的整体方案,其中着重介绍了Kafka的几种常见的部署方式,以及Kafka数据接入SLS的具体过程。虽然Kafka提供了强大的消息队列能力,但由于其缺乏对原始数据的加工/分析的能力以及对海量历史数据的存储能力,使得用户对Kafka原始数据进行分析处理变得困难。而基于SLS提供的加工/分析、SLS数据入湖OSS以及OSS数据导入SLS等特性,则可以弥补Kafka缺失的这两种能力,使得用户可以随时随地对任意时间段内的Kafka原始数据进行分析。