消息中间件-Kafka(1)

第一部分:Kafka架构与实操

1.1概念和基本架构

1.1.1kafka介绍

  Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多生产者、多订阅者,基 于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日 志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为*开源项目。 主要应用场景是:日志收集系统和消息系统。 Kafka主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的 访问性能。 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。 同时支持离线数据处理和实时数据处理。 支持在线水平扩展

消息中间件-Kafka(1)

 

  Kafka是一种消息发布-订阅模式,kafka只有消息的拉取。

1.1.2kafka的基本架构

  先来介绍一下kafka中常见的一些名词术语:

  (1)消息和批次:kafka的数据单元称为消息,可看成数据库里的一行记录,消息由字节数组组成。消息有键,当消息需要以一种可控的模式写入想要指定的分区时,需要给消息设置键。

为了提高效率,消息会被分批写入kafka,一批次的消息来自同一个主题和分区,可以减小网络开销。

  (2)模式:消息模式(schema)有许多可用的选项,以便于理解。如JSON和XML,但是它们缺乏强类型处理 能力。Kafka的许多开发者喜欢使用Apache Avro。Avro提供了一种紧凑的序列化格式,模式和消息体 分开。当模式发生变化时,不需要重新生成代码,它还支持强类型和模式进化,其版本既向前兼容,也 向后兼容。

  (3)主题和分区:kafka的消息通过主题进行区分,主题可以被分为若干个分区,一个主题通过分区分布在kafka集群当中,提供了横向扩展的能力。

消息中间件-Kafka(1)

 

 

   (4)生产者和消费者:生产者创建消息,消费者消费消息。一个消息被发送到特定的主题上。分区的选取通常有以下三种方式:1、直接指定分区.2、若有key,则取hash计算出分区。3若

消息不包含key且不手动指定分区则采用轮询的方式进行分配。

消息中间件-Kafka(1)

  消费者通过偏移量来区分消息消费到了哪里,消费者是消费组的一部分,通过消费组避免消息的重复消费。

  (5)broker和集群:一个独立的Kafka服务器称为broker。broker接收来自生产者的消息,为消息设置偏移量,并提交 消息到磁盘保存。broker为消费者提供服务,对读取分区的请求做出响应,返回已经提交到磁盘上的消 息。单个broker可以轻松处理数千个分区以及每秒百万级的消息量。

消息中间件-Kafka(1)

 

  每个集群都有一个broker是集群控制器,自动从集群的活跃成员中选举出来,控制器负责监控broker和将分区分配给broker。

  (6)replicas:副本。每个分区可以指定多个副本,副本有以下两种类型: 首领副本 每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本。 跟随者副本 首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首 领那里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中的一个跟随者会被提升为新首领。跟随者副本包括同步副本和不同步副本,在发生首领副本切换的时候,只有同步副本可以切换为首 领副本。

  分区中的所有副本统称为AR。AR=ISR+OSR。

  ISR:所有与leader副本保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas),ISR集 合是AR集合中的一个子集。消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消 息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。前面所说的“一定程 度”是指可以忍受的滞后范围,这个范围可以通过参数进行配置。

  OSR:与leader副本同步滞后过多的副本(不包括leader)副本,组成OSR(Out-Sync Relipcas)。在正常 情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISR,OSR集合为空。

1.2kafka的安装和配置

  1、java环境为前提,安装好java。

  配置环境变量 vi /etc/profile        export   JAVA_HOME=java路径                  export PATH=$PATH:$JAVA_HOME/bin      source /etc/profile

  2、安装zookeeper

  安装包解压  

  备份zoo_sample.conf文件  

  修改配置文件,设置zookeeper存储数据的路径

  配置环境变量  同java步骤

  3、安装配置kafka

  安装解压、配置环境变量

  修改conf目录中的servers.properties文件。

消息中间件-Kafka(1)

 

   4、启动kafka  kafka-server-start.sh  配置文件路径。我们一般采用后台启动,命令为:kafka-server-start.sh -daemon config/server.properties。启动后,我们打开zookeeper,可以看到我们上面的

myKafka路径下生成了相关的一些文件。具体每个文件是何用?以后会 介绍到。

消息中间件-Kafka(1)

 

1.3 常用控制台命令

主题管理

  主题管理采用kafka-topics.sh脚本。

  查看当前集群的主题信息:kafka-topics.sh --zookeeper localhost:2181/myKafka --list

  创建主题(包含分区数、副本数)kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_1 --partitions 1 --replication-factor 1

  查看指定主题的详细信息:kafka-topics.sh --zookeeper localhost:2181/myKafka -- describe --topic topic_1

  删除指定的主题:kafka-topics.sh --zookeeper localhost:2181/myKafka -- delete --topic topic_1

生产者管理

  向指定主题发送消息:kafka-console.producer.sh --broker-list localhost:9020 --topic topic_1

消费者管理

  消费者消费指定主题的消息:kafka-console-consumer.sh --bootstrap-server localhost:9092 - -topic topic_1

第二部分:Kafka高级特性解析

2.1 数据生产流程解析

 

 消息中间件-Kafka(1)

 

1、生产者创建的时候,同时会创建一个sender线程作为守护线程。

2、生产消息内部其实是异步流程,生产的消息经过拦截器、序列化器、分区器后将消息缓存在缓冲区,缓冲区也是在生产者创建时创建。

3、批次发送的条件,batch.size和linger.ms两个参数哪个先达标就算哪个。

4、批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且 失败原因允许重试,那么客户端内部会对该消息进行重试。

5、消息存到broker后会将元数据返回给生产者。生产者可以通过同步或者异步的方式获取元数据。

生产者常配参数:

消息中间件-Kafka(1)

 

消息中间件-Kafka(1)

 

 2.2序列化器

消息中间件-Kafka(1)

 

   由于kafka中的数据都是字节数组,所以在将消息发送到kafka之前必须都序列化为字节数组。若我们要通过kafka发送自定义的对象数据时,则要自定义序列化器完成对象的序列化。

  具体步骤如下,

消息中间件-Kafka(1)

 

   以该user类为例 第一步:定义一个类继承serializer接口,接口中泛型传入user

  第二步:重写serialize方法

消息中间件-Kafka(1)

  核心在于创建一个byteBuffer对象,给对象分配三个空间,第一个四字节存储KEY,第二个四字节存储消息value的长度,第三个为value的长度,然后添加

进入bytebuffer即可。

2.3分区器

  默认分区计算规则:如果生产者消费record指定了分区,则消息发往指定分区。否则若消息含key则根据hash(key)计算分区,剩下情况则轮询发送。

 自定义分区器:

消息中间件-Kafka(1)

 

   创建一个类实现partitioner结构,重写partition方法。在生产者中设置分区器。

消息中间件-Kafka(1)

 

 2.4生产者拦截器

  对发送的消息做统一性的处理。

  自定义拦截器:1、实现producerIntercepter接口,onsend方法在消息发送前调用,onacknowledgement在消息发送返回确认信息时调用

消息中间件-Kafka(1)

 

 消息中间件-Kafka(1)

 

 

 2.5生产者原理剖析

  消息中间件-Kafka(1)

  

 

 

消息中间件-Kafka(1)

上一篇:rabbitMQ面试题


下一篇:Swagger的使用和部署