Netty网络编程(四):Event、Handler和Pipeline

简介: Netty网络编程(四):Event、Handler和Pipeline

简介

上一节我们讲解了netty中的Channel,知道了channel是事件处理器和外部联通的桥梁。今天本文将会详细讲解netty的剩下几个非常总要的部分Event、Handler和PipeLine。


ChannelPipeline

pipeLine是连接Channel和handler的桥梁,它实际上是一个filter的实现,用于控制其中handler的处理方式。


当一个channel被创建的时候,和它对应的ChannelPipeline也会被创建。


先看下ChannelPipeline的定义:

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable 

首先ChannelPipeline继承自Iterable,表示它是可遍历的,而遍历的结果就是其中一个个的Handler。


作为一个合格的Iterable,ChannelPipeline提供了一系列的add和remote方法,通过这些方法就可以向ChannelPipeline中添加或者移除Handler。因为ChannelPipeline是一个filter,而过滤器是需要指定对应的filter的顺序的,所以ChannelPipeline中有addFirst和addLast这种添加不同顺序的方法。


然后可以看到ChannelPipeline继承了ChannelInboundInvoker和ChannelOutboundInvoker两个接口。


先看一张channelPipeline的工作流程图:


39.png

可以看出ChannelPipeline主要有两种操作,一种是读入Inbound,一种是写出OutBound。


对于Socket.read()这样的读入操作,调用的实际上就是ChannelInboundInvoker中的方法。对于外部的IO写入的请求,调用的就是ChannelOutboundInvoker中的方法。


注意,Inbound和outbound的处理顺序是相反的,比如下面的例子:

  ChannelPipeline p = ...;
   p.addLast("1", new InboundHandlerA());
   p.addLast("2", new InboundHandlerB());
   p.addLast("3", new OutboundHandlerA());
   p.addLast("4", new OutboundHandlerB());
   p.addLast("5", new InboundOutboundHandlerX());

上面的代码中我们向ChannelPipeline添加了5个handler,其中2个InboundHandler,2个OutboundHandler和一个同时处理In和Out的Handler。


那么当channel遇到inbound event的时候,就会按照1,2,3,4,5的顺序进行处理,但是只有InboundHandler才能处理Inbound事件,所以,真正执行的顺序是1,2,5。


同样的当channel遇到outbound event的时候,会按照5,4,3,2,1的顺序进行执行,但是只有outboundHandler才能处理Outbound事件,所以真正执行的顺序是5,4,3.


简单的说,ChannelPipeline指定了Handler的执行顺序。


ChannelHandler

netty是一个事件驱动的框架,所有的event都是由Handler来进行处理的。ChannelHandler可以处理IO、拦截IO或者将event传递给ChannelPipeline中的下一个Handler进行处理。


ChannelHandler的结构很简单,只有三个方法,分别是:

void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

根据inbound和outbound事件的不同,ChannelHandler可以分为两类,分别是ChannelInboundHandler 和ChannelOutboundHandler.


因为这两个都是interface,实现起来比较麻烦,所以netty为大家提供了三个默认的实现:ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler。前面两个很好理解,分别是inbound和outbound,最后一个可以同时处理inbound和outbound。


ChannelHandler是由ChannelHandlerContext提供的,并且和ChannelPipeline的交互也是通过ChannelHandlerContext来进行的。


ChannelHandlerContext

ChannelHandlerContext可以让ChannelHandler和ChannelPipeline或者其他的Handler进行交互。它就是一个上下文环境,使得Handler和Channel可以相互作用。


如可以在ChannelHandlerContext中,调用channel()获得绑定的channel。可以通过调用handler()获得绑定的Handler。通过调用fire*方法来触发Channel的事件。


看下ChannelHandlerContext的定义:

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker 

可以看到他是一个AttributeMap用来存储属性,还是一个ChannelInboundInvoker和ChannelOutboundInvoker用来触发和传播相应的事件。

对于Inbound来说传播事件的方法有:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

对于Outbound来说传播事件的方法有:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)

这些方法,在一个Handler中调用,然后将事件传递给下一个Handler,如下所示:

   public class MyInboundHandler extends ChannelInboundHandlerAdapter {
        @Override
       public void channelActive(ChannelHandlerContext ctx) {
           System.out.println("Connected!");
           ctx.fireChannelActive();
       }
   }
   public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
        @Override
       public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
           System.out.println("Closing ..");
           ctx.close(promise);
       }
   }

