RocketMq的服务生产者Bean配置
package org.idea.web.socket.mq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.idea.web.socket.config.MqProducerConfig; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; /** * @Author linhao * @Date created in 11:05 上午 2021/5/10 */ @Configuration @Slf4j @EnableConfigurationProperties({MqProducerConfig.class}) public class MqProducerAutoConfig { @Resource private MqProducerConfig mqProducerConfig; @Bean @ConditionalOnMissingBean //意味着DefaultMQProducer的配置可以被覆盖 public DefaultMQProducer defaultMQProducer() { DefaultMQProducer producer = new DefaultMQProducer(mqProducerConfig.getGroupName()); producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr()); //没有则自动创建topic的key // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize()); producer.setSendMsgTimeout(mqProducerConfig.getSendMsgTimeout()); producer.setRetryTimesWhenSendFailed(mqProducerConfig.getRetryTimesWhenSendFailed()); try { producer.start(); log.info("【 MqProducerAutoConfig 】mq producer is started!"); } catch (Exception e) { log.error("[MqProducerAutoConfig] start fail, e is ", e); } return producer; } }
然后是对RocketMq内部发送消息事件的一层函数封装
package org.idea.web.socket.mq;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.idea.web.socket.config.MqProducerConfig;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;

/**
 * Sending side of the WebSocket fan-out: wraps each outgoing WebSocket payload in a
 * {@link BroadcastMqDTO}, serializes it to JSON and publishes it to the {@code ws-topic} topic.
 *
 * <p>Created 2021-05-09 10:43 PM.
 *
 * @author linhao
 */
@Component
@Slf4j
public class BroadcastMqProducer {

    @Resource
    private DefaultMQProducer defaultMQProducer;
    @Resource
    private MqProducerConfig mqProducerConfig;

    private static final String TOPIC = "ws-topic";
    private static final String TAGS = "ws-tag";

    /** Event type: the message is meant for every connected user. */
    public static final Integer ALL_USER_RECEIVE_TYPE = 1;
    /** Event type: the message is meant for the single session named by the session key. */
    public static final Integer ONE_USER_RECEIVE_TYPE = 2;

    /**
     * Publishes a point-to-point message addressed to one WebSocket session.
     *
     * @param destSessionKey key of the destination session; must not be empty
     * @param msg            payload to deliver; must not be empty
     * @return the broker's send result, or {@code null} if an argument is empty or the send failed
     */
    public SendResult sendWebSocketToUser(String destSessionKey, String msg) {
        if (StringUtils.isEmpty(msg)) {
            log.error("[sendWebSocketToUser] msg can not be null!");
            return null;
        }
        // A point-to-point message without a destination can never be delivered.
        if (StringUtils.isEmpty(destSessionKey)) {
            log.error("[sendWebSocketToUser] destSessionKey can not be null!");
            return null;
        }
        BroadcastMqDTO dto = new BroadcastMqDTO();
        dto.setEventType(ONE_USER_RECEIVE_TYPE);
        dto.setMessage(msg);
        dto.setSessionKey(destSessionKey);
        return publish("sendWebSocketToUser", dto);
    }

    /**
     * Publishes a message addressed to every connected user.
     *
     * @param msg payload to deliver; must not be empty
     * @return the broker's send result, or {@code null} if {@code msg} is empty or the send failed
     */
    public SendResult sendWebSocketBroadcastMsg(String msg) {
        if (StringUtils.isEmpty(msg)) {
            log.error("[sendWebSocketBroadcastMsg] msg can not be null!");
            return null;
        }
        BroadcastMqDTO dto = new BroadcastMqDTO();
        dto.setEventType(ALL_USER_RECEIVE_TYPE);
        dto.setMessage(msg);
        return publish("sendWebSocketBroadcastMsg", dto);
    }

    /**
     * Serializes the DTO to JSON and sends it synchronously; failures are logged, not thrown.
     *
     * @param caller name of the public method, used as the log tag so failures are attributed
     *               to the right entry point
     * @param dto    payload to serialize and send
     * @return the send result, or {@code null} when sending failed
     */
    private SendResult publish(String caller, BroadcastMqDTO dto) {
        try {
            byte[] body = JSON.toJSONString(dto).getBytes(RemotingHelper.DEFAULT_CHARSET);
            return defaultMQProducer.send(new Message(TOPIC, TAGS, body));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            log.error("[{}] e is ", caller, e);
            return null;
        } catch (Exception e) {
            log.error("[{}] e is ", caller, e);
            return null;
        }
    }
}
对消息的订阅模块实现代码如下:
package org.idea.web.socket.mq;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.idea.web.socket.manager.SocketManager;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.socket.WebSocketSession;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;

import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE;
import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;

/**
 * RocketMQ listener that turns each received {@link BroadcastMqDTO} into a WebSocket push:
 * broadcast events go to {@code /topic/sendTopic}, point-to-point events go to the target user's
 * {@code /queue/sendUser} destination when that user's session is held by this instance.
 *
 * <p>Created 2021-05-10 10:59 AM.
 *
 * @author linhao
 */
@Component
@Slf4j
public class MessageListenerHandler implements MessageListenerConcurrently {

    @Resource
    private SocketManager socketManager;
    @Resource
    private SimpMessagingTemplate template;

    /**
     * Handles every message of the delivered batch, in order.
     *
     * @param list                       messages handed over by the consumer; may be empty
     * @param consumeConcurrentlyContext consume context supplied by RocketMQ (not used)
     * @return {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS} once the batch has been handled
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            log.info("receive empty msg");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // The whole batch is acknowledged below, so every message in it has to be handled.
        for (MessageExt messageExt : list) {
            dispatch(messageExt);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /** Decodes one message and forwards it to the WebSocket destination its event type selects. */
    private void dispatch(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        if (body == null) {
            log.warn("[MessageListenerHandler] skip message without body");
            return;
        }
        // Decode with an explicit charset instead of the platform default; the producer encodes
        // with RemotingHelper.DEFAULT_CHARSET (presumably UTF-8 - verify against the RocketMQ version in use).
        String json = new String(body, StandardCharsets.UTF_8);
        BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class);
        if (broadcastMqDTO == null) {
            log.warn("[MessageListenerHandler] skip message with empty payload");
            return;
        }
        log.info("[MessageListenerHandler] broadcastMqDTO is {}", broadcastMqDTO);

        Integer eventType = broadcastMqDTO.getEventType();
        if (ALL_USER_RECEIVE_TYPE.equals(eventType)) {
            log.info("[consumeMessage] 广播发送消息:触发----》消息内容为:{}", broadcastMqDTO);
            template.convertAndSend("/topic/sendTopic", broadcastMqDTO);
        } else if (ONE_USER_RECEIVE_TYPE.equals(eventType)) {
            String sessionKey = broadcastMqDTO.getSessionKey();
            // Only the instance that holds the target session delivers the message.
            WebSocketSession webSocketSession = socketManager.get(sessionKey);
            if (webSocketSession != null) {
                template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage());
                log.info("[consumeMessage] 点对点发送消息;触发----》消息内容为:{}", broadcastMqDTO);
            }
        } else {
            log.warn("[MessageListenerHandler] unknown event type: {}", eventType);
        }
    }
}
整体设计结构如下图:
于是按照这个结构进行了一版本的紧急开发迭代,原先的单台服务器扩展为了服务集群。
业务拓展:后续产品经理提出了一个需求,要求支持同一间房内的两个用户之间发送悄悄话。这就需要我们实现点对点通讯的功能。因此需要在mq通知到每台机器的时候加一个本地Session遍历的逻辑:如果当前机器存有用户token对应的Session,那么就单独针对那个Session进行WebSocket的发送通知。
设计弊端:一旦某台机器出现了异常崩溃,就意味着这台机器上的所有语音连接可能会出现中断。目前这一块的问题也在考虑解决,计划是将WebSocketSession存入分布式缓存Redis中,以保证数据可靠存储;但是在后续尝试的时候发现WebSocketSession对象没有实现序列化接口,在存储到Redis的时候会出现异常。目前这个问题还在寻找解决思路中,不知道各位读者朋友们有什么好的思路。
遇到的问题点:用户请求直接访问到了我们的内部服务器,如果在请求的中间加入一台nginx做负载均衡,则需要在nginx中配置一些额外信息(例如设置 proxy_http_version 1.1,并转发 Upgrade 和 Connection 请求头,以支持WebSocket的协议升级)。
项目的源代码比较多,这里我把核心部分的代码整理了一份,感兴趣的朋友可以到我的gitee上边去下载: