Netty服务端和客户端开发实例—官方原版

简介: Netty服务端和客户端开发实例—官方原版

一、Netty服务端开发

在开始使用 Netty 开发 TimeServer 之前,先回顾一下使用 NIO 进行服务端开发的步骤。

(1)创建ServerSocketChannel,配置它为非阻塞模式;

(2)绑定监听,配置TCP 参数,例如 backlog 大小;

(3)创建一个独立的I/O线程,用于轮询多路复用器 Selector;

(4)创建 Selector,将之前创建的 ServerSocketChannel 注册到 Selector 上,监听SelectionKey.ACCEPT;

(5)启动I/0线程,在循环体中执行 Selectorselect0)方法,轮询就绪的 Channel;

(6)当轮询到了处于就绪状态的 Channel 时,需要对其进行判断,如果是OP ACCEPT状态,说明是新的客户端接入,则调用 ServerSocketChannel.accept()方法接受新的客户端:(7)设置新接入的客户端链路 SocketChannel 为非阻塞模式,配置其他的一些TCP 参数(8)将SocketChannel注册到 Selector,监听 OP READ 操作位;

(9)如果轮询的Channel为OP READ,则说明 SocketChannel 中有新的就绪的数据包需要读取,则构造ByteBuffer 对象,读取数据包;

(10)如果轮询的Channel为OP WRITE,说明还有数据没有发送完成,需要继续发送。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
public class nettyServer {
    Logger logger = LoggerFactory.getLogger(nettyServer.class);
    @Value("${netty.port}")
    int port;
    @PostConstruct
    public void bind() {
        EventLoopGroup bossrGroup = new NioEventLoopGroup();//接收客户端传过来的请求
        EventLoopGroup wokerGroup = new NioEventLoopGroup();//接收到请求后将后续操作
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossrGroup, wokerGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //.childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new serverHandlerAdapter());
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new HeartbeatHandler());
                            ch.pipeline().addLast(new IdleStateHandler(10, 1, 1));
                        }
                    });
            ChannelFuture f = b.bind(port).sync();
        } catch (Exception e) {
        }
    }