ChannelHandler中的状态变量

ChannelHandler是一个Handler类,一般情况下,这个类的实例是可以被多个channel共同使用的,前提是这个ChannelHandler没有共享的状态变量。


但有时候,我们必须要在ChannelHandler中保持一个状态,那么就涉及到ChannelHandler中的状态变量的问题,看下面的一个例子:

  public interface Message {
       // your methods here
   }
   public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
       private boolean loggedIn;
        @Override
       public void channelRead0(ChannelHandlerContext ctx, Message message) {
           if (message instanceof LoginMessage) {
               authenticate((LoginMessage) message);
               loggedIn = true;
           } else (message instanceof GetDataMessage) {
               if (loggedIn) {
                   ctx.writeAndFlush(fetchSecret((GetDataMessage) message));
               } else {
                   fail();
               }
           }
       }
       ...
   }

这个例子中,我们需要在收到LoginMessage之后,对消息进行认证,并保存认证状态,因为业务逻辑是这样的,所以必须要有一个状态变量。


那么这样带有状态变量的Handler就只能绑定一个channel,如果绑定多个channel就有可能出现状态不一致的问题。一个channel绑定一个Handler实例,很简单,只需要在initChannel方法中使用new关键字新建一个对象即可。

   public class DataServerInitializer extends ChannelInitializer<Channel> {
        @Override
       public void initChannel(Channel channel) {
           channel.pipeline().addLast("handler", new DataServerHandler());
       }
   }

那么除了新建handler实例之外,还有没有其他的办法呢?当然是有的,那就是 ChannelHandlerContext 中的AttributeKey属性。还是上面的例子,我们看一下使用AttributeKey应该怎么实现:

   public interface Message {
       // your methods here
   }
    @Sharable
   public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
       private final AttributeKey<Boolean> auth =
             AttributeKey.valueOf("auth");
        @Override
       public void channelRead(ChannelHandlerContext ctx, Message message) {
           Attribute<Boolean> attr = ctx.attr(auth);
           if (message instanceof LoginMessage) {
               authenticate((LoginMessage) o);
               attr.set(true);
           } else (message instanceof GetDataMessage) {
               if (Boolean.TRUE.equals(attr.get())) {
                   ctx.writeAndFlush(fetchSecret((GetDataMessage) o));
               } else {
                   fail();
               }
           }
       }
       ...
   }

上例中,首先定义了一个AttributeKey,然后使用ChannelHandlerContext的attr方法将Attribute设置到ChannelHandlerContext中,这样该Attribute绑定到这个ChannelHandlerContext中了。后续即使使用同一个Handler在不同的Channel中该属性也是不同的。


下面是使用共享Handler的例子:

   public class DataServerInitializer extends ChannelInitializer<Channel> {
       private static final DataServerHandler SHARED = new DataServerHandler();
        @Override
       public void initChannel(Channel channel) {
           channel.pipeline().addLast("handler", SHARED);
       }
   }

注意,在定义DataServerHandler的时候,我们加上了@Sharable注解,如果一个ChannelHandler使用了@Sharable注解,那就意味着你可以只创建一次这个Handler,但是可以将其绑定到一个或者多个ChannelPipeline中。


注意,@Sharable注解是为java文档准备的,并不会影响到实际的代码执行效果。


异步Handler


之前介绍了,可以通过调用pipeline.addLast方法将handler加入到pipeline中,因为pipeline是一个filter的结构,所以加入的handler是顺序进行处理的。


但是,我希望某些handler是在新的线程中执行该怎么办?如果我们希望这些新的线程中执行的Handler是无序的又该怎么办?


比如我们现在有3个handler分别是MyHandler1,MyHandler2和MyHandler3。


顺序执行的写法是这样的:

ChannelPipeline pipeline = ch.pipeline();
   pipeline.addLast("MyHandler1", new MyHandler1());
   pipeline.addLast("MyHandler2", new MyHandler2());
   pipeline.addLast("MyHandler3", new MyHandler3());

如果要让MyHandler3在新的线程中执行,则可以加入group选项,从而让handler在新的group中运行:

