Rpc双向调用(Ts)


//@ts-ignore
import * as WS from "ws"
// Tags that prefix every encoded argument on the wire.
const TypeNumber = 1; // followed by an 8-byte little-endian double
const TypeString = 2; // followed by a uint32 LE byte length and the string bytes
const TypeBoole = 3;  // followed by one byte, 1 meaning true
const TypeBuffer = 4; // followed by a uint32 LE byte length and the raw bytes

// Message kinds stored in byte 0 of every frame.
const call = 0     // request; the receiver answers with a `message` frame
const message = 1  // reply to an earlier `call`
const noValue = 2  // one-way notification; no reply is sent

/**
 * Shared codec and dispatcher for the binary RPC protocol.
 *
 * Frame layout:
 *   byte 0      message kind (`call` / `message` / `noValue`)
 *   bytes 1-4   uint32 LE call id used to match a reply with its pending call
 *   byte 5      byte length of the function name
 *   ...         function name bytes, then the tagged arguments
 */
export class Rpc_Base {
    /** Registered RPC handlers, keyed by function name. */
    protected _handlers = new Map<string, Function>()
    /** Resolvers of calls still waiting for a reply, keyed by call id. */
    protected resolvemap = new Map<number, Function>();

    /** Hook for subclasses; does nothing by default. */
    onOpen() {
    }

    /** Hook for subclasses; does nothing by default. */
    onClose() {
    }

    /**
     * Decodes a complete frame.
     *
     * @param argsBuff the received frame
     * @returns message kind, call id, function name and decoded arguments
     * @throws RangeError when the frame is shorter than its header or payload claims
     */
    getArraysByBuffer(argsBuff: Buffer) {
        const messType = argsBuff.readUInt8(0);
        const socketIndex = argsBuff.readUInt32LE(1);
        const nameLength = argsBuff.readUInt8(5);
        const argsOffset = 6 + nameLength;
        return {
            messageType: messType,
            socketIndex: socketIndex,
            funName: argsBuff.slice(6, argsOffset).toString(),
            args: this.getArgs(argsBuff.slice(argsOffset)),
        };
    }

    /**
     * Decodes the tagged argument list that follows the function name.
     *
     * @param data the argument section of a frame
     * @returns the decoded values in their original order
     * @throws Error on an unknown type tag
     * @throws RangeError when a value runs past the end of `data`
     */
    getArgs(data: Buffer) {
        const res: any[] = [];
        let offset = 0;
        while (offset < data.length) {
            const type = data.readUInt8(offset);
            offset++;
            switch (type) {
                case TypeNumber:
                    res.push(data.readDoubleLE(offset));
                    offset += 8;
                    break;
                case TypeString: {
                    const byteLen = data.readUInt32LE(offset);
                    offset += 4;
                    // slice() would silently truncate, so reject short payloads explicitly.
                    if (offset + byteLen > data.length) {
                        throw new RangeError("数据长度错误");
                    }
                    res.push(data.slice(offset, offset + byteLen).toString());
                    offset += byteLen;
                    break;
                }
                case TypeBuffer: {
                    const byteLen = data.readUInt32LE(offset);
                    offset += 4;
                    if (offset + byteLen > data.length) {
                        throw new RangeError("数据长度错误");
                    }
                    res.push(data.slice(offset, offset + byteLen));
                    offset += byteLen;
                    break;
                }
                case TypeBoole:
                    res.push(data.readUInt8(offset) === 1);
                    offset++;
                    break;
                default:
                    throw new Error("类型错误");
            }
        }
        return res;
    }

