Netty网络框架(一)

简介: Netty网络框架

Netty网络框架

(一) 基础篇

1、I/O基础

输入流:InputStream和Reader
输出流:OutputStream和Writer

               字节流                字符流

计算机最小的二进制单位   bit 比特    代表0和1
字节  1 byte = 8bit  计算机处理的最小单位
字符  1 char = 2byte = 16bit   人处理的最小单位

所以,字节流处理文件、图片、视频等二进制数据,而字符流处理文本数据。

2、Socket

原意是“插座”,在计算机领域中,翻译为“套接字”。
本质上,是计算机之间进行通信的一种方式。

Linux,“一切皆文件”,给每个文件映射一个ID,叫做"文件描述符"。
当处理网络连接时,也会看成一个文件,read/write变成和远程计算机的交互。

OSI七层模型 = Open System Interconnection 开放式系统互联
从下到上分别为:物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。

实际应用的是优化后的TCP/IP模型(四层)
网络接口层/链路层、网络层、传输层、应用层

应用层协议:HTTP、FTP、SMTP(邮件协议)
传输层协议:TCP、UDP

Socket其实是应用层与传输层之间的抽象层,是一组接口。
在设计模式中,是门面模式。

3、NIO

BIO - BlockingIO  同步阻塞
NIO - New IO /  Non-Blocking IO  同步非阻塞
AIO - Asynchronous IO  异步非阻塞

同步和异步,关注的是消息通知的机制
阻塞和非阻塞,关注的是等待消息过程中的状态

多路复用的模型

三大元素:Channel 、Buffer、Selector

1)  Channel

FileChannel   文件管道的数据
Pipe.SinkChannel
Pipe.SourceChannel  线程间通信的管道
ServerSocketChannel
SocketChannel      用于TCP网络通信的管道
DatagramChannel   用于UDP网络通信的管道

2) Buffer

capacity  总体容量大小
limit   存储容量的大小,是可读写和不可读写的界线
position  已读容量的大小,已读和未读区域的界线

【使用原理】
a)  初始化,给定总容量,position=0, limit=capacity
b)  当使用put方法存入数据是,通过position来记录存储的容量变化,position不断后移,直到存储结束(写完成)
c)写完成需要调用flip方法刷新,limit=position,position=0
保障limit记录的是可读写区域的大小,position已读部分重置为空
d)  读数据直到读完成,需要调用clear方法,position=0, limit=capacity

3)  Selector

三个元素: Selector选择器、SelectableChannel可选择的通道、SelectionKey选择键

本质上,Selector是监听器,监听的是通道是否有我们关心的操作产生,操作对应的是事件(连接、接收、读/写),使用SelectionKey代表具体的事件,在确保通道是可选择的情况下,将通道注册进选择器中,此时Selector维护的是,通道和事件之间的关联关系。

Selector,管理被注册的通道集合,以及他们的状态
SelectableChannel,是一个抽象类,提供了通道可被选择需要实现的api。
FileChannel就不是可选择的,Socket相关的通道都是可选择的
一个通道可以被注册到多个选择器上吗?   可以的
多个通道可以注册到一个选择器上,但一个通道只能在一个选择器中注册一次

SelectionKey,封装了要监听的事件,连接、接收、读、写。
一方面,Selector关心通道要处理哪些事件
另一方面,当事件触发时,通道要处理哪些事件

【使用方式】

a、首先通过open方法,获取通道,将通道设置为非阻塞的
b、通过open方法,获取选择器,将通道注册进选择器中,伴随设置通道要处理的事件(OP_ACCEPT)
c、轮询选择器,当前是否有要处理的操作 select() > 0?
如果有,要获取,待处理操作的集合Set , 进行遍历
遍历到SelectionKey时,判断对应哪种操作,不同的操作设置不同的处理方式
如OP_ACCEPT,接收客户端通道并进行注册,监听后续处理的事件,如OP_WRITE
如OP_WRITE,通过key的方法获取通道本身,读取数据并继续监听事件,如OP_READ

4、零拷贝

需求:将磁盘中的文件读取出来,通过socket发送出去

传统的拷贝方式(4次)
Socket网络缓冲区,也属于操作系统的内核缓冲区。

在操作系统中进行的拷贝(如第二次和第三次),叫做CPU拷贝。
连接磁盘或网卡等硬件的拷贝(如第一次和第四次),叫做DMA拷贝。

零拷贝的概念:减少CPU拷贝的次数。

零拷贝是基于操作系统层面的优化方式(以下基于Linux系统)

1) mmap = memory mapping 内存映射

2)sendfile (linux2.1内核支持)

  1. sendfile with scatter/gather  copy(批量sendfile)
    从单个文件的处理,上升到多个物理地址的处理,提高处理速度

4)splice (拼接,在linux2.6内核支持)

   在操作系统内核缓冲区和Socket网络缓冲区之间建立管道,来减少拷贝次数。

线程模型

1) 单线程Reactor模型

顾名思义 就是使用一个线程来处理问题 线程中

  • selector
  • 事件处理 : 连接事件
  • 处理事件:handler

单线程服务器

public class ReactorServer {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    public ReactorServer() {
        try {
            // 初始化监听器 与 channel 通道
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // 配置为非阻塞的
            serverSocketChannel.configureBlocking(false);
            // 配置通道连接地址 开放 9090 端口
            SocketAddress address = new InetSocketAddress(9090);
            serverSocketChannel.socket().bind(address);
            //将channel 注册到 selector监听通道事件  达到多路复用
            //首个注册事件一般都是 accept 连接事件
            SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //    创建处理连接事件的 acceptor
            // 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生
            Acceptor acceptor = new Acceptor(selector, serverSocketChannel);
            //附加一个对象 用来处理事件
            key.attach(acceptor);
            while (true) {
                //返回事件的个数 处理事件
                int num = selector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = selector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //如有接收的时Accpet 事件 获取的就是Acceptor 事件
                    //如果接受的时读写事件 获取的就是 Handler 事件
                    Runnable runnable = (Runnable) key.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
    }
}

Accpetor 连接事件

public class Acceptor implements Runnable {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }
    @Override
    public void run() {
        try {
            //接受客户端传入的连接时 Socket Channel
            SocketChannel socketChannel = serverSocketChannel.accept();
            //设置异步
            socketChannel.configureBlocking(false);
            SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
            // 创造处理器 处理连接
            //单线程
            //Handler handler = new Handler(key);
            //多线程
            MultHandler handler = new MultHandler(key);
            handler.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Handler 单线程处理

public class Handler implements Runnable {
    private SelectionKey key;
    private State state;
    public Handler(SelectionKey key) {
        this.key = key;
        this.state = State.READ;
    }
    @Override
    public void run() {
        //处理 读写操作,判断读写
        switch (state) {
            case READ:
                read();
                break;
            case WRITE:
                write();
                break;
            default:
                break;
        }
    }
    /*轮流处理末尾添加事件达到循环处理*/
    //处理 读方法
    private void read() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //    通过通道获取KEY
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            //将传入的数据写入到buffer中
            int num = channel.read(buffer);
            //    转化成String
            String msg = new String(buffer.array());
            //    增加业务处理
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_WRITE);
            this.state = State.WRITE;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //处理 写方法
    private void write() {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        try {
            //    通过通道获取KEY
            SocketChannel channel = (SocketChannel) key.channel();
            channel.write(buffer);
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_READ);
            this.state = State.READ;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //记录状态 非读即写
    private enum State {
        //读写事件
        READ, WRITE
    }
}
2)多线程Reactor模型

提高handler的处理效率,首先handler不再负责具体的业务逻辑,当读取出数据后,分发给子线程处理,子线程处理完成后再将结果返回给handler,handler再将结果返回给客户端。

多线程处理 (handler使用线程池)

public class MultHandler implements Runnable {
    private SelectionKey key;
    private State state;
    private ExecutorService pool;
    public MultHandler(SelectionKey key) {
        this.key = key;
        this.state = State.READ;
    }
    @Override
    public void run() {
        //处理 读写操作,判断读写
        switch (state) {
            case READ:
                //将最耗时的操作 放入线程池执行
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        read();
                    }
                });
                break;
            case WRITE:
                write();
                break;
            default:
                break;
        }
    }
    /*轮流处理末尾添加事件达到循环处理*/
    //处理 读方法
    private void read() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //    通过通道获取KEY
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            //将传入的数据写入到buffer中
            int num = channel.read(buffer);
            //    转化成String
            String msg = new String(buffer.array());
            //    增加业务处理
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_WRITE);
            this.state = State.WRITE;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //处理 写方法
    private void write() {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        try {
            //    通过通道获取KEY
            SocketChannel channel = (SocketChannel) key.channel();
            channel.write(buffer);
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_READ);
            this.state = State.READ;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //记录状态 非读即写
    private enum State {
        //读写事件
        READ, WRITE
    }
}
3)主从Reactor模型

mainReactor用来接收连接事件,然后分发给acceptor,acceptor在处理过程中,直接将后续的读写事件,注册到slaveReactor之中,以此来达到分流。

主从监听器

//主从模型
public class MultReactorServer {
    private Selector mainselector;
    private Selector slaveselector;
    private ServerSocketChannel serverSocketChannel;
    public MultReactorServer() {
        try {
            // 主 reactor 处理连接事件
            mainselector = Selector.open();
            //从reactor 处理读写事件
            slaveselector = Selector.open();
            // 配置为非阻塞的
            serverSocketChannel.configureBlocking(false);
            // 配置通道连接地址 开放 9090 端口
            SocketAddress address = new InetSocketAddress(9090);
            serverSocketChannel.socket().bind(address);
            //将channel 注册到 selector监听通道事件  达到多路复用
            //首个注册事件一般都是 accept 连接事件 (参数变化)
            SelectionKey key = serverSocketChannel.register(mainselector, SelectionKey.OP_ACCEPT);
            //    创建处理连接事件的 acceptor
            // 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生 (参数变化)
            Acceptor acceptor = new Acceptor(slaveselector, serverSocketChannel);
            //附加一个对象 用来处理事件
            key.attach(acceptor);
            //主从监听逻辑分离
            new HandlerLoop(slaveselector).run();
            while (true) {
                //返回事件的个数 处理事件
                int num = mainselector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = mainselector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //只处理主Reactor 只处理连接事件
                    Runnable runnable = (Runnable) key.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

主从事件处理读写分离

//用于处理从reactor事件监听
public class HandlerLoop implements Runnable {
    private Selector selector;
    public HandlerLoop(Selector selector) {
        this.selector = selector;
    }
    @Override
    public void run() {
        try {
            while (true) {
                //返回事件的个数 处理事件
                int num = selector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = selector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //只处理从reactor 所以 接受的一定是读写事件
                    Runnable runnable = (Runnable) selectionKey.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Netty网络框架(二)https://developer.aliyun.com/article/1469520

目录
打赏
0
1
1
0
333
分享
相关文章
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
193 6
基于图神经网络的大语言模型检索增强生成框架研究:面向知识图谱推理的优化与扩展
本文探讨了图神经网络(GNN)与大型语言模型(LLM)结合在知识图谱问答中的应用。研究首先基于G-Retriever构建了探索性模型,然后深入分析了GNN-RAG架构,通过敏感性研究和架构改进,显著提升了模型的推理能力和答案质量。实验结果表明,改进后的模型在多个评估指标上取得了显著提升,特别是在精确率和召回率方面。最后,文章提出了反思机制和教师网络的概念,进一步增强了模型的推理能力。
84 4
基于图神经网络的大语言模型检索增强生成框架研究:面向知识图谱推理的优化与扩展
|
2月前
|
Nettyの网络聊天室&扩展序列化算法
通过本文的介绍,我们详细讲解了如何使用Netty构建一个简单的网络聊天室,并扩展序列化算法以提高数据传输效率。Netty的高性能和灵活性使其成为实现各种网络应用的理想选择。希望本文能帮助您更好地理解和使用Netty进行网络编程。
50 12
WebDreamer:基于大语言模型模拟网页交互增强网络规划能力的框架
WebDreamer是一个基于大型语言模型(LLMs)的网络智能体框架,通过模拟网页交互来增强网络规划能力。它利用GPT-4o作为世界模型,预测用户行为及其结果,优化决策过程,提高性能和安全性。WebDreamer的核心在于“做梦”概念,即在实际采取行动前,用LLM预测每个可能步骤的结果,并选择最有可能实现目标的行动。
91 1
WebDreamer:基于大语言模型模拟网页交互增强网络规划能力的框架
Swift 中的网络编程,主要介绍了 URLSession 和 Alamofire 两大框架的特点、用法及实际应用
本文深入探讨了 Swift 中的网络编程,主要介绍了 URLSession 和 Alamofire 两大框架的特点、用法及实际应用。URLSession 由苹果提供,支持底层网络控制;Alamofire 则是在 URLSession 基础上增加了更简洁的接口和功能扩展。文章通过具体案例对比了两者的使用方法,帮助开发者根据需求选择合适的网络编程工具。
63 3
Netty从入门到精通:高性能网络编程的进阶之路
【11月更文挑战第17天】Netty是一个基于Java NIO(Non-blocking I/O)的高性能、异步事件驱动的网络应用框架。使用Netty,开发者可以快速、高效地开发可扩展的网络服务器和客户端程序。本文将带您从Netty的背景、业务场景、功能点、解决问题的关键、底层原理实现,到编写一个详细的Java示例,全面了解Netty,帮助您从入门到精通。
355 0
网络安全法律框架:全球视角下的合规性分析
网络安全法律框架:全球视角下的合规性分析
76 1
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第26天】Python是一种强大的编程语言,在数据抓取和网络爬虫领域应用广泛。Scrapy作为高效灵活的爬虫框架,为开发者提供了强大的工具集。本文通过实战案例,详细解析Scrapy框架的应用与技巧,并附上示例代码。文章介绍了Scrapy的基本概念、创建项目、编写简单爬虫、高级特性和技巧等内容。
152 4
精选2款C#/.NET开源且功能强大的网络通信框架
精选2款C#/.NET开源且功能强大的网络通信框架
111 0
一个整合性、功能丰富的.NET网络通信框架
一个整合性、功能丰富的.NET网络通信框架

热门文章

最新文章