网络通信的核心机制:Socket如何实现高效数据传输(中)

本文涉及的产品
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
数据传输服务 DTS,数据同步 1个月
简介: 网络通信的核心机制:Socket如何实现高效数据传输

通知用户特定事件

这些ChannelHandler实例添加到ChannelPipeline中,在ChannelPipeline中按顺序逐个执行。它类似于一个链条,有使用过Servlet的读者可能会更容易理解。

ChannelPipeline实现了拦截过滤器模式,这意味着我们连接不同的ChannelHandler来拦截并处理经过ChannelPipeline的数据或事件。可以把ChannelPipeline想象成UNIX管道,它允许不同的命令链(ChannelHandler相当于命令)。你还可以在运行时根据需要添加ChannelHandler实例到ChannelPipeline或从ChannelPipeline中删除,这能帮助我们构建高度灵活的Netty程序。此外,访问指定的ChannelPipeline和ChannelConfig,你能在Channel自身上进行操作。Channel提供了很多方法,如下列表:

  • ventLoop(),返回分配给Channel的EventLoop
  • pipeline(),返回分配给Channel的ChannelPipeline
  • isActive(),返回Channel是否激活,已激活说明与远程连接对等
  • localAddress(),返回已绑定的本地SocketAddress
  • remoteAddress(),返回已绑定的远程SocketAddress
  • write(),写数据到远程客户端,数据通过ChannelPipeline传输过去

后面会越来越熟悉这些方法,现在只需要记住我们的操作都是在相同的接口上运行,Netty的高灵活性让你可以以不同的传输实现进行重构。

写数据到远程已连接客户端可以调用Channel.write()方法,如下代码:

[java] view plaincopy
1. Channel channel = ...  
2. //Create ByteBuf that holds data to write  
3. ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);  
4. //Write data  
5. ChannelFuture cf = channel.write(buf);  
6. //Add ChannelFutureListener to get notified after write completes  
7. cf.addListener(new ChannelFutureListener() {  
8.     @Override  
9.     public void operationComplete(ChannelFuture future) {  
10.         //Write operation completes without error  
11.         if (future.isSuccess()) {  
12.             System.out.println(.Write successful.);  
13.         } else {  
14.             //Write operation completed but because of error  
15.             System.err.println(.Write error.);  
16.             future.cause().printStacktrace();  
17.         }  
18.     }  
19. });  
        Channel是线程安全(thread-safe)的,它可以被多个不同的线程安全的操作,在多线程环境下,所有的方法都是安全的。正因为Channel是安全的,我们存储对Channel的引用,并在学习的时候使用它写入数据到远程已连接的客户端,使用多线程也是如此。下面的代码是一个简单的多线程例子:
[java] view plaincopy
1. final Channel channel = ...  
2. //Create ByteBuf that holds data to write  
3. final ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8);  
4. //Create Runnable which writes data to channel  
5. Runnable writer = new Runnable() {  
6.     @Override  
7.     public void run() {  
8.         channel.write(buf.duplicate());  
9.     }  
10. };  
11. //Obtain reference to the Executor which uses threads to execute tasks  
12. Executor executor = Executors.newChachedThreadPool();  
13. // write in one thread  
14. //Hand over write task to executor for execution in thread  
15. executor.execute(writer);  
16. // write in another thread  
17. //Hand over another write task to executor for execution in thread  
18. executor.execute(writer);

此外,这种方法保证了写入的消息以相同的顺序通过写入它们的方法。想了解所有方法的使用可以参考Netty API文档。

3)Netty包含的传输实现

Netty自带了一些传输协议的实现,虽然没有支持所有的传输协议,但是其自带的已足够我们来使用。Netty应用程序的传输协议依赖于底层协议,本节我们将学习Netty中的传输协议。

Netty中的传输方式有如下几种:

  • NIO,io.netty.channel.socket.nio,基于java.nio.channels的工具包,使用选择器作为基础的方法。
  • OIO,io.netty.channel.socket.oio,基于http://java.net的工具包,使用阻塞流。
  • Local,io.netty.channel.local,用来在虚拟机之间本地通信。
  • Embedded,io.netty.channel.embedded,嵌入传输,它允许在没有真正网络的运输中使用ChannelHandler,可以非常有用的来测试ChannelHandler的实现。

1)NIO - Nonblocking I/O

NIO传输是目前最常用的方式,它通过使用选择器提供了完全异步的方式操作所有的I/O,NIO从Java 1.4才被提供。NIO中,我们可以注册一个通道或获得某个通道的改变的状态,通道状态有下面几种改变:

一个新的Channel被接受并已准备好

  • Channel连接完成
  • Channel中有数据并已准备好读取
  • Channel发送数据出去

处理完改变的状态后需重新设置他们的状态,用一个线程来检查是否有已准备好的Channel,如果有则执行相关事件。在这里可能只同时一个注册的事件而忽略其他的。选择器所支持的操作在SelectionKey中定义,具体如下:

  • OP_ACCEPT,有新连接时得到通知
  • OP_CONNECT,连接完成后得到通知
  • OP_READ,准备好读取数据时得到通知
  • OP_WRITE,写入数据到通道时得到通知

Netty中的NIO传输就是基于这样的模型来接收和发送数据,通过封装将自己的接口提供给用户使用,这完全隐藏了内部实现。如前面所说,Netty隐藏内部的实现细节,将抽象出来的API暴露出来供使用。

NIO在处理过程也会有一定的延迟,若连接数不大的话,延迟一般在毫秒级,但是其吞吐量依然比OIO模式的要高。Netty中的NIO传输是“zero-file-copy”,也就是零文件复制,这种机制可以让程序速度更快,更高效的从文件系统中传输内容,零复制就是我们的应用程序不会将发送的数据先复制到JVM堆栈在进行处理,而是直接从内核空间操作。接下来我们将讨论OIO传输,它是阻塞的。

2)OIO - Old blocking I/O

OIO就是java中提供的Socket接口,java最开始只提供了阻塞的Socket,阻塞会导致程序性能低。下面是OIO的处理流程图,若想详细了解,可以参阅其他相关资料。

3)Local - In VM transport

Netty包含了本地传输,这个传输实现使用相同的API用于虚拟机之间的通信,传输是完全异步的。每个Channel使用唯一的SocketAddress,客户端通过使用SocketAddress进行连接,在服务器会被注册为长期运行,一旦通道关闭,它会自动注销,客户端无法再使用它。

连接到本地传输服务器的行为与其他的传输实现几乎是相同的,需要注意的一个重点是只能在本地的服务器和客户端上使用它们。Local未绑定任何Socket,值提供JVM进程之间的通信。

4)Embedded transport

Netty还包括嵌入传输,与之前讲述的其他传输实现比较,它是不是一个真的传输呢?若不是一个真的传输,我们用它可以做什么呢?Embedded transport允许更容易的使用不同的ChannelHandler之间的交互,这也更容易嵌入到其他的ChannelHandler实例并像一个辅助类一样使用它们。它一般用来测试特定的ChannelHandler实现,也可以在ChannelHandler中重新使用一些ChannelHandler来进行扩展,为了实现这样的目的,它自带了一个具体的Channel实现,即:EmbeddedChannel。