    /**
     * Encodes a function name and its arguments (everything after the 5-byte frame header).
     *
     * All lengths are byte lengths of the encoded data, never string lengths,
     * so multi-byte characters in the name or in string arguments round-trip.
     *
     * @param method function name; its encoded form must fit in 255 bytes
     * @param args values to send; each must be a number, boolean, string or Buffer
     * @returns name-length byte + name + tagged arguments
     * @throws Error when an argument has an unsupported type
     * @throws RangeError when the encoded name is longer than 255 bytes
     */
    getBufferByArgs(method: string, args: any[]) {
        const nameBytes = Buffer.from(method);
        const nameHeader = Buffer.alloc(1);
        nameHeader.writeUInt8(nameBytes.length, 0);
        const parts: Buffer[] = [nameHeader, nameBytes];

        for (let i = 0; i < args.length; i++) {
            const arg = args[i];
            switch (typeof arg) {
                case "number": {
                    const chunk = Buffer.alloc(9);
                    chunk.writeUInt8(TypeNumber, 0);
                    chunk.writeDoubleLE(arg, 1);
                    parts.push(chunk);
                    break;
                }
                case "boolean": {
                    const chunk = Buffer.alloc(2);
                    chunk.writeUInt8(TypeBoole, 0);
                    chunk.writeUInt8(arg ? 1 : 0, 1);
                    parts.push(chunk);
                    break;
                }
                case "string": {
                    const strBytes = Buffer.from(arg);
                    const head = Buffer.alloc(5);
                    head.writeUInt8(TypeString, 0);
                    // Unsigned, to match the readUInt32LE used when decoding.
                    head.writeUInt32LE(strBytes.length, 1);
                    parts.push(head, strBytes);
                    break;
                }
                case "object": {
                    if (!Buffer.isBuffer(arg)) {
                        throw new Error("不支持的传递类型");
                    }
                    const head = Buffer.alloc(5);
                    head.writeUInt8(TypeBuffer, 0);
                    head.writeUInt32LE(arg.length, 1);
                    parts.push(head, arg);
                    break;
                }
                default:
                    throw new Error("不支持的传递类型");
            }
        }
        return Buffer.concat(parts);
    }

    /**
     * Registers the methods listed in `serverClass.funs` as RPC handlers.
     * Each handler is bound to `serverClass`. Names that do not refer to a
     * function are reported and skipped; any exception is logged, not rethrown.
     *
     * @param serverClass object exposing a `funs: string[]` list of method names
     */
    public registerFuns(serverClass: any) {
        const funs: string[] = serverClass.funs
        try {
            for (let index = 0; index < funs.length; index++) {
                const funcName = funs[index];
                if (serverClass[funcName] && typeof (serverClass[funcName]) == 'function') {
                    this._handlers.set(funcName, serverClass[funcName].bind(serverClass))
                    console.log("注册rpc函数", funcName);
                } else {
                    console.error('注册函数失败:' + funcName)
                }
            }
        } catch (e) {
            console.error(e)
        }
    }

    /**
     * Decodes one incoming frame and dispatches it.
     *
     * - `call`: runs the handler as `handler(index, ...args)`, awaits it and sends
     *   the result back as a `message` frame carrying the same call id. An array
     *   result is sent as the reply's argument list, `undefined`/`null` becomes an
     *   empty list, and any other value is sent as a single argument.
     * - `message`: resolves the pending call with the decoded argument array and
     *   forgets it.
     * - `noValue`: runs the handler and sends nothing back.
     *
     * Unknown function names and unknown call ids are logged and ignored.
     * Decoding errors and exceptions thrown by a handler reject the returned promise.
     *
     * @param index id of the sending peer, passed to handlers as their first argument
     * @param msg the raw frame
     * @param ws socket the reply is written to
     */
    async onMessage(index: number, msg: Buffer, ws: { send(data: Buffer): unknown } | null | undefined) {
        const obj = this.getArraysByBuffer(msg);
        const argsArr = obj.args;
        switch (obj.messageType) {
            case call: {
                const handler = this._handlers.get(obj.funName);
                if (!handler) {
                    console.error('未注册的rpc函数:' + obj.funName);
                    return
                }
                const ret = await handler(index, ...argsArr);
                if (!ws) {
                    console.error("该socket不存在");
                    return
                }
                // The encoder needs an argument list; normalise whatever the handler returned.
                const reply = Array.isArray(ret) ? ret : (ret == null ? [] : [ret]);
                ws.send(this.copyBuff(obj.funName, reply, message, obj.socketIndex))
                break;
            }
            case message: {
                const resolver = this.resolvemap.get(obj.socketIndex);
                if (!resolver) {
                    console.error('未找到对应的回调:' + obj.socketIndex);
                    return
                }
                // A call is answered once; drop the resolver so the map does not grow forever.
                this.resolvemap.delete(obj.socketIndex);
                resolver(argsArr);
                break;
            }
            case noValue: {
                const handler = this._handlers.get(obj.funName);
                if (!handler) {
                    console.error('未注册的rpc函数:' + obj.funName);
                    return
                }
                handler(index, ...argsArr);
                break;
            }
            default:
                break;
        }
    }

    /**
     * Builds a complete frame: kind byte, call id, then the encoded name and arguments.
     *
     * @param funcName function name
     * @param args argument list, see getBufferByArgs
     * @param meType message kind written to byte 0
     * @param socketIndex call id written to bytes 1-4 (defaults to 0)
     */
    copyBuff(funcName: string, args: any, meType: number, socketIndex: number = 0) {
        const payload = this.getBufferByArgs(funcName, args);
        const frame = Buffer.alloc(payload.length + 1 + 4);
        frame.writeUInt8(meType, 0)
        frame.writeUInt32LE(socketIndex, 1)
        payload.copy(frame, 5);
        return frame
    }

}

