Netty基础篇:详解Netty底层NIO

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: Netty基础篇:详解Netty底层NIO


前言

一:Java当前体系核心组成部分

客户端+服务端。
客户端和服务端进行通讯,如何与我们的服务端进行沟通,这是由我们的网络技术,早期使用netty的这些人,
他们往往再游戏领域。

二:并发问题

用什么服务支持用户的并发。早期支持并发的方式,我们使用水平拓展,早期的话就是部署多个Tomcat实例,做好负载均衡(LB),提高整个系统的并发支撑能力,这里边里离不开更多的硬件支持。

三:进程间通信

进程间通信都是基于RPC的。
netty是负责进程间通讯,进程间通讯都是基于RPC,进程间调用都是必须基于网络的。我们的服务都得搭建集群
我们的集群不论是主从式集群还是主备式集群,都得走网络,离不开网络通讯。
Rpc这个技术的底层设计思想就是应用了我们的代理设计模式。
grpc是谷歌为我们做的Rpc的一种技术手段。也是我们认为单纯Rpc角度做的最好的。
grpc本身是由Go语言做的,实际上也是可以通过Java接口处理。我们需要把代理设计模式再Rpc体系中研究透彻。
Dubbo底层本质上也会做代理:Dobbu底层基于Spring的Aop适配了代理。
通讯,我们需要解决序列化,需要解决序列化过程的效率问题。

1:Socket

1):什么是Socket

Socket是一种基于基于TCP/IP协议的网络编程接口又叫做套接字,他是一个抽象层,应用程序可以通过他发送和接收数据
套接字允许应用程序将IO插入到网络中,并与网络中的其他应用程序进行通信,网络套接字是Ip和端口的组合
为了满足不同的应用程序一般的通络系统,提供了三种不同类型的套接字:流式套接字,数据报套接字和
原始套接字
winsock:基于TCP/IP协议的编程接口
winsock=windows+sockets
socket:插座,使用Socket可不需要了解底层的实现细节。

2):网络通信程序如何辨识自己和对方?

三点:ip地址+端口号+通信协议
端口号对应应用程序,端口号位于传输层,有两种协议,一种是TCP协议,一种是UDP协议,这两中协议端口号
是公用的,我们通过ip+端口号+通信协议,就可以唯一标识双方:
本机ip+本机端口+本机协议
远程ip+远程端口+远程协议
可以标识方双,双方一定是同样的协议就能进行通信。

第一章:什么是Netty

一:简介

Netty是一个NIO的客户端服务框架,快速容易的开发网络应用,简化了网络编程,如TCP和UDP套接字服务器。
快速和简单 "并不意味着开发出来的应用程序会出现可维护性或性能问题。Netty的设计是经过精心设计的,
其经验来自于许多协议的实施,如FTP、SMTP、HTTP以及各种基于二进制和文本的遗留协议。因此,Netty
成功地找到了一种方法来实现开发的简易性、性能、稳定性和灵活性,而没有任何妥协。

1:版本

当前Netty的版本,以稳定以4.x版本为核心,中间件最稳定的还是4.x版本。5还是开发版。

2:为什么选Netty

1. Netty已经是行业内网络通信编程的标准,广泛应用于通信领域和很多其他中间件技术的底层。
2. 应用非常广泛
   1. 游戏行业
   2. 很多框架的通信底层,解决进程间通信。
      Spring WebFlux、storm、rocketMQ、dubbo等,是分布式系统,通信的核心。
3. 什么是NIO
NIO全称成为None Blocking IO (非阻塞IO),JDK1.4引入。
非阻塞主要应用在网络通信中,能够合理利用资源,提高系统的并发效率。支持高并发的系统访问。

3:传统网络通信中的开发方式及问题

阻塞是我们传统网络开发当中一定会遇到的问题,传统网络开发方式就是基于Socket编程,我们传统开发基于Tomcat服务器,基于Http应用层协议,仿佛没有应用到Socket编程,这个理念是错误的。Http是应用层协议,我们说的是底层的协议。换句话说,我们编程时使用的是HTTP协议,但是底层还是Socket(套接字)

1):多线程版网络编程

过程描述:

传统的网络Socket(套接字)网络编程的话,客户端同时发出多个请求之后,后台服务器需要对应产生多个线程来处理请求,我们每一个request都会对应一个线程,底层上来讲都会产生这样的要给线程。

一个客户端发出一个请求,客户端一定会有一个线程与之对应。当有一百万个请求之后,也就会有一百多万个线程,然而线程创建的时候会有开销的,因为这是操作系统层面的东西。我们这就相当于操作虚拟机与操作系统进行通讯,这个过程是有大量的实践操作的,类似的操作还有我们的IO和数据库的连接。这些都是稀缺资源。这就是我们说的线程创建的开销。另外一个就是我们的线程会占用很多内存个,一个线程会占用1mb左右的内存,啥也不干的情况下,单单启动这些线程就会占用很多资源。另外,线程都是有线程上下文的,线程切换CPU的占用的时候,线程上下文切换一定是占用CPU的,浪费了CPU的资源。这就是传统的阻塞开发面临的问题。

以上的核心问题:无法控制线程的创建数量。

2):线程池版网络编程

过程描述:

在上述基础上,我们基于池化思想来使用线程池。预先创建好一些线程,解决了第一个问题,也就是创建线程的时间开销(后边即使在创建也是有上限的),这样也就不会无限制的创建线程,本质上是线程数量上限定了。但是当我们池中资源使用完了,则新入请求进入排队当中等待线程。这样内存的占用也是一个合理的请求,而且上下文切换好像也是问题不大了。

代码解释:

当前我们CPU4核的话,也就支持4个线程并行执行,这样的话核心线程数设置成4个最好。上述代码中是为了获取当前计算机的CPU的核数,但是这个JDK8以前或取的都是当前CPU的核数,JDK8以后支持我们指定参数,因为JDK8版本以后,我们的Java代码往往跑在Docker中去了,不直接运行在操作系统层面上了,这个CPU核数已经不准了。这个编码过程中不要写死我们的core pool size。20代表我们最大可以创建多少线程数,队列也需要设置多少个线程可以进去。120L最大等待时长,如果我们线程不忙的时候,等待120就会把这个线程给销毁掉直接等到数量到核心线程数。后边那个是等待市场的单位。

解决了线程了创建的开销。内存占用高,不能无限制创建线程,CPU使用率较高。但是来了一个新的问题。因为阻塞,造成了有限的线程资源的浪费。

新问题:

客户端1阻塞了,比方说客户端1建立连接之后,需要输入点东西。导致后台线程一致空空置状态。这个问题当前环境是没有办法解决的。这个就会导致并发性变差,因为有大量新请求进入到阻塞队列当中。

本质上来讲,这种方式很好解决了多线程版的问题,基于线程池解决了大量创建线程的开销,合理的适用内存喝CPU但是也会出现因为客户端阻塞造成的后台线程资源浪费。

3):NIO网络通信中的非阻塞编程

IO解决的主要是两个问题:1、文件的IO 2、网络通信。

NIO的网络通信中的特点在于,客户端不再使用流这种传统的IO方式进行,传统的流的这种形式都会基于InputStream或者OutPutStream来进行输入和输出。但是在NIO当中,是使用的是管道的这种形式。

每个客户端对应着一个管道,每一个请求发送给我们的服务器,我们NIO在网络编程当中,不仅仅保留了ServerSocket,还引入了一个非常重要的类型Selector(选择器或着叫多路复用器)

作为Selector来讲,能帮我们解决的问题是这样的,监控个客户端的channel这是一种主动式的监控,如果是客户端发短信进行通信的时候,是否能进行正常的通讯(1.正常的鉴别原因,没有阻塞。)监控过程当中有正常的读写,且无阻塞,就会给他分配线程。当其中某一个客户端channel阻塞了之后,会被Selector监控到,就会把分配给他的后台线程解放出来,给到其他的Channel,这跟传统的网络通通信有一个本质的区别,后台的线程当中可能同时给多个Channel使用。这就是NIO版的网络通信,解决客户端阻塞导致后台资源闲置的方式。基于NIO网络通信技术,在客户端和后台服务体系中引入Channel和Selector之后,后台线程的阻塞问题得到了解决。

为什么要讲NIO,以为Netty是对NIO的封装,Netty的底层就是NIO。

第二章:NIO的基本开发方式

NIO当中最为核心的部分为Channel和Buffer,为什么没有Selector因为Selector只存在于网络编程中。对于我们的网络编程,核心部分肯定是有Selector的

一:Channel简介

IO通信管道,类似于我们的InputStream和OutputStream,但是现在Channel没有任何方向性的。

1):常见的Channel

文件操作

FileChannel读写文件中的数据。

网络操作

SocketChannel,通过TCP读写网络中的数据

ServerSocketChannel,监听新进来的TCP链接,像web服务器那样,对每一个新进来的TCP链接创建一个SocketChannel

DatagramChannel,通过UDP读取网络中的数据。

2):获取方式

一定不是通过我们直接new出来的,而是通过工厂的方式进行创建。

文件相关Channel:

1.通过传统的FileInputStream或者是FileOutPutStream来获取

2.通过RandomAccessFile来获取

网络操作:

1.Socket

2.ServerSocket

3.DataGramSocket来获取

我们通过这些类来进行获取即可。

二:Buffer

1:Buffer简介

Buffer一定学好,Netty当中大量都是对Buffer和Channel的封装。

1.Channel读取或者写入的数据,都要写到Buffer当中才可以被程序操作。

2.因为Channel没有方向性,所以Buffer为了区分读写,引入了读模式和写模式来进行区分。

Channel当中的数据最终都是会留入到Buffer当中。Channel作为通信管道,讲数据存储到Buffer当中,基于Buffer中对数据的存储,程序在进行后续的操作。

1):为什么写入到Buffer当中

这是一个效率的问题,IO将一部分数据操作到缓冲区当中在让我们的程序进行操作会大大提升效率。

2):区分读写操作

写模式:Buffer新建,Clear()方法执行后,compact()方法执行后 – 写入Buffer 读模式:flip()方法执行完毕之后。 – 从Buffer当中读取

我们的Buffer是有状态的,分为读模式和写模式。Channel没有方向,读写从Channel当中无法可知。同一时间Buffer模式只能有一种,不能又是读,又是写,Buffer线程独享的。

1:常用的Buffer

Buffer是一个抽象类。常用的Buffer如下:

ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
MappedByteBuffer

八种基本类型都会对应一个Buffer,ByteBuffer是我们后续使用率最高的Buffer,因为我们后续不论是字节还是字符,最终落脚点都是字节。MapperByteBuffer是ByteBuffer的子类,也是非常重要的一个Buffer

3:Buffer获取方式

1.通过ByteBufer.allocate(10);直接通过这种方式分配内容,这个10指的是什么代表的是10Byte

2.通过"xxxxxx".encode()来获取,可以直接将字符生成一个ByteBuffer

三:第一个NIO程序

当我们文件中的内容比buffer缓冲区大小大的时候。读文件的时候读不全。

public class TestNIO1{
    public static void main(String[] args) throws IOException {
        //1 创建Channel通道  FileChannel
        FileChannel channel = new FileInputStream("/Users/sunshuai/Develop/code/java/idea/netty-proj-lession/netty-basic-01/data.txt").getChannel();
        //2 创建缓冲区
        //1234567890
        ByteBuffer buffer = ByteBuffer.allocate(10);//10个字节也就是10BIT
        while (true) {
            //3把通道内获取的文件数据,写入缓冲区
            int read = channel.read(buffer);//本次从管道中读到了多少
            if (read == -1) break;
            //4.程序读取buffer内容,后续的操作。设置buffer为读模式。
            buffer.flip();
            //5.循环读取缓冲区中的数据
            while (buffer.hasRemaining()) {
                byte b = buffer.get();
                System.out.println("(char)b = " + (char) b);
            }
            //6. 设置buffer的写模式
            buffer.clear();//这是一个非常好的习惯。
        }
    }
}

UTF-8字符集的编码下,一个数子或者一个字母对应一个bit,一个汉字占用三个字节。正常来讲在UTF-8字符下,他会用1-4个字节来为字符编码,askII码占用的都是一个字节。拉丁文、希腊文、阿拉伯文都是两个字节,中文、韩文、日文都是三个字节,少数国文是四个字节

GBK编码下:一个中文占用的是两个字节。

刚才我们1234567890一共需要10个字节就可以了,能够存的下,但是当我们后边在家东西的时候,我们的buffer就已经存不下了,这样后面的东西就丢了。怎么解决这个问题呢?我们要分批次的循环的去读取去进行读取。

channel.read(buffer)如果读取的时候读不到新的东西了,那个返回值会变成-1 ,这样的设计,我们就重复利用了我们的Buffer。如果我们在执行完一边读数据之后,没有将Buffer的模式设置为写模式,在最外层大循环中,就不会再进行读数据了。这样就变成了死循环。

这个buffer我们到底设置成多大的内存呢?决定于我们的内存的大小,我们一般设置为1024的倍数,这个空间写大了占内存,写少了会占用CPU。

四:第一个NIO程序分析

