工作模式,默认平均分配,代码与直连一样
之所以默认平均分配,与消息确认机制有关。如果自动确认设置为true,当某一消费者接收到消息是,会直接给对列确认,对列就会删除数据,若消费者消费到一半宕机了,消息就丢失了。
能者多劳---轮询
轮询方法
package com.huixiang.rabbitmq.work.lunxun;
import com.huixiang.utils.RabbitmqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitmqUtils.setConnection();
channel = connection.createChannel();
//队列绑定通道
String queueName = "queueName1";
// 设置一次消费多少消息
channel.basicQos(1);
channel.queueDeclare(queueName,false,false,false,null);
/**
* 通道绑定消息队列
* @params1 队列名称
* @params2 开启消费确认机制,false不会自动确认
* @params3 消费时的回调接口
* @params4 是否自动化删除,随着最后一个消费者消费完毕后,是否删除队列
*/
channel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("消费者1:收到消息" + new String(message.getBody(), "UTF-8"));
try {
Thread.sleep(10000);
}catch (Exception e){
e.printStackTrace();
}
/**
* 消息手动确认
* @params1 确认的是哪一个消息的标志
* @params2 是否开启多个消息同时确认
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("失败");
}
});
}catch (Exception e){
e.printStackTrace();
}
}
}