19)消息队列的终极解决方案 Stream

简介: 19)消息队列的终极解决方案 Stream

楔子



前面我们介绍了如何通过 List 和 Pub/Sub 来实现一个消息队列,但很明显它们都有很严重的缺陷,作为消息队列是不合格的。

而 Redis 作者也注意到了这一点,于是开发了 disque,目的是成为一个基于内存的分布式消息中间件。但该项目没什么人关注,于是在 5.0 的时候将 disque 的功能移植到了 Redis 中,并给它定义了一个新的数据类型:Stream。

而前面我们说过,一个专业的消息队列应该支持以下功能:

  • 支持阻塞等待拉取消息;
  • 支持发布 / 订阅模式;
  • 消费者下线之后重新上线,仍能消费下线期间生产者发送的消息;
  • 消费者消费失败,可重新消费,也就是支持消息被同一个消费者消费多次;
  • 实例宕机,消息不丢失,数据可持久化;
  • 即使消息大量堆积,也不会丢数据;

那么这些功能 Stream 是不是都支持呢?以及 Stream 类型采用的底层数据结构又是什么呢?带着这些问题,我们来开始 Stream 的学习,首先是它的使用。

剧透:Stream 采用了 Radix Tree 和 listpack 两种数据结构来保存消息,后面会介绍 Radix Tree。


Stream 的使用



首先,Stream 作为消息队列,它保存的消息通常具有以下两个特征:

  • 一条消息由一个或多个键值对组成;
  • 每插入一条消息,这条消息都会对应一个消息 ID;

关于消息 ID,我们一般会让 Redis 自动生成,并且 ID 是递增的。消息 ID 由时间戳和序号组成,时间戳是消息插入时,以毫秒为单位的服务器当前时间;但光有时间戳还不够,因为同一毫秒内,可能会插入多条数据,所以还要有序号。

而 Stream 支持的 API 如下:

  • xadd:添加消息;
  • xread:读取消息;
  • xlen:查询消息的长度;
  • xdel:根据消息 ID 删除消息;
  • xrange:读取某个区间的消息;
  • del:删除整个 Stream,当然 del 可以删除任意 key;

我们实际操作一波。

添加消息

命令:xadd key ID field1 string1 field2 string2···

> xadd girl * name satori age 17
"1663918086746-0"

添加消息之后会返回消息的 ID,由时间戳+序号组成,并且 ID 我们指定的是 *,表示让 Redis 自动生成 ID,当然我们也可以手动指定。

再插入两条消息,此时 girl 这个 Stream 里面就有了三条消息。

> xadd girl * name koishi age 16
"1663918266484-0"
> xadd girl * name marisa age 16
"1663918276271-0"

所以一条消息会包含一组键值对,并且从插入数据和返回结果能够看出,对于 Stream 类型来说,它要保存的数据有以下两个特征。

  • 连续插入的消息 ID,其前缀有较多部分是相同的,因为它们的插入时间非常接近;
  • 连续插入的消息,它们对应的键值对中的键通常是相同的(也可以不同),比如这里都是 name 和 age,因为理论上发往同一个 Stream 里面的消息应该具备相同的特征;

那么针对 Stream 的这两个数据特征,应该使用什么样的数据结构来保存这些消息数据呢?

毫无疑问,我们首先想到的就是哈希表,一个消息 ID 对应哈希表中的一个 key,消息内容对应这个 key 的 value。

但是就像刚才说的一样,消息 ID 和消息中的键经常会有重复的部分。如果使用哈希表,就会导致有不少冗余数据,这会浪费 Redis 宝贵的内存空间。

因此,为了充分节省内存空间,Stream 使用了两种内存友好的数据结构:listpack 和 Radix Tree。其中消息 ID 使用 String 保存,作为 Radix Tree 中的 key,而消息具体数据是使用 listpack 保存,作为 Radix Tree 的 value

关于 Radix Tree 我们一会再说,先回到 Stream 的 API 上面来。

查询消息长度

命令:xlen key

