【Netty】Netty传输

简介: 在简单学习了Netty中的组件后,接着学习Netty中数据的传输细节。

一、前言


  在简单学习了Netty中的组件后,接着学习Netty中数据的传输细节。


二、传输  


  2.1 传输示例

  Netty中的数据传输都是使用的字节类型,下面通过一个实例进行说明,该实例中服务器接受请求,然后向客户端发送一个Hi,最后关闭连接。下面是不同方式的实现。

  1. OIO方式

  OIO与NIO对应,使用阻塞式的IO处理,其服务端代码如下 

package com.hust.grid.leesf.chapter4;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
public class PlainOioServer {
    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);
        try {
            for (;;) {
                final Socket clientSocket = socket.accept();
                System.out.println("Accepted connection from: " + clientSocket);
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        OutputStream out;
                        try {
                            out = clientSocket.getOutputStream();
                            out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));
                            out.flush();
                            clientSocket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                            try {
                                clientSocket.close();
                            } catch (IOException ex) {
                                // ignore on close
                            }
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

说明:其中,在for循环中会不断的去监听是否有新请求到达,当有请求到达后,初始化一个新的线程去处理,完成向客户端发送Hi字符串,最后关闭连接。使用OIO方式的性能较差,扩展性也不好,需要使用异步方式处理。

  2. NIO方式

package com.hust.grid.leesf.chapter4;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class PlainNioServer {
    public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        for (;;) {
            try {
                selector.select();
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle exception
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();    //5
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) {                //6
                        ServerSocketChannel server =
                                (ServerSocketChannel)key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_WRITE |
                                SelectionKey.OP_READ, msg.duplicate());    //7
                        System.out.println(
                                "Accepted connection from " + client);
                    }
                    if (key.isWritable()) {                //8
                        SocketChannel client =
                                (SocketChannel)key.channel();
                        ByteBuffer buffer =
                                (ByteBuffer)key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) {        //9
                                break;
                            }
                        }
                        client.close();                    //10
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        // ignore on close
                    }
                }
            }
        }
    }
}

 说明:NIO方法使用了Selector和Channel等组件,使用Selector来处理多个Channel,其代码与OIO代码几乎是完全不相同。

  3. Netty的OIO方式

  当使用Netty框架处理时,并且采用OIO的方式,其代码如下 

