问题
主框架(RootApp) WebSocket 断连后重连成功,但是 SubApp(子应用) 重新订阅 WebSocket 失败。
原因
框架重连成功后会重新订阅 topic,是正常的。但是 SubApp 或者组件生命周期内的 $socket
实例并未更新,其实例的 _connected
属性 false
导致重新订阅 topic失败。
此处存在的问题:
- 不应该直接暴露
this.$socket
实例,因为该实例本身存在内部 dirty state - WebSocket 的连接与重连不应该由调用方判断当前状态,增加了调用方的复杂度
- 连接异常、连接断开、连接失败,调用方并不知情
- 重连成功后应该重置
this.$socket
实例的引用
重新设计
目标
- 调用方不需要处理 socket 内部的状态,直接连接
- 连接失败或者断开连接后,调用方将收到错误消息
- 首次或重新连接成功后,调用方的订阅将被重新发起
思路
使用发布订阅(pub-sub)观察者模式重新设计接口。WebSocket 的连接分成几个事件下发给调用方,这样调用发只需要注册各种 listener 函数,达到跟 socket 本身状态完全解耦的目的:
这里存在的问题是:
register(‘connected‘, listenerFn, failedFn)
可能发生在 $socket 连接成功前 ,也可能发生在 连接成功后 。
所以在统一把 listenerFn
放进 queue
里面,如果 $socket.isConnected
为 true
则直接执行 listenerFn,否则给回 pending 状态,待 socket
连接成功后再执行 listenerFn.
API
- pub-sub & callback style [推荐]
无论是首次连接成功,还是重连成功,onMessage callback function 都会被执行。
无论是首次连接失败,还是重连失败,onError callback function 都会被执行 。
PS:
- onMessage 函数在收到服务端推送至客户端的数据时会被执行,若没有数据返回,则不会执行。
- onMessage 在重连后,会立即被执行。这样做的目的是 立即清除客户端 socket 的异常状态 。
const SUBSCRIBE_API = ‘/broadcast/message/user/${userId}‘
this.$socket.subscribe(SUBSCRIBE_API, function onMessage(data) {
console.log(data)
})
}, function onError(error) {
console.log(error)
})
// 关闭当前订阅
this.$socket.unsubscribe(SUBSCRIBE_API)
- promise style [不推荐]
只有首次连接成功,resolve function 才会被执行。
只有首次连接失败,rejection function 才会被执行 。
PS:
Promise 只会在 subscribe 执行的时候触发一次 resolve 或者 reject。因此该方式 __只能用于在订阅成功后立刻收到消息便关闭 __当前 topic 的这种连接。
export default {
created() {
const SUBSCRIBE_API = ‘/broadcast/message/user/${userId}‘
this.$socket.subscribe(SUBSCRIBE_API).then((data) => {
// connect success
console.log(data)
}).catch(error => {
// connect failed
console.error(error)
})
},
beforeDestroy(){
// 关闭当前订阅
this.$socket.unsubscribe(SUBSCRIBE_API)
}
}
DEMO
订阅 topic
this.$socket.subscribe(
this.SUBSCRIBE_API,
function handleSignatureMessage(data) {
if (this.socketErrorMsg) {
this.socketErrorMsg = null
}
console.log(data)
},
function handleSignatureError(error) {
this.socketErrorMsg =
error instanceof Error ? error.message : ‘‘ + error
}
)
关闭订阅
// 清空错误
this.socketErrorMsg = null
// 关闭订阅
this.$socket.unsubscribe(this.$SUBSCRIBE_API)
最终代码
ChannelSubscribe 只实现了 Socket 订阅,使用静态属性 _map
存放所有的 ChannelSubscribe 实例,实现前文提到的 listeners queue。通过静态方法 ChannelSubscribe.Run
重启当前 ChannelSubscribe._map
中的所有 _subscribe
方法。而 _subscribe 和 _unsubscribe 方法需要在 SocketChannel 中被具体实现,所以 ChannelSubscribe 的逻辑可以用到其他的 Channel 实现。比如说 XHRLoopChannel 长轮询通道订阅中,当轮询中断后重启所有的 ChannelSubscribe
实例,实现 ChannelSubscribe 跟 Channel 内部的完全解藕。
- ChannelSubscribe.js:
/**
* 将订阅抽离成单独的对象,内聚其本身的属性和行为
*/
export default class ChannelSubscribe {
/* 用于保存当前所有已存在的 ChannelSubscribe 实例 */
/** @type {Map<string, ChannelSubscribe>} ChannelSubscribeMap */
static _map = new Map()
/**
* 运行所有的 ChannelSubscribe 实例
* @param {import(‘stompjs‘).Client} client 最新的 Client 对象
*/
static Run(client) {
if (ChannelSubscribe._map.size) {
/** @type {Iterable<ChannelSubscribe>} */
const subscribes = ChannelSubscribe._map.values()
for (const subscribe of subscribes) {
subscribe.run(client)
}
}
}
/**
* 实例化一个通道订阅对象 ChannelSubscribe
* @param {string} api
* @param {(data: any) => void} listenerFn
* @param {(error: Error) => void} [failedFn]
*/
constructor(api) {
/* 如果此前已存在该订阅,先直接关闭然后重新订阅 */
this._unsubscribe(api)
/* 具体实现和赋值在 this._subscribe 方法内部 */
/** @type {import(‘stompjs‘).Subscription} */
this._subscription = null
/**
* run() => 执行实例 _subscribe 方法
*
* @param {import(‘stompjs‘).Client} client
* @returns {Promise<any>}
*/
this.run = (client) => {
ChannelSubscribe._map.set(api, this)
return this._subscribe(client)
}
}
/**
* interface API 仅定义接口,不做具体实现
* 具体实现代码在 SocketChannel.subscribe 方法内部
*/
_subscribe(api) {
throw new TypeError(
`Must implements ‘ChannelSubscribe._subscribe(${api})‘ interface.`
)
}
/**
* interface API 仅定义接口,不做具体实现
* 具体实现代码在 SocketChannel.unsubscribe 方法内部
*
* @param {string} api
*/
_unsubscribe(api) {
throw new TypeError(
`Must implements ‘ChannelSubscribe._subscribe(${api})‘ interface.`
)
}
}
- SocketChannel.js:
import Stomp from ‘stompjs‘
import SockJS from ‘sockjs-client‘
import ChannelSubscribe from ‘./ChannelSubscribe‘
import ‘./helpers/rewrite-receive-info‘
import { debug } from ‘@@/utils/index‘
import { handleUserMessage } from ‘./handlers/user-message‘
import { handlePublishVersion } from ‘./handlers/publish-version‘
import {
WEB_SOCKET_API,
SUBSCRIBE_PUB_API,
SUBSCRIBE_USER_API
} from ‘./config/subscribe-url‘
import { MAX_RETRY_COUNT, RECONNECT_DURATION_TIME } from ‘./config/constants‘
import {
CONNECT_ERROR,
LOST_CONNECTION,
NOT_CONNECTED
} from ‘./config/error-message‘
/* 已重试次数 */
let retryTimes = 0
export default class SocketChannel {
/* 私有属性 _client 实例引用对象 */
/** @type {Stomp.Client|null} */
static _client = null
/* 私有属性 _error 连接错误实例 */
/** @type {Error|null} */
static _error = null
/**
* 建立 socket 连接
* @param {number} userId
*/
static connect(userId = ‘‘) {
if (!userId) {
throw new TypeError(
`The userId is required for ‘Socket.connect(userId: number)‘ method not ${typeof userId}!`
)
}
const socket = new SockJS(WEB_SOCKET_API)
// http://jmesnil.net/stomp-websocket/doc/
const client = Stomp.over(socket)
if (client && client.ws && client.ws instanceof WebSocket) {
// https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API
const ws = client.ws
// https://github.com/sockjs/sockjs-client/issues/176#issuecomment-135124313
ws.onerror((error) => {
SocketChannel._client = null
SocketChannel._error = new Error(error)
})
ws.onclose(() => {
SocketChannel._client = null
SocketChannel._error = new Error(‘WebSocket 已断开连接‘)
})
}
client.connect(
{},
() => {
debug(`连接成功`)
// 连接成功
retryTimes = 0
SocketChannel._client = client
SocketChannel._error = null
// 订阅系统内置 topic
client.subscribe(SUBSCRIBE_PUB_API, handlePublishVersion)
client.subscribe(
‘/p2p/‘ + userId + SUBSCRIBE_USER_API,
handleUserMessage(userId)
)
// 订阅 Invokers topic
ChannelSubscribe.Run(client)
},
(error) => {
debug(`连接失败`)
// 连接失败
SocketChannel._client = null
SocketChannel._error = new Error(error || ‘WebSocket 连接失败‘)
ChannelSubscribe.Run(client)
// 尝试重连 ${MAX_RETRY_COUNT} 次
if (retryTimes <= MAX_RETRY_COUNT) {
setTimeout(() => {
debug(`Socket 接口异常,正在重连第${++retryTimes}次`)
SocketChannel.connect && SocketChannel.connect(userId)
}, /* 使用 2 的 n 次方代替原先的间隔常量 */ 2 ** retryTimes * RECONNECT_DURATION_TIME)
}
}
)
}
/**
* subscribe => SubApp Invoker subscribe API
* @param {string} api
* @param {(data: any) => void} listenerFn
* @param {(error: Error) => void} [failedFn]
*
* 此时需要处理两种情况:
* 1. 此时的 client 还未连接
* 1.1 放进 _SubscribeMap 等待连接时按顺序执行连接
* 1.2 连接成功后执行
* 2. 此时的 client 已经连接,则直接使用 client 开始订阅
* 2.1 订阅成功 - resolver
* 2.2 订阅失败 - rejecter
*/
static subscribe(api, listenerFn, failedFn) {
const instance = new ChannelSubscribe(api, listenerFn, failedFn)
/* 实现 ChannelSubscribe._subscribe 接口 */
instance._subscribe = (/** @type {Stomp.Client|null} */ client) => {
const { _client, _error } = SocketChannel
/** @type {Stomp.Client|null} */
client = client || _client
return new Promise((resolve, reject) => {
const onResolve = listenerFn || ((value) => resolve(value))
const onReject = failedFn || ((error) => reject(error))
// 还未连接
if (!client) {
return onReject(new Error(NOT_CONNECTED))
}
// 连接出现错误
if (_error) {
return onReject(new Error(CONNECT_ERROR))
}
// 已断开连接
if (client && !client.connected) {
return onReject(new Error(LOST_CONNECTION))
}
// 已连接
if (client && client.connected) {
/* 赋值 instance._subscription 内部属性 */
instance._subscription = client.subscribe(api, ({ body } = {}) => {
onResolve(typeof body === ‘string‘ ? JSON.parse(body) : body)
})
// 如果之前 ChannelSubscribeMap 中已存在该 api
// FIXME:即当前接口是重连后发起,需要被立即执行
// 这样做的目的是清除客户端 socket 连接的异常状态
if (ChannelSubscribe._map.has(api)) {
onResolve({})
}
}
})
}
/* 实现 ChannelSubscribe._unsubscribe 接口 */
instance._unsubscribe = (/** @type {string} */ api) => {
/** @type {import(‘stompjs‘).Subscription} */
const subscription = instance._subscription
subscription && subscription.unsubscribe()
return ChannelSubscribe._map.delete(api)
}
debugger
return instance.run()
}
/**
* 取消订阅
* @param {string} api
*/
static unsubscribe(api) {
if (ChannelSubscribe._map.has(api)) {
const instance = ChannelSubscribe._map.get(api)
return instance._unsubscribe()
}
}
}