仿淘宝开放平台之消息服务——服务端消息验证

为保证系统的稳定可靠运行,必须对输入的数据进行严格验证,防止一些非法的异常数据引发系统后续处理流程出错甚至崩溃。同时,对于验证失败的情况,需要输出明确的、友好的错误信息,以便对接方开发和调试,以及上线后运行异常排查。

验证工作主要包括以下内容:
1.验证消息对象的属性,如是否为空、格式是否正确,属于基础验证工作
2.验证消息是否曾经接收过,如是,则不进行后续处理,该点是保证幂等性,对消息进行去重的关键处理
3.验证消息主题编码,是否存在以及是否可用
4.验证应用编码,是否存在以及是否可用
5.验证权限,确保该应用由该消息主题的功能权限
6.验证时效性,约定消息收到时间不能与请求时间相差10分钟以上,否则拒绝服务

此外,对于请求消息的数据验证,还有个特殊点影响到流程处理,即上文提到过的,请求消息的接收方,无论是客户端还是服务端,收到重复消息时,除了要停止后续处理外,还需要发送一个响应给请求方,有两种方案,一种是抛出异常,告知请求方该消息曾经接收过;二是告知请求方,该消息已处理成功。前一种方案会造成消息日志表中该请求消息的响应结果是异常,而真实情况是该消息已经处理成功,因此采用后一种方案更佳。

按照代码复用原则,公用验证代码放到了消息处理器的抽象父类MessageHandler中

 /**
     * 验证消息属性
     *
     * @param message 消息
     */
    protected void validateProperty(BaseMessage message) {

        String errorCode;
        String errorMessage;
        // id
        if (com.baomidou.mybatisplus.core.toolkit.StringUtils.isBlank(message.getId())){

            errorCode = "S001";
            errorMessage = "消息标识不能为空";
            throw new MessageException(errorCode, errorMessage);
        }
        // 主题
        if (com.baomidou.mybatisplus.core.toolkit.StringUtils.isBlank(message.getTopic())) {

            errorCode = "S002";
            errorMessage = "消息主题不能为空";
            throw new MessageException(errorCode, errorMessage);
        }
        // 发布者
        if (com.baomidou.mybatisplus.core.toolkit.StringUtils.isBlank(message.getPublishAppCode())) {

            errorCode = "S003";
            errorMessage = "消息发布者不能为空";
            throw new MessageException(errorCode, errorMessage);
        }

        // 发布时间
        String publishTimeString = message.getPublishTime();
        if (com.baomidou.mybatisplus.core.toolkit.StringUtils.isBlank(publishTimeString)) {
            errorCode = "S004";
            errorMessage = "发布时间不能为空";
            throw new MessageException(errorCode, errorMessage);

        } else if (ValidateUtil.dateIsNotFormat(publishTimeString))
        {
            errorCode = "S005";
            errorMessage = "发布时间格式不正确";
            throw new MessageException(errorCode, errorMessage);

        }
    }


    /**
     * 验证权限
     *
     * @param publishAppCode 应用程序编码
     * @param topicCode          主题编码
     */
    protected void validatePermission(String publishAppCode, String topicCode) {

        boolean hasPermission = apiMessagePermissionService.checkPermission(publishAppCode, topicCode);
        if(hasPermission==false){
            throw new MessageException("301", "应用无权限");
        }

    }

    /**
     * 验证时效性
     *
     * @param publishTimeString 发布时间字符串
     */
    protected void validatePublishTimeValid(String publishTimeString) {


        // 数据验证环节已验证可转换,此处不再处理转换异常
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date publishTime = null;
        try {
            publishTime = dateFormat.parse(publishTimeString);
        } catch (ParseException e) {
            // 前序环节已验证过日期格式,此处不会抛出异常,仅为编译通过
        }

        // 获取系统当前时间
        Date currentTime = new Date();
        // 比较时间
        long diff = Math.abs(currentTime.getTime() - publishTime.getTime());
        //允许最大的时间差,单位毫秒
        int maxTimeSpan = 10*60*1000;
        if (diff  > maxTimeSpan)
        {
            // 请求时间超出合理范围(10分钟)
            throw new MessageException("S401", "发布时间超出合理范围");
        }
    }

    /**
     * 验证应用
     *
     * @param publishAppCode 应用编码
     */
    protected void validateAppCode(String publishAppCode) {
        try {
            ApiApp app =apiAppService.getByCode(publishAppCode);
            if(app.getStatus().equals(StatusEnum.DEAD.name())){
                throw new MessageException("S202", "应用被停用");

            }
        }catch (Exception ex){
            throw new MessageException("S201", "应用标识无效");
        }
    }

    /**
     * 验证主题编码
     *
     * @param topicCode 主题编码
     */
    protected void validateTopic(String topicCode) {
        try {
            ApiMessageTopic messageTopic = apiMessageTopicService.getByCode(topicCode);
            if(messageTopic.getStatus().equals(StatusEnum.DEAD.name())){
                throw new MessageException("S102", "消息主题不可用");
            }
        }catch (Exception ex){
            throw new MessageException("S101", "消息主题不存在");
        }

    }

    /**
     * 检测消息是否曾经接收过(用于去重),默认返回false,由子类覆写
     * @param id 消息标识
     * @return  true 是  false 否
     */
    protected boolean checkIfReceived(String  id) {
        return false;
    }




