@Component
public class AckListener implements ChannelAwareMessageListener {
/**
* 1、设置手动签收:acknowledge="manual"
* 2、监听器实现接口ChannelAwareMessageListener
* 3、如果消息成功,调用basicAck()签收
* 4、如果消息失败,调用basicNack()拒绝签收,broker重新发送给consumer
* @param message
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//接收消息
System.out.println(new String(message.getBody()));
//处理业务逻辑
System.out.println("处理业务逻辑。。。");
//int i=3/0;//产生错误
//手动签收
channel.basicAck(deliveryTag,true);
}catch (Exception e){
channel.basicNack(deliveryTag,true,true);
}
}
}
配置:
<!--定义监听器容器 acknowledge设置消息签收模式 prefetch设置消费端限流-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >
<rabbit:listener ref="ackListener" queue-names="queue_confirm"></rabbit:listener>
</rabbit:listener-container>