【原创】modb 功能设计之“支持多消费者单生产者”

简介:
【功能的具体化】
      首先,因为经我改造后的 rabbitmq-c 客户端程序是基于 libevent 的,所以天然可以做到单线程中同时处理多 TCP 连接。理论上,可以对外宣称”该客户端程序支持任意数量生产者和消费者的组合“,而不用担心多线程切换的开销。唯一可能拖慢整个系统的地方,是在 Consumer 针对 RabbitMQ 服务器应答进行回调处理的过程中,即回调处理函数中不能,或者说不应该,进行耗时的操作。  

a. 处理应答的回调函数中可能需要实现的处理逻辑有哪些?  
  1. 获取本地应用发来的 rabbitmq msg,再按目的地(routing_key)进行纯转发动作。对应模型为【1P + 1C + 内部转发 Queue】,运行在单独一个线程中。其中 P 代表 Producer,C 代表 Consumer,均是 RabbitMQ 里的概念。
  2. 获取外部应用发来的 rabbitmq msg,提取消息中含有的 sql 语句并执行,执行成功后再发送 rabbitmq msg 通知本地应用数据库更新成功。对应模型为【1P + nC + 内部 sql 处理 Queue】,运行于两个线程中,一个用于 rabbitmq 相关消息处理和转发,另一用于 sql 处理。
  3. 需要支持 json 或/和 XML 解析。
b. 采用单线模型还是多线程模型(每个线程中都有独自的 event_base)?  
      对于功能 1,单线程足矣;对于功能 2,目前看来至少需要2个线程,即 1P + nC 使用一个线程,sql 处理使用单独一个线程。  

c. 1P + 1C 和 1P + nC 是否可以或者应该在一个线程中处理?  
      按照前文的说明,一个线程中可以处理任意多 TCP 连接(当前改造后的 rabbitmq-c 的基础前提是:一个 TCP 连接上仅使用一个 channel,所以每个 Producer 和 Consumer 都需要独占一个 TCP 连接),所以关键问题是是否应该这么做。对于 Consumer 来说,因为可读事件是会被即时检测的,所以对于两种业务模型的差别就在于回调函数的实现:前者仅需关注转发目的地,后者需要额外的线程来执行 sql 语句。
      一种可能的执行流程如下:Consumer 在回调函数处理中将待执行 sql 语句提取出来后放入内部 sql 处理 Queue 中,待 sql 处理线程从中提取并完成 sql 执行后,将结果 push 进另外一个内部 sql_result Queue 中,而 Producer 会通过定时检测的方式获取 sql_result Queue 中的结果,并发送到用于通知本地业务的 exchange 上。
      上述流程没有详细说明的细节:Consumer 从 RabbitMQ 服务器消费消息时采用的是”自动应答“模式还是”手动应答“模式,这将会对消息的可靠性产生影响,间接影响到数据的一致性。因为在”自动应答“模式下,消息一旦被消费就会被 RabbitMQ 服务器删除掉,而一旦在 sql 执行过程中出现异常,则会导致数据不一致,需要自行实现某种机制保证数据的一致性。若在”手动应答“模式下,则可能的一种实现方式为,针对从 RabbitMQ 消费消息的应答,需要等待 sql 执行完成后才能进行。一则会拖慢整个事件驱动的轮转,二则当 sql 执行失败后,即使使用 REJECT 信令告之 RabbitMQ 服务器无法正确处理,该消息仍然会被删除掉,依然会有数据不一致的问题存在。如果使用了 带有 requeue 属性的 REJECT 信令呢?结果还是一样的,因为只有唯一一个 Consumer 去消费该消息,当再次重新消费该消息时,之前出现的 sql 异常可能还会出现。看来只有华山一条路了,自己实现某种纠错机制,保证数据的一致性。(关于纠错的问题,后续博客再说) 

【实现中遇到的挫折】

a. rabbitmq 线程与 sql 执行线程之间的跨线程通信手段
      如何在 sql 线程执行完成后,告之 Producer 去发送通知消息给其他业务。手段可能有很多种,但是考虑到 rabbitmq 线程是基于 libevent 运转的,这就需要一点小技巧了,后续有单独博客进行说明。

b. 消息传递 Queue 的使用
 
      在最初的设计中,【1P + 1C 
+ 内部转发 Queue】模型里使用的内部转发 Queue 是基于 Producer 和 Consumer 所使用的 conn 连接的,即每一个 conn 上都有这样一个 Queue 的存在。当开始调试【1P + 1C】功能时,发现当 Consumer 拿到 msg 后只能将其存放在自身 conn 上的 Queue 中,而 Producer 无法直接获得这些 msg (当然可以借助外部处理来 pop 和 push ,但似乎不是个好方案),所以决定还是采用全局 msgQ(转发 msg 用)的方式来处理。
 
      在改为全局 Queue 后又发现一个新问题,即 Consumer 对 msg 的接收具有即时性,但 Producer 对 msg 的发送却无法做到。原因在于,可以针对 Consumer 对应 conn 的 sockfd 监听可读和超时事件,但针对 Producer 对应 conn 的 sockfd 却不能去监听可写事件。这就导致了一个问题,即 Producer 只能按照定时器指定的时间发送 msg 。换句话说,即使设置的定时时间再短,也达不到即时发送的效果。那么如何解决这个问题呢?留个思考给大家把~~

(在只使用可读、可写、超时事件的情况下应该是无解的,估计可以利用信号事件解决)


最后附上一张【1P + 1C】的结构图:  
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
消息中间件 网络协议 Kafka
Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
【2月更文挑战第21天】Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
81 3
|
9月前
|
负载均衡 网络性能优化
EMQ如何保证消息不重复消费?
EMQ(Erlang MQTT Broker)通过以下机制来保证消息不重复消费
459 2
|
9月前
|
消息中间件 物联网
EMQ支不支持延迟消息, 如何实现
EMQ 是一个基于 Erlang/OTP 架构的开源物联网消息中间件(MQTT Broker)。目前的 EMQ 版本(截至 2023 年 7 月)不直接支持延迟消息。然而,你可以通过以下方法实现延迟消息的功能:
72 0
|
8月前
|
消息中间件 Java RocketMQ
顺利拿下Offer 通过分析rocketMq消费者拉取消息源码
顺利拿下Offer 通过分析rocketMq消费者拉取消息源码
83 0
|
9月前
RabbmitMQ学习笔记-自定义消费者
RabbmitMQ学习笔记-自定义消费者
35 0
|
消息中间件 存储 RocketMQ
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
655 0
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
|
消息中间件 Java RocketMQ
消息消费要注意的细节|学习笔记
快速学习消息消费要注意的细节
63 0
消息消费要注意的细节|学习笔记
|
消息中间件 存储 数据采集
CreateDirectStream 消费数据补充|学习笔记
快速学习 CreateDirectStream 消费数据补充
68 0
|
消息中间件 Kafka
|
Java 开发者
【第06个代码模型】综合案例:生产者与消费者(解决同步问题)|学习笔记
快速学习 【第 06 个代码模型】综合案例:生产者与消费者(解决同步问题)