RocketMQ中NettyRemotingServer的Reactor多线程模型
1、 一个 Reactor 主线程(eventLoopGroupBoss) 负责监听 TCP网络连接请求 建立好连接 创建SocketChannel 并注册到selector上 2、拿到网络数据后 再丢给Worker线程池(eventLoopGroupSelector) 3、在真正执行业务逻辑之前先需要defaultEventExecutorGroup进行 SSL验证、编解码、空闲检查、网络连接管理 4、根据 RomotingCommand 的业务请求码code去processorTable 这个本地缓存变量中找到对应的 processor 然后封装成task任务后 提交给对应的业务processor处理线程池来执行
消息过滤
1、 Consumer端订阅消息 是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引 然后再从CommitLog里面读取真正的消息实体内容 2、ConsumeQueue的存储结构 有8个字节存储的Message Tag的哈希值 基于Tag的消息过滤正式基于这个字段值的
- Tag过滤方式
1、 一个消息有多个TAG Consumer端在订阅消息时可以指定Topic或指定TAG 2、 发送一个Pull消息的请求给Broker端 3、 Broker端从RocketMQ的文件存储层—Store读取数据之前 a、 会用这些数据先构建一个MessageFilter 然后传给Store b、 Store从 ConsumeQueue读取到一条记录后 会用它记录的消息tag hash值去做过滤 c、 由于在服务端只是根据hashcode进行判断 无法精确对tag原始字符串进行过滤 故在消息消费端拉取到消息后 还需要对消息的原始tag字符串进行比对 如果不同,则丢弃该消息,不进行消息消费
- SQL92的过滤方式
和上面的Tag过滤方式区别只是在Store层的具体过滤过程不太一样 Store层通过SQL表达式检索ConsumeQueue索引 因会影响效率 所以在检索之前通过布隆过滤器避免每次都通过SQL表达式检索
负载均衡
Producer负载均衡
1、 Producer端在发送消息的时候 会先根据Topic找到指定的TopicPublishInfo 2、 在获取了TopicPublishInfo路由信息后 RocketMQ的客户端在默认方式下selectOneMessageQueue()方法 会从TopicPublishInfo中的messageQueueList中 3、 选择一个队列(MessageQueue)进行发送消息
具体的容错策略
何为不可用
就是按之前失败的 按一定的时间做退避 如果上次请求的延迟超过550Lms 就退避3000Lms 在这期间该broker代理不可用