RabbitMQ 延迟消息插件
该插件将延迟消息(或预定消息)添加到 RabbitMQ。
用户可以声明一个具有 x-delayed-message 类型的交换,然后发布带有自定义标题 x-delay 的消息,该标题以毫秒为单位表示消息的延迟时间。该消息将在 x-delay 毫秒后被传递到相应的队列中。
永续合约/秒合约/合约交易所开发详情,永续合约/秒合约/合约交易所系统开发技术方案
深入分析代币合约流动性质押挖矿分红系统开发实现技术原理及源码部署
交易所系统开发如何开发?数字货币交易所系统开发成熟技术案例
去中心化交易所系统开发技术原理丨数字货币去中心化交易所系统开发(说明案例)
数字货币交易所系统 数字货币交易所系统开发成品案例 数字货币交易所现成源码部署
交易所系统开发案例说明丨数字货币交易所系统开发技术方案详情
数字货币交易所系统开发方案详细丨数字货币交易所开发成品技术源码
合约交易系统设计与开发|永续合约交易所搭建,合约平台开发|永续合约交易所开发技术|特点介绍
秒合约交易所开发详细丨秒合约交易所系统开发详细及规则丨秒合约交易所系统源码部署
支持的 RabbitMQ 版本
此插件的最新版本以 RabbitMQ 3.10.x 为目标,早于 3.9.x 的系列不受支持。
支持的 Erlang/OTP 版本
此插件需要 Erlang 23.2 或更高版本,与 RabbitMQ 3.8.16+ 相同。
项目成熟度
该插件被认为是相当稳定的,只要用户了解其局限性,就有可能适合生产使用。
在其存在的 ~ 5 年中,它有一些问题和一个基本问题得到了修复。众所周知,它对一些用户来说工作得相当好。它也有已知的限制(见下面一节),包括那些与延迟和信息的复制以及延迟信息的数量有关的限制。
这个插件目前没有得到 Pivotal 的商业支持,但这并不意味着它将被放弃或 RabbitMQ 团队对未来改进它不感兴趣。然而,对于我们的小团队来说,这并不是一个高度优先事项。
因此,请用您的工作量来尝试一下,并由您自己决定。
安装
下载一个二进制版本
二进制版本是通过GitHub发布的。
与所有第三方插件一样,.ez 文件必须放在节点的插件目录中,并且可由 RabbitMQ 进程的有效用户读取。
要知道插件目录是什么,请使用 rabbitmq-plugins 目录
rabbitmq-plugins directories -s
启用插件
然后运行下面的命令。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
使用方法
要使用延迟消息功能,请声明一个类型为x-delayed-message的交换。
// ... 省略的代码 ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args)。
// ... 更多代码 ...
请注意,我们传递了一个额外的头,叫做x-delayed-type,更多的内容在路由部分。
一旦我们声明了交换,我们就可以发布消息,提供一个标头,告诉插件要延迟多长时间。
// ...省略了代码 ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8")。
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers)。
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes)。
byte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8")。
Map<String, Object> headers2 = new HashMap<String, Object>();
headers2.put("x-delay", 1000);
AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2)。
channel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2)。
// ... 更多代码 ...
在上面的例子中,我们发布了两条消息,用x-delay头来指定延迟时间。在这个例子中,该插件将首先向我们的队列传递正文为 "更多延迟的有效载荷 "的消息,然后是正文为 "延迟的有效载荷 "的消息。
如果x-delay头不存在,那么该插件将继续无延迟地传送信息。
路由选择
这个插件允许通过x-delayed-type参数进行灵活的路由,这些参数可以在exchange.declaration中传递。在上面的例子中,我们使用 "直接 "作为交换类型。这意味着该插件将具有直接交换所显示的相同路由行为。
如果你想要一个不同的路由行为,那么你可以提供一个不同的交换类型,比如说 "主题"。你也可以指定由插件提供的交换类型。请注意,这个参数是必须的,而且必须指的是现有的交换类型。
性能影响
由于 "x-delayed-type "参数,人们可以用这个交换来代替其他交换,因为 "x-delayed-message "交换将只是作为代理。注意,如果你这样做,可能会有一些性能影响。
对于每个穿过 "x-delayed-message "交换的消息,该插件将尝试确定该消息是否必须过期,确保延迟在范围内,即:Delay > 0, Delay =< ?ERL_MAX_T(在Erlang中,计时器可以设置为未来的(2^32)-1毫秒)。
如果前面的条件成立,那么消息将被持久化到Mnesia,一些其他的逻辑将被启动,以确定这个特定的消息延迟是否需要取代当前的预定定时器,等等。
这意味着,虽然人们可以使用这种交换来代替直接或扇出交换(或任何其他交换),但它将比使用实际交换要慢。如果你不需要延迟消息,那么就使用实际的交换。
限制
延迟的消息被存储在一个Mn中。