每种传输方式在什么时候使用?

不多加赘述,看下面列表:

  • OIO,在低连接数、需要低延迟时、阻塞时使用
  • NIO,在高连接数时使用
  • Local,在同一个JVM内通信时使用
  • ·Embedded,测试ChannelHandler时使用

3.2buffer数据缓冲

  • ByteBuf
  • ByteBufHolder
  • ByteBufAllocator

使用这些接口分配缓冲和执行操作

每当你需要传输数据时,它必须包含一个缓冲区。Java NIO API自带的缓冲区类是相当有限的,没有经过优化,使用JDK的ByteBuffer操作更复杂。缓冲区是一个重要的组建,它是API的一部分。Netty提供了一个强大的缓冲区实现用于表示一个字节序列,并帮助你操作原始字节或自定义的POJO。Netty的ByteBuf相当于JDK的ByteBuffer,ByteBuf的作用是在Netty中通过Channel传输数据。它被重新设计以解决JDK的ByteBuffer中的一些问题,从而使开发人员开发网络应用程序显得更有效率。本章将讲述Netty中的缓冲区,并了解它为什么比JDK自带的缓冲区实现更优秀,还会深入了解在Netty中使用ByteBuf访问数据以及如何使用它。

1)Buffer API

Netty的缓冲API有两个接口:

  • ByteBuf
  • ByteBufHolder

Netty使用reference-counting(引用计数)的时候知道安全释放Buf和其他资源,虽然知道Netty有效的使用引用计数,这都是自动完成的。这允许Netty使用池和其他技巧来加快速度和保持内存利用率在正常水平,你不需要做任何事情来实现这一点,但是在开发Netty应用程序时,你应该处理数据尽快释放池资源。

Netty缓冲API提供了几个优势:

  • 可以自定义缓冲类型
  • 通过一个内置的复合缓冲类型实现零拷贝
  • 扩展性好,比如StringBuffer
  • 不需要调用flip()来切换读/写模式
  • 读取和写入索引分开
  • 方法链
  • 引用计数
  • Pooling(池)

2)ByteBuf -字节数据容器

当需要与远程进行交互时,需要以字节码发送/接收数据。由于各种原因,一个高效、方便、易用的数据接口是必须的,而Netty的ByteBuf满足这些需求,ByteBuf是一个很好的经过优化的数据容器,我们可以将字节数据有效的添加到ByteBuf中或从ByteBuf中获取数据。ByteBuf有2部分:一个用于读,一个用于写。我们可以按顺序的读取数据,并且可以跳到开始重新读一遍。所有的数据操作,我们只需要做的是调整读取数据索引和再次开始读操作。

ByteBuf如何在工作?

写入数据到ByteBuf后,写入索引是增加的字节数量。开始读字节后,读取索引增加。你可以读取字节,直到写入索引和读取索引处理相同的位置,次数若继续读取,则会抛出IndexOutOfBoundsException。调用ByteBuf的任何方法开始读/写都会单独维护读索引和写索引。ByteBuf的默认最大容量限制是Integer.MAX_VALUE,写入时若超出这个值将会导致一个异常。

ByteBuf类似于一个字节数组,最大的区别是读和写的索引可以用来控制对缓冲区数据的访问。下图显示了一个容量为16的ByteBuf:

不同类型的ByteBuf

使用Netty时会遇到3种不同类型的ByteBuf

Heap Buffer(堆缓冲区)

最常用的类型是ByteBuf将数据存储在JVM的堆空间,这是通过将数据存储在数组的实现。堆缓冲区可以快速分配,当不使用时也可以快速释放。它还提供了直接访问数组的方法,通过ByteBuf.array()来获取byte[]数据。

访问非堆缓冲区ByteBuf的数组会导致UnsupportedOperationException,可以使用ByteBuf.hasArray()来检查是否支持访问数组。

Direct Buffer(直接缓冲区)

直接缓冲区,在堆之外直接分配内存。直接缓冲区不会占用堆空间容量,使用时应该考虑到应用程序要使用的最大内存容量以及如何限制它。直接缓冲区在使用Socket传递数据时性能很好,因为若使用间接缓冲区,JVM会先将数据复制到直接缓冲区再进行传递;但是直接缓冲区的缺点是在分配内存空间和释放内存时比堆缓冲区更复杂,而Netty使用内存池来解决这样的问题,这也是Netty使用内存池的原因之一。直接缓冲区不支持数组访问数据,但是我们可以间接的访问数据数组,如下面代码:

[java] view plaincopy
1. ByteBuf directBuf = Unpooled.directBuffer(16);  
2. if(!directBuf.hasArray()){  
3.     int len = directBuf.readableBytes();  
4.     byte[] arr = new byte[len];  
5.     directBuf.getBytes(0, arr);  
6. }

访问直接缓冲区的数据数组需要更多的编码和更复杂的操作,建议若需要在数组访问数据使用堆缓冲区会更好。

Composite Buffer(复合缓冲区)

复合缓冲区,我们可以创建多个不同的ByteBuf,然后提供一个这些ByteBuf组合的视图。复合缓冲区就像一个列表,我们可以动态的添加和删除其中的ByteBuf,JDK的ByteBuffer没有这样的功能。Netty提供了CompositeByteBuf类来处理复合缓冲区,CompositeByteBuf只是一个视图,CompositeByteBuf.hasArray()总是返回false,因为它可能包含一些直接或间接的不同类型的ByteBuf。

例如,一条消息由header和body两部分组成,将header和body组装成一条消息发送出去,可能body相同,只是header不同,使用CompositeByteBuf就不用每次都重新分配一个新的缓冲区。下图显示CompositeByteBuf组成header和body:

若使用JDK的ByteBuffer就不能这样简单的实现,只能创建一个数组或创建一个新的ByteBuffer,再将内容复制到新的ByteBuffer中。下面是使用CompositeByteBuf的例子:

[java] view plaincopy
1. CompositeByteBuf compBuf = Unpooled.compositeBuffer();  
2. ByteBuf heapBuf = Unpooled.buffer(8);  
3. ByteBuf directBuf = Unpooled.directBuffer(16);  
4. //添加ByteBuf到CompositeByteBuf  
5. compBuf.addComponents(heapBuf,directBuf);  
6. //删除第一个ByteBuf  
7. compBuf.removeComponent(0);  
8. Iterator<ByteBuf> iter = compBuf.iterator();  
9. while(iter.hasNext()){  
10.     System.out.println(iter.next().toString());  
11. }  
12. //使用数组访问数据  
13. if(!compBuf.hasArray()){  
14.     int len = compBuf.readableBytes();  
15.     byte[] arr = new byte[len];  
16.     compBuf.getBytes(0, arr);  
17. }

CompositeByteBuf是ByteBuf的子类,我们可以像操作BytBuf一样操作CompositeByteBuf。并且Netty优化套接字读写的操作是尽可能的使用CompositeByteBuf来做的,使用CompositeByteBuf不会操作内存泄露问题。

3)ByteBuf的字节操作