package com.hust.grid.leesf.chapter4;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
public class NettyOioServer {
    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
             .channel(OioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) 
                     throws Exception {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

可以看到上述代码很多都是和之前示例中使用的代码相同,再来看看在Netty框架下使用NIO时的处理方式。

  4. Netty的NIO方式 

package com.hust.grid.leesf.chapter4;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
public class NettyNioServer {
    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
             .channel(NioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) 
                     throws Exception {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

 可以看到Netty框架下的NIO和OIO的代码基本相同。

  2.2 传输细节

  传输的核心是Channel接口,其继承结构图如下所示

  5.png

  其中,Channel继承了AttributeMap和Comparable接口,其包含ChannelPipeline和ChannelConfig两个实例,ChannelConfig包含了所有的配置信息,ChannelPipeline包含了所有的ChannelHandler实例,其中用于存放用户的处理逻辑。典型的ChannelHandler用法如下

    · 将数据类型进行转化

    · 提供异常通知

    · 提供Channel变为活动或非活动的通知

    · 提供当Channel在EventLoopGroup中注册或者注销时的通知

    · 提供用户定义事件的通知

  Netty中的Channel实现是线程安全的,所以在多线程环境中可安全使用。

  2.3 传输方案

  Netty提供了多种传输方案,你可根据应用的不同选择合适的传输方案。

  1. NIO-非阻塞型I/O

  NIO提供了完全异步的IO实现,其使用基于选择器的API,选择器的核心概念是将其作为注册表,当通道的状态变化时会接受到通知,可能有的变化状态如下

    · 一个新通道被接受并已准备好

    · 一个通道的连接已经完成

    · 一个通道已经有准备好读取的数据

    · 一个通道可以写入数据

  当应用程序对状态的变化做出反应后,会重置选择器并重复处理,根据不同的状态给出不同的响应,选择操作有如下四种类型OP_ACCEPT 、OP_CONNECT、OP_READ 、OP_WRITE,其中选择器的处理流程如下图所示

  6.png

  2. Epoll-Linux的本机非阻塞传输

  Netty可在任何系统上运行,但对于不同的系统会有不同的折中,Linux系统中的epoll具有高可扩展的I/O的事件通知,Linux上的JDK的NIO则是基于epoll,当使用epoll取代NIO时,可以使用Netty中的EpollEventLoopGroup取代NioEventLoopGroup,使用EpollServerSocketChannel.class取代NioServerSocketChannel.class。

  3. OIO-阻塞型I/O

  Netty的OIO是一种妥协方案,其使用JAVA中原生态的旧的API,其是同步阻塞的。在java.net的API中,通常使用一个线程接受来自指定端口的请求,当创建一个套接字时,就会创建一个新的线程来进行处理,其处理流程图如下图所示

  7.png

  4. 在JVM内进行通信的本地传输

  Netty提供了在同一个JVM中的客户端与服务端之间的异步通信,在此传输中,与服务器通道相关联的SocketAddress不绑定到物理网络地址,相反,当服务器运行时,其存储在注册表中,关闭时取消注册。因为传输不能接受真正的网络流量,所以它不能与其他传输实现互操作。

  5. 嵌入式传输

  Netty还提供了一个额外的传输,其可以将ChannelHandler作为帮助类嵌入其他的ChannelHandler中,以这种方式,你无需修改其内部代码便能扩展ChannelHandler的功能。

  2.4 传输示例

  并非所有的传输都支持所有的传输协议,下表是传输方式与传输协议之间的关系

  8.png

  而对于不同的应用,可根据下表选择合适的传输方式

  9.png


三、总结


  本篇博文讲解了Netty中的传输细节,了解了多种不同的传输方式,以及其和不同传输协议之间的关系,也谢谢各位园友的观看~

 

目录
相关文章
|
3月前
|
Java
Netty传输object并解决粘包拆包问题
Netty传输object并解决粘包拆包问题
30 0
|
存储 Java Linux
Netty实战(四)Netty的传输
流经网络的数据总是具有相同的类型:字节。这些字节是如何流动的主要取决于我们所说的网络传输。
166 0
|
网络协议 前端开发 Linux
快来体验快速通道,netty中epoll传输协议详解
在前面的章节中,我们讲解了kqueue的使用和原理,接下来我们再看一下epoll的使用。两者都是更加高级的IO方式,都需要借助native的方法实现,不同的是Kqueue用在mac系统中,而epoll用在liunx系统中。
|
编解码
Netty(二)之整合Marshalling传输实体类
Netty(二)之整合Marshalling传输实体类
73 0
|
编解码 安全 前端开发
在netty中使用protobuf并实现数据加密传输
在netty中使用protobuf并实现数据加密传输
497 0
在netty中使用protobuf并实现数据加密传输
|
网络协议
netty系列之:kequeue传输协议详解
在前面的章节中,我们介绍了在netty中可以使用kequeue或者epoll来实现更为高效的native传输方式。那么kequeue和epoll和NIO传输协议有什么不同呢? 本章将会以kequeue为例进行深入探讨。
|
前端开发 Ubuntu Java
netty系列之:在netty中使用native传输协议
对于IO来说,除了传统的block IO,使用最多的就是NIO了,通常我们在netty程序中最常用到的就是NIO,比如NioEventLoopGroup,NioServerSocketChannel等。
|
设计模式 JSON Java
RPC框架(3 - 实现Netty传输和通用序列化接口)
RPC框架(3 - 实现Netty传输和通用序列化接口)
|
安全 网络安全 数据安全/隐私保护
netty案例,netty4.1中级拓展篇十三《Netty基于SSL实现信息传输过程中双向加密验证》
SSL(Secure Sockets Layer 安全套接层),及其继任者传输层安全(Transport Layer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS与SSL在传输层对网络连接进行加密。
431 0
netty案例,netty4.1中级拓展篇十三《Netty基于SSL实现信息传输过程中双向加密验证》
|
Java 数据处理 API
netty案例,netty4.1中级拓展篇十一《Netty基于ChunkedStream数据流切块传输》
在Netty这种异步NIO框架的结构下,服务端与客户端通信过程中,高效、频繁、大量的写入大块数据时,因网络传输饱和的可能性就会造成数据处理拥堵、GC频繁、用户掉线的可能性。那么由于写操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。所以在写大块数据时,需要对大块数据进行切割发送处理。
204 0
netty案例,netty4.1中级拓展篇十一《Netty基于ChunkedStream数据流切块传输》