static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
ChannelPipeline pipeline = ch.pipeline();
   pipeline.addLast("MyHandler1", new MyHandler1());
   pipeline.addLast("MyHandler2", new MyHandler2());
   pipeline.addLast(group,"MyHandler3", new MyHandler3());

但是上例中DefaultEventExecutorGroup加入的Handler也是会顺序执行的,如果确实不想顺序执行,那么可以尝试考虑使用UnorderedThreadPoolEventExecutor 。


总结

本文讲解了Event、Handler和PipeLine,并举例说明他们之间的关系和相互作用。后续会从netty的具体实践出发,进一步加深对netty的理解和应用,希望大家能够喜欢。

相关文章
|
3月前
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
17天前
|
消息中间件 编解码 网络协议
Netty从入门到精通:高性能网络编程的进阶之路
【11月更文挑战第17天】Netty是一个基于Java NIO(Non-blocking I/O)的高性能、异步事件驱动的网络应用框架。使用Netty,开发者可以快速、高效地开发可扩展的网络服务器和客户端程序。本文将带您从Netty的背景、业务场景、功能点、解决问题的关键、底层原理实现,到编写一个详细的Java示例,全面了解Netty,帮助您从入门到精通。
60 0
|
2月前
|
设计模式 Java
Netty Pipeline详解!
本文深入剖析了 Netty 的管道(Pipeline)设计,Netty 是一个基于 Java NIO 的高性能网络应用框架。文章详细介绍了 Pipeline 的原理、源码及设计思维。Pipeline 作为事件处理链条,包含多个处理器,负责处理入站和出站事件。核心组件包括 `ChannelPipeline`、`ChannelHandler` 和 `ChannelHandlerContext`。Netty 通过链式结构和上下文管理实现了高效的事件传播机制,具备高吞吐量和低延迟的特点。本文还探讨了 Pipeline 的职责分离、链式处理、高内聚低耦合等设计原则,为高性能网络编程提供了宝贵的启示。
77 8
|
3月前
|
存储 机器人 Linux
Netty(二)-服务端网络编程常见网络IO模型讲解
Netty(二)-服务端网络编程常见网络IO模型讲解
|
4月前
|
网络协议 C# 开发者
WPF与Socket编程的完美邂逅:打造流畅网络通信体验——从客户端到服务器端,手把手教你实现基于Socket的实时数据交换
【8月更文挑战第31天】网络通信在现代应用中至关重要,Socket编程作为其实现基础,即便在主要用于桌面应用的Windows Presentation Foundation(WPF)中也发挥着重要作用。本文通过最佳实践,详细介绍如何在WPF应用中利用Socket实现网络通信,包括创建WPF项目、设计用户界面、实现Socket通信逻辑及搭建简单服务器端的全过程。具体步骤涵盖从UI设计到前后端交互的各个环节,并附有详尽示例代码,助力WPF开发者掌握这一关键技术,拓展应用程序的功能与实用性。
123 0
|
4月前
|
Java 应用服务中间件 Linux
(九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽!
现如今的开发环境中,分布式/微服务架构大行其道,而分布式/微服务的根基在于网络编程,而Netty恰恰是Java网络编程领域的无冕之王。Netty这个框架相信大家定然听说过,其在Java网络编程中的地位,好比JavaEE中的Spring。
151 3
|
4月前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
224 2
|
4月前
|
存储 网络协议 Java
【Netty 神奇之旅】Java NIO 基础全解析:从零开始玩转高效网络编程!
【8月更文挑战第24天】本文介绍了Java NIO,一种非阻塞I/O模型,极大提升了Java应用程序在网络通信中的性能。核心组件包括Buffer、Channel、Selector和SocketChannel。通过示例代码展示了如何使用Java NIO进行服务器与客户端通信。此外,还介绍了基于Java NIO的高性能网络框架Netty,以及如何用Netty构建TCP服务器和客户端。熟悉这些技术和概念对于开发高并发网络应用至关重要。
77 0
|
5月前
|
前端开发 Java 数据处理
使用Netty构建高性能的网络应用
使用Netty构建高性能的网络应用
|
6月前
|
网络协议 Java 物联网
Netty是什么?深入理解高性能网络框架
Netty是什么?深入理解高性能网络框架
338 1