Flume 读取RabbitMq消息队列消息,并将消息写入kafka

首先是关于flume的基础介绍

组件名称

功能介绍

Agent代理

使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。

Client客户端

生产数据,运行在一个独立的线程。

Source源

从Client收集数据,传递给Channel。

Sink接收器

从Channel收集数据,进行相关操作,运行在一个独立线程。

Channel通道

连接 sources 和 sinks ,这个有点像一个队列。

Events事件

传输的基本数据负载。

目前来说flume是支持多种source

其中是支持读取jms消息队列消息,但是并不支持读取rabbitMq,所以需要对flume进行二次开发

这里主要就是flume怎么从rabbitMq读取数据

这里从git上找到了一个关于flume从rabbitMq读取数据的插件

下载地址是:https://github.com/gmr/rabbitmq-flume-plugin

上面有一些英文的描述,大家可以看下

环境介绍

centOS 7.3   jdk1.8   cdh5.14.0

1.用 mvn 打包该项目,会生成两个JAR包

Flume 读取RabbitMq消息队列消息,并将消息写入kafka

2.因为我这边使用的以cdh方式安装集成flume的,所以把这两个jar  扔到  /usr/lib   下面

如果是普通的安装方式需要把这两个jar包复制到 flume安装目录的lib下面

Flume 读取RabbitMq消息队列消息,并将消息写入kafka

3.进入cdh管理页面配置Agent

Flume 读取RabbitMq消息队列消息,并将消息写入kafka

下面是详细的配置,我这边是直接把消息写入kafka集群里 的

tier1.sources  = source1

tier1.channels = channel1

tier1.sinks    = sink1

tier1.sources.source1.type     = com.aweber.flume.source.rabbitmq.RabbitMQSource

tier1.sources.source1.bind     = 127.0.0.1

tier1.sources.source1.port     = 5672

tier1.sources.source1.virtual-host = /

tier1.sources.source1.username = guest

tier1.sources.source1.password = guest

tier1.sources.source1.queue = test

tier1.sources.source1.prefetchCount = 10

tier1.sources.source1.channels = channel1

tier1.sources.source1.threads = 2

tier1.sources.source1.interceptors = i1

tier1.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

tier1.sources.source1.interceptors.i1.preserveExisting = true

tier1.channels.channel1.type   = memory

tier1.sinks.sink1.channel      = channel1

tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink

tier1.sinks.sink1.topic = flume_out

tier1.sinks.sink1.brokerList = 127.0.0.1,127.0.0.1:9093,27.0.0.1:9094

tier1.sinks.sink1.requiredAcks = 1

tier1.sinks.sink11.batchSize = 20

配置完成更新配置重新启动Agent

Flume 读取RabbitMq消息队列消息,并将消息写入kafka

这个是接收到rabbitMq消息

Flume 读取RabbitMq消息队列消息,并将消息写入kafka

大功告成,如果配置中有疑问的可以留言,我看到后会回复

上一篇:2.logback+slf4j+janino 配置项目的日志输出


下一篇:java实时监听日志写入kafka(转)