kafka 消息队列

背景

  1. message bus项目中多处用到了kafka消息队列,对kafka仅知道消息系统这个浅显的认识;对于理解整个项目的实现不够深刻,进而无法从测试角度去评估测试风险;
  2. 通过调研,国内大厂使用kafka较多,对于这一基本技术应该有所了解;

目标

  1. 对kafka的基本概念,基本原理,常见应用有所了解。
  2. 了解messagebus在kafka中的应用
  3. 消息系统选型

  4. 测试角度分析kafka的常见问题

开始学习

 初印象

Kafka 是由 LinkedIn 开发的一个分布式的消息系统,使用 Scala 编写,它以可水平扩展和高吞吐率而被广泛使用。

1.基本概念

  • Producer
    • 发布消息的对象称之为主题生产者(Kafka topic producer)
  • Consumer

    • 订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • Topic

    • Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)。
  • Broker

    • 已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
  • Partition 

    • Parition是物理上的概念,每个Topic包含一个或多个Partition.
  • Consumer Group 

    • 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

以GDS举例:

kafka 消息队列

 

 

配置位于:

sip-goservice/config/configold/conf_staging.yaml

2.基本原理

 

单节点kafka:

kafka 消息队列

 

 

 

broker →  topic → partition

broker 和 topic可以是多对多的关系,其他是一对多。

Topic是发布的消息的类别名,一个topic可以有零个,一个或多个消费者订阅该主题的消息。

对于每个topic,Kafka集群都会维护一个分区log,就像下图中所示:

kafka 消息队列

 

 

 

Topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的

Kafka集群保持所有的消息,直到它们过期(无论消息是否被消费)。实际上消费者所持有的仅有的元数据就是这个offset(偏移量),也就是说offset由消费者来控制:正常情况当消费者消费消息的时候,偏移量也线性的的增加。但是实际偏移量由消费者控制,消费者可以将偏移量重置为更早的位置,重新读取消息。可以看到这种设计对消费者来说操作自如,一个消费者的操作不会影响其它消费者对此log的处理。

 

集群kafka:

kafka 消息队列

 

 

 

副本同步以及选举:

kafka 消息队列

 

 

 

 

3.常见应用

1.异步处理

消息队列的主要特点是异步处理,主要目的是减少请求响应时间,实现非核心流程异步化,提高系统响应性能。

所以典型的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作,作为消息放入消息队列。

 

2.应用解耦

kafka 消息队列

 

 

 


使用了消息队列后,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦。

每个成员不必受其他成员影响,可以更独立自主,只通过消息队列MQ来联系。

举一个例子:用户下订单流程,下订单后会发生扣库存这个动作,上游系统订单和下游系统扣库存,就可以通过上图的消息队列MQ来联系,扣库存异步化,从而实现订单系统与库存系统的应用解耦。

 

3.流量削锋

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

 

4.日志处理

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。

 

5.消息通讯

消息队列一般都内置了高效的通信机制,因此也可以用于单纯的消息通讯,比如实现点对点消息队列或者聊天室等。

 

messagebus 项目在Kafka应用

1.设计文档

Shared Service Message Bus

2.设计背景简介

  1. 目前的业务表中 ,一个kafka topic对应了多张物理数据表; 不同的业务方需要各自监控gds binlog ,并且对拿到的message做处理 从而获得自己需要的message ,存在重复作业,冗余度高,不便于管控
  2. 物理表的变化,如水平拆分,垂直拆分对业务部署影响较大,需要重新部署kafka服务

    所以需要提供统一的服务 ,屏蔽基础物理数据库的复杂性以及消息格式的复杂性:
    系统设计总览:
    kafka 消息队列

     

     


    核心模块总览:
    kafka 消息队列

     

     

 

GDS kafka 消费举例:

消息系统选型

Kafka、RabbitMQ、RocketMQ

消息中间件
基本介绍
主要特征
备注
Kafka 开源,Apache

优点:高吞吐量;

缺点:不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。

 
RabbitMQ 开源

优点:数据一致性、稳定性和可靠性要求很高的场景

缺点:性能和吞吐量较低;

 
RocketMQ 开源,阿里;RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。 优点:高吞吐量、高可用性、适合大规模分布式系统应用  
       
       
       

性能吞吐量:Kafka > RocketMQ > RabbitMQ

可靠性:Kafka < RocketMQ < RabbitMQ

总结:根据需要,选择当前业务最适合的。

 

 

测试问题分析

  • Case 1: kafka的过期时间,超过过期时间,丢弃消息,所以注意应用场景
  • Case 2: 2次确认,处理完消息,给kafka再发送一条消息,防止消息丢失。
  • Case3:消费者拿到消息之后,服务崩溃/服务超时,消息丢失

                  

 

 

可能问题分析

  • Case1: 怎么样避免重复消费
    • 有的业务不能重复消费,eg:银行算账系统之类的
  • Case2:消息先后顺序
    • 有的业务强顺序,eg:aiitem  price,
    • 2种实现方式:直接用kafka的数据
    • 读db
  • Case3:
    • kafka server是不记录消息是否消费?client消不消费消息都在那里,是由client的offset记录消费到了哪里?
    • 问题:client经常会重启服务?重启之后offset的值没有了?
    • kafka记录每个consumer group消费的位置。
    • kafka server的每个partition维护2个offset,一个是消费的offset, 一个是commit的offset。如果有client重启,新的consumer会从commit的offset开始消费。
  • Case4:
    • 集群的配置,consumer group 的配置参数如果不对,会导致性能缺陷
    • 性能调优问题:

参考文档


kafka 文档:
https://www.orchome.com/5

mac如何本地搭建kafka 

https://colobu.com/2019/09/27/install-Kafka-on-Mac/

kafka basic分享&如何写一个consumer程序

https://drive.google.com/file/d/1oQiT0Erx3iMIJxsKTAzJ12x0q6GSPhic/view?usp=sharing

官方文档:
https://kafka.apache.org/quickstart

CDC
https://farer.org/2018/07/27/change-data-capture/

alibaba
https://github.com/alibaba/canal

rabbitmq

https://www.rabbitmq.com/

rocketmq

源码:https://github.com/apache/rocketmq

专题:https://www.jianshu.com/c/8cfe32491344

 
 

kafka 消息队列

上一篇:设计模式


下一篇:second_stage_property模块