ByteBuf提供了许多操作,允许修改其中的数据内容或只是读取数据。ByteBuf和JDK的ByteBuffer很像,但是ByteBuf提供了更好的性能。

随机访问索引

ByteBuf使用zero-based-indexing(从0开始的索引),第一个字节的索引是0,最后一个字节的索引是ByteBuf的capacity - 1,下面代码是遍历ByteBuf的所有字节:

[java] view plaincopy
1. //create a ByteBuf of capacity is 16  
2. ByteBuf buf = Unpooled.buffer(16);  
3. //write data to buf  
4. for(int i=0;i<16;i++){  
5.     buf.writeByte(i+1);  
6. }  
7. //read data from buf  
8. for(int i=0;i<buf.capacity();i++){  
9.     System.out.println(buf.getByte(i));  
10. }

注意通过索引访问时不会推进读索引和写索引,我们可以通过ByteBuf的readerIndex()或writerIndex()来分别推进读索引或写索引。

顺序访问索引

ByteBuf提供两个指针变量支付读和写操作,读操作是使用readerIndex(),写操作时使用writerIndex()。这和JDK的ByteBuffer不同,ByteBuffer只有一个方法来设置索引,所以需要使用flip()方法来切换读和写模式。

ByteBuf一定符合:0 <= readerIndex <= writerIndex <= capacity。

Discardable bytes废弃字节

我们可以调用ByteBuf.discardReadBytes()来回收已经读取过的字节,discardReadBytes()将丢弃从索引0到readerIndex之间的字节。调用discardReadBytes()方法后会变成如下图:

ByteBuf.discardReadBytes()可以用来清空ByteBuf中已读取的数据,从而使ByteBuf有多余的空间容纳新的数据,但是discardReadBytes()可能会涉及内存复制,因为它需要移动ByteBuf中可读的字节到开始位置,这样的操作会影响性能,一般在需要马上释放内存的时候使用收益会比较大。

可读字节(实际内容)

任何读操作会增加readerIndex,如果读取操作的参数也是一个ByteBuf而没有指定目的索引,指定的目的缓冲区的writerIndex会一起增加,没有足够的内容时会抛出IndexOutOfBoundException。新分配、包装、复制的缓冲区的readerIndex的默认值都是0。下面代码显示了获取所有可读数据:

[java] view plaincopy
1. ByteBuf buf = Unpooled.buffer(16);  
2. while(buf.isReadable()){  
3.     System.out.println(buf.readByte());  
4. }

(代码于原书中有出入,原书可能是基于Netty4之前的版本讲解的,此处基于Netty4)

可写字节Writable bytes

任何写的操作会增加writerIndex。若写操作的参数也是一个ByteBuf并且没有指定数据源索引,那么指定缓冲区的readerIndex也会一起增加。若没有足够的可写字节会抛出IndexOutOfBoundException。新分配的缓冲区writerIndex的默认值是0。下面代码显示了随机一个int数字来填充缓冲区,直到缓冲区空间耗尽:

[java] view plaincopy
1. Random random = new Random();  
2. ByteBuf buf = Unpooled.buffer(16);  
3. while(buf.writableBytes() >= 4){  
4.     buf.writeInt(random.nextInt());  
5. }

清除缓冲区索引Clearing the buffer indexs

调用ByteBuf.clear()可以设置readerIndex和writerIndex为0,clear()不会清除缓冲区的内容,只是将两个索引值设置为0。请注意ByteBuf.clear()与JDK的ByteBuffer.clear()的语义不同。

搜索操作Search operations

各种indexOf()方法帮助你定位一个值的索引是否符合,我们可以用ByteBufProcessor复杂动态顺序搜索实现简单的静态单字节搜索。如果你想解码可变长度的数据,如null结尾的字符串,你会发现bytesBefore(byte value)方法有用。例如我们写一个集成的flash sockets的应用程序,这个应用程序使用NULL结束的内容,使用bytesBefore(byte value)方法可以很容易的检查数据中的空字节。没有ByteBufProcessor的话,我们需要自己做这些事情,使用ByteBufProcessor效率更好。

标准和重置Mark and reset

每个ByteBuf有两个标注索引,一个存储readerIndex,一个存储writerIndex。你可以通过调用一个重置方法重新定位两个索引之一,它类似于InputStream的标注和重置方法,没有读限制。我们可以通过调用readerIndex(int readerIndex)和writerIndex(int writerIndex)移动读索引和写索引到指定位置,调用这两个方法设置指定索引位置时可能抛出IndexOutOfBoundException。

调用duplicate()、slice()、slice(int index, int length)、order(ByteOrder endianness)会创建一个现有缓冲区的视图。衍生的缓冲区有独立的readerIndex、writerIndex和标注索引。如果需要现有缓冲区的全新副本,可以使用copy()或copy(int index, int length)获得。看下面代码:

[java] view plaincopy
1. // get a Charset of UTF-8  
2. Charset utf8 = Charset.forName("UTF-8");  
3. // get a ByteBuf  
4. ByteBuf buf = Unpooled.copiedBuffer("“Netty in Action rocks!“", utf8);  
5. // slice  
6. ByteBuf sliced = buf.slice(0, 14);  
7. // copy  
8. ByteBuf copy = buf.copy(0, 14);  
9. // print "“Netty in Action rocks!“"  
10. System.out.println(buf.toString(utf8));  
11. // print "“Netty in Act"  
12. System.out.println(sliced.toString(utf8));  
13. // print "“Netty in Act"  
14. System.out.println(copy.toString(utf8));

读/写操作以及其他一些操作

有两种主要类型的读写操作:

  • get/set操作以索引为基础,在给定的索引设置或获取字节
  • 从当前索引开始读写,递增当前的写索引或读索引

ByteBuf的各种读写方法或其他一些检查方法可以看ByteBuf的源码,这里不赘述了。

4)ByteBufHolder

ByteBufHolder是一个辅助类,是一个接口,其实现类是DefaultByteBufHolder,还有一些实现了ByteBufHolder接口的其他接口类。ByteBufHolder的作用就是帮助更方便的访问ByteBuf中的数据,当缓冲区没用了后,可以使用这个辅助类释放资源。ByteBufHolder很简单,提供的可供访问的方法也很少。如果你想实现一个“消息对象”有效负载存储在ByteBuf,使用ByteBufHolder是一个好主意。

尽管Netty提供的各种缓冲区实现类已经很容易使用,但Netty依然提供了一些使用的工具类,使得创建和使用各种缓冲区更加方便。下面会介绍一些Netty中的缓冲区工具类。

ByteBufAllocator

Netty支持各种ByteBuf的池实现,来使Netty提供一种称为ByteBufAllocator成为可能。ByteBufAllocator负责分配ByteBuf实例,ByteBufAllocator提供了各种分配不同ByteBuf的方法,如需要一个堆缓冲区可以使用ByteBufAllocator.heapBuffer(),需要一个直接缓冲区可以使用ByteBufAllocator.directBuffer(),需要一个复合缓冲区可以使用ByteBufAllocator.compositeBuffer()。其他方法的使用可以看ByteBufAllocator源码及注释。

获取ByteBufAllocator对象很容易,可以从Channel的alloc()获取,也可以从ChannelHandlerContext的alloc()获取。看下面代码:

[java] view plaincopy
1. ServerBootstrap b = new ServerBootstrap();  
2. b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))  
3.         .childHandler(new ChannelInitializer<SocketChannel>() {  
4.             @Override  
5.             protected void initChannel(SocketChannel ch) throws Exception {  
6.                 // get ByteBufAllocator instance by Channel.alloc()  
7.                 ByteBufAllocator alloc0 = ch.alloc();  
8.                 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {  
9.                     @Override  
10.                     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
11.                         //get ByteBufAllocator instance by ChannelHandlerContext.alloc()  
12.                         ByteBufAllocator alloc1 = ctx.alloc();  
13.                         ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);  
14.                     }  
15.                 });  
16.             }  
17.         });

Netty有两种不同的ByteBufAllocator实现,一个实现ByteBuf实例池将分配和回收成本以及内存使用降到最低;另一种实现是每次使用都创建一个新的ByteBuf实例。Netty默认使用PooledByteBufAllocator,我们可以通过ChannelConfig或通过引导设置一个不同的实现来改变。更多细节在后面讲述。

Unpooled

Unpooled也是用来创建缓冲区的工具类,Unpooled的使用也很容易。Unpooled提供了很多方法,详细方法及使用可以看API文档或Netty源码。看下面代码:

[java] view plaincopy
1. //创建复合缓冲区  
2. CompositeByteBuf compBuf = Unpooled.compositeBuffer();  
3. //创建堆缓冲区  
4. ByteBuf heapBuf = Unpooled.buffer(8);  
5. //创建直接缓冲区  
6. ByteBuf directBuf = Unpooled.directBuffer(16);

ByteBufUtil

ByteBufUtil提供了一些静态的方法,在操作ByteBuf时非常有用。ByteBufUtil提供了Unpooled之外的一些方法,也许最有价值的是hexDump(ByteBuf buffer)方法,这个方法返回指定ByteBuf中可读字节的十六进制字符串,可以用于调试程序时打印ByteBuf的内容,十六进制字符串相比字节而言对用户更友好。

5)Summary

本章主要学习Netty提供的缓冲区类ByteBuf的创建和简单实用以及一些操作ByteBuf的工具类。

3.3处理器

  • ChannelPipeline
  • ChannelHandlerContext
  • ChannelHandler
  • Inbound vs outbound(入站和出站)

接受连接或创建他们只是你的应用程序的一部分,虽然这些任何很重要,但是一个网络应用程序旺旺是更复杂的,需要更多的代码编写,如处理传入和传出的数据。Netty提供了一个强大的处理这些事情的功能,允许用户自定义ChannelHandler的实现来处理数据。使得ChannelHandler更强大的是可以连接每个ChannelHandler来实现任务,这有助于代码的整洁和重用。但是处理数据只是ChannelHandler所做的事情之一,也可以压制I/O操作,例如写请求。所有这些都可以动态实现。

ChannelPipeline

ChannelPipeline是ChannelHandler实例的列表,用于处理或截获通道的接收和发送数据。ChannelPipeline提供了一种高级的截取过滤器模式,让用户可以在ChannelPipeline中完全控制一个事件及如何处理ChannelHandler与ChannelPipeline的交互。

对于每个新的通道,会创建一个新的ChannelPipeline并附加至通道。一旦连接,Channel和ChannelPipeline之间的耦合是永久性的。Channel不能附加其他的ChannelPipeline或从ChannelPipeline分离。

ChannelHandler在ChannelPipeline中的I/O处理,一个I/O操作可以由一个ChannelInboundHandler或ChannelOutboundHandler进行处理,并通过调用ChannelInboundHandler处理入站IO或通过ChannelOutboundHandler处理出站IO。

ChannelPipeline是ChannelHandler的一个列表;如果一个入站I/O事件被触发,这个事件会从第一个开始依次通过ChannelPipeline中的ChannelHandler;若是一个入站I/O事件,则会从最后一个开始依次通过

ChannelPipeline中的ChannelHandler。ChannelHandler可以处理事件并检查类型,如果某个ChannelHandler不能处理则会跳过,并将事件传递到下一个ChannelHandler。ChannelPipeline可以动态添加、删除、替换其中的ChannelHandler,这样的机制可以提高灵活性。

修改ChannelPipeline的方法:

  • addFirst(...),添加ChannelHandler在ChannelPipeline的第一个位置
  • addBefore(...),在ChannelPipeline中指定的ChannelHandler名称之前添加ChannelHandler
  • addAfter(...),在ChannelPipeline中指定的ChannelHandler名称之后添加ChannelHandler
  • addLast(ChannelHandler...),在ChannelPipeline的末尾添加ChannelHandler
  • remove(...),删除ChannelPipeline中指定的ChannelHandler
  • replace(...),替换ChannelPipeline中指定的ChannelHandler
[java] view plaincopy
1. ChannelPipeline pipeline = ch.pipeline();  
2. FirstHandler firstHandler = new FirstHandler();  
3. pipeline.addLast("handler1", firstHandler);  
4. pipeline.addFirst("handler2", new SecondHandler());  
5. pipeline.addLast("handler3", new ThirdHandler());  
6. pipeline.remove("“handler3“");  
7. pipeline.remove(firstHandler);  
8. pipeline.replace("handler2", "handler4", new FourthHandler());

被添加到ChannelPipeline的ChannelHandler将通过IO-Thread处理事件,这意味了必须不能有其他的IO-Thread阻塞来影响IO的整体处理;有时候可能需要阻塞,例如JDBC。因此,Netty允许通过一个EventExecutorGroup到每一个ChannelPipeline.add*方法,自定义的事件会被包含在EventExecutorGroup中的EventExecutor来处理,默认的实现是DefaultEventExecutorGroup。

ChannelPipeline除了一些修改的方法,还有很多其他的方法,具体是方法及使用可以看API文档或源码。

1)ChannelHandlerContext

每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext并与之创建的ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler实现进行交互,这是相同ChannelPipeline的一部分。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的。

2)通知下一个ChannelHandler

在相同的ChannelPipeline中通过调用ChannelInboundHandler和ChannelOutboundHandler中各个方法中的一个方法来通知最近的handler,通知开始的地方取决你如何设置。下图显示了ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系:

如果你想有一些事件流全部通过ChannelPipeline,有两个不同的方法可以做到:

  • 调用Channel的方法
  • 调用ChannelPipeline的方法

这两个方法都可以让事件流全部通过ChannelPipeline。无论从头部还是尾部开始,因为它主要依赖于事件的性质。如果是一个“入站”事件,它开始于头部;若是一个“出站”事件,则开始于尾部。

下面的代码显示了一个写事件如何通过ChannelPipeline从尾部开始:

[java] view plaincopy
1. @Override  
2. protected void initChannel(SocketChannel ch) throws Exception {  
3.     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {  
4.         @Override  
5.         public void channelActive(ChannelHandlerContext ctx) throws Exception {  
6.             //Event via Channel  
7.             Channel channel = ctx.channel();  
8.             channel.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));  
9.             //Event via ChannelPipeline  
10.             ChannelPipeline pipeline = ctx.pipeline();  
11.             pipeline.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));  
12.         }  
13.     });  
14. }

