kafka生产操作

kafka操作文档

前提

1.为什么要写这篇操作文档?

我们在大数据开发过程中,大部分数据都来自kafka,熟练操作kafka命令是必要的

2.这边文章解决什么问题?

满足一些日常工作中kafka的消费命令

生产使用

需求文档会提供:kafka目录、topic、kafak地址

下面是我的测试

kafka地址【服务器地址,后面默认在该目录下执行kafka命令】:/opt/app/kafka

topic :test1

kafka地址 【我配置了hosts,没配置就输入IP】:node01:2181,node02:2181,node03:2181

一般在服务器中,这三类就能满足基本的使用kafka版本、信息都是上游提供的,不归我们负责

1.【消费一条样例数据】最常使用

例:消费kafka的一条数据【一般用来取一条样例数据,获取kafka数据的格式】,便于后续开发

bin/kafka-console-consumer.sh --zookeeper 192.168.88.100:2181,192.168.88.101:2181,192.168.88.100:2181 --topic bigdata2301 --from-beginning --max-messages 1

2.【消费存入文档】从头消费kafka数据,写入文件wsy.log中,[ '>wsy.log' 覆盖写入 ,'>>wsy.log' 追加写入]

bin/kafka-console-consumer.sh --zookeeper 192.168.88.100:2181,192.168.88.101:2181,192.168.88.100:2181 --topic bigdata2301 --from-beginning > wsy.log

3.【过滤消费】消费kafka,根据指定字段一行一行过滤[过滤20210817天,9点的数据时间的数据,SEND_DATE,SEND_TIME是kafka的字段]【我也用IP举个例子】

bin/kafka-console-consumer.sh --zookeeper 192.168.88.100:2181,192.168.88.101:2181,192.168.88.100:2181 --topic bigdata2301 --from-beginning |grep --line-buffered '"SEND_DATE":20210817'|grep --line-buffered '"SEND_TIME":9' >wsy.log

开发使用

1.【程序需要重新消费,补数使用】我们程序在运行过程中,因为程序需求添加或更改,需要重新消费kafka数据。

重新消费kafka数据:消费者要从头开始消费某个topic的全量数据,需要满足2个条件(spring-kafka):
(1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过);
(2)指定"auto.offset.reset"参数的值为earliest;  //配置有三种:earliest,latest,none情况
结论:
1、如果存在已经提交的offest时,不管设置为earliest 或者latest 都会从已经提交的offest处开始消费
2、如果不存在已经提交的offest时,earliest 表示从头开始消费,latest 表示从最新的数据消费,也就是新产生的数据.
3、none topic各分区都存在已提交的offset时,从提交的offest处开始消费;只要有一个分区不存在已提交的offset,则抛出异常

了解这个就行:如果不存在已经提交的offest时,earliest 表示从头开始消费,latest 表示从最新的数据消费,也就是新产生的数据.

测试使用

一般自己做测试使用,改一下topic 和kafka地址就好

1、启动kafka【前提需要启动zookeeper】

bin/kafka-server-start.sh config/server.properties >>/dev/null 2>&1 &

2、创建topic

bin/kafka-topics.sh  --create --topic test1 --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181

3、列举所有topic

bin/kafka-topics.sh  --list --zookeeper node01:2181,node02:2181,node03:2181

4、查看topic【可以查看一些分区信息】

bin/kafka-topics.sh  --describe --topic test1 --zookeeper node01:2181,node02:2181,node03:2181

5、删除topic

bin/kafka-topics.sh  --delete --topic test1 --zookeeper node01:2181,node02:2181,node03:2181

6、生产者

bin/kafka-console-producer.sh --topic test1 --broker-list  node01:9092,node02:9092,node03:9092

7、消费者

bin/kafka-console-consumer.sh --topic test1 --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning

附录一:zookeeper启动脚本

需要先配置集群间免密登陆

zkstart.sh

#!/bin/bash
echo "start zkServer"
for i in 01 02 03
do
ssh node$i "source /etc/profile;zkServer.sh start"
done

执行过程

[root@node01 bin]# ./zkstart.sh 
start zkServer
JMX enabled by default
Using config: /opt/app/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
JMX enabled by default
Using config: /opt/app/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
JMX enabled by default
Using config: /opt/app/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

附录二:kafka启动脚本

自己在使用的时候改一下 kafka 的部署目录和ip

kafka-start.sh

#!/bin/sh
for host in node01 node02 node03
do
        ssh $host "source /etc/profile;/opt/app/kafka/bin/kafka-server-start.sh /opt/app/kafka/config/server.properties >/dev/null 2>&1 &"
        echo "$host kafka is running"
done

执行过程

[root@node01 bin]# ./kafka-start.sh 
node01 kafka is running
node02 kafka is running
node03 kafka is running
上一篇:kafka相关教程(使用内置zookeeper)


下一篇:⽇志收集介绍