netty系列之:Event、Handler和Pipeline

简介: netty系列之:Event、Handler和Pipeline

目录



简介


上一节我们讲解了netty中的Channel,知道了channel是事件处理器和外部联通的桥梁。


今天本文将会详细讲解netty的剩下几个非常总要的部分Event、Handler和PipeLine。


ChannelPipeline


pipeLine是连接Channel和handler的桥梁,它实际上是一个filter的实现,用于控制其中handler的处理方式。


当一个channel被创建的时候,和它对应的ChannelPipeline也会被创建。


先看下ChannelPipeline的定义:


public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable


首先ChannelPipeline继承自Iterable,表示它是可遍历的,而遍历的结果就是其中一个个的Handler。


作为一个合格的Iterable,ChannelPipeline提供了一系列的add和remote方法,通过这些方法就可以向ChannelPipeline中添加或者移除Handler。因为ChannelPipeline是一个filter,而过滤器是需要指定对应的filter的顺序的,所以ChannelPipeline中有addFirst和addLast这种添加不同顺序的方法。


然后可以看到ChannelPipeline继承了ChannelInboundInvoker和ChannelOutboundInvoker两个接口。


先看一张channelPipeline的工作流程图:


image.png


可以看出ChannelPipeline主要有两种操作,一种是读入Inbound,一种是写出OutBound。


对于Socket.read()这样的读入操作,调用的实际上就是ChannelInboundInvoker中的方法。对于外部的IO写入的请求,调用的就是ChannelOutboundInvoker中的方法。


注意,Inbound和outbound的处理顺序是相反的,比如下面的例子:


ChannelPipeline p = ...;
   p.addLast("1", new InboundHandlerA());
   p.addLast("2", new InboundHandlerB());
   p.addLast("3", new OutboundHandlerA());
   p.addLast("4", new OutboundHandlerB());
   p.addLast("5", new InboundOutboundHandlerX());


上面的代码中我们向ChannelPipeline添加了5个handler,其中2个InboundHandler,2个OutboundHandler和一个同时处理In和Out的Handler。


那么当channel遇到inbound event的时候,就会按照1,2,3,4,5的顺序进行处理,但是只有InboundHandler才能处理Inbound事件,所以,真正执行的顺序是1,2,5。


同样的当channel遇到outbound event的时候,会按照5,4,3,2,1的顺序进行执行,但是只有outboundHandler才能处理Outbound事件,所以真正执行的顺序是5,4,3.

简单的说,ChannelPipeline指定了Handler的执行顺序。


ChannelHandler



netty是一个事件驱动的框架,所有的event都是由Handler来进行处理的。


ChannelHandler可以处理IO、拦截IO或者将event传递给ChannelPipeline中的下一个Handler进行处理。


ChannelHandler的结构很简单,只有三个方法,分别是:


void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;


根据inbound和outbound事件的不同,ChannelHandler可以分为两类,分别是ChannelInboundHandler 和ChannelOutboundHandler.

因为这两个都是interface,实现起来比较麻烦,所以netty为大家提供了三个默认的实现:ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler。前面两个很好理解,分别是inbound和outbound,最后一个可以同时处理inbound和outbound。


ChannelHandler是由ChannelHandlerContext提供的,并且和ChannelPipeline的交互也是通过ChannelHandlerContext来进行的。


ChannelHandlerContext


ChannelHandlerContext可以让ChannelHandler和ChannelPipeline或者其他的Handler进行交互。它就是一个上下文环境,使得Handler和Channel可以相互作用。


如可以在ChannelHandlerContext中,调用channel()获得绑定的channel。可以通过调用handler()获得绑定的Handler。通过调用fire*方法来触发Channel的事件。


看下ChannelHandlerContext的定义:


public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker


可以看到他是一个AttributeMap用来存储属性,还是一个ChannelInboundInvoker和ChannelOutboundInvoker用来触发和传播相应的事件。


对于Inbound来说传播事件的方法有:


ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()