通过Channel或ChannelPipeline的通知

可能你想从ChannelPipeline的指定位置开始,不想流经整个ChannelPipeline,如下情况:

为了节省开销,不感兴趣的ChannelHandler不让通过,排除一些ChannelHandler

在这种情况下,你可以使用ChannelHandlerContext的ChannelHandler通知起点。它使用ChannelHandlerContext执行下一个ChannelHandler。下面代码显示了直接使用ChannelHandlerContext操作:

[java] view plaincopy
1. // Get reference of ChannelHandlerContext  
2. ChannelHandlerContext ctx = ..;  
3. // Write buffer via ChannelHandlerContext  
4. ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

该消息流经ChannelPipeline到下一个ChannelHandler,在这种情况下使用ChannelHandlerContext开始下一个ChannelHandler。

从指定的ChannelHandlerContext开始,跳过前面所有的ChannelHandler,使用ChannelHandlerContext操作是常见的模式,最常用的是从ChannelHanlder调用操作,也可以在外部使用ChannelHandlerContext,因为这是线程安全的。

修改ChannelPipeline

调用ChannelHandlerContext的pipeline()方法能访问ChannelPipeline,能在运行时动态的增加、删除、替换ChannelPipeline中的ChannelHandler。可以保持ChannelHandlerContext供以后使用,如外部Handler方法触发一个事件,甚至从一个不同的线程。

下面代码显示了保存ChannelHandlerContext供之后使用或其他线程使用:

[java] view plaincopy
1. public class WriteHandler extends ChannelHandlerAdapter {  
2.     private ChannelHandlerContext ctx;  
3.   
4.     @Override  
5.     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  
6.         this.ctx = ctx;  
7.     }  
8.       
9.     public void send(String msg){  
10.         ctx.write(msg);  
11.     }  
12. }

请注意,ChannelHandler实例如果带有@Sharable注解则可以被添加到多个ChannelPipeline。也就是说单个ChannelHandler实例可以有多个ChannelHandlerContext,因此可以调用不同ChannelHandlerContext获取同一个ChannelHandler。如果添加不带@Sharable注解的ChannelHandler实例到多个ChannelPipeline则会抛出异常;使用@Sharable注解后的ChannelHandler必须在不同的线程和不同的通道上安全使用。怎么是不安全的使用?看下面代码:

[java] view plaincopy
1. @Sharable  
2. public class NotSharableHandler extends ChannelInboundHandlerAdapter {  
3.   
4.     private int count;  
5.   
6.     @Override  
7.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
8.         count++;  
9.         System.out.println("channelRead(...) called the " + count + " time“");  
10.         ctx.fireChannelRead(msg);  
11.     }  
12.       
13. }

上面是一个带@Sharable注解的Handler,它被多个线程使用时,里面count是不安全的,会导致count值错误。为什么要共享ChannelHandler?使用@Sharable注解共享一个ChannelHandler在一些需求中还是有很好的作用的,如使用一个ChannelHandler来统计连接数或来处理一些全局数据等等。

状态模型

Netty有一个简单但强大的状态模型,并完美映射到ChannelInboundHandler的各个方法。下面是Channel生命周期四个不同的状态:

  • channelUnregistered
  • channelRegistered
  • channelActive
  • channelInactive

Channel的状态在其生命周期中变化,因为状态变化需要触发,下图显示了Channel状态变化:

还可以看到额外的状态变化,因为用户允许从EventLoop中注销Channel暂停事件执行,然后再重新注册。在这种情况下,你会看到多个channelRegistered和channelUnregistered状态的变化,而永远只有一个channelActive和channelInactive的状态,因为一个通道在其生命周期内只能连接一次,之后就会被回收;重新连接,则是创建一个新的通道。

6)ChannelHandler和其子类

Netty中有3个实现了ChannelHandler接口的类,其中2个是接口,一个是抽象类。如下图:

ChannelHandler中的方法

etty定义了良好的类型层次结构来表示不同的处理程序类型,所有的类型的父类是ChannelHandler。ChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法。

  • handlerAdded,ChannelHandler添加到实际上下文中准备处理事件
  • handlerRemoved,将ChannelHandler从实际上下文中删除,不再处理事件
  • exceptionCaught,处理抛出的异常

上面三个方法都需要传递ChannelHandlerContext参数,每个ChannelHandler被添加到ChannelPipeline时会自动创建ChannelHandlerContext。ChannelHandlerContext允许在本地通道安全的存储和检索值。Netty还提供了一个实现了ChannelHandler的抽象类:ChannelHandlerAdapter。ChannelHandlerAdapter实现了父类的所有方法,基本上就是传递事件到ChannelPipeline中的下一个ChannelHandler直到结束。

ChannelInboundHandler

ChannelInboundHandler提供了一些方法再接收数据或Channel状态改变时被调用。

下面是ChannelInboundHandler的一些方法:

  • channelRegistered,ChannelHandlerContext的Channel被注册到EventLoop;
  • channelUnregistered,ChannelHandlerContext的Channel从EventLoop中注销
  • channelActive,ChannelHandlerContext的Channel已激活
  • channelInactive,ChannelHanderContxt的Channel结束生命周期
  • channelRead,从当前Channel的对端读取消息
  • channelReadComplete,消息读取完成后执行
  • userEventTriggered,一个用户事件被处罚
  • channelWritabilityChanged,改变通道的可写状态,可以使用Channel.isWritable()检查
  • exceptionCaught,重写父类ChannelHandler的方法,处理异常

Netty提供了一个实现了ChannelInboundHandler接口并继承ChannelHandlerAdapter的类:ChannelInboundHandlerAdapter。ChannelInboundHandlerAdapter实现了ChannelInboundHandler的所有方法,作用就是处理消息并将消息转发到ChannelPipeline中的下一个ChannelHandler。ChannelInboundHandlerAdapter的channelRead方法处理完消息后不会自动释放消息,若想自动释放收到的消息,可以使用SimpleChannelInboundHandler<I>。

看下面代码:

[java] view plaincopy
1. /** 
2.  * 实现ChannelInboundHandlerAdapter的Handler,不会自动释放接收的消息对象 
3.  * @author c.k                                                               
4.  *  
5.  */  
6. public class DiscardHandler extends ChannelInboundHandlerAdapter {  
7.     @Override  
8.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
9.         //手动释放消息  
10.         ReferenceCountUtil.release(msg);  
11.     }  
12. }  
[java] view plaincopy
1. /** 
2.  * 继承SimpleChannelInboundHandler,会自动释放消息对象 
3.  * @author c.k 
4.  * 
5.  */  
6. public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {  
7.     @Override  
8.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {  
9.         //不需要手动释放  
10.     }  
11. }

