文件通过flume导入到kafka

一、数据准备

  • 现在本地有一个users.csv文件,需要将其数据通过flume导入到kafka中
    文件通过flume导入到kafka

  • 注意:该表格是有表头的,数据通过flume导入到kafka中是需要通过拦截器过滤掉表头。

二、数据导入

  • 首先在Linux下面创建一个文件flume1.properties,编辑该文件
//a1:agent
//flume三大组件 source、channel、sink
a1.sources=f1
a1.channels=c1
a1.sinks=k1

//文件监听器
a1.sources.f1.type = spooldir
a1.sources.f1.channels = c1
//将users.csv文件备份至/opt/fd,然后开始监听
a1.sources.f1.spoolDir = /opt/fd
a1.sources.f1.batchSize = 10000

//拦截器:将表的表头过滤掉
a1.sources.f1.interceptors=i1
a1.sources.f1.interceptors.i1.type=regex_filter
//正则匹配,将行开头为user_id的一行数据过滤掉
a1.sources.f1.interceptors.i1.regex=user_id.*
a1.sources.f1.interceptors.i1.excludeEvents=true

//设置临时存放数据的地址以及检查点
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/flume/checkpoint
a1.channels.c1.dataDir = /opt/flume/data

a1.sinks.k1.channel=c1
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
//创建一个kafka的topic为users,这边也可以不设置,自己在kafka中创建
a1.sinks.k1.kafka.topic=users
a1.sinks.k1.kafka.bootstrap.servers=192.168.153.200:9092
a1.sinks.k1.kafka.flumeBatchSize = 100
//设置应答机制acks=1,表示将生产者生产数据后,leader进行同步数据备份,follower异步备份
//由于这边只有一个分区一个consumer,所以这边表示当这个consumer数据同步备份,数据不会丢失
a1.sinks.k1.kafka.producer.acks = 1

文件通过flume导入到kafka

  • 开始导入数据之前,需要启动kafka
kafka-server.start.sh /opt/soft/kafka211/config/server.properties
  • kafka消费者消费,当flume开始向kafka中导入数据时,kafka这边开始消费
kafka-console-consumer.sh --bootstrap-server 192.168.153.200:9092 --topic users --from-beginning
  • 使用flume命令开始向kafka中导入数据
//a1:flume1.properties文件中定义的agent名称
flume-ng agent --name a1 -f /opt/flumecfg/flume1.properties
  • 数据向kafka中开始导入数据后,我们可以查看kafka数据消费信息,生产者生产,消费者同步消费。

文件通过flume导入到kafka

上一篇:新安装Ubuntu配置过程


下一篇:更换树莓派(Raspberry Pi 3B+)软件源为国内镜像源