kkbida - 开源消息投递中间件详细解析

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: kkbida - 开源消息投递中间件详细解析项目简介kkbida为凯京科技开源的消息投递中间件,谐音必达,旨在保证异构系统间消息通知时消息投递必达,详情见 https://gitee.com/kekingcn/kkbida快速开始从gitee拉取代码git clone https://gitee.

kkbida - 开源消息投递中间件详细解析

项目简介

kkbida为凯京科技开源的消息投递中间件,谐音必达,旨在保证异构系统间消息通知时消息投递必达,详情见 https://gitee.com/kekingcn/kkbida

快速开始

从gitee拉取代码

git clone https://gitee.com/kekingcn/kkbida
AI 代码解读

准备一个mysql数据库和redis,修改kk-callcenter-main模块中的 application.properties 配置文件的连接信息

使用maven构建

cd kkbida
mvn clean install -DskipTests
AI 代码解读

运行

cd kkbida/kk-callcenter-main/target
java -jar kk-callcenter-main-1.0.0-SNAPSHOT.jar
AI 代码解读

浏览器访问 http://127.0.0.1:8080 即可 用户名/密码:admin/admin

功能介绍

kkbida目前提供首页统计图表历史任务待处理任务失败通知管理四个功能模块

  • 首页统计图表可查看消息总数、成功、失败数和最近一周的每天的统计数据
  • 历史任务提供查看历史任务记录和执行情况功能,对于执行失败的任务,提供手动补偿执行功能
  • 待处理任务提供查看队列中待处理的任务记录情况(包括距下次执行还剩多少时间),同时提供直接手动提前执行功能
  • 失败通知管理即webhook通知,当消息发送失败次数达到重试限制次数,kkbida会触发webhook发起失败通知,支持包括钉钉、企业微信等各种带webhook功能的平台,详细配置见:https://gitee.com/kekingcn/kkbida#%E5%9B%9E%E8%B0%83%E5%A4%B1%E8%B4%A5%E9%80%9A%E7%9F%A5

原理解析

系统架构详见:https://gitee.com/kekingcn/kkbida#%E7%B3%BB%E7%BB%9F%E6%9E%B6%E6%9E%84 的说明和架构图

image

下面从代码总体结构sdk任务队列处理webhook四个方面详细解析

1. 代码总体结构

从gitee拉取代码使用idea导入后,看到代码结构如下

image

项目分为三个模块kk-callcenter-mainkk-callcenter-sdkthird-party

2. sdk

如下图所示,kk-callcenter-sdk提供一个CallBackService接口和一个CallBackServiceHttpImpl http 实现类(另外服务端默认提供了dubbo实现,接入时,只需要使用和服务端同一个redis注册中心,@Reference CallBackService就可自动注入),同时提供一个 CallBackTask model类作为任务参数,并提供在任务失败的时候抛出的CallBackException异常类

image

使用详情见 https://gitee.com/kekingcn/kkbida#%E8%8E%B7%E5%8F%96callbackservice%E5%AF%B9%E8%B1%A1https://gitee.com/kekingcn/kkbida#%E8%B0%83%E7%94%A8callbackservice%E5%8F%91%E8%B5%B7%E5%9B%9E%E8%B0%83

同时,如果需要使用其他方式接入,只需要自行实现CallBackService接口就可以

3. 任务队列处理

查看kk-callcenter-sdk模块下TaskService任务处理类源码TaskDueueHadler队列处理类源码

TaskService 类提供如下两个方法

/**
* 添加延迟任务
* @param task
* @param delay
* @param timeUnit
*/
public void addDelayTask(CallBackTask task, Long delay, TimeUnit timeUnit){
    RBlockingQueue<CallBackTask> blockingQueue = client.getBlockingQueue(Constants.REDIS_QUEUE_NAME);
    RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue);
    delayedQueue.offer(task,delay,timeUnit);
}

