场景
目前做了一个接口:邀请用户成为某课程的管理员,于是我感觉有能在用户被邀请之后能有个立马通知他本人的机(类似微博、朋友圈被点赞后就有立马能收到通知一样),于是就闲来没事搞了一套。
涉及技术栈
- Springboot
- Websocket 协议
- JWT
- RabbitMQ 消息中间件
Websocket 协议
⭐推荐阅读:Websocket 协议简介
WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
为什么使用Websocket?
因为普通的http协议一个最大的问题就是:通信只能由客户端发起,服务器响应(半双工)****,而我们希望可以全双工通信。
因此一句话总结就是:建立websocket(以下简称为ws)连接是为了让服务器主动向前端发消息,而无需等待前端的发起请求调用接口。
业务逻辑
我们现在有:
用户A
用户B
-
Springboot
服务器 - 场景:
用户A调用接口邀请用户B成为课程成员
- 涉及数据库
MySQL
的数据表:-
course_member_invitation
,记录课程邀请记录,其形式如下(忽略时间等列):
-
id | course_id | account_id | admin_id | is_accepted | bind_message_id |
---|---|---|---|---|---|
邀请id | 课程id | 受邀用户id | 邀请人id(因其本身为课程管理员) | 受邀用户是否接受了邀请 | 绑定的消息id |
- (图中没有体现)
course_message
,记录消息记录,其形式如下(忽略时间等列):
id | type | account_id | source_id | is_read | is_ignored |
---|---|---|---|---|---|
消息id | 消息类型 | 收信人用户id | 发信人用户id | 是否已读 | 收信人是否忽略 |
- (图中没有体现)
course_message_type
,记录消息类型,其形式如下
id | name | description |
---|---|---|
消息类型id | 消息类型名称 | 描述 |
- 涉及
RabbitMQ
(因不是重点,所以此处暂不讨论,最后一章叙述)
业务步骤主要涉及两个方法addCourseMemberInvitation
与sendMessage
和一个组件CourseMemberInvitationListener
,分别做:addCourseMemberInvitation
:
-
用户A
调用接口,邀请用户B
成为某门课程的管理员 -
Springboot
服务器收到请求,将这一请求生成邀请记录、消息记录,写入下表:course_member_invitation
course_message
- 写入DB后,调用
sendMessage
处理发送消息的业务。 - 将执行的结果返回给
用户A
sendMessage
:
- 将消息记录放入
RabbitMQ
中对应的消息队列。
CourseMemberInvitationListener
:
- 持续监听其绑定的消息队列
- 一旦消息队列中有新消息,就尝试通过ws连接发送消息。
- 若
用户B
在线,则可发送。 - 否则,则消费掉该消息,待用户上线后从DB中读入。
- 若
在Springboot中配置Websocket
-
pom.xml
文件
<!-- WebSocket相关 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
-
Websocket Server
组件配置初步:com.xxxxx.course.webSocket.WebSocketServer
/**
* 进行前后端即时通信
* https://blog.csdn.net/qq_33833327/article/details/105415393
* session: https://www.codeleading.com/article/6950456772/
* @author jojo
*/
@ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //响应路径为 /ws/{uid} 的连接请求
@Component
public class WebSocketServer {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的
*/
private static int onlineCount = 0;
/**
* concurrent 包的线程安全Set,用来存放每个客户端对应的 myWebSocket对象
* 根据 用户id 来获取对应的 WebSocketServer 示例
*/
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 通信令牌
*/
private String token = "";
/**
* 用户id
*/
private String accountId ="";
/**
* logger
*/
private static Logger LOGGER = LoggerUtil.getLogger();
/**
* 连接建立成功调用的方法
*
* @param session
* @param token 用户令牌
*/
@OnOpen
public void onOpen(Session session, @PathParam("uid") String uid) {
this.session = session;
this.token = token;
//设置超时,同httpSession
session.setMaxIdleTimeout(3600000);
this.accountId = uid;
//存储websocket连接,存在内存中,若有同一个用户同时在线,也会存,不会覆盖原有记录
webSocketMap.put(accountId, this);
LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString()));
addOnlineCount(); // 在线数 +1
LOGGER.info("有新窗口开始监听:" + accountId + ",当前在线人数为" + getOnlineCount());
try {
sendMessage(JSON.toJSONString("连接成功"));
} catch (IOException e) {
e.printStackTrace();
throw new ApiException("websocket IO异常!!!!");
}
}
/**
* 关闭连接
*/
@OnClose
public void onClose() {
if (webSocketMap.get(this.token) != null) {
webSocketMap.remove(this.token);
subOnlineCount(); // 人数 -1
LOGGER.info("有一连接关闭,当前在线人数为:" + getOnlineCount());
}
}
/**
* 收到客户端消息后调用的方法
* 这段代码尚未有在使用,可以先不看,在哪天有需求时再改写启用
* @param message 客户端发送过来的消息
* @param session
*/
@OnMessage
public void onMessage(String message, Session session) {
LOGGER.info("收到来自用户 [" + this.accountId + "] 的信息:" + message);
if (!StringTools.isNullOrEmpty(message)) {
try {
// 解析发送的报文
JSONObject jsonObject = JSON.parseObject(message);
// 追加发送人(防窜改)
jsonObject.put("fromUserId", this.accountId);
String toUserId = jsonObject.getString("toUserId");
// 传送给对应 toUserId 用户的 WebSocket
if (!StringTools.isNullOrEmpty(toUserId) && webSocketMap.containsKey(toUserId)) {
webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
} else {
// 否则不在这个服务器上,发送到 MySQL 或者 Redis
LOGGER.info("请求的userId:" + toUserId + "不在该服务器上");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void one rror(Session session, Throwable error) {
LOGGER.error("用户错误:" + this.accountId + ",原因:" + error);
}
/**
* 实现服务器主动推送
*
* @param message 消息字符串
* @throws IOException
*/
public void sendMessage(String message) throws IOException {
//需要使用同步机制,否则多并发时会因阻塞而报错
synchronized(this.session) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
LOGGER.error("发送给用户 ["+this.accountId +"] 的消息出现错误",e.getMessage());
throw e;
}
}
}
/**
* 点对点发送
* 指定用户id
* @param message 消息字符串
* @param userId 目标用户id
* @throws IOException
*/
public static void sendInfo(String message, String userId) throws Exception {
Iterator entrys = webSocketMap.entrySet().iterator();
while (entrys.hasNext()) {
Map.Entry entry = (Map.Entry) entrys.next();
if (entry.getKey().toString().equals(userId)) {
webSocketMap.get(entry.getKey()).sendMessage(message);
LOGGER.info("发送消息到用户id为 [" + userId + "] ,消息:" + message);
return;
}
}
//错误说明用户没有在线,不用记录log
throw new Exception("用户没有在线");
}
private static synchronized int getOnlineCount() {
return onlineCount;
}
private static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
private static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
几点说明:
- onOpen方法:服务器与前端建立ws连接成功时自动调用。
- sendInfo方法:是服务器通过用户id向指定用户发送消息的方法,其为静态公有方法,因此可供各service调用。调用的例子:
// WebSocket 通知前端
try {
//调用WebsocketServer向目标用户推送消息
WebSocketServer.sendInfo(JSON.toJSONString(courseMemberInvitation),courseMemberInvitation.getAccountId().toString());
LOGGER.info("send to "+courseMemberInvitation.getAccountId().toString());
}
-
@ServerEndpoint
注解:
@ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //响应路径为 /ws/{uid} 的连接请求
这么注解之后,前端只用发起 `ws://xxx.xxx:xxxx/ws/{uid}`即可开启ws连接(或者`wss`协议,增加TLS), 比如前端js代码这么写:
<script>
var socket;
/* 启动ws连接 */
function openSocket() {
if(typeof(WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
}else{
console.log("您的浏览器支持WebSocket");
//实现化WebSocket对象,指定要连接的服务器地址与端口 建立连接
var socketUrl="http://xxx.xxx.xxx:xxxx/ws/"+$("#uid").val();
socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //转换成ws协议
console.log("正在连接:"+socketUrl);
if(socket!=null){
socket.close();
socket=null;
}
socket = new WebSocket(socketUrl);
/* websocket 基本方法 */
//打开事件
socket.onopen = function() {
console.log(new Date()+"websocket已打开,正在连接...");
//socket.send("这是来自客户端的消息" + location.href + new Date());
};
//获得消息事件
socket.onmessage = function(msg) {
console.log(msg.data);
//发现消息进入 开始处理前端触发逻辑
};
//关闭事件
socket.onclose = function() {
console.log(new Date()+"websocket已关闭,连接失败...");
//重新请求token
};
//发生了错误事件
socket.onerror = function() {
console.log("websocket连接发生发生了错误");
}
}
}
/* 发送消息 */
function sendMessage() {
if(typeof(WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
}else {
console.log("您的浏览器支持WebSocket");
console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
}
}
</script>
存在的问题
一切看起来很顺利,我只要放个用户id进去,就可以想跟谁通讯就跟谁通讯咯!
但设想一个场景, 我是小明,uid为250,我想找uid为520的小花聊天,理论上我只要发起ws://xxx.xxx:xxxx/ws/250
请求与服务器连接,小花也发起ws://xxx.xxx:xxxx/ws/520
与服务器建立ws连接,我们就能互发消息了吧!
这时候出现了uid为1的小黄,他竟然想挖墙脚!?他竟然学过js,自己发了ws://xxx.xxx:xxxx/ws/520
跟服务器建立ws连接,而小花根本不想和我发消息,所以实际上是小黄冒充了小花,把小花NTR了(实际上人家并不在乎