kafka操作文档
前提
1.为什么要写这篇操作文档?
我们在大数据开发过程中,大部分数据都来自kafka,熟练操作kafka命令是必要的
2.这边文章解决什么问题?
满足一些日常工作中kafka的消费命令
生产使用
需求文档会提供:kafka目录、topic、kafak地址
下面是我的测试
kafka地址【服务器地址,后面默认在该目录下执行kafka命令】:/opt/app/kafka
topic :test1
kafka地址 【我配置了hosts,没配置就输入IP】:node01:2181,node02:2181,node03:2181
一般在服务器中,这三类就能满足基本的使用kafka版本、信息都是上游提供的,不归我们负责
1.【消费一条样例数据】最常使用
例:消费kafka的一条数据【一般用来取一条样例数据,获取kafka数据的格式】,便于后续开发
bin/kafka-console-consumer.sh --zookeeper 192.168.88.100:2181,192.168.88.101:2181,192.168.88.100:2181 --topic bigdata2301 --from-beginning --max-messages 1
2.【消费存入文档】从头消费kafka数据,写入文件wsy.log中,[ '>wsy.log' 覆盖写入 ,'>>wsy.log' 追加写入]
bin/kafka-console-consumer.sh --zookeeper 192.168.88.100:2181,192.168.88.101:2181,192.168.88.100:2181 --topic bigdata2301 --from-beginning > wsy.log
3.【过滤消费】消费kafka,根据指定字段一行一行过滤[过滤20210817天,9点的数据时间的数据,SEND_DATE,SEND_TIME是kafka的字段]【我也用IP举个例子】
bin/kafka-console-consumer.sh --zookeeper 192.168.88.100:2181,192.168.88.101:2181,192.168.88.100:2181 --topic bigdata2301 --from-beginning |grep --line-buffered '"SEND_DATE":20210817'|grep --line-buffered '"SEND_TIME":9' >wsy.log
开发使用
1.【程序需要重新消费,补数使用】我们程序在运行过程中,因为程序需求添加或更改,需要重新消费kafka数据。
重新消费kafka数据:消费者要从头开始消费某个topic的全量数据,需要满足2个条件(spring-kafka):
(1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过);
(2)指定"auto.offset.reset"参数的值为earliest; //配置有三种:earliest,latest,none情况
结论:
1、如果存在已经提交的offest时,不管设置为earliest 或者latest 都会从已经提交的offest处开始消费
2、如果不存在已经提交的offest时,earliest 表示从头开始消费,latest 表示从最新的数据消费,也就是新产生的数据.
3、none topic各分区都存在已提交的offset时,从提交的offest处开始消费;只要有一个分区不存在已提交的offset,则抛出异常
了解这个就行:如果不存在已经提交的offest时,earliest 表示从头开始消费,latest 表示从最新的数据消费,也就是新产生的数据.
测试使用
一般自己做测试使用,改一下topic 和kafka地址就好
1、启动kafka【前提需要启动zookeeper】
bin/kafka-server-start.sh config/server.properties >>/dev/null 2>&1 &
2、创建topic
bin/kafka-topics.sh --create --topic test1 --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181
3、列举所有topic
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
4、查看topic【可以查看一些分区信息】
bin/kafka-topics.sh --describe --topic test1 --zookeeper node01:2181,node02:2181,node03:2181
5、删除topic
bin/kafka-topics.sh --delete --topic test1 --zookeeper node01:2181,node02:2181,node03:2181
6、生产者
bin/kafka-console-producer.sh --topic test1 --broker-list node01:9092,node02:9092,node03:9092
7、消费者
bin/kafka-console-consumer.sh --topic test1 --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning
附录一:zookeeper启动脚本
需要先配置集群间免密登陆
zkstart.sh
#!/bin/bash
echo "start zkServer"
for i in 01 02 03
do
ssh node$i "source /etc/profile;zkServer.sh start"
done
执行过程
[root@node01 bin]# ./zkstart.sh
start zkServer
JMX enabled by default
Using config: /opt/app/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
JMX enabled by default
Using config: /opt/app/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
JMX enabled by default
Using config: /opt/app/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
附录二:kafka启动脚本
自己在使用的时候改一下 kafka 的部署目录和ip
kafka-start.sh
#!/bin/sh
for host in node01 node02 node03
do
ssh $host "source /etc/profile;/opt/app/kafka/bin/kafka-server-start.sh /opt/app/kafka/config/server.properties >/dev/null 2>&1 &"
echo "$host kafka is running"
done
执行过程
[root@node01 bin]# ./kafka-start.sh
node01 kafka is running
node02 kafka is running
node03 kafka is running