对于Outbound来说传播事件的方法有:


ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)


这些方法,在一个Handler中调用,然后将事件传递给下一个Handler,如下所示:


public class MyInboundHandler extends ChannelInboundHandlerAdapter {
        @Override
       public void channelActive(ChannelHandlerContext ctx) {
           System.out.println("Connected!");
           ctx.fireChannelActive();
       }
   }
   public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
        @Override
       public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
           System.out.println("Closing ..");
           ctx.close(promise);
       }
   }


ChannelHandler中的状态变量


ChannelHandler是一个Handler类,一般情况下,这个类的实例是可以被多个channel共同使用的,前提是这个ChannelHandler没有共享的状态变量。


但有时候,我们必须要在ChannelHandler中保持一个状态,那么就涉及到ChannelHandler中的状态变量的问题,看下面的一个例子:


public interface Message {
       // your methods here
   }
   public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
       private boolean loggedIn;
        @Override
       public void channelRead0(ChannelHandlerContext ctx, Message message) {
           if (message instanceof LoginMessage) {
               authenticate((LoginMessage) message);
               loggedIn = true;
           } else (message instanceof GetDataMessage) {
               if (loggedIn) {
                   ctx.writeAndFlush(fetchSecret((GetDataMessage) message));
               } else {
                   fail();
               }
           }
       }
       ...
   }


这个例子中,我们需要在收到LoginMessage之后,对消息进行认证,并保存认证状态,因为业务逻辑是这样的,所以必须要有一个状态变量。


那么这样带有状态变量的Handler就只能绑定一个channel,如果绑定多个channel就有可能出现状态不一致的问题。一个channel绑定一个Handler实例,很简单,只需要在initChannel方法中使用new关键字新建一个对象即可。


public class DataServerInitializer extends ChannelInitializer<Channel> {
        @Override
       public void initChannel(Channel channel) {
           channel.pipeline().addLast("handler", new DataServerHandler());
       }
   }


那么除了新建handler实例之外,还有没有其他的办法呢?当然是有的,那就是 ChannelHandlerContext 中的AttributeKey属性。还是上面的例子,我们看一下使用AttributeKey应该怎么实现:


public interface Message {
       // your methods here
   }
    @Sharable
   public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
       private final AttributeKey<Boolean> auth =
             AttributeKey.valueOf("auth");
        @Override
       public void channelRead(ChannelHandlerContext ctx, Message message) {
           Attribute<Boolean> attr = ctx.attr(auth);
           if (message instanceof LoginMessage) {
               authenticate((LoginMessage) o);
               attr.set(true);
           } else (message instanceof GetDataMessage) {
               if (Boolean.TRUE.equals(attr.get())) {
                   ctx.writeAndFlush(fetchSecret((GetDataMessage) o));
               } else {
                   fail();
               }
           }
       }
       ...
   }


上例中,首先定义了一个AttributeKey,然后使用ChannelHandlerContext的attr方法将Attribute设置到ChannelHandlerContext中,这样该Attribute绑定到这个ChannelHandlerContext中了。后续即使使用同一个Handler在不同的Channel中该属性也是不同的。


下面是使用共享Handler的例子:


public class DataServerInitializer extends ChannelInitializer<Channel> {
       private static final DataServerHandler SHARED = new DataServerHandler();
        @Override
       public void initChannel(Channel channel) {
           channel.pipeline().addLast("handler", SHARED);
       }
   }


注意,在定义DataServerHandler的时候,我们加上了@Sharable注解,如果一个ChannelHandler使用了@Sharable注解,那就意味着你可以只创建一次这个Handler,但是可以将其绑定到一个或者多个ChannelPipeline中。


注意,@Sharable注解是为java文档准备的,并不会影响到实际的代码执行效果。


异步Handler


之前介绍了,可以通过调用pipeline.addLast方法将handler加入到pipeline中,因为pipeline是一个filter的结构,所以加入的handler是顺序进行处理的。


