SpringBoot 第二十二篇 之 使用RabbitMQ

1. 简介:

    RabbitMQ是一个在AMQP基础上可复用的企业消息系统。

    AMQP(Advanced(预先) Message Queuing Protocol(协议)) 是一个提供统一消息服务的应用层标准协议,基于此协议的        客户端与消息中间件可传递消息,

     并不受客户端中间件不同产品,不同开发语言的限制。

    RabbitMQ 遵循AMQP协议,用erlang语言开发,用来通过协议在完全不同应用之间共享数据,

    或者将作业排队以便让分布式服务器进行处理。

2. 组件说明

SpringBoot 第二十二篇 之 使用RabbitMQ

1. Broker: 消息队列服务器实体

2. Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

3. Queue:  消息队列的载体,每个消息都会被投入到一个,或多个队列。

4. Binding: 绑定,它的作用就是把exchange和队列按照一定的规则绑定起来。

5. Routing Key : 路由关键字,exchange 根据这个关键字进行消息投递,

6. channel: 消息通道,在客户端的每个链接里,可建立多个channel,每个channel

     代表一个绘画任务。

7. vhost: 虚拟主机,一个broker中可开设多个vhost,用作不同用户的权限分离。

8. producer: 消息生产者。

9. consumer: 消息消费者。

 

10. 消息队列的使用过程:

    (1)生产者链接到服务器,声明一个exchange,并设置交换机类型等属性。

    (2)消费者 连接到消息队列服务器,打开一个channel

    (3)消费者 声明一个queue,并设置相关属性。

    (4)消费者 使用routing key, 绑定 exchange 和 queue。

    (5)生产者 投递消息到exchange

    (6)exchange 接收到消息后,根据消息的key和已经设置的binding,进行消息路由,将消息

             投递到一个或多个队列。

总结:

   使用rabbitmq 可以解耦应用程序,使用消息在应用程序中进行传递,

   将一些无需及时返回且耗时的操作提取出来,进行异步处理,可节省服务器相应时间,

    提高系统吞吐量。

2. 安装

    安装rabbitmq前,先要安装erlang 并配置好环境变量(erlang_home, path 都要设)。

    2.1.  监测erlang 是否安装好的方法是:

     cmd到erlang安装目录的bin文件夹下:

       erl -version

    2.2  监测rabbitmq是否安装好的方法是:

        cmd到安装目录的sbin目录下,运行

        rabbitmqctl status

 

3. springboot 使用 rabbitmq 示例:

     Rabbit MQ 服务器 会根据路由键将消息从交换路由到消息队列中,如何处理投递多个队列的情况?

   这里不同类型的交换机起到了重要作用。分别是 fanout,direct,topic, 每种类型实现了不同的路由算法。

 

     3.1 使用 Fanout  Exchange 交换机

         当使用Fanout Exchange 时,无需处理路由键,很像子广播,

          每一台与该交换机绑定的队列都会获得一份消息。

          Fanout 交换机处理消息是最快的。

       示例:

          

          1 所需依赖:

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

       2  加载交换机,队列,到容器,并绑定。

/**
 * 初始化Fanout交换机和队列,并将其绑定
 * */
@Configuration
public class FanoutConfig {

    // 初始化创建交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(RabbitConstants.FANOUT_EXCHANGE);
    }

    // 初始化创建队列
    @Bean
    public Queue getQueue1(){
        return  new Queue(RabbitConstants.F_QUEUE1,true);
    }
    @Bean
    public Queue getQueue2(){
        return  new Queue(RabbitConstants.F_QUEUE2,true);
    }
    @Bean
    public Queue getQueue3(){
        return  new Queue(RabbitConstants.F_QUEUE3,true);
    }

    // 将交换机和 和队列绑定
    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(getQueue1()).to(fanoutExchange());
    }
    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(getQueue2()).to(fanoutExchange());
    }
    @Bean
    public Binding binding3(){
        return BindingBuilder.bind(getQueue3()).to(fanoutExchange());
    }
}

  3. 生产者

package com.*.web;

import com.*.config.RabbitConstants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class FanoutPublisherController implements RabbitTemplate.ConfirmCallback {

    private String exchange_name = "test_fanout_exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/fanout/test")
    public String send(String msg){

        System.out.println("发送消息:【"+ msg+"】");
        // 设置回调函数
        rabbitTemplate.setConfirmCallback(this);
        // 发送消息,
        rabbitTemplate.convertAndSend(RabbitConstants.FANOUT_EXCHANGE,"",msg);

        return msg;

    }


    // 回调确认方法
    @Override
    public void confirm(CorrelationData correlationData, boolean ask, String cause) {
        if(ask) {
            System.out.println("消费成功!");
        }else {
            System.out.println("消费失败!:" + cause);
        }
    }
}

    4  消费者

