GatewayWorker 实现即时消息

GatewayWorker介绍:

GatewayWorker基于Workerman开发的一个项目框架,用于快速开发TCP长连接应用,例如app推送服务端、即时IM服务端、游戏服务端、物联网、智能家居等等

GatewayWorker使用经典的Gateway和Worker进程模型。Gateway进程负责维持客户端连接,并转发客户端的数据给BusinessWorker进程处理,BusinessWorker进程负责处理实际的业务逻辑(默认调用Events.php处理业务),并将结果推送给对应的客户端。Gateway服务和BusinessWorker服务可以分开部署在不同的服务器上,实现分布式集群。

GatewayWorker提供非常方便的API,可以全局广播数据、可以向某个群体广播数据、也可以向某个特定客户端推送数据。配合Workerman的定时器,也可以定时推送数据。

为什么使用GatewayWorker?

项目中需要做一个即时消息功能,研究过workerman也能实现即时消息,不过在多进程时,暂时没有什么好方法可以处理多进程之间进行通信,而GatewayWorker是在Workerman基础上开发的,多进程之间通信也解决了,并且可以处理较高的并发数。

1.下载及启动

http://www.workerman.net/download/GatewayWorker.zip

(1)解压缩GatewayWorker.zip

(2)进入GatewayWorker目录

(3)启动:

linux:

php start.php start

window:

双击 start_for_win.bat

2.修改服务端配置

GatewayWorker 实现即时消息

修改start_gateway.php

<?php
/**
 * Gateway类用于初始化Gateway进程。
 * Gateway进程是暴露给客户端的让其连接的进程。所有客户端的请求都是由Gateway接收然后分发给BusinessWorker处理,
 * 同样BusinessWorker也会将要发给客户端的响应通过Gateway转发出去。
 * @document http://doc2.workerman.net
 */

use \Workerman\Worker;
use \GatewayWorker\Gateway;

// 自动加载类
require_once __DIR__ . '/../vendor/autoload.php';

// gateway进程 设置协议和端口
//$gateway = new Gateway("websocket://192.168.10.103:1234"); // 若非本机访问 可用相应内网ip
$gateway = new Gateway("websocket://0.0.0.0:1234");
// gateway名称,status方便查看
$gateway->name = 'solveGateway';
// gateway进程数
$gateway->count = 4;
// 本机ip,分布式部署时使用内网ip
$gateway->lanIp = '127.0.0.1';
// 内部通讯起始端口,假如$gateway->count=4,起始端口为4000
// 则一般会使用4000 4001 4002 4003 4个端口作为内部通讯端口 
$gateway->startPort = 4000;
// 服务注册地址
$gateway->registerAddress = '127.0.0.1:1238';

// 心跳间隔
$gateway->pingInterval = 10;
// 心跳数据
$tiao =  [
    'type' => -1,
    'content' => '心跳检测'
];
$gateway->pingData = json_encode($tiao);

// 如果不是在根目录启动,则运行runAll方法
if (!defined('GLOBAL_START')) {
    Worker::runAll();
}

修改Events.php

<?php
/**
 * Events回调类 用于处理消息业务
 *
 * 主逻辑
 * 主要是处理 onConnect onMessage onClose 三个方法
 * onConnect 和 onClose 如果不需要可以不用实现并删除
 */

use \GatewayWorker\Lib\Gateway;


/*
 * 用于检测业务代码死循环或者长时间阻塞等问题
 * 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
 * 然后观察一段时间workerman.log看是否有process_timeout异常
 */
// declare(ticks=1);

class Events
{
    protected static $userInfo = [];

    protected static $loginClientId = [];

    // 返回数据结构
    protected static $return = ['type' => 0, 'content' => ''];

    // 假数据 在项目可从数据库查询
    protected static $users = [
        '18101345114' => ['id' => 1, 'name' => '1号', 'phone' => '18101345114', 'token' => '60e7ef4d3e7e342255'],
        '18101345115' => ['id' => 2, 'name' => '2号', 'phone' => '18101345115', 'token' => '60f13533c298b73792'],
    ];