# 显然当前有 3 条消息
> xlen girl
(integer) 3
# 再插入一条
> xadd girl * name scarlet age 400
"1663919561887-0"
# 长度变为 4
127.0.0.1:6379> xlen girl
(integer) 4
# 如果 key 不存在,则长度为 0
> xlen not_exists
(integer) 0


删除消息

命令:xdel key 消息ID·····,可以同时删除多个

> xlen girl
(integer) 4
# 根据消息 ID 删除消息,可同时删除多个
# 并返回删除的消息数量
> xdel girl 1663918276271-0
(integer) 1
# 还剩下 3 个
> xlen girl
(integer) 3


删除 Stream

直接使用 del,它可以删除任意多个任意的 key。

> del girl
(integer) 1


查询区间消息


命令:xrange key start end count n,这里的 start 和 end 指的是消息ID。

# 添加消息
> xadd girl * name satori age 17
"1663921038755-0"
> xadd girl * name koishi age 16
"1663921047878-0"
> xadd girl * name marisa age 16
"1663921054277-0"
> xadd girl * name scarlet age 400
"1663921064383-0"
# 查询
> xrange girl 1663921038755-0 1663921064383-0
1) 1) "1663921038755-0"
   2) 1) "name"
      2) "satori"
      3) "age"
      4) "17"
2) 1) "1663921047878-0"
   2) 1) "name"
      2) "koishi"
      3) "age"
      4) "16"
3) 1) "1663921054277-0"
   2) 1) "name"
      2) "marisa"
      3) "age"
      4) "16"
4) 1) "1663921064383-0"
   2) 1) "name"
      2) "scarlet"
      3) "age"
      4) "400"

还是比较简单的,把整个过程想象成数组的截取即可,只不过数组用的是索引,Stream 用的是消息 ID。另外这里我们指定的是第一条和最后一条的消息 ID,所以全部返回了,而返回全量消息还有一种做法:xrange girl - +

  • - 代表第一条消息;
  • + 代表最后一条消息;

并且在返回的时候,还可以指定 count 来限制数量。

# 总共 4 条,但只查询 2 条
> xrange girl - + count 2
1) 1) "1663921038755-0"
   2) 1) "name"
      2) "satori"
      3) "age"
      4) "17"
2) 1) "1663921047878-0"
   2) 1) "name"
      2) "koishi"
      3) "age"
      4) "16"

注意:这个过程并不是先全量查询,然后只返回前 count 条;而是当查询的条数达到 count 时,直接返回。另外即使数量达不到 count 也是可以的,有多少返回多少,比如这里的消息总量是 4,但 count 指定为 10,那么就只会返回 4 条。

最后,虽然这里查询用的是消息ID,但是也要像索引一样注意先后关系。start 对应的消息要在 end 对应的消息之前,类似于索引。

读取某条消息之后的 n 条消息

命令:xread count n streams xxx MESSAGE_ID

从名为 xxx 的 Stream 中,读取消息 ID 为 MESSAGE_ID 之后的 n 条消息。

# 读取 '1663921047878-0' 之后的两条消息
> xread count 2 streams girl 1663921047878-0
1) 1) "girl"
   2) 1) 1) "1663921054277-0"
         2) 1) "name"
            2) "marisa"
            3) "age"
            4) "16"
      2) 1) "1663921064383-0"
         2) 1) "name"
            2) "scarlet"
            3) "age"
            4) "400"
            
# 1663921054277-0 后面只剩下一条消息了
# 所以即便 count 为 2,也只返回了一条
> xread count 2 streams girl 1663921054277-0
1) 1) "girl"
   2) 1) 1) "1663921064383-0"
         2) 1) "name"
            2) "scarlet"
            3) "age"
            4) "400"

并且该命令还提供了一个可以阻塞读取的参数 block,我们可以使用它读取某条数据之后的新增数据。

xread count 1 block 0 streams girl MESSAGE_ID

不使用 block 的话,如果该消息 ID 后面没有消息了,那么会直接返回空。但通过 block 超时时间 可以在没有消息的时候让程序处于阻塞状态,如果超时时间为 0,那么会一直等待,直到队列里面有数据再返回。

一般来说,如果使用 block,那么 MESSAGE_ID 一定是最后一条消息的 ID。如果不是最后一条消息的 ID,那么有没有 block 没什么区别。所以 Redis 为了使用方便,还支持我们使用 $,它代表的就是最后一条消息的 ID。

我们看到程序阻塞在这里了,因为 $ 代表最后一条消息,它后面已经没有消息了。所以该命令会阻塞,直到别的客户端往 girl 这个 Stream 里插入一条消息之后才会返回。

> xadd girl * name sakura age 20
"1663923328212-0"

新开一个终端,往里面写入一条消息,然后查看第一个终端。

发现第一个终端读取到消息,并解除阻塞,而且输出也告诉我们整个过程阻塞了 232.43 秒。当超时时间为 0 时,代表没有上限,如果大于 0,代表阻塞指定的毫秒数。

最后,我们这里的 count 指定的是 1,但不管指定的是多少,只要有消息过来,都会解除阻塞。

所以从这里我们看到,消息队列的第一个特性:支持阻塞式拉取消息,Stream 是满足的。


消费者组



Stream 也支持消费者组,我们来看一下。

创建消费者组

命令:xgroup create <stream_key> <group_key> <ID>

> xgroup create girl group1 0-0
OK
  • girl:stream key 的名称;
  • group1:group key 的名称
  • 0-0:消息 ID,0-0 表示从第一条开始向后读取;

如果要从最后一条消息开始向后读取的话,那么使用 $ 即可。

> xgroup create girl group2 $
OK

以上两个消费者组就创建完毕了。

读取消息

命令:xreadgroup group group_key consumer_key [count n] streams stream_key

  • group_key:创建的分组名;
  • consumer_key:消费者名,随便指定即可;
  • count n:每次读取的数量,可选,不指定全部返回;
  • stream_key:消息队列;
# 从组 'group1' 里面指定一个消费者 'c1'
# 然后消费队列 girl 里面的消息,并且消费 1 条
# 注意结尾有一个 >,表示后续从下一条消息开始消费
6379> xreadgroup group group1 c1 count 1 streams girl >
1) 1) "girl"
   2) 1) 1) "1663921038755-0"
         2) 1) "name"
            2) "satori"
            3) "age"
            4) "17"

然后还可以再启动一个消费者:

# 从组 'group' 里面指定一个消费者 c2,继续消费
# 消费者名字可以随便起
6379> xreadgroup group group1 c2 count 2 streams girl >
1) 1) "girl"
   2) 1) 1) "1663921047878-0"
         2) 1) "name"
            2) "koishi"
            3) "age"
            4) "16"
      2) 1) "1663921054277-0"
         2) 1) "name"
            2) "marisa"
            3) "age"
            4) "16"

这里消费了两条,如果不指定 count,那么默认全部消费。当消息消费完毕之后,会返回空。

# 此时已全部消费完毕了,如果再消费就会返回空
6379> xreadgroup group group1 c3 streams girl >
1) 1) "girl"
   2) 1) 1) "1663921064383-0"
         2) 1) "name"
            2) "scarlet"
            3) "age"
            4) "400"
      2) 1) "1663923328212-0"
         2) 1) "name"
            2) "sakura"
            3) "age"
            4) "20"

注意:消费者的数量是不受限制的。这个有点类似 kafka,一个组里面可以有任意个消费者,它们共同消费一个队列里的数据,实现并行消费。但一条消息最多只能被组里面的一个消费者消费,如果一条消息同时被两个消费者消费,那么这两个消费者应该隶属于不同的消费者组。

此外,xreadgroup 也支持阻塞式拉取消息。

6379> xreadgroup group group1 c4 block 0 streams girl >
# 客户端陷入阻塞

如果我们此时另一个客户端往 girl 里面写入一条消息,那么此处就会解除阻塞,并返回新写入的消息。这里我们不写了,直接停掉,然后创建一个新的消费者组,看看能不能从头开始消费。

# 已经消费到头了,再消费的话,则返回空
> xreadgroup group group1 c4 streams girl >
(nil)
# 新建一个消费者组
> xgroup create girl group1_1 0-0
OK
# 从头开始消费
> xreadgroup group group1_1 c4 count 1 streams girl >
1) 1) "girl"
   2) 1) 1) "1663921038755-0"
         2) 1) "name"
            2) "satori"
            3) "age"
            4) "17"