/**
 * WebSocket RPC server. Every accepted connection gets a numeric client id,
 * which is the index of its socket in `socketArr` and the first argument
 * passed to handlers invoked by that client.
 */
export class Rpc_Server extends Rpc_Base {
    private _defaultListenPort = 8010
    /** Id handed to the next client that connects. */
    private _clientId = 0;
    /** Open sockets indexed by client id; the slot is removed when the socket closes. */
    private _socketArr: any[] = [];
    get socketArr() {
        return this._socketArr
    }

    /** Id of the next outgoing call; kept within the uint32 range used on the wire. */
    private index = 0;

    /**
     * Starts listening and wires up every incoming connection.
     *
     * @param listenPort TCP port to listen on (defaults to 8010)
     */
    public startServer(listenPort: number = this._defaultListenPort) {
        const wss = new WS.Server({ port: listenPort });
        wss.on("connection", (ws, req) => {
            const index = this._clientId++;
            this._socketArr[index] = ws;
            // `req.socket` is the non-deprecated name of `req.connection`.
            console.log('rpc client connected ', req.socket.remoteAddress + ":" + req.socket.remotePort)
            ws.on('message', (date: Buffer) => {
                // A malformed frame or failing handler must not become an unhandled rejection.
                this.onMessage(index, date, ws).catch((err) => {
                    console.error("rpc client message is error! ", err);
                });
            });

            ws.on("error", (err) => {
                console.error("rpc client connection is error! ", err);
            });

            ws.on("close", () => {
                // Forget the socket so call()/send() report it as missing instead of using a dead connection.
                delete this._socketArr[index];
                this.onClose()
            });
        })
    }

    /**
     * Calls `funcName` on a client and waits for its reply.
     *
     * @param clientId id of the target client
     * @param funcName remote function name
     * @param args argument list (numbers, booleans, strings, Buffers)
     * @returns a promise resolved with the reply's argument array, or with the
     *          string "该socket不存在" when the client is not connected; it
     *          rejects when encoding or sending throws
     */
    async call(clientId: number, funcName: string, args: any) {
        return new Promise((resolve, reject) => {
            const ws = this._socketArr[clientId];
            if (!ws) {
                console.error("该socket不存在");
                resolve("该socket不存在")
                return
            }
            const socketIndex = this.index;
            const buff = this.copyBuff(funcName, args, call, socketIndex);
            // Wrap around instead of overflowing the 4-byte id field.
            this.index = (this.index + 1) >>> 0;
            // Register before sending so the reply can never arrive unmatched.
            this.resolvemap.set(socketIndex, resolve);
            try {
                ws.send(buff)
            } catch (err) {
                this.resolvemap.delete(socketIndex);
                reject(err);
            }
        })
    }

    /**
     * Builds a complete frame: kind byte, call id, then the encoded name and arguments.
     *
     * @param funcName function name
     * @param args argument list
     * @param meType message kind written to byte 0
     * @param socketIndex call id written to bytes 1-4 (0 when omitted)
     */
    copyBuff(funcName: string, args: any, meType: number, socketIndex: number = 0) {
        const payload = this.getBufferByArgs(funcName, args);
        const frame = Buffer.alloc(payload.length + 1 + 4);
        frame.writeUInt8(meType, 0)
        frame.writeUInt32LE(socketIndex, 1)
        payload.copy(frame, 5);
        return frame
    }

    /**
     * Sends a one-way notification to a client; no reply is expected.
     *
     * @param clientId id of the target client
     * @param funName remote function name
     * @param args argument list
     */
    public send(clientId: number, funName: string, args: any) {
        const ws = this._socketArr[clientId];
        if (!ws) {
            console.error("该socket不存在");
            return
        }
        ws.send(this.copyBuff(funName, args, noValue))
    }
}

/**
 * WebSocket RPC client. Connects to an Rpc_Server, answers calls coming from
 * it, and retries the connection on a timer while it is closed.
 */