/**
* 添加即时任务
* @param task
*/
public void addTask(CallBackTask task){
    task.setTaskId(UUID.randomUUID().toString());
    task.setCreateDate(new Timestamp(System.currentTimeMillis()));
    task.setLastModifiedDate(new Timestamp(System.currentTimeMillis()));
    RBlockingQueue<CallBackTask> blockingQueue = client.getBlockingQueue(Constants.REDIS_QUEUE_NAME);
    blockingQueue.addAsync(task);
}
AI 代码解读

分别为添加延迟任务和添加即时任务

  • 当收到任务消息传入时,会调用addTask方法向redis队列添加一条即时任务
  • 当任务处理失败时,会调用addDelayTask方法向reids队列添加一条延时任务

TaskDueueHadler类通过两个线程不停地从redis队列取数据,并调用执行任务方法,如果执行失败会调用TaskServiceaddDelayTask,将任务重新放入队列,延时执行,直到成功或到达最大失败次数。如果到达最大失败次数会调用Webhook执行组件向配置的所有失败提醒地址推送提醒消息

4. webhook

当消息发送失败次数达到重试限制次数,kkbida会触发webhook发起失败通知

查看kk-callcenter-sdk模块下TaskService任务处理类源码

/**
 * @auther: chenjh
 * @time: 2019/3/6 14:44
 * @description
 */
@Component
public class WebHookClientComponent {

    private HttpClient httpclient = HttpClients.createDefault();

    private static final String DINGDING_WEBHOOK_URL_PREFIX = "https://oapi.dingtalk.com";

    public SendResult send(WebHookPO webhook, String msgContent) throws IOException, WebHookException {
        if (webhook == null || msgContent.trim().isEmpty()) {
            throw new WebHookException("webhook对象或消息对象为空");
        }
        if (!RequestMethodEnum.POST.equals(webhook.getRequestMethod())) {
            throw new WebHookException("暂只支持POST请求");
        }
        HttpPost httppost = new HttpPost(webhook.getUrl());
        httppost.addHeader("Content-Type", webhook.getContentType());
        StringEntity se = new StringEntity(msgContent, "utf-8");
        httppost.setEntity(se);
        SendResult sendResult = new SendResult();
        HttpResponse response = httpclient.execute(httppost);
        if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
            //钉钉消息,判断返回码
            if (webhook.getUrl().toLowerCase().startsWith(DINGDING_WEBHOOK_URL_PREFIX)) {
                String result = EntityUtils.toString(response.getEntity());
                JSONObject obj = JSONObject.parseObject(result);
                Integer errcode = obj.getInteger("errcode");
                sendResult.setErrorCode(errcode);
                sendResult.setErrorMsg(obj.getString("errmsg"));
                sendResult.setIsSuccess(errcode.equals(0));
            } else {
                sendResult.setIsSuccess(true);
            }
        } else {
            sendResult.setIsSuccess(false);
        }
        return sendResult;
    }
}
AI 代码解读

通知发送组件会按失败通知(webhook)配置的参数,封装http请求,发送请求,完成webhook通知成功与否判定

结语

读完本文,相信您已经了解kkbida应该如何使用,并了解其主要的内部设计与实现。希望kkbida能帮助您简化系统间消息通知,助力您提升开发效率。如果您对kkbida项目有好的建议与意见,欢迎到issue区讨论,同时欢迎开发者提交pr参与项目贡献

作者简介

陈精华,2018年8月加入凯京科技。kkbida项目开发者,任职于凯京研发中心架构组,参与凯京科技统一支付平台、结算平台设计和开发,负责凯京科技开源中间件开发与维护。

欢迎加入凯京开源技术QQ群:613025121,和我们一起交流互联网应用的技术架构落地实践

image

关于架构&运维部

凯京研发中心架构&运维部的工作主要分两大部分,架构组主要负责框架中间件的研究,如dubbo、apollo、skywalking、xxljob、分布式事务等、公司内开源项目以及公共服务公共组件的研发维护、新技术的引进以及落地等。运维组主要负责devops系统研发以及k8s容器环境的维护等工作。

架构组招聘

