Netty源码解读(三)Channel与Pipeline

简介:

Channel是理解和使用Netty的核心。Channel的涉及内容较多,这里我使用由浅入深的介绍方法。在这篇文章中,我们主要介绍Channel部分中Pipeline实现机制。为了避免枯燥,借用一下《盗梦空间》的“梦境”概念,希望大家喜欢。

一层梦境:Channel实现概览

在Netty里,Channel是通讯的载体,而ChannelHandler负责Channel中的逻辑处理。

那么ChannelPipeline是什么呢?我觉得可以理解为ChannelHandler的容器:一个Channel包含一个ChannelPipeline,所有ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来。

在Netty中,ChannelEvent是数据或者状态的载体,例如传输的数据对应MessageEvent,状态的改变对应ChannelStateEvent。当对Channel进行操作时,会产生一个ChannelEvent,并发送到ChannelPipeline。ChannelPipeline会选择一个ChannelHandler进行处理。这个ChannelHandler处理之后,可能会产生新的ChannelEvent,并流转到下一个ChannelHandler。

channel pipeline

例如,一个数据最开始是一个MessageEvent,它附带了一个未解码的原始二进制消息ChannelBuffer,然后某个Handler将其解码成了一个数据对象,并生成了一个新的MessageEvent,并传递给下一步进行处理。

到了这里,可以看到,其实Channel的核心流程位于ChannelPipeline中。于是我们进入ChannelPipeline的深层梦境里,来看看它具体的实现。

二层梦境:ChannelPipeline的主流程

Netty的ChannelPipeline包含两条线路:Upstream和Downstream。Upstream对应上行,接收到的消息、被动的状态改变,都属于Upstream。Downstream则对应下行,发送的消息、主动的状态改变,都属于Downstream。ChannelPipeline接口包含了两个重要的方法:sendUpstream(ChannelEvent e)sendDownstream(ChannelEvent e),就分别对应了Upstream和Downstream。

对应的,ChannelPipeline里包含的ChannelHandler也包含两类:ChannelUpstreamHandlerChannelDownstreamHandler。每条线路的Handler是互相独立的。它们都很简单的只包含一个方法:ChannelUpstreamHandler.handleUpstreamChannelDownstreamHandler.handleDownstream

Netty官方的javadoc里有一张图(ChannelPipeline接口里),非常形象的说明了这个机制(我对原图进行了一点修改,加上了ChannelSink,因为我觉得这部分对理解代码流程会有些帮助):

channel pipeline

什么叫ChannelSink呢?ChannelSink包含一个重要方法ChannelSink.eventSunk,可以接受任意ChannelEvent。“sink”的意思是”下沉”,那么”ChannelSink”好像可以理解为”Channel下沉的地方”?实际上,它的作用确实是这样,也可以换个说法:“处于末尾的万能Handler”。最初读到这里,也有些困惑,这么理解之后,就感觉简单许多。只有Downstream包含ChannelSink,这里会做一些建立连接、绑定端口等重要操作。为什么UploadStream没有ChannelSink呢?我只能认为,一方面,不符合”sink”的意义,另一方面,也没有什么处理好做的吧!

这里有个值得注意的地方:在一条“流”里,一个ChannelEvent并不会主动的”流”经所有的Handler,而是由上一个Handler显式的调用ChannelPipeline.sendUp(Down)stream产生,并交给下一个Handler处理。也就是说,每个Handler接收到一个ChannelEvent,并处理结束后,如果需要继续处理,那么它需要调用sendUp(Down)stream新发起一个事件。如果它不再发起事件,那么处理就到此结束,即使它后面仍然有Handler没有执行。这个机制可以保证最大的灵活性,当然对Handler的先后顺序也有了更严格的要求。

下面我们从代码层面来对这里面发生的事情进行深入分析,这部分涉及到一些细节,需要打开项目源码,对照来看,会比较有收获。

三层梦境:深入ChannelPipeline内部

DefaultChannelPipeline的内部结构

ChannelPipeline的主要的实现代码在DefaultChannelPipeline类里。列一下DefaultChannelPipeline的主要字段:


