RabbitMQ的消息确认机制(事务+confirm)
在RabbitMQ中,我们可以通过数据持久化来解决RabbitMQ服务器异常导致的数据丢失问题。
问题:生产者将消息发送出去之后,消息到底有没有到达RabbitMQ服务器,默认情况下是不知道的。
解决方式有两种:
AMQP实现了事务机制
Confirm模式
事务机制
txSelect、txCommit、txRollback
txSelect:用于将当前channel设置成transaction模式
txCommit:用于提交事务
txRollback:回滚事务
一、AMQP事务模式
AMQP事务模式比较耗时,会降低RabbitMQ的消息吞吐量,性能不好。
生产者
package tx;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Producer that publishes a single message inside an AMQP transaction.
 *
 * <p>The channel is switched to transactional mode with {@code txSelect()}, the
 * message is published to the default exchange with the queue name as routing
 * key, and the transaction is committed with {@code txCommit()}. If any of those
 * steps throws an {@link IOException}, the transaction is rolled back.
 */
public class TxSend {
    private static final String QUE_NAME = "test_que_tx";

    /**
     * Declares the queue, publishes one message transactionally and closes the
     * channel and connection.
     *
     * @param args unused
     * @throws Exception if the connection cannot be obtained, or if declaring,
     *                   rolling back or closing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUE_NAME, false, false, false, null);
            String msg = "hellow tx";
            try {
                channel.txSelect();
                // Encode explicitly as UTF-8: the consumer decodes the body as "utf-8",
                // so relying on the platform default charset could corrupt the payload.
                channel.basicPublish("", QUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
                channel.txCommit();
                // Report success only once the commit has actually gone through.
                System.out.println("txsend=====>" + msg);
            } catch (IOException e) {
                // A failure may have closed the channel; rolling back a closed
                // channel would throw and hide the original error.
                if (channel.isOpen()) {
                    channel.txRollback();
                }
                e.printStackTrace();
            }
            if (channel.isOpen()) {
                channel.close();
            }
        } finally {
            // Always release the connection, even when an exception escapes above.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者
package tx;

import com.rabbitmq.client.*;
import utils.ConnectionUtil;

import java.io.IOException;

/**
 * Consumer for the transaction demo queue: prints every message it receives.
 */
public class TxRecv {
    private static String QUE_NAME = "test_que_tx";

    /**
     * Opens a channel, declares the queue and registers an auto-acknowledging
     * consumer on it.
     *
     * @param args unused
     * @throws Exception if the connection, queue declaration or subscription fails
     */
    public static void main(String[] args) throws Exception {
        final Channel channel = ConnectionUtil.getConnection().createChannel();
        channel.queueDeclare(QUE_NAME, false, false, false, null);
        channel.basicConsume(QUE_NAME, true, new PrintingConsumer(channel));
    }

    /** Consumer that decodes each delivery as UTF-8 text and writes it to stdout. */
    private static final class PrintingConsumer extends DefaultConsumer {

        PrintingConsumer(Channel channel) {
            super(channel);
        }

        /** Callback invoked when a message arrives. */
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            final String text = new String(body, "utf-8");
            System.out.println("[receivetx]:" + text);
        }
    }
}
二、Confirm模式
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始)。一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后才发出。broker回传给生产者的确认消息中,delivery-tag域包含了确认消息的序列号;此外broker也可以设置basic.ack的multiple域,表示到这个序列号为止(含该序列号)的所有消息都已经得到了处理。
Confirm的最大好处是异步
开启方法:channel.confirmSelect()
编程模式有三种方法:
(1)普通发一条 waitForConfirms()
(2)发一批 waitForConfirms()
(3)异步confirm模式:提供回调方法
1)普通模式代码
生产者
package confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Producer for the simple confirm mode: publishes one message and blocks until
 * the broker has confirmed it.
 */
public class Send1 {
    static String QUEUE_NAME = "test_queue_confirm1";

    /**
     * Declares the queue, enables publisher confirms on the channel, publishes a
     * single message and prints whether the broker confirmed it.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws TimeoutException     declared by the original signature (connection setup / close)
     * @throws InterruptedException if interrupted while waiting for the confirm
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Put the channel into confirm mode before publishing.
            channel.confirmSelect();
            String message = "This is confirm queue";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));
            if (!channel.waitForConfirms()) {
                System.out.println("message send failed");
            } else {
                System.out.println("message send success");
            }
            channel.close();
        } finally {
            // Release the connection even when publishing or waiting throws;
            // previously an exception skipped both close() calls.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者
package confirm;

import com.rabbitmq.client.*;
import utils.ConnectionUtil;

import java.io.IOException;

/**
 * Consumer for the simple confirm-mode demo queue: prints every message it receives.
 */
public class Recv1 {
    private static String QUE_NAME = "test_queue_confirm1";

    /**
     * Opens a channel, declares the queue and subscribes an auto-acknowledging
     * consumer to it.
     *
     * @param args unused
     * @throws Exception if the connection, queue declaration or subscription fails
     */
    public static void main(String[] args) throws Exception {
        Connection conn = ConnectionUtil.getConnection();
        final Channel ch = conn.createChannel();
        ch.queueDeclare(QUE_NAME, false, false, false, null);

        Consumer printer = printingConsumer(ch);
        ch.basicConsume(QUE_NAME, true, printer);
    }

    /**
     * Builds a consumer bound to the given channel that decodes each delivery
     * as UTF-8 text and prints it.
     */
    private static Consumer printingConsumer(final Channel ch) {
        return new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("[receiveconfirm1]:" + new String(body, "utf-8"));
            }
        };
    }
}
2)批量模式
生产者
package confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Producer for the batch confirm mode: publishes a batch of messages and then
 * waits once for the broker to confirm all of them.
 *
 * <p>Named {@code Send2} (the original text reused {@code Send1}, which collides
 * with the simple-mode producer in the same package).
 */
