Netty网络框架(一)

简介: Netty网络框架

Netty网络框架

(一) 基础篇

1、I/O基础

输入流:InputStream和Reader
输出流:OutputStream和Writer

               字节流                字符流

计算机最小的二进制单位   bit 比特    代表0和1
字节  1 byte = 8bit  计算机处理的最小单位
字符  1 char = 2byte = 16bit   人处理的最小单位

所以,字节流处理文件、图片、视频等二进制数据,而字符流处理文本数据。

2、Socket

原意是“插座”,在计算机领域中,翻译为“套接字”。
本质上,是计算机之间进行通信的一种方式。

Linux,“一切皆文件”,给每个文件映射一个ID,叫做"文件描述符"。
当处理网络连接时,也会看成一个文件,read/write变成和远程计算机的交互。

OSI七层模型 = Open System Interconnection 开放式系统互联
从下到上分别为:物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。

实际应用的是优化后的TCP/IP模型(四层)
网络接口层/链路层、网络层、传输层、应用层

应用层协议:HTTP、FTP、SMTP(邮件协议)
传输层协议:TCP、UDP

Socket其实是应用层与传输层之间的抽象层,是一组接口。
在设计模式中,是门面模式。

3、NIO

BIO - BlockingIO  同步阻塞
NIO - New IO /  Non-Blocking IO  同步非阻塞
AIO - Asynchronous IO  异步非阻塞

同步和异步,关注的是消息通知的机制
阻塞和非阻塞,关注的是等待消息过程中的状态

多路复用的模型

三大元素:Channel 、Buffer、Selector

1)  Channel

FileChannel   文件管道的数据
Pipe.SinkChannel
Pipe.SourceChannel  线程间通信的管道
ServerSocketChannel
SocketChannel      用于TCP网络通信的管道
DatagramChannel   用于UDP网络通信的管道

2) Buffer

capacity  总体容量大小
limit   存储容量的大小,是可读写和不可读写的界线
position  已读容量的大小,已读和未读区域的界线

【使用原理】
a)  初始化,给定总容量,position=0, limit=capacity
b)  当使用put方法存入数据是,通过position来记录存储的容量变化,position不断后移,直到存储结束(写完成)
c)写完成需要调用flip方法刷新,limit=position,position=0
保障limit记录的是可读写区域的大小,position已读部分重置为空
d)  读数据直到读完成,需要调用clear方法,position=0, limit=capacity

3)  Selector

三个元素: Selector选择器、SelectableChannel可选择的通道、SelectionKey选择键

本质上,Selector是监听器,监听的是通道是否有我们关心的操作产生,操作对应的是事件(连接、接收、读/写),使用SelectionKey代表具体的事件,在确保通道是可选择的情况下,将通道注册进选择器中,此时Selector维护的是,通道和事件之间的关联关系。

Selector,管理被注册的通道集合,以及他们的状态
SelectableChannel,是一个抽象类,提供了通道可被选择需要实现的api。
FileChannel就不是可选择的,Socket相关的通道都是可选择的
一个通道可以被注册到多个选择器上吗?   可以的
多个通道可以注册到一个选择器上,但一个通道只能在一个选择器中注册一次

SelectionKey,封装了要监听的事件,连接、接收、读、写。
一方面,Selector关心通道要处理哪些事件
另一方面,当事件触发时,通道要处理哪些事件

【使用方式】

a、首先通过open方法,获取通道,将通道设置为非阻塞的
b、通过open方法,获取选择器,将通道注册进选择器中,伴随设置通道要处理的事件(OP_ACCEPT)
c、轮询选择器,当前是否有要处理的操作 select() > 0?
如果有,要获取,待处理操作的集合Set , 进行遍历
遍历到SelectionKey时,判断对应哪种操作,不同的操作设置不同的处理方式
如OP_ACCEPT,接收客户端通道并进行注册,监听后续处理的事件,如OP_WRITE
如OP_WRITE,通过key的方法获取通道本身,读取数据并继续监听事件,如OP_READ

4、零拷贝

需求:将磁盘中的文件读取出来,通过socket发送出去

传统的拷贝方式(4次)
Socket网络缓冲区,也属于操作系统的内核缓冲区。

在操作系统中进行的拷贝(如第二次和第三次),叫做CPU拷贝。
连接磁盘或网卡等硬件的拷贝(如第一次和第四次),叫做DMA拷贝。