1 public class DefaultChannelPipeline implements ChannelPipeline {
2  
3     private volatile Channel channel;
4     private volatile ChannelSink sink;
5     private volatile DefaultChannelHandlerContext head;
6     private volatile DefaultChannelHandlerContext tail;
7     private final Map<String, DefaultChannelHandlerContext> name2ctx =
8         new HashMap<String, DefaultChannelHandlerContext>(4);
9 }
这里需要介绍一下 ChannelHandlerContext这个接口。顾名思义,ChannelHandlerContext保存了Netty与Handler相关的的上下文信息。而咱们这里的 DefaultChannelHandlerContext,则是对 ChannelHandler的一个包装。一个 DefaultChannelHandlerContext内部,除了包含一个 ChannelHandler,还保存了”next”和”prev”两个指针,从而形成一个双向链表。

因此,在DefaultChannelPipeline中,我们看到的是对DefaultChannelHandlerContext的引用,而不是对ChannelHandler的直接引用。这里包含”head”和”tail”两个引用,分别指向链表的头和尾。而name2ctx则是一个按名字索引DefaultChannelHandlerContext用户的一个map,主要在按照名称删除或者添加ChannelHandler时使用。

sendUpstream和sendDownstream

前面提到了,ChannelPipeline接口的两个重要的方法:sendUpstream(ChannelEvent e)sendDownstream(ChannelEvent e)。所有事件的发起都是基于这两个方法进行的。Channels类有一系列fireChannelBound之类的fireXXXX方法,其实都是对这两个方法的facade包装。

下面来看一下这两个方法的实现。先看sendUpstream(对代码做了一些简化,保留主逻辑):


01 public void sendUpstream(ChannelEvent e) {
02     DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
03     head.getHandler().handleUpstream(head, e);
04 }
05  
06 private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
07     DefaultChannelHandlerContext realCtx = ctx;
08     while (!realCtx.canHandleUpstream()) {
09         realCtx = realCtx.next;
10         if (realCtx == null) {
11             return null;
12         }
13     }
14     return realCtx;
15 }

这里最终调用了ChannelUpstreamHandler.handleUpstream来处理这个ChannelEvent。有意思的是,这里我们看不到任何”将Handler向后移一位”的操作,但是我们总不能每次都用同一个Handler来进行处理啊?实际上,我们更为常用的是ChannelHandlerContext.handleUpstream方法(实现是DefaultChannelHandlerContext.sendUpstream方法):


1 public void sendUpstream(ChannelEvent e) {
2     DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
3     DefaultChannelPipeline.this.sendUpstream(next, e);
4 }

可以看到,这里最终仍然调用了ChannelPipeline.sendUpstream方法,但是它会将Handler指针后移。

我们接下来看看DefaultChannelHandlerContext.sendDownstream:

01 public void sendDownstream(ChannelEvent e) {
02     DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
03     if (prev == null) {
04         try {
05             getSink().eventSunk(DefaultChannelPipeline.this, e);
06         } catch (Throwable t) {
07             notifyHandlerException(e, t);
08         }
09     } else {
10         DefaultChannelPipeline.this.sendDownstream(prev, e);
11     }
12 }

与sendUpstream好像不大相同哦?这里有两点:一是到达末尾时,就如梦境二所说,会调用ChannelSink进行处理;二是这里指针是往前移的,所以我们知道了:

UpstreamHandler是从前往后执行的,DownstreamHandler是从后往前执行的。在ChannelPipeline里添加时需要注意顺序了!

DefaultChannelPipeline里还有些机制,像添加/删除/替换Handler,以及ChannelPipelineFactory等,比较好理解,就不细说了。

回到现实:Pipeline解决的问题

好了,深入分析完代码,有点头晕了,我们回到最开始的地方,来想一想,Netty的Pipeline机制解决了什么问题?

我认为至少有两点:

一是提供了ChannelHandler的编程模型,基于ChannelHandler开发业务逻辑,基本不需要关心网络通讯方面的事情,专注于编码/解码/逻辑处理就可以了。Handler也是比较方便的开发模式,在很多框架中都有用到。

二是实现了所谓的”Universal Asynchronous API”。这也是Netty官方标榜的一个功能。用过OIO和NIO的都知道,这两套API风格相差极大,要从一个迁移到另一个成本是很大的。即使是NIO,异步和同步编程差距也很大。而Netty屏蔽了OIO和NIO的API差异,通过Channel提供对外接口,并通过ChannelPipeline将其连接起来,因此替换起来非常简单。

universal API

理清了ChannelPipeline的主流程,我们对Channel部分的大致结构算是弄清楚了。可是到了这里,我们依然对一个连接具体怎么处理没有什么概念。在下篇文章,我们会分析一下,在Netty中,究竟是如何处理连接的建立、数据的传输这些事情的。

