//@ts-ignore
import * as WS from "ws"
const TypeNumber = 1;
const TypeString = 2;
const TypeBoole = 3;
const TypeBuffer = 4;
const call = 0
const message = 1
const noValue = 2
export class Rpc_Base {
protected _handlers = new Map<string, Function>()
protected resolvemap = new Map<number, Function>();
onOpen() {
}
onClose() {
}
getArraysByBuffer(argsBuff: Buffer) {
let messType = argsBuff.readUInt8(0);
let socketIndex = argsBuff.readUInt32LE(1);
let funNameEnd = argsBuff.readUInt8(5);
let funName = argsBuff.slice(6, funNameEnd + 6).toString();
let argsOffset = funNameEnd + 6;
let args = this.getArgs(argsBuff.slice(argsOffset));
let obj = {
messageType: messType,
socketIndex: socketIndex,
funName: funName,
args: args
}
return obj;
}
getArgs(data: Buffer) {
let res = [];
let offset = 0;
while (offset != data.length) {
let type = data.readUInt8(offset);
offset++
switch (type) {
case TypeNumber:
res.push(data.readDoubleLE(offset))
offset += 8;
break;
case TypeString:
let strleng = data.readUInt32LE(offset);
offset += 4;
res.push(data.slice(offset, offset + strleng).toString())
offset += strleng;
break;
case TypeBuffer:
let bufflen = data.readUInt32LE(offset);
offset += 4;
res.push(data.slice(offset, offset + bufflen))
offset += bufflen;
break;
case TypeBoole:
res.push(data.readUInt8(offset) == 1 ? true : false)
offset++;
break;
default:
throw new Error("类型错误");
}
}
return res;
}
getBufferByArgs(method: string, args) {
let size = method.length + 1;
for (let i = 0; i < args.length; i++) {
size++
switch (typeof (args[i])) {
case "number":
size += 8
break
case "boolean":
size += 1
break;
case "string":
let strBuff = Buffer.from(args[i]);
size += strBuff.length + 4
break;
case "object":
if (Buffer.isBuffer(args[i])) {
size += args[i].length + 4
} else {
throw new Error("不支持的传递类型")
}
break;
default:
throw new Error("不支持的传递类型")
}
}
let buff = Buffer.alloc(size)
buff.writeUInt8(method.length, 0)
buff.write(method, 1)
let offset = method.length + 1;
for (let i = 0; i < args.length; i++) {
switch (typeof (args[i])) {
case "number":
buff.writeUInt8(TypeNumber, offset)
buff.writeDoubleLE(args[i], offset + 1)
offset += 9
break;
case "boolean":
buff.writeUInt8(TypeBoole, offset)
buff.writeUInt8(args[i] == true ? 1 : 0, offset + 1)
offset += 2
break;
case "string":
buff.writeUInt8(TypeString, offset)
let strBuff = Buffer.from(args[i]);
buff.writeInt32LE(strBuff.length, offset + 1)
strBuff.copy(buff, offset + 5, 0, strBuff.length)
offset += args[i].length + 5
break;
case "object":
if (Buffer.isBuffer(args[i])) {
let arrBuff = args[i] as Buffer
buff.writeUInt8(TypeBuffer, offset);
buff.writeInt32LE(args[i].length, offset + 1)
arrBuff.copy(buff, offset + 5, 0, args[i].length)
offset += args[i].length + 5
} else {
throw new Error("不支持的传递类型")
}
break;
default:
throw new Error("不支持的传递类型")
}
}
return buff;
}
public registerFuns(serverClass: any) {
let funs: string[] = serverClass.funs
try {
for (let index = 0; index < funs.length; index++) {
let funcName = funs[index];
if (serverClass[funcName] && typeof (serverClass[funcName]) == 'function') {
this._handlers.set(funcName, serverClass[funcName].bind(serverClass))
console.log("注册rpc函数", funcName);
} else {
console.error('注册函数失败:' + funcName)
}
}
} catch (e) {
console.error(e)
}
}
async onMessage(index: number, msg: Buffer, ws) {
let obj = this.getArraysByBuffer(msg);
let funName: string;
let args = obj.args;
let argsArr = [];
if (index == -1) {
argsArr = args;
} else {
for (let i = 0; i < args.length; i++) {
if (args[i].type == "Buffer") {
let buff = Buffer.from(args[i]);
argsArr[i] = buff;
} else {
argsArr[i] = args[i];
}
}
}
switch (obj.messageType) {
case call:
funName = obj.funName;
let ret;
ret = await this._handlers.get(funName)(index, ...argsArr);
let buff = this.copyBuff(funName, ret, message, obj.socketIndex)
if (!ws) {
console.error("该socket不存在");
return
}
ws.send(buff)
break;
case message:
this.resolvemap.get(obj.socketIndex)(argsArr);
break;
case noValue:
funName = obj.funName;
this._handlers.get(funName)(index, ...argsArr);
default:
break;
}
}
copyBuff(funcName: string, args: any, meType: number, socketIndex: number = 0) {
let buff = this.getBufferByArgs(funcName, args);
let arrBuff = Buffer.alloc(buff.length + 1 + 4);
arrBuff.writeUInt8(call, 0)
arrBuff.writeUInt32LE(socketIndex, 1)
buff.copy(arrBuff, 5, 0, buff.length);
return arrBuff
}
}
export class Rpc_Server extends Rpc_Base {
private _defaultListenPort = 8010
private _clientId = 0;
private _socketArr = [];
get socketArr() {
return this._socketArr
}
private index = 0;
public startServer(listenPort: number = this._defaultListenPort) {
let wss = new WS.Server({ port: listenPort });
wss.on("connection", (ws, req) => {
let index = this._clientId;
this._socketArr[index] = ws;
this._clientId++;
console.log('rpc client connected ', req.connection.remoteAddress + ":" + req.connection.remotePort)
ws.on('message', (date: Buffer) => {
this.onMessage(index, date, ws);
});
ws.on("error", (err) => {
console.error("rpc client connection is error! ", err);
});
ws.on("close", () => {
this.onClose()
});
})
}
async call(clientId: number, funcName: string, args: any) {
return new Promise((resolve, reject) => {
let socketIndex = this.index;
let buff = this.copyBuff(funcName, args, call, socketIndex);
this.index++
let ws = this._socketArr[clientId];
if (!ws) {
console.error("该socket不存在");
resolve("该socket不存在")
return
}
ws.send(buff)
this.resolvemap.set(socketIndex, resolve);
})
}
copyBuff(funcName: string, args: any, meType: number, socketIndex?: number) {
let buff = this.getBufferByArgs(funcName, args);
let arrBuff = Buffer.alloc(buff.length + 1 + 4);
arrBuff.writeUInt8(meType, 0)
arrBuff.writeUInt32LE(socketIndex, 1)
buff.copy(arrBuff, 5, 0, buff.length);
return arrBuff
}
public send(clientId: number, funName: string, args: any) {
let ws = this._socketArr[clientId];
if (!ws) {
console.error("该socket不存在");
return
}
let buff = this.copyBuff(funName, args, noValue)
ws.send(buff)
}
}
export class Rpc_Client extends Rpc_Base {
public isClose: boolean = true
protected _socket: WS
public clientId: number
private _host: string;
private _port: number;
private _name: string
protected _timeReconnect = 10 * 1000;
resolvemap = new Map<number, Function>();
private index = 0;
get name() {
return this._name
}
get host() {
return this._host
}
get port() {
return this._port
}
Functionmap = new Map<string, Function>();
public i: number = 0;
public connectRpcServer() {
let url = "ws://" + this._host + ":" + this._port
let ws_client: WS = new WS(url)
this._socket = ws_client;
let self = this;
ws_client.on("open", (date) => {
self.isClose = false;
self.onOpen();
})
ws_client.on('message', (msg: Buffer) => {
this.onMessage(-1, msg, ws_client)
});
//断线重连
ws_client.on("close", () => {
this.isClose = true;
this.onClose()
console.log("rpc server close! ")
})
//失败重连
ws_client.on("error", (err) => {
this.isClose = true;
console.error('rpc client error! ', err)
})
}
public send(funName: string, args: any) {
let buff = this.copyBuff(funName, args, noValue)
this._socket.send(buff)
}
async call(funcName: string, args: any) {
return new Promise((resolve, reject) => {
let socketIndex = this.index;
let buff = this.copyBuff(funcName, args, call, socketIndex);
this.index++
this._socket.send(buff)
this.resolvemap.set(socketIndex, resolve);
})
}
/**
* 初始化RPC CLIENT
*/
public startClient(host: string, port: number, name: string, version: string) {
this._host = host;
this._port = port;
this._name = name;
this.setReconnectServer();
this.connectRpcServer();
}
/**
* 重连RPC SERVER
*/
private setReconnectServer() {
setInterval(() => {
if (this.isClose) {
this.connectRpcServer()
}
}, this._timeReconnect)
}
}