Using Flume to Import Local Data into Kafka


Create the topic

[root@hadoop1 kafka]# kafka-topics.sh --zookeeper hadoop1:2181 --create --topic users --partitions 1 --replication-factor 1
Created topic "users".
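
To confirm the topic exists with the expected partition and replication settings, you can describe it with the same ZooKeeper-based CLI:

[root@hadoop1 kafka]# kafka-topics.sh --zookeeper hadoop1:2181 --describe --topic users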

Flume setup

Create the required directories

[root@hadoop1 jobkb09]# mkdir dataSourceFile
[root@hadoop1 jobkb09]# cd dataSourceFile
[root@hadoop1 dataSourceFile]# mkdir users 
[root@hadoop1 dataSourceFile]# cd ..
[root@hadoop1 jobkb09]# mkdir dataChannelFile
[root@hadoop1 jobkb09]# cd dataChannelFile
[root@hadoop1 dataChannelFile]# mkdir users 
[root@hadoop1 dataChannelFile]# cd ..
[root@hadoop1 jobkb09]# mkdir checkpointFile
[root@hadoop1 jobkb09]# cd checkpointFile
[root@hadoop1 checkpointFile]# mkdir users 
[root@hadoop1 checkpointFile]# cd ..
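
Equivalently, the whole directory tree can be created in one step with mkdir -p:

[root@hadoop1 jobkb09]# mkdir -p dataSourceFile/users dataChannelFile/users checkpointFile/users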
[root@hadoop1 jobkb09]# vi users-flume-kafka.conf

The contents of users-flume-kafka.conf are as follows:

users.sources=usersSource
users.channels=usersChannel
users.sinks=usersSink

users.sources.usersSource.type=spooldir
users.sources.usersSource.spoolDir=/opt/flume/conf/jobkb09/dataSourceFile/users
users.sources.usersSource.deserializer=LINE
users.sources.usersSource.deserializer.maxLineLength=1000
users.sources.usersSource.interceptors=head_filter
users.sources.usersSource.interceptors.head_filter.type=regex_filter
users.sources.usersSource.interceptors.head_filter.regex=^user_id.*
users.sources.usersSource.interceptors.head_filter.excludeEvents=true
users.sources.usersSource.includePattern=users_[0-9]{4}-[0-9]{2}-[0-9]{2}\.csv

users.channels.usersChannel.type=file
users.channels.usersChannel.checkpointDir=/opt/flume/conf/jobkb09/checkpointFile/users
users.channels.usersChannel.dataDirs=/opt/flume/conf/jobkb09/dataChannelFile/users

users.sinks.usersSink.type=org.apache.flume.sink.kafka.KafkaSink
users.sinks.usersSink.batchSize=640
users.sinks.usersSink.brokerList=192.168.153.10:9092
users.sinks.usersSink.topic=users

users.sources.usersSource.channels=usersChannel
users.sinks.usersSink.channel=usersChannel
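
This agent wires a spooldir source (with a regex interceptor that drops the CSV header line) through a file channel into a Kafka sink. Before starting the agent, the source file has to be placed in the spool directory, since the source only picks up files there that match includePattern; the original location of the CSV below is an assumption:

[root@hadoop1 jobkb09]# cp /opt/data/users_2020-12-08.csv dataSourceFile/users/

Note that brokerList and topic are the older KafkaSink property names; Flume 1.7+ prefers kafka.bootstrap.servers and kafka.topic, but still accepts these for backward compatibility.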

Run the agent

[root@hadoop1 flume]# wc -l conf/jobkb09/dataSourceFile/users/users_2020-12-08.csv
38210 conf/jobkb09/dataSourceFile/users/users_2020-12-08.csv
[root@hadoop1 flume]# flume-ng agent -n users -c conf -f conf/jobkb09/users-flume-kafka.conf -Dflume.root.logger=INFO,console
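
Once the file has been fully consumed, the spooling directory source renames it with the default .COMPLETED suffix, so listing the spool directory should show users_2020-12-08.csv.COMPLETED:

[root@hadoop1 flume]# ls conf/jobkb09/dataSourceFile/users/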

Verify with Kafka commands

// The offset is one less than 38210 because the interceptor filtered out the header line (the column names)
[root@hadoop1 flume]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.153.10:9092 --topic users --time -1 --offsets 1
users:0:38209
[root@hadoop1 flume]# kafka-console-consumer.sh --bootstrap-server 192.168.153.10:9092 --topic users --from-beginning
3197468391,id_ID,1993,male,2012-10-02T06:40:55.524Z,Medan  Indonesia,480
3537982273,id_ID,1992,male,2012-09-29T18:03:12.111Z,Medan  Indonesia,420
823183725,en_US,1975,male,2012-10-06T03:14:07.149Z,Stratford  Ontario,-240
1872223848,en_US,1991,female,2012-11-04T08:59:43.783Z,Tehran  Iran,210
3429017717,id_ID,1995,female,2012-09-10T16:06:53.132Z,,420
627175141,ka_GE,1973,female,2012-11-01T09:59:17.590Z,Tbilisi  Georgia,240
2752000443,id_ID,1994,male,2012-10-03T05:22:17.637Z,Medan  Indonesia,420
3473687777,id_ID,1965,female,2012-10-03T12:19:29.975Z,Medan  Indonesia,420
2966052962,id_ID,1979,male,2012-10-31T10:11:57.668Z,Medan  Indonesia,420
264876277,id_ID,1988,female,2012-10-02T07:28:09.555Z,Medan  Indonesia,420
1534483818,en_US,1992,male,2012-09-25T13:38:04.083Z,Medan  Indonesia,420
2648135297,en_US,1996,female,2012-10-30T05:09:45.592Z,Phnom Penh,420
............................................
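
As a final cross-check, the consumer's output can be piped through wc -l with a read timeout (the --timeout-ms value here is arbitrary); the count should come out to 38209, matching the offset above:

[root@hadoop1 flume]# kafka-console-consumer.sh --bootstrap-server 192.168.153.10:9092 --topic users --from-beginning --timeout-ms 10000 | wc -l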