Netty3架构解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介:

前记

很早以前就有读Netty源码的打算了,然而第一次尝试的时候从Netty4开始,一直抓不到核心的框架流程,后来因为其他事情忙着就放下了。这次趁着休假重新捡起这个硬骨头,因为Netty3现在还在被很多项目使用,因而这次决定先从Netty3入手,瞬间发现Netty3的代码比Netty4中规中矩的多,很多概念在代码本身中都有清晰的表达,所以半天就把整个框架的骨架搞清楚了。再读 Netty4对Netty3的改进总结 ,回去读Netty4的源码,反而觉得轻松了,一种豁然开朗的感觉。

记得去年读Jetty源码的时候,因为代码太庞大,并且自己的HTTP Server的了解太少,因而只能自底向上的一个一个模块的叠加,直到最后把所以的模块连接在一起而看清它的真正核心骨架。现在读源码,开始习惯先把骨架理清,然后延伸到不同的器官、血肉而看清整个人体。

本文从Reactor模式在Netty3中的应用,引出Netty3的整体架构以及控制流程;然而除了Reactor模式,Netty3还在ChannelPipeline中使用了 Intercepting Filter 模式,这个模式也在Servlet的Filter中成功使用,因而本文还会从Intercepting Filter模式出发详细介绍ChannelPipeline的设计理念。本文假设读者已经对Netty有一定的了解,因而不会包含过多入门介绍,以及帮Netty做宣传的文字。

Netty3中的Reactor模式

Reactor模式在Netty中应用非常成功,因而它也是在Netty中受大肆宣传的模式,关于Reactor模式可以详细参考本人的另一篇文章 《Reactor模式详解》 ,对Reactor模式的实现是Netty3的基本骨架,因而本小节会详细介绍Reactor模式如何应用Netty3中。

如果读《Reactor模式详解》,我们知道Reactor模式由Handle、Synchronous Event Demultiplexer、Initiation Dispatcher、Event Handler、Concrete Event Handler构成,在Java的实现版本中,Channel对应Handle,Selector对应Synchronous Event Demultiplexer,并且Netty3还使用了两层Reactor:Main Reactor用于处理Client的连接请求,Sub Reactor用于处理和Client连接后的读写请求(关于这个概念还可以参考Doug Lea的这篇PPT: Scalable IO In Java )。所以我们先要解决Netty3中使用什么类实现所有的上述模块并把他们联系在一起的,以NIO实现方式为例:

模式是一种抽象,但是在实现中,经常会因为语言特性、框架和性能需要而做一些改变,因而Netty3对Reactor模式的实现有一套自己的设计:
1. ChannelEvent: Reactor是基于事件编程的,因而在Netty3中使用ChannelEvent抽象的表达Netty3内部可以产生的各种事件,所有这些事件对象在Channels帮助类中产生,并且由它将事件推入到ChannelPipeline中,ChannelPipeline构建ChannelHandler管道,ChannelEvent流经这个管道实现所有的业务逻辑处理。ChannelEvent对应的事件有:ChannelStateEvent表示Channel状态的变化事件,而如果当前Channel存在Parent Channel,则该事件还会传递到Parent Channel的ChannelPipeline中,如OPEN、BOUND、CONNECTED、INTEREST_OPS等,该事件可以在各种不同实现的Channel、ChannelSink中产生;MessageEvent表示从Socket中读取数据完成、需要向Socket写数据或ChannelHandler对当前Message解析(如Decoder、Encoder)后触发的事件,它由NioWorker、需要对Message做进一步处理的ChannelHandler产生;WriteCompletionEvent表示写完成而触发的事件,它由NioWorker产生;ExceptionEvent表示在处理过程中出现的Exception,它可以发生在各个构件中,如Channel、ChannelSink、NioWorker、ChannelHandler中;IdleStateEvent由IdleStateHandler触发,这也是一个ChannelEvent可以无缝扩展的例子。注:在Netty4后,已经没有ChannelEvent类,所有不同事件都用对应方法表达,这也意味这ChannelEvent不可扩展,Netty4采用在ChannelInboundHandler中加入userEventTriggered()方法来实现这种扩展,具体可以参考 这里
2. ChannelHandler: 在Netty3中,ChannelHandler用于表示Reactor模式中的EventHandler。ChannelHandler只是一个标记接口,它有两个子接口:ChannelDownstreamHandler和ChannelUpstreamHandler,其中ChannelDownstreamHandler表示从用户应用程序流向Netty3内部直到向Socket写数据的管道,在Netty4中改名为ChannelOutboundHandler;ChannelUpstreamHandler表示数据从Socket进入Netty3内部向用户应用程序做数据处理的管道,在Netty4中改名为ChannelInboundHandler。
3. ChannelPipeline: 用于管理ChannelHandler的管道,每个Channel一个ChannelPipeline实例,可以运行过程中动态的向这个管道中添加、删除ChannelHandler(由于实现的限制,在最末端的ChannelHandler向后添加或删除ChannelHandler不一定在当前执行流程中起效,参考 这里 )。ChannelPipeline内部维护一个ChannelHandler的双向链表,它以Upstream(Inbound)方向为正向,Downstream(Outbound)方向为方向。ChannelPipeline采用Intercepting Filter模式实现,具体可以参考 这里 ,这个模式的实现在后一节中还是详细介绍。
4. NioSelector: Netty3使用NioSelector来存放Selector(Synchronous Event Demultiplexer),每个新产生的NIO Channel都向这个Selector注册自己以让这个Selector监听这个NIO Channel中发生的事件,当事件发生时,调用帮助类Channels中的方法生成ChannelEvent实例,将该事件发送到这个Netty Channel对应的ChannelPipeline中,而交给各级ChannelHandler处理。其中在向Selector注册NIO Channel时,Netty Channel实例以Attachment的形式传入,该Netty Channel在其内部的NIO Channel事件发生时,会以Attachment的形式存在于SelectionKey中,因而每个事件可以直接从这个Attachment中获取相关链的Netty Channel,并从Netty Channel中获取与之相关联的ChannelPipeline,这个实现和Doug Lea的 Scalable IO In Java 一模一样。另外Netty3还采用了 Scalable IO In Java 中相同的Main Reactor和Sub Reactor设计,其中NioSelector的两个实现:Boss即为Main Reactor,NioWorker为Sub Reactor。Boss用来处理新连接加入的事件,NioWorker用来处理各个连接对Socket的读写事件,其中Boss通过NioWorkerPool获取NioWorker实例,Netty3模式使用RoundRobin方式放回NioWorker实例。更形象一点的,可以通过 Scalable IO In Java 的这张图表达:

若与Ractor模式对应,NioSelector中包含了Synchronous Event Demultiplexer,而ChannelPipeline中管理着所有EventHandler,因而NioSelector和ChannelPipeline共同构成了Initiation Dispatcher。
5. ChannelSink: 在ChannelHandler处理完成所有逻辑需要向客户端写响应数据时,一般会调用Netty Channel中的write方法,然而在这个write方法实现中,它不是直接向其内部的Socket写数据,而是交给Channels帮助类,内部创建DownstreamMessageEvent,反向从ChannelPipeline的管道中流过去,直到第一个ChannelHandler处理完毕,最后交给ChannelSink处理,以避免阻塞写而影响程序的吞吐量。ChannelSink将这个MessageEvent提交给Netty Channel中的writeBufferQueue,最后NioWorker会等到这个NIO Channel已经可以处理写事件时无阻塞的向这个NIO Channel写数据。这就是上图的send是从SubReactor直接出发的原因。
6. Channel: Netty有自己的Channel抽象,它是一个资源的容器,包含了所有一个连接涉及到的所有资源的饮用,如封装NIO Channel、ChannelPipeline、Boss、NioWorkerPool等。另外它还提供了向内部NIO Channel写响应数据的接口write、连接/绑定到某个地址的connect/bind接口等,个人感觉虽然对Channel本身来说,因为它封装了NIO Channel,因而这些接口定义在这里是合理的,但是如果考虑到Netty的架构,它的Channel只是一个资源容器,有这个Channel实例就可以得到和它相关的基本所有资源,因而这种write、connect、bind动作不应该再由它负责,而是应该由其他类来负责,比如在Netty4中就在ChannelHandlerContext添加了write方法,虽然netty4并没有删除Channel中的write接口。

