RabbitMQ入门篇

1 rabbitMQ入门教程-哔站-百知教育

在此感谢哔哩哔哩的up主:编程不良人,视频地址:https://www.bilibili.com/video/BV1dE411K7MG ,于2019-12-30学习后用Typora0.9.98整理的观后感。

1.1 MQ引言

1.1.1 MQ介绍

  • MQ(message quene):消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断向队列中获取消息。因为消息的生产者和消费者都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入轻松实现系统间解耦。

  • MQ种类:

    • ActiveMQ
    • Kafka
    • RocketMQ
    • RabbitMQ
  • RabbitMQ:基于AMQP协议,erlang语言开发的,是部署最广泛的开源消息中间件。

1.2 rabbitmq安装

1.3 管理命令行及管理界面初识

1.4 消息发布模式

1.4.1 第一种模型:直连

  1. 引入依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
    
  2. 开发生产者

    //创建连接mq的连接工厂对象
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //设置连接rabbitmq的主机
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(5672);
    //设置连接哪个虚拟主机
    connectionFactory.setVirtualHost("/virHost1");
    //设置访问虚拟主机的用户名及密码
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("root");
    
    //获取连接对象
    Connection connection = connectionFactory.newConnection();
    
    //获取连接中的通道
    Channel channel = connection.createChannel();
    
    //通道绑定对应的消息队列 消费者与生产者的特性要一致
    // argument1:队列名称(不存在会自动创建)
    // argument2:定义队列是否要持久化,仅仅是队列持久化,若想消息持久化还得额外设置(发布消息时) true-持久化
    // argument3:druable 是否独占队列,即通道独占 true-独占
    // argument4:autoDelete 是否在消费完成后自动删除队列 true-自动删除
    // argument5:额外附加参数
    channel.queueDeclare("hello", false, false, false, null);
    
    //发布消息
    // argument1:交换机名称
    // argument2:队列名称
    // argument3:传递消息的额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN:消息持久化
    // argument4:消息的具体内容)
    channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello message".getBytes());
    
    channel.close();
    connection.close();
    
  3. 开发消费者

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/virHost1");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("root");
    
    Connection connection = connectionFactory.newConnection();
    
    Channel channel = connection.createChannel();
    
    channel.queueDeclare("hello", false, false, false, null);
    
    //消费消息 (消费的队列名称, 开始消息的自动确认机制, 消费时的回调接口)
    channel.basicConsume("hello", true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
            System.out.println( new String(body) );
        }
    });
    
    /* 不关闭则可以一直监听队列
    channel.close();
    connection.close();
    */
    
  4. 总结:消费者不能用@Test注解和关闭通道连接,否则看不到执行结果,因为多线程问题未等到处理便被关闭了

1.4.2 第二种模型:work queue 任务模型

WorkQueue任务模型,当消息比较耗时,生产消息的速度大于消费者速度,就会导致堆积越来越多无法处理。此时就可以使用该模型:让多个消费者绑定到一个队列,共同消费队列中的消息。

1.4.2.1 work模型之平均消费消息-轮询

  1. rabbitmq中连接工具类封装
package messagequeue.rabbitmq.baizhiedu;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitmqUtils {
    private static ConnectionFactory connectionFactory;
    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/virHost1");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
    }

    //定义提供连接对象的方法
    public static Connection getConnection() {
        try{
            return connectionFactory.newConnection();
        }catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    //关闭通道和关闭连接
    public static void closeConnectionCannel(Channel channel, Connection connection) {
        try {
            if (channel!=null) channel.close();
            if (connection!=null) connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
  1. 开发生产者
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
for(int i=0; i<100; i++) {
    channel.basicPublish("", "work", null, ("workMessage"+i).getBytes());
}
RabbitmqUtils.closeConnectionCannel(channel, connection);
  1. 开发两个消费者
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel){
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者1:"+ new String(body));
    }
});
  1. 执行结果:如图示消费者之间采用轮询方式消费,

RabbitMQ入门篇

1.4.2.2 消息确定机制和能者多劳实现

  1. 修改上述中消费者1将其降速
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel){
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者1:"+ new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
  1. 执行结果如图所示:默认情况下RabbitMQ每个消费者会收到相同数量的消息,消费慢的会慢慢消费

RabbitMQ入门篇

  1. 消息自动确认机制:

channel.basicConsume方法中自动确认autoAck=true时,不关心业务到底是否处理完(宕机即丢失),拿到即告诉队列消费完,队列即删出这些队列,

Connection connection = RabbitmqUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);                                                //通道每次只能消费一个消息
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", false, new DefaultConsumer(channel){   //消费消息时关闭自动确认
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者2:"+ new String(body));
        channel.basicAck(envelope.getDeliveryTag(), false);          //argument1:手动确认消息标识,argument2:false为每次确认一个
    }
});

RabbitMQ入门篇

上一篇:2021-11-10


下一篇:2021-11-03