所以这里我们又可以得出一个结论:Stream 满足消息队列的第二个特点,支持发布 / 订阅模式,就是让多组消费者消费同一批数组。

从图中可以看到,两组消费者获取同一批数据,这样一来就达到了多组消费者「订阅」消费的目的。

消息消费确认

一般消息接收完了,我们会回复一个确认信息,告知已经消费完毕,命令:xack stream-key group-key ID···

> xack girl group1_1 1663921038755-0
(integer) 1

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成。

所以除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到了消息 ID。当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。

如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息,待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。

并且即使是生成者在消费者下线期间生产的消息,消费者上线之后也是可以收到的。因此消息队列的第三和第四个特性,Stream 也是支持的。

查询未确认的消息


# 未确认的消息有 6 条
> xpending girl group1
1) (integer) 6
2) "1663921038755-0"
3) "1663923499682-0"
4) 1) 1) "c1"
      2) "1"
   2) 1) "c2"
      2) "2"
   3) 1) "c3"
      2) "3"
# 确认两条      
> xack girl group1 1663921038755-0 1663921047878-0
(integer) 2
# 还剩四条
> xpending girl group1
1) (integer) 4
2) "1663921054277-0"
3) "1663923499682-0"
4) 1) 1) "c2"
      2) "1"
   2) 1) "c3"
      2) "3"


xinfo 信息查询

xinfo stream stream_key:查询 Stream 的相关信息;

> xinfo stream girl
 1) "length"
 2) (integer) 6 # 队列中有6个消息
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1663923499682-0"
 9) "groups" # 5 个消费分组,我中间又创建了几个
10) (integer) 5
11) "first-entry"  # 第一条消息
12) 1) "1663921038755-0"
    2) 1) "name"
       2) "satori"
       3) "age"
       4) "17"
13) "last-entry"  # 最后一条消息
14) 1) "1663923499682-0"
    2) 1) "name"
       2) "sakura"
       3) "age"
       4) "20"

xinfo groups stream_key:查询 Stream 消费者组信息;

> xinfo groups girl
1) 1) "name"
   2) "group1" # 消费者组名称
   3) "consumers"
   4) (integer) 4  # 组里面有 3 个消费者
   5) "pending"
   6) (integer) 3  # 3 个未确认的消息
   7) "last-delivered-id"
   8) "1663923499682-0"
2) 1) "name"
   2) "group1_1"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1663921038755-0"
3) ...
   ...

xinfo consumers stream_key group_key:查询某个消费组的成员信息

> xinfo consumers girl group1
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 4667346
2) 1) "name"
   2) "c2"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 4405245
3) 1) "name"
   2) "c3"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 4259811
4) 1) "name"
   2) "c4"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 1879248

xgroup delconsumer stream-key group-key consumer-key:删除组里面的某个消费者

> xgroup delconsumer girl group1 c2
(integer) 1

xgroup destroy stream-key group-key:删除消费者组

> xgroup destroy girl group1
(integer) 1

关于 Stream 的命令我们就介绍完了,并且在过程中,我们知道消息队列的前 4 个特性,Stream 都是满足的。那么问题来了,后两个特性是否也满足呢?

首先倒数第二个特性:实例宕机,消息不丢失,数据可持久化,显然 Stream 是满足的。因为 Stream 是新增的数据类型,与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。我们只需要配置好持久化策略,这样就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。

最后一个特性:即使消息大量堆积,也不会丢数据,这个 Stream 是否支持呢?一般来说,当消息队列发生消息堆积时,一般只有 2 个解决方案:

  • 生产者限流:避免消费者处理不及时,导致持续积压;
  • 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息;

而 Redis 在实现 Stream 时,采用了第 2 个方案,在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。

# 指定队列长度最大 10000
> XADD queue MAXLEN 10000 * name satori
"1638518032447-0"

当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。

