目录
1.将生成的jar包拷贝到CentOS0 /opt/module 分发jar包
7.在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件
8.拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入flume的lib文件夹下面(YSJ-1.0-SNAPSHOT.jar)
17.进入到/opt/module/kafka-manager-1.3.3.22/conf
19.CentOS0:/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件
20.CentOS0服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置
1.将生成的jar包拷贝到CentOS0 /opt/module 分发jar包
2.在CentOS0上执行jar程序
[root@CentOS0 module]$ java -classpath /opt/module/YSJ-1.0-SNAPSHOT-jar-with-dependencies.jar com.Charlie.Guo.AppMain > /opt/module/test.log
3.配置登录远程服务器时立即source一下环境变量(在每台服务器上执行)
[root@CentOS0/1/2 ~]$ echo source /etc/profile >> ~/.bashrc
4./bin目录下创建脚本lg.sh
[root@CentOS0 bin]$ vim lg.sh
#!/bin/bash
# lg.sh - run the log generator on each log-producing host over ssh.
# Every host runs AppMain from the jar-with-dependencies and redirects its
# stdout to /opt/module/test.log on that host.
remote_cmd="java -classpath /opt/module/YSJ-1.0-SNAPSHOT-jar-with-dependencies.jar com.Charlie.Guo.AppMain >/opt/module/test.log"
hosts=(CentOS0 CentOS1)

# Hosts are processed one after another; each ssh call blocks until the
# remote command returns.
for host in "${hosts[@]}"; do
  ssh "$host" "$remote_cmd"
done
[root@CentOS0 bin]$ chmod 777 lg.sh
[root@CentOS0 module]$ lg.sh
[root@CentOS0 module]# cd data 然后ls
5./bin目录下创建集群时间同步修改脚本dt.sh
[root@CentOS0 bin]$ vim dt.sh
#!/bin/bash
# dt.sh - set the same system date/time on every cluster host.
#
# Usage: dt.sh <date> [time]
#   e.g. dt.sh 2021-06-02
#        dt.sh 2021-06-02 10:30:00
#
# The date argument is mandatory: without it the remote command would be
# `date -s ' '`, which would still change the clock on every host.

# Join the date and the optional time with a single space, without leaving a
# trailing blank when the time is omitted.
build_log_date() {
  local log_date=$1
  if [[ -n "${2:-}" ]]; then
    log_date+=" $2"
  fi
  printf '%s\n' "$log_date"
}

# Entry point. Prints the date string, then applies it on each host.
# Returns 2 when no date was given, otherwise the status of the last ssh call.
main() {
  if [[ -z "${1:-}" ]]; then
    printf 'Usage: %s <date> [time]\n' "${0##*/}" >&2
    return 2
  fi

  local log_date host
  log_date=$(build_log_date "$@")
  echo "$log_date"

  for host in centos0 centos1 centos2; do
    # -t forces tty allocation on the remote side (kept from the original;
    # presumably so that sudo can prompt if it needs to).
    ssh -t "$host" "sudo date -s '$log_date'"
  done
}

main "$@"
[root@CentOS0 bin]$ chmod 777 dt.sh
[root@CentOS0 bin]$ dt.sh 2021-06-02
6.集群所有进程查看脚本
[root@CentOS0 bin]$ vim xcall.sh
#!/bin/bash
# xcall.sh - run one command on every cluster host and print a banner per host.
#
# Usage: xcall.sh <command> [args...]
#   e.g. xcall.sh jps
#
# All arguments are joined with spaces into a single remote command line.

# Entry point. Returns 2 when no command was given, otherwise the status of
# the last ssh call.
main() {
  # An empty command would be sent to ssh as "", which does not run anything
  # useful on the remote side, so reject it up front.
  if (( $# == 0 )); then
    printf 'Usage: %s <command> [args...]\n' "${0##*/}" >&2
    return 2
  fi

  local host
  for host in centos0 centos1 centos2; do
    echo "--------- $host ----------"
    ssh "$host" "$*"
  done
}

main "$@"
赋权并执行(例如查看所有节点上的Java进程):
[root@CentOS0 bin]$ chmod 777 xcall.sh
[root@CentOS0 bin]$ xcall.sh jps
7.在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件
[root@CentOS0 conf]$ vim file-flume-kafka.conf
# Flume agent "a1" (collection tier): one TAILDIR source feeding two Kafka
# channels. No sinks are declared in this file.
a1.sources=r1
a1.channels=c1 c2
# Source r1: follow the files of file group f1.
a1.sources.r1.type = TAILDIR
# JSON file in which the source keeps its read positions.
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
# Files under /opt/module/data whose name matches the regex "app.+".
a1.sources.r1.filegroups.f1 = /opt/module/data/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
# Interceptors: two custom classes, applied in the order i1 then i2.
# NOTE(review): their implementation is not part of this file. Presumably i1
# filters/cleans records and i2 sets the "topic" header that the selector
# below routes on -- confirm against the interceptor source.
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.Charlie.Guo.ETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.Charlie.Guo.TypeInterceptor$Builder
# Channel selector: route each event by the value of its "topic" header
# (topic_start -> c1, topic_event -> c2).
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2
# Channels: both are Kafka channels on the same three brokers, one topic each.
# parseAsFlumeEvent = false: per the Flume Kafka Channel documentation, the
# topic content is not treated as Avro-encoded FlumeEvents.
# Channel c1 -> Kafka topic topic_start.
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = centos0:9092,centos1:9092,centos2:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
# Channel c2 -> Kafka topic topic_event.
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = centos0:9092,centos1:9092,centos2:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
8.拦截器打包之后,只需要上传拦截器自身的jar包(YSJ-1.0-SNAPSHOT.jar),不需要上传它所依赖的包。打包之后要放入flume的lib文件夹下面(/opt/module/flume/lib)
9.分发Flume到其他主机
[root@CentOS0 module]$ xsync flume/
[root@CentOS0 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
10.日志采集Flume启动停止脚本
[root@CentOS0 bin]$ vim f1.sh
#!/bin/bash
# f1.sh - start/stop the log-collection Flume agents (file-flume-kafka.conf).
#
# Usage: f1.sh {start|stop}

# Hosts that run the collection agent. start and stop share this single list
# so that every agent that gets started can also be stopped.
collect_hosts=(centos0 centos1)

# Launch agent a1 in the background on every collection host; the agent's
# console output is discarded.
start_agents() {
  local host
  for host in "${collect_hosts[@]}"; do
    echo " --------启动 $host 采集flume-------"
    ssh "$host" "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &"
  done
}

# Kill every process on the collection hosts whose command line mentions
# file-flume-kafka.
stop_agents() {
  local host
  for host in "${collect_hosts[@]}"; do
    echo " --------停止 $host 采集flume-------"
    # \$2 is escaped so that awk on the remote host sees the literal $2.
    # xargs -r: skip kill entirely when no matching process exists.
    ssh "$host" "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -r kill"
  done
}

# Entry point. Returns 2 for a missing or unknown action.
main() {
  case "${1:-}" in
    start) start_agents ;;
    stop) stop_agents ;;
    *)
      printf 'Usage: %s {start|stop}\n' "${0##*/}" >&2
      return 2
      ;;
  esac
}

main "$@"
[root@CentOS0 bin]$ vim f2.sh
#!/bin/bash
# f2.sh - start or stop the consumer Flume agent (kafka-flume-hdfs.conf).
# The first argument selects the action; any value other than "start" or
# "stop" results in no action.
action=$1
consumer_hosts=(CentOS2)

if [[ $action == "start" ]]; then
  # Launch agent a1 in the background; its output goes to
  # /opt/module/flume/log.txt on the remote host.
  for host in "${consumer_hosts[@]}"; do
    echo " --------启动 $host 消费flume-------"
    ssh "$host" "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt 2>&1 &"
  done
elif [[ $action == "stop" ]]; then
  # Kill every remote process whose command line mentions kafka-flume-hdfs.
  # \$2 is escaped so that awk on the remote host receives the literal $2.
  for host in "${consumer_hosts[@]}"; do
    echo " --------停止 $host 消费flume-------"
    ssh "$host" "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
  done
fi
11.Kafka集群启动停止脚本
[root@CentOS0 bin]$ vim kf.sh
#!/bin/bash
# kf.sh - start/stop the Kafka brokers on every cluster host.
#
# Usage: kf.sh {start|stop}
#
# This script manages the Kafka brokers themselves; Kafka Manager has its own
# script (km.sh).
#
# NOTE(review): the broker install directory is assumed to be /opt/module/kafka
# on every host. Set KAFKA_HOME before calling this script if it differs.

kafka_home=${KAFKA_HOME:-/opt/module/kafka}
kafka_hosts=(centos0 centos1 centos2)

# Start one broker per host as a daemon, using that host's server.properties.
start_brokers() {
  local host
  for host in "${kafka_hosts[@]}"; do
    echo " --------启动 $host Kafka-------"
    ssh "$host" "$kafka_home/bin/kafka-server-start.sh -daemon $kafka_home/config/server.properties"
  done
}

# Ask the broker on each host to shut down via Kafka's own stop script.
stop_brokers() {
  local host
  for host in "${kafka_hosts[@]}"; do
    echo " --------停止 $host Kafka-------"
    ssh "$host" "$kafka_home/bin/kafka-server-stop.sh"
  done
}

# Entry point. Returns 2 for a missing or unknown action.
main() {
  case "${1:-}" in
    start) start_brokers ;;
    stop) stop_brokers ;;
    *)
      printf 'Usage: %s {start|stop}\n' "${0##*/}" >&2
      return 2
      ;;
  esac
}

main "$@"
12.查看所有Kafka Topic
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181 --list
13.创建 Kafka Topic
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181,CentOS1:2181,CentOS2:2181 --create --replication-factor 1 --partitions 1 --topic topic_start
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181,CentOS1:2181,CentOS2:2181 --create --replication-factor 1 --partitions 1 --topic topic_event
注意:这里的两个Topic对应的HDFS路径需要提前创建
hadoop fs -mkdir -p <路径>(例如 /origin_data/gmall/log/topic_start 和 /origin_data/gmall/log/topic_event)
14.生产消息
[root@CentOS0 kafka]$ bin/kafka-console-producer.sh \
--broker-list CentOS0:9092 --topic topic_start
>hello world
>root root
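15.消费消息
[root@CentOS0 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server CentOS0:9092 --from-beginning --topic topic_start
查看Topic详情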
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181 \
--describe --topic topic_start
16.Kafka Manager安装
[root@CentOS0 module]$ unzip kafka-manager-1.3.3.22.zip
17.进入到/opt/module/kafka-manager-1.3.3.22/conf
[root@CentOS0 conf]$ vim application.conf
修改为:
kafka-manager.zkhosts="CentOS0:2181,CentOS1:2181,CentOS2:2181"
18.Kafka Manager启动停止脚本
[root@CentOS0 bin]$ vim km.sh
#!/bin/bash
# km.sh - start/stop Kafka Manager on the local host.
#
# Usage: km.sh {start|stop}

# Install directory; must match the directory unpacked from
# kafka-manager-1.3.3.22.zip.
km_home=/opt/module/kafka-manager-1.3.3.22

# Launch Kafka Manager in the background with -Dhttp.port=7456; its output is
# written to start.log in the current working directory.
start_manager() {
  echo " -------- 启动 KafkaManager -------"
  nohup "$km_home/bin/kafka-manager" -Dhttp.port=7456 >start.log 2>&1 &
}

# Kill every local process whose command line contains ProdServerStart.
stop_manager() {
  echo " -------- 停止 KafkaManager -------"
  # xargs -r: skip kill entirely when no matching process exists.
  ps -ef | grep ProdServerStart | grep -v grep | awk '{print $2}' | xargs -r kill
}

# Entry point. Returns 2 for a missing or unknown action.
main() {
  case "${1:-}" in
    start) start_manager ;;
    stop) stop_manager ;;
    *)
      printf 'Usage: %s {start|stop}\n' "${0##*/}" >&2
      return 2
      ;;
  esac
}

main "$@"
19.CentOS0:/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件
vim kafka-flume-hdfs.conf
## Flume agent "a1" (consumer tier): two Kafka sources, each wired through its
## own file channel to its own HDFS sink.
## Components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1: Kafka source reading topic_start
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = CentOS0:9092,CentOS1:9092,CentOS2:9092
a1.sources.r1.kafka.topics=topic_start
## source2: Kafka source reading topic_event
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = CentOS0:9092,CentOS1:9092,CentOS2:9092
a1.sources.r2.kafka.topics=topic_event
## channel1: file channel with its own checkpoint and data directories
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2: same settings as channel1, separate directories
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## sink1: HDFS sink for topic_start, one directory per day (%Y-%m-%d)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
## sink2: HDFS sink for topic_event, one directory per day (%Y-%m-%d)
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second
## Rolling policy (goal: avoid producing a large number of small files).
## rollSize is 128 MiB; rollCount = 0 disables rolling by event count.
## NOTE(review): rollInterval = 10 rolls a new file every 10 seconds, which
## works against the small-file goal -- confirm whether this is a test-only
## value that should be raised.
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## Output file type: compressed stream using the lzop codec.
## NOTE(review): the original comment here described the output as "native"
## files, which does not match CompressedStream/lzop. The lzop codec
## presumably requires LZO support to be installed in Hadoop -- confirm.
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## Wiring: r1 -> c1 -> k1 and r2 -> c2 -> k2
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
20.CentOS0服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
同步配置到其他机器
[root@CentOS0 conf]$ xsync flume-env.sh