简介
首先简单说下对kafka的理解:
1、kafka是一个分布式的消息缓存系统;
2、kafka集群中的服务器节点都被称作broker
3、kafka的客户端分为:一是producer(消息生产者)负责往消息队列中放入消息;另一类是consumer(消息消费者)负责从消息队列中取消息。客户端和服务器之间的通信采用tcp协议
4、kafka中不同业务系统的消息可以通过topic(主题)进行区分,也就是说一个主题就是一个消息队列,而且每一个消息topic都会被分区,以分担消息读写的负载
5、parition(分区)是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件。每一个分区都可以有多个副本,以防止数据的丢失
6、某一个分区中的数据如果需要更新,都必须通过该分区所有副本中的leader来更新
7、消费者可以分组,每一个consumer属于特定的组,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。比如有两个消费者组A和B,共同消费一个topic:topic-1,A和B所消费的消息不会重复.
比如 topic-1中有100个消息,每个消息有一个id,编号从0-99,那么,如果A组消费0-49号,B组就消费50-99号
8、消费者在具体消费某个topic中的消息时,可以指定起始偏移量
集群安装、启动
1、下载安装包并解压
tar xf kafka_2.10-0.8.1.1.tgz
cd kafka_2.10-0.8.1.1
2、修改config/server.properties配置文件
broker.id=1
zookeeper.connect=192.168.2.100:2181, 192.168.2.110:2181, 192.168.2.120:2181
注:kafka集群依赖zookeeper集群,所以此处需要配置zookeeper集群;zookeeper集群配置请参见:http://www.cnblogs.com/skyfeng/articles/6701458.html
3、将kafka解压包使用scp命令拷贝至集群其他节点,命令:
scp -r kafka_2.10-0.8.1.1/ 192.168.2.110://home/hadoop/app
4、将zookeeper集群启动,请参见:http://www.cnblogs.com/skyfeng/articles/6701458.html
5、在每一台节点上启动broker
bin/kafka-server-start.sh config/server.properties
//运行在后台命令:
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
//使用jps命令查看是否启动
[hadoop@hadoop1-1 kafka_2.10-0.8.1.1]$ jps
2400 Jps
2360 Kafka
2289 QuorumPeerMain
简单测试
1、在kafka集群中创建一个topic
[hadoop@hadoop1-1 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --create --zookeeper 192.168.2.100:2181 --replication-factor 3 --partitions 1 --topic topictest
Created topic "topictest".
replication-factor:表示副本数量
partitions :分区数量
2、用一个producer向某一个topic中写入消息
[hadoop@hadoop1-1 kafka_2.10-0.8.1.1]$ bin/kafka-console-producer.sh --broker-list 192.168.2.100:9092 --topic topictest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
3、用一个comsumer从某一个topic中读取信息
[hadoop@hadoop1-2 kafka_2.10-0.8.1.1]$ bin/kafka-console-consumer.sh --zookeeper 192.168.2.100:2181 --from-beginning --topic topictest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
在生产者中输入内容,消费者会及时从队列中获取消息,如下图:
4、查看一个topic的分区及副本状态信息
[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.2.110:2181 --topic topictest
Topic:topictest PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topictest Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.2.100:2181 --topic topictest
Topic:topictest PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topictest Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.2.120:2181 --topic topictest
Topic:topictest PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topictest Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$
5、查看topic
bin/kafka-topics.sh --list --zookeeper 192.168.2.100: