Netty深入浅出学习

简介: Netty深入浅出学习

1.Netty 介绍和应用场景


1.1 介绍


  1. Netty 是jboss的一个开源框架
  2. Netty是一个异步的,基于事件驱动的网络应用框架
  3. 基于nio


1.2 应用场景


  1. Rpc 例如dubbo
  2. 游戏
  3. 大数据


涉及到网络通信的应用都可以使用netty


2. i/o模型


2.1 介绍


  1. bio 同步并阻塞 一个连接对应服务器一个线程   适用于连接数较少的架构 jdk1.4
  2. nio 同步非阻塞 服务器一个线程处理多个连接   适用于连接数较多连接时间短 jdk1.4
  3. aio(nio.2) 异步非阻塞  适用于连接数多且连接时间长 jdk1.7


2.2 bio


blocking i/o


2.2.1 简单demo


开发一个服务端,创建一个线程池,当客户端发送一个请求,服务端对应创建一个线程处理,当有多个客户端请求时,就会创建多个线程对应处理


这里demo的客户端用telnet模拟


public static void handler(Socket socket){

       try(InputStream in = socket.getInputStream();){

           System.out.println("线程信息: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());


           byte[] bytes = new byte[1024];

           while (true){

               int read = in.read(bytes);

               if(read!=-1){

                   System.out.println("输出信息: "+new String(bytes,"UTF-8"));

               }else {

                   break;

               }

           }

       }catch (IOException e){

           e.printStackTrace();

       }

   }


   public static void main(String[] args) {

       try(ServerSocket serverSocket = new ServerSocket(6666);) {

           ExecutorService executorService = Executors.newCachedThreadPool();


           System.out.println("线程信息: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());


           while (true){

               System.out.println("等待链接");

               final Socket socket = serverSocket.accept();

               System.out.println("链接到一个客户端");


               executorService.execute(new Runnable() {

                   @Override

                   public void run() {

                       System.out.println("线程信息: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());

                       handler(socket);

                   }

               });

           }


       } catch (IOException e) {

           e.printStackTrace();

       }


   }


2.3 nio


non-blocking i/o 非阻塞


2.3.1 简介


三大核心


  1. channel 通道
  2. buffer 缓冲区
  3. selector 选择器


简述操作原理:selector 选择可用的channel, channelbuffer可以相互读写,应用程序并不直接对channel进行操作,而是通过对buffer进行操作,间接操作channel


一个线程中会有多个selector,一个selector中可以注册多个channel,如果并没有数据传输,线程还可以做其他事,并不会一直等待


2.4 nio与bio 的区别


  1. nio非阻塞 bio阻塞
  2. nio用块的方式处理io  bio用流的方式处理io 块的方式比流的方式要快
  3. bio基于字节流/字符流  nio基于缓冲区和通道(channel) selector监听多个通道的事件,因此用一个线程就可以处理多个通道的数据


图示: nio



bio



3. nio详解


3.1 nio模型三大组件的关系


  1. 一个线程对应一个selector
  2. 一个selecor对应多个channel
  3. 一个channel对应一个buffer
  4. 一个线程对应多个channel
  5. channel与buffer都是双向的,就是既可以读也可以写 使用flip()方法切换
  6. buffer就是一个内存块,读写内存比较快
  7. selector会根据不同事件切换不同的channel


3.2 Buffer缓冲区


3.2.1 简介


本质是一个读写数据的内存块,可以理解成一个提供了操作内存块方法容器对象(数组)


缓冲区中内置了一些机制,这些机制可以检测到缓冲区的数据变化,状态变化


channel读写的数据必须都经过Buffer


3.2.2 源码分析


常用的几个操作方法


public static void main(String[] args) {

       //allocate 规定intbuffer的长度

       IntBuffer buffer = IntBuffer.allocate(5);


       //capacity()获取容量

       //put()写入

       for(int i = 0;i<buffer.capacity();i++){

           buffer.put(i*2);

       }


       //flip()反转 由写转为读

       buffer.flip();


       //读取

       //get()每次读取后 索引向后移动一位

       for(int i = 0;i<buffer.capacity();i++){

           System.out.println(buffer.get());


       }

   }


3.2.2.1 定义


IntBuffer中定义了一个int数组,其他类型的buffer类似


public abstract class IntBuffer

   extends Buffer

   implements Comparable<IntBuffer>

{


   // These fields are declared here rather than in Heap-X-Buffer in order to

   // reduce the number of virtual method invocations needed to access these

   // values, which is especially costly when coding small buffers.

   //

   final int[] hb;                  // Non-null only for heap buffers

   final int offset;

   boolean isReadOnly;                 // Valid only for heap buffers


最顶层的Buffer类中定义了四个属性


public abstract class Buffer {


   /**

    * The characteristics of Spliterators that traverse and split elements

    * maintained in Buffers.

    */

   static final int SPLITERATOR_CHARACTERISTICS =

       Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;


   // Invariants: mark <= position <= limit <= capacity

   private int mark = -1; //标记

   private int position = 0; //当前索引的位置,不能超过limit

   private int limit;//最大能读写的长度

   private int capacity;//容量 allocate定义的长度


3.2.2.2 反转


  public final Buffer flip() {

       limit = position;

       position = 0;

       mark = -1;

       return this;

   }


可以看到,反转之后,由读变为写,或者由写变为读


将索引归0,最大读写长度不能超过上次操作的索引


3.2 channel 通道


3.2.1 简介


  1. 通道类似于流/连接,但是流只能写入或者读取,通道可以即读取也写入
  2. 通道异步读写数据
  3. 通道可以读写数据到缓存区


3.2.2 层级关系



当有客户端发送请求时,服务端会创建一个ServerSocketChannel(实现类:ServerSocketChannelImpl) 再由ServerSocketChannel创建一个SocketChannel(实现类:SocketChannelImpl即真正读写数据的通道),这个SocketChannel就是与这个客户端请求所对应的


3.2.3 案例剖析


3.2.3.1 FileChannle 输出文件流


import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import java.nio.ByteBuffer;

import java.nio.channels.FileChannel;


/**

* @author: zhangyao

* @create:2020-08-25 14:50

**/

public class FileChannelTest {

   public static void main(String[] args) {

       FileOutputStream fileOutputStream = null;

       try {

           //文件输出流

           fileOutputStream = new FileOutputStream("D:\\file01.txt");

  //文件输出流包装为FileChannel 此处FileChannel默认实现FileChannelImpl

           FileChannel fileChannel = fileOutputStream.getChannel();


           //创建对应的缓冲区

           ByteBuffer byteBuffer = ByteBuffer.allocate(1024);


           //数据写入缓冲区

           byteBuffer.put("hello nio".getBytes());


           //反转,因为接下来需要从缓冲区读取数据写入Channel

           byteBuffer.flip();


           //从缓冲区写入Channel

           fileChannel.write(byteBuffer);




       } catch (FileNotFoundException e) {

           e.printStackTrace();

       } catch (IOException e) {

           e.printStackTrace();

       } finally {

           //关闭文件流

           if(fileOutputStream!=null){

               try {

                   fileOutputStream.close();

               } catch (IOException e) {

                   e.printStackTrace();

               }

           }

       }

   }

}


整体流程就是把数据写入缓冲区,在读取缓存区写入通道Channel,在由文件输出流输出


图示如下



3.2.3.2 FileChanle 输入文件流


public static void main(String[] args) {

   FileInputStream fileInputStream = null;

   try {

       fileInputStream = new FileInputStream("D:\\file01.txt");


       //获取Channel

       FileChannel channel = fileInputStream.getChannel();


       //创建byteBuffer

       ByteBuffer byteBuffer = ByteBuffer.allocate(1024);


       //从channel中读取数据写入buffer

       channel.read(byteBuffer);


       //反转  下一步需要从buffer中读取数据输出

       byteBuffer.flip();


       //输出

       byte[] array = byteBuffer.array();

       System.out.println(new String(array));


   } catch (FileNotFoundException e) {

       e.printStackTrace();

   } catch (IOException e) {

       e.printStackTrace();

   } finally {

       try {

           fileInputStream.close();

       } catch (IOException e) {

           e.printStackTrace();

       }

   }


}


与上面的例子刚好相反,从文件中读取数据,通过通道写入buffer缓冲区,在输出


图示



3.2.3.3 FileChannel 拷贝文件


其实就是上面两个例子结合,把一个文件中的数据复制到另外一个文件中


public static void main(String[] args) {

   FileInputStream fileInputStream = null;

   FileOutputStream fileOutputStream = null;


   try {

       fileInputStream = new FileInputStream("D:\\file01.txt");

       fileOutputStream = new FileOutputStream("D:\\file02.txt");

       FileChannel channel = fileInputStream.getChannel();


       FileChannel channel1 = fileOutputStream.getChannel();


       ByteBuffer byteBuffer = ByteBuffer.allocate(1024);


       while (true){

           //将byteBuffer复位

           byteBuffer.clear();

           int read = channel.read(byteBuffer);

           if(read==-1){

               break;

           }


           byteBuffer.flip();

           channel1.write(byteBuffer);

       }



   } catch (FileNotFoundException e) {

       e.printStackTrace();

   } catch (IOException e) {

       e.printStackTrace();

   } finally {

       try {

           fileInputStream.close();

           fileOutputStream.close();

       } catch (IOException e) {

           e.printStackTrace();

       }

   }



}


这里使用了byteBuffer.clear()方法


因为ByteBuffer缓冲区是有长度的,当读取的文件超过缓冲区的长度时,如果不对缓冲区进行清空,当进行下一次读取时,就会从上一次读取的位置开始读取,会出现死循环的情况


3.2.3.4 FileChannel 拷贝文件之TransferFrom


public static void main(String[] args) {

   FileInputStream fileInputStream = null;

   FileOutputStream fileOutputStream = null;


   try {

       fileInputStream = new FileInputStream("D:\\file01.txt");

       fileOutputStream = new FileOutputStream("D:\\file02.txt");

       FileChannel channel = fileInputStream.getChannel();


       FileChannel channel1 = fileOutputStream.getChannel();


 

       //从channel通道拷贝到 channel1通道

       channel1.transferFrom(channel, 0, channel.size());


   } catch (FileNotFoundException e) {

       e.printStackTrace();

   } catch (IOException e) {

       e.printStackTrace();

   } finally {

       try {

           fileInputStream.close();

           fileOutputStream.close();

       } catch (IOException e) {

           e.printStackTrace();

       }

   }



}


3.2.4 Buffer的分散和聚集


上面的例子都是使用单个buffer进行数据的读写,如果数据过大,也可用使用多个buffer(buffer数组)进行数据的读写,即用空间换时间


3.3 Selector选择器


3.3.1 基本简介


一个selector管理多个channel通道,使用异步的方式处理io


只有读写真正的发生时,才会处理数据,减小了线程的压力,不用每个请求都维护一个线程


避免了多线程之间的上下文切换导致的开销


3.3.2 selector的api


Selector:


  1. select() 阻塞
  2. select(Long timeout) 有超时时间
  3. selectNow() 非阻塞
  4. wakeup() 立即唤醒selector


3.3.2 selecor的工作流程


其实是Selector SelectionKey ServerSocketChannel SorkcetChannel的工作原理


  1. 当客户端链接时,通过ServerSockertChannel 得到SocketChannel 并且注册到 Selector上
  1. 注册源码

public abstract SelectionKey register(Selector sel, int ops)

   throws ClosedChannelException;

这是SocketChannel注册到Selector上的方法,第一个参数为要注册的Selector对象,第二个参数为事件驱动的类型

public abstract class SelectionKey {

   public static final int OP_READ = 1;

   public static final int OP_WRITE = 4;

   public static final int OP_CONNECT = 8;

   public static final int OP_ACCEPT = 16;

   private volatile Object attachment = null;

  1. 当注册完成后返回一个Selectionkey,这个selectionKey会和SocketChannel关联
  2. Selector通过select方法监听Channel,如果有事件发生,返回对应的selectionKey集合
  1. 源码

public int select(long var1) throws IOException {

       if (var1 < 0L) {

           throw new IllegalArgumentException("Negative timeout");

       } else {

           return this.lockAndDoSelect(var1 == 0L ? -1L : var1);

       }

   }


   public int select() throws IOException {

       return this.select(0L);

   }


   public int selectNow() throws IOException {

       return this.lockAndDoSelect(0L);

   }


private int lockAndDoSelect(long var1) throws IOException {

       synchronized(this) {

           if (!this.isOpen()) {

               throw new ClosedSelectorException();

           } else {

               int var10000;

               synchronized(this.publicKeys) {

                   synchronized(this.publicSelectedKeys) {

                       var10000 = this.doSelect(var1);

                   }

               }


               return var10000;

           }

       }

   }

  1. 通过得到的selectionKey可以反向获取Channel
  1. 源码

   public abstract SelectableChannel channel();

  1. 最后通过channel处理业务


3.3.3 案例


服务端思路:


  1. 创建serverSocketChannel绑定端口6666,把这个channel注册到Selector上,注册事件是OP_ACCEPT
  2. 循环监听,判断是否channel中是否有事件发生,如果有事件发生,判断不同的事件类型进行不同的链接,读/写操作


客户端思路


  1. 创建一个SocketChannel,连接上服务器之后,发送消息,并保持链接不关闭


3.3.3.1 server端


import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.*;

import java.util.Iterator;

import java.util.Set;


/**

* @author: zhangyao

* @create:2020-08-26 16:55

**/

public class ServerChannel {

   public static void main(String[] args) {


       try {

           //生成一个ServerScoketChannel

           ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

           //设置为非阻塞的

           serverSocketChannel.configureBlocking(false);

           //serverSocket监听6666端口

           serverSocketChannel.socket().bind(new InetSocketAddress(6666));


           //创建Selector

           Selector selector = Selector.open();


           //serverSocketChannel注册到Selector

           SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);



           //循环等待链接

           while (true){

               //如果没有事件发生,就继续循环

               if(selector.select(1000) == 0){

                   System.out.println("等待1s,无连接");

                   continue;

               }


               //如果有事件驱动,就需要遍历事件

               Set<SelectionKey> selectionKeys = selector.selectedKeys();

               Iterator<SelectionKey> iterator = selectionKeys.iterator();

               while (iterator.hasNext()){

                   SelectionKey key = iterator.next();

                   //如果事件是连接

                   if(key.isAcceptable()){

                       try {

                           SocketChannel channel = serverSocketChannel.accept();

                           channel.configureBlocking(false);

                           SelectionKey register = channel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));

                           System.out.println("链接成功");

                       } catch (IOException e) {

                           e.printStackTrace();

                       }

                   }


                   //如果是读取数据

                   if(key.isReadable()){

                       SocketChannel channel = (SocketChannel) key.channel();

                       ByteBuffer byteBuffer = (ByteBuffer) key.attachment();

                       try {

                           int read = channel.read(byteBuffer);

                           byte[] array = byteBuffer.array();

                           System.out.println("读取数据:"+ new String(byteBuffer.array()));

                       } catch (IOException e) {

                           e.printStackTrace();

                       }

                   }

                   iterator.remove();



               };


           }


       } catch (IOException e) {

           e.printStackTrace();

       }



   }

}


3.3.3.2 客户端


import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.SocketChannel;


/**

* @author: zhangyao

* @create:2020-08-26 17:24

**/

public class ClientChannel {

   public static void main(String[] args) {


       //创建一个SocketChannel

       try {

           SocketChannel socketChannel = SocketChannel.open();

           socketChannel.configureBlocking(false);

           InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);

           if(!socketChannel.connect(inetSocketAddress)){

               while (!socketChannel.finishConnect()){

                   System.out.println("服务器连接中,线程并不阻塞,可以进行其他操作");

               }

           }


           //连接成功

           socketChannel.write(ByteBuffer.wrap("hello ,server".getBytes()));


           System.in.read();





       } catch (IOException e) {

           e.printStackTrace();

       }

   }

}


4.Netty


4.1 简介


https://netty.io/ 官网


netty是对java nio api的封装,简化了nio程序的开发,jdk要求最低1.6


流行的网络编程通信框架,Dubbo Elasticsearch 等框架底层的网络通信框架都是 Netty


架构模型



版本


netty 共有 3.x 4.x 5.x三个大版本


3.x较老,5.x有重大bug,被官网废弃  现在主要使用4.x


4.2 线程模型


有以下几种线程模型


4.2.1 传统I/O阻塞模型


每一个链接都需要一个对应的线程进行处理,并且当链接建立后,如果当前链接没有数据传输时,此线程会被阻塞在read()方法


4.2.2 Reactor模式



原理图示如上


主要是针对了传统I/O模型一个连接会阻塞一个线程的问题进行了改进,当连接建立后都通过ServiceHandler调用线程池中的线程进行处理,这样就只用阻塞一个ServiceHandler线程,达到多路复用的目的


Reactor模式有三种实现方式


4.2.2.1单Reactor单线程



使用一个线程通过多路复用完成所有操作,包括读写连接


redis使用的就是这种模型 单线程


4.2.2.2单Reactor多线程



相对于单Reactor单线程,主线程不在进行业务处理,当有请求过来之后,具体的业务处理交与线程池中的线程处理,线程处理完成后再通过handler返回给Client


4.2.2.3 主从Reactor多线程



相比于单Reacotr,主从Reactor将Reactor分为MainReactor和SubReactor


MainReactor中负责分发和连接


SubReactor中负责读写


一个MainReactor可以对应多个SubReactor


4.2.3 Netty模型



简述Netty模型


  1. 角色
  1. BossGroup BossGroup的类型是NioEventLoopGroup,其中包含了很多NioEventLoop
  2. NioEventLoop nio事件循环,每个NioEventLoop中都有一个Selctor和一个任务队列
  3. WorkerGroup 类型是NioEventLoopGroup,与BossGroup类似,只不过功能不同,BossGroup只负责与客户端建立连接, WorkerGroup需要读写,处理业务
  4. PipeLine 管道,对Channel进行的封装,具体的业务处理是通过Pipline对Channel进行处理
  1. 具体流程
  1. 当客户端发送请求时,首先进入BossGroup,有NioEventLoop对请求进行事件轮询,如果是连接事件就进行处理
  1. 处理的步骤分为三步
  2. 轮询
  3. 注册 这里的注册指的是将生成的SocketChannel注册到workerGroup中的某个NioEventLoop中的Selector上
  4. 执行任务列表
  1. 当请求的事件是读写时,就有workerGroup对请求进行具体的业务处理
  1. 处理的步骤BossGroup类似
  1. 总结
    由此可以看出,Netty的模型与主从Reactor模型类似,都是由一个主Reactor负责连接事件,由一个从Reactor负责读写事件


4.2.4 案例demo


4.2.4.1 服务端


4.2.4.1.1 NettyServer


package netty;


import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;


/**

* @author: zhangyao

* @create:2020-09-03 08:55

**/

public class NettyServer {


   public static void main(String[] args) {


       //创建BossGroup 和 WorkerGroup

       EventLoopGroup bossGroup = new NioEventLoopGroup();

       EventLoopGroup workerGroup = new NioEventLoopGroup();


       ServerBootstrap serverBootstrap = new ServerBootstrap();

       ChannelFuture channelFuture = null;

       try {

       serverBootstrap.group(bossGroup, workerGroup)

               .channel(NioServerSocketChannel.class)

               .option(ChannelOption.SO_BACKLOG, 128)

               .childOption(ChannelOption.SO_KEEPALIVE, true)

               .childHandler(new ChannelInitializer<SocketChannel>() {


                   @Override

                   protected void initChannel(SocketChannel socketChannel) {

                       socketChannel.pipeline().addLast(new NettyServerHandler());

                   }

               });


       System.out.println("服务器就绪.....");



       //绑定端口

           channelFuture = serverBootstrap.bind(6668).sync();

       }catch (Exception e){

           e.printStackTrace();

       }finally {

           try {

               channelFuture.channel().closeFuture().sync();


               bossGroup.shutdownGracefully();

               workerGroup.shutdownGracefully();

           } catch (InterruptedException e) {

               e.printStackTrace();

           }

       }

   }

}


4.2.4.1.2 NettyServerHandler


package netty;


import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.util.CharsetUtil;


/**

* @author: zhangyao

* @create:2020-09-03 09:12

**/

public class NettyServerHandler extends ChannelInboundHandlerAdapter {


   //读取数据

   @Override

   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

       ByteBuf buf = (ByteBuf) msg;


       System.out.println("客户端发送消息:"+ buf.toString(CharsetUtil.UTF_8));

       System.out.println("客户端地址:"+ ctx.channel().remoteAddress());


   }


   //数据读取完毕

   @Override

   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

       ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端",CharsetUtil.UTF_8));

   }


   //处理异常 关闭ctx

   @Override

   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

       ctx.close();

   }

}


4.2.4.2 客户端


4.2.4.2.1 NettyClient


package netty;


import io.netty.bootstrap.Bootstrap;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;


/**

* @author: zhangyao

* @create:2020-09-03 09:52

**/

public class NettyClient {

   public static void main(String[] args) {



       EventLoopGroup executors = new NioEventLoopGroup();



       Bootstrap bootstrap = new Bootstrap();


       try {

           bootstrap.group(executors)

                   .channel(NioSocketChannel.class)

                   .handler(new ChannelInitializer<SocketChannel>() {

                       @Override

                       protected void initChannel(SocketChannel socketChannel) throws Exception {

                           socketChannel.pipeline().addLast(new NettyClientHandler());

                       }

                   });


           System.out.println("客户端就绪........");


           ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();

           //关闭通道

           channelFuture.channel().closeFuture().sync();

       }catch (Exception e){

           e.printStackTrace();

       }finally {

           try {

               executors.shutdownGracefully().sync();

           } catch (InterruptedException e) {

               e.printStackTrace();

           }

       }


   }

}


4.2.4.2.2 NettyClientHandler


package netty;


import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.util.CharsetUtil;


/**

* @author: zhangyao

* @create:2020-09-03 10:00

**/

public class NettyClientHandler extends ChannelInboundHandlerAdapter {



