为保证系统的稳定可靠运行,必须对输入的数据进行严格验证,防止一些非法的异常数据引发系统后续处理流程出错甚至崩溃。同时,对于验证失败的情况,需要输出明确的、友好的错误信息,以便对接方开发和调试,以及上线后运行异常排查。
验证工作主要包括以下内容:
1.验证消息对象的属性,如是否为空、格式是否正确,属于基础验证工作
2.验证消息是否曾经接收过,如是,则不进行后续处理,该点是保证幂等性,对消息进行去重的关键处理
3.验证消息主题编码,是否存在以及是否可用
4.验证应用编码,是否存在以及是否可用
5.验证权限,确保该应用由该消息主题的功能权限
6.验证时效性,约定消息收到时间不能与请求时间相差10分钟以上,否则拒绝服务
此外,对于请求消息的数据验证,还有个特殊点影响到流程处理,即上文提到过的,请求消息的接收方,无论是客户端还是服务端,收到重复消息时,除了要停止后续处理外,还需要发送一个响应给请求方,有两种方案,一种是抛出异常,告知请求方该消息曾经接收过;二是告知请求方,该消息已处理成功。前一种方案会造成消息日志表中该请求消息的响应结果是异常,而真实情况是该消息已经处理成功,因此采用后一种方案更佳。
按照代码复用原则,公用验证代码放到了消息处理器的抽象父类MessageHandler中
/**
* 验证消息属性
*
* @param message 消息
*/
protected void validateProperty(BaseMessage message) {
String errorCode;
String errorMessage;
// id
if (com.baomidou.mybatisplus.core.toolkit.StringUtils.isBlank(message.getId())){
errorCode = "S001";
errorMessage = "消息标识不能为空";
throw new MessageException(errorCode, errorMessage);
}
// 主题
if (com.baomidou.mybatisplus.core.toolkit.StringUtils.isBlank(message.getTopic())) {
errorCode = "S002";
errorMessage = "消息主题不能为空";
throw new MessageException(errorCode, errorMessage);
}
// 发布者
if (com.baomidou.mybatisplus.core.toolkit.StringUtils.isBlank(message.getPublishAppCode())) {
errorCode = "S003";
errorMessage = "消息发布者不能为空";
throw new MessageException(errorCode, errorMessage);
}
// 发布时间
String publishTimeString = message.getPublishTime();
if (com.baomidou.mybatisplus.core.toolkit.StringUtils.isBlank(publishTimeString)) {
errorCode = "S004";
errorMessage = "发布时间不能为空";
throw new MessageException(errorCode, errorMessage);
} else if (ValidateUtil.dateIsNotFormat(publishTimeString))
{
errorCode = "S005";
errorMessage = "发布时间格式不正确";
throw new MessageException(errorCode, errorMessage);
}
}
/**
* 验证权限
*
* @param publishAppCode 应用程序编码
* @param topicCode 主题编码
*/
protected void validatePermission(String publishAppCode, String topicCode) {
boolean hasPermission = apiMessagePermissionService.checkPermission(publishAppCode, topicCode);
if(hasPermission==false){
throw new MessageException("301", "应用无权限");
}
}
/**
* 验证时效性
*
* @param publishTimeString 发布时间字符串
*/
protected void validatePublishTimeValid(String publishTimeString) {
// 数据验证环节已验证可转换,此处不再处理转换异常
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date publishTime = null;
try {
publishTime = dateFormat.parse(publishTimeString);
} catch (ParseException e) {
// 前序环节已验证过日期格式,此处不会抛出异常,仅为编译通过
}
// 获取系统当前时间
Date currentTime = new Date();
// 比较时间
long diff = Math.abs(currentTime.getTime() - publishTime.getTime());
//允许最大的时间差,单位毫秒
int maxTimeSpan = 10*60*1000;
if (diff > maxTimeSpan)
{
// 请求时间超出合理范围(10分钟)
throw new MessageException("S401", "发布时间超出合理范围");
}
}
/**
* 验证应用
*
* @param publishAppCode 应用编码
*/
protected void validateAppCode(String publishAppCode) {
try {
ApiApp app =apiAppService.getByCode(publishAppCode);
if(app.getStatus().equals(StatusEnum.DEAD.name())){
throw new MessageException("S202", "应用被停用");
}
}catch (Exception ex){
throw new MessageException("S201", "应用标识无效");
}
}
/**
* 验证主题编码
*
* @param topicCode 主题编码
*/
protected void validateTopic(String topicCode) {
try {
ApiMessageTopic messageTopic = apiMessageTopicService.getByCode(topicCode);
if(messageTopic.getStatus().equals(StatusEnum.DEAD.name())){
throw new MessageException("S102", "消息主题不可用");
}
}catch (Exception ex){
throw new MessageException("S101", "消息主题不存在");
}
}
/**
* 检测消息是否曾经接收过(用于去重),默认返回false,由子类覆写
* @param id 消息标识
* @return true 是 false 否
*/
protected boolean checkIfReceived(String id) {
return false;
}
请求消息处理器RequestMessageHandler继承于MessageHandler,主要是对处理流程和方法调用
/**
* 消息处理
*
* @param message 消息
* @param channel 通道
*/
public void handleMessage(RequestMessage requestMessage, Channel channel) {
//验证消息属性
validateProperty(requestMessage);
//验证消息是否曾经接收过,如是,发送成功响应,不进行后续处理
boolean hasReceived = checkIfReceived(requestMessage.getId());
if(hasReceived){
//发送响应
sendResponse(channel, MessageResponseResultEnum.SUCCESS.name(), "", requestMessage.getId(),
requestMessage.getTopic());
return;
}
// 消息主题验证(是否存在及是否可用)
validateTopic(requestMessage.getTopic());
// 应用验证(是否存在及是否可用)
validateAppCode(requestMessage.getPublishAppCode());
// 权限验证
validatePermission(requestMessage.getPublishAppCode(),requestMessage.getTopic());
// 时效性验证
validatePublishTimeValid(requestMessage.getPublishTime());
//将请求消息状态默认设置为无需发送
requestMessage.setStatus(MessageStatus.NOT_TO_REQUEST.name());
// 记录消息请求日志
saveLog(requestMessage);
//特殊处理
messageOperation(requestMessage, channel);
//发送响应
sendResponse(channel, MessageResponseResultEnum.SUCCESS.name(), "", requestMessage.getId(),
requestMessage.getTopic());
//消息处理(复制及转发)
repostMessage(requestMessage);
}
/**
* 验证请求消息是否接收过
*/
@Override
protected boolean checkIfReceived(String id) {
return apiMessageLogService.checkRequestMessageExist(id);
}
响应消息处理器ResponseMessageHandler继承于MessageHandler,逻辑简单一些
/**
* 消息处理
*
* @param message 消息
* @param channel 通道
*/
public void handleMessage(ResponseMessage responseMessage, Channel channel) {
//验证消息属性
validateProperty(responseMessage);
//验证消息是否曾经接收过,如是,不进行后续处理
boolean hasReceived = checkIfReceived(responseMessage.getId());
if(hasReceived){
return;
}
// 消息主题验证(是否存在及是否可用)
validateTopic(responseMessage.getTopic());
// 应用验证(是否存在及是否可用)
validateAppCode(responseMessage.getPublishAppCode());
// 权限验证
validatePermission(responseMessage.getPublishAppCode(),responseMessage.getTopic());
// 时效性验证
validatePublishTimeValid(responseMessage.getPublishTime());
// 更新消息日志
updateLog(responseMessage);
//特殊处理
messageOperation(responseMessage, channel);
}
/**
* 验证响应消息是否接收过
*/
@Override
protected boolean checkIfReceived(String id) {
return apiMessageLogService.checkResponseMessageExist(id);
}
注意上面两个处理器,验证消息是否接收过checkIfReceived的方法实现是不同的,取的消息属性的属性不是同一个。