Netty3中的Intercepting Filter模式

如果说Reactor模式是Netty3的骨架,那么Intercepting Filter模式则是Netty的中枢。Reactor模式主要应用在Netty3的内部实现,它是Netty3具有良好性能的基础,而Intercepting Filter模式则是ChannelHandler组合实现一个应用程序逻辑的基础,只有很好的理解了这个模式才能使用好Netty,甚至能得心应手。

关于Intercepting Filter模式的详细介绍可以参考 这里 ,本节主要介绍Netty3中对Intercepting Filter模式的实现,其实就是DefaultChannelPipeline对Intercepting Filter模式的实现。在上文有提到Netty3的ChannelPipeline是ChannelHandler的容器,用于存储与管理ChannelHandler,同时它在Netty3中也起到桥梁的作用,即它是连接Netty3内部到所有ChannelHandler的桥梁。作为ChannelPipeline的实现者DefaultChannelPipeline,它使用一个ChannelHandler的双向链表来存储,以DefaultChannelPipelineContext作为节点:
public   interface  ChannelHandlerContext {
    Channel getChannel();

    ChannelPipeline getPipeline();

    String getName();

    ChannelHandler getHandler();

    
boolean  canHandleUpstream();
    
boolean  canHandleDownstream();
    
void  sendUpstream(ChannelEvent e);
    
void  sendDownstream(ChannelEvent e);
    Object getAttachment();

    
void  setAttachment(Object attachment);
}

private   final   class  DefaultChannelHandlerContext  implements  ChannelHandlerContext {
    
volatile  DefaultChannelHandlerContext next;
    
volatile  DefaultChannelHandlerContext prev;
    
private   final  String name;
    
private   final  ChannelHandler handler;
    
private   final   boolean  canHandleUpstream;
    
private   final   boolean  canHandleDownstream;
    
private   volatile  Object attachment;
.....
}
在DefaultChannelPipeline中,它存储了和当前ChannelPipeline相关联的Channel、ChannelSink以及ChannelHandler链表的head、tail,所有ChannelEvent通过sendUpstream、sendDownstream为入口流经整个链表:
public   class  DefaultChannelPipeline  implements  ChannelPipeline {
    
private   volatile  Channel channel;
    
private   volatile  ChannelSink sink;
    
private   volatile  DefaultChannelHandlerContext head;
    
private   volatile  DefaultChannelHandlerContext tail;
......
    
public   void  sendUpstream(ChannelEvent e) {
        DefaultChannelHandlerContext head 
=  getActualUpstreamContext( this .head);
        
if  (head  ==   null ) {
            
return ;
        }
        sendUpstream(head, e);
    }

    
void  sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        
try  {
            ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
        } 
catch  (Throwable t) {
            notifyHandlerException(e, t);
        }
    }

    
public   void  sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext tail 
=  getActualDownstreamContext( this .tail);
        
if  (tail  ==   null ) {
            
try  {
                getSink().eventSunk(
this , e);
                
return ;
            } 
catch  (Throwable t) {
                notifyHandlerException(e, t);
                
return ;
            }
        }
        sendDownstream(tail, e);
    }

    
void  sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        
if  (e  instanceof  UpstreamMessageEvent) {
            
throw   new  IllegalArgumentException( " cannot send an upstream event to downstream " );
        }
        
try  {
            ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
        } 
catch  (Throwable t) {
            e.getFuture().setFailure(t);
            notifyHandlerException(e, t);
        }
    }
对Upstream事件,向后找到所有实现了ChannelUpstreamHandler接口的ChannelHandler组成链( getActualUpstreamContext()) ,而对Downstream事件,向前找到所有实现了ChannelDownstreamHandler接口的ChannelHandler组成链( getActualDownstreamContext() ):
     private  DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
        
if  (ctx  ==   null ) {
            
return   null ;
        }
        DefaultChannelHandlerContext realCtx 
=  ctx;
        
while  ( ! realCtx.canHandleUpstream()) {
            realCtx 
=  realCtx.next;
            
if  (realCtx  ==   null ) {
                
return   null ;
            }
        }
        