如果需要其他状态改变的通知,可以重写Handler的其他方法。通常自定义消息类型来解码字节,可以实现ChannelInboundHandler或ChannelInboundHandlerAdapter。有一个更好的解决方法,使用编解码器的框架可以很容的实现。使用ChannelInboundHandler、ChannelInboundHandlerAdapter、SimpleChannelInboundhandler这三个中的一个来处理接收消息,使用哪一个取决于需求;大多数时候使用SimpleChannelInboundHandler处理消息,使用ChannelInboundHandlerAdapter处理其他的“入站”事件或状态改变。

ChannelInitializer用来初始化ChannelHandler,将自定义的各种ChannelHandler添加到ChannelPipeline中。

ChannelOutboundHandler

ChannelOutboundHandler用来处理“出站”的数据消息。ChannelOutboundHandler提供了下面一些方法:

  • bind,Channel绑定本地地址
  • connect,Channel连接操作
  • disconnect,Channel断开连接
  • close,关闭Channel
  • deregister,注销Channel
  • read,读取消息,实际是截获ChannelHandlerContext.read()
  • write,写操作,实际是通过ChannelPipeline写消息,Channel.flush()属性到实际通道
  • flush,刷新消息到通道

ChannelOutboundHandler是ChannelHandler的子类,实现了ChannelHandler的所有方法。所有最重要的方法采取ChannelPromise,因此一旦请求停止从ChannelPipeline转发参数则必须得到通知。Netty提供了ChannelOutboundHandler的实现:ChannelOutboundHandlerAdapter。

ChannelOutboundHandlerAdapter实现了父类的所有方法,并且可以根据需要重写感兴趣的方法。所有这些方法的实现,在默认情况下,都是通过调用ChannelHandlerContext的方法将事件转发到ChannelPipeline中下一个ChannelHandler。

看下面的代码:

[java] view plaincopy
1. public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {  
2.     @Override  
3.     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  
4.         ReferenceCountUtil.release(msg);  
5.         promise.setSuccess();  
6.     }  
7. }

重要的是要记得释放致远并直通ChannelPromise,若ChannelPromise没有被通知可能会导致其中一个ChannelFutureListener不被通知去处理一个消息。

如果消息被消费并且没有被传递到ChannelPipeline中的下一个ChannelOutboundHandler,那么就需要调用ReferenceCountUtil.release(message)来释放消息资源。一旦消息被传递到实际的通道,它会自动写入消息或在通道关闭是释放。

3.4编码器

  • Codec,编解码器
  • Decoder,解码器
  • Encoder,编码器

Netty提供了编解码器框架,使得编写自定义的编解码器很容易,并且也很容易重用和封装。本章讨论Netty的编解码器框架以及使用。

1)编解码器Codec

编写一个网络应用程序需要实现某种编解码器,编解码器的作用就是讲原始字节数据与自定义的消息对象进行互转。网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码,因为编解码器由两部分组成:

  • Decoder(解码器)
  • Encoder(编码器)

解码器负责将消息从字节或其他序列形式转成指定的消息对象,编码器则相反;解码器负责处理“入站”数据,编码器负责处理“出站”数据。编码器和解码器的结构很简单,消息被编码后解码后会自动通过ReferenceCountUtil.release(message)释放,如果不想释放消息可以使用ReferenceCountUtil.retain(message),这将会使引用数量增加而没有消息发布,大多数时候不需要这么做。

2)解码器

Netty提供了丰富的解码器抽象基类,我们可以很容易的实现这些基类来自定义解码器。下面是解码器的一个类型:

  • 解码字节到消息
  • 解码消息到消息
  • 解码消息到字节

本章将概述不同的抽象基类,来帮助了解解码器的实现。深入了解Netty提供的解码器之前先了解解码器的作用是什么?解码器负责解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象ChannelInboundHandler的实现。实践中使用解码器很简单,就是将入站数据转换格式后传递到ChannelPipeline中的下一个ChannelInboundHandler进行处理;这样的处理时很灵活的,我们可以将解码器放在ChannelPipeline中,重用逻辑。

ByteToMessageDecoder

通常你需要将消息从字节解码成消息或者从字节解码成其他的序列化字节。这是一个常见的任务,Netty提供了抽象基类,我们可以使用它们来实现。Netty中提供的ByteToMessageDecoder可以将字节消息解码成POJO对象,下面列出了ByteToMessageDecoder两个主要方法:

  • decode(ChannelHandlerContext, ByteBuf, List<Object>),这个方法是唯一的一个需要自己实现的抽象方法,作用是将ByteBuf数据解码成其他形式的数据。
  • decodeLast(ChannelHandlerContext, ByteBuf, List<Object>),实际上调用的是decode(...)。

例如服务器从某个客户端接收到一个整数值的字节码,服务器将数据读入ByteBuf并经过ChannelPipeline中的每个ChannelInboundHandler进行处理。

从“入站”ByteBuf读取bytes后由ToIntegerDecoder进行解码,然后向解码后的消息传递到ChannelPipeline中的下一个ChannelInboundHandler。看下面ToIntegerDecoder的实现代码:

[java] view plaincopy
1. /** 
2.  * Integer解码器,ByteToMessageDecoder实现 
3.  * @author c.k 
4.  * 
5.  */  
6. public class ToIntegerDecoder extends ByteToMessageDecoder {  
7.   
8.     @Override  
9.     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {  
10.         if(in.readableBytes() >= 4){  
11.             out.add(in.readInt());  
12.         }  
13.     }  
14. }

从上面的代码可能会发现,我们需要检查ByteBuf读之前是否有足够的字节,若没有这个检查岂不更好?是的,Netty提供了这样的处理允许byte-to-message解码,在下一节讲解。除了ByteToMessageDecoder之外,Netty还提供了许多其他的解码接口。

ReplayingDecoder

ReplayingDecoder是byte-to-message解码的一种特殊的抽象基类,读取缓冲区的数据之前需要检查缓冲区是否有足够的字节,使用ReplayingDecoder就无需自己检查;若ByteBuf中有足够的字节,则会正常读取;若没有足够的字节则会停止解码。也正因为这样的包装使得ReplayingDecoder带有一定的局限性。

  • 不是所有的操作都被ByteBuf支持,如果调用一个不支持的操作会抛出DecoderException。
  • ByteBuf.readableBytes()大部分时间不会返回期望值

如果你能忍受上面列出的限制,相比ByteToMessageDecoder,你可能更喜欢ReplayingDecoder。在满足需求的情况下推荐使用ByteToMessageDecoder,因为它的处理比较简单,没有ReplayingDecoder实现的那么复杂。ReplayingDecoder继承与ByteToMessageDecoder,所以他们提供的接口是相同的。下面代码是ReplayingDecoder的实现:

[java] view plaincopy
1. /** 
2.  * Integer解码器,ReplayingDecoder实现 
3.  * @author c.k 
4.  * 
5.  */  
6. public class ToIntegerReplayingDecoder extends ReplayingDecoder<Void> {  
7.   
8.     @Override  
9.     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {  
10.         out.add(in.readInt());  
11.     }  
12. }

当从接收的数据ByteBuf读取integer,若没有足够的字节可读,decode(...)会停止解码,若有足够的字节可读,则会读取数据添加到List列表中。使用ReplayingDecoder或ByteToMessageDecoder是个人喜好的问题,Netty提供了这两种实现,选择哪一个都可以。

