教你用纯Java实现一个即时通讯系统(附源码)(下)

RocketMq的服务生产者Bean配置


package org.idea.web.socket.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.idea.web.socket.config.MqProducerConfig;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
 * @Author linhao
 * @Date created in 11:05 上午 2021/5/10
 */
@Configuration
@Slf4j
@EnableConfigurationProperties({MqProducerConfig.class})
public class MqProducerAutoConfig {
    @Resource
    private MqProducerConfig mqProducerConfig;
    @Bean
    @ConditionalOnMissingBean
    //意味着DefaultMQProducer的配置可以被覆盖
    public DefaultMQProducer defaultMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(mqProducerConfig.getGroupName());
        producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr());
        //没有则自动创建topic的key
//        producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
        producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize());
        producer.setSendMsgTimeout(mqProducerConfig.getSendMsgTimeout());
        producer.setRetryTimesWhenSendFailed(mqProducerConfig.getRetryTimesWhenSendFailed());
        try {
            producer.start();
            log.info("【 MqProducerAutoConfig 】mq producer is started!");
        } catch (Exception e) {
            log.error("[MqProducerAutoConfig] start fail, e is ", e);
        }
        return producer;
    }
}


然后是对RocketMq内部发送消息事件的一层函数封装


package org.idea.web.socket.mq;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.idea.web.socket.config.MqProducerConfig;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
/**
 * 消息广播发送端
 *
 * @Author linhao
 * @Date created in 10:43 下午 2021/5/9
 */
@Component
@Slf4j
public class BroadcastMqProducer {
    @Resource
    private DefaultMQProducer defaultMQProducer;
    @Resource
    private MqProducerConfig mqProducerConfig;
    private static String TOPIC = "ws-topic";
    private static String TAGS = "ws-tag";
    public static Integer ALL_USER_RECEIVE_TYPE = 1;
    public static Integer ONE_USER_RECEIVE_TYPE = 2;
    /**
     * 点对点之间的消息发送
     *
     * @param destSessionKey
     * @param msg
     * @return
     */
    public SendResult sendWebSocketToUser(String destSessionKey,String msg) {
        if (StringUtils.isEmpty(msg)) {
            log.error("[sendWebSocketToUser] msg can not be null!");
            return null;
        }
        Message message = null;
        SendResult sendResult = null;
        try {
            BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();
            broadcastMqDTO.setEventType(ONE_USER_RECEIVE_TYPE);
            broadcastMqDTO.setMessage(msg);
            broadcastMqDTO.setSessionKey(destSessionKey);
            message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
            sendResult = defaultMQProducer.send(message);
        } catch (Exception e) {
            log.error("[sendWebSocketBroadcastMsg] e is ", e);
        }
        return sendResult;
    }
    /**
     * 广播消息发送
     *
     * @param msg
     * @return
     */
    public SendResult sendWebSocketBroadcastMsg(String msg) {
        if (StringUtils.isEmpty(msg)) {
            log.error("[sendWebSocketBroadcastMsg] msg can not be null!");
            return null;
        }
        Message message = null;
        SendResult sendResult = null;
        try {
            BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();
            broadcastMqDTO.setEventType(ALL_USER_RECEIVE_TYPE);
            broadcastMqDTO.setMessage(msg);
            message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
            sendResult = defaultMQProducer.send(message);
        } catch (Exception e) {
            log.error("[sendWebSocketBroadcastMsg] e is ", e);
        }
        return sendResult;
    }
}


对消息的订阅模块实现代码如下:


package org.idea.web.socket.mq;
import com.alibaba.fastjson.JSON;
import com.oracle.tools.packager.Log;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.idea.web.socket.manager.SocketManager;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.socket.WebSocketSession;
import javax.annotation.Resource;
import java.util.List;
import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE;
import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;
/**
 * @Author linhao
 * @Date created in 10:59 上午 2021/5/10
 */
@Component
@Slf4j
public class MessageListenerHandler implements MessageListenerConcurrently {
    @Resource
    private SocketManager socketManager;
    @Resource
    private SimpMessagingTemplate template;
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            Log.info("receive empty msg");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = list.get(0);
        byte[] bytes = messageExt.getBody();
        String json = new String(bytes);
        BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class);
        log.info("[MessageListenerHandler] broadcastMqDTO is " + broadcastMqDTO);
        if (ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {
            log.info("[consumeMessage] 广播发送消息:触发----》消息内容为:" + broadcastMqDTO);
            template.convertAndSend("/topic/sendTopic", broadcastMqDTO);
        } else if (ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {
            String sessionKey = broadcastMqDTO.getSessionKey();
            WebSocketSession webSocketSession = socketManager.get(sessionKey);
            if (webSocketSession != null) {
                template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage());
                log.info("[consumeMessage] 点对点发送消息;触发----》消息内容为:" + broadcastMqDTO);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}


整体设计结构如下图:


教你用纯Java实现一个即时通讯系统(附源码)(下)


于是按照这个结构进行了一版本的紧急开发迭代,原先的单台服务器扩展为了服务集群。


业务拓展后续产品经理提出一个需求,要求支持在同一间房内的两个用户之间发送悄悄话功能。这就需要我们进行一个点对点之间传输通讯的功能了。因此需要在mq通知到每台机器的时候加一个本地Session遍历的逻辑,如果当前机器存有用户token对应的session变量,那么就单独针对那个Session进行WebSocket的发送通知。


教你用纯Java实现一个即时通讯系统(附源码)(下)


设计弊端一旦某台机器出现了异常崩溃,那么就意味着这台机器上的所有语音连接可能会出现中断情况。目前这一块的问题也在考虑解决,计划是将WebSocketSession存入到分布式缓存的redis中保证数据可靠存储,但是在后续尝试的时候发现WebSocketSession对象没有实现序列化接口,在存储到Redis的时候会出现异常。目前这个问题还在寻找解决思路中,不知道各位读者朋友们有什么好的思路。


遇到的问题点用户请求直接访问到了我们的内部服务器,如果在请求的中间加入一台nginx做负载均衡则需要在nginx中配置一些额外信息。


项目的源代码比较多,这里我把核心部分的代码整理了一份,感兴趣的朋友可以到我的gitee上边去下载:


https://gitee.com/IdeaHome_admin/socket-framework


推荐好文


>>【练手项目】基于SpringBoot的ERP系统,自带进销存+财务+生产功能


>>分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!

>>能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!

上一篇:在RELEASE版本中快速定位DATA ABORT的方法


下一篇:经过验证的python发送邮件程序