   //就绪时触发

   @Override

   public void channelActive(ChannelHandlerContext ctx) throws Exception {

       System.out.println("ctx: "+ctx);


       ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服务端", CharsetUtil.UTF_8));


   }


   //读取信息

   //这里读取的是服务器返回的信息

   @Override

   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


       ByteBuf byteBuf = (ByteBuf) msg;


       System.out.println("服务端发送消息: "+ byteBuf.toString(CharsetUtil.UTF_8));

       System.out.println("服务端地址: "+ ctx.channel().remoteAddress());


   }


   //异常处理

   @Override

   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

       ctx.close();

   }

}


一个简单的TCP服务通信,客户端发送消息,服务端接收消息并返回消息给客户端


4.2.4.3 案例demo源码分析


4.2.4.3.1 NioEventGroup


public NioEventLoopGroup() {

   this(0);

}


public NioEventLoopGroup(int nThreads) {

   this(nThreads, (Executor)null);

}


可以看到,如果使用无参的NioEventGroup,默认传递的是0,也可以指定线程数


一层一层找下去:


private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); //NettyRuntime.availableProcessors()获取当前计算机的核数(逻辑处理器)


protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {

   super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);

}


发现最后找到NioEventGroup父类的方法


如果指定了NioEventGroup的线程数,且不为0的时候,就使用指定的线程数


