7 Publisher Confirms

文章目录


官方文档地址: 7 Publisher Confirms


可靠的发布与发布者确认。

前提条件

本教程假设你已经安装了 RabbitMQ 并在本地主机端口(5672)上运行。

发布者确认

Publisher Confirm是 RabbitMQ 的一个扩展,用于实现可靠的发布。当在一个channel上启用了发布者确认时,客户端发布的消息将由代理异步确认,这意味着它们已在服务器端得到处理。

概述

在本教程中,我们将使用发布者确认来确保已发布的消息安全到达代理。我们将介绍几种使用发布者确认的策略,并解释它们的优缺点。

在一个通道上开启发布者确认

发布者确认是 RabbitMQ 对 AMQP 0.9.1 协议的扩展,所以默认情况下不启用。发布者确认在通道级别使用confirmSelect方法启用:

Channel channel = connection.createChannel();
channel.confirmSelect();

必须在希望使用Publisher Confirms的每个通道上调用此方法。并且应该只启用一次,而不是对发布的每个消息都启用。

策略1:单个的发布消息

让我们从Publisher Confirms的最简单方法开始,即发布消息并同步等待确认:

while (thereAreMessagesToPublish()) {
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.basicPublish(exchange, queue, properties, body);
    //设置一个5秒的超时时间
    channel.waitForConfirmsOrDie(5_000);
}

在上面的示例中,我们像往常一样发布消息,并使用Channel#waitForConfirmsOrDie(long)方法等待消息的确认。消息一经确认,该方法立即返回。如果消息在超时时间内没有得到确认,或者消息被nack-ed(否定应答,意味着代理由于某种原因不能处理它),该方法将抛出异常。异常处理通常包括记录错误消息和 / 或重试发送消息。

不同的客户端库有不同的方法来进行发布者确认的同步处理,所以请务必仔细阅读您正在使用的客户端的文档。

这种技术非常简单,但也有一个主要的缺点:它显著降低了发布速度,因为对消息的确认会阻塞所有后续消息的发布。这种方法的吞吐量不会超过每秒数百条。不过,这对于某些应用程序来说已经足够了。

发布者确认是异步的吗?

我们在开头提到,代理以异步方式确认已发布的消息,但在第一个示例中,代码以同步方式等待,直到消息得到确认。客户端实际上异步接收确认,并相应地解除waitForConfirmsOrDie调用阻塞的。可以把waitForConfirmsOrDie看作一个依赖于底层异步通知的同步方法。

策略2:批量的发布消息

为了改进前面的示例,我们可以发布一批消息并等待这整批消息得到确认。下面的例子使用100条作为一批:

int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.basicPublish(exchange, queue, properties, body);
    outstandingMessageCount++;
    if (outstandingMessageCount == batchSize) {
        ch.waitForConfirmsOrDie(5_000);
        outstandingMessageCount = 0;
    }
}
if (outstandingMessageCount > 0) {
    ch.waitForConfirmsOrDie(5_000);
}

与等待单个消息的确认相比,等待一批消息的确认大大提高了吞吐量(远端 RabbitMQ 节点的确认次数可提高20-30倍)。一个缺点是,在失败的情况下,我们不知道到底是哪里出了问题,因此我们可能必须在内存中保存整批的处理,用来记录有意义的内容或重新发布消息。这个解决方案仍然是同步的,所以它会阻塞消息的发布。

策略3:异步确认发布消息

代理可以异步确认发布的消息,只需要在客户端注册一个回调,就可以收到这些确认的通知:

Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
    //消息被确认时的代码
}, (sequenceNumber, multiple) -> {
    //消息被nack-ed时的代码
});

有两个回调:一个用于被确认的消息,另一个用于被nack-ed的消息(可能被代理认为丢失的消息)。每个回调函数有2个参数:

  • sequenceNumber:标识已确认或已nack-ed消息的序列号。我们将很快看到如何将它与发布的消息关联起来。
  • multiple:这是一个布尔值。如果为 false,则只确认或nack-ed一条消息,如果为 true,则确认或nack-ed所有序列号小于等于该序列号的消息。