经过以上分析,我们发现 Redis 的 Stream 几乎覆盖了消息队列的各种场景,那这是不是意味着,Stream 可以作为专业的消息队列中间件来使用呢?其实还不够,就算 Redis 能做到以上这些,也只是「趋近于」专业的消息队列。原因在于 Redis 本身的一些问题,如果把其定位成消息队列,还是有些欠缺的。

下面就来将 Redis 与专业的队列中间件做个对比,看看 Redis 作队列时,还有哪些欠缺?


Redis 和专业消息队列的差异


首先使用消息队列,会涉及三个部分:生产者、中间件本身、消费者。

因此消息是否会丢失,需要考虑以下三种情况:

  • 生产者会不会丢消息;
  • 消费者会不会丢消息;
  • 队列中间件会不会丢消息;

生产者会不会丢消息

当生产者在发布消息时,可能发生以下异常情况:

  • 消息没发出去:网络故障或其它问题导致发布失败,中间件直接返回失败;
  • 不确定是否发布成功:网络问题导致发布超时,可能数据已发送成功,但读取响应结果超时了;

如果是第一种,消息根本没发出去,那么重新发一次就好了。如果是第二种,生产者没办法知道消息到底有没有发成功?所以,为了避免消息丢失,它也只能继续重试,直到发布成功为止。

生产者一般会设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理。

也就是说,生产者为了避免消息丢失,只能采用失败重试的方式来处理。但这也意味着消息可能会重复发送,因为消息可能发送成功了,但消费者不知道而已。所以消费者这边,就需要多做一些逻辑了,对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性。

从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。所以无论是 Redis 还是专业的队列中间件,生产者在这一点上都是可以保证消息不丢的。

消费者会不会丢消息

这种情况就是我们前面提到的,消费者拿到消息后,还没处理完成,就异常宕机了,那消费者还能否重新消费失败的消息?

要解决这个问题,就必须要有一个机制,只要当消费者在处理完消息、并告知中间件之后,中间件才能把消息标记已处理,否则仍会把这些数据发给消费者。

这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢。无论是 Redis 的 Stream,还是专业的队列中间件,例如 RabbitMQ, Kafka,其实都是这么做的。所以从这个角度来看,Redis 也是合格的。

中间件本身会不会丢消息

前面两个问题都比较好处理,只要客户端和服务端配合好,就能保证不丢消息。但如果队列中间件本身就不可靠呢?毕竟生产者和消费者都依赖它,如果它不可靠,那么无论生产者和消费者怎么做,都无法保证数据不丢。

在这个方面,Redis 其实没有达到要求,Redis 在以下两个场景下,都会导致数据丢失。

  • AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能;
  • 主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库);

基于以上原因我们可以看到,Redis 本身无法保证严格的数据完整性,所以如果把 Redis 当做消息队列,在这方面是有可能导致数据丢失的。

而像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时一般都是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,以此保证消息的完整性。这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。也正因为如此,它们在设计时也更复杂,毕竟是专门用作消息队列的。

但 Redis 的定位则不同,它的定位更多是当作缓存来用,它们两者在这个方面肯定是存在差异的。最后,我们来看消息积压怎么办?

消息积压怎么办

因为 Redis 的数据都存储在内存中,这意味着一旦发生消息积压,就会导致 Redis 的内存持续增长,如果超过机器内存上限,则面临被 OOM 的风险。所以,Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。

但 Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多。当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加「坦然」。综上我们可以看到,把 Redis 当作队列来使用时,始终面临的两个问题:

  • Redis 本身可能会丢数据;
  • 面对消息积压,Redis 内存资源紧张;

到这里,关于 Redis 是否可以用作队列,结论已经很清晰了。如果你的业务场景不复杂,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。而且 Redis 相比于 Kafka 和 RabbitMQ,部署和运维也更加轻量。

关于 Redis 用作消息队列,我们再总结一下:

该部分内容引用自:水滴与银弹《把Redis当作队列来用,真的合适吗?》

Stream 类型相关的内容我们就介绍完了,再来看看 Stream 是怎么实现的?


Stream 底层结构



Stream 结构如下:

typedef struct stream {
    //保存消息的Radix Tree
    rax *rax;               
    //消息流中的消息个数
    uint64_t length;        
    //当前消息流中最后插入的消息的ID
    streamID last_id;       
    //当前消息流的消费组信息,也是用Radix Tree保存
    rax *cgroups;           
} stream;

