kkbida - 开源消息投递中间件详细解析
项目简介
kkbida为凯京科技开源的消息投递中间件,谐音必达
,旨在保证异构系统间消息通知时消息投递必达,详情见 https://gitee.com/kekingcn/kkbida
快速开始
从gitee拉取代码
git clone https://gitee.com/kekingcn/kkbida
准备一个mysql数据库和redis,修改kk-callcenter-main模块中的 application.properties 配置文件的连接信息
使用maven构建
cd kkbida
mvn clean install -DskipTests
运行
cd kkbida/kk-callcenter-main/target
java -jar kk-callcenter-main-1.0.0-SNAPSHOT.jar
浏览器访问 http://127.0.0.1:8080 即可 用户名/密码:admin/admin
功能介绍
kkbida目前提供首页统计图表
、历史任务
、待处理任务
、失败通知管理
四个功能模块
-
首页统计图表
可查看消息总数、成功、失败数和最近一周的每天的统计数据 -
历史任务
提供查看历史任务记录和执行情况功能,对于执行失败的任务,提供手动补偿执行功能 -
待处理任务
提供查看队列中待处理的任务记录情况(包括距下次执行还剩多少时间),同时提供直接手动提前执行功能 -
失败通知管理
即webhook通知,当消息发送失败次数达到重试限制次数,kkbida会触发webhook发起失败通知,支持包括钉钉、企业微信等各种带webhook功能的平台,详细配置见:https://gitee.com/kekingcn/kkbida#%E5%9B%9E%E8%B0%83%E5%A4%B1%E8%B4%A5%E9%80%9A%E7%9F%A5
原理解析
系统架构详见:https://gitee.com/kekingcn/kkbida#%E7%B3%BB%E7%BB%9F%E6%9E%B6%E6%9E%84 的说明和架构图
下面从代码总体结构
、sdk
、任务队列处理
、webhook
四个方面详细解析
1. 代码总体结构
从gitee拉取代码使用idea导入后,看到代码结构如下
项目分为三个模块kk-callcenter-main
、kk-callcenter-sdk
、third-party
-
kk-callcenter-main
为主服务,向外提供dubbo和http接口服务,并提供web管理页面功能 -
kk-callcenter-sdk
为提供给接入系统用的sdk包,通过sdk包能方便快速的向主服务发送消息任务,详细使用说明见 https://gitee.com/kekingcn/kkbida#sdk%E4%BD%BF%E7%94%A8%E6%96%B9%E6%B3%95 -
third-party
为klock
(同为凯京科技开源的分布式锁组件,详见:https://gitee.com/kekingcn/spring-boot-klock-starter )依赖
2. sdk
如下图所示,kk-callcenter-sdk
提供一个CallBackService
接口和一个CallBackServiceHttpImpl
http 实现类(另外服务端默认提供了dubbo实现,接入时,只需要使用和服务端同一个redis注册中心,@Reference CallBackService就可自动注入),同时提供一个 CallBackTask
model类作为任务参数,并提供在任务失败的时候抛出的CallBackException
异常类
使用详情见 https://gitee.com/kekingcn/kkbida#%E8%8E%B7%E5%8F%96callbackservice%E5%AF%B9%E8%B1%A1 和 https://gitee.com/kekingcn/kkbida#%E8%B0%83%E7%94%A8callbackservice%E5%8F%91%E8%B5%B7%E5%9B%9E%E8%B0%83
同时,如果需要使用其他方式接入,只需要自行实现CallBackService接口就可以
3. 任务队列处理
查看kk-callcenter-sdk
模块下TaskService
任务处理类源码 和TaskDueueHadler
队列处理类源码
TaskService
类提供如下两个方法
/**
* 添加延迟任务
* @param task
* @param delay
* @param timeUnit
*/
public void addDelayTask(CallBackTask task, Long delay, TimeUnit timeUnit){
RBlockingQueue<CallBackTask> blockingQueue = client.getBlockingQueue(Constants.REDIS_QUEUE_NAME);
RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue);
delayedQueue.offer(task,delay,timeUnit);
}
/**
* 添加即时任务
* @param task
*/
public void addTask(CallBackTask task){
task.setTaskId(UUID.randomUUID().toString());
task.setCreateDate(new Timestamp(System.currentTimeMillis()));
task.setLastModifiedDate(new Timestamp(System.currentTimeMillis()));
RBlockingQueue<CallBackTask> blockingQueue = client.getBlockingQueue(Constants.REDIS_QUEUE_NAME);
blockingQueue.addAsync(task);
}
分别为添加延迟任务和添加即时任务
- 当收到任务消息传入时,会调用
addTask
方法向redis队列添加一条即时任务 - 当任务处理失败时,会调用
addDelayTask
方法向reids队列添加一条延时任务
TaskDueueHadler
类通过两个线程不停地从redis队列取数据,并调用执行任务方法,如果执行失败会调用TaskService
的addDelayTask
,将任务重新放入队列,延时执行,直到成功或到达最大失败次数。如果到达最大失败次数会调用Webhook执行组件向配置的所有失败提醒地址推送提醒消息
4. webhook
当消息发送失败次数达到重试限制次数,kkbida会触发webhook发起失败通知
查看kk-callcenter-sdk
模块下TaskService
任务处理类源码
/**
* @auther: chenjh
* @time: 2019/3/6 14:44
* @description
*/
@Component
public class WebHookClientComponent {
private HttpClient httpclient = HttpClients.createDefault();
private static final String DINGDING_WEBHOOK_URL_PREFIX = "https://oapi.dingtalk.com";
public SendResult send(WebHookPO webhook, String msgContent) throws IOException, WebHookException {
if (webhook == null || msgContent.trim().isEmpty()) {
throw new WebHookException("webhook对象或消息对象为空");
}
if (!RequestMethodEnum.POST.equals(webhook.getRequestMethod())) {
throw new WebHookException("暂只支持POST请求");
}
HttpPost httppost = new HttpPost(webhook.getUrl());
httppost.addHeader("Content-Type", webhook.getContentType());
StringEntity se = new StringEntity(msgContent, "utf-8");
httppost.setEntity(se);
SendResult sendResult = new SendResult();
HttpResponse response = httpclient.execute(httppost);
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
//钉钉消息,判断返回码
if (webhook.getUrl().toLowerCase().startsWith(DINGDING_WEBHOOK_URL_PREFIX)) {
String result = EntityUtils.toString(response.getEntity());
JSONObject obj = JSONObject.parseObject(result);
Integer errcode = obj.getInteger("errcode");
sendResult.setErrorCode(errcode);
sendResult.setErrorMsg(obj.getString("errmsg"));
sendResult.setIsSuccess(errcode.equals(0));
} else {
sendResult.setIsSuccess(true);
}
} else {
sendResult.setIsSuccess(false);
}
return sendResult;
}
}
通知发送组件会按失败通知(webhook)配置的参数,封装http请求,发送请求,完成webhook通知成功与否判定
结语
读完本文,相信您已经了解kkbida应该如何使用,并了解其主要的内部设计与实现。希望kkbida能帮助您简化系统间消息通知,助力您提升开发效率。如果您对kkbida项目有好的建议与意见,欢迎到issue区讨论,同时欢迎开发者提交pr参与项目贡献
作者简介
陈精华,2018年8月加入凯京科技。kkbida项目开发者,任职于凯京研发中心架构组,参与凯京科技统一支付平台、结算平台设计和开发,负责凯京科技开源中间件开发与维护。
欢迎加入凯京开源技术QQ群:613025121,和我们一起交流互联网应用的技术架构落地实践
关于架构&运维部
凯京研发中心架构&运维部的工作主要分两大部分,架构组主要负责框架中间件的研究,如dubbo、apollo、skywalking、xxljob、分布式事务等、公司内开源项目以及公共服务公共组件的研发维护、新技术的引进以及落地等。运维组主要负责devops系统研发以及k8s容器环境的维护等工作。
架构组招聘
目前架构组还有两个虚位以待,欢迎志同道合的你来和我们一起交流。简历可发送至邮箱:
chenkailing@keking.cn