零拷贝的概念:减少CPU拷贝的次数。

零拷贝是基于操作系统层面的优化方式(以下基于Linux系统)

1) mmap = memory mapping 内存映射

2)sendfile (linux2.1内核支持)

  1. sendfile with scatter/gather  copy(批量sendfile)
    从单个文件的处理,上升到多个物理地址的处理,提高处理速度

4)splice (拼接,在linux2.6内核支持)

   在操作系统内核缓冲区和Socket网络缓冲区之间建立管道,来减少拷贝次数。

线程模型

1) 单线程Reactor模型

顾名思义 就是使用一个线程来处理问题 线程中

  • selector
  • 事件处理 : 连接事件
  • 处理事件:handler

单线程服务器

public class ReactorServer {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    public ReactorServer() {
        try {
            // 初始化监听器 与 channel 通道
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // 配置为非阻塞的
            serverSocketChannel.configureBlocking(false);
            // 配置通道连接地址 开放 9090 端口
            SocketAddress address = new InetSocketAddress(9090);
            serverSocketChannel.socket().bind(address);
            //将channel 注册到 selector监听通道事件  达到多路复用
            //首个注册事件一般都是 accept 连接事件
            SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //    创建处理连接事件的 acceptor
            // 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生
            Acceptor acceptor = new Acceptor(selector, serverSocketChannel);
            //附加一个对象 用来处理事件
            key.attach(acceptor);
            while (true) {
                //返回事件的个数 处理事件
                int num = selector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = selector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //如有接收的时Accpet 事件 获取的就是Acceptor 事件
                    //如果接受的时读写事件 获取的就是 Handler 事件
                    Runnable runnable = (Runnable) key.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
    }
}

Accpetor 连接事件

public class Acceptor implements Runnable {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }
    @Override
    public void run() {
        try {
            //接受客户端传入的连接时 Socket Channel
            SocketChannel socketChannel = serverSocketChannel.accept();
            //设置异步
            socketChannel.configureBlocking(false);
            SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
            // 创造处理器 处理连接
            //单线程
            //Handler handler = new Handler(key);
            //多线程
            MultHandler handler = new MultHandler(key);
            handler.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Handler 单线程处理

public class Handler implements Runnable {
    private SelectionKey key;
    private State state;
    public Handler(SelectionKey key) {
        this.key = key;
        this.state = State.READ;
    }
    @Override
    public void run() {
        //处理 读写操作,判断读写
        switch (state) {
            case READ:
                read();
                break;
            case WRITE:
                write();
                break;
            default:
                break;
        }
    }
    /*轮流处理末尾添加事件达到循环处理*/
    //处理 读方法
    private void read() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //    通过通道获取KEY
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            //将传入的数据写入到buffer中
            int num = channel.read(buffer);
            //    转化成String
            String msg = new String(buffer.array());
            //    增加业务处理
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_WRITE);
            this.state = State.WRITE;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //处理 写方法
    private void write() {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        try {
            //    通过通道获取KEY
            SocketChannel channel = (SocketChannel) key.channel();
            channel.write(buffer);
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_READ);
            this.state = State.READ;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //记录状态 非读即写
    private enum State {
        //读写事件
        READ, WRITE
    }
}
2)多线程Reactor模型

提高handler的处理效率,首先handler不再负责具体的业务逻辑,当读取出数据后,分发给子线程处理,子线程处理完成后再将结果返回给handler,handler再将结果返回给客户端。

多线程处理 (handler使用线程池)

public class MultHandler implements Runnable {
    private SelectionKey key;
    private State state;
    private ExecutorService pool;
    public MultHandler(SelectionKey key) {
        this.key = key;
        this.state = State.READ;
    }
    @Override
    public void run() {
        //处理 读写操作,判断读写
        switch (state) {
            case READ:
                //将最耗时的操作 放入线程池执行
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        read();
                    }
                });
                break;
            case WRITE:
                write();
                break;
            default:
                break;
        }
    }
    /*轮流处理末尾添加事件达到循环处理*/
    //处理 读方法
    private void read() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //    通过通道获取KEY
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            //将传入的数据写入到buffer中
            int num = channel.read(buffer);
            //    转化成String
            String msg = new String(buffer.array());
            //    增加业务处理
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_WRITE);
            this.state = State.WRITE;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //处理 写方法
    private void write() {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        try {
            //    通过通道获取KEY
            SocketChannel channel = (SocketChannel) key.channel();
            channel.write(buffer);
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_READ);
            this.state = State.READ;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //记录状态 非读即写
    private enum State {
        //读写事件
        READ, WRITE
    }
}
3)主从Reactor模型

