RocketMQ基于Netty长连接

简介: Reactor主线程与长短连接Broker的 “Reactor” 线程,负责监听网络端口,如监听2888,39150这样的端口。

1 Reactor主线程与长短连接


Broker的 “Reactor” 线程,负责监听网络端口,如监听2888,39150这样的端口。

13.png



1.1 短连接

短连接,若你要给别人发送一个请求,须建立连接 -> 发送请求 -> 接收响应 -> 断开连接,下一次你要发送请求时,这过程得重来一遍。


每次建立一个连接后,使用这个连接发送请求的时间很短,很快就会断开这个连接,所以他存在时间太短,就是短连接。


1.2 长连接

长连接,建立一个连接 -> 发送请求 -> 接收响应 -> 发送请求 -> 接收响应。


整个过程是,当你建立好一个长连接后,可不停发送请求和接收响应,连接不会断开,等你不需要时再断开即可,该连接会存在很长时间,即长连接。


1.3 TCP长连接

基于TCP协议建立的长连接。


2 Pro和Broker建立一个长连接


此时有个Pro要跟Broker建立一个TCP长连接,则Broker上的这个Reactor主线程,它会在端口上监听到该Pro建立连接的请求。

12.png



接着该Reactor主线程就专职跟这Pro按TCP协议规定的一系列步骤和规范,建立好一个长连接。


2.1 SocketChannel

Pro里有个SocketChannel,Broker里也有个SocketChannel,这两个SocketChannel代表他们两建立好的这个长连接。


两个SocketChannel配对代表一个连接。

11.png



Pro就是通过SocketChannel去发消息给Broker。


3 基于Reactor线程池监听连接中的请求

3.1 Reactor线程池

在Broker中有个概念,即Reactor线程池。该线程池默认3个线程。


当Reactor主线程建立好的每个连接SocketChannel,都会交给这个Reactor线程池里的其中一个线程去监听请求。

10.png



整理分析

有了Reactor线程池后,即可让Pro发请求过来,它发送一个消息过来到达Broker的SocketChannel,此时Reactor线程池里的一个线程会监听到这个SocketChannel中有请求到达。


4 基于Worker线程池完成一系列准备工作

接着Reactor线程从SocketChannel读取出来一个请求,该请求在正式处理前,须先进行一些准备工作和预处理,如SSL加密验证、编码解码、连接空闲检查、网络连接管理等。


4.1 Worker线程池

Worker线程池,默认8个线程。Reactor线程收到的请求会叫个Worker线程池中的一个线程进行处理,来完成上述的一系列准备工作。


9.png


5 基于业务线程池完成请求的处理

经过Worker线程完成预处理后,就需对请求进行正式业务处理。即将请求转交给业务线程池。


5.1 SendMessage线程池

业务线程池的一种。


场景:对于处理发送消息请求而言,就会把请求转交给SendMessage线程池。SendMessage线程是可以配置的,配置的越多,处理消息的吞吐量越高。


业务处理逻辑

当Pro发消息过来,Broker接收到消息,肯定要写入CommitLog文件,后续要有一些ConsumeQueue之类需处理,类似这种操作,就是业务处理逻辑。


这一系列预处理后的请求都要转发给业务线程池。

8.png



6 为何这套网络通信框架会是高性能及高并发的?


由于专门分配了一个Reactor主线程,和各种Pro、Con建立长连接。


连接建立好之后,大量长连接均匀分配给Reactor线程池里的多个线程。


每个Reactor线程负责监听一部分连接的请求,通过多线程并发的监听不同连接的请求,有效提升网络框架并发力。


接着后续对大量并发过来的请求都是基于Worker线程池预处理,当Worker线程池预处理多个请求的时候,Reactor线程还可有条不紊继续监听和接收大量连接请求是否到达。


而且最终的读写磁盘文件之类的操作都交给业务线程池处理,当他并发执行多个请求的磁盘读写操作的时候,不影响其他线程池同时接收请求、预处理请求,没任何的影响。


所以最终效果:


Reactor主线程在端口上监听Producer建立连接的请求,建立长连接

Reactor线程池并发的监听多个连接的请求是否到达

Worker请求并发的对多个请求进行预处理

业务线程池并发的对多个请求进行磁盘读写业务操作

通过这样一套网络通信架构,最终实现可以高并发、高吞吐的对大量网络连接发送过来的大量请求进行预处理,从而保证Broker实现高吞吐。


总结


NIO概念同步非阻塞,每个请求对应一个socketchannel通道数据通过bytebuffer来传输,所有的socketchannel注册到selector选择器上reactor线程池从轮询来处理请求调用select poll epoll函数来获取数据,获取后的数据交给worker线程池来进行参数验证和信息封装,业务线程再去读取数据进行写入,每个线程池负责不同的内容相互不影响来提升并发。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
264 2
|
消息中间件 缓存 网络协议
使用 Netty+SpringBoot 打造的 TCP 长连接通讯方案 上
使用 Netty+SpringBoot 打造的 TCP 长连接通讯方案 上
使用 Netty+SpringBoot 打造的 TCP 长连接通讯方案 上
|
编解码 负载均衡 网络协议
Netty实战三-如何让单机下Netty支持百万长连接?
Netty实战三-如何让单机下Netty支持百万长连接?
1268 0
|
消息中间件 缓存 网络协议
使用 Netty+SpringBoot 打造的 TCP 长连接通讯方案 下
使用 Netty+SpringBoot 打造的 TCP 长连接通讯方案 下
|
NoSQL Dubbo JavaScript
SpringBoot+Netty+Redis 搭建长连接集群
SpringBoot+Netty+Redis 搭建长连接集群
SpringBoot+Netty+Redis 搭建长连接集群
|
网络协议 前端开发 JavaScript
Netty进阶 -- WebSocket长连接开发
Netty进阶 -- WebSocket长连接开发
774 0
Netty进阶 -- WebSocket长连接开发
|
监控 数据可视化 Java
Netty(一) SpringBoot 整合长连接心跳机制(下)
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
Netty(一) SpringBoot 整合长连接心跳机制(中)
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
|
监控 Java
Netty(一) SpringBoot 整合长连接心跳机制(上)
Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13534 1