RabbitMQ生产者消费者模型(二)

       作为主流的MQ消息队列中间件,RabbitMQ也是具备了生产者消费者的模型,那么也就是说

生产者把消息发送后,消费者来作为接收具体的消息。本文章主要详细的概述RabbitMQ的生产者

投递和消费者监听。

一、消息传递流程

        下面主要详细的总结下RabbitMQ消息队列服务器消息彻底的整体流程,具体汇总如下:

  • 生产者只负责把消息投递到Exchange,这个过程不需要刻意的关注Queue
  • 而由Exchange把消息传递给Queue
  • 作为消费者的程序来负责监听Queue的消息
  • 为了保障消息传递的准确性以及及时性,Exchange与Queue会存在一定的绑定关系就是路由Key

二、MQ投递

        依据RabbitMQ的架构模型,在生产者模型和消费者模型中,其实生产者和消费者并不知道

对方的存在,这是异步通信的特性。作为生产者,它只需要把消息投递到Exchange,在这个过程

中生产者并不需要关注Queue,事实上生产者也是无法关注到Queue的,那么消息是如何让消费者

来监听并且接收的了?这就是说会在Exchange和Queue之间建立一种映射关系,而这层关系就不是

生产者所需要关注的了。作为消费者也不需要刻意的关注Exchange,而只需要监听Queue。

2.1、引入RabbitMQ的jar

        要使用RabbitMQ的前提是需要引入RabbitMQ的jar,那么就需要在pom.xml文件里面新增RabbitMQ

的服务端和客户端,具体如下:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </dependency>

2.2、生产者投递步骤

       生产者把消息需要投递给Exchange,那么它的步骤具体总结如下:

ConnectionFactory类负责获取连接工厂

Connection类的对象获取一个连接

Channel创建数据通道信道,可以发送和接收消息

 下面具体是完整的生产者投递的代码,具体如下:

package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer
{
    public static void main(String[] args) throws  Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();

        //配置连接mq的地址信息
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        //连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //通过connection来创建Channel
        Channel channel = connection.createChannel();

        //通过channel来发送具体的数据信息
        String msg = "Hello RabbitMQ";
        channel.basicPublish("saas", "", null, msg.getBytes());
        //发送消息成功后,关闭具体的连接
        channel.close();
        connection.close();
    }
}

在如上中,我们可以看到我们首先需要连接到RabbitMQ的服务器,然后在发送消息message的时候我们需要

指定具体的Exchange,因为对于生产者来说,它只关注的是把消息投递给Exchange。

2.3、消费者监听

        生产者把消息投递到Exchange,那么作为消费者就需要来监听具体的消息了。监听的整个过程首先也是

需要建立RabbitMQ的服务器,这部分涉及到的代码具体如下:

package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.*;

public class Consumer
{
    //定义exchange
    private static final String EXCHANGE = "saas";
    //定义队列
    private  static  final String queueName="saas";

    public static void main(String[] args) throws  Exception
    {
        try{
            //创建连接工厂
            ConnectionFactory connectionFactory=new ConnectionFactory();
            //配置连接mq的地址信息
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            //连接工厂创建连接
            Connection connection=connectionFactory.newConnection();

            //通过connection来创建Channel
            Channel channel=connection.createChannel();

            //设置exchange类型为fanout
            channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.FANOUT);

        /*
         定义一个队列
         * 一个队列来接收数据后,消费端才可以从队列里面来接收具体的数据
         * param1:队列名称
         * param2:是否持久化
         * param3:队列是否独占此连接
         * param4:队列不再使用时是否自动删除此队列
         * param5:队列参数
         * */
            channel.queueDeclare(queueName,true,false,false,null);

            channel.queueBind(queueName,EXCHANGE,"");

            //创建一个消费者来消费数据
            DefaultConsumer consumer=new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(
                        String consumerTag,
                        com.rabbitmq.client.Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte [] body) throws  java.io.IOException
                {
                    String message=new String(body);
                    System.out.println("接收到的消息为:"+message);
                };
            };
            // 监听队列,从队列中获取数据
            System.out.println("消费者程序启动成功,准备接收生产者的数据:\n");
            channel.basicConsume(queueName,consumer);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

在如上中,我们看到Exchange与生产端的Exchange名字是一样的,那么只有这样才能够建立绑定关系,

再说的更加简单点来说,生产者把消息给到Exchange,然后Exchange与Queue之间有一个层映射关系,

那么只有这样消费者监听队列才能够收取message的消息。

2.4、绑定关系

        刚才说到Exchange与Queue之间的绑定关系,下面就针对这部分具体的演示下。我们先启动消费者

的程序,启动成功后,就会自动的创建Exchange和Queue,就可以从Exchange的绑定以及Queue的绑定

中能够获取到对应的绑定关系。

2.4.1、Exchange绑定关系

         下面的图是消费者的程序启动后创建的Exchange,以及它的绑定关系,具体如下:

RabbitMQ生产者消费者模型(二)

2.4.2、消费者绑定关系

          在Exchange的绑定关系中,点击To里面saas,就会自动的跳转到Queue,具体如下所示:

RabbitMQ生产者消费者模型(二)

2.5、406错误避免

        很多初学者在学习RabbitMQ的时候,总是提前创建好Exchange和Queue,这样结果导致消费者的程序报很多

的错误,具体错误如下:

java.io.IOException
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
	at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:783)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:252)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:242)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:222)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:227)
	at com.example.rabbitmq.quickstart.Consumer.main(Consumer.java:31)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saas' in vhost '/': received 'fanout' but current is 'direct', class-id=40, method-id=10)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
	... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saas' in vhost '/': received 'fanout' but current is 'direct', class-id=40, method-id=10)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
	at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
	at java.lang.Thread.run(Thread.java:748)

其实遇到该问题,最简单解决问题的方式就是删除自己创建的Exchange和Queue。删除后,再次执行消费者的

程序,它会自动创建Exchange和Queue,而且也就不会再报一系列的具体问题了。解决了如上的问题后,再次

执行生产者的程序,就可以看到生产者发送的消息就能够被消费者这边监听到。感谢您的阅读,下个文章主要

介绍Exchange详解。

上一篇:安装minikube


下一篇:GBase 8c全文检索-文本匹配