public class TestNIO1 {
    public static void main(String[] args) throws IOException {
        //1 创建Channel通道  FileChannel
        FileChannel channel = new FileInputStream("/Users/sunshuai/Develop/code/java/idea/netty-proj-lession/netty-basic-01/data.txt").getChannel();
        //2 创建缓冲区
        //1234567890
        ByteBuffer buffer = ByteBuffer.allocate(10);//10个字节
        while (true) {
            //3把通道内获取的文件数据,写入缓冲区
            int read = channel.read(buffer);
            if (read == -1) break;
            //4.程序读取buffer内容,后续的操作。设置buffer为读模式。
            buffer.flip();
            //5.循环读取缓冲区中的数据
            while (buffer.hasRemaining()) {
                byte b = buffer.get();
                System.out.println("(char)b = " + (char) b);//不强制转换的话,打印的是int值
            }
            //6. 设置buffer的写模式
            buffer.clear();
        }
    }
}
public class TestNIO2 {
    public static void main(String[] args) {
        //RadomAccessFile 异常处理
        FileChannel channel = null;
        try {
            channel = new RandomAccessFile("/Users/sunshuai/Develop/code/java/idea/netty-proj-lession/netty-basic-01/data.txt", "rw").getChannel();
            ByteBuffer buffer = ByteBuffer.allocate(10);
            while (true) {
                int read = channel.read(buffer);
                if (read == -1) break;
                buffer.flip();
                while (buffer.hasRemaining()) {
                    byte b = buffer.get();
                    System.out.println("(char) b = " + (char) b);
                }
                buffer.clear();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}
public class TestNIO3 {
    public static void main(String[] args) {
        try (FileChannel channel = FileChannel.open(Paths.get("/Users/sunshuai/Develop/code/java/idea/netty-proj-lession/netty-basic-01/data.txt"), StandardOpenOption.READ);) {
            ByteBuffer buffer = ByteBuffer.allocate(10);
            while (true) {
                int read = channel.read(buffer);
                if (read == -1) break;
                buffer.flip();
                while (buffer.hasRemaining()) {
                    byte b = buffer.get();
                    System.out.println("(char)b = " + (char) b);
                }
                buffer.clear();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    } 
}

我们在读取一个文件的时候,除了使用fileinputStream获取fileChannel,也可以使用RandomAcessFile或者使用1.7JDK提供的FileChannel.open()来获取。

try(new resource){}catch(Exception e){}这种写法会编译器会自动给我们调用关闭方法。省写一些东西。

五:NIO开发步骤总结

1. 获取Channel 
2. 创建Buffer
3. 循环的从Channel中获取数据,读入到Buffer中。进行操作.
    channel.read(buffer);
    buffer.flip();//设置读模式
    循环从buffer中获取数据。
    buffer.get();
    buffer.clear();//设置写模式

第三章:ByteBuffer详解

我们的学习重点应该放到Buffer上边。从这里我们把主要精力放到这里。

1:抽象类ByteBuffer

ByteBuffer是一个抽象类,抽象类和接口很像,那在面向对象设计的过程中,我们什么时候设计成接口,什么时候设计成抽象类呢?注意,在我们的设计过程中, 我们名词的一般设计成抽象类,抽象的动作、功能设计成接口。汽车也是一个抽象的概念。DAO,或者Service也设计成接口。唯独只见过一个特例,就是这个输入流和输出流InputStream和OutputStream,应该设计成接口类。

名词设计成抽象类,动词设计成接口。

1):具体实现类

1. HeapByteBuffer    堆ByteBuffer        JVM内的堆内存  --->  读写操作 效率低 会收到GC影响 
2. MappedByteBuffer(DirectByteBuffer)       OS内存       --->  读写操作 效率高 不会收到GC影响 。 
不主动析构(析构函数:释放内存),会造成内存的泄露

两个核心子类型,后续我们在编程过程当中最常使用的两个类型:HeapByteBuffer和DirectByteBuffer这两种。

HeapByteBuffer使用的是JVM内存,他使用的过程中会间接的操作操作系统,这样和传统的IO是没啥区别的。我们的虚拟机是操作系统上的一个进程,JVM想要访问文件系统需要首先调用操作系统API打通到操作系统间的访问,然后在通过操作系统的API简介访问文件系统。

这样的文件数据首先会存在于操作系统的内存区域当中,在从操作系统的内存区域当中拷贝到JVM内存当中。而第二种的话,只需要将数据存储于操作系统的内存区域当中,然后直接由JVM直接访问操作系统的内存区域。这样就很大节省了时间,后边我们将的0拷贝会详细讲解这个东西。

内存溢出

总内存100,所需内存较大超过100,已无多余内存进行分配。outofmemory

内存泄露

100内存中,实际上处理的数据是80,但是有的内存没被释放,内存就不够了。

主要原因:

1.没有主动析构:也就是及时的释放内存

2.内存碎片过大:我们的内存被划分成一个一个的小区域,结果加起来够,但是单独谁也不够。他们彼此之间有缝隙,这个缝隙就是碎片。这个缝如果太大的话,就会导致,这些内存浪费了。

我们需要使用更优秀的内存管理器,这样碎片就很小。比如Redis,他整合了优秀的第三方内存你管理器,让缝隙更小。

2):获取方式

获取ByteBuffer对象的方式常用的一共有两种。

1. ByteBuffer.allocate(10);//一旦分配空间,不可以动态调整,想要调整,创建新的。
2. encode()

数据入Buffer一定是写模式,数据出Buffer一定是读模式。

在我们后续的Netty当中,Netty对NIO做了封装,他的获取是可以动态调整容量的。他把他看NIO不爽的地方都更改了。

encode主要用在字符串处理的时候来进行处理。上述两种,我们基于实际情况进行处理。

3):核心结构

ByteBuffer是一个类似于数组的结构(对数组做了封装),整个结构中包含三个主要的参数
4. Capacity 
   buffer的容量,类似于数组的size
5. Position
   buffer当前缓存的下标,在读取操作时记录读到了那个位置,在写操作时记录写到了那个位置。从0开始,每读取一次,
   下标+1 读取第6次的时候下标是5
6. Limit
   读写限制,在读操作时,设置了你能读多少字节的数据,在写操作时,设置了你还能写多少字节的数据
   控制我们能读多少,能写多少的数据。换句话说,从缓冲区读写操作的时候的一个上限值。读到那个位置,
   或者写到那个位置就必须停止。

所谓的读写模式,本质上就是这几个状态的变化。主要有Position和Limit联合决定了Buffer的读写数据区域。

**我们所谓的调用方法改变改变读写模式的状态,本质上就是对于以上三个参数的修改。**以下两张图会对这个问题做很好的解释。

这种方式是不会把数据清空掉的。在往里边进行存数据的时候,会将这些数据给覆盖掉。(这样好像有坑)

compack的这种方式是,我本次读取没有读完,然后调用compack方法将未读完内容前移,假如未读完的是2个,那个当前position执向的是index是2,这时写模式的话从下标2开始进行。所以compack方法一共做了两件事1、未读取内容前移2、修改position下标。这种情况在网络通信领域很常见,适用于我们的包不全的这种情况

代码说明:

public class TestNIO4 {
  //模型:上来指0,get一次右挪一次,最后一次上最后一个的右角。
    @Test
    public void testState5() {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put(new byte[]{'a', 'b', 'c', 'd'});
        buffer.flip();
        System.out.println("buffer.get() = " + (char)buffer.get());//a
        System.out.println("buffer.get() = " + (char)buffer.get());//b
        System.out.println("buffer.capacity() = " + buffer.capacity());//10
        System.out.println("buffer.position() = " + buffer.position());//2
        System.out.println("buffer.limit() = " + buffer.limit());//4
        System.out.println("-----------------------------------------------");
        buffer.compact();
        System.out.println("buffer.capacity() = " + buffer.capacity());//10
        System.out.println("buffer.position() = " + buffer.position());//2
        System.out.println("buffer.limit() = " + buffer.limit());//10
        buffer.flip();
        System.out.println("buffer.get(0) = " + (char)buffer.get(0));
        System.out.println("buffer.get(1) = " + (char)buffer.get(1));
    }
    @Test
    public void testState1() {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        System.out.println("buffer.capacity() = " + buffer.capacity());//10
        System.out.println("buffer.position() = " + buffer.position());//0
        System.out.println("buffer.limit() = " + buffer.limit());//10
    }
    @Test
    public void testState2() {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put(new byte[]{'a', 'b', 'c', 'd'});
        System.out.println("buffer.capacity() = " + buffer.capacity());//10
        System.out.println("buffer.position() = " + buffer.position());//4
        System.out.println("buffer.limit() = " + buffer.limit());//10
    }
    @Test
    public void testState3() {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put(new byte[]{'a', 'b', 'c', 'd'});
        buffer.flip();
        System.out.println("buffer.capacity() = " + buffer.capacity());//10
        System.out.println("buffer.position() = " + buffer.position());//0
        System.out.println("buffer.limit() = " + buffer.limit());//4
    }
    @Test
    public void testState4() {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put(new byte[]{'a', 'b', 'c', 'd'});
        buffer.clear();//全部给清空了
        System.out.println("buffer.capacity() = " + buffer.capacity());//10
        System.out.println("buffer.position() = " + buffer.position());//0
        System.out.println("buffer.limit() = " + buffer.limit());//10
    }
}
最后总结一下
写入Buffer数据之前要设置写模式
1. 写模式
   1. 新创建的Buffer自动是写模式
   2. 调用了clear,compact方法
读取Buffer数据之前要设置读模式
2. 读模式
   1. 调用flip方法

2:核心API

1):Buffer中写入数据

写模式实现三种方式: 创建一个bytebuffer ,clear(),compact()

1. channel的read方法
   channel.read(buffer)
2. buffer的put方法
   buffer.put(byte)    buffer.put((byte)'a')     buffer.put(byte[])

2):从buffer中读出数据

