kafka集群模拟生产和消费数据:
# 集群所有节点启动kafka
cd /usr/develop/kafka_2.11-0.10.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
#创建kafka的topic: test
bin/kafka-topics.sh --create --zookeeper cdh01:2181,cdh02:2181,cdh03:2181,cdh04:2181,cdh05:2181,cdh06:2181 --replication-factor 2 --partitions 3 --topic test
#列举所有的topics
bin/kafka-topics.sh --list --zookeeper cdh01:2181,cdh02:2181,cdh03:2181,cdh04:2181,cdh05:2181,cdh06:2181
#模拟生产数据
bin/kafka-console-producer.sh --broker-list cdh01:9092,cdh02:9092,cdh03:9092,cdh04:9092,cdh05:9092,cdh06:9092 --topic test
#模拟消费数据
bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper cdh01:2181,cdh02:2181,cdh03:2181,cdh04:2181,cdh05:2181,cdh06:2181
在生产数据端报错:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for test-0
在消费数据端报错:
Failed to send producer request with correlation id 8 to broker 2 with data for partitions [test,0]
java.nio.channels.ClosedChannelException
原因:网络问题。原来是没有关闭SELinux。
我之前在配置关闭SELinux时,本来应该写SELINUX=disabled,我写成了SELINUX=disable。
解决方法:重新配置关闭SELinux。
小结:网络问题可能有很多中,可能时未关闭防火墙,SELinux,也可能网络就是不通。