所以重点是 Radix Tree,它是前缀树的一种,前缀树也被称为字典树,英文是 Trie。刷 LeetCode 的话,应该会遇到相关的问题。

前缀树的特点是,每个 key 会被拆分成单字符,然后逐一保存在树上的节点中。前缀树的根节点不保存任何字符,而除了根节点以外的其他节点,每个节点只保存一个字符。当我们把从根节点到当前节点的路径上的字符拼接在一起时,就可以得到相应 key 的值了。

前缀树在查找指定字符串的时候,时间复杂度是 O(K),换句话说它只和要查找的字符串的长度有关。并且 pen 和 pencil 都具有相同的前缀,如果采用哈希表,那么 pen 三个字符会被保存两遍。

但话虽如此,前缀树不可能保证每个相同的字符,都能被共享,举个例子:

对于当前这个例子,我们就无法保证每个字符都能被共享。

相关文章
|
消息中间件 Java 开发工具
消息队列和应用工具产品体系-消息队列 Rocket 版的基本使用
消息队列和应用工具产品体系-消息队列 Rocket 版的基本使用
消息队列和应用工具产品体系-消息队列 Rocket 版的基本使用
|
消息中间件 Java Kafka
一款消息队列的客户端框架——启明信息车联网MQ演进实践分享
一款消息队列的客户端框架——启明信息车联网MQ演进实践分享 分享人:阿里云MVP曾宪宇,2014开始 就职于启明信息,负责车联网平台的架构和建设,坐标吉林长春。 分享内容:结合主流MQ,介绍一款基于Java的开源消息队列客户端框架。
2948 0
一款消息队列的客户端框架——启明信息车联网MQ演进实践分享
|
2月前
|
消息中间件 存储 Java
【揭秘】RocketMQ内部运作大揭秘:一探究竟,原来消息队列是这样工作的!
【8月更文挑战第19天】RocketMQ是一款高性能、高可用的消息中间件,在分布式系统中至关重要。它采用发布/订阅模式,支持高吞吐量的消息传递。核心组件包括管理元数据的NameServer、存储消息的Broker以及Producer和Consumer。RocketMQ支持发布/订阅与点对点两种模型,并具备复杂的消息持久化和路由机制。通过Java API示例,可轻松实现消息的发送与接收。RocketMQ凭借其出色的特性和可靠性,成为大型分布式系统首选的消息解决方案。
55 5
|
4月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 JavaScript Java
消息队列 MQ产品使用合集之如何动态增加consumerquque
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 存储 中间件
【主流技术】聊一聊消息队列 RocketMQ 的基本结构与概念
2.6Broker 代理服务器(Broker)是消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 2.7Pull Consumer 拉取式消费(Pull Consumer)是 Consumer 消费的一种类型,也是默认的类型。下游应用系统通常主动调用 Consumer 的拉消息方法从 Broke r服务器拉消息,即主动权由下游应用控制。一旦获取了批量消息,应用就会启动消费过程。
|
4月前
|
消息中间件 监控 Java
消息队列 MQ产品使用合集之vipchannel的作用有哪些
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
消息中间件 存储 负载均衡
消息队列和应用工具产品体系-消息队列的基本概念
消息队列和应用工具产品体系-消息队列的基本概念
消息队列和应用工具产品体系-消息队列的基本概念
|
5月前
|
消息中间件 存储 数据库
深度剖析 RocketMQ 5.0,流存储:流场景的诉求是什么?
本文将从使用的角度出发,来更详细的展示一下流存储的场景,看看它和业务消息的场景有哪些区别。 RocketMQ 5.0 面向流存储的场景,提供了哪些特性。再结合两个数据集成的案例,来帮助大家了解流存储的用法。
3479 2
|
5月前
|
消息中间件 存储 Cloud Native
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」打造新一代云原生"消息、事件、流"统一消息引擎的融合处理平台
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」打造新一代云原生"消息、事件、流"统一消息引擎的融合处理平台
78 0
下一篇
无影云桌面