channel.basicPublish("", queueName, null, message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println(“消息发送成功”);
}
}
long end = System.currentTimeMillis();
System.out.println(“发布” + MESSAGE_COUNT + “个单独确认消息,耗时” + (end - begin) +
“ms”);
}
}
执行结果
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某 些应用程序来说这可能已经足够了。
当然,现在跟你说慢,你莫得感知,下面几种综合起来对比你就会发现他的效率有多低了!
二、批量确认发布
========
与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地 提高吞吐量。
生产者
/**
- 批量发布确认
*/
public static void publishMessageBatch() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//队列名使用uuid来获取不重复的值,不需要自己再进行命名了。
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
//批量确认消息大小
int batchSize = 88;
//未确认消息个数
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + “”;
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();//确认代码
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println(“发布” + MESSAGE_COUNT + “个批量确认消息,耗时” + (end - begin) +
“ms”);
}
执行结果
**缺点:**当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。
当然这种方案仍然是同步的,也一样阻塞消息的发布。
三、异步确认发布
========
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功, 下面就让我们来详细讲解异步确认是怎么实现的。
生产者
/**
- 异步发布确认
*/
public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
/**
-
线程安全有序的一个哈希表,适用于高并发的情况
-
1.轻松的将序号与消息进行关联
-
2.轻松批量删除条目 只要给到序列号
-
3.支持并发访问
*/
ConcurrentSkip
《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》
【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享
ListMap<Long, String> outstandingConfirms = new
ConcurrentSkipListMap<>();
/**
-
确认收到消息的一个回调
-
1.消息序列号
-
2.true 可以确认小于等于当前序列号的消息
-
false 确认当前序列号消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除该部分未确认消息
confirmed.clear();
}else{
//只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println(“发布的消息”+message+“未被确认,序列号”+sequenceNumber);
};
/**
-
添加一个异步确认的监听器
-
1.确认收到消息的回调
-
2.未收到消息的回调
*/
channel.addConfirmListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = “消息” + i;
/**
-
channel.getNextPublishSeqNo()获取下一个消息的序列号
-
通过序列号与消息体进行一个关联
-
全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println(“发布” + MESSAGE_COUNT + “个异步确认消息,耗时” + (end - begin) +
“ms”);
}
}
执行结果
很容易看出,这种方式速度快得飞起呀!
如何处理未确认的消息?
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
四、总结
====
单独发布消息
- 耗时:21210ms