一文搞定Netty,打造单机百万连接测试!1

简介: 一文搞定Netty,打造单机百万连接测试!

1.Netty框架简介

1.1.Netty简介

netty是jboss提供的一个java开源框架,netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可用性的网络服务器和客户端程序。也就是说netty是一个基于nio的编程框架,使用netty可以快速的开发出一个网络应用。

由于java 自带的nio api使用起来非常复杂,并且还可能出现 Epoll Bug,这使得我们使用原生的nio来进行网络编程存在很大的难度且非常耗时。但是netty良好的设计可以使开发人员快速高效的进行网络应用开发。

1.2.Netty主要特性

  • 统一的API接口,支持多种传输类型,例如OIO,NIO
  • 简单而强大的线程模型
  • 丰富的文档
  • 卓越的性能
  • 拥有比原生Java API 更高的性能与更低的延迟
  • 基于池化和复用技术,使资源消耗更低
  • 安全性
  • 完整的SSL/TLS以及StartTLS支持
  • 可用于受限环境,如Applet以及OSGI

1.3.Netty和Tomcat的区别

Netty和Tomcat最大的区别就在于通信协议,Tomcat是基于Http协议的,他的实质是一个基于http协议的web容器,但是Netty不一样,他能通过编程自定义各种协议,因为netty能够通过codec自己来编码/解码字节流,完成类似redis访问的功能,这就是netty和tomcat最大的不同。

有人说netty的性能就一定比tomcat性能高,其实不然,tomcat从6.x开始就支持了nio模式,并且后续还有APR模式——一种通过jni调用apache网络库的模式,相比于旧的bio模式,并发性能得到了很大提高,特别是APR模式,而netty是否比tomcat性能更高,则要取决于netty程序作者的技术实力了。

1.4.BIO编写Client-Server通信

1、BIOServer服务端

