开发者社区> 凯京技术团队> 正文

kkbida - 开源消息投递中间件详细解析

简介: kkbida - 开源消息投递中间件详细解析 项目简介 kkbida为凯京科技开源的消息投递中间件,谐音必达,旨在保证异构系统间消息通知时消息投递必达,详情见 https://gitee.com/kekingcn/kkbida 快速开始 从gitee拉取代码 git clone https://gitee.
+关注继续查看

kkbida - 开源消息投递中间件详细解析

项目简介

kkbida为凯京科技开源的消息投递中间件,谐音必达,旨在保证异构系统间消息通知时消息投递必达,详情见 https://gitee.com/kekingcn/kkbida

快速开始

从gitee拉取代码

git clone https://gitee.com/kekingcn/kkbida

准备一个mysql数据库和redis,修改kk-callcenter-main模块中的 application.properties 配置文件的连接信息

使用maven构建

cd kkbida
mvn clean install -DskipTests

运行

cd kkbida/kk-callcenter-main/target
java -jar kk-callcenter-main-1.0.0-SNAPSHOT.jar

浏览器访问 http://127.0.0.1:8080 即可 用户名/密码:admin/admin

功能介绍

kkbida目前提供首页统计图表历史任务待处理任务失败通知管理四个功能模块

  • 首页统计图表可查看消息总数、成功、失败数和最近一周的每天的统计数据
  • 历史任务提供查看历史任务记录和执行情况功能,对于执行失败的任务,提供手动补偿执行功能
  • 待处理任务提供查看队列中待处理的任务记录情况(包括距下次执行还剩多少时间),同时提供直接手动提前执行功能
  • 失败通知管理即webhook通知,当消息发送失败次数达到重试限制次数,kkbida会触发webhook发起失败通知,支持包括钉钉、企业微信等各种带webhook功能的平台,详细配置见:https://gitee.com/kekingcn/kkbida#%E5%9B%9E%E8%B0%83%E5%A4%B1%E8%B4%A5%E9%80%9A%E7%9F%A5

原理解析

系统架构详见:https://gitee.com/kekingcn/kkbida#%E7%B3%BB%E7%BB%9F%E6%9E%B6%E6%9E%84 的说明和架构图

image

下面从代码总体结构sdk任务队列处理webhook四个方面详细解析

1. 代码总体结构

从gitee拉取代码使用idea导入后,看到代码结构如下

image

项目分为三个模块kk-callcenter-mainkk-callcenter-sdkthird-party

2. sdk

如下图所示,kk-callcenter-sdk提供一个CallBackService接口和一个CallBackServiceHttpImpl http 实现类(另外服务端默认提供了dubbo实现,接入时,只需要使用和服务端同一个redis注册中心,@Reference CallBackService就可自动注入),同时提供一个 CallBackTask model类作为任务参数,并提供在任务失败的时候抛出的CallBackException异常类

image

使用详情见 https://gitee.com/kekingcn/kkbida#%E8%8E%B7%E5%8F%96callbackservice%E5%AF%B9%E8%B1%A1https://gitee.com/kekingcn/kkbida#%E8%B0%83%E7%94%A8callbackservice%E5%8F%91%E8%B5%B7%E5%9B%9E%E8%B0%83

同时,如果需要使用其他方式接入,只需要自行实现CallBackService接口就可以

3. 任务队列处理

查看kk-callcenter-sdk模块下TaskService任务处理类源码TaskDueueHadler队列处理类源码

TaskService 类提供如下两个方法

/**
* 添加延迟任务
* @param task
* @param delay
* @param timeUnit
*/
public void addDelayTask(CallBackTask task, Long delay, TimeUnit timeUnit){
    RBlockingQueue<CallBackTask> blockingQueue = client.getBlockingQueue(Constants.REDIS_QUEUE_NAME);
    RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue);
    delayedQueue.offer(task,delay,timeUnit);
}

