1.简介
由于存在异步接口调用,异步任务的处理结果会写入 RabbitMQ;同时为了实现高可用,服务会部署多个微服务实例。每个实例只持有与自己建立的 WebSocket 连接,因此消费到 MQ 消息的实例会先把消息发布到 Redis 频道,所有实例订阅该频道后再各自通过 WebSocket 推送给前端。这样无论哪个实例消费了 MQ 消息,消息都能推送到所有实例所连接的前端。
2.配置
2.1pom.xml
<!-- RabbitMQ client and listener infrastructure -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Redis client (StringRedisTemplate, RedisMessageListenerContainer) used by the Redis pub/sub classes -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>com.github.pagehelper</groupId>
    <artifactId>pagehelper-spring-boot-starter</artifactId>
    <version>1.2.9</version>
</dependency>
<!-- WebSocket / STOMP support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Template engine for the demo page -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<!-- NOTE(review): the Java classes also import Lombok and fastjson; add them here if the project does not already provide them. -->
2.2application.yml
spring:
  rabbitmq:
    host: 10.1.1.6
    port: 5678
    username: lys
    password: lys
    # publisher-confirms: true
    publisher-returns: true
    # The listener settings below are needed on the consumer side
    listener:
      simple:
        acknowledge-mode: manual
    template:
      mandatory: true
  redis:
    host: 10.6.6.6
    port: 32252
    timeout: 5001
    password: lys
3.mq消息
3.1TopicRabbitConfig
package com.lys.config.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the RabbitMQ topology used by this service: a fanout exchange,
 * a queue, and the binding that connects the two.
 *
 * <p>Author: liuysh. Created: 2021/5/24 15:51.
 */
@Configuration
public class TopicRabbitConfig {

    /** Name of the queue declared by {@link #myQueue()}. */
    public static final String TOPIC_QUEUE = "myTopic";

    /**
     * Declares the exchange named {@code fanout_exchange}.
     *
     * @return a durable, non-auto-delete fanout exchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange("fanout_exchange", durable, autoDelete);
    }

    /**
     * Declares the queue named by {@link #TOPIC_QUEUE}.
     *
     * @return a durable, non-exclusive, non-auto-delete queue
     */
    @Bean
    public Queue myQueue() {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(TOPIC_QUEUE, durable, exclusive, autoDelete);
    }

    /**
     * Binds {@link #myQueue()} to {@link #fanoutExchange()}.
     *
     * @return the queue-to-exchange binding
     */
    @Bean
    public Binding bind() {
        Queue queue = myQueue();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }
}
3.2 MessageListenerConfig
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Exposes a {@link SimpleMessageListenerContainer} bean configured for a
 * single consumer and manual message acknowledgement.
 *
 * <p>NOTE(review): no queue names or message listener are set on the container
 * here; confirm whether it is still required alongside the annotated listener.
 *
 * <p>Author: liuysh. Created: 2021/5/24 16:39.
 */
@Configuration
public class MessageListenerConfig {

    private final CachingConnectionFactory connectionFactory;

    /**
     * @param connectionFactory the RabbitMQ connection factory handed to the container
     */
    @Autowired
    public MessageListenerConfig(CachingConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /**
     * Builds the listener container.
     *
     * @return a container with one (and at most one) concurrent consumer that
     *         expects messages to be acknowledged manually
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        // Switch from the default automatic acknowledgement to manual acknowledgement.
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setConcurrentConsumers(1);
        listenerContainer.setMaxConcurrentConsumers(1);
        return listenerContainer;
    }
}
3.3 FanoutListenerFirst
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Date;
/**
* @Auther: liuysh
* @Date: 2021/5/24 16:03
* @Description:
*/
@Slf4j
@Component
public class FanoutListenerFirst implements ChannelAwareMessageListener {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override
@RabbitListener(queues = TopicRabbitConfig.TOPIC_CDB_RDS_CONSOLE_QUEUE)
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String json=new String(message.getBody());
stringRedisTemplate.convertAndSend("myRedisTopic",json);
//为true表示确认之前的所有消息 false表示只来处理着当前的消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("topic模式 监听者 one 处理消息时显示异常,异常是:{},现拒绝消费当前消息且不再放回队列",e);
//为true会重新放回队列
channel.basicReject(deliveryTag, false);
// 为了防止存在有异常的message堆积,故异常的也进行消费
// channel.basicAck(deliveryTag, false);
}
}
}
4.redis 订阅消费通信
4.1RedisCacheConfig
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
 * Redis wiring for the pub/sub relay: a string template for publishing and a
 * listener container that delivers messages on {@code myRedisTopic} to
 * {@link RedisReceiver}.
 *
 * <p>Author: liuysh. Created: 2021/8/2 15:29.
 */
@Configuration
@EnableCaching
public class RedisCacheConfig {