return  realCtx;
    }
    
private  DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
        
if  (ctx  ==   null ) {
            
return   null ;
        }
        DefaultChannelHandlerContext realCtx 
=  ctx;
        
while  ( ! realCtx.canHandleDownstream()) {
            realCtx 
=  realCtx.prev;
            
if  (realCtx  ==   null ) {
                
return   null ;
            }
        }
        
return  realCtx;
    }
在实际实现ChannelUpstreamHandler或ChannelDownstreamHandler时,调用 ChannelHandlerContext中的sendUpstream或sendDownstream方法将控制流程交给下一个 ChannelUpstreamHandler或下一个ChannelDownstreamHandler,或调用Channel中的write方法发送 响应消息。
public   class  MyChannelUpstreamHandler  implements  ChannelUpstreamHandler {
    
public   void  handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)  throws  Exception {
        
//  handle current logic, use Channel to write response if needed.
        
//  ctx.getChannel().write(message);
        ctx.sendUpstream(e);
    }
}

public   class  MyChannelDownstreamHandler  implements  ChannelDownstreamHandler {
    
public   void  handleDownstream(
            ChannelHandlerContext ctx, ChannelEvent e) 
throws  Exception {
        
//  handle current logic
        ctx.sendDownstream(e);
    }
}
当ChannelHandler向ChannelPipelineContext发送事件时,其内部从当前ChannelPipelineContext节点出发找到下一个ChannelUpstreamHandler或ChannelDownstreamHandler实例,并向其发送ChannelEvent,对于Downstream链,如果到达链尾,则将ChannelEvent发送给ChannelSink:
public   void  sendDownstream(ChannelEvent e) {
    DefaultChannelHandlerContext prev 
=  getActualDownstreamContext( this .prev);
    
if  (prev  ==   null ) {
        
try  {
            getSink().eventSunk(DefaultChannelPipeline.
this , e);
        } 
catch  (Throwable t) {
            notifyHandlerException(e, t);
        }
    } 
else  {
        DefaultChannelPipeline.
this .sendDownstream(prev, e);
    }
}

public   void  sendUpstream(ChannelEvent e) {
    DefaultChannelHandlerContext next 
=  getActualUpstreamContext( this .next);
    
if  (next  !=   null ) {
        DefaultChannelPipeline.
this .sendUpstream(next, e);
    }
}
正是因为这个实现,如果在一个末尾的ChannelUpstreamHandler中先移除自己,在向末尾添加一个新的ChannelUpstreamHandler,它是无效的,因为它的next已经在调用前就固定设置为null了。

ChannelPipeline作为ChannelHandler的容器,它还提供了各种增、删、改ChannelHandler链表中的方法,而且如果某个ChannelHandler还实现了LifeCycleAwareChannelHandler,则该ChannelHandler在被添加进ChannelPipeline或从中删除时都会得到同志:
public   interface  LifeCycleAwareChannelHandler  extends  ChannelHandler {
    
void  beforeAdd(ChannelHandlerContext ctx)  throws  Exception;
    
void  afterAdd(ChannelHandlerContext ctx)  throws  Exception;
    
void  beforeRemove(ChannelHandlerContext ctx)  throws  Exception;
    
void  afterRemove(ChannelHandlerContext ctx)  throws  Exception;
}

public   interface  ChannelPipeline {
    
void  addFirst(String name, ChannelHandler handler);
    
void  addLast(String name, ChannelHandler handler);
    
void  addBefore(String baseName, String name, ChannelHandler handler);
    
void  addAfter(String baseName, String name, ChannelHandler handler);
    
void  remove(ChannelHandler handler);
    ChannelHandler remove(String name);

    
< extends  ChannelHandler >  T remove(Class < T >  handlerType);
    ChannelHandler removeFirst();

    ChannelHandler removeLast();

    
void  replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

    
< extends  ChannelHandler >  T replace(Class < T >  oldHandlerType, String newName, ChannelHandler newHandler);
    ChannelHandler getFirst();

    ChannelHandler getLast();

    ChannelHandler get(String name);

    
< extends  ChannelHandler >  T get(Class < T >  handlerType);
    ChannelHandlerContext getContext(ChannelHandler handler);