image.gif

    1. NioEventLoopGroup是一个处理I/O操作的多线程事件循环。Netty为不同类型的传输提供了各种EventLoopGroup实现。在本例中,我们实现了一个服务器端应用程序,因此将使用两个NioEventLoopGroup。第一个通常被称为“boss”,接受传入连接。第二个通常称为“worker”,在boss接受连接并将接受的连接注册给worker后,它处理接受的连接的流量。使用多少线程以及如何将它们映射到创建的通道取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。
      1. ServerBootstrap是一个设置服务器的助手类。您可以直接使用频道设置服务器。然而,请注意,这是一个乏味的过程,在大多数情况下,您不需要这样做。
        1. 这里,我们指定使用NioServerSocketChannel类,该类用于实例化一个新的Channel以接受传入连接。
          1. 此处指定的处理程序将始终由新接受的Channel评估。ChannelInitializer是一个特殊的处理程序,用于帮助用户配置新的Channel。您很可能希望通过添加一些处理程序(如DiscardServerHandler)来配置新频道的ChannelPipeline,以实现网络应用程序。随着应用程序变得复杂,您很可能会向管道中添加更多的处理程序,并最终将这个匿名类提取到顶级类中。
          import io.netty.channel.ChannelHandlerContext;
          import io.netty.channel.ChannelInboundHandlerAdapter;
          public class serverHandlerAdapter extends ChannelInboundHandlerAdapter {
              @Override
              public void channelActive(ChannelHandlerContext ctx) throws Exception {
                  super.channelActive(ctx);
              }
              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
             ByteBuf in = (ByteBuf) msg;
              try {
                  while (in.isReadable()) { // (1)
                      System.out.print((char) in.readByte());
                      System.out.flush();
                  }
              } finally {
                  ReferenceCountUtil.release(msg); // (2)
              }
              }
              @Override
              public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                  super.channelReadComplete(ctx);
              }
              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                  super.exceptionCaught(ctx, cause);
              }

          image.gif

            1. serverHandlerAdapter扩展 ChannelHandlerAdapter,它是 ChannelHandler 的实现。通道处理程序提供了可以重写的各种事件处理程序方法。目前,扩展 ChannelHandlerAdapter 就足够了,而不是自己实现处理程序接口。
              1. 我们在此处重写channelRead()事件处理程序方法。每当从客户端接收到新数据时,都会使用收到的消息调用此方法。在此示例中,收到的消息的类型为 ByteBuf。
                1. 若要实现协议,处理程序必须忽略收到的消息。ByteBuf 是一个引用计数的对象,必须通过该方法显式释放。请记住,处理程序负责释放传递给处理程序的任何引用计数对象。通常,处理程序方法的实现方式如下:DISCARDrelease()channelRead()

                二、Netty客户端开发

                import io.netty.bootstrap.Bootstrap;
                import io.netty.channel.ChannelHandler;
                import io.netty.channel.ChannelInitializer;
                import io.netty.channel.EventLoopGroup;
                import io.netty.channel.nio.NioEventLoopGroup;
                import io.netty.channel.socket.SocketChannel;
                import io.netty.channel.socket.nio.NioSocketChannel;
                public class nettyClient {
                    clientHandlerAdapter clientHandlerAdapter = new clientHandlerAdapter();
                    public void conect(String ip, int port) {
                        EventLoopGroup group = new NioEventLoopGroup();
                        try {
                            Bootstrap b = new Bootstrap();
                            b.group(group)
                                    .channel(NioSocketChannel.class)
                                    .remoteAddress("", port)
                                    .handler(new ChannelInitializer<SocketChannel>() {
                                        @Override
                                        protected void initChannel(SocketChannel ch) throws Exception {
                                            ch.pipeline().addLast(clientHandlerAdapter);
                                        }
                                    });
                            b.bind(ip, port).sync();
                        } catch (Exception e) {
                        }
                    }
                    public boolean sendFile() {
                        return clientHandlerAdapter.sendFile();
                    }
                }

                image.gif

                我们从 connect 方法讲起,在第 13 行首先创建客户端处理 I/0 读写的 NioEventLoopGroup 线程组,然后继续创建客户端辅助启动类 Bootstrap,随后需要对其进行配置。与服务端不同的是,它的 Channel 需要设置为 NioSocketChannel,然后为其添加 handler,此处为了简单直接创建匿名内部类,实现 initChannel 方法,其作用是当创建 NioSocketChannel成功之后,在初始化它的时候将它的 ChannelHandler 设置到 ChannelPipeline 中,用于处理网络I/O事件。

                客户端启动辅助类设置完成之后,调用 connect 方法发起异步连接,然后调用同步方法等待连接成功。

                最后,当客户端连接关闭之后,客户端主函数退出,在退出之前,释放 NIO 线程组的资源。

                下面我们继续看下TimeClientHandler 的代码如何实现

                import com.entity.Message;
                import com.google.gson.Gson;
                import com.google.gson.JsonObject;
                import io.netty.buffer.ByteBuf;
                import io.netty.channel.ChannelHandlerContext;
                import io.netty.channel.ChannelInboundHandlerAdapter;
                import org.springframework.beans.factory.annotation.Autowired;
                import java.nio.charset.Charset;
                public class clientHandlerAdapter extends ChannelInboundHandlerAdapter {
                    @Autowired
                    Gson gson;
                    ChannelHandlerContext ctx;
                    public clientHandlerAdapter() {
                        super();
                    }
                    @Override
                    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                        super.channelRegistered(ctx);
                    }
                    @Override
                    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                        super.channelUnregistered(ctx);
                    }
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        super.channelActive(ctx);
                        this.ctx = ctx;
                    }
                    @Override
                    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                        super.channelInactive(ctx);
                    }
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        String jsonObject = gson.toJson(msg);
                        Message message = gson.fromJson(jsonObject, Message.class);
                        String type = message.getMsgType();
                        if (type != null) {
                            ByteBuf out = getByteBuf(ctx);
                            ctx.channel().writeAndFlush(out);
                        }
                        super.channelRead(ctx, msg);
                    }
                    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
                        byte[] bytes = "我是发送给客户端的数据:请重启冰箱!".getBytes(Charset.forName("utf-8"));
                        ByteBuf buffer = ctx.alloc().buffer();
                        buffer.writeBytes(bytes);
                        return buffer;
                    }
                    @Override
                    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                        super.channelReadComplete(ctx);
                    }
                    @Override
                    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                        super.userEventTriggered(ctx, evt);
                    }
                    @Override
                    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
                        super.channelWritabilityChanged(ctx);
                    }
                    @Override
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                        super.exceptionCaught(ctx, cause);
                    }
                    public boolean sendFile() {
                        byte[] bytes=new byte[2048];
                        ctx.writeAndFlush(bytes);
                        return true;
                    }
                }

                image.gif

                三、处理基于流的传输

                在基于流的传输(如 TCP/IP)中,接收的数据存储在套接字接收缓冲区中。遗憾的是,基于流的传输的缓冲区不是数据包队列,而是字节队列。这意味着,即使您将两条消息作为两个独立的数据包发送,操作系统也不会将它们视为两条消息,而只是一堆字节。因此,无法保证您阅读的内容正是您的远程对等方所写的内容

                1、示例一:

                现在让我们回到TIME客户端示例。我们这里也有同样的问题。32位整数是非常少量的数据,不太可能经常被分割。然而,问题是它可能是碎片化的,并且随着流量的增加,碎片化的可能性会增加。

                简单的解决方案是创建一个内部累积缓冲区,并等待所有4个字节被接收到内部缓冲区。以下是修正了问题的TimeClientHandler实现:

                package io.netty.example.time;
                import java.util.Date;
                public class TimeClientHandler extends ChannelHandlerAdapter {
                    private ByteBuf buf;
                    @Override
                    public void handlerAdded(ChannelHandlerContext ctx) {
                        buf = ctx.alloc().buffer(4); // (1)
                    }
                    @Override
                    public void handlerRemoved(ChannelHandlerContext ctx) {
                        buf.release(); // (1)
                        buf = null;
                    }
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        ByteBuf m = (ByteBuf) msg;
                        buf.writeBytes(m); // (2)
                        m.release();
                        if (buf.readableBytes() >= 4) { // (3)
                            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
                            System.out.println(new Date(currentTimeMillis));
                            ctx.close();
                        }
                    }
                    @Override
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                        cause.printStackTrace();
                        ctx.close();
                    }
                }

                image.gif

                  1. ChannelHandler有两个生命周期侦听器方法:handlerAdded()和handlerRemoved()。您可以执行任意(取消)初始化任务,只要它不会长时间阻塞。
                    1. 首先,所有接收到的数据都应该累积到buf中。
                      1. 然后,处理程序必须检查buf是否有足够的数据(在本例中为4个字节),然后继续执行实际的业务逻辑。否则,当更多数据到达时,Netty将再次调用channelRead()方法,最终将累积所有4个字节。
                      目录
                      相关文章
                      |
                      3月前
                      |
                      Java Maven
                      【Netty 网络通信】启动通信服务端
                      【1月更文挑战第9天】【Netty 网络通信】启动通信服务端
                      |
                      2月前
                      |
                      Java Unix Linux
                      【Netty技术专题】「原理分析系列」Netty强大特性之Native transports扩展开发实战
                      当涉及到网络通信和高性能的Java应用程序时,Netty是一个强大的框架。它提供了许多功能和组件,其中之一是JNI传输。JNI传输是Netty的一个特性,它为特定平台提供了高效的网络传输。 在本文中,我们将深入探讨Netty提供的特定平台的JNI传输功能,分析其优势和适用场景。我们将介绍每个特定平台的JNI传输,并讨论其性能、可靠性和可扩展性。通过了解这些特定平台的JNI传输,您将能够更好地选择和配置适合您应用程序需求的网络传输方式,以实现最佳的性能和可靠性。
                      57 7
                      【Netty技术专题】「原理分析系列」Netty强大特性之Native transports扩展开发实战
                      |
                      16天前
                      |
                      网络协议 Java 物联网
                      Spring Boot与Netty打造TCP服务端(解决粘包问题)
                      Spring Boot与Netty打造TCP服务端(解决粘包问题)
                      26 1
                      |
                      30天前
                      |
                      NoSQL Redis
                      Netty实战:模拟Redis的客户端
                      Netty实战:模拟Redis的客户端
                      14 0
                      |
                      2月前
                      |
                      安全 Java Go
                      springboot+netty化身Udp服务端,go化身客户端模拟设备实现指令联动
                      springboot+netty化身Udp服务端,go化身客户端模拟设备实现指令联动
                      70 0
                      |
                      3月前
                      |
                      测试技术
                      Netty4 websocket 开启服务端并设置IP和端口号
                      Netty4 websocket 开启服务端并设置IP和端口号
                      66 0
                      |
                      7月前
                      |
                      弹性计算 Java Unix
                      搭稳Netty开发的地基,用漫画帮你分清同步异步阻塞非阻塞
                      Netty Netty是一款非常优秀的网络编程框架,是对NIO的二次封装,本文将重点剖析Netty客户端的启动流程,深入底层了解如何使用NIO编程客户端。 Linux网络编程5种IO模型 根据UNIX网络编程对于IO模型的分类,UNIX提供了5种IO模型,分别是 阻塞IO 、 非阻塞IO、 IO复用 、 信号驱动IO 、 异步IO 。这几种IO模型在《UNIX网络编程》中有详解,这里作者只简单介绍,帮助大家回忆一下这几种模型。 对于Linux来说,所有的操作都是基于文件的,也就是我们非常熟悉的fd,在缺省的情况下,基于文件的操作都是 阻塞的 。下面就通过系统调用 recvfrom 来回顾下
                      63 0
                      |
                      3月前
                      |
                      前端开发 Java Maven
                      【Netty 网络通信】启动客户端连接服务端实现通信
                      【1月更文挑战第9天】【Netty 网络通信】启动客户端连接服务端实现通信
                      |
                      6月前
                      |
                      存储 前端开发 Java
                      Netty 爱好者必看!一文详解 ChannelHandler 家族,助你快速掌握 Netty 开发技巧!
                      Netty 爱好者必看!一文详解 ChannelHandler 家族,助你快速掌握 Netty 开发技巧!
                      126 0
                      |
                      9月前
                      |
                      编解码 JSON 应用服务中间件
                      Netty入门到超神系列-Netty开发Http服务器
                      这章我们使用Netty来写一个Http服务器类似于Tomcat ,当然Netty和Tomcat是有很多的异同的,比如通信协议,Tomcat是一个基于Http协议的Web容器,而Netty能够通过codec自己来编码/解码字节流 ,因此Netty可以通过编程自定义各种协议,我们今天的目的还是对Netty练练手。
                      115 0