1. channel的write方法
2. buffer的get方法 //每调用一次get方法会影响,position的位置。这也是我们用的最多的方法
3. rewind方法(手风琴),可以将postion重置成0 ,用于复读数据。
4. mark&reset方法,通过mark方法进行标记(position),通过reset方法跳回标记,从新执行.
5. get(i) 方法,获取特定position上的数据,但是不会对position的位置产生影响,并且对读写模式无感
package com.suns.nio02;
import java.nio.ByteBuffer;
public class TestNIO5 {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put(new byte[]{'a', 'b', 'c', 'd'});
        buffer.flip();
        while (buffer.hasRemaining()) {
            System.out.println("buffer.get() = " + (char) buffer.get());
        }
        //postion = 4 limit = 4
        System.out.println("-----------------------------------------");
        buffer.rewind();//position = 0
        while (buffer.hasRemaining()) {
            System.out.println("buffer.get() = " + (char) buffer.get());
        }
    }
}
public class TestNIO6 {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put(new byte[]{'a', 'b', 'c', 'd'});
        buffer.flip();
        System.out.println("buffer.get() = " + (char) buffer.get());//a
        System.out.println("buffer.get() = " + (char) buffer.get());//b
        buffer.mark();
        System.out.println("buffer.get() = " + (char) buffer.get());//c
        System.out.println("buffer.get() = " + (char) buffer.get());//d
        buffer.reset();
        System.out.println("buffer.get() = " + (char) buffer.get());//c
        System.out.println("buffer.get() = " + (char) buffer.get());//d
    }
}
public class TestNIO7 {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put(new byte[]{'a', 'b', 'c', 'd'});
        buffer.flip();
        System.out.println("buffer.get() = " + (char)buffer.get());//a position=1
        System.out.println("buffer.get(0) = " + (char)buffer.get(0));//a
        System.out.println(buffer.position());//1
    }
}

3:字符串如何操作

1):字符串存储到Buffer中

第一种方法:
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put("sunshuai".getBytes());
buffer.flip();
while (buffer.hasRemaining()) {
  System.out.println("buffer.get() = " + (char)buffer.get());
}
buffer.clear();
上述代码没有中文,这么写可以,但是很具有局限性,中文就不好使了。
第二种方法:
ByteBuffer buffer = Charset.forName("UTF-8").encode("sunshuai");
1、encode方法自动 把字符串按照字符集编码后,存储在ByteBuffer.
2、自动把ByteBuffer设置成读模式,且不能手工调用flip方法。
flip只能写一次,如果调用两次之后,他会把上一步的position赋值给limit,这样limit就成了0
第三种方法:
ByteBuffer buffer = StandardCharsets.UTF_8.encode("sunshuai");
while (buffer.hasRemaining()) {
  System.out.println("buffer.get() = " + (char) buffer.get());
}
buffer.clear();
1、encode方法自动 把字符串按照字符集编码后,存储在ByteBuffer.
2、自动把ByteBuffer设置成读模式,且不能手工调用flip方法。
第四种方式:
ByteBuffer buffer = ByteBuffer.wrap("sunshuai".getBytes());
while (buffer.hasRemaining()) {
  System.out.println("buffer.get() = " + (char) buffer.get());
}
buffer.clear();

2):Buffer中数据转字符串

ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put("孙".getBytes());//mac可以这么写。
buffer.flip();
CharBuffer result = StandardCharsets.UTF_8.decode(buffer);
System.out.println("result.toString() = " + result.toString());

4:粘包与半包

一旦出现粘包和半包的时候,我们如何通过NIO程序去解决它,这是这一快内容的核心。

1):什么是粘包与半包

一个网络通信/文件读写的场景:

我们客户端想要给我们的服务端发送几句话:

Hi  sunShuai 
I love you
Do you like me?

在网络传递的过程中,我直接传递的话能不能代表着三句话呢?我们应该给他添加上换行符,加上特殊的标识之后,服务端才会认可这是三句话:

Hi  sunshuai\n
I love you\n
Do you like me?\n

当我们发送数据的时候,我们基于NIO来进行传输数据,这时候用到两大组件,一个是Channel另一个是Buffer,Buffer大小我们如何设置呢?1.我们设置成一个超级大的一次性存储三行数据,这样的好处是什么?这个思路从实际角度可能会有问题,就是当我们数据量很大的时候就没办法搞了,就没法定义这个Buffer了。更常见的思路是我们不奢求一次性存储所有数据,我们可以一次一次的获取,我们每次都获取一部分。这才是一个正常的思路。

假如我们在这里把我们的容量配置为20,我们需要多次来进行读取。第一次:Hi sunshuai\nI love y这就是我们所谓的粘包或者是半包,这种情况如何解决呢?

我们让/n作为每一行的分隔符,后面的我们使用compact压缩到前边去。然后进行二次读取,这样就可以读取到完成的语句。

2):粘包与半包解决代码实例

package com.suns.nio02;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class TestNIO10 {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(50);
        buffer.put("Hi sunshuai\nl love y".getBytes());
        doLineSplit(buffer);
        buffer.put("ou\nDo you like me?\n".getBytes());
        doLineSplit(buffer);
    }
    // ByteBuffer接受的数据 \n
    private static void doLineSplit(ByteBuffer buffer) {
        buffer.flip();
        for (int i = 0; i < buffer.limit(); i++) {
            if (buffer.get(i) == '\n') {//多个换行符的话,后续的ByteBuffer长度被严重估长了。
                int length = i + 1 - buffer.position();//所以,这里的长度应该这么定义。
                ByteBuffer target = ByteBuffer.allocate(length);
                for (int j = 0; j < length; j++) {
                    target.put(buffer.get());
                }
                //截取工作完成
                target.flip();
                System.out.println("StandardCharsets.UTF_8.decode(target).toString() = " + StandardCharsets.UTF_8.decode(target).toString());
            }
        }
        buffer.compact();
    }
}

第四章:NIO的开发使用

一:文件操作

1:读取文件内容

public class TestNIO1 {
    public static void main(String[] args) throws IOException {
        //1 创建Channel通道  FileChannel
        FileChannel channel = new FileInputStream("/Users/sunshuai/Develop/code/java/idea/netty-proj-lession/netty-basic-01/data.txt").getChannel();
        //2 创建缓冲区
        //1234567890
        ByteBuffer buffer = ByteBuffer.allocate(10);
        while (true) {
            //3把通道内获取的文件数据,写入缓冲区
            int read = channel.read(buffer);
            if (read == -1) break;
            //4.程序读取buffer内容,后续的操作。设置buffer为读模式。
            buffer.flip();
            //5.循环读取缓冲区中的数据
            while (buffer.hasRemaining()) {
                byte b = buffer.get();
                System.out.println("(char)b = " + (char) b);
            }
            //Charset.forName("UTF-8").decode(buffer).toString ---> String
            //6. 设置buffer的写模式
            buffer.clear();
        }
    }
}

2:写入文件内容

public class TestNIO11 {
    public static void main(String[] args) throws IOException {
        //1 获得Channel  FileOutputStream, RandomAccessFile
        FileChannel channel = new FileOutputStream("data1").getChannel();
        //2 获得Buffer
        ByteBuffer buffer = Charset.forName("UTF-8").encode("sunshuai");
        //3write
        channel.write(buffer);
    }
 }

3:文件复制

public class TestNIO12 {
    public static void main(String[] args) throws IOException {
        //data---data2
      /*  FileInputStream inputStream = new FileInputStream("/Users/sunshuai/Develop/code/java/idea/netty-proj-lession/netty-basic-01/data.txt");
        FileOutputStream fileOutputStream = new FileOutputStream("/Users/sunshuai/Develop/code/java/idea/netty-proj-lession/netty-basic-01/data2.txt");
        byte[] buffer = new byte[1024];
        while (true) {
            int read = inputStream.read(buffer);
            if (read == -1) break;
            fileOutputStream.write(buffer, 0, read);
        }*/
       /* FileInputStream inputStream = new FileInputStream("/Users/sunshuai/Develop/code/java/idea/netty-proj-lession/netty-basic-01/data.txt");
        FileOutputStream fileOutputStream = new FileOutputStream("/Users/sunshuai/Develop/code/java/idea/netty-proj-lession/netty-basic-01/data2.txt");
        IOUtils.copy(inputStream,fileOutputStream);*/
        FileChannel from = new FileInputStream("/Users/sunshuai/Develop/code/java/idea/netty-proj-lession/netty-basic-01/data.txt").getChannel();
        FileChannel to = new FileOutputStream("/Users/sunshuai/Develop/code/java/idea/netty-proj-lession/netty-basic-01/data2.txt").getChannel();
        //传输数据上线的 2G-1
        // 若果实际文件大小就是超过2G 如何进行文件的copy
        //from.transferTo(0, from.size(), to);
    //分段拷贝,只要他>0我们就一直去拷贝。
        long left = from.size();
        while (left > 0) {
            left = left - from.transferTo(from.size()-left, left, to);
        }
    }
}