上面讲了byte-to-message的解码实现方式,那message-to-message该如何实现呢?Netty提供了MessageToMessageDecoder抽象类。

MessageToMessageDecoder

将消息对象转成消息对象可是使用MessageToMessageDecoder,它是一个抽象类,需要我们自己实现其decode(...)。message-to-message同上面讲的byte-to-message的处理机制一样,看下图:

看下面的实现代码:

[java] view plaincopy
1. /** 
2.  * 将接收的Integer消息转成String类型,MessageToMessageDecoder实现 
3.  * @author c.k 
4.  * 
5.  */  
6. public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {  
7.   
8.     @Override  
9.     protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {  
10.         out.add(String.valueOf(msg));  
11.     }  
12. }

3)解码器总结

解码器是用来处理入站数据,Netty提供了很多解码器的实现,可以根据需求详细了解。那我们发送数据需要将数据编码,Netty中也提供了编码器的支持。下一节将讲解如何实现编码器。

编码器

Netty提供了一些基类,我们可以很简单的编码器。同样的,编码器有下面两种类型:

  • 消息对象编码成消息对象
  • 消息对象编码成字节码

相对解码器,编码器少了一个byte-to-byte的类型,因为出站数据这样做没有意义。编码器的作用就是将处理好的数据转成字节码以便在网络中传输。对照上面列出的两种编码器类型,Netty也分别提供了两个抽象类:MessageToByteEncoder和MessageToMessageEncoder。

MessageToByteEncoder

MessageToByteEncoder是抽象类,我们自定义一个继承MessageToByteEncoder的编码器只需要实现其提供的encode(...)方法。其工作流程如下图:

实现代码如下:

[java] view plaincopy
1. /** 
2.  * 编码器,将Integer值编码成byte[],MessageToByteEncoder实现 
3.  * @author c.k 
4.  * 
5.  */  
6. public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> {  
7.     @Override  
8.     protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {  
9.         out.writeInt(msg);  
10.     }  
11. }

MessageToMessageEncoder

需要将消息编码成其他的消息时可以使用Netty提供的MessageToMessageEncoder抽象类来实现。例如将Integer编码成String。

代码实现如下:

[java] view plaincopy
1. /** 
2.  * 编码器,将Integer编码成String,MessageToMessageEncoder实现 
3.  * @author c.k                                                     
4.  * 
5.  */  
6. public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {  
7.   
8.     @Override  
9.     protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {  
10.         out.add(String.valueOf(msg));  
11.     }  
12. }

4)编解码器

实际编码中,一般会将编码和解码操作封装太一个类中,解码处理“入站”数据,编码处理“出站”数据。知道了编码和解码器,对于下面的情况不会感觉惊讶:

  • byte-to-message编码和解码
  • message-to-message编码和解码

如果确定需要在ChannelPipeline中使用编码器和解码器,需要更好的使用一个抽象的编解码器。同样,使用编解码器的时候,不可能只删除解码器或编码器而离开ChannelPipeline导致某种不一致的状态。使用编解码器将强制性的要么都在ChannelPipeline,要么都不在ChannelPipeline。

考虑到这一点,我们在下面几节将更深入的分析Netty提供的编解码抽象类。

Netty4较之前的版本,其结构有很大的变化,在Netty4中实现byte-to-byte提供了2个类:ByteArrayEncoder和ByteArrayDecoder。这两个类用来处理字节到字节的编码和解码。下面是这两个类的源码,一看就知道是如何处理的:

[java] view plaincopy
1. public class ByteArrayDecoder extends MessageToMessageDecoder<ByteBuf> {  
2.     @Override  
3.     protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {  
4.          // copy the ByteBuf content to a byte array  
5.         byte[] array = new byte[msg.readableBytes()];  
6.         msg.getBytes(0, array);  
7.   
8.         out.add(array);  
9.     }  
10. }  
[java] view plaincopy
1. @Sharable  
2. public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]> {  
3.     @Override  
4.     protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception {  
5.         out.add(Unpooled.wrappedBuffer(msg));  
6.     }  
7. }

ByteToMessageCodec

ByteToMessageCodec用来处理byte-to-message和message-to-byte。如果想要解码字节消息成POJO或编码POJO消息成字节,对于这种情况,ByteToMessageCodec<I>是一个不错的选择。ByteToMessageCodec是一种组合,其等同于ByteToMessageDecoder和MessageToByteEncoder的组合。MessageToByteEncoder是个抽象类,其中有2个方法需要我们自己实现:

  • encode(ChannelHandlerContext, I, ByteBuf),编码
  • decode(ChannelHandlerContext, ByteBuf, List<Object>),解码

MessageToMessageCodec

MessageToMessageCodec用于message-to-message的编码和解码,可以看成是MessageToMessageDecoder和MessageToMessageEncoder的组合体。MessageToMessageCodec是抽象类,其中有2个方法需要我们自己实现:

  • encode(ChannelHandlerContext, OUTBOUND_IN, List<Object>)
  • decode(ChannelHandlerContext, INBOUND_IN, List<Object>)

但是,这种编解码器能有用吗?

有许多用例,最常见的就是需要将消息从一个API转到另一个API。这种情况下需要自定义API或旧的API使用另一种消息类型。下面的代码显示了在WebSocket框架APIs之间转换消息:

