StreamSets实战之路(十五)-实战篇- 数据采集与处理

主要通过一个数据采集与处理的案例来介绍Streamsets(3.13.0)的使用,主要将使用Edge数据流收集streamsets系统的日志和主机性能指标,通过收集数据流收集类数据并进行简单处理,发送至kafka中,性能指标数据入库数据流和日志数据入库数据流分别从kafka中消费数据,并将两类数据进行简单处理加载到数据库中。

学习目的:使用edge和streamset的数据互动,使用streamset进行分布式异步数据处理。

数据流图:

StreamSets实战之路(十五)-实战篇- 数据采集与处理

最终数据流的效果图:

需要配置5个数据流,两个edge采集数据流,一个数据收集数据流,两个数据处理与入库数据流

StreamSets实战之路(十五)-实战篇- 数据采集与处理

前期准备:

(1)需要在数据采集的节点上部署安装Edge(不会使用的同学可以参照前面文章)。

(2)一个现成kafka集群,并创建一个两个topic,kafka集群主要为了让数据流达到分布式异步处理的能力。

(3)一个现成的ES集群。

构建步骤:

1.首先构建日志数据采集器数据流

配置edge数据流发布的地址(该主机上一定要安装部署了edge)

StreamSets实战之路(十五)-实战篇- 数据采集与处理

配一下文件采集文件和数据格式,数据格式我们直接按文本传输

 

StreamSets实战之路(十五)-实战篇- 数据采集与处理

StreamSets实战之路(十五)-实战篇- 数据采集与处理

使用destination 类http client配置一下远程数据收集器的地址和APP ID

StreamSets实战之路(十五)-实战篇- 数据采集与处理

2.性能指标数据采集器数据流

配置edge数据流发布的地址(该主机上一定要安装部署了edge)

StreamSets实战之路(十五)-实战篇- 数据采集与处理

配置一下系统指标采集插件,采集哪些数据和采集的频率,这里我们采集host、cpu、内存、磁盘等,采集频率为两秒

StreamSets实战之路(十五)-实战篇- 数据采集与处理

使用destination 类http client同样配置一下数据收集器远程的url和APP ID

StreamSets实战之路(十五)-实战篇- 数据采集与处理

3.数据收集与处理数据流

使用origin 类http sevice组件,配置数据收集器的端口、最大并发量以及APP ID

StreamSets实战之路(十五)-实战篇- 数据采集与处理

使用Http 路由插件,将接受到的数据路由不到不同分支,这里配置日志和性能指标数据路由。

StreamSets实战之路(十五)-实战篇- 数据采集与处理

使用日志解析插件对收集到的日志数据进行解析,这里我们选择Log4j解析器,选择使用自定义日志格式,这里的格式按照streamset的格式:

 %d{ISO8601} [user:%X{s-user}] [pipeline:%X{s-entity}] [runner:%X{s-runner}] [thread:%t] [stage:%X{s-stage}] %-5p %c{1} - %m%n

StreamSets实战之路(十五)-实战篇- 数据采集与处理

使用kafka生成插件将两类数据输出到不同的topic中。

StreamSets实战之路(十五)-实战篇- 数据采集与处理

4.日志数据入库数据流

配置kafka地址和日志数据的topic和消费组

StreamSets实战之路(十五)-实战篇- 数据采集与处理

5.性能指标数据入库数据流

配置kafka地址和性能指标数据的topic和消费组

StreamSets实战之路(十五)-实战篇- 数据采集与处理

 

Streamsets实战之路正在更新中,尽情期待!!!

 

此文章为博主原创,转载请标明出处和原始链接,谢谢。

上一篇:【附录A SDC】静态时序分析圣经翻译计划


下一篇:Device or resource busy