使用NIO对文件进行拷贝比传统拷贝文件效率很高,因为使用NIO拷贝文件涉及到0拷贝,直接操作的是操作系统内存,效率更高。

使用NIO进行文件拷贝,最大支持2G-1个字节这么大,因为方法里边做了一个Math.min(var1,2G-1);

二:网络编程

网络编程是NIO最应该关注的部分:

网络编程过程中一定有两个特别关注的角色:客户端和服务端。在我们JavaWeb领域,我们使用的客户端指的都是浏览器、App、App中的浏览器,本质上都是浏览器,对于服务端来讲,我们的服务端就是指的Tomcat服务器,对于我们web领域,我们都会抽象成上边两个概念,客户端和服务端。客户端和服务端进行通讯都会使用这个Socket,基于Socket建立连接,包括进行三次握手等等就可以完成整个过程。

当这套东西映射到我们的NIO的这套解决方案的时候,应该如何编码呢?

1. 服务端 接受请求       ServerScoketChannel
2. 进行实际通信          ScoketChannel

1:基于NIO两种阻塞

通过这版代码 证明了 服务器端 存在2中阻塞
1. 连接阻塞 ---->  accept方法存在阻塞---> ServerSocketChannel阻塞。 
2. IO阻塞   ----> channel的read方法存在阻塞---> SocketChannel阻塞。
上述分析 对应着的2个问题。
public class MyServer {
    public static void main(String[] args) throws IOException {
        //1. 创建ServerScoketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //2. 设置服务端的监听端口:---》client通过网络进行访问 ip:port http://localhost:8989
        serverSocketChannel.bind(new InetSocketAddress(8000));
        List<SocketChannel> channelList = new ArrayList<>();
        ByteBuffer buffer = ByteBuffer.allocate(20);
        //3. 接受client的连接
        while (true) {
            //4. ScoketChannle 代表 服务端与Client链接的一个通道
            System.out.println("等待连接服务器...");
            SocketChannel socketChannel = serverSocketChannel.accept();//阻塞 程序等待client(进行连接)
            System.out.println("服务器已经连接..."+socketChannel);
            channelList.add(socketChannel);
            //5. client与服务端 通信过程 NIO代码
            for (SocketChannel channel : channelList) {
                System.out.println("开始实际的数据通信....");
                channel.read(buffer);//阻塞 对应的IO通信的阻塞(等客户端传过来数据才能读)
                buffer.flip();
                CharBuffer decode = Charset.forName("UTF-8").decode(buffer);
                System.out.println("decode.toString() = " + decode.toString());
                buffer.clear();
                System.out.println("通信已经结束....");
            }
        }
    }
}
package com.suns.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
public class MyClient {
    public static void main(String[] args) throws IOException {
        //连接服务端  端口号?
        SocketChannel socketChannel =  SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(8000));
        System.out.println("--------------------------------------");
    }
}

端口号:Http协议走80端口和tcp协议走80端口,会不会发生冲突?不同协议层析同一个端口号不会发生冲突,比如:http://1521和jdbc://1521不是一个层次,不会发生冲突。端口号和协议是挂钩的。

建立连接之后的反应:

通过debug中的expression执行代码,看到的服务器端的反应。

两种阻塞:

1.等待客户端建立连接的阻塞
2.等待连接进行IO通信的阻塞

如何使用idea启动多个main函数实例?Edit当中allow multi instance.,Idea当中启动服务端和客户端。客户端进行debug,将debug窗口float最好了。

2:阻塞和不阻塞的现象展示

1):阻塞

2):非阻塞

3:如何解决两种阻塞

1):代码示例

package com.suns.socket;
public class MyServer1 {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //设置ServerSocketChannel 非阻塞
        //这一行代码不仅仅可以使得等待客户端连接变得非阻塞,还可以使得等待客户端发送请请求变得不阻塞。
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8081));
        List<SocketChannel> channelList = new ArrayList<>();
        ByteBuffer buffer = ByteBuffer.allocate(20);
        while (true) {
            //在没有客户端连接的情况下,返回的是null
            SocketChannel socketChannel = serverSocketChannel.accept();//阻塞 已经解决了 不阻塞了
            if (socketChannel != null) {
                socketChannel.configureBlocking(false);
                channelList.add(socketChannel);
            }
            //5. client与服务端 通信过程 NIO代码
            for (SocketChannel channel : channelList) {
                int read = channel.read(buffer);//阻塞 对应的IO通信的阻塞
                if (read > 0) {
                    System.out.println("开始实际的数据通信....");
                    buffer.flip();
                    CharBuffer decode = Charset.forName("UTF-8").decode(buffer);
                    System.out.println("decode.toString() = " + decode.toString());
                    buffer.clear();
                    System.out.println("通信已经结束....");
                }
            }
        }
    }
}

我们的阻塞是以两个对象为代表的,通过代码解决这个对象阻塞,就可以解决这两个阻塞问题。解决了之后,就可以实现客户端并向服务端发送请求了。

2):解决手段

1.等待客户端建立连接的阻塞
serverSocketChannel.configureBlocking(false);
2.等待连接进行IO通信的阻塞
socketChannel.configureBlocking(false);

解决完之后的效果:

3):残留的问题

我们当前这段代码使用while true 这种死循环的方式去处理,有没有客户端发过来的请求,都会机型死循环
这样就会导致我们的cpu飙高。内存也会收到影响,虽然解决了阻塞的问题,但是后续的隐患也是很大的,
所以我们更希望更细粒度的解决问题,也就是别死循环空转着,而是有一个组件来帮我监控着,是不是真的
有链接进来,以及后续网络通信过程中是不是有网络通信过来,是不是真正的要发生读写。我们现在需要一
个管理者。

4:引入Selector

1):Selector核心作用

监控连接和网络IO(网络通信,或者叫网络读取)这两个动作,也就是监控ServerSocketChannel和
SocketChannel这两个对象

2):Selector监管机制

ServerSocketChannel和SocketChannel这两个对象特定的情境和状态下才会被监管,状态如下:
1. ACCEPT -- 建立连接 -- ServerSocketChannel
2. READ -- 网络通信-- SocketChannel
3. WHITE -- 网络通信-- SocketChannel
4. CONNECT -- 不用监管-- 一般用于客户端。
package com.suns.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class MyServler2 {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8000));
        serverSocketChannel.configureBlocking(false);//Selector 只有在非阻塞的情况下 才可以使用。
        //引入监管者
        //NIO当中的几个关键对象都是通过open方法创建的。一般工厂模式或者单例模式会这么写。
        Selector selector = Selector.open();//1. 工厂,2. 单例
        //监管者 管理谁? selector.xxxx(ssc); //管理者 ssc  ---> Accept
        SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);//serverSocketChannel交给selector管理,向它注册。
        // selector监控 serverSocketChannel ACCEPT
        // selector 当中有一个属性keys,他的类型是HashSet
        // 所有的selector监管的 serverSocketChannel和SocketChannel都会存储到keys下的他的类型是HashSet中。
        // register注册 ssc
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);//在哪个状态下才会被管理:建立连接。
        System.out.println("MyServler2.main");
        //监控
        while (true) {
            selector.select();//等待.只有监控到了 有实际的连接 或者 读写操作 ,才会处理。这里本质就是一种阻塞。
            //对应的 有ACCEPT状态的SSC 和 READ WRITE状态的 SC 存起来
            //存到另一个属性当中:SelectionsKeys HashSet
            //SSC和SC一旦被注册之后,都会被放到keys当中,但是只有selector.select()执行之后,对应的状态满足的SSC和SC才会被放到SelectionsKeys当中。
            System.out.println("-------------------------");
            //我们要用迭代器,而不是foreach,因为我们要在迭代器当中对某些key执行删除操作。
            //我们虽然拿到的是selectionkey但是实际上是我们的ServerSocketChannel和SocketChannel对象。
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                /*ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel accept = channel.accept();
                System.out.println(accept);*/
                if (key.isAcceptable()) {
                    // SSC
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel accept = channel.accept();
                    System.out.println(accept);
                } else if (key.isReadable()) {
                    // SC --- buffer --- read
                }
            }
        }
    }
}

