产品管理服务--连接EMQ X

文章目录

简介

从EMQ X中获取消息的方式比较多

  • 购买EMQ X的企业版,包括很多消息流转模块;

  • 创建个EMQ X超级用户,订阅所有的消息事件;

  • 使用规则引擎;

  • 移植EMQ X的kafaka插件;

  • 通过WebHook插件获取消息。

    产品管理服务--连接EMQ X

本文通过WebHook插件获取消息。设备的所有事件会通过webhook发送到产品服务器。

产品管理服务--连接EMQ X

WebHook介绍

WebHook 是由 emqx_web_hook (opens new window)插件提供的 将 EMQ X 中的钩子事件通知到某个 Web 服务的功能。

可以理解为EMQ X创建了一个客户端,这个客户端可以收集设备的在线、下下线记录、订阅与消息存储、消息送达确认等事件消息,通过钩子上挂载回调函数将事件发送到web器。

    Client      |    EMQ X     |  emqx_web_hook |   HTTP       +------------+
  =============>| - - - - - - -> - - - - - - - ->===========>  | Web Server |
                |    Broker    |                |  Request     +------------+
WebHook消息是单向的。

Webhook配置

web.hook.url

Type Value
string http://192.168.31.216:9200/emqx/webhook

Webhook 请求转发的目的 Web 服务器地址。

web.hook.headers.

web.hook.headers.content-type = application/json
web.hook.headers.accept = */*
web.hook.headers.webhook-username = makerknz
web.hook.headers.webhook-password = 123456

指定 HTTP 请求头部中的数据。<Key> 指定 HTTP 请求头部中的字段名,此配置项的值为相应的字段值。<Key> 可以是标准的 HTTP 请求头部字段,也可以自定义的字段,可以配置多个不同的请求头部字段。

webhook-username和webhook-password 作为访问后端的凭据,当emq x是一个集群时用来区别不同的服务器。当然认证也可以通过添加证书进行SSL认证,这部分感兴趣的同学可以自己探索。

触发规则

etc/plugins/emqx_web_hooks.conf 可配置触发规则,其配置的格式如下:

## 格式示例
web.hook.rule.<Event>.<Number> = <Rule>

## 示例值
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish"}

项目中会设置全部转发到后端服务器,不设置topic过滤。

Event 触发事件

目前支持以下事件:

名称 说明 执行时机
client.connect 处理连接报文 服务端收到客户端的连接报文时
client.connack 下发连接应答 服务端准备下发连接应答报文时
client.connected 成功接入 客户端认证完成并成功接入系统后
client.disconnected 连接断开 客户端连接层在准备关闭时
client.subscribe 订阅主题 收到订阅报文后,执行 client.check_acl 鉴权前
client.unsubscribe 取消订阅 收到取消订阅报文后
session.subscribed 会话订阅主题 完成订阅操作后
session.unsubscribed 会话取消订阅 完成取消订阅操作后
message.publish 消息发布 服务端在发布(路由)消息前
message.delivered 消息投递 消息准备投递到客户端前
message.acked 消息回执 服务端在收到客户端发回的消息 ACK 后
message.dropped 消息丢弃 发布出的消息被丢弃后

Number

同一个事件可以配置多个触发规则,配置相同的事件应当依次递增。

Rule

触发规则,其值为一个 JSON 字符串,其中可用的 Key 有:

  • action:字符串,取固定值
  • topic:字符串,表示一个主题过滤器,操作的主题只有与该主题匹配才能触发事件的转发

例如,我们只将与 a/b/cfoo/# 主题匹配的消息转发到 Web 服务器上,其配置应该为:

web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}  

这样 Webhook 仅会转发与 a/b/cfoo/# 主题匹配的消息,例如 foo/bar 等,而不是转发 a/b/dfo/bar

WebFlux 介绍

WebFlux是Spring中的异步非阻塞的响应式web框架,EMQ X的设备有12种事件类型,对于单机安装的EMQ X并发连接11万会产品webhook的请求,后端服务器是同步MVC会导致请求堆积,使服务处理逻辑。

如果通过web.hook.pool_size 配置连接池数,又会导致数据的延迟。

产品管理服务--连接EMQ X

MVC JDBC是一个同步请求的过程,每一次请求都会在一个线程内执行,等一次请求完成之后才能释放资源,如果在service方法中业务逻辑如果碰到io操作时间比较长的操作,这样这个service方法就会长时间占用tomcat容器线程池中的线程,这样是不利于其他请求的处理的,当线程池中的线程处理任务时,任务由于长时间io操作,肯定会阻塞线程处理其他任务。

WebFlux R2DBC是一个异步非阻塞的请求过程,此处会涉及大量的EMQX的请求,如果请求堆积,可以通过设置背压调整每次处理的数量来达到对请求的削峰处理。

## WebHook接口设计

增加WebFlux接口

添加依赖包

添加包WebFlux使用的包。r2dbc-mysql用来连接数据库。

    implementation('org.springframework.boot:spring-boot-starter-webflux')
    implementation('org.springframework.boot:spring-boot-starter-data-r2dbc')
    implementation('dev.miku:r2dbc-mysql')

配置

spring:
  r2dbc:
    url: r2dbcs:mysql://www.makerknz.cn:3306/product_manager?characterEncoding=utf-8&useSSL=false
    username: product_manager
    password: 123456

添加repository

WebHook目前设计只对外开放一个接口,仅用到device_events和device两张表,所以只写这两张表的repository即可。

DeviceEventsRepository

package cn.makerknz.product.server.repository;

import cn.makerknz.product.server.entity.DeviceEvents;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;

public interface DeviceEventsRepository extends ReactiveCrudRepository<DeviceEvents, Integer> {
}

DeviceRepository

package cn.makerknz.product.server.repository;

import cn.makerknz.product.server.entity.Device;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Mono;

public interface DeviceRepository extends ReactiveCrudRepository<Device, Integer> {

    Mono<Device> findByClientId(String clientId);

}

service实现

在DeviceEventsServiceImpl增加实现

package cn.makerknz.product.server.service.impl;

import cn.makerknz.product.server.entity.DeviceEvents;
import cn.makerknz.product.server.mapper.DeviceEventsMapper;
import cn.makerknz.product.server.repository.DeviceEventsRepository;
import cn.makerknz.product.server.service.IDeviceEventsService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

/**
 * <p>
 *  服务实现类
 * </p>
 *
 * @author maker knz
 * @since 2021-10-27
 */