目前架构组还有两个虚位以待,欢迎志同道合的你来和我们一起交流。简历可发送至邮箱:
chenkailing@keking.cn

目录
打赏
0
31
35
38
12
分享
相关文章
Resume Matcher:增加面试机会!开源AI简历优化工具,一键解析简历和职位描述并优化
Resume Matcher 是一款开源AI简历优化工具,通过解析简历和职位描述,提取关键词并计算文本相似性,帮助求职者优化简历内容,提升通过自动化筛选系统(ATS)的概率,增加面试机会。
304 18
Resume Matcher:增加面试机会!开源AI简历优化工具,一键解析简历和职位描述并优化
ViDoRAG:开源多模态文档检索框架,多智能体推理+图文理解精准解析文档
ViDoRAG 是阿里巴巴通义实验室联合中国科学技术大学和上海交通大学推出的视觉文档检索增强生成框架,基于多智能体协作和动态迭代推理,显著提升复杂视觉文档的检索和生成效率。
206 8
ViDoRAG:开源多模态文档检索框架,多智能体推理+图文理解精准解析文档
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
FastExcel 是一款基于 Java 的高性能 Excel 处理工具,专注于优化大规模数据处理,提供简洁易用的 API 和流式操作能力,支持从 EasyExcel 无缝迁移。
1399 65
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
R1-Onevision:开源多模态推理之王!复杂视觉难题一键解析,超越GPT-4V
R1-Onevision 是一款开源的多模态视觉推理模型,基于 Qwen2.5-VL 微调,专注于复杂视觉推理任务。它通过整合视觉和文本数据,能够在数学、科学、深度图像理解和逻辑推理等领域表现出色,并在多项基准测试中超越了 Qwen2.5-VL-7B 和 GPT-4V 等模型。
195 0
R1-Onevision:开源多模态推理之王!复杂视觉难题一键解析,超越GPT-4V
Cobalt:开源的流媒体下载工具,支持解析和下载全平台的视频、音频和图片,支持多种视频质量和格式,自动提取视频字幕
cobalt 是一款开源的流媒体下载工具,支持全平台视频、音频和图片下载,提供纯净、简洁无广告的体验
1025 9
Cobalt:开源的流媒体下载工具,支持解析和下载全平台的视频、音频和图片,支持多种视频质量和格式,自动提取视频字幕
免费开源法律文档比对工具:技术解析与应用
这款免费开源的法律文档比对工具,利用先进的文本分析和自然语言处理技术,实现高效、精准的文档比对。核心功能包括文本差异检测、多格式支持、语义分析、批量处理及用户友好的可视化界面,广泛适用于法律行业的各类场景。
361 1
NServiceBus:打造企业级服务总线的利器——深度解析这一面向消息中间件如何革新分布式应用开发与提升系统可靠性
【10月更文挑战第9天】NServiceBus 是一个面向消息的中间件,专为构建分布式应用程序设计,特别适用于企业级服务总线(ESB)。它通过消息队列实现服务间的解耦,提高系统的可扩展性和容错性。在 .NET 生态中,NServiceBus 提供了强大的功能,支持多种传输方式如 RabbitMQ 和 Azure Service Bus。通过异步消息传递模式,各组件可以独立运作,即使某部分出现故障也不会影响整体系统。 示例代码展示了如何使用 NServiceBus 发送和接收消息,简化了系统的设计和维护。
156 3
万界星空科技商业开源MES系统全面解析
万界星空MES系统支持对生产现场的实时监控,包括设备运行状态、生产进度、质量数据等关键信息的即时反馈。通过可视化的数据展示,管理者能够直观掌握生产全貌,及时发现问题并采取措施
216 6
国产化中间件正在侵蚀开源中间件
国产化中间件正在侵蚀开源中间件
1314 6
深入解析 Laravel 中的 HTTP 中间件
【8月更文挑战第31天】
95 0

推荐镜像

更多
  • DNS
  • AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等

    登录插画

    登录以查看您的控制台资源

    管理云资源
    状态一览
    快捷访问