但是,我希望某些handler是在新的线程中执行该怎么办?如果我们希望这些新的线程中执行的Handler是无序的又该怎么办?


比如我们现在有3个handler分别是MyHandler1,MyHandler2和MyHandler3。


顺序执行的写法是这样的:


ChannelPipeline pipeline = ch.pipeline();
   pipeline.addLast("MyHandler1", new MyHandler1());
   pipeline.addLast("MyHandler2", new MyHandler2());
   pipeline.addLast("MyHandler3", new MyHandler3());


如果要让MyHandler3在新的线程中执行,则可以加入group选项,从而让handler在新的group中运行:


static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
ChannelPipeline pipeline = ch.pipeline();
   pipeline.addLast("MyHandler1", new MyHandler1());
   pipeline.addLast("MyHandler2", new MyHandler2());
   pipeline.addLast(group,"MyHandler3", new MyHandler3());


但是上例中DefaultEventExecutorGroup加入的Handler也是会顺序执行的,如果确实不想顺序执行,那么可以尝试考虑使用UnorderedThreadPoolEventExecutor 。


总结



本文讲解了Event、Handler和PipeLine,并举例说明他们之间的关系和相互作用。后续会从netty的具体实践出发,进一步加深对netty的理解和应用,希望大家能够喜欢。

相关文章
|
4月前
|
设计模式 Java
Netty Pipeline详解!
本文深入剖析了 Netty 的管道(Pipeline)设计,Netty 是一个基于 Java NIO 的高性能网络应用框架。文章详细介绍了 Pipeline 的原理、源码及设计思维。Pipeline 作为事件处理链条,包含多个处理器,负责处理入站和出站事件。核心组件包括 `ChannelPipeline`、`ChannelHandler` 和 `ChannelHandlerContext`。Netty 通过链式结构和上下文管理实现了高效的事件传播机制,具备高吞吐量和低延迟的特点。本文还探讨了 Pipeline 的职责分离、链式处理、高内聚低耦合等设计原则,为高性能网络编程提供了宝贵的启示。
141 8
|
9月前
|
设计模式 前端开发 数据安全/隐私保护
网络编程的魔法师:探索Netty中Handler的奇妙世界
网络编程的魔法师:探索Netty中Handler的奇妙世界
97 0
04RPC - netty中handler的执行顺序
04RPC - netty中handler的执行顺序
120 0
|
程序员
难倒了80%程序员的题,谈谈你对Netty中,Pipeline工作原理的理解
1位工作8年的小伙伴,去某东面试IM部门,被问到这样一道面试题。说,请你谈一谈你对Netty Pipeline设计原理的理解。当时,他说只是用过Netty的Pipline,原理没有深入了解过,然后就没有然后了。
98 0
|
前端开发 算法 Java
Netty组件Future、Promise、Handler、Pipline、ByteBuf
Netty组件Future、Promise、Handler、Pipline、ByteBuf
173 0
|
Java 关系型数据库 MySQL
10. 成功解决:io.netty.channel.ChannelPipelineException: ***Handler is not a @Sharable handler, so can&#39;t be added or removed multiple times.
使用 SpringBoot 集成 Netty 时,报如下错误:io.netty.channel.ChannelPipelineException: ***Handler is not a @Sharable handler, so can&#39;t be added or removed multiple times.
678 0
|
存储 Java
Netty网络编程(四):Event、Handler和Pipeline
Netty网络编程(四):Event、Handler和Pipeline
142 0
|
缓存 移动开发 网络协议
第 8 章 Netty 编解码器和 Handler 调用机制
第 8 章 Netty 编解码器和 Handler 调用机制
206 0
|
网络协议 算法
小六六学Netty系列之编解码器和handler的调用机制(下)
前言 文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820… 种一棵树最好的时间是十年前,其次是现在
159 0
|
存储 编解码 网络协议
小六六学Netty系列之编解码器和handler的调用机制(上)
前言 文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820… 种一棵树最好的时间是十年前,其次是现在
252 0