    // 假数据 在项目可从数据库查询
    protected static $users2 = [
        '1' => ['id' => 1, 'name' => '1号', 'phone' => '18101345114', 'token' => '60e7ef4d3e7e342255'],
        '2' => ['id' => 2, 'name' => '2号', 'phone' => '18101345115', 'token' => '60f13533c298b73792'],
    ];

    /**
     * 当客户端连接时触发
     * 只有一个请求头 没有携带其他信息
     * @param int $client_id 连接id
     * @throws Exception
     */
    public static function onConnect($client_id)
    {
        // 向当前client_id发送数据 
        //Gateway::sendToClient($client_id, "建立连接中...\r\n");
    }

    /**
     * 当客户端发来消息时触发
     * @param int $client_id 连接id
     * @param mixed $message 具体消息
     * @throws Exception
     */
    public static function onMessage($client_id, $message)
    {
        // 判断格式
        $data = json_decode($message, true);
        if (empty($data)) {
            Gateway::closeClient($client_id, '消息格式不对,走你');
        }
        if (!in_array($client_id, array_keys(self::$loginClientId))) {
            // 鉴权
            $phone = $data['phone'] ?? '';
            $token = $data['token'] ?? '';
            if (empty($data) || empty($phone) || empty($token) || !preg_match('/^[0-9]{11}$/i', $phone)) {
                self::writeRecordLog("1.鉴权有误:参数错误 " . var_export($data, true) . "\r\n");
                self::$return['content'] = '鉴权不符,走你';
                Gateway::closeClient($client_id, json_encode(self::$return));
            }
            // 查询是否有此用户
            $user = self::$users[$phone] ?? [];
            if (empty($user) || $user['token'] != $token) {
                self::writeRecordLog("2.没有该用户或token不正确\r\n");
                self::$return['content'] = '没有该用户或token不正确,走你';
                Gateway::closeClient($client_id, json_encode(self::$return));
            }
            // 连接意外中断后 会重新生成的client_id
            if(isset(self::$userInfo[$user['id']])) {
                self::$return['content'] = '关闭之前的连接';
                Gateway::closeClient(self::$userInfo[$user['id']], json_encode(self::$return));
            }
            // uid与client_id绑定
            Gateway::bindUid($client_id, $user['id']);
            // 记录登录信息
            self::$loginClientId[$client_id] = $user['id'];
            self::$userInfo[$user['id']] = $client_id;
            self::$return['content'] = '登录成功';
            Gateway::sendToClient($client_id, json_encode(self::$return));
            self::writeRecordLog($user['phone'] . "登录成功\r\n");
        } else {
            // 发送消息
            if (isset($data['toid'])) {
                $request_data = ['type' => 1, 'content' => $data['content'], 'time' => date('Y.m-d H:i')];
                switch($data['type']) {
                    case 1://点对点文本消息
                        $user = self::$users2[$data['fromid']] ?? [];
                        $request_data['fromName'] = $user['name'] ?? '未知';
                        break;
                }
                //判断是否登录
                if(Gateway::isUidOnline($data['toid']) && !empty($request_data['fromName'])) {
                    Gateway::sendToUid($data['toid'], json_encode($request_data));
                }
            }
        }
    }

    /**
     * 当用户断开连接时触发
     * @param int $client_id 连接id
     * @throws Exception
     */
    public static function onClose($client_id)
    {
        $uid = self::$loginClientId[$client_id];
        unset(self::$loginClientId[$client_id]);
        unset(self::$userInfo[$uid]);
    }

     /**
     * 日志记录
     * @param $content
     */
    protected static function writeRecordLog($content)
    {
        $filePath = self::createLogFile('workerMan.log', 'logs');
        if (is_array($content)) {
            $content = var_export($content, true);
        }
        file_put_contents($filePath, $content, FILE_APPEND);
    }