package com.*.web;

import com.*.config.RabbitConstants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutCustomer {

    // 定义监听字符串队列
    @RabbitListener(queues = {RabbitConstants.F_QUEUE1})
    public void receiveMsg1(String msg){
        System.out.println("F_QUEUE1 收到消息:"+msg);

    }

    // 定义监听字符串队列
    @RabbitListener(queues = {RabbitConstants.F_QUEUE2})
    public void receiveMsg2(String msg){
        System.out.println("F_QUEUE2 收到消息:"+msg);
    }

    // 定义监听字符串队列
    @RabbitListener(queues = {RabbitConstants.F_QUEUE3})
    public void receiveMsg3(String msg){
        System.out.println("F_QUEUE3 收到消息:"+msg);
    }

}

    源码地址: https://github.com/sss996/springboot-rabbitmq 

   3.2 使用Topic Exchange

       topic exchange 将路由键 和 某个模式进行匹配。

       此时队列需要绑定在一个模式上。

       通过“#” 匹配一个或多个词,

       通过“*”匹配一个词。因此 “fin.#” 能匹配到“fin.aa.bb",

       但是fin.* 只能匹配到fin.aa

       代码示例:

         1. 初始化交换机,队列,并绑定

            

package com.*.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * 初始化Fanout交换机和队列,并将其绑定
 * */
@Configuration
public class TopicConfig {

    // 初始化创建交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(RabbitConstants.TOPIC_EXCHANGE);
    }

    // 初始化创建队列
    @Bean
    public Queue getQueue1(){
        return  new Queue(RabbitConstants.T_QUEUE1,true);
    }
    @Bean
    public Queue getQueue2(){
        return  new Queue(RabbitConstants.T_QUEUE2,true);
    }

    // 将交换机和 和队列绑定
    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(getQueue1()).to(topicExchange()).with("insert.#");
    }
    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(getQueue2()).to(topicExchange()).with("insert.user");
    }

}

     3.2.2  生产者

package com.*.web;

import com.*.config.RabbitConstants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TopicPublisherController implements RabbitTemplate.ConfirmCallback {

    private String exchange_name = "test_fanout_exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/topic/test")
    public String send(String msg,String routingKey){

        System.out.println("发送消息:【"+ msg+"】");
        // 设置回调函数
        rabbitTemplate.setConfirmCallback(this);
        // 发送消息,
        rabbitTemplate.convertAndSend(RabbitConstants.TOPIC_EXCHANGE,"insert.user",msg);

        return msg;

    }

    // 回调确认方法
    @Override
    public void confirm(CorrelationData correlationData, boolean ask, String cause) {
        if(ask) {
            System.out.println("消费成功!");
        }else {
            System.out.println("消费失败!:" + cause);
        }
    }
}

  3.2.3  消费者

package com.*.web;

import com.*.config.RabbitConstants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicCustomer {

    // 定义监听字符串队列T
    @RabbitListener(queues = {RabbitConstants.T_QUEUE1})
    public void receiveMsg1(String msg){
        System.out.println("T_QUEUE1 收到消息:"+msg);

    }

    // 定义监听字符串队列
    @RabbitListener(queues = {RabbitConstants.T_QUEUE2})
    public void receiveMsg2(String msg){
        System.out.println("T_QUEUE2 收到消息:"+msg);
    }

}

       源码地址: https://github.com/sss996/springboot-rabbitmq  

   3.3  使用 Direct Exchange 

        Direct Exchange .需要将一个队列绑定到交换机上,要求该消息与路由键完全匹配

        用法和topic 基本相同不在重复。

          源码地址: https://github.com/sss996/springboot-rabbitmq 

        

4.   借鉴的博客  https://blog.csdn.net/u013045552/column/info/17329

5.   踩过的坑

     在浏览器中输入localhost:15672 可以打开http客户端。

注意:server端口号是 5672 ,sprnigboot项目里注意不能绑定15672,应该是5672 否则会报如下错误。

java.net.SocketException: Socket Closed
	at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_151]
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_151]
	at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_151]
	at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_151]
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[na:1.8.0_151]
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265) ~[na:1.8.0_151]
	at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) ~[na:1.8.0_151]
	at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580) ~[amqp-client-5.1.2.jar:5.1.2]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]

2019-07-30 20:06:02.943 ERROR 13320 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).

org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
	at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:484) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:626) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1797) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:345) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:995) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: java.net.ConnectException: Connection refused: connect
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_151]
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) ~[na:1.8.0_151]
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_151]
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_151]
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_151]
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[na:1.8.0_151]
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_151]
	at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_151]
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:955) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:847) ~[amqp-client-5.1.2.jar:5.1.2]
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:457) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	... 9 common frames omitted

 

    

上一篇:Rabbit MQ 学习参考


下一篇:rabbitmq常用命令