3):运行过程关键展示

A:关键运行状态分析

selectedKeys用完之后一定要删除,因为有的ssc这种被遍历的时候,如果没有新的连接请求进来,ssc.accept()非阻塞时获取的sc是null,会发生NPE,删了之后不用担心,外层大循环走的时候,有新的链接过来,又会加入到selectedKeys当中。

B:服务端的写操作

演示服务器的写操作:

package com.suns.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
/*
   这个代码 演示服务器 写操作
 */
public class MyServer5 {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8001));
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select();//selector注册 集合 SocketChannel
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey sscKey = iterator.next();
                iterator.remove();
                if (sscKey.isAcceptable()) {
                    SocketChannel sc = serverSocketChannel.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
                    //准备数据
                    StringBuffer sb = new StringBuffer();
                    for (int i = 0; i < 2000000; i++) {
                        sb.append("s");
                    }
                    //NIO  Buffer存储数据 channel写
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    while(buffer.hasRemaining()){
                        int write = sc.write(buffer);
                        System.out.println("write = " + write);
                    }
                } else if (sscKey.isWritable()) {
                    //循环含义的
                    //channel
                    SocketChannel sc = (SocketChannel) sscKey.channel();
                    //buffer
                    ByteBuffer buffer = (ByteBuffer) sscKey.attachment();
                    //写操作
                    int write = sc.write(buffer);
                    System.out.println("write = " + write);
                    if (!buffer.hasRemaining()) {
                        sscKey.attach(null);
                        sscKey.interestOps(sscKey.interestOps() - SelectionKey.OP_WRITE);
                    }
                }
            }
        }
    }
}

客户端日志:

C:\Program Files\Java\jdk1.8.0_241\bin\java.exe
read = 131071
read = 262142
read = 393213
read = 524284
read = 655355
read = 786426
read = 917497
read = 1048568
read = 1179639
read = 1310710
read = 1441781
read = 1572852
read = 1703923
read = 1834994
read = 1966065
read = 2000000

服务端日志:

C:\Program Files\Java\jdk1.8.0_241\bin\java.exe
write = 2000000

或者展示出来的是这种情况:

write = 42877
write = 0
write = 0
write = 0
write = 0
write = 123123
write = 0
write = 0
write = 0
write = 0
write = 0
write = 30005
这点数据是模拟的,他们加起来应该等于200W

说明:

整个的过程说明:

发送端:

当我们代用sc.write()方法的时候,实际上这里边做了两件事

1:jvm调用操作系统Api将jvm内存中的Buffer对象中的数据写入到操作系统内核的内存中。

2:jvm请求socket开始基于TCP协议将数据进行分包开始将数据通过网络进行传输

接收端:

3:Socket基于TCP协议开发读取数据读入到操作系统内存中。

4:sc.read()执行将数据基于Channel将操作操作系统中数据读取到Jvm的Buffer内存中。

思考:

1:以上四个过程应该不是顺序执行,应该是交替执行,换句话说,同一时间,同一个网络通信中,四个步骤有可能同时进行(数据量较大的时候)

2:为什么sc.write()控制台执行的结果有时候是一次性写完,有时候分批写?

这个过程是直接将Buffer中数据写入到操作系统内存中,有时候多个线程下同时操作操作系统内存,导致内存被占用,进而出现数据一次性写不完的情况,也就是在控制台出现多次write记录的情况。

3:所以严格意义上来讲,sc.write()的时候出现写出数据=0的情况,是因为写操作系统内存的时候“接收不到”,而不是担心服务端的接收不到。但是服务端接收不到写0的时候,是传输层层面上的现象和事情,在控制台打印的结果虽然现象一直,单并不是一回事

站在我们服务端的角度,这种流量控制有没有什么问题?

站在我们服务端的角度,我们的服务端就是一个线程,我们发空数据这种操作站在服务端的角度他是一个做事但是没有任何意思,尤其是操作操作系统的Api,这个线程被无端占用了而是没有任何实际意义。那么多线程下(咱们的代码是单线程的)处理其他的客户端操作了。如何处理呢?

分析过程:

监控状态:Writeable:允许写在写,有实际意义内容的时候在写。

package com.suns.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
/*
   这个代码 演示服务器 写操作
 */
public class MyServer5 {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8000));
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select();//selector注册 集合 SocketChannel
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey sscKey = iterator.next();
                iterator.remove();
                if (sscKey.isAcceptable()) {
                    SocketChannel sc = serverSocketChannel.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
                    //准备数据
                    StringBuffer sb = new StringBuffer();
                    for (int i = 0; i < 2000000; i++) {
                        sb.append("s");
                    }
                    //NIO  Buffer存储数据 channel写
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    int write = sc.write(buffer);
                    System.out.println("write = " + write);
                    if (buffer.hasRemaining()) {
                        //为当前的SoketChannel增加 Write的监听
                        //READ + Write
                        sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                        //把剩余数据存储的buffer传递过去
                        sckey.attach(buffer);
                    }
                } else if (sscKey.isWritable()) {
                    //循环含义的
                    //channel
                    SocketChannel sc = (SocketChannel) sscKey.channel();
                    //buffer
                    ByteBuffer buffer = (ByteBuffer) sscKey.attachment();
                    //写操作
                    int write = sc.write(buffer);
                    System.out.println("write = " + write);
                    if (!buffer.hasRemaining()) {
                        sscKey.attach(null);
                        sscKey.interestOps(sscKey.interestOps() - SelectionKey.OP_WRITE);
                    }
                }
            }
        }
    }
}
package com.suns.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
// client接受服务端 传递过来的数据
public class MyClient1 {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(8001));
        int read = 0;
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            read += socketChannel.read(buffer);
            System.out.println("read = " + read);
            buffer.clear();
        }
    }
}

select 多路复用,排除一切不必要的操作,尽可能的使用线程的资源。

在Linux操作系统中,如果当我们的服务并发上来了,但是CPU和内存没有明显的变化(并发上来之后,CPU要处理更多的请求,内存要分配更多的内存空间)这说明我们的socket开的不够,没开那么多接入端。

第五章:Reactor模式

一:单线程Reactor模式

netty当中都是使用reator模式的。上图中,左边是客户端,右边是服务器端。当前的服务器端是单线程完成的。

这是当前我们讲解的NIO开发,实际上就是Reactor的单线程版。

多路复用器值得就是我们的Selector,主要的作用就是分发的操作,一部分是连接器,就是ssc.accept()获取到连接之后,通过多路复用组件记性分发,去进行执行。

解码操作:二进制数据编写成字符串

处理:执行业务,返回业务数据

编码操作:字符串编写成二进制数据

写数据:将数据write()给客户端

我们上边所写的就是Reactor的单线程版,这个其实是一个非常简单的东西,这个在我们的Netty当中既有单线程又有多线程版。

单线程版的问题:

多路复用器Selector:既要负责链接又要负责读写操作,这个效率是很低的。这个时候就需要我们的主从式的reactor

主从架构:Master+Slave

MySQL和Redis当中的读写分离概念,读写分离从架构角度来讲是主从式架构,主从式架构的特点是主也干活,从也干活,只不过干的活是不一样的,主:写,次要:从,如果从坏了,对架构影响比较小,如果从挂了,影响不是很大

