目前主流的框架都是SpringBoot,所以下来详细的阐述下RabbitMQ怎么和SpringBoot进行整合。
一、创建maven工程
首先创建maven的工程,然后创建两个springboot工程的module,具体结构如下:
在如上的目录结构中,可以看到分别创建了生产者和消费者的工程。下面详细的阐述下针对生产者以及
消费者不同的配置以及具体代码的实现过程。
二、生产者工程
2.1、生产者配置
首先需要在配置文件中配置RabbitMQ服务的地址,账户以及密码,和针对生产者的配置代码,具体如下:
spring:
rabbitmq:
addresses: 101.***.***.84:5672
username: wuya
password: java
virtual-host: /
connection-timeout: 15000
publisher-confirms: true
publisher-returns: true
#可靠性投递的机制
template:
mandatory: true
server:
port: 8081
2.2、生产者配置代码
下来我们加载具体的配置信息,它的结构为:
在RabbitConfig编写加载配置的代码,源码具体为:
package com.example.springboot.config;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan({"com.example.springboot.*"})
public class RabbitConfig
{
}
2.3、生产者核心代码
下来在server的包下编写发送消息的核心代码。在这里我们主要发送实体的数据,所以需要在entity
包下创建新的实体信息,比如这里创建Person,它的字段主要是name,age,sex。同时需要在实体中继承序
列化的部分,因为最终发送的消息都是需要进行序列话的。Person.java的源码具体为:
package com.example.springboot.entity;
public class Person
{
private String name;
private int age;
private String sex;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public Person()
{
}
public Person(String name,int age,String sex)
{
super();
this.name=name;
this.age=age;
this.sex=sex;
}
}
下来编写发送消息的具体代码,源码部分具体如下:
package com.example.springboot.service;
import com.example.springboot.entity.Order;
import com.example.springboot.entity.Person;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
@Component
public class RabbitSend
{
@Autowired
private RabbitTemplate rabbitTemplate;
//实现生产端的确认应答机制
final RabbitTemplate.ConfirmCallback confirmCallback=new RabbitTemplate.ConfirmCallback()
{
//boolean ack:ack的结果信息
//String cause:异常的结果信息
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
{
//消息ID的唯一性,使用uuid来解决
System.err.println("correlationData:"+correlationData);
System.err.println("ack:"+ack);
if(!ack)
{
System.err.println("异常情况...需要补偿机制");
}
}
};
//消息投递确认机制
final RabbitTemplate.ReturnCallback returnCallback=new RabbitTemplate.ReturnCallback()
{
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey)
{
System.err.println("retutn replyCode"+replyCode+",return replyText"+replyText);
System.err.println("return exchange:"+exchange+",return routingKey:"+routingKey);
}
};
/*
* 发送Person的实体数据
* */
public void sendPersonMsg(Person person)throws Exception
{
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("test_exchange_mq","test_mq",person,correlationData);
}
}
2.4、生产者测试代码
编写生产者后,下来编写生产者的测试代码,来验证消息是否发送出去,具体测试代码为:
package com.example.springboot;
import com.example.springboot.entity.Order;
import com.example.springboot.entity.Person;
import com.example.springboot.service.RabbitSend;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerApplicationTests
{
@Autowired
private RabbitSend rabbitSend;
@Test
public void test_person_sender() throws Exception
{
Person person=new Person("无涯",18,"男");
rabbitSend.sendPersonMsg(person);
}
}
xialai
2.5、封装成REST API
三、消费者工程