@Service
public class DeviceEventsServiceImpl extends ServiceImpl<DeviceEventsMapper, DeviceEvents> implements IDeviceEventsService {

    @Autowired
    private DeviceEventsRepository deviceEventsRepository;

    @Override
    public Mono<DeviceEvents> add(DeviceEvents deviceEvents) {
        return deviceEventsRepository.save(deviceEvents);
    }

}

在DeviceServiceImpl中增加实现

package cn.makerknz.product.server.service.impl;

import cn.makerknz.product.server.entity.Device;
import cn.makerknz.product.server.mapper.DeviceMapper;
import cn.makerknz.product.server.repository.DeviceRepository;
import cn.makerknz.product.server.service.IDeviceService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

/**
 * <p>
 *  服务实现类
 * </p>
 *
 * @author maker knz
 * @since 2021-10-27
 */
@Service
public class DeviceServiceImpl extends ServiceImpl<DeviceMapper, Device> implements IDeviceService {

    @Autowired
    private DeviceRepository deviceRepository;

    @Override
    public Mono<Device> findByClientId(String clientId) {
        return deviceRepository.findByClientId(clientId);
    }
}

对外接口

package cn.makerknz.product.server.controller;

import cn.makerknz.product.server.annotation.CheckEmqxWebhookIdentity;
import cn.makerknz.product.server.domain.enums.ResponseEnum;
import cn.makerknz.product.server.domain.form.EmqxWebhookForm;
import cn.makerknz.product.server.domain.vo.ResultVO;
import cn.makerknz.product.server.entity.DeviceEvents;
import cn.makerknz.product.server.service.IDeviceEventsService;
import cn.makerknz.product.server.service.IDeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.UUID;

@RestController
@RequestMapping("/emqx")
public class EmqxController {

    @Autowired
    private IDeviceEventsService deviceEventsService;

    @Autowired
    private IDeviceService deviceService;

    @CheckEmqxWebhookIdentity
    @PostMapping("/webhook")
    public Mono<ResultVO<Object>> webhook(@RequestBody EmqxWebhookForm emqxWebhookForm) {

        return deviceService.findByClientId(emqxWebhookForm.getClientid()).flatMap(e -> {
            DeviceEvents deviceEvents = DeviceEvents.builder()
                    .productId(e.getProductId())
                    .data(emqxWebhookForm.toString())
                    .deviceId(e.getId())
                    .eventAction(emqxWebhookForm.getAction())
                    .streamId(UUID.randomUUID().toString())
                    .topic(emqxWebhookForm.getTopic())
                    .dataType(1)
                    .build();
            return deviceEventsService.add(deviceEvents).then(Mono.just(ResultVO.success()));
        }).defaultIfEmpty(ResultVO.error(ResponseEnum.ERROR));
    }

}

Webhook接口权限

原因

webhook接口如果不做权限限制任何人都可以访问,可能会导致数据恶意添加。

配置

对于分布式的EMQ X 每台可以配置不同的账户和密码

emqx:
  webhook:
    users:
      makerknz: 1234567
      makerknz_1: 12313123

读取配置

package cn.makerknz.product.server.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

@EnableConfigurationProperties
@Configuration
@ConfigurationProperties(prefix = "emqx.webhook")
@Data
public class EmqxConfig {

    private Map<String,String> users;

}

增加切面注解

package cn.makerknz.product.server.annotation;

import java.lang.annotation.*;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD })
public @interface CheckEmqxWebhookIdentity {
}

Webhook接口认证实现

package cn.makerknz.product.server.auth;

