MQTT客户端实战:MQTT客户端和MQTT代理进行连接通信

MQTT客户端实战:MQTT客户端和MQTT代理进行连接通信_php

EMQX安装

EMQX服务器安装

安装文档,见链接不另外写

https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html

启动 EMQX

启动为一个 systemd 服务:

sudo systemctl start emqx

在windows安装客户端

在线 MQTT WebSocket 客户端工具,MQTTX Web 是开源的 MQTT 5.0 浏览器客户端,但是经我测试没有成功,好像有bug.

建议使用MQTT 5.0 命令行客户端工具。使用命令行上的 MQTTX,旨在帮助开发者在不需要使用图形化界面的基础上,也能更快的开发和调试 MQTT 服务与应用。

MQTT客户端实战:MQTT客户端和MQTT代理进行连接通信_服务器_02

由于是后期被写的博文,图是借官方的。请自行区分一下。

平台安装后的地址

1,平台的地址

  • http://127.0.0.1:18083后台登录 用户名:test 密码:test

Laravel中处理MQTT订阅

1,安装MQTT客户端库

在Laravel项目中安装一个MQTT客户端库。你可以使用Composer来安装 php-mqtt/client:

composer require php-mqtt/client

2, 新建command文件

文件路径:app/Console/Commands/MqttClientCommand.php

这段PHP代码是一个用于处理MQTT消息的命令行工具,它使用了Simps的MQTT客户端库。代码中定义了两个类:MQTTUserConfig 和 MqttClientCommand。

MQTTUserConfig 类定义了一些常量,这些常量用于配置MQTT连接。

MqttClientCommand 类继承自 Illuminate\Console\Command,是一个命令行工具,用于订阅或发布MQTT消息。

<?php

namespace App\Console\Commands;

use App\Http\Controllers\Wxapi\DeviceReportController;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;

use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V5;
use Simps\MQTT\Tools\Common;
use Simps\MQTT\Client;
use Simps\MQTT\Config\ClientConfig;
use Simps\MQTT\Hex\ReasonCode;

use Swoole\Coroutine;
use Illuminate\Support\Facades\Redis;

class MQTTUserConfig
{    
    const SIMPS_MQTT_REMOTE_HOST = '*';
    const SIMPS_MQTT_PORT = 1883;
    const SIMPS_MQTT_SUBSCRIBE_PORT = 8083;
    const SIMPS_MQTT_USER = 'test*';
    const SIMPS_MQTT_PASSWORD = 'test*';
}

class MqttClientCommand extends Command
{
    protected $signature = 'mqtt:handle {param1}';

    protected $description = '订阅物联网mqtt消息 param1:null 订阅消息, param1:public 发布消息';
    protected  $mqtt ;
    const SWOOLE_MQTT_CONFIG = [
        'open_mqtt_protocol' => true,
        'package_max_length' => 2 * 1024 * 1024,
        'connect_timeout' => 5.0,
        'write_timeout' => 5.0,
        'read_timeout' => 5.0,
    ];

    //模拟设备
    const CLiENT_IDs = [
        'mqttx_devA',
        'mqttx_devB',
        'mqttx_devC',
        'mqttx_devD'
    ];

    public function __construct()
    {
        parent::__construct();
    }

    public function handle()
    {

        $param1 =$this->argument('param1');
//        $param2 =$this->argument('param2');
        if ($param1=='subscribe') {
            $this->info('启动订阅...');
            $this->subscribeMqtt();
        } elseif ($param1=='public') {
            $this->info('启动发布...');
            $this->publishMQTT();
        }
        echo '\r\n\r\n分配工作执行完成!!!';

    }



