Springboot+Websocket+JWT实现的即时通讯模块

场景

目前做了一个接口:邀请用户成为某课程的管理员,于是我感觉有能在用户被邀请之后能有个立马通知他本人的机(类似微博、朋友圈被点赞后就有立马能收到通知一样),于是就闲来没事搞了一套。

涉及技术栈

  • Springboot
  • Websocket 协议
  • JWT
  • RabbitMQ 消息中间件

Websocket 协议

⭐推荐阅读:Websocket 协议简介

WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
Springboot+Websocket+JWT实现的即时通讯模块

为什么使用Websocket?
因为普通的http协议一个最大的问题就是:通信只能由客户端发起,服务器响应(半双工)****,而我们希望可以全双工通信。

因此一句话总结就是:建立websocket(以下简称为ws)连接是为了让服务器主动向前端发消息,而无需等待前端的发起请求调用接口。

业务逻辑

我们现在有:

  • 用户A
  • 用户B
  • Springboot服务器
  • 场景:用户A调用接口邀请用户B成为课程成员
  • 涉及数据库MySQL的数据表:
    • course_member_invitation,记录课程邀请记录,其形式如下(忽略时间等列):
id course_id account_id admin_id is_accepted bind_message_id
邀请id 课程id 受邀用户id 邀请人id(因其本身为课程管理员) 受邀用户是否接受了邀请 绑定的消息id
  • (图中没有体现)course_message,记录消息记录,其形式如下(忽略时间等列):
id type account_id source_id is_read is_ignored
消息id 消息类型 收信人用户id 发信人用户id 是否已读 收信人是否忽略
  • (图中没有体现)course_message_type,记录消息类型,其形式如下
id name description
消息类型id 消息类型名称 描述
  • 涉及RabbitMQ(因不是重点,所以此处暂不讨论,最后一章叙述)

Springboot+Websocket+JWT实现的即时通讯模块
业务步骤主要涉及两个方法addCourseMemberInvitationsendMessage和一个组件CourseMemberInvitationListener,分别做:
addCourseMemberInvitation:

  1. 用户A调用接口,邀请用户B成为某门课程的管理员
  2. Springboot服务器收到请求,将这一请求生成邀请记录、消息记录,写入下表:
    • course_member_invitation
    • course_message
  3. 写入DB后,调用sendMessage处理发送消息的业务。
  4. 将执行的结果返回给用户A

sendMessage

  1. 将消息记录放入RabbitMQ中对应的消息队列。

CourseMemberInvitationListener:

  1. 持续监听其绑定的消息队列
  2. 一旦消息队列中有新消息,就尝试通过ws连接发送消息。
    1. 用户B在线,则可发送。
    2. 否则,则消费掉该消息,待用户上线后从DB中读入。

在Springboot中配置Websocket

  • pom.xml文件
<!-- WebSocket相关 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
  • Websocket Server组件配置初步:com.xxxxx.course.webSocket.WebSocketServer
/**
 * 进行前后端即时通信
 * https://blog.csdn.net/qq_33833327/article/details/105415393
 * session: https://www.codeleading.com/article/6950456772/
 * @author jojo
 */
@ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //响应路径为 /ws/{uid} 的连接请求
@Component
public class WebSocketServer {
    /**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的
     */
    private static int onlineCount = 0;

    /**
     * concurrent 包的线程安全Set,用来存放每个客户端对应的 myWebSocket对象
     * 根据 用户id 来获取对应的 WebSocketServer 示例
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();

    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;

    /**
     * 通信令牌
     */
    private String token = "";

    /**
     * 用户id
     */
    private String accountId ="";

    /**
     * logger
     */
    private static Logger LOGGER = LoggerUtil.getLogger();


    /**
     * 连接建立成功调用的方法
     *
     * @param session
     * @param token 用户令牌
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("uid") String uid) {

        this.session = session;
        this.token = token;

        //设置超时,同httpSession
        session.setMaxIdleTimeout(3600000);

        this.accountId = uid;

        //存储websocket连接,存在内存中,若有同一个用户同时在线,也会存,不会覆盖原有记录
        webSocketMap.put(accountId, this);
        LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString()));

        addOnlineCount(); // 在线数 +1
        LOGGER.info("有新窗口开始监听:" + accountId + ",当前在线人数为" + getOnlineCount());

        try {
            sendMessage(JSON.toJSONString("连接成功"));
        } catch (IOException e) {
            e.printStackTrace();
            throw new ApiException("websocket IO异常!!!!");
        }

    }

    /**
     * 关闭连接
     */

    @OnClose
    public void onClose() {
        if (webSocketMap.get(this.token) != null) {
            webSocketMap.remove(this.token);
            subOnlineCount(); // 人数 -1
            LOGGER.info("有一连接关闭,当前在线人数为:" + getOnlineCount());
        }
    }

