数据采集配置

目录

1.将生成的jar包拷贝到CentOS0 /opt/module 分发jar包

2.在CentOS0上执行jar程序

3.配置登录远程服务器立即source一下环境变量

4./bin目录下创建脚本lg.sh

5./bin目录下创建集群时间同步修改脚本dt.sh

6.集群所有进程查看脚本

7.在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件

8.拦截器打包之后,只需要上传不含依赖的单独jar包(YSJ-1.0-SNAPSHOT.jar),不需要上传依赖包;打包之后要放入flume的lib文件夹下面

9.分发Flume到其他主机

10.日志采集Flume启动停止脚本

11.Kafka集群启动停止脚本

12.查看所有Kafka Topic

13.创建 Kafka Topic

14.生产消息

15.消费消息

16.Kafka Manager安装

17.进入到/opt/module/kafka-manager-1.3.3.22/conf

18.Kafka Manager启动停止脚本

19.CentOS0:/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

20.CentOS0服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置


1.将生成的jar包拷贝到CentOS0 /opt/module 分发jar包

2.在CentOS0上执行jar程序

[root@CentOS0 module]$ java -classpath YSJ-1.0-SNAPSHOT-jar-with-dependencies.jar com.Charlie.Guo.AppMain > /opt/module/test.log

3.配置登录远程服务器时立即source一下环境变量(通过ssh远程执行命令时是非交互式shell,不会加载/etc/profile,导致java等命令找不到;而~/.bashrc会被加载,所以在其中source /etc/profile)

[root@CentOS0/1/2 ~]$ echo source /etc/profile >> ~/.bashrc

4./bin目录下创建脚本lg.sh

[root@CentOS0 bin]$ vim lg.sh
#!/bin/bash
# lg.sh - run the log generator (com.Charlie.Guo.AppMain) on each listed node.
# Usage: lg.sh
#
# Nodes are handled one after another: each ssh call blocks until the remote
# java process exits. The generator's stdout overwrites /opt/module/test.log
# on the remote node.

for host in CentOS0 CentOS1; do
    ssh "$host" "java -classpath /opt/module/YSJ-1.0-SNAPSHOT-jar-with-dependencies.jar com.Charlie.Guo.AppMain  >/opt/module/test.log"
done
[root@CentOS0 bin]$ chmod 777 lg.sh

[root@CentOS0 module]$ lg.sh

[root@CentOS0 module]# cd data && ls    # 查看生成的日志文件

 

5./bin目录下创建集群时间同步修改脚本dt.sh

[root@CentOS0 bin]$ vim dt.sh
#!/bin/bash
# dt.sh - set the system date (and optionally time) on every cluster node.
# Usage: dt.sh YYYY-MM-DD [HH:MM:SS]
#   e.g. dt.sh 2021-06-02
#        dt.sh 2021-06-02 10:30:00

# Refuse to run without a date. Otherwise the date string would be just " ",
# which GNU date presumably accepts (as midnight), silently resetting the
# clock on every node.
if [ $# -lt 1 ] || [ -z "$1" ]; then
        echo "Usage: $0 YYYY-MM-DD [HH:MM:SS]" >&2
        exit 1
fi

# Append " $2" only when a time was given, so no trailing space is produced.
log_date="$1${2:+ $2}"

echo "$log_date"

for i in centos0 centos1 centos2
do
        # -t forces a TTY on the remote side so sudo can prompt if it needs to.
        ssh -t "$i" "sudo date -s '$log_date'"
done
[root@CentOS0 bin]$ chmod 777 dt.sh

[root@CentOS0 bin]$ dt.sh 2021-06-02

6.集群所有进程查看脚本

[root@CentOS0 bin]$ vim xcall.sh
#!/bin/bash
# xcall.sh - run one command on every cluster node, labelling each node's output.
# Usage: xcall.sh <command...>     e.g. xcall.sh jps

for host in centos0 centos1 centos2; do
        printf '%s\n' "--------- $host ----------"
        # "$*" joins all arguments into a single remote command string.
        ssh "$host" "$*"
done

赋权并执行(示例:查看所有节点的Java进程):

[root@CentOS0 bin]$ chmod 777 xcall.sh

[root@CentOS0 bin]$ xcall.sh jps

7.在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件

[root@CentOS0 conf]$ vim file-flume-kafka.conf
# file-flume-kafka.conf - collection agent "a1": tails local log files and
# writes them into Kafka through two Kafka channels. No sink is declared;
# the Kafka channels themselves deliver the events to Kafka.
a1.sources=r1

a1.channels=c1 c2



# configure source
# TAILDIR source: follows every file in /opt/module/data whose name matches
# the regex "app.+", and records per-file read offsets in positionFile so
# tailing can resume after a restart. fileHeader = true adds the file path
# as an event header. The source fans out to both channels.

a1.sources.r1.type = TAILDIR

a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /opt/module/data/app.+

a1.sources.r1.fileHeader = true

a1.sources.r1.channels = c1 c2



# interceptors (custom classes from the YSJ jar placed in flume/lib)
# NOTE(review): presumably i1 (ETLInterceptor) drops malformed lines and
# i2 (TypeInterceptor) sets the "topic" header used by the selector below -
# confirm against the interceptor source.

a1.sources.r1.interceptors =  i1 i2

a1.sources.r1.interceptors.i1.type = com.Charlie.Guo.ETLInterceptor$Builder

a1.sources.r1.interceptors.i2.type = com.Charlie.Guo.TypeInterceptor$Builder



# Multiplexing selector: route each event by its "topic" header value:
# topic_start -> c1, topic_event -> c2.
# NOTE(review): no selector.default is set, so an event whose header matches
# neither value has no mapped channel - make sure the interceptor always sets one.

a1.sources.r1.selector.type = multiplexing

a1.sources.r1.selector.header = topic

a1.sources.r1.selector.mapping.topic_start = c1

a1.sources.r1.selector.mapping.topic_event = c2



# configure channel
# c1: Kafka channel backed by topic "topic_start" on the 3-broker cluster.
# parseAsFlumeEvent = false: events are stored as plain message bodies, not
# Avro-wrapped FlumeEvents, so downstream consumers (the Kafka source in
# kafka-flume-hdfs.conf) read the raw log line. Flume headers are not carried.
# kafka.consumer.group.id only matters if a sink reads from this channel;
# none is configured in this agent.

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = centos0:9092,centos1:9092,centos2:9092

a1.channels.c1.kafka.topic = topic_start

a1.channels.c1.parseAsFlumeEvent = false

a1.channels.c1.kafka.consumer.group.id = flume-consumer



# c2: same as c1, backed by topic "topic_event".

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c2.kafka.bootstrap.servers = centos0:9092,centos1:9092,centos2:9092

a1.channels.c2.kafka.topic = topic_event

a1.channels.c2.parseAsFlumeEvent = false

a1.channels.c2.kafka.consumer.group.id = flume-consumer

8.拦截器打包之后,只需要上传不含依赖的单独jar包(YSJ-1.0-SNAPSHOT.jar),不需要上传依赖包;打包之后要放入flume的lib文件夹下面

9.分发Flume到其他主机

[root@CentOS0 module]$ xsync flume/

[root@CentOS0 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

10.日志采集Flume启动停止脚本

[root@CentOS0 bin]$ vim f1.sh
#! /bin/bash
# f1.sh - start/stop the log-collection Flume agents (file-flume-kafka.conf).
# Usage: f1.sh start|stop

# Nodes that run the collection agent. start and stop MUST iterate the same
# list, otherwise agents started on one node are never stopped.
hosts="centos0 centos1"

case $1 in
"start"){
        for i in $hosts
        do
                echo " --------启动 $i 采集flume-------"
                # nohup + trailing & keep the agent running after ssh returns.
                # Console output is discarded; Flume logs via the LOGFILE appender.
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &"
        done
};;
"stop"){
        for i in $hosts
        do
                echo " --------停止 $i 采集flume-------"
                # Match the agent by its config file name. \$2 is escaped so the
                # remote awk (not the local shell) expands it.
                # xargs -r: do nothing when no agent is running instead of
                # invoking a bare `kill`, which fails with a usage error.
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -r kill"
        done
};;
*)
        echo "Usage: $0 start|stop" >&2
        exit 1
;;
esac

 

[root@CentOS0 bin]$ vim f2.sh
#! /bin/bash
# f2.sh - start/stop the consumer Flume agent (kafka-flume-hdfs.conf), which
# reads from Kafka and writes to HDFS.
# Usage: f2.sh start|stop

case $1 in
"start"){
        for i in CentOS2
        do
                echo " --------启动 $i 消费flume-------"
                # nohup + trailing & keep the agent running after ssh returns.
                # Console output is captured in /opt/module/flume/log.txt on the node.
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt   2>&1 &"
        done
};;
"stop"){
        for i in CentOS2
        do
                echo " --------停止 $i 消费flume-------"
                # \$2 is escaped so the remote awk (not the local shell) expands it.
                # xargs -r: skip kill when no agent is running (avoids a bare `kill`).
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -r kill"
        done
};;
*)
        echo "Usage: $0 start|stop" >&2
        exit 1
;;
esac

11.Kafka集群启动停止脚本

[root@CentOS0 bin]$ vim kf.sh
#! /bin/bash
# kf.sh - start/stop the Kafka broker on every cluster node.
# Usage: kf.sh start|stop
#
# Kafka Manager (the web UI) is handled separately by km.sh; this script only
# manages the brokers listed in the bootstrap servers (centos0..2:9092).
# NOTE(review): assumes Kafka is installed at /opt/module/kafka on every node
# (the kafka commands in this guide run from a "kafka" directory) - confirm.

kafka_home=/opt/module/kafka

case $1 in
"start"){
        for i in centos0 centos1 centos2
        do
                echo " --------启动 $i Kafka-------"
                # -daemon runs the broker in the background so ssh returns at once.
                ssh $i "$kafka_home/bin/kafka-server-start.sh -daemon $kafka_home/config/server.properties"
        done
};;
"stop"){
        for i in centos0 centos1 centos2
        do
                echo " --------停止 $i Kafka-------"
                ssh $i "$kafka_home/bin/kafka-server-stop.sh"
        done
};;
*)
        echo "Usage: $0 start|stop" >&2
        exit 1
;;
esac

12.查看所有Kafka Topic

[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181 --list

13.创建 Kafka Topic

[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181,CentOS1:2181,CentOS2:2181 --create --replication-factor 1 --partitions 1 --topic topic_start
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181,CentOS1:2181,CentOS2:2181 --create --replication-factor 1 --partitions 1 --topic topic_event

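注意:这两个Topic对应的HDFS输出路径需要提前创建(与kafka-flume-hdfs.conf中的hdfs.path一致):

hadoop fs -mkdir -p /origin_data/gmall/log/topic_start
hadoop fs -mkdir -p /origin_data/gmall/log/topic_event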

14.生产消息

[root@CentOS0 kafka]$ bin/kafka-console-producer.sh --broker-list CentOS0:9092 --topic topic_start

>hello world

>root  root

15.消费消息

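[root@CentOS0 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server CentOS0:9092 --from-beginning --topic topic_start

查看Topic详情:

[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181 --describe --topic topic_start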

16.Kafka Manager安装

[root@CentOS0 module]$ unzip kafka-manager-1.3.3.22.zip

17.进入到/opt/module/kafka-manager-1.3.3.22/conf

[root@CentOS0 conf]$ vim application.conf

修改为:

kafka-manager.zkhosts="CentOS0:2181,CentOS1:2181,CentOS2:2181"

18.Kafka Manager启动停止脚本

[root@CentOS0 bin]$ vim km.sh
#! /bin/bash
# km.sh - start/stop Kafka Manager (web UI) on this node.
# Usage: km.sh start|stop

# Must match the directory unpacked from kafka-manager-1.3.3.22.zip.
km_home=/opt/module/kafka-manager-1.3.3.22

case $1 in
"start"){
        echo " -------- 启动 KafkaManager -------"
        # Serve the UI on port 7456. Console output goes to start.log in the
        # current working directory.
        nohup $km_home/bin/kafka-manager   -Dhttp.port=7456 >start.log 2>&1 &
};;
"stop"){
        echo " -------- 停止 KafkaManager -------"
        # Kafka Manager's JVM is matched by its main class name ProdServerStart.
        # xargs -r: skip kill when nothing matches.
        ps -ef | grep ProdServerStart | grep -v grep |awk '{print $2}' | xargs -r kill
};;
*)
        echo "Usage: $0 start|stop" >&2
        exit 1
;;
esac

19.CentOS0:/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

[root@CentOS0 conf]$ vim kafka-flume-hdfs.conf
## Components: two Kafka sources -> two file channels -> two HDFS sinks,
## one independent pipeline per topic (topic_start, topic_event).

a1.sources=r1 r2

a1.channels=c1 c2

a1.sinks=k1 k2



## source1: consume topic_start from the Kafka cluster.
## A batch is written to the channel after 5000 messages or 2000 ms,
## whichever comes first.

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.batchSize = 5000

a1.sources.r1.batchDurationMillis = 2000

a1.sources.r1.kafka.bootstrap.servers = CentOS0:9092,CentOS1:9092,CentOS2:9092

a1.sources.r1.kafka.topics=topic_start



## source2: same as source1, consuming topic_event.

a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r2.batchSize = 5000

a1.sources.r2.batchDurationMillis = 2000

a1.sources.r2.kafka.bootstrap.servers = CentOS0:9092,CentOS1:9092,CentOS2:9092

a1.sources.r2.kafka.topics=topic_event



## channel1: durable file channel for topic_start. Checkpoint and data files
## live on local disk. maxFileSize = max bytes per data file,
## capacity = max events held, keep-alive = seconds a put may wait for space.

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1

a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/

a1.channels.c1.maxFileSize = 2146435071

a1.channels.c1.capacity = 1000000

a1.channels.c1.keep-alive = 6



## channel2: same as channel1, with separate directories for topic_event.

a1.channels.c2.type = file

a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2

a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/

a1.channels.c2.maxFileSize = 2146435071

a1.channels.c2.capacity = 1000000

a1.channels.c2.keep-alive = 6



## sink1: write topic_start events to one HDFS directory per day.
## The %Y-%m-%d escapes need an event timestamp.
## NOTE(review): presumably supplied as a "timestamp" header by the Kafka
## source; otherwise set hdfs.useLocalTimeStamp = true - confirm.
## round* rounds that timestamp down to 10 s. Because the path has no
## time-of-day escapes, this has no visible effect here.

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d

a1.sinks.k1.hdfs.filePrefix = logstart-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = second



## sink2: same as sink1, for topic_event.

a1.sinks.k2.type = hdfs

a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d

a1.sinks.k2.hdfs.filePrefix = logevent-

a1.sinks.k2.hdfs.round = true

a1.sinks.k2.hdfs.roundValue = 10

a1.sinks.k2.hdfs.roundUnit = second



## File rolling, intended to avoid producing many small files:
## roll every 10 s or at 128 MB (134217728 bytes). rollCount = 0 disables
## rolling by event count.
## NOTE(review): a 10-second interval still yields many small files under
## steady traffic, which works against that goal. Consider a much larger
## interval (e.g. 3600) outside of testing.

a1.sinks.k1.hdfs.rollInterval = 10

a1.sinks.k1.hdfs.rollSize = 134217728

a1.sinks.k1.hdfs.rollCount = 0



a1.sinks.k2.hdfs.rollInterval = 10

a1.sinks.k2.hdfs.rollSize = 134217728

a1.sinks.k2.hdfs.rollCount = 0



## Output format: LZO-compressed files (CompressedStream + lzop codec), not
## plain text. NOTE(review): requires the hadoop-lzo codec to be available to
## Flume/Hadoop - confirm it is installed.

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k2.hdfs.fileType = CompressedStream



a1.sinks.k1.hdfs.codeC = lzop

a1.sinks.k2.hdfs.codeC = lzop



## Wiring: r1 -> c1 -> k1 (topic_start), r2 -> c2 -> k2 (topic_event).

a1.sources.r1.channels = c1

a1.sinks.k1.channel= c1



a1.sources.r2.channels = c2

a1.sinks.k2.channel= c2

20.CentOS0服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

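同步配置到其他机器(f2.sh 在 CentOS2 上启动消费 Flume,所以 kafka-flume-hdfs.conf 也需要分发):

[root@CentOS0 conf]$ xsync flume-env.sh

[root@CentOS0 conf]$ xsync kafka-flume-hdfs.conf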

 

上一篇:你不知道的冷知识 | 指数退避思想及其在Flume/Hadoop中的应用


下一篇:使用flume时出现NoSuchMethodError: com.google.common.base.Preconditions.checkArgument