public class BioServer {
    private static final int PORT = 8080;
    public static void main(String[] args) throws IOException {
        //新建socketServer
        ServerSocket serverSocket = null;
        try{
            //绑定对应端口
            serverSocket = new ServerSocket(PORT);
            System.out.println("the time server is start in port :"+PORT);
            Socket socket = null;
            while(true){
                //拿到请求进来的socket
                socket = serverSocket.accept();
                //线程请求
                new Thread(new TimeServerHandler(socket)).start();
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if (serverSocket != null){
                System.out.println("the time server close");
                serverSocket.close();
            }
        }
    }
}

2、TimeServerHandler统一时间服务

public class TimeServerHandler implements Runnable{
    private Socket socket;
    public TimeServerHandler(Socket socket) {
        this.socket = socket;
    }
    public TimeServerHandler() {
    }
    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            //为true时autoFlush自动刷新,无需在调用flush方法
            out = new PrintWriter(this.socket.getOutputStream(),true);
            String body  = null;
      //循环监听客户端发送的msg
            while ((body = in.readLine())!=null && body.length()!=0){
                System.out.println("this time server receive msg :"+body);
                out.println(new Date().toString());
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(in != null){
                try {
                    in.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            if(out != null){
                try {
                    out.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            if(this.socket != null){
                try {
                    this.socket.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

3、BIOcClient客户端

public class BioClient {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 8080;
    public static void main(String[] args) {
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try{
            //创建连接
            socket = new Socket(HOST,PORT);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(),true);
            out.println("I am client");
            String resp = in.readLine();
            System.out.println("当前服务器时间是:"+resp);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(in != null){
                try {
                    in.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            if(out != null){
                try {
                    out.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            if(socket != null){
                try {
                    socket.close();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

7e680de65411457d809651b454c9ce38.jpga5bed9c95b1a4c75af6094fecdb6b2bf.jpg

BIO的优点就是模型简单,编码简单,缺点是性能瓶颈,请求数和线程数保持一致,当有N个请求发送过来,服务端需要开启N个线程去处理,高并发场景下CPU线程切换上下文损耗大。

2.常见的网络IO模型

2.1.用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件的所有权限。

为例保证用户进程不能直接操作内核(kernel),保证内核的安全,操作系统将虚拟空间划分成两部分,一部分为内核空间,一部分为用户空间。只能对Linux操作系统而言,将较高的1G字节供内核使用,称为内核空间,而将较低的3G字节供各个进程使用称为用户空间。

2.2.文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表达指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,他是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

2.3.TCP发送数据的流程



38a84ef56c6a422c87d2e005e1d945da.jpg

  • 第一步:应用A把消息发送到TCP发送缓冲区。
  • 第二步:TCP发送缓冲区把消息发送出去,经过网络传输,消息发送到B服务器的TCP接收缓冲区。
  • 第三步:B再从TCP接收缓冲区中去读取属于自己的数据。

2.4.阻塞/非阻塞,同步/异步

**(1)同步阻塞:**发送方发送请求之后一直等待响应。接收方处理请求时进行的IO操作如果不能马上等到返回结果,就会一致等到返回

结果后,才响应发送方,期间不能进行其他工作。



1d2da18266ee49e68d85ec53ebf43205.jpg

**(2)同步非阻塞:**发送方发送请求后,一致等待响应。接受方处理请求时进行的IO操作如果不能马上得到结果,就立即返回,去做其他事情但是由于没有得到请求处理结果,不响应发送方,发送方一致等待。当IO操作完成后,将完成状态和结果通知接收方,接收方在响应发送方,发送方才进入下一次请求过程。


6e782c4f3c054a77a7de19de8be7683d.jpg

**(3)异步阻塞:**发送方向接收方请求后,不等待响应,可以继续其他工作。接收方处理请求时进行IO操作如果不能马上得到记过,就会一直等到返回结果后,才响应发送方,期间不能进行其他操作。

**(4)异步非阻塞:**发送方向接收方请求后,不等待响应,可以继续其他工作。接收方处理请求时进行IO操作如果不能马上得到结果,也不等待,而是马上返回去做其他的事情。当IO操作完成后,将完成的状态和结果通知接收方,接收方再响应发送方。


0f7ee951d5e941ac94f556edb25094c1.jpg

2.5.Linux中五种I/O模型

  • IO的操作也就是应用程序从TCP缓冲区中读取数据的时候。
  • 网络I/O的本质是socket的读取,socket在linux中被抽象为流,I/O可以理解为对流的操作。对于一次I/O访问,数据会先被拷贝到操作系统的内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说当一个read操作发生时,他会经历两个阶段:
第一阶段:等待数据准备(Waiting for the data to be ready)
第二阶段:将数据从内核拷贝到进程中(Copy the data from the kernel to the process)

对于socket流而言:

第一步:通常涉及等待网络上的数据分组到达,然后被复制到内核的某个缓冲区
第二步:把数据从内核缓冲区复制到进程缓冲区

1、阻塞IO

(1)什么是阻塞I/O

阻塞IO就是当应用程序向TCP缓冲区发起读取数据申请时,在内核数据没有准备好之前,应用程序会一致处于等待数据的状态,直到内核把数据准备好交给应用程序才结束。

**术语描述:**在应用程序调用recvfrom读取数据时,其系统调用直到数据包到达别并且被复制到应用缓冲区中或者发生错误时才返回,此期间一致处于等待,进程从调用直到返回这段时间被阻塞的成为阻塞IO。

(2)阻塞I/O流程


b911c12ea9e9448c8296d0f9a0013e78.jpg

  • 第一步:应用程序向内核发起recvfrom读取数据
  • 第二步:准备数据报(应用进程阻塞)
  • 第三步:将数据从内核复制到应用空间
  • 第四步:复制完成后,返回成功提示
  • 2、非阻塞I/O

(1)什么是非阻塞I/O

非阻塞I/O就是当应用程序发起读取数据时,如果内核没有准备好数据报,会返回给应用程序,不会让应用程序一致等待,但是应用程序要时不时去尝试调用,当数据包准备好时,将数据从内核复制到用户空间,这个过程也是同步的,阻塞的。

**术语描述:**非阻塞I/O是在应用调用recvfrom读取数据时,如果缓冲区中没有数据的话,就会直接返回一个EWOULDBLOCK错误,不会让应用一致等待。在没有数据时会即刻返回错误标识,那也意味着如果应用要读取数据就需要不断的调用recvfrom请求,直到读取到它要的数据为止。

(2)非阻塞I/O流程

14d145bd890e44d6a02692f6b45d5f76.jpg

  • 第一步:应用进程向内核发起recvfrom读取数据
  • 第二步:没有数据报准备好,即刻返回EWOULDBLOCK错误码
  • 第三步:应用程序向内核再次发起recvfrom读取数据
  • 第四步:已有数据报就从内核拷贝到用户空间,否则还是返回错误码

3、I/O多路复用

(1)什么时I/O多路复用

I/O多路复用的思路就是系统提供了一种函数可以同时监控多个网络请求的操作,这个函数就是我们常说的select、poll、epoll函数,有了这个函数后,应用线程通过调用select函数就可以同时监控多个网络请求,select函数监控的网络请求中只要有任何一个数据状态准备就绪了,select函数就会返回可读状态,这时询问线程再去通知处理数据的线程,对应线程此时再发起recvfrom请求去读取数据。

**术语描述:**进程通过将一个或者多个网络请求传递给select,阻塞在select操作之上,select帮我们侦测多个网络请求是否准备就绪,当有网络请求准备就绪时,select返回数据可读状态,应用程序在调用recvfrom读取数据。

(2)I/O多路复用流程

1825d163e8154641be2cafd9ac60b0f3.jpg

  • 第一步:进程发起网络请求到select函数调用进行阻塞
  • 第二步:select函数调用内核获取数据报
  • 第三步:select函数监控的网络请求中只要有任何一个数据状态准备就绪了,select函数就会返回可读状态
  • 第四步:询问线程再去通知处理数据的线程,对应线程在次发起recvfrom请求去读取数据

4、信号驱动I/O

(1)什么是信号驱动I/O

信号驱动I/O不是循环请求询问的方式去监控数据就绪状态,而是调用sigaction时候建立一个SIGIO的信号联系,当内核数据准备好之后在通过SISGIO信号通知线程数据准备好后的可读状态,当线程收到可读状态的信号后,此时在向内核发起recvfrom读取数据的请求,因为信号驱动I/O的模型下应用线程在发出信号监控后即可返回,不会阻塞,所以这样的方式下,一个应用线程也可以控制多个网络请求。

**术语描述:**首先开启套接字信号驱动I/O功能,并通过系统调用sigaction执行一个信号处理函数,此时请求即可返回,当数据准备就绪时,就生成对应进程的SIGIO信号,通过信号回调通知应用线程调用recvfrom来读取数据。

(2)信号驱动I/O流程


93f5ebfdc1684158899c186d24a89e66.jpg

  • 第一步:进程建立SIGIO的信号处理程序调用sigaction,然后返回
  • 第二步:内核准备好数据递交SIGIO给信号处理程序
  • 第三步:应用程序收到信号后,调用数据拷贝,复制完成返回数据报

5、异步I/O

(1)什么是异步I/O

异步I/O应用只需要向内核发送一个read请求,告诉内核他要读取数据后即刻返回,内核收到请求后会建立一个信号联系,但数据准备就绪,内核会主动把数据从内核复制到用户空间,等所有操作都完成之后,内核会发起一个通知告诉应用,处理数据报完成。

**术语描述:**应用告知内核启动某个操作,并让内核在整个操作完成之后,通知应用,这种模型与信号驱动的主要区别在于信号驱动I/O是由内核通知我们何时开始下一个I/O,而异步I/O模型是由内核通知我们操作什么时候完成。

(2)异步I/O流程


96056a1515a94d769bce0331e7da2369.jpg

  • 第一步:异步I/O应用只需要向内核发送一个read请求,告诉内核他要读取数据后即刻返回
  • 第二步:内核收到请求后会建立一个信号联系,但数据准备就绪
  • 第三步:内核会主动把数据从内核复制到用户空间,等所有操作都完成之后,内核会发起一个通知告诉应用,处理数据报完成

2.6.I/O多路复用之select,poll,epoll

1、select、poll、epoll简介

目前支持I/O多路复用的系统调用有 select,pselect,poll,epoll,I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作但select,pselect,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

epoll跟select都能提供多路I/O复用的解决方案。在现在的Linux内核里有都能够支持,其中epoll是Linux所特有,而select则应该是POSIX所规定,一般操作系统均有实现。

2、select函数

(1)基本原理

int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout);

select函数监视的文件描述符分为3类,分别是writefds、readfds和exceptfds。调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以通过遍历fd_set,来找到就绪的文件描述符。

  • maxfdp1:待测试的文件描述符的个数,它的值是待测试的最大值加1
  • readfds:select监视的可读文件句柄集合
  • writefds:select监视的可写文件句柄集合
  • exceptfds:select监视的异常文件句柄集合
  • timeout:本次select()的超时结束时间。

(2)fd_set

select()提供了一种fd_set的核心数据结构,实际上是一个long类型的数组,每一个数组元素都能与一个打开的文件句柄(不管是Socket句柄,还是其他文件/命令管道/设备句柄)建立联系。当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪一个socket或文件可读。

(3)select函数的优缺点

select目前几乎所有平台上支持,其良好的跨平台支持也是它的一个优点。select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。

select本质上是通过设置或者检查存放fd标志位的数据结构来进行下一步处理。这样所带来的缺点:

  • select最大的缺陷就是单个进程所打开的fd是有一定限制的,它由FD_SETSIZE设置,默认值是1024。
一般来说这个数目和系统的内存关系很大,具体数目可以cat /proc/sys/fs/file-max 查看,32位默认是1024,64位默认是2048
  • 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低。
当套接字比较多的时候,每次select()都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都要遍历一遍,这会浪费很多CPU时间。
  • 需要维护一个用来存放大量fd的数据结构,这样会使用户空间和内核空间在传递该结构时复制开销大。

(4)select()总结

从执行过程来看,使用基于select的IO多路复用和同步阻塞IO没有太大的区别,而且多添加了监视socket以及调用select函数额外的操作,按理说效率更低。但是,select()可以让用户在一个线程内同时处理多个socket的IO请求,用户可以注册多个感兴趣的socket,然后不断的调用select轮询被激活的socket,即可达到单线程处理多个IO请求的目的。而在同步阻塞的的模型中,必须通过多线程的方式才能达到目的。

3、poll函数

(1)基本原理

int poll(struct pollfd *fds,nfds_t nfds,int timeout);
typedef struct pollfd{
  int fd;     //需要被检测或选择的文件描述符
  short events;   //对文件描述符fd上感兴趣的事件
  short revents;  //文件描述符fd上当前实际发生的事件
} pollfd_t;

poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态。如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后并没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后他又要再次遍历fd,这个过程经历了多次无谓的遍历。

poll函数参数说明:

  • pollfd *fds:pollfd类型的数组,指向一个结构体数组的第0个元素,用于存放需要检测状态的socket描述符,并且调用poll函数之后fds数组不会被清空。
  • nfds_t nfds:数组fds中描述符的总数量。
  • timeout:超时连接。
  • pollfd:表示一个被监视的文件描述符,通常传递fds执行poll()监视多个文件描述符。
  • events:指定监视fd的事件(输入,输出,错误),是监视该文件描述符的事件掩码,由用户来设置。
  • revents:文件描述符的操作结果事件掩码,内核在调用返回时设置。

它没有最大连接数限制,原因是它是基于链表来存储的,但是同样有一个缺点:

1、大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义的。
2、poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。

 从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降,同样包含大量文件描述符的数组依然会被整体从用户态复制到内核空间,而且内核也要遍历数组,对效率改善不大。


5ab8ab6813db4f0d823504e73f7f64b2.jpg4、epoll函数

epoll是在2.6内核中提出的。是之前的select和poll的增强版本。相比于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

(1)基本原理

epoll()是基于事件驱动的I/O方式,是Linux内核位处理大批量文件描述符而作了改进的poll,其实现机制与select/poll机制完全不同。epoll()没有描述符个数限制,它使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,,这样在用户空间和内核空间的拷贝操作只需要一次。

epoll()通过在内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个操作部分,在linux中,这三个部分对应的函数如下所示:

int epoll_create(int size);
int epoll_ctl(int epfd,int op,int fd,struct epoll_event *event);
int epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout);
  • epoll_create:负责建立一个epoll对象,在epoll文件系统中为了句柄对象分配资源。参数size表明内核要监听的描述符数量。
  • epoll_ctl:负责向内核的epoll对象中添加要监听的事件类型(文件描述符),已添加的描述符被维护在一颗红黑树上。
  • epoll_wait:负责收集已就绪事件的连接。

eventpoll

struct eventpoll{
    ...
    //红黑树的根节点,树中存储这所有添加到epoll中需要被监听的事件
    struct rb_root rbr;
    //双链表,存放这通过epoll_wait()返回的就绪事件
    struct list_head rdlist;
}
每个epoll对象都有一个独立的eventpoll,用于存放通过epoll_ctl()添加进来的事件。这些事件维护在红黑树中,红黑树的插入时间效率是log(n)(n为树的高度)
此外,被监听的事件都会与设备驱动程序建立回调关系,每当被监听的事件就绪,系统注册的回调函数就会被调用,将就绪事件放到rdList中,时间复杂度为O(1).
当调用epoll_wait()时,无需遍历整个被监听的描述符集,只需要遍历eventpoll对象中的rdlist双链表中是否有epitem元素即可。然后就把就绪事件复制到用户态,同时将事件数量返回给用户。
struct epitem{
    struct rb_node  rbn;         // 红黑树节点
    struct list_head    rdllink; // 双向链表节点
    struct epoll_filefd  ffd;    // 事件句柄信息
    struct eventpoll *ep;        // 指向所属的eventpoll对象
    struct epoll_event event;    // 期待发生的事件类型
}

epoll除了提供水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这使得用户空间程序有可能缓存IO状态,减少epoll_wait的调用,提高应用程序效率。

  • **水平触发(LT):**默认工作模式,当epoll_wait检测到某描述符事件就绪并通知应用进程时,应用进程可以不立即处理该事件,下次调用epoll_wait时,会再次通知进程。
  • 边缘触发(ET): 当epoll_wait检测到某描述符事件就绪并通知应用进程时,应用进程必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次通知此事件。这减少了同一事件的触发次数,使效率更高。



0eeb8dfd93334b9b8fbcec7e7617d98d.jpg


select
poll epoll
操作方式 遍历 遍历 回调
底层实现 数组 链表 红黑树
IO效率 每次调用都进行线性遍历,时间复杂度为O(n) 每次调用都进行线性遍历,时间复杂度为O(n) 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到readyList里面,时间复杂度O(1)
最大连接数 1024(x86)或2048(x64) 无上限 无上限
fd拷贝 每次调用select,都需要把fd集合从用户态拷贝到内核态 每次调用poll,都需要把fd集合从用户态拷贝到内核态 调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝

2.7.Java的I/O演进历史

1、jdk1.4之前是采用同步阻塞模型,也就是BIO
    大型服务一般采用C或者C++, 因为可以直接操作系统提供的异步IO,AIO
2、jdk1.4推出NIO,支持非阻塞IO,jdk1.7升级,推出NIO2.0,提供AIO的功能,支持文件和网络套接字的异步IO

2.8.Reactor三种线程模型

设计模式——Reactor模式(反应器设计模式),是一种基于事件驱动的设计模式,在事件驱动的应用中,将一个或多个客户的服务请求分离(demultiplex)和调度(dispatch)给应用程序。在事件驱动的应用中,同步地、有序地处理同时接收的多个服务请求
一般出现在高并发系统中,比如Netty,Redis等

1、单线程模型

单个线程以非阻塞IO或事件IO处理所有IO事件,包括连接、读、写、异常、关闭等等。单线程Reactor模型基于同步事件分离器来分发事件,这个同步事件分离器,可以看作是一个单线程的while循环。下图描述了单线程模型的处理过程:



4ca01579b246402aad603a4e8357d7dc.jpg

注意上面的Selector之所以会有OP_ACEEPT事件,是因为在单线程模型中,Selector轮询的时监听套接字与已连接客户端套接字的所有IO事件。

单线程处理所有IO事件的弊端很明显。没能利用计算机CPU多核的特性,一个线程某个时刻只能处理单个IO事件,此时如果有其他描述符IO事件就绪,这些IO事件将暂时得不到处理。

c++框架libevent中,基于event_base_loop做消息轮询,使用event_base_dispatch来分发IO消息,本质上是对上述模型的封装。如果不适用evthread_use_pthreads,则其默认的就是单线程模型处理请求。

2、多线程模型

一个线程/进程接收连接、一组线程/进程处理IO读写事件。也就是将accept的线程与处理读、写等IO事件的线程分离,并且使用m多个线程以非阻塞IO或者事件IO来处理n个套接字的IO事件,这里的n一般远大于m,线程数m一般取CPU逻辑核心数的1-3倍,而套接字数n则取决于请求数和进程可以打开的最大描述符个数。下图是多线程模型:


17158346e4664b09a91e20425c328401.jpg


可以看到,这里把客户端的已连接套接字,转交给某个IO线程之后,由此线程轮询处理其他之后的所有IO事件,这实际参考了netty4的线程模型设计。实际reactor的多线程模型,并不需要将已连接套接字绑定在某个线程上,也可以统一放在连接池中,由多个IOWork线程从池中取连接进行轮询并处理,但这样会复杂很多,而且容易出问题,比如说不同线程从同一个channel收到了write事件,这就类似惊群问题了;并且多线程并发操作同一个channel,后续很可能需要你将IO事件进行同步,与其如此,不如直接将channel绑定到一个线程,让channel上触发与处理IO事件逻辑上同步。netty3中channel(已连接套接字)入站事件由固定线程处理,出站事件由触发的线程处理,netty4中修改了设计,将channel绑定到固定的eventloop(线程)。


另外一点,每个已连接套接字的IO事件由固定线程处理,不代表事件也一定由此线程触发,恰恰相反,实际业务中,读(入站)事件来自于客户端写数据触发,而写(出站)事件往往由别的线程触发,例如在发起一个异步mysql操作完成之后,在异步回调线程中写结果数据来触发套接字的出站。


3、主从多线程模型


一组线程/进程接收连接、一组线程/进程处理IO读写事件。它与多线程模型的主要区别在于使用一组线程或进程在一个共享的监听套接字上accept连接。这么做的原因是为了应付**单个线程/进程不足以快速处理内核中监听套接字的已连套接字队列(并发量极大)**的情况。如下:


c6dc2240483e4e17b3a46fa2ab9a1271.jpg

4、Netty支持的线程模型

  • Netty支持单线程、多线程模型、主从多线程模型。

8e0f8ba7b2b6414cad43851298203528.jpg

  • 初始化NioEventLoopGroup , 将为ServerSocketChannel 提供一个 bossGroup 线程池,为 SockerChannel 的I/O 事件处理 提供一个workGroup

  • 使用ServerBootstrap 绑定端口等相关信息,此时会初始化一个ServerSocketChannel 和 bossGroup,并且将 ServerSocketChannel 绑定到 bossGroup 中的一个NioEventLoop 中进行监听客户端的连接请求
  • 当 Client 发起连接请求时,首先经过三次握手通过后,然后服务端被触发,接着收到连接成功的通知(因为是异步所以是触发)
  • ServerSocketChannel 收到连接成功的通知后,将建立好的连接交给 workGroup中的某个NioEventLoop,然后将感兴趣的事件注册到 该 NioEventLoop 持有的Selector上,等待Client 下一次请求
  • 当 Client 发起 READ/WRITE 相关的请求时,则提交给NioEventLoop 进行处理

3.Netty搭建Echo服务

3.1.什么是Echo服务

  • 就是一个应答服务(回显服务器),客户端发送什么数据,服务端就响应对应的数据,常用于检测和调试服务。

搭建Netty项目

  • 创建maven项目,加入netty的依赖包
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.32.Final</version>
        </dependency>

3.2.Echo服务搭建

1、Echo服务端搭建-EchoServer

  • 绑定端口
  • 创建主从线程组
  • 启动线程组,指定通道类型,处理传过来的数据内容
  • 监听端口,关闭端口
  • 释放线程
public class EchoServer{
    //设定端口号
    private int port;
    //构造方法传入端口
    public EchoServer(int port){
        this.port = port;
    }
    public void run() throws InterruptedException {
        //创建线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try{
          //创建启动引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //设置线程组
            serverBootstrap.group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>(){
                    protected void initChannel(SocketChannel socketChannel) throws Exception{
                        //设置处理器,可以设置多个
                        socketChannel.pipeline().addLast(new EchoServerHandler());
                    }
                });
            System.out.println("Echo服务启动中...");
            //绑定端口,同步等待
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        }finally{
            //优雅退出
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefullt();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        //设置默认的端口
        int port = 8080;
        if(args.length>0){
            port = Integer.parseInt(args[0]);
        }    
        //调用启动方法
        new EchoServer(port).run();
    }
}

2、Echo服务端的处理器-EchoServerHandler

public EchoServerHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
        ByteBuf data = (ByteBuf) msg;
        System.out.println("服务端收到数据:"+data.toString(CharsetUtil.UTF_8));
        //注意:数据一定要回写出去,不然客户端收不到
        ctx.writeAndFlush(data);
    } 
   @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("EchoServerHandler EchoServerHandler()");
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3、Echo客户端搭建-EchoClient

public class EchoClient{
    //客户端发起请求的ip地址
    private String host;
    //端口
    private int port;
    //构造方法传入ip+端口初始化
    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public void start(){
        //创建线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            //创建启动引导类
            BootStrap bootStrap = new BootStrap();
            bootStrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(host,port))
                .handler(new ChannelInitializer<SocketChannel>{
               @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //设置客户端处理器
                            socketChannel.pipeline().addLast(new EchoClientHandler());
                        }   
            });
            //异步连接,同步阻塞,connect是异步连接
            ChannelFuture channelFuture = bootStrap.connect().sync();
            //阻塞住直到客户端通道关闭
            channelFuture.channel().closeFuture().sync();
        }finally{
            //优雅退出
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args){
        //启动客户端连接
        new EchoClient("127.0.0.1",8080).start();
    } 
}

4、EchoClient处理器-EchoClientHandler

  • 与服务端处理类继承的不一样,但是实质是一样的
  • 打印顺序是,先走Active()方法,先激活,标识服务端建立了通道
  • 读取服务端的数据
  • 读取完成,进入channelReadComplete()方法
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
        System.out.println("Client received:"+byteBuf.toString(CharsetUtil.UTF_8));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("EchoClientHandler.channelActive()");
        //创建一个缓存,使用Unpooled工具类
        ctx.writeAndFlush(Unpooled.copiedBuffer("李祥",CharsetUtil.UTF_8));
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("EchoClientHandler.channelReadComplete()");
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

5、Echo服务中名词解析

(1)EventLoop和EventLoopGroup

线程和线程组,前者可以理解为线程,后者可以理解为线程组。

(2)BootStrap

启动引导类,用于配置线程组,启动的时候,同时启动线程组,以及开启通道,初始化通道。和一些处理。

(3)channel

channel是客户端和服务端建立的一个连接,是一个socket连接,具有生命周期,建立成功,读取数据,读取完成,出现异常等等。

(4)channelHandler和channelPipeline

channelHandler是做处理的,对接收的数据进行处理到要直接或间接继承channelHandler。channelPipeline就好比一个处理工厂,可以添加很多handler处理类,当进入channelHandler后都要经过pipeline添加的handler进行处理。

6、测试


4e3caa665bbc401abe0da74a0e98992b.jpg


f6927664285944afa64aa89908d5790a.jpg


相关文章
|
6月前
|
网络协议 安全 Linux
12. 测试搭建百万并发项目
12. 测试搭建百万并发项目
74 0
|
13天前
|
DataWorks NoSQL 关系型数据库
DataWorks操作报错合集之在使用 DataWorks 进行 MongoDB 同步时遇到了连通性测试失败,实例配置和 MongoDB 白名单配置均正确,且同 VPC 下 MySQL 可以成功连接并同步,但 MongoDB 却无法完成同样的操作如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
30 1
|
2月前
|
分布式计算 DataWorks 调度
DataWorks报错问题之DataWorks测试连接数据源报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
4月前
|
前端开发 Java Maven
【Netty 网络通信】启动客户端连接服务端实现通信
【1月更文挑战第9天】【Netty 网络通信】启动客户端连接服务端实现通信
|
11月前
|
网络协议
netty编程实战02-创建一个带有连接重试的tcp客户端程序
netty编程实战02-创建一个带有连接重试的tcp客户端程序
169 0
|
6月前
|
Java 专有云 数据库连接
专有云配置vertica数据源,测试连通性已连通,但是在配置数据集成时,连接报错,报vertica[vjdbc]100176错误,连接vertica数据库错误
专有云配置vertica数据源,测试连通性已连通,但是在配置数据集成时,连接报错,报vertica[vjdbc]100176错误,连接vertica数据库错误
55 1
H8
|
9月前
|
自然语言处理 物联网 Unix
全网最佳IoT命令行超级工具箱|帮你轻松解决百万物联网设备测试和联调
作为一个物联网开发和学习人员,IoT设备协议的测试联调是工作中很重要的一环!我有很多时刻都想拥有一个能集成常见物联网协议的客户端工具可供使用。经过我一通查找,发现和我拥有相同问题的人不在少数。 不仅仅是IoT开发者,包括云厂商、网络运营商都有相同烦恼: 开源物联网平台Thingsboard: coap -> coap.js(需要安装node); 移动OneNET平台: mqtt -> mqtt.fx(几年没更新了); 电信AEP平台:自定义TCP协议 -> sokit工具(只支持windows); 阿里云物联网平台: Nb-IoT协议 -> 需要到电信或移动平台上进行测试; 作者:穆书伟
H8
298 0
|
9月前
|
监控 安全
zabbix测试发邮件报错–连接到Zabbix主机 “localhost“ 失败
zabbix测试发邮件报错–连接到Zabbix主机 “localhost“ 失败
157 0
|
9月前
|
存储 Linux Shell
虚拟机安装(安装(克隆)虚拟机 配置网络 安装Centos7 配置(修改)虚拟机的静态IP 修改网卡的配置文件 测试网络是否互通外部工具 连接linux系统 设置服务器时间 修改主机名)(下)
虚拟机安装(安装(克隆)虚拟机 配置网络 安装Centos7 配置(修改)虚拟机的静态IP 修改网卡的配置文件 测试网络是否互通外部工具 连接linux系统 设置服务器时间 修改主机名)
375 0
|
9月前
|
分布式计算 Hadoop Linux
虚拟机安装(安装(克隆)虚拟机 配置网络 安装Centos7 配置(修改)虚拟机的静态IP 修改网卡的配置文件 测试网络是否互通外部工具 连接linux系统 设置服务器时间 修改主机名)(上)
虚拟机安装(安装(克隆)虚拟机 配置网络 安装Centos7 配置(修改)虚拟机的静态IP 修改网卡的配置文件 测试网络是否互通外部工具 连接linux系统 设置服务器时间 修改主机名)
289 0