本文的Spring boot版本是2.1.7.RELEASE。
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application.yml
spring: rabbitmq: virtual-host: / host: localhost port: 5672 username: guest password: 123456 listener: simple: concurrency: 10 max-concurrency: 20 prefetch: 5 mq: env: test cart: place: order: queue: ${mq.env}.cart.place.order.queue exchange: ${mq.env}.cart.place.order.exchange routing: key: ${mq.env}.cart.place.order.routing.key
Queue.config
@Configuration public class QueueConfig { @Autowired private Environment environment; /** * cart place order apply */ /** * durable = "true" persistent Rabbitmq does not need to create a new queue when restarting */ @Bean public Queue placeOrderQueue() { return new Queue(environment.getProperty("cart.place.order.queue"), true); } /** * durable = "true" Rabbitmq does not need to create a new exchange when restarting * * 1.Direct exchange: relatively simple, the matching rule is: if the routing keys match, the message is delivered to the relevant queue * 2.Fanout exchange: There is no concept of routing keys. He sends messages to all queues bound to this exchange.。 * 3.Topic exchange: uses the principle of fuzzy matching routing keys to forward messages to the queue * * Key: The key value of the queue in the direct-exchange. When the message is sent to the specified key in the direct-exchange, the * message will be forwarded to the message queue specified by the queue parameter. */ @Bean public DirectExchange placeOrderExchange() { return new DirectExchange(environment.getProperty("cart.place.order.exchange"), true, false); } @Bean public Binding placeOrderBinding() { return BindingBuilder.bind(placeOrderQueue()).to(placeOrderExchange()).with(environment.getProperty("cart.place.order.routing.key")); } }
Rabbitmq.config
@Configuration public class RabbitmqConfig { private static final Logger logger = LogManager.getLogger(RabbitmqConfig.class); @Autowired private Environment environment; @Autowired private CachingConnectionFactory cachingConnectionFactory; @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; @Autowired private ObjectMapper objectMapper; /** * singleton can't set multi times callback * @return */ @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { cachingConnectionFactory.setPublisherConfirms(true); cachingConnectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { logger.info("message send succeed:correlationData({}),ack({}),cause({})",correlationData, ack, cause); } else { logger.info("message send failed:correlationData({}),ack({}),cause({})",correlationData, ack, cause); } }); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { logger.info("message lose:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); }); return rabbitTemplate; } @Bean(name = "singleListenerContainer") public SimpleRabbitListenerContainerFactory singleListenerContainer() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(cachingConnectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); factory.setPrefetchCount(1); factory.setTxSize(1); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean(name = "multiListenerContainer") public SimpleRabbitListenerContainerFactory multiListenerContainer() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory, cachingConnectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.simple.concurrency", int.class)); factory.setMaxConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.simple.max-concurrency", int.class)); factory.setPrefetchCount(environment.getProperty("spring.rabbitmq.listener.simple.prefetch", int.class)); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } /** * Message persistent: Set the deliveryMode of the message to 2 and the consumer can continue to consume * the messages after persistence after restarting; * Use convertAndSend to send a message. The message is persistent by default. The following is the source code: * new MessageProperties() --> DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT --> deliveryMode = 2; * * @param exchangeName * @param routingKeyName * @param content * @param flag * @param messageId * @param <T> */ public <T> void sendMessage(String exchangeName, String routingKeyName, T content, boolean flag, String messageId) { logger.info("message send :messageId({}), exchangeName({}), routingKeyName({}), content({}), flag({})", messageId, exchangeName, routingKeyName, content); CorrelationData correlationData = new CorrelationData(); try { if (flag) { MessageProperties properties = new MessageProperties(); properties.setCorrelationId(messageId); correlationData.setId(messageId); this.rabbitTemplate().convertAndSend(exchangeName, routingKeyName, MessageBuilder.withBody(objectMapper.writeValueAsBytes(content)).andProperties(properties).build(), correlationData); } else { this.rabbitTemplate().convertAndSend(exchangeName, routingKeyName, content, correlationData); } } catch (Exception e) { logger.error("error message :e.getMessage({})", e.getMessage()); } } }
生产者只需要你调取RabbitmqConfig中的sendMessage()方法,其中如果你的消费者需要correlationDataId这个参数,那么就将flag设置成true,messageId设置成correlationDataId,就可以在消费端获取了。
String messageId = UUID.randomUUID().toString(); rabbitmqConfig.sendMessage(environment.getProperty("cart.create.order.result.exchange"), environment.getProperty("cart.create.order.result.routing.key"), response, flag, messageId);
消费者
@Autowired private RabbitmqConfig rabbitmqConfig; @Autowired private ObjectMapper objectMapper; @RabbitListener(queues = "${cart.place.order.queue}", containerFactory = "multiListenerContainer") public void placeOrder(Message message, Channel channel) { try { logger.info("place order : message ({})" + objectMapper.readValue(message.getBody(), RequestInventoryItems.class)); logger.info("place order : message ({})" + message.getMessageProperties().getCorrelationId()); // logger.info("place order : message ({})" + ); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception es) { logger.error("error message :es.getMessage({})", es.getMessage()); try { //The third parameter is whether to return to the queue.Pay attention to the endless loop. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } catch (Exception e) { logger.error("error message :e.getMessage({})", e.getMessage()); } } }
以上就是集成Rabbitmq的全部步骤了。