kkbida - 开源消息投递中间件详细解析

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: kkbida - 开源消息投递中间件详细解析项目简介kkbida为凯京科技开源的消息投递中间件,谐音必达,旨在保证异构系统间消息通知时消息投递必达,详情见 https://gitee.com/kekingcn/kkbida快速开始从gitee拉取代码git clone https://gitee.

kkbida - 开源消息投递中间件详细解析

项目简介

kkbida为凯京科技开源的消息投递中间件,谐音必达,旨在保证异构系统间消息通知时消息投递必达,详情见 https://gitee.com/kekingcn/kkbida

快速开始

从gitee拉取代码

git clone https://gitee.com/kekingcn/kkbida

准备一个mysql数据库和redis,修改kk-callcenter-main模块中的 application.properties 配置文件的连接信息

使用maven构建

cd kkbida
mvn clean install -DskipTests

运行

cd kkbida/kk-callcenter-main/target
java -jar kk-callcenter-main-1.0.0-SNAPSHOT.jar

浏览器访问 http://127.0.0.1:8080 即可 用户名/密码:admin/admin

功能介绍

kkbida目前提供首页统计图表历史任务待处理任务失败通知管理四个功能模块

  • 首页统计图表可查看消息总数、成功、失败数和最近一周的每天的统计数据
  • 历史任务提供查看历史任务记录和执行情况功能,对于执行失败的任务,提供手动补偿执行功能
  • 待处理任务提供查看队列中待处理的任务记录情况(包括距下次执行还剩多少时间),同时提供直接手动提前执行功能
  • 失败通知管理即webhook通知,当消息发送失败次数达到重试限制次数,kkbida会触发webhook发起失败通知,支持包括钉钉、企业微信等各种带webhook功能的平台,详细配置见:https://gitee.com/kekingcn/kkbida#%E5%9B%9E%E8%B0%83%E5%A4%B1%E8%B4%A5%E9%80%9A%E7%9F%A5

原理解析

系统架构详见:https://gitee.com/kekingcn/kkbida#%E7%B3%BB%E7%BB%9F%E6%9E%B6%E6%9E%84 的说明和架构图

image

下面从代码总体结构sdk任务队列处理webhook四个方面详细解析

1. 代码总体结构

从gitee拉取代码使用idea导入后,看到代码结构如下

image

项目分为三个模块kk-callcenter-mainkk-callcenter-sdkthird-party

2. sdk

如下图所示,kk-callcenter-sdk提供一个CallBackService接口和一个CallBackServiceHttpImpl http 实现类(另外服务端默认提供了dubbo实现,接入时,只需要使用和服务端同一个redis注册中心,@Reference CallBackService就可自动注入),同时提供一个 CallBackTask model类作为任务参数,并提供在任务失败的时候抛出的CallBackException异常类

image

使用详情见 https://gitee.com/kekingcn/kkbida#%E8%8E%B7%E5%8F%96callbackservice%E5%AF%B9%E8%B1%A1https://gitee.com/kekingcn/kkbida#%E8%B0%83%E7%94%A8callbackservice%E5%8F%91%E8%B5%B7%E5%9B%9E%E8%B0%83

同时,如果需要使用其他方式接入,只需要自行实现CallBackService接口就可以

3. 任务队列处理

查看kk-callcenter-sdk模块下TaskService任务处理类源码TaskDueueHadler队列处理类源码

TaskService 类提供如下两个方法

/**
* 添加延迟任务
* @param task
* @param delay
* @param timeUnit
*/
public void addDelayTask(CallBackTask task, Long delay, TimeUnit timeUnit){
    RBlockingQueue<CallBackTask> blockingQueue = client.getBlockingQueue(Constants.REDIS_QUEUE_NAME);
    RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue);
    delayedQueue.offer(task,delay,timeUnit);
}

/**
* 添加即时任务
* @param task
*/
public void addTask(CallBackTask task){
    task.setTaskId(UUID.randomUUID().toString());
    task.setCreateDate(new Timestamp(System.currentTimeMillis()));
    task.setLastModifiedDate(new Timestamp(System.currentTimeMillis()));
    RBlockingQueue<CallBackTask> blockingQueue = client.getBlockingQueue(Constants.REDIS_QUEUE_NAME);
    blockingQueue.addAsync(task);
}

分别为添加延迟任务和添加即时任务

  • 当收到任务消息传入时,会调用addTask方法向redis队列添加一条即时任务
  • 当任务处理失败时,会调用addDelayTask方法向reids队列添加一条延时任务

TaskDueueHadler类通过两个线程不停地从redis队列取数据,并调用执行任务方法,如果执行失败会调用TaskServiceaddDelayTask,将任务重新放入队列,延时执行,直到成功或到达最大失败次数。如果到达最大失败次数会调用Webhook执行组件向配置的所有失败提醒地址推送提醒消息

4. webhook

当消息发送失败次数达到重试限制次数,kkbida会触发webhook发起失败通知

查看kk-callcenter-sdk模块下TaskService任务处理类源码

/**
 * @auther: chenjh
 * @time: 2019/3/6 14:44
 * @description
 */
@Component
public class WebHookClientComponent {

    private HttpClient httpclient = HttpClients.createDefault();

    private static final String DINGDING_WEBHOOK_URL_PREFIX = "https://oapi.dingtalk.com";