export class Rpc_Client extends Rpc_Base {
    /** True until the socket reports "open", and again after "close" or "error". */
    public isClose: boolean = true
    protected _socket: WS
    public clientId: number
    private _host: string;
    private _port: number;
    private _name: string
    /** Delay between reconnect checks, in milliseconds. */
    protected _timeReconnect = 10 * 1000;
    resolvemap = new Map<number, Function>();
    /** Id of the next outgoing call; kept within the uint32 range used on the wire. */
    private index = 0;
    /** Handle of the reconnect timer, so a repeated startClient() does not stack timers. */
    private _reconnectTimer: ReturnType<typeof setInterval> | undefined;
    get name() {
        return this._name
    }
    get host() {
        return this._host
    }
    get port() {
        return this._port
    }
    // Not used anywhere in this class; kept because they are public fields.
    Functionmap = new Map<string, Function>();
    public i: number = 0;

    /** Opens a new socket to the configured host/port and attaches all listeners. */
    public connectRpcServer() {
        const url = "ws://" + this._host + ":" + this._port
        const ws_client: WS = new WS(url)
        this._socket = ws_client;
        ws_client.on("open", () => {
            this.isClose = false;
            this.onOpen();
        })

        ws_client.on('message', (msg: Buffer) => {
            // Handlers see -1 as the peer id for messages coming from the server.
            // A malformed frame or failing handler must not become an unhandled rejection.
            this.onMessage(-1, msg, ws_client).catch((err) => {
                console.error('rpc client message error! ', err)
            })
        });

        // Mark as closed; the reconnect timer picks it up.
        ws_client.on("close", () => {
            this.isClose = true;
            this.onClose()
            console.log("rpc server close! ")
        })
        // Mark as closed on failure; the reconnect timer picks it up.
        ws_client.on("error", (err) => {
            this.isClose = true;
            console.error('rpc client error! ', err)
        })
    }

    /**
     * Builds a complete frame with the kind taken from `meType`, using the same
     * layout as Rpc_Server.copyBuff, so that send() frames are tagged `noValue`
     * and replies to the server are tagged `message`.
     *
     * @param funcName function name
     * @param args argument list
     * @param meType message kind written to byte 0
     * @param socketIndex call id written to bytes 1-4 (0 when omitted)
     */
    copyBuff(funcName: string, args: any, meType: number, socketIndex: number = 0) {
        const payload = this.getBufferByArgs(funcName, args);
        const frame = Buffer.alloc(payload.length + 1 + 4);
        frame.writeUInt8(meType, 0)
        frame.writeUInt32LE(socketIndex, 1)
        payload.copy(frame, 5);
        return frame
    }

    /**
     * Sends a one-way notification to the server; no reply is expected.
     *
     * @param funName remote function name
     * @param args argument list
     */
    public send(funName: string, args: any) {
        this._socket.send(this.copyBuff(funName, args, noValue))
    }

    /**
     * Calls `funcName` on the server and waits for its reply.
     *
     * @param funcName remote function name
     * @param args argument list (numbers, booleans, strings, Buffers)
     * @returns a promise resolved with the reply's argument array; it rejects
     *          when the socket is not open or when encoding/sending throws
     */
    async call(funcName: string, args: any) {
        return new Promise((resolve, reject) => {
            if (this.isClose || !this._socket) {
                // Without this the promise could stay pending forever on a closed socket.
                reject(new Error("rpc socket is not open"));
                return
            }
            const socketIndex = this.index;
            const buff = this.copyBuff(funcName, args, call, socketIndex);
            // Wrap around instead of overflowing the 4-byte id field.
            this.index = (this.index + 1) >>> 0;
            // Register before sending so the reply can never arrive unmatched.
            this.resolvemap.set(socketIndex, resolve);
            try {
                this._socket.send(buff)
            } catch (err) {
                this.resolvemap.delete(socketIndex);
                reject(err);
            }
        })
    }

    /**
     * Initialises the RPC client: stores the endpoint, starts the reconnect
     * timer and opens the first connection.
     *
     * @param host server host
     * @param port server port
     * @param name client name, exposed through the `name` getter
     * @param version currently unused
     */
    public startClient(host: string, port: number, name: string, version: string) {
        this._host = host;
        this._port = port;
        this._name = name;
        this.setReconnectServer();
        this.connectRpcServer();
    }

    /**
     * Starts the timer that reconnects to the RPC server whenever the
     * connection is marked closed. Any previously started timer is replaced.
     */
    private setReconnectServer() {
        if (this._reconnectTimer !== undefined) {
            clearInterval(this._reconnectTimer);
        }
        this._reconnectTimer = setInterval(() => {
            if (this.isClose) {
                this.connectRpcServer()
            }
        }, this._timeReconnect)
    }
}

 

 
上一篇:C文件读写#我有1支铅笔,但是仓库里有353628支#


下一篇:说说redis中简单动态字符串(SDS)的空间预分配实现