    protected function getTestMQTT5ConnectConfig()
        {

            $config = new ClientConfig();
            $UserConfig = new MQTTUserConfig();
            return $config->setUserName($UserConfig::SIMPS_MQTT_USER)
                ->setPassword($UserConfig::SIMPS_MQTT_PASSWORD)
                ->setClientId(Client::genClientID())
                ->setKeepAlive(10)
                ->setDelay(3000) // 3s
                ->setMaxAttempts(5)
                ->setProperties([
                    'session_expiry_interval' => 60,
                    'receive_maximum' => 65535,
                    'topic_alias_maximum' => 65535,
                ])
                ->setProtocolLevel(5)
                ->setSwooleConfig( [
                    'open_mqtt_protocol' => true,
                    'package_max_length' => 2 * 1024 * 1024,
                    'connect_timeout' => 5.0,
                    'write_timeout' => 5.0,
                    'read_timeout' => 5.0,
                ]);

    }

    private function heartbeat($message) {
        if ($message) {
            parse_str($message,$array);
            $device = $array['imei'];
            $hash = ':mqtt:heartbeat:online'.":{$device}";
            Redis::expire($hash,30);  ##30s有效
            Redis::sAdd($hash,1);
        }

    }
    /*
     * 订阅
     *  private function subscribeMqtt(){


        Coroutine\run(function () {
            $client = new Client('39.108.230.87', 1883, $this->getTestMQTT5ConnectConfig());
            ....
     */
    private function subscribeMqtt(){

        Coroutine\run(function () {
            $UserConfig = new MQTTUserConfig();
            $client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, 1883,
            $this->getTestMQTT5ConnectConfig());
            $will = [
                'topic' => 'simps-mqtt/dinweiyi/delete',
                'qos' => 1,
                'retain' => 0,
                'message' => 'byebye',
                'properties' => [
                    'will_delay_interval' => 60,
                    'message_expiry_interval' => 60,
                    'content_type' => 'test',
                    'payload_format_indicator' => true, // false 0 1
                ],
            ];
            $client->connect(true, $will);

            $topics['simps-mqtt/dinweiyi/subscribe_message'] = [
                'qos' => 2,
                'no_local' => true,
                'retain_as_published' => true,
                'retain_handling' => 2,
            ];

            $res = $client->subscribe($topics);
            $timeSincePing = time();
            var_dump($res);

            echo '\r\n\r\n connect success !!!';
            while (true) {
                try {
                    $buffer = $client->recv();
                    $message = null;
                    if ($buffer && $buffer !== true) {
                        $message = $buffer["message"];

                        // QoS1 PUBACK
                        if ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) {
                            $client->send(
                                [
                                    'type' => Types::PUBACK,
                                    'message_id' => $buffer['message_id'],
                                ],
                                false
                            );
                        }
                        if ($buffer['type'] === Types::DISCONNECT) {
                            echo sprintf(
                                "Broker is disconnected, The reason is %s [%d]\n",
                                ReasonCode::getReasonPhrase($buffer['code']),
                                $buffer['code']
                            );
                            $client->close($buffer['code']);
                            break;
                        }
                        $reportObj = new DeviceReportController();

                        $ret = $reportObj->store($message);
                        var_dump("182>>>",$ret);
                        unset($reportObj);
                    }
                    if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) {
                        $buffer = $client->ping();
                        if ($buffer) {
                            echo 'send ping success ...' ;
                            $this->heartbeat($message);
                            $timeSincePing = time();
                        }
                    }

                } catch (\Throwable $e) {
                    throw $e;
                }
            }

        });


    }

    protected function getMessage() {
        $client_ids = [
            'mqttx_devA',
//            'mqttx_devB',
            'mqttx_devC',
            'mqttx_devD'
        ];
        $message = [];
        $message['clientID'] = self::CLiENT_IDs[array_rand($client_ids)];
        $message['time'] = time();
        $message['location'] = ["x"=>rand(1000,9999),"y"=>rand(1000,9999)];
        return json_encode($message);
    }
    /*
     * 发布
     */
    public function publishMQTT() {
        Coroutine\run(function () {
            $UserConfig = new MQTTUserConfig();
            $client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, $UserConfig::SIMPS_MQTT_PORT,
            $this->getTestMQTT5ConnectConfig());
            $client->connect();
            while (true) {
                $message = $this->getMessage();
                $response = $client->publish(
                    'simps-mqtt/user/subscribe_message',
                    $message,
                    1,
                    0,
                    0,
                    [
                        'topic_alias' => 1,
                        'message_expiry_interval' => 12,
                    ]
                );
                var_dump( 'publishMQTT>>>',$message);
                Coroutine::sleep(1);
            }
        });
    }

}