/**
* 添加即时任务
* @param task
*/
public void addTask(CallBackTask task){
    task.setTaskId(UUID.randomUUID().toString());
    task.setCreateDate(new Timestamp(System.currentTimeMillis()));
    task.setLastModifiedDate(new Timestamp(System.currentTimeMillis()));
    RBlockingQueue<CallBackTask> blockingQueue = client.getBlockingQueue(Constants.REDIS_QUEUE_NAME);
    blockingQueue.addAsync(task);
}

分别为添加延迟任务和添加即时任务

  • 当收到任务消息传入时,会调用addTask方法向redis队列添加一条即时任务
  • 当任务处理失败时,会调用addDelayTask方法向reids队列添加一条延时任务

TaskDueueHadler类通过两个线程不停地从redis队列取数据,并调用执行任务方法,如果执行失败会调用TaskServiceaddDelayTask,将任务重新放入队列,延时执行,直到成功或到达最大失败次数。如果到达最大失败次数会调用Webhook执行组件向配置的所有失败提醒地址推送提醒消息

4. webhook

当消息发送失败次数达到重试限制次数,kkbida会触发webhook发起失败通知

查看kk-callcenter-sdk模块下TaskService任务处理类源码

/**
 * @auther: chenjh
 * @time: 2019/3/6 14:44
 * @description
 */
@Component
public class WebHookClientComponent {

    private HttpClient httpclient = HttpClients.createDefault();

    private static final String DINGDING_WEBHOOK_URL_PREFIX = "https://oapi.dingtalk.com";

    public SendResult send(WebHookPO webhook, String msgContent) throws IOException, WebHookException {
        if (webhook == null || msgContent.trim().isEmpty()) {
            throw new WebHookException("webhook对象或消息对象为空");
        }
        if (!RequestMethodEnum.POST.equals(webhook.getRequestMethod())) {
            throw new WebHookException("暂只支持POST请求");
        }
        HttpPost httppost = new HttpPost(webhook.getUrl());
        httppost.addHeader("Content-Type", webhook.getContentType());
        StringEntity se = new StringEntity(msgContent, "utf-8");
        httppost.setEntity(se);
        SendResult sendResult = new SendResult();
        HttpResponse response = httpclient.execute(httppost);
        if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
            //钉钉消息,判断返回码
            if (webhook.getUrl().toLowerCase().startsWith(DINGDING_WEBHOOK_URL_PREFIX)) {
                String result = EntityUtils.toString(response.getEntity());
                JSONObject obj = JSONObject.parseObject(result);
                Integer errcode = obj.getInteger("errcode");
                sendResult.setErrorCode(errcode);
                sendResult.setErrorMsg(obj.getString("errmsg"));
                sendResult.setIsSuccess(errcode.equals(0));
            } else {
                sendResult.setIsSuccess(true);
            }
        } else {
            sendResult.setIsSuccess(false);
        }
        return sendResult;
    }
}

通知发送组件会按失败通知(webhook)配置的参数,封装http请求,发送请求,完成webhook通知成功与否判定

结语

读完本文,相信您已经了解kkbida应该如何使用,并了解其主要的内部设计与实现。希望kkbida能帮助您简化系统间消息通知,助力您提升开发效率。如果您对kkbida项目有好的建议与意见,欢迎到issue区讨论,同时欢迎开发者提交pr参与项目贡献

作者简介

陈精华,2018年8月加入凯京科技。kkbida项目开发者,任职于凯京研发中心架构组,参与凯京科技统一支付平台、结算平台设计和开发,负责凯京科技开源中间件开发与维护。

欢迎加入凯京开源技术QQ群:613025121,和我们一起交流互联网应用的技术架构落地实践

image

关于架构&运维部

凯京研发中心架构&运维部的工作主要分两大部分,架构组主要负责框架中间件的研究,如dubbo、apollo、skywalking、xxljob、分布式事务等、公司内开源项目以及公共服务公共组件的研发维护、新技术的引进以及落地等。运维组主要负责devops系统研发以及k8s容器环境的维护等工作。

