RabbitMQ通过HTTP API获取队列消息内容

简介: RabbitMQ通过HTTP API获取队列消息内容 背景 为配合公司数据中台建设,我们对公司内部使用的mysql binlog数据分发中间件(现已开源:https://gitee.com/kekingcn/keking-binlog-distributor )进行定制化改造,使其能够同时支持re.

RabbitMQ通过HTTP API获取队列消息内容

背景

为配合公司数据中台建设,我们对公司内部使用的mysql binlog数据分发中间件进行定制化改造,使其能够同时支持redis和rabbitmq两个分发渠道,并能在前端页面监控队列长度、队列消息内容等信息

需求

由于之前已经完成后端分发功能、redis队列监控的开发,目前需要前端同时支持rabbitmq队列监控,并需要能够预览队列中的内容

方案选择

要在前端实现显示rabbitmq队列及队列内容,有如下两种实现方案:

  • 后端主服务使用rabbitmq客户端,使用amqpadmin读取当前vhost下队列列表,注册消费方,消费队列且不发送ACK确认,让其重新入队列
  • 后端主服务通过rabbitmq提供的HTTP API直接获取队列列表,和队列中的内容(rabbitmq前台管理页面就是通过HTTP API获取的队列列表和获取队列内消息)

对比两种方案,发现前者存在两点劣势:

  1. 主服务需要额外依赖rabbitmq客户端,且ampqadmin需要有管理员权限的账号和密码
  2. 主服务需要和业务系统同时消费队列数据,编码不当可能给整个系统带来稳定性问题,且消费数据不发送ACK确认应答不太合理

而且作为对比,HTTP API只需要能登陆rabbitmq管理页面的权限,并且官方前台管理页面已经实现了获取消息功能,不用自己消费队列数据

综上分析,选择使用HTTP API方式获取队列及队列内容更为合理

难点分析

  • rabbitmq的HTTP API认证方式不明,通过浏览器调试查看官方前台管理页面http请求并不能看出明确的认证信息
  • rabbitmq的HTTP API获取消息接口获取到的数据是base64编码的字符串,我们在消息中传输的是java实体对象,base64编码转二进制流再反序列化成java实体对象过程可能有障碍。api返回数据如下图所示

    image

方案验证

  1. rabbitmq的HTTP API的认证问题可以通过其官方提供的http-client包解决,只需要添加如下依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>http-client</artifactId>
        <version>3.1.1.RELEASE</version>
    </dependency>

    然后使用com.rabbitmq.http.client.Client构造方法传入如下apiUrl, username, password三个参数

    new Client(url, username, password);

    此client中的http请求对象 private final RestTemplate rt;就可以发送http请求并通过认证

  2. base64转二进制byte[]可以直接用apache-common-codec包直接解码,依赖如下

    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.12</version>
    </dependency>

    直接调用org.apache.commons.codec.binary.Base64.decodeBase64(String base64String)即可将base64编码的字符串转成二进制byte[]数组,至于反序列化,只需要保证和rabbitmq序列化过程对应即可将二进制字节码反序列化为java实体对象

编码实现

经过方案验证,编码实现就非常简单了,具体实现如下,重写com.rabbitmq.http.client.Client类,添加如下方法

/**
  * @param vhost Virtrul Host
  * @param queuename 队列名
  * @param count 要取的队列消息数
  * @return EventBaseDTO实体列表
  */
public List<EventBaseDTO> getMessageList(String vhost, String queuename, long count) throws IOException, ClassNotFoundException {
    String bodyStr = "{\"count\":" + count + ",\"requeue\":true,\"encoding\":\"base64\"}";
    Map<String, String> body = JSON.parseObject(bodyStr, Map.class);
    final URI uri = uriWithPath("./queues/" + encodePathSegment(vhost) + "/" + encodePathSegment(queuename) + "/get");
    ResponseEntity<List> result= rt.postForEntity(uri, body, List.class);
    List<EventBaseDTO> resultList = new ArrayList<>();
    List list = result.getBody();
    for (Object entry : list) {
        if (entry instanceof Map) {
            String base64Str = ((Map) entry).get("payload") == null ? "" : ((Map) entry).get("payload").toString();
            byte[] bytes = Base64.decodeBase64(base64Str);
            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
            ObjectInputStream ois = new ObjectInputStream(bais);
            EventBaseDTO dto = (EventBaseDTO) ois.readObject();
            resultList.add(dto);
        }
    }
    return resultList;
}

实现效果

后端提供接口后,完成前端开发,即可实现同时对redis和rabbitmq队列的监控,效果如下图所示

image

此次改进很快也会同步到开源社区版本,希望我们的开源项目能反哺社区,为大家工作带来便利。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
17天前
|
JSON 监控 API
掌握使用 requests 库发送各种 HTTP 请求和处理 API 响应
本课程全面讲解了使用 Python 的 requests 库进行 API 请求与响应处理,内容涵盖环境搭建、GET 与 POST 请求、参数传递、错误处理、请求头设置及实战项目开发。通过实例教学,学员可掌握基础到高级技巧,并完成天气查询应用等实际项目,适合初学者快速上手网络编程与 API 调用。
297 130
|
1月前
|
XML JSON API
识别这些API接口定义(http,https,api,RPC,webservice,Restful api ,OpenAPI)
本内容介绍了API相关的术语分类,包括传输协议(HTTP/HTTPS)、接口风格(RESTful、WebService、RPC)及开放程度(API、OpenAPI),帮助理解各类API的特点与应用场景。
|
2月前
|
SQL 缓存 监控
SqlRest让SQL秒变Http API,还支持20+数据库(含国产数据库)
杭州奥零数据科技有限公司成立于2023年,专注于数据中台业务,维护开源项目AllData并提供商业版解决方案。AllData提供数据集成、存储、开发、治理及BI展示等一站式服务,支持AI大模型应用,助力企业高效利用数据价值。
|
3月前
|
JSON 编解码 API
Go语言网络编程:使用 net/http 构建 RESTful API
本章介绍如何使用 Go 语言的 `net/http` 标准库构建 RESTful API。内容涵盖 RESTful API 的基本概念及规范,包括 GET、POST、PUT 和 DELETE 方法的实现。通过定义用户数据结构和模拟数据库,逐步实现获取用户列表、创建用户、更新用户、删除用户的 HTTP 路由处理函数。同时提供辅助函数用于路径参数解析,并展示如何设置路由器启动服务。最后通过 curl 或 Postman 测试接口功能。章节总结了路由分发、JSON 编解码、方法区分、并发安全管理和路径参数解析等关键点,为更复杂需求推荐第三方框架如 Gin、Echo 和 Chi。
|
11月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
676 6
|
11月前
|
API
使用`System.Net.WebClient`类发送HTTP请求来调用阿里云短信API
使用`System.Net.WebClient`类发送HTTP请求来调用阿里云短信API
187 0
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
760 2
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
254 0