public class Send2 {
    static String QUEUE_NAME = "test_queue_confirm2";

    /**
     * Declares the queue, enables publisher confirms, publishes ten copies of
     * the message and prints whether the whole batch was confirmed.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws TimeoutException     declared by the original signature (connection setup / close)
     * @throws InterruptedException if interrupted while waiting for the confirms
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.confirmSelect();
            String message = "This is confirm queue batch";
            // The payload is identical for every message, so encode it once.
            byte[] payload = message.getBytes("utf-8");
            for (int i = 0; i < 10; i++) {
                channel.basicPublish("", QUEUE_NAME, null, payload);
            }
            // Confirm the whole batch in one go after everything has been sent.
            if (!channel.waitForConfirms()) {
                System.out.println("message send failed");
            } else {
                System.out.println("message send success");
            }
            channel.close();
        } finally {
            // Release the connection even when publishing or waiting throws.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者
package confirm;

import com.rabbitmq.client.*;
import utils.ConnectionUtil;

import java.io.IOException;

/**
 * Consumer for the batch confirm-mode demo queue: prints every message it receives.
 *
 * <p>Named {@code Recv2} (the original text reused {@code Recv1}, which collides
 * with the simple-mode consumer in the same package).
 */
public class Recv2 {
    private static String QUE_NAME = "test_queue_confirm2";

    /**
     * Opens a channel, declares the queue and subscribes an auto-acknowledging
     * consumer to it.
     *
     * @param args unused
     * @throws Exception if the connection, queue declaration or subscription fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUE_NAME, false, false, false, null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                // Tag corrected from the copy-pasted "[receiveconfirm1]:" so the
                // output identifies the batch-mode consumer.
                System.out.println("[receiveconfirm2]:" + message);
            }
        };
        channel.basicConsume(QUE_NAME, true, consumer);
    }
}
3)异步模式
Channel对象提供的ConfirmListener回调方法只包含deliveryTag(当前Channel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合:每publish一条数据,集合中元素加1;每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。
生产者
package confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

/**
 * Producer for the asynchronous confirm mode: publishes continuously and tracks
 * outstanding publish sequence numbers in a sorted set, which a
 * {@link ConfirmListener} prunes as acks and nacks arrive.
 */
public class Send3 {
    static String QUEUE_NAME = "test_queue_confirm3";

    /**
     * Declares the queue, enables publisher confirms, registers the confirm
     * listener and then publishes messages in an endless loop.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws TimeoutException     declared by the original signature (connection setup / close)
     * @throws InterruptedException declared by the original signature
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.confirmSelect();

            // Sequence numbers of messages published but not yet confirmed.
            final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

            channel.addConfirmListener(new ConfirmListener() {
                /** Called when the broker acknowledges one or more messages. */
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple) {
                        System.out.println("----------handleAck---------multiple----------");
                    } else {
                        System.out.println("----------handleAck---------multiple----------false");
                    }
                    settle(confirmSet, deliveryTag, multiple);
                }

                /** Called when the broker negatively acknowledges one or more messages. */
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple) {
                        System.out.println("----------handleNack---------multiple----------");
                    } else {
                        System.out.println("----------handleNack---------multiple----------false");
                    }
                    settle(confirmSet, deliveryTag, multiple);
                }
            });

            String message = "This is confirm queue synchronized";
            // The payload never changes, so encode it once outside the loop.
            byte[] payload = message.getBytes("utf-8");
            while (true) {
                long seqNo = channel.getNextPublishSeqNo();
                // Record the sequence number BEFORE publishing. If it were added
                // afterwards, a confirm arriving first would remove nothing and the
                // late add would leave a stale entry in the set forever.
                confirmSet.add(seqNo);
                channel.basicPublish("", QUEUE_NAME, null, payload);
            }
        } finally {
            // The loop only exits via an exception; release the connection then.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }

    /**
     * Removes settled sequence numbers from the outstanding set: every entry up
     * to and including {@code deliveryTag} when {@code multiple} is true,
     * otherwise just {@code deliveryTag}.
     */
    private static void settle(SortedSet<Long> outstanding, long deliveryTag, boolean multiple) {
        if (multiple) {
            // headSet is exclusive of its bound, hence +1 to include deliveryTag.
            outstanding.headSet(deliveryTag + 1).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }
}
消费者
package confirm;

import com.rabbitmq.client.*;
import utils.ConnectionUtil;

import java.io.IOException;

/**
 * Consumer for the asynchronous confirm-mode demo queue: prints every message it receives.
 */
public class Recv3 {
    private static String QUE_NAME = "test_queue_confirm3";

    /**
     * Opens a channel, declares the queue and subscribes an auto-acknowledging
     * consumer to it.
     *
     * @param args unused
     * @throws Exception if the connection, queue declaration or subscription fails
     */
    public static void main(String[] args) throws Exception {
        final Channel channel = ConnectionUtil.getConnection().createChannel();
        channel.queueDeclare(QUE_NAME, false, false, false, null);

        final boolean autoAck = true;
        channel.basicConsume(QUE_NAME, autoAck, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println(describe(body));
            }
        });
    }

    /** Decodes the delivery body as UTF-8 and prefixes it with this consumer's tag. */
    private static String describe(byte[] body) throws IOException {
        return "[receiveconfirm3]:" + new String(body, "utf-8");
    }
}