producer:
**默认超时时间**
/**
* Timeout for sending messages.
*/
private int sendMsgTimeout = 3000;
// 异步发送时 重试次数,默认 2
producer.setRetryTimesWhenSendAsyncFailed(1);
// 同步发送时 重试次数,默认 2
producer.setRetryTimesWhenSendFailed(1);
// 是否向其他broker发送请求 默认false
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
Consumer:
消费超时,单位分钟
`consumer.setConsumeTimeout()`
发送ack,消费失败
`RECONSUME_LATER`
broker投递:
只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息不重试
重投使用`messageDelayLevel`
默认值:
messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
1、生产者样例
//添加重试机制 public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("MQ2Group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); //异步发送 重试次数 系统默认是2 producer.setRetryTimesWhenSendAsyncFailed(1); //同步发送 重试次数 系统默认是2 // producer.setRetryTimesWhenSendFailed(1); producer.send(new Message("MQ2Topic","回调消息!".getBytes())); producer.setRetryAnotherBrokerWhenNotStoreOK(true); // producer.shutdown(); System.out.println("生产者下线!"); }
2、消费者样例
//接收消息 public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MQ2Group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.subscribe("MQ2Topic","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt mes: list) { System.out.println("mes : "+new String(mes.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer start..."); }