RabbitMQ Consumer获取消息的两种方式(poll,subscribe)解析

以下转自:http://blog.csdn.net/yangbutao/article/details/10395599

rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message, Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息, 最近翻阅了基于java的客户端的相关源码,简单做个分析。 编程模型伪代码如下: ConnectionFactory factory = new ConnectionFactory(); Connection conn = factory.newConnection(); Channel channel=conn.createChannel(); 创建Connection需要指定MQ的物理地址和端口,是socket tcp物理连接,而channel是一个逻辑的概念,支持在tcp连接上创建多个MQ channel 以下是基于channel上的两种消费方式。

1、Subscribe订阅方式 boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag",      new DefaultConsumer(channel) {          @Override          public void handleDelivery(String consumerTag,                                     Envelope envelope,                                     AMQP.BasicProperties properties,                                     byte[] body)              throws IOException          {              String routingKey = envelope.getRoutingKey();              String contentType = properties.contentType;              long deliveryTag = envelope.getDeliveryTag();              // (process the message components here ...)              channel.basicAck(deliveryTag, false);          }      });

订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息, 这样当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去。 参见ChannelN中的方法     public String basicConsume(String queue, boolean autoAck, String consumerTag,                                boolean noLocal, boolean exclusive, Map<String, Object> arguments,                                final Consumer callback)         throws IOException     {     ......         rpc((Method)             new Basic.Consume.Builder()              .queue(queue)              .consumerTag(consumerTag)              .noLocal(noLocal)              .noAck(autoAck)              .exclusive(exclusive)              .arguments(arguments)             .build(),             k);

try {             return k.getReply();         } catch(ShutdownSignalException ex) {             throw wrap(ex);         }     }

Consumer接收消息的过程: 创建Connection后,会启动MainLoop后台线程,循环从socket(FrameHandler)中获取数据包(Frame),调用channel.handleFrame(Frame frame)处理消息,     public void handleFrame(Frame frame) throws IOException {         AMQCommand command = _command;         if (command.handleFrame(frame)) { // 对消息进行协议assemble             _command = new AMQCommand(); // prepare for the next one             handleCompleteInboundCommand(command);//对消息消费处理         }     } ChannelN.handleCompleteInboundCommand        ---ChannelN.processAsync            ----dispatcher.handleDelivery                  ---QueueingConsumer.handleDelivery                      ---this._queue.add(new Delivery(envelope, properties, body));//消息最终放到队列中 每个Consumer都有一个BlockQueue,用于缓存从socket中获取的消息。 接下来,Consumer对象就可以调用api来从客户端缓存的_queue中依次获取消息,进行消费,参见QueueingConsumer.nextDelivery()

对于这种长连接的方式,没看到心跳功能,以防止长连接的因网络等原因连接失效

2、poll API方式 ChannelN: GetResponse basicGet(String queue, boolean autoAck) 这种方式比较简单,直接通过RPC从MQ Server端获取队列中的消息

上一篇:Linux 网络编程的5种IO模型:多路复用(select/poll/epoll)


下一篇:centos7.3 64位 安装git