序列号可以在发布之前通过Channel#getNextPublishSeqNo()方法获得:

int sequenceNumber = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);

将消息与序列号关联起来的一种简单方法是使用映射。假设我们想要发布字符串,因为它们很容易转换为用于发布的字节数组。下面是一个使用映射将发布序列号与消息的字符串体关联起来的代码示例:

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... 稍后将提供确认回调的代码
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());

发布代码现在使用一个map跟踪出站消息。我们需要在消息确认到达时清理这个map,并做一些事情,比如当消息被nack-ed时记录一个警告:

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
    if (multiple) {
        ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
          sequenceNumber, true
        );
        confirmed.clear();
    } else {
        outstandingConfirms.remove(sequenceNumber);
    }
};

channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
    String body = outstandingConfirms.get(sequenceNumber);
    System.err.format(
      "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
      body, sequenceNumber, multiple
    );
    cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... 发布代码

前面的示例包含一个回调,在确认到达时清除map。注意:这个回调函数可以处理一次确认和多次确认。当确认到达时使用这个回调(Channel#addConfirmListener的第一个参数)。nack-ed消息的回调将检索消息体并发出警告。然后,它重新使用之前的回调来清理未完成确认的map(无论确认消息还是nack-ed消息,它们在映射中的对应条目都必须被删除)。

如何追踪未完成的确认?

我们的示例使用ConcurrentNavigableMap跟踪未完成的确认。这种数据结构之所以非常方便,有几个原因。它可以很容易地将序列号与消息关联起来(无论消息数据是什么),并轻松地清理掉给定序列号的条目(以处理多个确认/nacks)。最后,它支持并发访问,因为确认回调是在客户端拥有的线程中调用的,应该与发布线程保持不同。

除了使用复杂的映射实现之外,还有其他方法可以跟踪未完成的确认,比如使用一个简单的并发哈希映射和一个变量来跟踪发布序列的下界,但是它们通常比较复杂,不属于该教程范围。

总之,异步处理发布者确认通常需要以下步骤:

  • 提供将发布序列号与消息关联起来的方法。
  • 在通道上注册一个确认侦听器,以便在发布者ack /nacks确认信息到达时通知它执行适当的操作,比如记录或重新发布一个被nack-ed的消息。在此步骤中,序列号到消息的关联机制(也就是那个map)可能需要进行一些清理。
  • 在发布消息之前跟踪发布序列号。

重新发布被nack-ed的消息?

从相应的回调中重新发布nack-ed消息可能很诱人,但是应该避免这样做,因为确认回调是在I/O线程中分派的,而通道不应该在这个线程中执行操作。更好的解决方案是将消息排队放入由发布线程轮询的内存队列中。像ConcurrentLinkedQueue这样的类是在确认回调和发布线程之间传输消息的好选择。

综述

在某些应用程序中,确保已发布的消息到达代理是非常重要的。发布者确认是一个有助于满足这一需求的 RabbitMQ 特性。发布者确认在本质上是异步的,但也可以同步处理它们。没有确定的方法规定怎么来实现Publisher Confirm,这通常取决于应用程序和整个系统中的要求。典型的技术:

  • 单个的发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量的发布消息,同步等待批处理的确认:简单、合理的吞吐量,但是当出现问题时很难进行处理。
  • 异步处理:最好的性能和使用资源,在出现错误时能很好的控制,但需要一个正确的实现。

把它们放一起

PublisherConfirms.java类:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.BooleanSupplier;

public class PublisherConfirms {

    static final int MESSAGE_COUNT = 50_000;

    static Connection createConnection() throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("localhost");
        cf.setUsername("guest");
        cf.setPassword("guest");
        return cf.newConnection();
    }

    public static void main(String[] args) throws Exception {
        publishMessagesIndividually();
        publishMessagesInBatch();
        handlePublishConfirmsAsynchronously();
    }

    static void publishMessagesIndividually() throws Exception {
        try (Connection connection = createConnection()) {
            Channel ch = connection.createChannel();

            String queue = UUID.randomUUID().toString();
            ch.queueDeclare(queue, false, false, true, null);
			//开启发布者确认
            ch.confirmSelect();
            long start = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String body = String.valueOf(i);
                ch.basicPublish("", queue, null, body.getBytes());
                ch.waitForConfirmsOrDie(5_000);
            }
            long end = System.nanoTime();
            System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
        }
    }

    static void publishMessagesInBatch() throws Exception {
        try (Connection connection = createConnection()) {
            Channel ch = connection.createChannel();

            String queue = UUID.randomUUID().toString();
            ch.queueDeclare(queue, false, false, true, null);
			//开启发布者确认
            ch.confirmSelect();

            int batchSize = 100;
            int outstandingMessageCount = 0;

            long start = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String body = String.valueOf(i);
                ch.basicPublish("", queue, null, body.getBytes());
                outstandingMessageCount++;

                if (outstandingMessageCount == batchSize) {
                    ch.waitForConfirmsOrDie(5_000);
                    outstandingMessageCount = 0;
                }
            }

            if (outstandingMessageCount > 0) {
                ch.waitForConfirmsOrDie(5_000);
            }
            long end = System.nanoTime();
            System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
        }
    }

    static void handlePublishConfirmsAsynchronously() throws Exception {
        try (Connection connection = createConnection()) {
            Channel ch = connection.createChannel();

            String queue = UUID.randomUUID().toString();
            ch.queueDeclare(queue, false, false, true, null);
			//开启发布者确认
            ch.confirmSelect();

            ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

            ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
                            sequenceNumber, true
                    );
                    confirmed.clear();
                } else {
                    outstandingConfirms.remove(sequenceNumber);
                }
            };

            ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
                String body = outstandingConfirms.get(sequenceNumber);
                System.err.format(
                        "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
                        body, sequenceNumber, multiple
                );
                cleanOutstandingConfirms.handle(sequenceNumber, multiple);
            });

            long start = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String body = String.valueOf(i);
                //序列号和消息进行关联
                outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
                ch.basicPublish("", queue, null, body.getBytes());
            }

            if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
                throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
            }

            long end = System.nanoTime();
            System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
        }
    }

    static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
        int waited = 0;
        while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
            Thread.sleep(100L);
            waited = +100;
        }
        return condition.getAsBoolean();
    }

}

