最近部门有个需求,需要实现一个消息中心,简而言之,就是给各个系统提供与客户交互的桥梁,自然而然需要选择websocket协议,由于我们是使用的spring cloud这一套,因此以springboot为例来进行说明。
一、方案
A、整体方案
先说一下简单的场景,各系统通过Rabbitmq将要发送给客户端的消息推送到消息中心,消息中心再基于ws连接,将消息推送给客户端,实现交互。但是问题来了,生产上有多个节点(至少两台服务器吧),但是客户端只跟其中一台服务器建立ws连接,所以这个session如何维护呢?比如客户端A与服务器1建立连接,此时要推送的消息到了服务器2上,他没有与客户端A的连接,这个消息就无法推送,因此设计方案如下:
a、客户端与消息中心建立ws连接,各节点维护各自的连接,例如使用ConcurrentHashMap
b、各应用将要推送给客户端的消息发送到rabbitmq,rabbitmq通过广播的方式将消息发送到消息中心的各节点
c、消息中心通过userId判断连接是否在本机维护,如果不在,直接忽略,如果session在本机维护,则推送消息到客户端
下面有个重要的问题就是rabbitmq的广播机制如何实现,这时候一百度,都是说不同服务订阅不同的队列就实现广播了。我想问一句,我是一个应用,生产上部署多个节点,代码配置都是同一套,你家上生产就部署一个节点啊???
B、Rabbimq广播机制实现(同应用不同节点)
Rabbitmq的topic模式跟其他mq都不大一样,他是指定exchange到queue的模式,如下图:
所以我选择在系统启动时,基于雪花算法生成随机id,作为队列名,并且队列非持久化,项目一重启,之前的队列就消失。不过这里有个问题,就是应用关闭时,若此时mq中有消息未消费,就全丢失了,不过我们的场景可接受这种情况,因此选用这种方式。
package com.yunzhangfang.platform.message.gateway.service.mq.consumer; import com.alibaba.fastjson.JSON; import com.rabbitmq.client.*; import com.yunzhangfang.platform.message.gateway.client.dto.message.MessageContentDTO; import com.yunzhangfang.platform.message.gateway.client.dto.message.MessageDTO; import com.yunzhangfang.platform.message.gateway.client.dto.user.UserDTO; import com.yunzhangfang.platform.message.gateway.service.dto.MessageSaveDTO; import com.yunzhangfang.platform.message.gateway.service.infrastructure.constant.MqConstant; import com.yunzhangfang.platform.message.gateway.service.infrastructure.dataobject.MessageUser; import com.yunzhangfang.platform.message.gateway.service.service.MessageService; import com.yunzhangfang.platform.message.gateway.service.session.SessionManager; import com.yzf.accounting.common.exception.BizRuntimeException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; /** * 功能:接收来自于各应用的消息,并且将消息推送到目标用户 * @author lenovo */ @Slf4j @Component public class MessageCreatedConsumer { @Autowired private ConnectionFactory connectionFactory; /** * 接收各应用发送的消息,并将消息保存进入mongodb * @throws IOException */ @PostConstruct public void handleMessage() throws IOException { // id是使用雪花算法随机生成的 String queue = "FLOW_QUEUE_" + id; String exchange = "FANOUT_FLOW_EXCHANGE"; Connection conn = connectionFactory.createConnection(); Channel channel = conn.createChannel(false); // 1、创建一个队列,id是使用雪花算法随机生成的,并且非持久化的,自动删除的 channel.queueDeclare(queue, false, false, true, null); // 2、创建交换器 channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT); // 3、将队列和交换器通过路由键进行绑定。fanout模式路由键直接设置为""。 channel.queueBind(queue, exchange, ""); channel.basicConsume(queue, new DefaultConsumer(channel) { // 4、当消息到达时执行回调方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { String msg; try { msg = new String(body, "UTF-8"); log.info("消息中心接收到消息,内容为:{}", msg); } catch (Exception e) { log.error("消息中心接收消息出现异常", e); } } }); } }
这地方有个坑,因为队列是使用雪花算法生成的id拼装的,所以没办法使用@RabbitListener这个注解,只能通过channel的方式去实现消息消费,只是写起来麻烦一点,本质是一样的。
二、springboot集成websocket
springboot集成websokcet主要有两种方式,分别如下:
A、直接基于websocket协议实现
1、配置类
package com.chitic.supplywater.common.config.webSocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * 设置webSocket终端服务 */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } }
2、处理类
package com.chitic.supplywater.common.config.webSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; @ServerEndpoint("/webSocket/{sid}") @Component public class WebSocketServer { /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session,@PathParam("sid") String sid) { // 维护会话等 } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { // 销毁会话等 } /** * 收到客户端消息后调用的方法 */ @OnMessage public void onMessage(String message, Session session) { // 收到客户端发送的消息 } /** * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { // 发生错误时 } /** * 实现服务器主动推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 群发自定义消息 * */ public static void sendInfo(String message,@PathParam("sid") String sid) throws IOException { // 从会话管理器中获取会话,进行群发 } }
B、基于stomp协议
1、配置类
package com.yunzhangfang.platform.message.gateway.service.infrastructure.config; import com.yunzhangfang.platform.message.gateway.service.infrastructure.constant.WebSocketConstant; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.*; /** * 开启使用STOMP协议来传输基于代理(MessageBroker)的消息,这时候控制器(controller)开始支持@MessageMapping,就像是使用@requestMapping一样。 * @author lenovo */ @EnableWebSocketMessageBroker @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Autowired private WebSocketDecoratorFactory webSocketDecoratorFactory; @Autowired private WebSockHandshakeHandler webSockHandshakeHandler; @Override public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) { // 创建一个serverPoint与前端交互 stompEndpointRegistry.addEndpoint(WebSocketConstant.WEBSOCKET_SERVER_PATH) // 防止跨域问题 .setAllowedOrigins("*") // 握手时handler .setHandshakeHandler(webSockHandshakeHandler) // 指定使用SockJS协议 .withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { //全局使用的消息前缀(客户端订阅路径上会体现出来) registry.setApplicationDestinationPrefixes("/app"); //用户订阅主题的前缀,/topic 代表发布广播,即群发 ,/queue 代表点对点,即发指定用户 registry.enableSimpleBroker("/topic", "/queue"); //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/ registry.setUserDestinationPrefix("/user"); } @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { registration.addDecoratorFactory(webSocketDecoratorFactory); } }
package com.yunzhangfang.platform.message.gateway.service.infrastructure.config; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.context.annotation.Configuration; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.DefaultHandshakeHandler; import javax.servlet.http.HttpServletRequest; import java.security.Principal; import java.util.Map;
@Configuration @Slf4j public class WebSockHandshakeHandler extends DefaultHandshakeHandler { /** * 此类在客户端与服务端握手的时候触发。
* tips:将请求中的参数userId塞到Principal中,可以理解成塞到websocket的session中,后续可通过Principal principal = session.getPrincipal()获取到 */ @Override protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) { if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request; HttpServletRequest httpRequest = servletServerHttpRequest.getServletRequest(); // 握手取userid final String userId = httpRequest.getParameter("userId"); log.info("客户端入参userId:{}", userId); if (StringUtils.isEmpty(userId)) { return null; } return () -> userId; } return null; } }
package com.yunzhangfang.platform.message.gateway.service.infrastructure.config; import com.yunzhangfang.platform.message.gateway.service.session.SessionManager; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.WebSocketHandlerDecorator; import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory; import java.security.Principal; /** * 服务端和客户端在进行握手挥手时会被执行。进行session的维护 */ @Component @Slf4j public class WebSocketDecoratorFactory implements WebSocketHandlerDecoratorFactory { @Autowired private SessionManager sessionManager; @Override public WebSocketHandler decorate(WebSocketHandler handler) { return new WebSocketHandlerDecorator(handler) { @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { log.info("有客户端连接,sessionId:{}", session.getId()); // principal是自定义塞入到session中的数据(例如我们塞的是userId,后续通过userId可以找到其session) Principal principal = session.getPrincipal(); if (principal != null && StringUtils.isNotBlank(principal.getName())) { // principal.getName获取的就是WebSockHandshakeHandler中塞入的userId if(!sessionManager.isConnected(principal.getName())) { // 身份校验成功,缓存socket连接 sessionManager.add(principal.getName(), session); log.info("客户端userId:{}存入redis", principal.getName()); } } super.afterConnectionEstablished(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { log.info("客户端退出连接,sessionId:{}", session.getId()); Principal principal = session.getPrincipal(); if (principal != null) { // 身份校验成功,移除socket连接 sessionManager.remove(principal.getName()); log.info("客户端userId:{}从redis中删除", principal.getName()); } super.afterConnectionClosed(session, closeStatus); } }; } }
通过使用WebSocketDecoratorFactory可在连接建立完成或连接关闭时触发,将userId和session存入本地Map。
2、提供客户端请求controller
@Controller public class MessageCenterController { @Autowired private MessageService messageService; /** * 客户端进入页面或刷新页面调用。获取来源列表、每个来源未读消息数以及每个来源消息列表信息 */ @MessageMapping("/init/query") public void initQuery(String userId) { messageService.initQuery(userId); } }
这里也可以使用http去实现,不过既然已经建立了websocket协议,就直接使用ws协议操作更为合适,无需建立另外的连接。
注:客户端请求"/init/query"地址,服务端是没有办法直接返回响应的,必须客户端订阅地址才能拿到服务端返回的信息。
/** * 根据用户id查询消息组合信息(应用列表、消息未读数以及消息列表) * @param userId * @return */ @Override public void initQuery(String userId) { // 拼装result simpMessagingTemplate.convertAndSendToUser(userId, "/init/query", result); }
因为我在握手的时候将userId存放到websocket连接的session信息中了,因此通过Springboot提供的SimpMessagingTemplate.convertAndSendToUser(userId...)就能将消息发送到对应客户端。
三、客户端代码(js)
var stompClient = null; //加载完浏览器后 调用connect(),打开双通道 $(function(){ //打开双通道 connect() }) //打开双通道 function connect(){ var socket = new SockJS(‘http://localhost:8281/message/center?userId=654366374251302912‘); //连接SockJS的endpoint名称为"endpointAric" stompClient = Stomp.over(socket);//使用STMOP子协议的WebSocket客户端 stompClient.connect({},function(frame){//连接WebSocket服务端 stompQueue(); }); } //列队(一对一) function stompQueue(){ //通过stompClient.subscribe订阅/user/queue/init/query stompClient.subscribe(‘/user/queue/init/query‘,function(response){ var message=JSON.stringify(response.body); //alert(message); }); stompClient.send("/app/init/query",{},‘654366374251302912‘); } //强制关闭浏览器 调用websocket.close(),进行正常关闭 window.onunload = function() { disconnect() } //关闭双通道 function disconnect(){ if(stompClient != null) { stompClient.disconnect(); } console.log("Disconnected"); }