否则,**就使用当前计算机的核数2作为线程数


debug 看结果



电脑是12核,默认是24个线程


指定一个线程




就只有一个线程数


4.2.5 异步模型


上文中的案例Demo中的 ChannelFuture


ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();


异步模型Future是相对与同步来说的


异步指的是当一个异步调用发出后,不会立刻得到结果,而是通过回调,状态来通知调用者调用的结果


Netty 中的 connect 和 bind() sync方法就是返回了一个异步的结果,之后再通过监听获取到结果


也就是 Future-Listener机制


当Future对象刚创建的时候,处于未完成的状态,可以通过返回的ChannelFuture查看操作执行的状态,也可以注册监听函数来执行完成后的操作



isSucess()是否成功


isDone()是否完成


isCancelable() 是否取消


cause() 失败原因


addListener 增加监听器


//绑定端口

           channelFuture = serverBootstrap.bind(6668).sync();

           channelFuture.addListener(new ChannelFutureListener() {

               @Override

               public void operationComplete(ChannelFuture channelFuture) throws Exception {

                   if(channelFuture.isSuccess()){

                       System.out.println("监听端口成功");

                   }else {

                       System.out.println("监听端口失败");

                   }

               }

           });


4.2.6 Netty Http服务


做一个简单的demo  浏览器(客户端)访问服务器端7001端口,返回一个字符串信息


浏览器访问是http请求,服务器也需要相应一个httpResponse


4.2.6.1 服务端


NettyHttpServer 启动类


package netty.http;


import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;


/**

* @author: zhangyao

* @create:2020-09-04 11:16

**/

public class NettyHttpServer {


   public static void main(String[] args) throws InterruptedException {


       EventLoopGroup bossGroup = new NioEventLoopGroup();

       EventLoopGroup workerGroup = new NioEventLoopGroup();


       ServerBootstrap serverBootstrap = new ServerBootstrap();


       serverBootstrap.group(bossGroup,workerGroup)

               .channel(NioServerSocketChannel.class)

               .childHandler(new NettyHttpInitialize());


       ChannelFuture channelFuture = serverBootstrap.bind(7001).sync();


       channelFuture.channel().closeFuture().sync();


       bossGroup.shutdownGracefully();


       workerGroup.shutdownGracefully();


   }

}


NettyHttpInitialize 处理器类


对之前的ChannelInitialize(SocketChannel)进行封装


package netty.http;


import io.netty.channel.Channel;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.http.HttpServerCodec;


/**

* @author: zhangyao

* @create:2020-09-04 11:21

**/

public class NettyHttpInitialize extends ChannelInitializer<SocketChannel> {



   @Override

   protected void initChannel(SocketChannel sc) throws Exception {

       //得到管道

       ChannelPipeline pipeline = sc.pipeline();


       //管道中加入处理器 主要是处理Http请求的,解析请求头之类的

       pipeline.addLast("myDecoder",new HttpServerCodec());


       //加入处理器

       pipeline.addLast("myServerHandler",new NettyServerHandler());

   }

}


NettyServerHandler 具体的处理(返回http响应)


package netty.http;


import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.codec.http.*;

import io.netty.util.CharsetUtil;

import org.springframework.http.HttpStatus;


/**

* @author: zhangyao

* @create:2020-09-04 14:01

**/

public class NettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {

   //读信息

   @Override

   protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {

       System.out.println(httpObject);

       System.out.println("客户端地址+"+ channelHandlerContext.channel().remoteAddress());



       ByteBuf byteBuf = Unpooled.copiedBuffer("hello , im 服务端", CharsetUtil.UTF_8);


       //返回客户端信息

       FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK,byteBuf);


       fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");


       fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes());



       channelHandlerContext.writeAndFlush(fullHttpResponse);




   }


   @Override

   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

       ctx.close();

   }

}


出现的问题记录:


  1. 第一次绑定6668端口,浏览器访问失败,换成7001就可以了
    原因: 谷歌浏览器禁用了6665-6669以及其他一些不安全的端口
  2. 访问后第一次请求 出现异常,可以正常返回数据

    原因: NettyServerHandler中没有对异常处理方法进行重写
    加上这部分就可以了,报错信息也报的很明显

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

   ctx.close();

}


4.2.6.2 Http服务的过滤


可以对一些不希望处理的请求进行过滤,其实就是在对http请求的处理过程中判断一下请求的uri


在上文中 NettyServerHandler类中 加入以下代码,即可拦截/favicon.ico请求


HttpRequest re  = (HttpRequest) httpObject;


String uri = re.uri();

if(uri.equals("/favicon.ico")){

   System.out.println("不想处理,返回");

   return;

}


4.3 Netty API梳理


基于上述的各种demo,对Netty常用的类和方法进行系统梳理


4.3.1 Bootstrap


  1. ServerBootstrap 服务端启动引导类
    BootStrap 客户端启动引导类
  1. .group() 给BootStrap设置NioEventLoopGroup,可以设置多个
  2. .channel() 设置服务使用的通道类
  3. .option() 设置通道参数
  4. .handler() 对BossGroup进行设置
  5. .childrenHandler() 对workerGroup 进行设置
  6. .bind() 服务端绑定一个端口号,监听端口
  7. connect() 客户端用于连接服务端


4.3.2 Future


Netty中的io操作都是异步的,也就是不能立刻返回结果,而是当完成了之后再通知调用者


  1. Future
  2. ChannelFutrue
    方法
  1. channel() 返回当前正在进行IO操作的通道
  2. sync() 转为同步,等待异步操作完毕


4.3.3 Channel


不同协议,不同阻塞类型都有对应的Channel


  1. NioSocketChannel 异步tcp协议Socket连接
  2. NioServerSocketChannel 异步tcp协议服务端连接
  3. NioDatagramChannel 异步udp连接
  4. NioSctpChannel 异步sctp客户端连接
  5. NioSctpServerChannel 异步sctp服务端连接


4.3.4 Selector


Netty 基于Nio Selector对象实现多路复用,一个selector管理多个channel


4.3.5 ChannelHandler


主要是用于对数据的处理,其中有很多封装好的方法,使用的时候继承其子类即可


实现类



子类很多,常用的几个


channelHandler


  1. ChannelInboundHandler
  2. ChannelOutboundHandler
  3. 适配器
  1. channelInboundHandlerAdapter
  2. channelOutboundHandlerAdapter


4.3.6 pipeline



结构图如上


channel中可以创建出一个ChannelPipeline, ChannelPipeline中有维护了一个由ChannelHandlerContext组成的双向链表


每个ChannelHandlerContext又对应了一个Channelhandler


常用方法:


addFirst(); 添加一个Handler到链表中的第一个位置


addLast(); 添加到链表的最后一个位置


4.3.7 channelHandlerContext


每一个channelhandlerContext包含了一个channelHandler(业务处理)


channelHandlerContext中还可以获取到对应的channel和pipeline信息


channelHandlerContext.channel();


channelHandlerCOntext.pipeline();


4.3.8 EventLoopGroup


netty一般提供两个EventLoopGroup BossEventLoopGroup 和 workerEventLoopGroup


EventLoopGroup可以指定使用的核心是多少个


4.3.9 Unplooed


Netty提供的用来操作缓冲区数据的工具类


常用方法:


copiedBuffer(); 返回的是Netty提供的 Bytebuf对象


4.3.10 ByteBuf


Netty的数据容器(缓冲区)


可以直接进行读/写 读写之间不需要进行flip(),原因是ByteBuf内部维护了两个索引 readIndex writeIndex


常用方法


getByte()


readByte()


writeByte()


capacity()


4.4 Netty 心跳检测机制


当客户端长时间没有读/写操作时,服务端需要检测客户端是否还处于连接状态,也就是心跳检测


Netty提供了心跳检测的处理类  IdleStateHandler


示例代码


package netty.hearbeat;


import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.ServerSocketChannel;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.logging.LoggingHandler;

import io.netty.handler.timeout.IdleStateHandler;


import java.util.concurrent.TimeUnit;