    /**
     * 创建日志文件||文件夹 方法
     * @param string $fileName
     * @param string $folder
     * @param string $Suffix
     * @return string
     */
    protected static function createLogFile($fileName, $folder = 'logs', $Suffix = null)
    {
        $path = dirname(__DIR__) . '/' . $folder;
        if (!file_exists($path)) {
            mkdir($path, 0755, true);
        }
        $filePath = rtrim($path, '/') . '/' . $fileName;
        // 处理在windows系统对文件读取时转换文件路径
        if (substr(php_uname(), 0, 1) == 'W') {
            $filePath = str_replace('/', '\\', $filePath);
        }
        if (!empty($Suffix)) {
            $filePath .= '.' . $Suffix;
        }
        if (!file_exists($filePath)) {
            $fp = fopen($filePath, "w+");
            fclose($fp);
        }
        return $filePath;
    }
}

 3.客户端网页

因为只是一个demo,没有做登录操作,故做了两个单网页通过websocket访问服务端

liao.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>聊天</title>
    <style type="text/css">
        .talk_con {
            width: 600px;
            height: 500px;
            border: 1px solid #666;
            margin: 50px auto 0;
            background: #f9f9f9;
        }

        .talk_show {
            width: 580px;
            height: 420px;
            border: 1px solid #666;
            background: #fff;
            margin: 10px auto 0;
            overflow: auto;
        }

        .talk_input {
            width: 580px;
            margin: 10px auto 0;
        }

        .whotalk {
            width: 100px;
            height: 30px;
            float: left;
            outline: none;
        }

        .talk_word {
            width: 400px;
            height: 26px;
            padding: 0px;
            float: left;
            margin-left: 10px;
            outline: none;
            text-indent: 10px;
        }

        .talk_sub {
            width: 56px;
            height: 30px;
            float: left;
            margin-left: 10px;
        }

        .atalk {
            margin: 10px;
        }

        .atalk span {
            display: inline-block;
            background: #0181cc;
            border-radius: 10px;
            color: #fff;
            padding: 5px 10px;
        }

        .btalk {
            margin: 10px;
            text-align: right;
        }

        .btalk span {
            display: inline-block;
            background: #ef8201;
            border-radius: 10px;
            color: #fff;
            padding: 5px 10px;
        }
    </style>
</head>
<body>
<div class="talk_con">
    <div class="talk_show" id="words"></div>
    <div class="talk_input">
        <select class="whotalk" id="toid">
            <option value="2">2号</option>
        </select>
        <input type="text" class="talk_word" id="talkwords">
        <input type="button" value="发送" class="talk_sub" id="talksub">
    </div>
</div>
<script src="../../jquery-3.2.1.js"></script>
<script type="text/javascript">
    //var ws = new WebSocket('ws://192.168.10.103:1234');
    var ws = new WebSocket('ws://127.0.0.1:1234');
    ws.onopen = function () {
        var login = '{"phone":"18101345114","token":"60e7ef4d3e7e342255"}';
        ws.send(login);
    };
    ws.onmessage = function (e) {
        var msg = JSON.parse(e.data);
        var sender = '系统消息:';
        switch (msg.type) {
            case 1:
                sender = msg.fromName + ':';
                break;
            case 2:
                sender = '群消息:';
                return;
        }
        var data = sender + msg.content;
        if (msg.type != -1) {
            listMsg('<div class="atalk"><span>' + data + '</span></div>');
        }
    };

    $("#talksub").click(function () {
        var cont = $("#talkwords").val();
        if (cont == "") {
            alert("消息不能为空");
            return;
        }
        var toid = $("#toid").val();
        var data = '{"toid":"' + toid + '","fromid":"1","type":"1","content":"' + cont + '"}';
        ws.send(data);
        $("#talkwords").val('');
        listMsg('<div class="btalk"><span>1号:' + cont + '</span></div>');
    });

    /**
     * 将消息内容添加到输出框中,并将滚动条滚动到最下方
     */
    function listMsg(data) {
        var msg_list = document.getElementById("words");
        msg_list.innerHTML = msg_list.innerHTML + data;
        msg_list.scrollTop = msg_list.scrollHeight;
    }