上面的类包含我们介绍的技术的代码。我们可以编译它,按原来的方式执行它,看看它们是如何执行的:

javac -cp $CP PublisherConfirms.java
java -cp $CP PublisherConfirms

输出如下:

Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms

如果客户端和服务器位于同一台计算机上,则计算机上的输出应该类似。单独发布的性能不如预期,异步处理与批量发布相比也有点令人失望。

Publisher Comfirms非常依赖于网络,因此我们最好尝试使用远程节点,因为在生产环境中,客户端和服务器通常不在同一台机器上,所以远程节点更现实。PublisherConfirms.java可以很容易地更改为使用非本地节点:

static Connection createConnection() throws Exception {
    ConnectionFactory cf = new ConnectionFactory();
    cf.setHost("remote-host");
    cf.setUsername("remote-user");
    cf.setPassword("remote-password");
    return cf.newConnection();
}

重新编译该类,再次执行,并等待结果:

Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms

我们看到单独发布的表现非常糟糕。但是有了客户端和服务器之间的网络,现在批量发布和异步处理的执行结果类似,对发布者确认的异步处理有一个小小的优势。

请记住,批量发布实现起来很简单,但是在发布者没有确认的情况下,想要知道哪些消息无法发送给代理是不容易的。实现对Publisher Comfirms的异步处理要复杂得多,但提供了更好的粒度和更好的操作控制,以便在发布消息被挂起时执行。

请记住,批发布很容易实现,但在发布者确认为否定的情况下,不容易知道哪些消息不能发送到代理。异步处理发布者确认在实现上更复杂一点,但可以提供更好的粒度和对发布消息nack-ed时要执行的操作的更好控制。

上一篇:C++11新特性之 rvalue Reference(右值引用)


下一篇:“Affinity Publisher”让排版更简单,轻松完成你的设计大作