import cn.makerknz.product.server.config.EmqxConfig;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;

/**
 * @Author: maker_knz
 * @Date: 2021/6/1/001 10:05
 * @Version 1.0
 */

@Aspect
@Component
public class EmqxWebhookIdentityAspect {

    @Autowired
    private EmqxConfig emqxConfig;

    /**
     * 切面验证webhook接口是否可以访问
     * @param point
     * @return
     */
    @Around("@annotation(cn.makerknz.product.server.annotation.CheckEmqxWebhookIdentity)")
    public Object checkLogin(ProceedingJoinPoint point) {

        try {
            HttpServletRequest request = this.getHttpServletRequest();

            // 1.从header中获取username和passowrd
            String username = request.getHeader("webhook-username");
            String password = request.getHeader("webhook-password");

            // 2.验证账户的有效性
            String userPassword = emqxConfig.getUsers().get(username);
            if (!userPassword.equals(password)) {
                throw new SecurityException("没有权限使用WebHook");
            }

            return point.proceed();
        } catch (Throwable throwable) {
            throw new SecurityException("WebHook配置错误");
        }
    }

    /**
     * 从请求头中获取HttpServletRequest
     * @return
     */
    private HttpServletRequest getHttpServletRequest() {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) requestAttributes;
        return servletRequestAttributes.getRequest();
    }

}

Webhook修改配置

docker ps
docker exec -it c46cb71330f1 bash
cd /opt/emqx/etc/plugins
vi vi emqx_web_hook.conf
exit
docker restart c46cb71330f1

修改配置文件为

##====================================================================
## WebHook
##====================================================================

## Webhook URL
##
## Value: String
web.hook.url = http://192.168.31.97:9200/emqx/webhook

## HTTP Headers
##
## Example:
## 1. web.hook.headers.content-type = application/json
## 2. web.hook.headers.accept = *
##
## Value: String
web.hook.headers.content-type = application/json

## 配置接口访问权限,不使用CA证书
web.hook.headers.webhook-username = makerknz
web.hook.headers.webhook-password = 123456

## The encoding format of the payload field in the HTTP body
## The payload field only appears in the on_message_publish and on_message_delivered actions
##
## Value: plain | base64 | base62
web.hook.body.encoding_of_payload_field = plain

##-----------------------------使用https请求---------------------------------------
## PEM format file of CA's
##
## Value: File
## web.hook.ssl.cacertfile  = <PEM format file of CA's>

## Certificate file to use, PEM format assumed
##
## Value: File
## web.hook.ssl.certfile = <Certificate file to use>

## Private key file to use, PEM format assumed
##
## Value: File
## web.hook.ssl.keyfile = <Private key file to use>

## Turn on peer certificate verification
##
## Value: true | false
## web.hook.ssl.verify = false

## If not specified, the server's names returned in server's certificate is validated against
## what's provided `web.hook.url` config's host part.
## Setting to 'disable' will make EMQ X ignore unmatched server names.
## If set with a host name, the server's names returned in server's certificate is validated
## against this value.
##
## Value: String | disable
## web.hook.ssl.server_name_indication = disable

## Connection process pool size
##
## Value: Number
## HTTP 连接进程池大小。
web.hook.pool_size = 32

## Whether to enable HTTP Pipelining
##
## See: https://en.wikipedia.org/wiki/HTTP_pipelining
web.hook.enable_pipelining = true

##--------------------------------------------------------------------
## Hook Rules
## These configuration items represent a list of events should be forwarded
##
## Format:
##   web.hook.rule.<HookName>.<No> = <Spec>
## 有客户端连接时触发
web.hook.rule.client.connect.1       = {"action": "on_client_connect"}
## EMQ X下发连接应答
web.hook.rule.client.connack.1       = {"action": "on_client_connack"}
## 客户端成功接入
web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
## 客户端断开连接
web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}
## 客户端订阅事件
web.hook.rule.client.subscribe.1     = {"action": "on_client_subscribe"}
## 客户端取消订阅事件
web.hook.rule.client.unsubscribe.1   = {"action": "on_client_unsubscribe"}
## EMQ X确认订阅事件
web.hook.rule.session.subscribed.1   = {"action": "on_session_subscribed"}
## EMQ X确认取消订阅事件
web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
## 会话终止
web.hook.rule.session.terminated.1   = {"action": "on_session_terminated"}
## 发布消息
web.hook.rule.message.publish.1      = {"action": "on_message_publish"}
## 消息投递成功
web.hook.rule.message.delivered.1    = {"action": "on_message_delivered"}
## 消息已应答
web.hook.rule.message.acked.1        = {"action": "on_message_acked"}

测试

接口测试

权限测试

总结

这里使用WebFlux可以很好的起到消息中间件的作用,这也是为什么不使用kafka作为消息件的原因。如果使用消息订阅的方式,后面对MQTT消息处理是一件复杂的过程。

上一篇:C/C++游戏项目详细教程:《黄金矿工》


下一篇:Java Shutdown Hook 场景使用和源码分析