</script>
</body>
</html>

liao2.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>聊天2</title>
    <style type="text/css">
        .talk_con {
            width: 600px;
            height: 500px;
            border: 1px solid #666;
            margin: 50px auto 0;
            background: #f9f9f9;
        }

        .talk_show {
            width: 580px;
            height: 420px;
            border: 1px solid #666;
            background: #fff;
            margin: 10px auto 0;
            overflow: auto;
        }

        .talk_input {
            width: 580px;
            margin: 10px auto 0;
        }

        .whotalk {
            width: 100px;
            height: 30px;
            float: left;
            outline: none;
        }

        .talk_word {
            width: 400px;
            height: 26px;
            padding: 0px;
            float: left;
            margin-left: 10px;
            outline: none;
            text-indent: 10px;
        }

        .talk_sub {
            width: 56px;
            height: 30px;
            float: left;
            margin-left: 10px;
        }

        .atalk {
            margin: 10px;
        }

        .atalk span {
            display: inline-block;
            background: #0181cc;
            border-radius: 10px;
            color: #fff;
            padding: 5px 10px;
        }

        .btalk {
            margin: 10px;
            text-align: right;
        }

        .btalk span {
            display: inline-block;
            background: #ef8201;
            border-radius: 10px;
            color: #fff;
            padding: 5px 10px;
        }
    </style>
</head>
<body>
<div class="talk_con">
    <div class="talk_show" id="words"></div>
    <div class="talk_input">
        <select class="whotalk" id="toid">
            <option value="1">1号</option>
        </select>
        <input type="text" class="talk_word" id="talkwords">
        <input type="button" value="发送" class="talk_sub" id="talksub">
    </div>
</div>
<script src="../../jquery-3.2.1.js"></script>
<script type="text/javascript">
    //var ws = new WebSocket('ws://192.168.10.103:1234');
    var ws = new WebSocket('ws://127.0.0.1:1234');
    ws.onopen = function () {
        var login = '{"phone":"18101345115","token":"60f13533c298b73792"}';
        ws.send(login);
    };
    ws.onmessage = function (e) {
        var msg = JSON.parse(e.data);
        var sender = '系统消息:';
        switch (msg.type) {
            case 1:
                sender = msg.fromName + ':';
                break;
            case 2:
                sender = '群消息:';
                return;
        }
        var data = sender + msg.content;
        if (msg.type != -1) {
            listMsg('<div class="atalk"><span>' + data + '</span></div>');
        }
    };

    $("#talksub").click(function () {
        var cont = $("#talkwords").val();
        if (cont == "") {
            alert("消息不能为空");
            return;
        }
        var toid = $("#toid").val();
        var data = '{"toid":"' + toid + '","fromid":"2","type":"1","content":"' + cont + '"}';
        ws.send(data);
        $("#talkwords").val('');
        listMsg('<div class="btalk"><span>2号:' + cont + '</span></div>');
    });

    /**
     * 将消息内容添加到输出框中,并将滚动条滚动到最下方
     */
    function listMsg(data) {
        var msg_list = document.getElementById("words");
        msg_list.innerHTML = msg_list.innerHTML + data;
        msg_list.scrollTop = msg_list.scrollHeight;
    }
</script>
</body>
</html>

4.启动后,进行聊天

服务端启动

GatewayWorker 实现即时消息

 浏览器访问

GatewayWorker 实现即时消息

GatewayWorker 实现即时消息

 比较简单的就实现了互相通信聊天!

上一篇:电信客服_Kafka消费者写入HBase


下一篇:【SQL】更新到另一个库的表。