/**

* @author: zhangyao

* @create:2020-09-10 10:04

**/

public class MyServer {



   public static void main(String[] args) throws Exception{


       EventLoopGroup bossGroup = new NioEventLoopGroup();

       EventLoopGroup workerGroup = new NioEventLoopGroup();


       ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup,workerGroup)

               .channel(NioServerSocketChannel.class)

               .handler(new LoggingHandler())

               .childHandler(new ChannelInitializer<SocketChannel>() {

                   @Override

                   protected void initChannel(SocketChannel socketChannel) throws Exception {

                       ChannelPipeline pipeline = socketChannel.pipeline();

                       pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));

                       pipeline.addLast("inleHandler",new MyHearBeatHandler());

                   }

               });


       ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();


       channelFuture.channel().closeFuture().sync();


       bossGroup.shutdownGracefully();

       workerGroup.shutdownGracefully();

   }

}


package netty.hearbeat;


import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandler;

import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.handler.timeout.IdleState;

import io.netty.handler.timeout.IdleStateEvent;


/**

* @author: zhangyao

* @create:2020-09-10 16:50

**/

public class MyHearBeatHandler extends ChannelInboundHandlerAdapter {

   @Override

   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

       ctx.close();

   }


   @Override

   public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {


       if(o instanceof IdleStateEvent){


           IdleStateEvent event = (IdleStateEvent) o;


           IdleState state = event.state();

           switch (state){

               case READER_IDLE:

                   System.out.println("读空闲");

                   break;

               case WRITER_IDLE:

                   System.out.println("写空闲");

                   break;

               case ALL_IDLE:

                   System.out.println("读写空闲");

                   break;

           }

       }

   }





}


客户端没有区别


4.5 Netty 之 webSocket


使用netty编写webSocket长连接


4.5.1 服务端


package netty.websocket;


import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.HttpServerCodec;

import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

import io.netty.handler.stream.ChunkedWriteHandler;


/**

* @author: zhangyao

* @create:2020-09-11 14:43

**/

public class MyServer {



   public static void main(String[] args) throws Exception{


       EventLoopGroup bossGroup = new NioEventLoopGroup();

       EventLoopGroup workerGroup = new NioEventLoopGroup();



       ServerBootstrap serverBootstrap = new ServerBootstrap();

       serverBootstrap.group(bossGroup,workerGroup)

               .channel(NioServerSocketChannel.class)

               .childHandler(new ChannelInitializer<SocketChannel>() {

                   @Override

                   protected void initChannel(SocketChannel socketChannel) {

                       ChannelPipeline pipeline = socketChannel.pipeline();


                       //添加http解码器

                       pipeline.addLast(new HttpServerCodec());

                       //添加块传输处理器

                       pipeline.addLast(new ChunkedWriteHandler());

                       //http分段传输,增加一个聚合处理

                       pipeline.addLast(new HttpObjectAggregator(8192));

                       //增加websocket协议处理

                       pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));



                       //增加自定义处理业务处理器

                       pipeline.addLast(new MyServerHandler());

                   }

               });



       ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();


       channelFuture.channel().closeFuture().sync();


       bossGroup.shutdownGracefully();

       workerGroup.shutdownGracefully();

   }

}


4.5.2 服务端自定义的处理器


package netty.websocket;


import io.netty.channel.Channel;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;


import java.time.LocalDate;

import java.time.LocalDateTime;

import java.util.Date;


/**

* @author: zhangyao

* @create:2020-09-11 14:49

**/

public class MyServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {



   @Override

   public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

       //当客户端关闭连接

       System.out.println("客户端关闭连接..."+ ctx.channel().id());

   }


   @Override

   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

       //当客户端连接到服务端

       System.out.println("有客户端连接到服务端 id为" + ctx.channel().id());


   }


   @Override

   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

       //异常关闭连接

       ctx.close();

   }


   //收到消息

   @Override

   protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {

       //读取消息并返回相同的消息返回给客户端

       String text = textWebSocketFrame.text();

       System.out.println("服务端收到消息" + text);


       //返回给客户端


       Channel channel = channelHandlerContext.channel();

       channel.writeAndFlush(new TextWebSocketFrame(LocalDateTime.now()+"  服务端返回消息:" + text));


   }

}


4.5.3 页面(webSocket客户端)


<!DOCTYPE html>

<html lang="en">

<head>

   <meta charset="UTF-8">

   <title>测试netty+webSocket</title>

</head>

<body>


   <form onsubmit="return false">

       <textarea id="sendMessage" style="height: 300px;width: 300px" placeholder="请输入要发送的消息"></textarea>

       <button onclick="send(document.getElementById('sendMessage').value)">发送消息</button>


       <textarea id="responseMessage" style="height: 300px;width: 300px" ></textarea>

       <button onclick="document.getElementById('responseMessage').value=''">清空消息</button>


   </form>

</body>