主备架构:Master+backup

包含双主从架构,都是主然后带一些小弟,但是对外服务的时候,只有一个主从对外提供服务,另外一个是备用,当第一个一个挂了的时候,备用的在启用,Redis当中有没有主备使架构,Redis主备使架构是如何实现的?哨兵机制。哨兵一定不是主从,一定是主备。

主备机构一个主挂了之后,备用的主如何把第一个主的数据带过来?

这里边有几个方案可以用

1:进程之间独立线程进行同步数据,数据不是实施的,极有可能丢失数据。MySQL用的就是这个手段

但是Java体系当中这种东西不是很成熟。

2:Java体系中我们会用ZK,服务将数据挂在ZK节点上,ZK有一个存储数据的作用,ZK还能监控到各个节点,如果

如果挂了之后,还能将这个数据的源数据从新打到备用实例上,实现高可用。

那么ZK是如何实现高可用的呢?ZK本身节点是一个集群,可以进行选举机制。主实例挂了之后,会先选举出来新的主实例在对外提供服务,遵循严格的CP原则,保证数据的一致性。

主备式架构中,另一个问题,当我们某一个主当了之后,切换到另外一个主上边,但是服务端的IP怎么办?这个时候,我们需要:

老使用方式:(客户端无感)

1:VIP 虚拟IP:经过虚拟IP转换成真实的IP,LVS可以做这个,keepavild这也可以做虚拟IP

2:是HAproxy IP飘逸技术

新的处理方式:

1:微服务的处理方式。

2:加一个网关

反向代理:代理的是服务端,接入到反向代理之后,我们可以做很多东西,可以做负载均衡,动静分离,url重写

正向代理:

主从多线程Reactor:将单线程中的链接和读写分离开来。主从架构一定是主从或者多从。主在Netty当中被称为Boss,他不负责Netty的读写操作,交给一个Worker的线程来进行读写操作。Worker这个一定是有多个线程的,Worker是在Boss接入了链接之后,当真正发生读写的时候,来执行读写。多个worker执行读写操作(具体多少个需要考虑CPU的核数),这就是主从版的多线程,直观来看,从一个线程变成多个线程,这样性能就变得高多了。

每个线程都会有一个独立的selector,因为他们要做独立的事。但是他们之间要进行联系。

二:主从Reactor模式

Boss是一个独立的线程,用于接收用户的请求,并将后续的读写操作交给Worker线程来做。一些老鸟,将这个Boss这个分发的线程叫做dispatcher在我们Boss当中创建客户端链接之后,下一次客户端进行操作就会在Worker线程当中进行,Worker线程可以有多个,站在我们Worker的角度来讲,我们的Worker肯定用的是线程池,里边负责监听读写状态,当状态达到了,再去进行读写,所以他其中肯定也会有Selector

实现一个主从的Reactor

在这里插入代码片

Run方法中的异常只能进行捕获,不能进行抛出。

多线程状态下我们成员变量最好加一个volitile,编译之后,这个变量会加一个lock的前缀,可以保证所有线程对这个变量操作的时候,具有一个可见性的问题,当一个线程修改了之后,对于另外一个线程可以立即看到,不能保证原子性操作。取代不了Sychronized关键字。他本身没有同步的概念,他保证不了原子性。

我们的Selector方法一旦阻塞就能在进行注册方法了。这是上边这个代码的死了。的原因。

多线程环境下打断点,我们的断点,一定要设置好,断点挂起设置成按照线程挂起,如果我们不设置,我们线程默认挂起的All也就是一个挂起,会全部挂起,

核心问题就是Seleor.select()先与注册方法执行。另外一个关键点就是只要是Selector.select()方法执行了一次,那么这个注册的方法肯定是不能执行了,所以这个唤醒的方法理论上也不好使,因为存在这种情况,我们刚刚唤醒还未注册,当我们的run方法当中的代码又执行了一遍selctor.select()就又阻塞了。注册和select()在两个线程当中,不能保证先后顺序。只能放在一个线程当中。

selector.wakeup()都能让select.select()阻塞唤醒一次,不论是在他前面还是后面执行。所谓的唤醒,就是没有注册的也要去跳过这次阻塞,走下边的代码。

package com.suns.reactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
/*
    SLF4 门面
    实现  log42
         logback
 */