    /**
     * 收到客户端消息后调用的方法
     * 这段代码尚未有在使用,可以先不看,在哪天有需求时再改写启用
    * @param message 客户端发送过来的消息
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        LOGGER.info("收到来自用户 [" + this.accountId + "] 的信息:" + message);

        if (!StringTools.isNullOrEmpty(message)) {
            try {
                // 解析发送的报文
                JSONObject jsonObject = JSON.parseObject(message);
                // 追加发送人(防窜改)
                jsonObject.put("fromUserId", this.accountId);
                String toUserId = jsonObject.getString("toUserId");
                // 传送给对应 toUserId 用户的 WebSocket
                if (!StringTools.isNullOrEmpty(toUserId) && webSocketMap.containsKey(toUserId)) {
                    webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
                } else {
                    // 否则不在这个服务器上,发送到 MySQL 或者 Redis
                    LOGGER.info("请求的userId:" + toUserId + "不在该服务器上");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void one rror(Session session, Throwable error) {
        LOGGER.error("用户错误:" + this.accountId + ",原因:" + error);
    }

    /**
     * 实现服务器主动推送
     *
     * @param message 消息字符串
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
        //需要使用同步机制,否则多并发时会因阻塞而报错
        synchronized(this.session) {
            try {
                this.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                LOGGER.error("发送给用户 ["+this.accountId +"] 的消息出现错误",e.getMessage());
                throw e;
            }
        }
    }


    /**
     * 点对点发送
     * 指定用户id
     * @param message 消息字符串
     * @param userId 目标用户id
     * @throws IOException
     */
    public static void sendInfo(String message, String userId) throws Exception {

        Iterator entrys = webSocketMap.entrySet().iterator();
        while (entrys.hasNext()) {
            Map.Entry entry = (Map.Entry) entrys.next();
            if (entry.getKey().toString().equals(userId)) {
                webSocketMap.get(entry.getKey()).sendMessage(message);
                LOGGER.info("发送消息到用户id为 [" + userId + "] ,消息:" + message);
                return;
            }
        }
        //错误说明用户没有在线,不用记录log
        throw new Exception("用户没有在线");
    }


    private static synchronized int getOnlineCount() {
        return onlineCount;
    }

    private static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    private static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

几点说明:

  • onOpen方法:服务器与前端建立ws连接成功时自动调用。
  • sendInfo方法:是服务器通过用户id向指定用户发送消息的方法,其为静态公有方法,因此可供各service调用。调用的例子:
// WebSocket 通知前端
try {
    //调用WebsocketServer向目标用户推送消息
    WebSocketServer.sendInfo(JSON.toJSONString(courseMemberInvitation),courseMemberInvitation.getAccountId().toString());
    LOGGER.info("send to "+courseMemberInvitation.getAccountId().toString());
} 
  • @ServerEndpoint注解:
@ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //响应路径为 /ws/{uid} 的连接请求
这么注解之后,前端只用发起  `ws://xxx.xxx:xxxx/ws/{uid}`即可开启ws连接(或者`wss`协议,增加TLS), 比如前端js代码这么写:
<script>
    var socket;

	/* 启动ws连接 */
    function openSocket() {
        if(typeof(WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        }else{
            console.log("您的浏览器支持WebSocket");
            
            //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
            var socketUrl="http://xxx.xxx.xxx:xxxx/ws/"+$("#uid").val();
            socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //转换成ws协议
            console.log("正在连接:"+socketUrl);
            if(socket!=null){
                socket.close();
                socket=null;
            }
            socket = new WebSocket(socketUrl);
            
            /* websocket 基本方法 */
            
            //打开事件
            socket.onopen = function() {
                console.log(new Date()+"websocket已打开,正在连接...");
                //socket.send("这是来自客户端的消息" + location.href + new Date());
            };
            
            //获得消息事件
            socket.onmessage = function(msg) {
                console.log(msg.data);
                //发现消息进入    开始处理前端触发逻辑
            };
            
            //关闭事件
            socket.onclose = function() {
                console.log(new Date()+"websocket已关闭,连接失败...");
                //重新请求token
            };
            
            //发生了错误事件
            socket.onerror = function() {
                console.log("websocket连接发生发生了错误");
            }
        }
    
    }

	/* 发送消息 */
    function sendMessage() {
        if(typeof(WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        }else {
            console.log("您的浏览器支持WebSocket");
            console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
            socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
        }
    }
</script>

存在的问题

一切看起来很顺利,我只要放个用户id进去,就可以想跟谁通讯就跟谁通讯咯!
但设想一个场景, 我是小明,uid为250,我想找uid为520的小花聊天,理论上我只要发起ws://xxx.xxx:xxxx/ws/250请求与服务器连接,小花也发起ws://xxx.xxx:xxxx/ws/520与服务器建立ws连接,我们就能互发消息了吧!
这时候出现了uid为1的小黄,他竟然想挖墙脚!?他竟然学过js,自己发了ws://xxx.xxx:xxxx/ws/520跟服务器建立ws连接,而小花根本不想和我发消息,所以实际上是小黄冒充了小花,把小花NTR了(实际上人家并不在乎

上一篇:java解析.net生成的token(JWT)


下一篇:JWT令牌生成与校验