MQ消息队列

MQ消息队列

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

使用场景主要有异步处理,应用解耦,流量削锋和消息通讯四个场景

目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

JMS消息服务

JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

消息模型

两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)

P2P

P2P的特点

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  • 接收者在成功接收消息之后需向队列应答成功

如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。

Pub/Sub模式

  • 每个消息可以有多个消费者
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
  • 为了消费消息,订阅者必须保持运行的状态
  • 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
  • 如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

消息消费

在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。

同步

订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;

#####异步
订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

常见的消息中间件

ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

ActiveMQ的基本使用

安装

http://activemq.apache.org/download-archives.html

由于springboot5.2.2中支持的是activeMQ5.16,所以安装类似版本

下载完成后进行解压,执行 activemq.bat start 启动。 如果能成功访问 http://localhost:8161/admin。用户名和密码默认为admin,则启动成功

编程使用

依赖

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

核心配置 application.properties

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin

主类上添加注解启动JMS支持

@EnableJms
@SpringBootApplication
public class Mq01Application {
    public static void main(String[] args) {
        SpringApplication.run(Mq01Application.class, args);
    }
}

添加配置类用于定义队列Queue和主题Topic对象

@Configuration
public class ActiveMqConfig {
    @Bean(name = "testQueue")
    public Queue queue() {
        return new ActiveMQQueue("test.myqueue");
    }
}

定义消息的生产者

@SpringBootTest
class Mq01ApplicationTests {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    private Queue destination;
    @Test
    void contextLoads() {
        jmsMessagingTemplate.convertAndSend(destination, "message");
    }

}

定义消息的消费者

  • 生产者和消费者之间没有任何耦合关系
  • 消息是异步接收的
@Component
public class QueueConsumerListener {
    @JmsListener(destination="test.myqueue")
    public void readActiveQueue(String message) {
        System.out.println("queue接受到:" + message);
    }
}

topic模式的生产者

//参数1是一个Topic对象
 jmsMessagingTemplate.convertAndSend(destination, "message===");

这个topic对象定义在配置类中

@Configuration
public class ActiveMqConfig {

    @Bean(name = "testTopic")
    public Topic topic() {
        return new ActiveMQTopic("test.mytopic");
    }
}

topic模式的消费者


问题

过期策略

死信队列

慢消费者策略

上一篇:解决 Git 报 error unknown switch `e‘ 错误


下一篇:【MQ 快速入门】介绍、分类、组成、优缺点、测试点