架构组招聘

目前架构组还有两个虚位以待,欢迎志同道合的你来和我们一起交流。简历可发送至邮箱:
chenkailing@keking.cn

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
使用OpenApi弹性释放和设置云服务器ECS释放
云服务器ECS的一个重要特性就是按需创建资源。您可以在业务高峰期按需弹性的自定义规则进行资源创建,在完成业务计算的时候释放资源。本篇将提供几个Tips帮助您更加容易和自动化的完成云服务器的释放和弹性设置。
18716 0
【Android 异步操作】Handler 机制 ( Android 提供的 Handler 源码解析 | Handler 构造与消息分发 | MessageQueue 消息队列相关方法 )
【Android 异步操作】Handler 机制 ( Android 提供的 Handler 源码解析 | Handler 构造与消息分发 | MessageQueue 消息队列相关方法 )
15 0
开源项目推荐:物联网消息引擎EMQ X/大数据平台TDengine/可视化平台Grafana
开源项目推荐:物联网消息引擎EMQ X/大数据平台TDengine/可视化平台Grafana
108 0
Apache ShardingSphere:由开源驱动的分布式数据库中间件生态
2021 年 7 月 21 日 2021 亚马逊云科技中国峰会现场,SphereEx 联合创始人、Apache ShardingSphere PMC 潘娟受邀参与此次峰会,以《Apache ShardingSphere 分布式数据库中间件开源生态构建》为主题,围绕开源理念扩散、社区建设、ShardingSphere 如何践行 Apache Way 等方面展开了介绍,本文总结自潘娟内容分享。
104 0
kkbida - 开源消息投递中间件详细解析
kkbida - 开源消息投递中间件详细解析 项目简介 kkbida为凯京科技开源的消息投递中间件,谐音必达,旨在保证异构系统间消息通知时消息投递必达,详情见 https://gitee.com/kekingcn/kkbida 快速开始 从gitee拉取代码 git clone https://gitee.
1291 0
【直播预告】:Java Spring Boot开发实战系列课程【第11讲】:消息中间件 RabbitMQ 与api源码解析
mq消息中间件在高并发系统架构中扮演关键角色,阿里双11高并发使用了mq技术。本次课程一起学习最新Java Spring Boot 2.0、RabbitMQ中间件的最新特性与实战应用,同样会分析核心api源码。
1325 0
Java编程架构深入解析-RPC 消息协议设计
本节我们开始讲解 RPC 的消息协议设计背后的基本原理,了解 RPC 的协议开发背后有哪些需要考虑的基本点。在通晓原理之后,我们就可以自己设计一套协议来开发属于自己的 RPC 系统。
1318 0
📊📈🦋🦋🦋 iOS 图表框架 AAChartKit ---强大、精美、易用的开源 iOS 图表
AAChartKit 项目,是AAInfographics的 Objective-C 语言版本,是在流行的开源前端图表框架Highcharts的基础上,封装的面向对象的,一组简单易用,极其精美的图表绘制控件.
2310 0
SpringMVC源码剖析5:消息转换器HttpMessageConverter与@ResponseBody注解
SpringMVC关于json、xml自动转换的原理研究[附带源码分析] 目录 前言 现象 源码分析 实例讲解 关于配置 总结 参考资料 前言 SpringMVC是目前主流的Web MVC框架之一。
1755 0
蚂蚁集团自研数据库 OceanBase 宣布正式开源 并成立开源社区
6月1日, 蚂蚁集团自研数据库 OceanBase 宣布正式开源,并成立 OceanBase 开源社区,社区官网同步上线。
721 0
+关注
凯京技术团队
我们是凯京科技技术团队。技术有无穷的魅力,我们不满足当前的现状,脚踏实地锐意进取,定能触碰到星空
24
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载