shell脚本并发数据到kafka topic
需求:
每秒发送大量数据到kafka,验证下游系统性能,数据中的时间戳要求为当前时间,可以之间采集系统当前时间替换文件中旧的时间戳,保证每次发送的数据都为最新时间。
利用kafka自带的脚本,将待发数据写入文件中,然后通过读取文件 方式,将数据批量发送到kafka task节点
#在kafka master 节点用户home 目录下创建data 目录
[root@node1 home]# mkdir data
#进入data目录
[root@node1 home]# cd data/
#将准备好的数据放入data目录中
[root@node1 data]# ls
batch1-1000.log batch2-1000.log batch-send.sh
#编写batch-send.sh 脚本
[root@node1 data]# vi batch-send.sh
shell content:
#!/bin/bash
#响应Ctrl+C中断
trap 'onCtrlC' INT
function onCtrlC () {
echo 'Ctrl+C is captured'
exit 1
}
#kafka及data所在目录
dataPath=/home/data
kafkaPath=/usr/local/kafka
#broker list
brokerlist=192.168.1.10:9092,192.168.1.11:9092,192.168.1.12:9092
#kafka的topic,这里设置发送到2个kafka topic
topic1=test-topic1
topic2=test-topic2
#消息总数,可根据需要配置
totalNum=600
#循环计时参数
batchNum=10
#开始时间
start=$(date +%s)
for ((i=1; i<=${totalNum}; i ++))
do
{
#取余数
modVal=$(( ${i} % ${batchNum} ))
#如果循环计数达到设置参数,就发送一批次消息
if [ ${modVal} = 0 ] ; then
#在控制台显示进度,*num,取决于数据文件中的数据量
echo “$(( ${i}*100 )) of $(( ${totalNum}*100 )) sent”
# 获取每次循环发送的当前时间
current=`date "+%Y-%m-%d %H:%M:%S"`
# 将时间转为UTC 时间,精确到秒
timeStamp=`date -d "$current" +%s`
#将current转换为时间戳,精确到毫秒,10#`date "+%N"是为了避免循环过程中出现error:value too great for base
currentTimeStamp=$((timeStamp*1000+10#`date "+%N"`/1000000))
#每次循环发送数据前,都将数据文件中的recvTs 参数改为当前时间,放在前台执行,由于是顺序执行,需要等待文件修改完后,才会下一步执行发送文件数据给kafka,每个文件中1000条数据
sed -i 's/\("recvTs":\)[0-9]*\(,\)/\1'$currentTimeStamp',/g' ${dataPath}/batch1-1000.log ${dataPath}/batch2-1000.log
#批量发送消息,并且将控制台返回的提示符重定向到/dev/null,&放到后台并发执行
cat ${dataPath}/batch1-1000.log | ${kafkaPath}/bin/kafka-console-producer.sh --broker-list ${brokerlist} --sync --topic ${topic1} | > /dev/null &
cat ${dataPath}/batch1-1000.log | ${kafkaPath}/bin/kafka-console-producer.sh --broker-list ${brokerlist} --sync --topic ${topic1} | > /dev/null &
fi
}
#每个for循环等待0.1秒,batchNum=10,即每秒执行一次数据发送,
sleep 0.1
done
#脚本结束时间,并统计脚本执行时间,打印到控制台
end=$(date +%s)
echo $(date)
echo $(( $end - $start ))
脚本执行效果
[root@node1 data]# ./batch-send.sh
Sat Jan 30 16:15:53 CST 2021
“1000 of 10000 sent”
“2000 of 10000 sent”
“3000 of 10000 sent”
“4000 of 10000 sent”
“5000 of 10000 sent”
“6000 of 10000 sent”
“7000 of 10000 sent”
“8000 of 10000 sent”
“9000 of 10000 sent”
“10000 of 10000 sent”
Sat Jan 30 16:16:04 CST 2021
11
[root@node1 data]#
这可以通过修改文件中的数据量和循环计时参数,实现每秒发送10000 甚至100000 条数据到kafka。