    public SendResult send(WebHookPO webhook, String msgContent) throws IOException, WebHookException {
        if (webhook == null || msgContent.trim().isEmpty()) {
            throw new WebHookException("webhook对象或消息对象为空");
        }
        if (!RequestMethodEnum.POST.equals(webhook.getRequestMethod())) {
            throw new WebHookException("暂只支持POST请求");
        }
        HttpPost httppost = new HttpPost(webhook.getUrl());
        httppost.addHeader("Content-Type", webhook.getContentType());
        StringEntity se = new StringEntity(msgContent, "utf-8");
        httppost.setEntity(se);
        SendResult sendResult = new SendResult();
        HttpResponse response = httpclient.execute(httppost);
        if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
            //钉钉消息,判断返回码
            if (webhook.getUrl().toLowerCase().startsWith(DINGDING_WEBHOOK_URL_PREFIX)) {
                String result = EntityUtils.toString(response.getEntity());
                JSONObject obj = JSONObject.parseObject(result);
                Integer errcode = obj.getInteger("errcode");
                sendResult.setErrorCode(errcode);
                sendResult.setErrorMsg(obj.getString("errmsg"));
                sendResult.setIsSuccess(errcode.equals(0));
            } else {
                sendResult.setIsSuccess(true);
            }
        } else {
            sendResult.setIsSuccess(false);
        }
        return sendResult;
    }
}

通知发送组件会按失败通知(webhook)配置的参数,封装http请求,发送请求,完成webhook通知成功与否判定

结语

读完本文,相信您已经了解kkbida应该如何使用,并了解其主要的内部设计与实现。希望kkbida能帮助您简化系统间消息通知,助力您提升开发效率。如果您对kkbida项目有好的建议与意见,欢迎到issue区讨论,同时欢迎开发者提交pr参与项目贡献

作者简介

陈精华,2018年8月加入凯京科技。kkbida项目开发者,任职于凯京研发中心架构组,参与凯京科技统一支付平台、结算平台设计和开发,负责凯京科技开源中间件开发与维护。

欢迎加入凯京开源技术QQ群:613025121,和我们一起交流互联网应用的技术架构落地实践

image

关于架构&运维部

凯京研发中心架构&运维部的工作主要分两大部分,架构组主要负责框架中间件的研究,如dubbo、apollo、skywalking、xxljob、分布式事务等、公司内开源项目以及公共服务公共组件的研发维护、新技术的引进以及落地等。运维组主要负责devops系统研发以及k8s容器环境的维护等工作。

架构组招聘

目前架构组还有两个虚位以待,欢迎志同道合的你来和我们一起交流。简历可发送至邮箱:
chenkailing@keking.cn

相关文章
|
10天前
|
机器学习/深度学习 人工智能 Rust
Grok-1:史上最大开源LLM的技术解析
Grok-1:史上最大开源LLM的技术解析
1116 1
|
5月前
|
算法 Java 程序员
月薪3W+ 多亏发现GitHub开源的百万星Java神技-中高级核心知识解析
一提到高薪职业,大多数人最先想到的就是程序员。前几天,阿嘴正好刷到一条关于“程序员薪资”的帖子,一位网友问:想听个实话,程序员真的很容易月薪三四万吗? 这条帖子快速吸引了许多人的关注,短短几日内评论量上涨到300+,程序员的薪资待遇真的有那么好吗?大家是怎么回答的?和阿嘴一起来看看吧!
|
1月前
|
开发框架 安全 中间件
38、中间件漏洞解析-IIS6.0
38、中间件漏洞解析-IIS6.0
10 0
|
1月前
|
Java API Spring
开源!一款基于Spring Boot的二维码生成和解析工具
开源!一款基于Spring Boot的二维码生成和解析工具
31 1
|
2月前
|
JSON fastjson 数据库
字符编码导致Rapidjson(腾讯开源的json解析库)到Fastjson(阿里开发的Java json解析库)转换失败的原因分析
最近在客户端的开发的过程中,使用到了RapidJson,公司的开发是客户端和数据库端都由不同的人进行开发,我负责的客户端的逻辑开发(使用c++),开发工具同时使用了VS2017和QT的编译环境,使用QT主要是为了客户端界面开发方便,而使用了VS环境主要是维护公司开发的数据库接口库,这个库的唯一作用就是作为一个中间桥梁,使用Rapidjson将数据库接口的json数据格式解析为结构体数据,从而在客户端界面进行展示,或者接收客户端的数据,使用Rapidjson将其转换为json数据,发送给数据库接口以保存数据使用 。不太明白的可以参考我上一篇文章说明Rapidjson的使用过程-Parse解析数组
39 0
|
3月前
|
缓存 应用服务中间件 网络安全
10个顶级Linux开源反向代理服务器 - 解析与导航
10个顶级Linux开源反向代理服务器 - 解析与导航
364 2
|
4月前
|
XML 数据采集 JavaScript
基于.Net开源Html解析器,此外还支持SVG、XML等格式
基于.Net开源Html解析器,此外还支持SVG、XML等格式
22 0
|
4月前
|
缓存 监控 NoSQL
一个.Net Core开源缓存中间件,让你更加简单、方便使用缓存
一个.Net Core开源缓存中间件,让你更加简单、方便使用缓存
128 0
|
5月前
|
监控 Kubernetes Java
焯!一份京东开源的微服务架构深度解析,竟让大厂人熬夜也要读完
什么是微服务,为什么需要用微服务? 一、微服务是什么? 定义:微服务是一些协同工作的小而自治的服务,这个服务是高凝聚力和松散耦合的。
|
5月前
|
Cloud Native Linux Go
开源许可证解析:从MIT到GPL
开源许可证解析:从MIT到GPL
95 0

相关产品

  • 云消息队列 MQ
  • 云消息队列 Kafka 版
  • 微服务引擎
  • 推荐镜像

    更多