请求消息处理器RequestMessageHandler继承于MessageHandler,主要是对处理流程和方法调用

 /**
     * 消息处理
     *
     * @param message 消息
     * @param channel 通道
     */
    public void handleMessage(RequestMessage requestMessage, Channel channel) {


        //验证消息属性
        validateProperty(requestMessage);
        //验证消息是否曾经接收过,如是,发送成功响应,不进行后续处理
        boolean hasReceived = checkIfReceived(requestMessage.getId());
        if(hasReceived){
            //发送响应
            sendResponse(channel, MessageResponseResultEnum.SUCCESS.name(), "", requestMessage.getId(),
                    requestMessage.getTopic());
            return;
        }

        // 消息主题验证(是否存在及是否可用)
        validateTopic(requestMessage.getTopic());
        // 应用验证(是否存在及是否可用)
        validateAppCode(requestMessage.getPublishAppCode());
        // 权限验证
        validatePermission(requestMessage.getPublishAppCode(),requestMessage.getTopic());
        // 时效性验证
        validatePublishTimeValid(requestMessage.getPublishTime());

        //将请求消息状态默认设置为无需发送
        requestMessage.setStatus(MessageStatus.NOT_TO_REQUEST.name());
        // 记录消息请求日志
        saveLog(requestMessage);

        //特殊处理
        messageOperation(requestMessage, channel);

        //发送响应
        sendResponse(channel, MessageResponseResultEnum.SUCCESS.name(), "", requestMessage.getId(),
                requestMessage.getTopic());
        //消息处理(复制及转发)
        repostMessage(requestMessage);
    }


    /**
     * 验证请求消息是否接收过
     */
     @Override
    protected boolean checkIfReceived(String id) {
        return apiMessageLogService.checkRequestMessageExist(id);
    }

响应消息处理器ResponseMessageHandler继承于MessageHandler,逻辑简单一些


    /**
     * 消息处理
     *
     * @param message 消息
     * @param channel 通道
     */
    public void handleMessage(ResponseMessage responseMessage, Channel channel) {

        //验证消息属性
        validateProperty(responseMessage);
        //验证消息是否曾经接收过,如是,不进行后续处理
        boolean hasReceived = checkIfReceived(responseMessage.getId());
        if(hasReceived){

            return;
        }
        // 消息主题验证(是否存在及是否可用)
        validateTopic(responseMessage.getTopic());
        // 应用验证(是否存在及是否可用)
        validateAppCode(responseMessage.getPublishAppCode());
        // 权限验证
        validatePermission(responseMessage.getPublishAppCode(),responseMessage.getTopic());
        // 时效性验证
        validatePublishTimeValid(responseMessage.getPublishTime());

        // 更新消息日志
        updateLog(responseMessage);

        //特殊处理
        messageOperation(responseMessage, channel);
    }

    /**
     * 验证响应消息是否接收过
     */
     @Override
    protected  boolean checkIfReceived(String  id) {
        return apiMessageLogService.checkResponseMessageExist(id);
    }

注意上面两个处理器,验证消息是否接收过checkIfReceived的方法实现是不同的,取的消息属性的属性不是同一个。

上一篇:VMware vSphere Client的简单使用教程


下一篇:Java集合---Arrays类源码解析