    ChannelHandlerContext getContext(String name);

    ChannelHandlerContext getContext(Class
<?   extends  ChannelHandler >  handlerType);
    
void  sendUpstream(ChannelEvent e);
    
void  sendDownstream(ChannelEvent e);
    ChannelFuture execute(Runnable task);

    Channel getChannel();

    ChannelSink getSink();

    
void  attach(Channel channel, ChannelSink sink);
    
boolean  isAttached();
    List
< String >  getNames();
    Map
< String, ChannelHandler >  toMap();
}

在DefaultChannelPipeline的ChannelHandler链条的处理流程为:


相关文章
|
2月前
|
运维 持续交付 云计算
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
73 1
|
29天前
|
运维 监控 持续交付
微服务架构解析:跨越传统架构的技术革命
微服务架构(Microservices Architecture)是一种软件架构风格,它将一个大型的单体应用拆分为多个小而独立的服务,每个服务都可以独立开发、部署和扩展。
181 36
微服务架构解析:跨越传统架构的技术革命
|
1月前
|
存储 Linux API
深入探索Android系统架构:从内核到应用层的全面解析
本文旨在为读者提供一份详尽的Android系统架构分析,从底层的Linux内核到顶层的应用程序框架。我们将探讨Android系统的模块化设计、各层之间的交互机制以及它们如何共同协作以支持丰富多样的应用生态。通过本篇文章,开发者和爱好者可以更深入理解Android平台的工作原理,从而优化开发流程和提升应用性能。
|
2月前
|
弹性计算 持续交付 API
构建高效后端服务:微服务架构的深度解析与实践
在当今快速发展的软件行业中,构建高效、可扩展且易于维护的后端服务是每个技术团队的追求。本文将深入探讨微服务架构的核心概念、设计原则及其在实际项目中的应用,通过具体案例分析,展示如何利用微服务架构解决传统单体应用面临的挑战,提升系统的灵活性和响应速度。我们将从微服务的拆分策略、通信机制、服务发现、配置管理、以及持续集成/持续部署(CI/CD)等方面进行全面剖析,旨在为读者提供一套实用的微服务实施指南。
|
2月前
|
SQL 数据可视化 数据库
多维度解析低代码:从技术架构到插件生态
本文深入解析低代码平台,涵盖技术架构、插件生态及应用价值。通过图形化界面和模块化设计,低代码平台降低开发门槛,提升效率,支持企业快速响应市场变化。重点分析开源低代码平台的优势,如透明架构、兼容性与扩展性、可定制化开发等,探讨其在数据处理、功能模块、插件生态等方面的技术特点,以及未来发展趋势。
|
2月前
|
SQL 数据可视化 数据库
多维度解析低代码:从技术架构到插件生态
本文深入解析低代码平台,从技术架构到插件生态,探讨其在企业数字化转型中的作用。低代码平台通过图形化界面和模块化设计降低开发门槛,加速应用开发与部署,提高市场响应速度。文章重点分析开源低代码平台的优势,如透明架构、兼容性与扩展性、可定制化开发等,并详细介绍了核心技术架构、数据处理与功能模块、插件生态及数据可视化等方面,展示了低代码平台如何支持企业在数字化转型中实现更高灵活性和创新。
53 1
|
2月前
|
SQL 数据可视化 数据库
多维度解析低代码:从技术架构到插件生态
本文深入解析低代码平台,涵盖技术架构、插件生态及应用价值。重点介绍开源低代码平台的优势,如透明架构、兼容性与扩展性、可定制化开发,以及其在数据处理、功能模块、插件生态等方面的技术特点。文章还探讨了低代码平台的安全性、权限管理及未来技术趋势,强调其在企业数字化转型中的重要作用。
|
2月前
|
存储 边缘计算 安全
深入解析边缘计算:架构、优势与挑战
深入解析边缘计算:架构、优势与挑战
60 0
|
8月前
|
前端开发 网络协议 Dubbo
Netty - 回顾Netty高性能原理和框架架构解析
Netty - 回顾Netty高性能原理和框架架构解析
297 0

推荐镜像

更多