目录
相关文章
|
12月前
|
编解码 安全 Java
Netty源码—1.服务端启动流程
本文主要介绍了服务端启动整体流程及关键方法、服务端启动的核心步骤、创建服务端Channel的源码、初始化服务端Channel的源码、注册服务端Channel的源码、绑定服务端端口的源码、服务端启动流程源码总结。
|
6月前
|
Java Linux 开发工具
Linux 安装 JDK 8 jdk-8u291-linux-x64.tar.gz 详细步骤教程(附安装包)
下载JDK 8安装包并解压至Linux系统(如/opt目录),配置JAVA_HOME、PATH和CLASSPATH环境变量,最后通过java -version和javac -version验证安装成功,即可运行Java程序。
1928 0
|
运维 关系型数据库 MySQL
PolarDB产品使用问题之PolarDB MySQL版和PolarDB-X的区别是什么
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
设计模式 网络协议 Java
技术笔记:Reactor设计模式
技术笔记:Reactor设计模式
422 0
|
容器 Cloud Native 安全
CTF本地靶场搭建——基于阿里云ACR实现动态flag题型的创建
本文介绍了如何利用阿里云ACR服务创建动态flag题型。阿里云容器镜像服务ACR是一个支持 OCI 标准的云原生制品托管和分发平台,提供全球化加速、大规模分发等功能,简化云原生应用交付。由于dockerhub访问不便,文章建议使用ACR作为替代。步骤包括在虚拟机内创建【GZCTF】->【WEB】->【src】文件夹,编写index.php和flag.sh文件,然后创建Dockerfile。接着,用户需在阿里云注册并使用ACR,构建、推送镜像,并在靶场部署动态容器。通过ACR,可以实现不同账号看到不同flag的动态更新,完成了动态flag题型的创建。
|
Prometheus Kubernetes 监控
最佳实践:Kubernetes 集群中 DNS 故障的可观测性与根因诊断
本文介绍了 CoreDNS 服务器、客户端侧的常见 DNS 异常、故障根因,异常观测方案和故障处理流程,希望对大家的问题诊断有所帮助。DNS 服务对于 Kubernetes 集群是至关重要的,除了观测异常之外,我们在架构设计之初就应充分考虑 DNS 服务的稳定性,采纳一些例如 DNS 本地缓存之类的最佳实践。
最佳实践:Kubernetes 集群中 DNS 故障的可观测性与根因诊断
|
存储 人工智能 Go
探索Gin框架:Golang使用Gin完成文件上传
探索Gin框架:Golang使用Gin完成文件上传
|
移动开发 监控 小程序
Python3.7配合Django2.0来调用钉钉(dingding)在线api实时监测员工考勤打卡情况
新冠疫情期间,大多数公司为了避免交叉感染都或多或少的采用了远程办公的方式,这显然是一个明智的选择,基本上钉钉(dingding)作为一个远程办公平台来用的话,虽然差强人意,但是奈何市面上没有啥更好的选择,矬子里拔将军,也还是可以凑合用的,不过远程办公有个问题,就是每天需要检查员工的考勤,居家办公虽然灵活,但是大家究竟有没有办公,则是另外一回事,钉钉提供的解决方案就是考勤在线打卡功能,但是检查出勤钉钉在移动端就有点费劲,需要在钉钉app里点击至少5次,还不能实时刷新,pc端的钉钉oa系统做的更烂,还不如移动端来得方便,另外如果你在一家上千人的企业里,这家企业有大大小小几十个部门,你又非常倒霉的担
Python3.7配合Django2.0来调用钉钉(dingding)在线api实时监测员工考勤打卡情况
|
弹性计算 网络安全 数据中心
阿里云专有网络RFC私网地址172、10和196网段选择攻略
2023阿里云专有网络RFC私网地址172、10和196网段选择攻略,阿里云专有网络VPC私网网段可选192.168.0.0/16、172.16.0.0/12或10.0.0.0/8,如何选择?阿里云百科来详细说下阿里云专有网络IPv4网段选择方法:
2048 0
阿里云专有网络RFC私网地址172、10和196网段选择攻略
|
JavaScript Linux 内存技术
Linux安装nvm Node版本管理工具
  NVM 是一个非常方便的node包管理工具,可以实现在NodeJS 各个不同版本之间自由的进行切换。下面,介绍用root权限安装NVM工具。到2021年7月,nvm的最新版本为v0.39.1。
Linux安装nvm Node版本管理工具