kafka分布式消息队列介绍以及集群安装

简介

首先简单说下对kafka的理解:

  1、kafka是一个分布式的消息缓存系统;

  2、kafka集群中的服务器节点都被称作broker

  3、kafka的客户端分为:一是producer(消息生产者)负责往消息队列中放入消息;另一类是consumer(消息消费者)负责从消息队列中取消息。客户端和服务器之间的通信采用tcp协议

  4、kafka中不同业务系统的消息可以通过topic(主题)进行区分,也就是说一个主题就是一个消息队列,而且每一个消息topic都会被分区,以分担消息读写的负载

  5、parition(分区)是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件。每一个分区都可以有多个副本,以防止数据的丢失

  6、某一个分区中的数据如果需要更新,都必须通过该分区所有副本中的leader来更新

  7、消费者可以分组,每一个consumer属于特定的组,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。比如有两个消费者组A和B,共同消费一个topic:topic-1,A和B所消费的消息不会重复.

  比如 topic-1中有100个消息,每个消息有一个id,编号从0-99,那么,如果A组消费0-49号,B组就消费50-99号

  8、消费者在具体消费某个topic中的消息时,可以指定起始偏移量

集群安装、启动

  1、下载安装包并解压

tar xf kafka_2.10-0.8.1.1.tgz
cd kafka_2.10-0.8.1.1

  2、修改config/server.properties配置文件

broker.id=1
zookeeper.connect=192.168.2.100:2181, 192.168.2.110:2181, 192.168.2.120:2181

  注:kafka集群依赖zookeeper集群,所以此处需要配置zookeeper集群;zookeeper集群配置请参见:http://www.cnblogs.com/skyfeng/articles/6701458.html

  3、将kafka解压包使用scp命令拷贝至集群其他节点,命令:

scp -r kafka_2.10-0.8.1.1/ 192.168.2.110://home/hadoop/app

  4、将zookeeper集群启动,请参见:http://www.cnblogs.com/skyfeng/articles/6701458.html

  5、在每一台节点上启动broker

bin/kafka-server-start.sh config/server.properties
//运行在后台命令:
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 & 
//使用jps命令查看是否启动
[hadoop@hadoop1-1 kafka_2.10-0.8.1.1]$ jps
2400 Jps
2360 Kafka
2289 QuorumPeerMain

简单测试  

  1、在kafka集群中创建一个topic

[hadoop@hadoop1-1 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --create --zookeeper 192.168.2.100:2181 --replication-factor 3 --partitions 1 --topic topictest
Created topic "topictest".

    replication-factor:表示副本数量

    partitions :分区数量

  2、用一个producer向某一个topic中写入消息

[hadoop@hadoop1-1 kafka_2.10-0.8.1.1]$ bin/kafka-console-producer.sh --broker-list 192.168.2.100:9092 --topic topictest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

  3、用一个comsumer从某一个topic中读取信息

[hadoop@hadoop1-2 kafka_2.10-0.8.1.1]$ bin/kafka-console-consumer.sh --zookeeper 192.168.2.100:2181 --from-beginning --topic topictest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

  在生产者中输入内容,消费者会及时从队列中获取消息,如下图:

  kafka分布式消息队列介绍以及集群安装

  4、查看一个topic的分区及副本状态信息

[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.2.110:2181 --topic topictest
Topic:topictest PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topictest Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.2.100:2181 --topic topictest
Topic:topictest PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topictest Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.2.120:2181 --topic topictest
Topic:topictest PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topictest Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$

  5、查看topic

bin/kafka-topics.sh --list --zookeeper 192.168.2.100:
上一篇:[LeetCode] BFS解决的题目


下一篇:Composer 是什么