3.RabbitMq点对点(Direct)模式通信
点对点模式采用的直连型交换机,根据消息携带的路由键投递给对应队列。
这里采用的Spring Boot配置类创建,先创建一个DirectRabbitConfi配置类,配置类需要在类名上加入@Configuration注解,每个函数名上加入@bean注入对象实例化。
/**
* @author zzj
* @date 2021-12-10 19:32:02
* 创建一个生产者队列
*/
@Configuration
public class DirectRabbitConfig {
}
下面我们创建一个消息队列,每个
/**
* 消息队列
*/
@Bean
public Queue producerDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("producerDirectQueue",true);
}
然后创建直连交换机
/**
* DirectExchange
* 直连交换机
* @return
*/
@Bean
DirectExchange producerDirectExchange() {
return new DirectExchange("producerDirectExchange",true,false);
}
最后将我们创建的队列和直连交换机通过路由键绑定
/**
* //绑定 将队列和交换机绑定, 路由键为producer
* @return
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(producerDirectQueue()).to(producerDirectExchange()).with("producer");
}
通过上述步骤我们就创建好了点对点通信模型。
下面我们通过发送消息检验一下模型,发送消息我们采用控制器实现,方便整合前端发送请求,首先创建一个控制类,在其注入RabbitTemplate对象,利用该对象向交换机发送消息。调用http://ip:8080/sendDirectMessage/yourMessage接口即可发送消息。
/**
* @author zzj
* @date 2021-12-10 19:38:14
*/
@RestController
public class SendMessageController {
/**
* 使用RabbitTemplate,这提供了接收/发送等等方法
* 注入对象
*/
@Resource
RabbitTemplate rabbitTemplate;
/**
* 直连发送消息
* @param message 消息内容
* @return 是否成功
*/
@GetMapping("/sendDirectMessage")
public String sendDirectMessage(@RequestParam("message")String message) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
String curTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("message",message);
map.put("curTime",curTime);
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
//此处巨坑 要传入前段必须为字符串 其它类型直接断开连接。。。。。。。。。。。
rabbitTemplate.convertAndSend("producerDirectExchange", "producer", mapper.writeValueAsString(map));
return "1";
}
}
我们发送"Hello Word"
消息出现在我们创建的ProducerDirectQueue中。
此处完成了生产者生成一条消息,下面实现消费者来提取这条消息。提取消息我们采用的前端JS提取消息,因为方便在前端显示,如果要java代码通过Spring Boot注解的方式可以很快提取。下面我们采用前端来提取。
前端提取需要采用前端的协议端口为15674,需要在服务器将RabbitMQ的插件启动,同时打开服务器的防火墙放行此端口号。
启动js连接插件
rabbitmq-plugins enable rabbitmq_stomp
rabbitmq-plugins enable rabbitmq_web_stomp_examples
首先前端其他需要加入的JS库有以下,需要可以自己在网上查找,也可以通过cdn方式引入
<!-- stomp协议的客户端脚本 -->
<script src="js/stomp.js"></script>
<!-- SockJS的客户端脚本 -->
<script src="js/sockjs.js"></script>
之后的大体步骤如下
script type="text/javascript">
//创建连接
var ws = new SockJS('http://x.x.x.x:15674/stomp');
// 获得Stomp client对象
var client = Stomp.over(ws);
// 传入传出频率 为0 不断开
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
// 定义连接成功回调函数
var on_connect = function (x) {
//data.body是接收到的数据
client.subscribe("/amq/queue/yourQueueName", function (data) {
var msg = data.body;
console.log(msg)
});
};
// 定义错误时回调函数
var on_error = function () {
console.log('error');
};
// 连接RabbitMQ
client.connect('guest', 'guest', on_connect, on_error, '/');
console.log(">>>连接");
</script>
下面展示一下前端发送与接收效果