<script type="application/javascript">


   var websocket;


   if(!window.WebSocket){

       alert("浏览器不支持webSocket")

   }else {

       //进行webSocket的开启 关闭


       websocket = new WebSocket("ws://localhost:7000/hello");

       //webSocket 开启事件

       //给消息返回框加入一条数据

       websocket.onopen = function (ev) {

           document.getElementById("responseMessage").value = '连接到服务端';

       }


       websocket.onclose = function (ev) {

           document.getElementById("responseMessage").value += '\n 连接关闭';

       }


       //当服务端响应消息时触发  将服务端返回的消息回显致文本框

       websocket.onmessage = function (ev) {

           document.getElementById("responseMessage").value += '\n ' ;

           document.getElementById("responseMessage").value += ev.data ;

       }


   }


   //发送消息

   function send (message) {

       if(!window.websocket){

           alert("socket还未初始化完成");

           return;

       }


       if(websocket.readyState == WebSocket.OPEN){

           websocket.send(message);

           document.getElementById('sendMessage').value=''

       }

   }



</script>

</html>


4.6 Netty 编码解码



网络传输过程中的编码解码过程


codec 编解码器 包含 encoder编码器 和 decoder解码器


netty中提供了一些StringCodec ObjectCodec的编解码器,但是这些编解码器还是依赖java底层的序列化技术,java底层的序列化是比较低效的,所以需要引入新的高效的序列化技术


4.6.1 ProtoBuf


4.6.2 自定义编码解码器


package netty.inboundAndOutbound;


import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import netty.inboundAndOutbound.client.MyClientMessageToByteHandler;


/**

* @author: zhangyao

* @create:2020-09-18 14:53

**/

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

   @Override

   protected void initChannel(SocketChannel socketChannel) throws Exception {

       ChannelPipeline pipeline = socketChannel.pipeline();


       //添加入栈解码器

       pipeline.addLast(new ByteToMessageHandler());


       //添加出栈编码器

       pipeline.addLast(new MyClientMessageToByteHandler());


       //添加自定义处理器

       pipeline.addLast(new MyServerHandler());

   }

}


package netty.inboundAndOutbound;


import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.ByteToMessageDecoder;


import java.util.List;


/**

* @author: zhangyao

* @create:2020-09-18 14:54

**/

public class ByteToMessageHandler extends ByteToMessageDecoder {

   //自定义实现的入栈解码器

   @Override

   protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

       if(byteBuf.readableBytes()>=8){

           list.add(byteBuf.readLong());

       }

   }

}


package netty.inboundAndOutbound.client;


import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.ByteToMessageDecoder;


import java.util.List;


/**

* @author: zhangyao

* @create:2020-09-18 16:16

**/

public class MyClientByteToLong extends ByteToMessageDecoder {

   @Override

   protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

       if(byteBuf.readableBytes()>=8){

           list.add(byteBuf.readLong());

       }

   }

}


4.6.3 handler处理机制及出栈入栈


通过上面的编码解码进而延申到handler的处理机制



简单的netty出栈入栈的解释图


出栈和入栈是相对于而言的,当客户端发送消息到达服务端,对于客户端来说就是出栈,对于服务端来说就是入栈,反之亦然


4.7 Tcp沾包 拆包


4.7.1 沾包拆包介绍


tcp服务再发送消息的时候,如果发送的多个包.数据量小且包的数量比较多,Tcp就会通过算法将多个包合并为一个大的数据包发送给接受端,这样产生的问题就是接收端无法识别出完整的包,由此产生的问题就是沾包拆包



如上图所示,Client端向Server端发送D1 D2两个数据包


Server端读取的时候,可能会产生四种情况


1.分两次读取 分别读取到了D1 D2两个数据包,不存在沾包拆包现象


2.一次读取,读到了D1D2两个数据包合在一起的包,出现沾包现象


3.分两次读取,第一次读取到了D1和D2的一部分数据,第二次读取到了D2的剩余部分数据,出现了拆包现象


4.分两次读取,第一次读取到了D1的一部分数据,第二次读取到 了D1的剩余部分数据和D2的所有数据,出现拆包现象


4.7.2 解决方案


思路:控制接收端读取内容的长度来解决问题


方案: 通过自定义解析+编解码器来解决拆包沾包问题

目录
相关文章
|
4月前
|
开发工具 git
网络编程(三)netty学习demo和笔记和推荐的4本书
网络编程(三)netty学习demo和笔记和推荐的4本书
107 0
|
4月前
|
存储 网络协议 Java
Netty应用实例学习
Netty应用实例学习
21 0
|
4月前
|
编解码 缓存 网络协议
Netty核心功能学习
Netty核心功能学习
30 0
|
4月前
|
编解码 网络协议 Java
Netty基础入门学习
Netty基础入门学习
33 0
|
6月前
|
前端开发 网络协议 API
学习Netty BootStrap的核心知识,成为网络编程高手!
学习Netty BootStrap的核心知识,成为网络编程高手!
45 0
|
11月前
|
Rust Dubbo 网络协议
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
12560 3
|
弹性计算 缓存 网络协议
netty学习(三)
Java NIO 编程
83 0
|
Java
netty学习(二)
Java BIO 编程
95 0
netty学习(二)
|
分布式计算 Dubbo 网络协议
netty学习(一)
netty的介绍和应用场景
135 0
netty学习(一)
|
存储 编解码 安全
基于Netty的IM聊天加密技术学习:一文理清常见的加密概念、术语等
本文正好借此机会,以Netty编写的IM聊天加密为例,为入门者理清什么是PKI体系、什么是SSL、什么是OpenSSL、以及各类证书和它们间的关系等,并在文末附上简短的Netty代码实示例,希望能助你通俗易懂地快速理解这些知识和概念!
186 0
基于Netty的IM聊天加密技术学习:一文理清常见的加密概念、术语等