    /**
     * @param connectionFactory the Redis connection factory
     * @return a string-based Redis template built on that factory
     */
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }

    /**
     * Wraps the receiver in an adapter that reflectively invokes its
     * {@code receiveMessage} method for each incoming message.
     *
     * @param receiver the bean that handles incoming messages
     * @return the adapter delegating to {@code receiver.receiveMessage}
     */
    @Bean
    MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    /**
     * Subscribes the adapter to the {@code myRedisTopic} pattern.
     *
     * @param connectionFactory the Redis connection factory
     * @param listenerAdapter   the adapter to notify for each message
     * @return the configured listener container
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        PatternTopic topic = new PatternTopic("myRedisTopic");
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        // Further listeners for other topics can be registered the same way.
        listenerContainer.addMessageListener(listenerAdapter, topic);
        return listenerContainer;
    }
}
4.2 RedisReceiver
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
/**
 * Receives messages from the Redis subscription and forwards them to
 * WebSocket subscribers through the STOMP messaging template.
 *
 * <p>Author: liuysh. Created: 2021/8/2 15:29.
 */
@Component
@Slf4j
public class RedisReceiver {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    /**
     * Handles one message published on the Redis channel.
     *
     * <p>NOTE(review): the demo page subscribes to
     * {@code /topic/instanceStatus[/instanceId]}; confirm that
     * {@code WsConstant.WS_TOPIC} matches the destination clients subscribe to.
     *
     * @param message the raw payload, expected to be a JSON object string
     */
    public void receiveMessage(String message) {
        log.info("redis 订阅者收到的消息:{}", message);
        // The parsed object is not used; the call is kept so that a payload the
        // parser rejects is not forwarded to WebSocket clients.
        JSON.parseObject(message);
        simpMessagingTemplate.convertAndSend(WsConstant.WS_TOPIC, message);
    }
}
5.WebSocketConfig
package com.sugon.cloud.config;
import com.sugon.cloud.common.bean.RdsWsConstant;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
 * STOMP-over-WebSocket configuration: enables the in-memory simple broker for
 * the instance-status destinations and registers the SockJS endpoint that
 * browsers connect to.
 *
 * <p>Author: liuysh. Created: 2021/7/10 13:37.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Enables the simple broker for the {@code /topic/instanceStatus}
     * destination prefix, which is used for broadcast-style delivery.
     *
     * @param registry the broker registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic/instanceStatus");
    }

    /**
     * Registers the STOMP endpoint {@code /ws_endpoint} with SockJS fallback.
     *
     * <p>NOTE(review): every origin is allowed; confirm this is acceptable
     * outside of a demo environment.
     *
     * @param registry the endpoint registry to configure
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws_endpoint").setAllowedOrigins("*").withSockJS();
    }
}
6.前端代码
<!DOCTYPE html>
<html lang="zh-CN" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8" />
<title>Spring Boot+WebSocket+广播式</title>
</head>
<body onload="disconnect()">
<noscript><h2 style="color: #ff0000">貌似你的浏览器不支持websocket</h2></noscript>
<div>
<div>
<label>输入实例instanceId</label><input type="text" id="name" value="1" />
<button id="connect" onclick="connect();">连接</button>
<button id="disconnect" disabled="disabled" onclick="disconnect();">断开连接</button>
</div>
接收的数据:
<div id="conversationDiv">
<p id="response"></p>
</div>
</div>
<script th:src="@{sockjs.min.js}"></script>
<script th:src="@{stomp.min.js}"></script>
<script th:src="@{jquery.js}"></script>
<script type="text/javascript">
// STOMP client for the current connection; null until connect() creates one.
var stompClient = null;
// NOTE(review): connect() declares its own local `instanceId`, so this global is never assigned - confirm whether it is still needed.
var instanceId=null;
/**
 * Updates the page to reflect the connection state: toggles the two buttons,
 * shows or hides the conversation area, and clears previously received output.
 *
 * @param {boolean} connected true when a STOMP connection is established
 */
function setConnected(connected) {
    document.getElementById('connect').disabled = connected;
    document.getElementById('disconnect').disabled = !connected;
    document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';
    // .html() without an argument only reads the content; pass '' to actually clear it.
    $('#response').html('');
}
/**
 * Opens a SockJS connection to /ws_endpoint, layers a STOMP client on top of
 * it, and subscribes to the instance-status topic. When the input box holds an
 * instance id the subscription goes to /topic/instanceStatus/{id}; otherwise
 * to /topic/instanceStatus.
 */
function connect() {
    var instanceId = $('#name').val();
    var socket = new SockJS('/ws_endpoint');
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function (frame) {
        setConnected(true);
        console.log('Connected: ' + frame);
        var topic = instanceId.length > 0 ? '/topic/instanceStatus/' + instanceId : '/topic/instanceStatus';
        // Show every message the server pushes to the subscribed destination.
        stompClient.subscribe(topic, function (response) {
            showResponse(Date() + "::" + instanceId + ":::" + response.body);
        });
    }, function (error) {
        // Connection failed or was lost: put the UI back into the disconnected state.
        console.log('Connection error: ' + error);
        stompClient = null;
        setConnected(false);
    });
}
/**
 * Closes the STOMP connection if one exists and resets the page to the
 * disconnected state.
 */
function disconnect() {
    var client = stompClient;
    // Nothing to close when no client has been created yet.
    if (client != null) {
        client.disconnect();
    }
    setConnected(false);
    console.log("Disconnected");
}
/**
 * Appends one received message to the response area.
 *
 * @param {string} message text to display; inserted as plain text so markup in
 *     the server payload or the instance id is not interpreted as HTML
 */
function showResponse(message) {
    var line = $('<b></b>').text('Received: ' + message);
    $('#response').append(line).append('<br/>');
}
</script>
</body>
</html>