系统环境:Linux CentOS 7.6
Kafka:2.12-2.5.0
1、安装JDK 1.8
yum -y install java-1.8.0*
java -version 命令查看JDK版本,如图安装成功:
2、安装zookeeper服务
3、创建工作目录,下载安装包:
#创建安装目录 mkdir -p /opt/kafka #移动到目录 cd /opt/kafka #下载kafka安装包 wget https://mirrors.aliyun.com/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz #解压缩 tar -zxvf kafka_2.12-2.5.0.tgz
4、配置文件:
#进入配置目录 cd kafka_2.12-2.5.0/config/ #备份配置文件 cp server.properties server.properties.bak #修改配置文件 vim server.properties #修改及添加以下配置 broker.id=1 listeners=PLAINTEXT://127.0.0.1:9092 advertised.listeners=PLAINTEXT://127.0.0.1:9092 #其他自定义配置(根据实际修改) zookeeper.connect=127.0.0.1:2181 zookeeper.connection.timeout.ms=18000 #保存退出 :wq
#配置说明
broker.id:当前机器在集群中的唯一标识。例如有三台Kafka主机,则分别配置为1,2,3。
listeners:服务监听端口。
advertised.listeners:提供给生产者,消费者的端口号,即外部访问地址。默认为listeners的值。
zookeeper.connect:zookeeper连接地址。如有集群配置,每台Kafka主机都需要连接全部zookeeper服务,实例如下:
zookeeper.connect=192.168.1.41:2181,192.168.1.42:2181,192.168.1.47:2181
zookeeper.connection.timeout.ms:zookeeper连接超时时间。
5、启动Kafka:
(首先确保已启动zookeeper)
#移到工作目录 cd /opt/kafka/kafka_2.12-2.5.0/bin/ #启动kafka ./kafka-server-start.sh -daemon ../config/server.properties #关闭kafka服务 ./kafka-server-stop.sh
查看端口已被监听,启动成功:
6、测试创建一个topic:
#移到工作目录 cd /opt/kafka/kafka_2.12-2.5.0/bin/ #创建topic ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic topic1 #查看topic信息 ./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic topic1
#启动生产者控制台 ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topic1 #启动消费者控制台(新开一个窗口) ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic1 --from-beginning
此时在生成者控制台发一条测试消息,消费者控制台即可收到: