一、数据准备
-
现在本地有一个users.csv文件,需要将其数据通过flume导入到kafka中
-
注意:该表格是有表头的,数据通过flume导入到kafka中是需要通过拦截器过滤掉表头。
二、数据导入
- 首先在Linux下面创建一个文件flume1.properties,编辑该文件
//a1:agent
//flume三大组件 source、channel、sink
a1.sources=f1
a1.channels=c1
a1.sinks=k1
//文件监听器
a1.sources.f1.type = spooldir
a1.sources.f1.channels = c1
//将users.csv文件备份至/opt/fd,然后开始监听
a1.sources.f1.spoolDir = /opt/fd
a1.sources.f1.batchSize = 10000
//拦截器:将表的表头过滤掉
a1.sources.f1.interceptors=i1
a1.sources.f1.interceptors.i1.type=regex_filter
//正则匹配,将行开头为user_id的一行数据过滤掉
a1.sources.f1.interceptors.i1.regex=user_id.*
a1.sources.f1.interceptors.i1.excludeEvents=true
//设置临时存放数据的地址以及检查点
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/flume/checkpoint
a1.channels.c1.dataDir = /opt/flume/data
a1.sinks.k1.channel=c1
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
//创建一个kafka的topic为users,这边也可以不设置,自己在kafka中创建
a1.sinks.k1.kafka.topic=users
a1.sinks.k1.kafka.bootstrap.servers=192.168.153.200:9092
a1.sinks.k1.kafka.flumeBatchSize = 100
//设置应答机制acks=1,表示将生产者生产数据后,leader进行同步数据备份,follower异步备份
//由于这边只有一个分区一个consumer,所以这边表示当这个consumer数据同步备份,数据不会丢失
a1.sinks.k1.kafka.producer.acks = 1
- 开始导入数据之前,需要启动kafka
kafka-server.start.sh /opt/soft/kafka211/config/server.properties
- kafka消费者消费,当flume开始向kafka中导入数据时,kafka这边开始消费
kafka-console-consumer.sh --bootstrap-server 192.168.153.200:9092 --topic users --from-beginning
- 使用flume命令开始向kafka中导入数据
//a1:flume1.properties文件中定义的agent名称
flume-ng agent --name a1 -f /opt/flumecfg/flume1.properties
- 数据向kafka中开始导入数据后,我们可以查看kafka数据消费信息,生产者生产,消费者同步消费。