public class ReactorBossServer {
    private static final Logger log = LoggerFactory.getLogger(ReactorBossServer.class);
    public static void main(String[] args) throws IOException, InterruptedException {
        log.debug("boss thread start ....");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8000));
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        //模拟多线程的环境,在实际开发中,还是要使用线程池
        /*
        Worker worker = new Worker("worker1");
        */
        Worker[] workers = new Worker[2];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker - " + i);//worker-0 worker-1
        }
        AtomicInteger index = new AtomicInteger();
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey sscSelectionKey = iterator.next();
                iterator.remove();
                if (sscSelectionKey.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    //sc.register(selector, SelectionKey.OP_READ);
                    log.debug("boss invoke worker register ...");
                    //worker-0 worker-1 worker-0 worker-1
                    //hash取摸    x%2= 0  1 [0,1,0,1]
                    workers[index.getAndIncrement()% workers.length].register(sc);
                    log.debug("boss invoked worker register");
                }
            }
        }
    }
}
package com.suns.reactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Worker implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(Worker.class);
    private Selector selector;
    private Thread thread;
    private String name;
    private volatile boolean isCreated;//false
    private ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<>();
    //构造方法
    //为什么不好?
    //Select Thread
    public Worker(String name) throws IOException {
        this.name = name;
       /* thread = new Thread(this, name);
        thread.start();
        selector = Selector.open();*/
    }
    //线程的任务
    public void register(SocketChannel sc) throws IOException, InterruptedException {
        log.debug("worker register invoke....");
        if (!isCreated) {
            thread = new Thread(this, name);
            thread.start();
            selector = Selector.open();
            isCreated = true;
        }
        //通过队列实现线程间代码的传递
        runnables.add(() -> {
            try {
                sc.register(selector, SelectionKey.OP_READ);//reigster  select方法之前运行 。。
            } catch (ClosedChannelException e) {
                throw new RuntimeException(e);
            }
        });
        selector.wakeup();//select
    }
    @Override
    public void run() {
        while (true) {
            log.debug("worker run method invoke....");
            try {
                selector.select();
                Runnable poll = runnables.poll();
                if (poll != null) {
                    poll.run();
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey sckey = iterator.next();
                    iterator.remove();
                    if (sckey.isReadable()) {
                        SocketChannel sc = (SocketChannel) sckey.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(30);
                        sc.read(buffer);
                        buffer.flip();
                        String result = Charset.defaultCharset().decode(buffer).toString();
                        System.out.println("result = " + result);
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
package com.suns.reactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class MyClient {
    public static void main(String[] args) throws IOException {
        //连接服务端  端口号?
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(8000));
        //        socketChannel.write(Charset.defaultCharset().encode("hello\nsuns\n"));
        socketChannel.write(Charset.defaultCharset().encode("hellosuns\n"));
        System.out.println("----------------------------------------------");
    }
}

多线程进行调试:

第十一讲

一定要进行回顾。再牛逼的老师也培养不了你。

Reactor主从版架构回顾:

private ConcurrentLinkedQueue runnables = new ConcurrentLinkedQueue<>();

可以完成一些线程之间功能的传递功能的传递。

Run快捷键:ctrl + shift+ F10

我们打了很多断点,如何取消 ctrl + shirt +F8 搞到所有的断点,然后取消掉没有用的。

上一次课我们初步完成了Reactor的主从结构,但是我们现在需要完成多个worker的实现。

上一版的多线程体现在我们的这个Boss和Worker分离了,但是目前Woker之有一个。

实现worker的多线程:

worker来自于我们的线程池。最低多少个,最多多少个

最低,与cpu的个数进行匹配,一个cpu对应一个线程,这样设计的目的是不需要多线程竞争我么的CPU资源,可以并行执行。Runtime.process.getcore应该是核数-1给Boss留一个。

Netty当中就是基于线程池就操作Woker的,

woker[0] 数组,然后如何分配,基于Hash取模,对2进行取模,定义一个计数器,对计数器取模即可。

计数器我们使用AtomicInter。

模拟多线程版的Rector

package com.suns.reactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
/*
    SLF4 门面
    实现  log42
         logback
 */
public class ReactorBossServer {
    private static final Logger log = LoggerFactory.getLogger(ReactorBossServer.class);
    public static void main(String[] args) throws IOException, InterruptedException {
        log.debug("boss thread start ....");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8000));
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        //模拟多线程的环境,在实际开发中,还是要使用线程池
        /*
        Worker worker = new Worker("worker1");
        */
        Worker[] workers = new Worker[2];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker - " + i);//worker-0 worker-1
        }
        AtomicInteger index = new AtomicInteger();
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey sscSelectionKey = iterator.next();
                iterator.remove();
                if (sscSelectionKey.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    //sc.register(selector, SelectionKey.OP_READ);
                    log.debug("boss invoke worker register ...");
                    //worker-0 worker-1 worker-0 worker-1
                    //hash取摸    x%2= 0  1 [0,1,0,1]
                    workers[index.getAndIncrement()% workers.length].register(sc);
                    log.debug("boss invoked worker register");
                }
            }
        }
    }
}
package com.suns.reactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Worker implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(Worker.class);
    private Selector selector;
    private Thread thread;
    private String name;
    private volatile boolean isCreated;//false
    private ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<>();
    //构造方法
    //为什么不好?
    //Select Thread
    public Worker(String name) throws IOException {
        this.name = name;
       /* thread = new Thread(this, name);
        thread.start();
        selector = Selector.open();*/
    }
    //线程的任务
    public void register(SocketChannel sc) throws IOException, InterruptedException {
        log.debug("worker register invoke....");
        if (!isCreated) {
            thread = new Thread(this, name);
            thread.start();
            selector = Selector.open();
            isCreated = true;
        }
        runnables.add(() -> {
            try {
                sc.register(selector, SelectionKey.OP_READ);//reigster  select方法之前运行 。。
            } catch (ClosedChannelException e) {
                throw new RuntimeException(e);
            }
        });
        selector.wakeup();//select
    }
    @Override
    public void run() {
        while (true) {
            log.debug("worker run method invoke....");
            try {
                selector.select();
                Runnable poll = runnables.poll();
                if (poll != null) {
                    poll.run();
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey sckey = iterator.next();
                    iterator.remove();
                    if (sckey.isReadable()) {
                        SocketChannel sc = (SocketChannel) sckey.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(30);
                        sc.read(buffer);
                        buffer.flip();
                        String result = Charset.defaultCharset().decode(buffer).toString();
                        System.out.println("result = " + result);
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

第六章:零拷贝

一:零拷贝

1:什么是零拷贝?【概念,体会到含义即可】

JAVA进行IO操作的一个基本的模型。

我们通过Java进行IO 编码的时候非常的简单,但是当我们处理那么加单的IO Apid时候,底层是怎么进行流转的?

IO的操作本质都是对于硬件的操作,或者读写文件(硬盘)0,或者网络读写。(网卡处理)我们的jAVA来讲,我们进行一个文件操作,我们网络操作,硬件是由操作系统来进行直接处理的。操作帮我们直接操作的驱动。驱动帮我们去处理硬件。没有驱动,操作系统也无能为力。程序所需要的各种资源,Java就是个真个通信过程当中的一个使用者。

程序运行在内存中,我们的应用程序所占用的内存属于一个用户地址空间。包括我们浏览器,画图器,都是在用户地址空间,操作系统的内核地址,被称为内核地址空间–为应用程序提功能服务的地址空间+他自己运行需要的空间。

每一个用户都有自己的独立的用户地址空间,互相独立互相不受影响。用户地址空间和内核地址空间是孤立的,但是可以通信的。

这些都是进程级别的。Java创建线程一定不会独立开辟内存,线程享用的是进程内存。

虚拟机是一个进程。两种内存是独立的,但是两种内存可以进行相互访问。

为什么说IO是一个很贵的资源,因为自身的JAVA代码自己玩不了,都要借用操作系统资源,都需要关闭资源,不然,占着这个资源,别的进程或者线程就用不了了,尽量减少交互次数,交互次数越多,性能越差。

Java不能直接操作文件,必须借助于操作系统API(操作系统C语言的API)去读写文件。

操作系统中的C语言的API去调用驱动,去读取文件。这个驱动是对于硬件的驱动。我们的QQ的时候不需要这个毛驱动,这是操作系统和硬件之间的事。是这样的一个顺序。

高速页缓存当中,也叫做内核缓存(这个就比较粗糙了。)

读的时候,是两次的传递,也就是两次拷贝。第一次操作系统读取硬盘中数据读取到高速页缓存中,第二次是JAVA将高速页缓存中的数据读取到虚拟机当中的Buffer当中,这是两次拷贝。

我们应用程序写东西都不是直接写,都是写入到操作系统内核缓存中。这也是两次

这里的操作系统都是这一个尿性。

redis持久化的过程当中也是这么一个情况。

上边我们如何进行优化,

通过 NIO的缓冲区实现。读的时候,数据进入操作系统高速页缓存区,不在网应用缓存当中去拷贝,而是通过内存映射,将地址直接交给缓存当中进行保存了一下。两者共享,做了一个共享。想做这个内存映射的时候,一定要使用NIO的API,用右边的API即可。

所以,这块本质上还是,高速页缓存,到socket缓存的直接拷贝是吧?

只不过这里边有jvm的参与,所以这块浪费了点效率

使用直接内存,我们JVM虚拟机的GC管不到这块了,必须手动析构。

这样这个过程已经减少了一次。

这个时候已经比上边的图进步了一点点,这个时候的2.1的时候已经从,操作系统的Api直接从一个内存到另外的一个内存,完全绕过了。我们的JVM虚拟机。

我们Java当中基于transform to这样的方直接通知操作系统执行os内存之间的拷贝。已无JVM虚拟机的参与,所以,这性能比较高。

这里的零拷贝,指的是已经不在往虚拟机内存中写数据了。这个时候效率已经大幅度提高了

当内核升级到2.4之后的,效率再次大幅度提升,我们调用transformto的时候,高速页缓存已经直接能写网卡了,再次少了一次数据拷贝了。

socket缓存也是一种高速页缓存。这个也是0拷贝。领拷贝无虚拟机直接参与。我们尽量使用transform这样的方法,享受零拷贝。

当前我们的版本都已经高于2.4了。

相关文章
|
8月前
|
监控 Java Linux
由浅入深Netty基础知识NIO网络编程1
由浅入深Netty基础知识NIO网络编程
40 0
|
8月前
|
缓存 安全 Java
由浅入深Netty基础知识NIO三大组件原理实战 2
由浅入深Netty基础知识NIO三大组件原理实战
48 0
|
3月前
|
移动开发 编解码 网络协议
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
|
3月前
|
编解码 网络协议 Java
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
|
8月前
|
存储 Java Docker
由浅入深Netty基础知识NIO网络编程 2
由浅入深Netty基础知识NIO网络编程
46 0
|
4月前
|
设计模式 网络协议 Java
Java NIO 网络编程 | Netty前期知识(二)
Java NIO 网络编程 | Netty前期知识(二)
77 0
|
5月前
|
编解码 网络协议
Netty基础篇:NIO中缓冲区设置太小
Netty基础篇:NIO中缓冲区设置太小
|
7月前
|
缓存 网络协议 前端开发
从BIO到NIO在到Netty线程模型详解
从BIO到NIO在到Netty线程模型详解
137 1
|
7月前
|
Java 容器
【深入研究NIO与Netty线程模型的源码】
【深入研究NIO与Netty线程模型的源码】
|
2月前
|
存储 Java 数据处理