mainReactor用来接收连接事件,然后分发给acceptor,acceptor在处理过程中,直接将后续的读写事件,注册到slaveReactor之中,以此来达到分流。

主从监听器

//主从模型
public class MultReactorServer {
    private Selector mainselector;
    private Selector slaveselector;
    private ServerSocketChannel serverSocketChannel;
    public MultReactorServer() {
        try {
            // 主 reactor 处理连接事件
            mainselector = Selector.open();
            //从reactor 处理读写事件
            slaveselector = Selector.open();
            // 配置为非阻塞的
            serverSocketChannel.configureBlocking(false);
            // 配置通道连接地址 开放 9090 端口
            SocketAddress address = new InetSocketAddress(9090);
            serverSocketChannel.socket().bind(address);
            //将channel 注册到 selector监听通道事件  达到多路复用
            //首个注册事件一般都是 accept 连接事件 (参数变化)
            SelectionKey key = serverSocketChannel.register(mainselector, SelectionKey.OP_ACCEPT);
            //    创建处理连接事件的 acceptor
            // 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生 (参数变化)
            Acceptor acceptor = new Acceptor(slaveselector, serverSocketChannel);
            //附加一个对象 用来处理事件
            key.attach(acceptor);
            //主从监听逻辑分离
            new HandlerLoop(slaveselector).run();
            while (true) {
                //返回事件的个数 处理事件
                int num = mainselector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = mainselector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //只处理主Reactor 只处理连接事件
                    Runnable runnable = (Runnable) key.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

主从事件处理读写分离

//用于处理从reactor事件监听
public class HandlerLoop implements Runnable {
    private Selector selector;
    public HandlerLoop(Selector selector) {
        this.selector = selector;
    }
    @Override
    public void run() {
        try {
            while (true) {
                //返回事件的个数 处理事件
                int num = selector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = selector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //只处理从reactor 所以 接受的一定是读写事件
                    Runnable runnable = (Runnable) selectionKey.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Netty网络框架(二)https://developer.aliyun.com/article/1469520

目录
相关文章
|
3天前
|
安全 网络协议 网络安全
OWASP Top 10 网络安全10大漏洞——A01,源码+原理+手写框架
OWASP Top 10 网络安全10大漏洞——A01,源码+原理+手写框架
|
3天前
|
关系型数据库 MySQL 网络安全
Docker部署MySQL,2024网络安全通用流行框架大全
Docker部署MySQL,2024网络安全通用流行框架大全
|
5天前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
142 2
|
5天前
|
存储 网络协议 Linux
RTnet – 灵活的硬实时网络框架
本文介绍了开源项目 RTnet。RTnet 为以太网和其他传输媒体上的硬实时通信提供了一个可定制和可扩展的框架。 本文描述了 RTnet 的架构、核心组件和协议。
23 0
RTnet – 灵活的硬实时网络框架
|
5天前
|
网络协议 Java API
Python网络编程基础(Socket编程)Twisted框架简介
【4月更文挑战第12天】在网络编程的实践中,除了使用基本的Socket API之外,还有许多高级的网络编程库可以帮助我们更高效地构建复杂和健壮的网络应用。这些库通常提供了异步IO、事件驱动、协议实现等高级功能,使得开发者能够专注于业务逻辑的实现,而不用过多关注底层的网络细节。
|
5天前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
61 0
|
5天前
|
XML 网络协议 前端开发
Netty网络框架(三)
Netty网络框架
29 1
|
5天前
|
存储 编解码 网络协议
Netty网络框架(二)
Netty网络框架
38 0
|
8月前
|
监控 Java Linux
由浅入深Netty基础知识NIO网络编程1
由浅入深Netty基础知识NIO网络编程
43 0
|
8月前
|
缓存 安全 Java
由浅入深Netty基础知识NIO三大组件原理实战 2
由浅入深Netty基础知识NIO三大组件原理实战
48 0

热门文章

最新文章