[java] view plaincopy
1. package netty.in.action;  
2.   
3. import java.util.List;  
4.   
5. import io.netty.buffer.ByteBuf;  
6. import io.netty.channel.ChannelHandlerContext;  
7. import io.netty.channel.ChannelHandler.Sharable;  
8. import io.netty.handler.codec.MessageToMessageCodec;  
9. import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;  
10. import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;  
11. import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;  
12. import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;  
13. import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;  
14. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;  
15. import io.netty.handler.codec.http.websocketx.WebSocketFrame;  
16.   
17. @Sharable  
18. public class WebSocketConvertHandler extends  
19.         MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {  
20.   
21.     public static final WebSocketConvertHandler INSTANCE = new WebSocketConvertHandler();  
22.   
23.     @Override  
24.     protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception {  
25.         switch (msg.getType()) {  
26.         case BINARY:  
27.             out.add(new BinaryWebSocketFrame(msg.getData()));  
28.             break;  
29.         case CLOSE:  
30.             out.add(new CloseWebSocketFrame(true, 0, msg.getData()));  
31.             break;  
32.         case PING:  
33.             out.add(new PingWebSocketFrame(msg.getData()));  
34.             break;  
35.         case PONG:  
36.             out.add(new PongWebSocketFrame(msg.getData()));  
37.             break;  
38.         case TEXT:  
39.             out.add(new TextWebSocketFrame(msg.getData()));  
40.             break;  
41.         case CONTINUATION:  
42.             out.add(new ContinuationWebSocketFrame(msg.getData()));  
43.             break;  
44.         default:  
45.             throw new IllegalStateException("Unsupported websocket msg " + msg);  
46.         }  
47.     }  
48.   
49.     @Override  
50.     protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {  
51.         if (msg instanceof BinaryWebSocketFrame) {  
52.             out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, msg.content().copy()));  
53.             return;  
54.         }  
55.         if (msg instanceof CloseWebSocketFrame) {  
56.             out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, msg.content().copy()));  
57.             return;  
58.         }  
59.         if (msg instanceof PingWebSocketFrame) {  
60.             out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, msg.content().copy()));  
61.             return;  
62.         }  
63.         if (msg instanceof PongWebSocketFrame) {  
64.             out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, msg.content().copy()));  
65.             return;  
66.         }  
67.         if (msg instanceof TextWebSocketFrame) {  
68.             out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, msg.content().copy()));  
69.             return;  
70.         }  
71.         if (msg instanceof ContinuationWebSocketFrame) {  
72.             out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, msg.content().copy()));  
73.             return;  
74.         }  
75.         throw new IllegalStateException("Unsupported websocket msg " + msg);  
76.     }  
77.   
78.     public static final class MyWebSocketFrame {  
79.         public enum FrameType {  
80.             BINARY, CLOSE, PING, PONG, TEXT, CONTINUATION  
81.         }  
82.   
83.         private final FrameType type;  
84.         private final ByteBuf data;  
85.   
86.         public MyWebSocketFrame(FrameType type, ByteBuf data) {  
87.             this.type = type;  
88.             this.data = data;  
89.         }  
90.   
91.         public FrameType getType() {  
92.             return type;  
93.         }  
94.   
95.         public ByteBuf getData() {  
96.             return data;  
97.         }  
98.
99.     }  
100. }


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
Sqoop 企业级大数据迁移方案实战
Sqoop是一个用于在Hadoop和关系数据库服务器之间传输数据的工具。它用于从关系数据库(如MySQL,Oracle)导入数据到Hadoop HDFS,并从Hadoop文件系统导出到关系数据库。 本课程主要讲解了Sqoop的设计思想及原理、部署安装及配置、详细具体的使用方法技巧与实操案例、企业级任务管理等。结合日常工作实践,培养解决实际问题的能力。本课程由黑马程序员提供。
相关文章
|
1月前
|
安全 网络安全 数据安全/隐私保护
访问控制列表(ACL)是网络安全中的一种重要机制,用于定义和管理对网络资源的访问权限
访问控制列表(ACL)是网络安全中的一种重要机制,用于定义和管理对网络资源的访问权限。它通过设置一系列规则,控制谁可以访问特定资源、在什么条件下访问以及可以执行哪些操作。ACL 可以应用于路由器、防火墙等设备,分为标准、扩展、基于时间和基于用户等多种类型,广泛用于企业网络和互联网中,以增强安全性和精细管理。
172 7
|
3月前
|
开发者 Python
Python Socket编程:不只是基础,更有进阶秘籍,让你的网络应用飞起来!
在数字时代,网络应用成为连接世界的桥梁。Python凭借简洁的语法和丰富的库支持,成为开发高效网络应用的首选。本文通过实时聊天室案例,介绍Python Socket编程的基础与进阶技巧。基础篇涵盖服务器和客户端的建立与数据交换;进阶篇则探讨多线程与异步IO优化方案,助力提升应用性能。通过本案例,你将掌握Socket编程的核心技能,推动网络应用飞得更高、更远。
70 1
|
29天前
|
存储 网络协议 物联网
C 语言物联网开发之网络通信与数据传输难题
本文探讨了C语言在物联网开发中遇到的网络通信与数据传输挑战,分析了常见问题并提出了优化策略,旨在提高数据传输效率和系统稳定性。
|
2月前
|
网络协议 Java 应用服务中间件
深入浅出Tomcat网络通信的高并发处理机制
【10月更文挑战第3天】本文详细解析了Tomcat在处理高并发网络请求时的机制,重点关注了其三种不同的IO模型:NioEndPoint、Nio2EndPoint 和 AprEndPoint。NioEndPoint 采用多路复用模型,通过 Acceptor 接收连接、Poller 监听事件及 Executor 处理请求;Nio2EndPoint 则使用 AIO 异步模型,通过回调函数处理连接和数据就绪事件;AprEndPoint 通过 JNI 调用本地库实现高性能,但已在 Tomcat 10 中弃用
深入浅出Tomcat网络通信的高并发处理机制
|
1月前
|
Kubernetes 网络协议 Python
Python网络编程:从Socket到Web应用
在信息时代,网络编程是软件开发的重要组成部分。Python作为多用途编程语言,提供了从Socket编程到Web应用开发的强大支持。本文将从基础的Socket编程入手,逐步深入到复杂的Web应用开发,涵盖Flask、Django等框架的应用,以及异步Web编程和微服务架构。通过本文,读者将全面了解Python在网络编程领域的应用。
36 1
|
2月前
|
Java
[Java]Socket套接字(网络编程入门)
本文介绍了基于Java Socket实现的一对一和多对多聊天模式。一对一模式通过Server和Client类实现简单的消息收发;多对多模式则通过Server类维护客户端集合,并使用多线程实现实时消息广播。文章旨在帮助读者理解Socket的基本原理和应用。
33 1
|
2月前
|
消息中间件 监控 网络协议
Python中的Socket魔法:如何利用socket模块构建强大的网络通信
本文介绍了Python的`socket`模块,讲解了其基本概念、语法和使用方法。通过简单的TCP服务器和客户端示例,展示了如何创建、绑定、监听、接受连接及发送/接收数据。进一步探讨了多用户聊天室的实现,并介绍了非阻塞IO和多路复用技术以提高并发处理能力。最后,讨论了`socket`模块在现代网络编程中的应用及其与其他通信方式的关系。
250 3
|
2月前
|
网络协议 Linux 应用服务中间件
Socket通信之网络协议基本原理
【10月更文挑战第10天】网络协议定义了机器间通信的标准格式,确保信息准确无损地传输。主要分为两种模型:OSI七层模型与TCP/IP模型。
|
3月前
|
网络协议 Python
网络世界的建筑师:Python Socket编程基础与进阶,构建你的网络帝国!
在数字宇宙中,网络如同复杂脉络连接每个角落,Python Socket编程则是开启这一世界的钥匙。本文将引导你从基础概念入手,逐步掌握Socket编程,并通过实战示例构建TCP/UDP服务器与客户端。你将学会使用Python的socket模块进行网络通信,了解TCP与UDP的区别,并运用多线程与异步IO提升服务器性能。跟随本文指引,成为网络世界的建筑师,构建自己的网络帝国。
40 2
|
3月前
|
网络协议 Python
告别网络编程迷雾!Python Socket编程基础与实战,让你秒变网络达人!
在网络编程的世界里,Socket编程是连接数据与服务的关键桥梁。对于初学者,这往往是最棘手的部分。本文将用Python带你轻松入门Socket编程,从创建TCP服务器与客户端的基础搭建,到处理并发连接的实战技巧,逐步揭开网络编程的神秘面纱。通过具体的代码示例,我们将掌握Socket的基本概念与操作,让你成为网络编程的高手。无论是简单的数据传输还是复杂的并发处理,Python都能助你一臂之力。希望这篇文章成为你网络编程旅程的良好开端。
65 3