kkbida - 开源消息投递中间件详细解析

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: kkbida - 开源消息投递中间件详细解析项目简介kkbida为凯京科技开源的消息投递中间件,谐音必达,旨在保证异构系统间消息通知时消息投递必达,详情见 https://gitee.com/kekingcn/kkbida快速开始从gitee拉取代码git clone https://gitee.

kkbida - 开源消息投递中间件详细解析

项目简介

kkbida为凯京科技开源的消息投递中间件,谐音必达,旨在保证异构系统间消息通知时消息投递必达,详情见 https://gitee.com/kekingcn/kkbida

快速开始

从gitee拉取代码

git clone https://gitee.com/kekingcn/kkbida

准备一个mysql数据库和redis,修改kk-callcenter-main模块中的 application.properties 配置文件的连接信息

使用maven构建

cd kkbida
mvn clean install -DskipTests

运行

cd kkbida/kk-callcenter-main/target
java -jar kk-callcenter-main-1.0.0-SNAPSHOT.jar

浏览器访问 http://127.0.0.1:8080 即可 用户名/密码:admin/admin

功能介绍

kkbida目前提供首页统计图表历史任务待处理任务失败通知管理四个功能模块

  • 首页统计图表可查看消息总数、成功、失败数和最近一周的每天的统计数据
  • 历史任务提供查看历史任务记录和执行情况功能,对于执行失败的任务,提供手动补偿执行功能
  • 待处理任务提供查看队列中待处理的任务记录情况(包括距下次执行还剩多少时间),同时提供直接手动提前执行功能
  • 失败通知管理即webhook通知,当消息发送失败次数达到重试限制次数,kkbida会触发webhook发起失败通知,支持包括钉钉、企业微信等各种带webhook功能的平台,详细配置见:https://gitee.com/kekingcn/kkbida#%E5%9B%9E%E8%B0%83%E5%A4%B1%E8%B4%A5%E9%80%9A%E7%9F%A5

原理解析

系统架构详见:https://gitee.com/kekingcn/kkbida#%E7%B3%BB%E7%BB%9F%E6%9E%B6%E6%9E%84 的说明和架构图

image

下面从代码总体结构sdk任务队列处理webhook四个方面详细解析

1. 代码总体结构

从gitee拉取代码使用idea导入后,看到代码结构如下

image

项目分为三个模块kk-callcenter-mainkk-callcenter-sdkthird-party

2. sdk

如下图所示,kk-callcenter-sdk提供一个CallBackService接口和一个CallBackServiceHttpImpl http 实现类(另外服务端默认提供了dubbo实现,接入时,只需要使用和服务端同一个redis注册中心,@Reference CallBackService就可自动注入),同时提供一个 CallBackTask model类作为任务参数,并提供在任务失败的时候抛出的CallBackException异常类

image

使用详情见 https://gitee.com/kekingcn/kkbida#%E8%8E%B7%E5%8F%96callbackservice%E5%AF%B9%E8%B1%A1https://gitee.com/kekingcn/kkbida#%E8%B0%83%E7%94%A8callbackservice%E5%8F%91%E8%B5%B7%E5%9B%9E%E8%B0%83

同时,如果需要使用其他方式接入,只需要自行实现CallBackService接口就可以

3. 任务队列处理

查看kk-callcenter-sdk模块下TaskService任务处理类源码TaskDueueHadler队列处理类源码

TaskService 类提供如下两个方法

/**
* 添加延迟任务
* @param task
* @param delay
* @param timeUnit
*/
public void addDelayTask(CallBackTask task, Long delay, TimeUnit timeUnit){
    RBlockingQueue<CallBackTask> blockingQueue = client.getBlockingQueue(Constants.REDIS_QUEUE_NAME);
    RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue);
    delayedQueue.offer(task,delay,timeUnit);
}

/**
* 添加即时任务
* @param task
*/
public void addTask(CallBackTask task){
    task.setTaskId(UUID.randomUUID().toString());
    task.setCreateDate(new Timestamp(System.currentTimeMillis()));
    task.setLastModifiedDate(new Timestamp(System.currentTimeMillis()));
    RBlockingQueue<CallBackTask> blockingQueue = client.getBlockingQueue(Constants.REDIS_QUEUE_NAME);
    blockingQueue.addAsync(task);
}

分别为添加延迟任务和添加即时任务

  • 当收到任务消息传入时,会调用addTask方法向redis队列添加一条即时任务
  • 当任务处理失败时,会调用addDelayTask方法向reids队列添加一条延时任务

TaskDueueHadler类通过两个线程不停地从redis队列取数据,并调用执行任务方法,如果执行失败会调用TaskServiceaddDelayTask,将任务重新放入队列,延时执行,直到成功或到达最大失败次数。如果到达最大失败次数会调用Webhook执行组件向配置的所有失败提醒地址推送提醒消息

4. webhook

当消息发送失败次数达到重试限制次数,kkbida会触发webhook发起失败通知

查看kk-callcenter-sdk模块下TaskService任务处理类源码

/**
 * @auther: chenjh
 * @time: 2019/3/6 14:44
 * @description
 */
@Component
public class WebHookClientComponent {

    private HttpClient httpclient = HttpClients.createDefault();

    private static final String DINGDING_WEBHOOK_URL_PREFIX = "https://oapi.dingtalk.com";