3, 代码流程图

上述PHP代码的流程图:

graph TD
    A[开始] --> B[构造函数 __construct]
    B --> C[handle 方法]
    C --> D{param1 参数}
    D -- subscribe --> E[调用 subscribeMqtt]
    D -- public --> F[调用 publishMQTT]
    E --> G[Coroutine 运行 subscribeMqtt]
    G --> H[创建 MQTT 客户端并连接]
    H --> I[设置遗嘱消息]
    I --> J[订阅主题]
    J --> K{接收消息}
    K -- 收到消息 --> L[处理消息]
    K -- 心跳超时 --> M[发送心跳]
    L --> N[心跳函数 heartbeat]
    N --> O[存储消息]
    O --> P{是否断开连接}
    P -- 是 --> Q[关闭连接]
    P -- 否 --> K
    F --> R[Coroutine 运行 publishMQTT]
    R --> S[创建 MQTT 客户端并连接]
    S --> T{循环发布消息}
    T --> U[获取测试消息]
    U --> V[发布消息]
    V --> T
    Q --> W[结束]
    V --> W

流程说明:

  1. 开始:程序启动。
  2. 构造函数 __construct:初始化命令行工具。
  3. handle 方法:处理命令行输入。
  4. param1 参数:根据输入的参数决定是订阅还是发布。
  5. 调用 subscribeMqtt:如果参数是subscribe,则调用此方法。
  6. 调用 publishMQTT:如果参数是public,则调用此方法。
  7. Coroutine 运行 subscribeMqtt:在协程中运行订阅方法。
  8. 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
  9. 设置遗嘱消息:设置遗嘱消息,以便在客户端意外断开时发送。
  10. 订阅主题:订阅特定的MQTT主题。
  11. 接收消息:持续监听并接收消息。
  12. 处理消息:对接收到的消息进行处理。
  13. 心跳函数 heartbeat:检查设备心跳。
  14. 存储消息:将消息存储到数据库或其他存储系统。
  15. 是否断开连接:检查客户端是否断开连接。
  16. 关闭连接:如果断开,则关闭连接。
  17. Coroutine 运行 publishMQTT:在协程中运行发布方法。
  18. 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
  19. 循环发布消息:循环发布消息。
  20. 获取测试消息:生成要发布的测试消息。
  21. 发布消息:将消息发布到MQTT服务器。
  22. 结束:程序结束。

后台常驻运行

1,php artisan命令在后台运行
  1. 打开您的终端或SSH到您的服务器。
  2. 使用nohup命令运行您的Artisan命令进行测试,如下所示
php /www/wwwroot/denwei_laraveladmin/artisan mqtt:handle subscribe

3.命令行的php的版本与web php的版本号要一致

2,使用宝塔的守护进程开启进程

MQTT客户端实战:MQTT客户端和MQTT代理进行连接通信_php_03

也可以添加守护进程。 以上2种最好是只选一个

测试

打开emqx web ,在浏览器输入http://127.0.0.0.1:18083/#/websocket

MQTT客户端实战:MQTT客户端和MQTT代理进行连接通信_php_04

MQTT客户端实战:MQTT客户端和MQTT代理进行连接通信_服务器_05

主题:

主题跟php代码内的主题是一致的。

Payload:

是发出的字符串。由于在测试中遇到json字符串转换失败。所以选择了组装字符格式。

已发送

会出现发布的主题和内容

检查发送的结果

打开数据库,检查device_report表是否成功。成功应下图所示:

MQTT客户端实战:MQTT客户端和MQTT代理进行连接通信_客户端_06

实操完成

上一篇:基于 LangChain 的自动化测试用例的生成与执行


下一篇:互联网运维工作规划