    public SendResult send(WebHookPO webhook, String msgContent) throws IOException, WebHookException {
        if (webhook == null || msgContent.trim().isEmpty()) {
            throw new WebHookException("webhook对象或消息对象为空");
        }
        if (!RequestMethodEnum.POST.equals(webhook.getRequestMethod())) {
            throw new WebHookException("暂只支持POST请求");
        }
        HttpPost httppost = new HttpPost(webhook.getUrl());
        httppost.addHeader("Content-Type", webhook.getContentType());
        StringEntity se = new StringEntity(msgContent, "utf-8");
        httppost.setEntity(se);
        SendResult sendResult = new SendResult();
        HttpResponse response = httpclient.execute(httppost);
        if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
            //钉钉消息,判断返回码
            if (webhook.getUrl().toLowerCase().startsWith(DINGDING_WEBHOOK_URL_PREFIX)) {
                String result = EntityUtils.toString(response.getEntity());
                JSONObject obj = JSONObject.parseObject(result);
                Integer errcode = obj.getInteger("errcode");
                sendResult.setErrorCode(errcode);
                sendResult.setErrorMsg(obj.getString("errmsg"));
                sendResult.setIsSuccess(errcode.equals(0));
            } else {
                sendResult.setIsSuccess(true);
            }
        } else {
            sendResult.setIsSuccess(false);
        }
        return sendResult;
    }
}

通知发送组件会按失败通知(webhook)配置的参数,封装http请求,发送请求,完成webhook通知成功与否判定

结语

读完本文,相信您已经了解kkbida应该如何使用,并了解其主要的内部设计与实现。希望kkbida能帮助您简化系统间消息通知,助力您提升开发效率。如果您对kkbida项目有好的建议与意见,欢迎到issue区讨论,同时欢迎开发者提交pr参与项目贡献

作者简介

陈精华,2018年8月加入凯京科技。kkbida项目开发者,任职于凯京研发中心架构组,参与凯京科技统一支付平台、结算平台设计和开发,负责凯京科技开源中间件开发与维护。

欢迎加入凯京开源技术QQ群:613025121,和我们一起交流互联网应用的技术架构落地实践

image

关于架构&运维部

凯京研发中心架构&运维部的工作主要分两大部分,架构组主要负责框架中间件的研究,如dubbo、apollo、skywalking、xxljob、分布式事务等、公司内开源项目以及公共服务公共组件的研发维护、新技术的引进以及落地等。运维组主要负责devops系统研发以及k8s容器环境的维护等工作。

架构组招聘

目前架构组还有两个虚位以待,欢迎志同道合的你来和我们一起交流。简历可发送至邮箱:
chenkailing@keking.cn

相关文章
|
2月前
|
缓存 前端开发 中间件
[go 面试] 前端请求到后端API的中间件流程解析
[go 面试] 前端请求到后端API的中间件流程解析
|
8天前
|
消息中间件 中间件 数据库
NServiceBus:打造企业级服务总线的利器——深度解析这一面向消息中间件如何革新分布式应用开发与提升系统可靠性
【10月更文挑战第9天】NServiceBus 是一个面向消息的中间件,专为构建分布式应用程序设计,特别适用于企业级服务总线(ESB)。它通过消息队列实现服务间的解耦,提高系统的可扩展性和容错性。在 .NET 生态中,NServiceBus 提供了强大的功能,支持多种传输方式如 RabbitMQ 和 Azure Service Bus。通过异步消息传递模式,各组件可以独立运作,即使某部分出现故障也不会影响整体系统。 示例代码展示了如何使用 NServiceBus 发送和接收消息,简化了系统的设计和维护。
22 3
|
1月前
|
监控 数据可视化 搜索推荐
万界星空科技商业开源MES系统全面解析
万界星空MES系统支持对生产现场的实时监控,包括设备运行状态、生产进度、质量数据等关键信息的即时反馈。通过可视化的数据展示,管理者能够直观掌握生产全貌,及时发现问题并采取措施
58 5
|
1月前
|
消息中间件 存储 NoSQL
国产化中间件正在侵蚀开源中间件
国产化中间件正在侵蚀开源中间件
108 6
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
【热门开源项目】阿里开源巨擘:Qwen-2 72B深度解析与推荐
在人工智能的浪潮中,开源模型如同璀璨的星辰,指引着开发者们探索未知的领域。而今天,我们将聚焦在阿里云推出的开源模型Qwen-2 72B上,从其项目介绍、技术特点、代码解析等多个角度,深入解析并推荐这一卓越的开源项目。
134 1
|
2月前
|
中间件 PHP 开发者
深入解析 Laravel 中的 HTTP 中间件
【8月更文挑战第31天】
27 0
|
3月前
|
监控 数据可视化 搜索推荐
万界星空科技商业开源MES系统全面解析
万界星空科技提供商业开源MES系统,基于Java的开源版本,含源码及拖拽式数据大屏,适用于定制开发。系统集成ERP、PDM、QC,实现无缝对接与智能调度,优化资源配置。具备实时监控、质量控制、灵活定制等功能,支持低代码定制,广泛应用于多个制造业领域。欲了解更多,可访问官网或搜索联系。
99 10
|
3月前
|
中间件 数据库 开发者
解析Python Web框架的四大支柱:模板、ORM、中间件与路由
【7月更文挑战第20天】Python Web框架如Django、Flask、FastAPI的核心包括模板(如Django的DTL和Flask的Jinja2)、ORM(Django的内置ORM与Flask的SQLAlchemy)、中间件(Django的全局中间件与Flask的装饰器实现)和路由(Django的urls.py配置与Flask的@app.route()装饰器)。这些组件提升了代码组织和数据库操作的便捷性,确保了Web应用的稳定性和可扩展性。
60 0
|
4月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
962 0
|
3月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
263 3

推荐镜像

更多