Netty网络框架(一)

简介: Netty网络框架

Netty网络框架

(一) 基础篇

1、I/O基础

输入流:InputStream和Reader
输出流:OutputStream和Writer

               字节流                字符流

计算机最小的二进制单位   bit 比特    代表0和1
字节  1 byte = 8bit  计算机处理的最小单位
字符  1 char = 2byte = 16bit   人处理的最小单位

所以,字节流处理文件、图片、视频等二进制数据,而字符流处理文本数据。

2、Socket

原意是“插座”,在计算机领域中,翻译为“套接字”。
本质上,是计算机之间进行通信的一种方式。

Linux,“一切皆文件”,给每个文件映射一个ID,叫做"文件描述符"。
当处理网络连接时,也会看成一个文件,read/write变成和远程计算机的交互。

OSI七层模型 = Open System Interconnection 开放式系统互联
从下到上分别为:物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。

实际应用的是优化后的TCP/IP模型(四层)
网络接口层/链路层、网络层、传输层、应用层

应用层协议:HTTP、FTP、SMTP(邮件协议)
传输层协议:TCP、UDP

Socket其实是应用层与传输层之间的抽象层,是一组接口。
在设计模式中,是门面模式。

3、NIO

BIO - BlockingIO  同步阻塞
NIO - New IO /  Non-Blocking IO  同步非阻塞
AIO - Asynchronous IO  异步非阻塞

同步和异步,关注的是消息通知的机制
阻塞和非阻塞,关注的是等待消息过程中的状态

多路复用的模型

三大元素:Channel 、Buffer、Selector

1)  Channel

FileChannel   文件管道的数据
Pipe.SinkChannel
Pipe.SourceChannel  线程间通信的管道
ServerSocketChannel
SocketChannel      用于TCP网络通信的管道
DatagramChannel   用于UDP网络通信的管道

2) Buffer

capacity  总体容量大小
limit   存储容量的大小,是可读写和不可读写的界线
position  已读容量的大小,已读和未读区域的界线

【使用原理】
a)  初始化,给定总容量,position=0, limit=capacity
b)  当使用put方法存入数据是,通过position来记录存储的容量变化,position不断后移,直到存储结束(写完成)
c)写完成需要调用flip方法刷新,limit=position,position=0
保障limit记录的是可读写区域的大小,position已读部分重置为空
d)  读数据直到读完成,需要调用clear方法,position=0, limit=capacity

3)  Selector

三个元素: Selector选择器、SelectableChannel可选择的通道、SelectionKey选择键

本质上,Selector是监听器,监听的是通道是否有我们关心的操作产生,操作对应的是事件(连接、接收、读/写),使用SelectionKey代表具体的事件,在确保通道是可选择的情况下,将通道注册进选择器中,此时Selector维护的是,通道和事件之间的关联关系。

Selector,管理被注册的通道集合,以及他们的状态
SelectableChannel,是一个抽象类,提供了通道可被选择需要实现的api。
FileChannel就不是可选择的,Socket相关的通道都是可选择的
一个通道可以被注册到多个选择器上吗?   可以的
多个通道可以注册到一个选择器上,但一个通道只能在一个选择器中注册一次

SelectionKey,封装了要监听的事件,连接、接收、读、写。
一方面,Selector关心通道要处理哪些事件
另一方面,当事件触发时,通道要处理哪些事件

【使用方式】

a、首先通过open方法,获取通道,将通道设置为非阻塞的
b、通过open方法,获取选择器,将通道注册进选择器中,伴随设置通道要处理的事件(OP_ACCEPT)
c、轮询选择器,当前是否有要处理的操作 select() > 0?
如果有,要获取,待处理操作的集合Set , 进行遍历
遍历到SelectionKey时,判断对应哪种操作,不同的操作设置不同的处理方式
如OP_ACCEPT,接收客户端通道并进行注册,监听后续处理的事件,如OP_WRITE
如OP_WRITE,通过key的方法获取通道本身,读取数据并继续监听事件,如OP_READ

4、零拷贝

需求:将磁盘中的文件读取出来,通过socket发送出去

传统的拷贝方式(4次)
Socket网络缓冲区,也属于操作系统的内核缓冲区。

在操作系统中进行的拷贝(如第二次和第三次),叫做CPU拷贝。
连接磁盘或网卡等硬件的拷贝(如第一次和第四次),叫做DMA拷贝。

零拷贝的概念:减少CPU拷贝的次数。

零拷贝是基于操作系统层面的优化方式(以下基于Linux系统)

1) mmap = memory mapping 内存映射

2)sendfile (linux2.1内核支持)

  1. sendfile with scatter/gather  copy(批量sendfile)
    从单个文件的处理,上升到多个物理地址的处理,提高处理速度

4)splice (拼接,在linux2.6内核支持)

   在操作系统内核缓冲区和Socket网络缓冲区之间建立管道,来减少拷贝次数。

线程模型

1) 单线程Reactor模型

顾名思义 就是使用一个线程来处理问题 线程中

  • selector
  • 事件处理 : 连接事件
  • 处理事件:handler

单线程服务器

public class ReactorServer {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    public ReactorServer() {
        try {
            // 初始化监听器 与 channel 通道
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // 配置为非阻塞的
            serverSocketChannel.configureBlocking(false);
            // 配置通道连接地址 开放 9090 端口
            SocketAddress address = new InetSocketAddress(9090);
            serverSocketChannel.socket().bind(address);
            //将channel 注册到 selector监听通道事件  达到多路复用
            //首个注册事件一般都是 accept 连接事件
            SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //    创建处理连接事件的 acceptor
            // 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生
            Acceptor acceptor = new Acceptor(selector, serverSocketChannel);
            //附加一个对象 用来处理事件
            key.attach(acceptor);
            while (true) {
                //返回事件的个数 处理事件
                int num = selector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = selector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //如有接收的时Accpet 事件 获取的就是Acceptor 事件
                    //如果接受的时读写事件 获取的就是 Handler 事件
                    Runnable runnable = (Runnable) key.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
    }
}

Accpetor 连接事件

public class Acceptor implements Runnable {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }
    @Override
    public void run() {
        try {
            //接受客户端传入的连接时 Socket Channel
            SocketChannel socketChannel = serverSocketChannel.accept();
            //设置异步
            socketChannel.configureBlocking(false);
            SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
            // 创造处理器 处理连接
            //单线程
            //Handler handler = new Handler(key);
            //多线程
            MultHandler handler = new MultHandler(key);
            handler.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Handler 单线程处理

public class Handler implements Runnable {
    private SelectionKey key;
    private State state;
    public Handler(SelectionKey key) {
        this.key = key;
        this.state = State.READ;
    }
    @Override
    public void run() {
        //处理 读写操作,判断读写
        switch (state) {
            case READ:
                read();
                break;
            case WRITE:
                write();
                break;
            default:
                break;
        }
    }
    /*轮流处理末尾添加事件达到循环处理*/
    //处理 读方法
    private void read() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //    通过通道获取KEY
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            //将传入的数据写入到buffer中
            int num = channel.read(buffer);
            //    转化成String
            String msg = new String(buffer.array());
            //    增加业务处理
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_WRITE);
            this.state = State.WRITE;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //处理 写方法
    private void write() {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        try {
            //    通过通道获取KEY
            SocketChannel channel = (SocketChannel) key.channel();
            channel.write(buffer);
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_READ);
            this.state = State.READ;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //记录状态 非读即写
    private enum State {
        //读写事件
        READ, WRITE
    }
}
2)多线程Reactor模型

提高handler的处理效率,首先handler不再负责具体的业务逻辑,当读取出数据后,分发给子线程处理,子线程处理完成后再将结果返回给handler,handler再将结果返回给客户端。

多线程处理 (handler使用线程池)

public class MultHandler implements Runnable {
    private SelectionKey key;
    private State state;
    private ExecutorService pool;
    public MultHandler(SelectionKey key) {
        this.key = key;
        this.state = State.READ;
    }
    @Override
    public void run() {
        //处理 读写操作,判断读写
        switch (state) {
            case READ:
                //将最耗时的操作 放入线程池执行
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        read();
                    }
                });
                break;
            case WRITE:
                write();
                break;
            default:
                break;
        }
    }
    /*轮流处理末尾添加事件达到循环处理*/
    //处理 读方法
    private void read() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //    通过通道获取KEY
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            //将传入的数据写入到buffer中
            int num = channel.read(buffer);
            //    转化成String
            String msg = new String(buffer.array());
            //    增加业务处理
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_WRITE);
            this.state = State.WRITE;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //处理 写方法
    private void write() {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        try {
            //    通过通道获取KEY
            SocketChannel channel = (SocketChannel) key.channel();
            channel.write(buffer);
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_READ);
            this.state = State.READ;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //记录状态 非读即写
    private enum State {
        //读写事件
        READ, WRITE
    }
}
3)主从Reactor模型

mainReactor用来接收连接事件,然后分发给acceptor,acceptor在处理过程中,直接将后续的读写事件,注册到slaveReactor之中,以此来达到分流。

主从监听器

//主从模型
public class MultReactorServer {
    private Selector mainselector;
    private Selector slaveselector;
    private ServerSocketChannel serverSocketChannel;
    public MultReactorServer() {
        try {
            // 主 reactor 处理连接事件
            mainselector = Selector.open();
            //从reactor 处理读写事件
            slaveselector = Selector.open();
            // 配置为非阻塞的
            serverSocketChannel.configureBlocking(false);
            // 配置通道连接地址 开放 9090 端口
            SocketAddress address = new InetSocketAddress(9090);
            serverSocketChannel.socket().bind(address);
            //将channel 注册到 selector监听通道事件  达到多路复用
            //首个注册事件一般都是 accept 连接事件 (参数变化)
            SelectionKey key = serverSocketChannel.register(mainselector, SelectionKey.OP_ACCEPT);
            //    创建处理连接事件的 acceptor
            // 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生 (参数变化)
            Acceptor acceptor = new Acceptor(slaveselector, serverSocketChannel);
            //附加一个对象 用来处理事件
            key.attach(acceptor);
            //主从监听逻辑分离
            new HandlerLoop(slaveselector).run();
            while (true) {
                //返回事件的个数 处理事件
                int num = mainselector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = mainselector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //只处理主Reactor 只处理连接事件
                    Runnable runnable = (Runnable) key.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

主从事件处理读写分离

//用于处理从reactor事件监听
public class HandlerLoop implements Runnable {
    private Selector selector;
    public HandlerLoop(Selector selector) {
        this.selector = selector;
    }
    @Override
    public void run() {
        try {
            while (true) {
                //返回事件的个数 处理事件
                int num = selector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = selector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //只处理从reactor 所以 接受的一定是读写事件
                    Runnable runnable = (Runnable) selectionKey.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Netty网络框架(二)https://developer.aliyun.com/article/1469520

目录
相关文章
|
1月前
|
监控 安全
从 Racket 语言出发,创新员工网络监控软件的框架
在数字化企业环境中,员工网络监控软件对于保障信息安全和提升效率至关重要。Racket 语言凭借其独特特性和强大功能,为开发创新的监控软件提供了新可能。通过捕获和分析网络数据包、记录员工网络活动日志,甚至构建复杂的监控框架,Racket 能够满足企业的定制化需求,为企业信息安全和管理提供强有力支持。未来,基于 Racket 的创新解决方案将不断涌现。
40 6
|
15天前
|
数据采集 存储 JSON
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
59 6
|
23天前
|
机器学习/深度学习 人工智能
类人神经网络再进一步!DeepMind最新50页论文提出AligNet框架:用层次化视觉概念对齐人类
【10月更文挑战第18天】这篇论文提出了一种名为AligNet的框架,旨在通过将人类知识注入神经网络来解决其与人类认知的不匹配问题。AligNet通过训练教师模型模仿人类判断,并将人类化的结构和知识转移至预训练的视觉模型中,从而提高模型在多种任务上的泛化能力和稳健性。实验结果表明,人类对齐的模型在相似性任务和出分布情况下表现更佳。
53 3
|
8天前
|
存储 安全 网络安全
网络安全法律框架:全球视角下的合规性分析
网络安全法律框架:全球视角下的合规性分析
19 1
|
16天前
|
数据采集 前端开发 中间件
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第26天】Python是一种强大的编程语言,在数据抓取和网络爬虫领域应用广泛。Scrapy作为高效灵活的爬虫框架,为开发者提供了强大的工具集。本文通过实战案例,详细解析Scrapy框架的应用与技巧,并附上示例代码。文章介绍了Scrapy的基本概念、创建项目、编写简单爬虫、高级特性和技巧等内容。
39 4
|
16天前
|
网络协议 物联网 API
Python网络编程:Twisted框架的异步IO处理与实战
【10月更文挑战第26天】Python 是一门功能强大且易于学习的编程语言,Twisted 框架以其事件驱动和异步IO处理能力,在网络编程领域独树一帜。本文深入探讨 Twisted 的异步IO机制,并通过实战示例展示其强大功能。示例包括创建简单HTTP服务器,展示如何高效处理大量并发连接。
38 1
|
3天前
|
网络协议 Unix Linux
精选2款C#/.NET开源且功能强大的网络通信框架
精选2款C#/.NET开源且功能强大的网络通信框架
|
4天前
|
网络协议 网络安全 Apache
一个整合性、功能丰富的.NET网络通信框架
一个整合性、功能丰富的.NET网络通信框架
|
5天前
|
消息中间件 编解码 网络协议
Netty从入门到精通:高性能网络编程的进阶之路
【11月更文挑战第17天】Netty是一个基于Java NIO(Non-blocking I/O)的高性能、异步事件驱动的网络应用框架。使用Netty,开发者可以快速、高效地开发可扩展的网络服务器和客户端程序。本文将带您从Netty的背景、业务场景、功能点、解决问题的关键、底层原理实现,到编写一个详细的Java示例,全面了解Netty,帮助您从入门到精通。
22 0
|
1月前
|
机器学习/深度学习 数据采集 算法
目标分类笔记(一): 利用包含多个网络多种训练策略的框架来完成多目标分类任务(从数据准备到训练测试部署的完整流程)
这篇博客文章介绍了如何使用包含多个网络和多种训练策略的框架来完成多目标分类任务,涵盖了从数据准备到训练、测试和部署的完整流程,并提供了相关代码和配置文件。
46 0
目标分类笔记(一): 利用包含多个网络多种训练策略的框架来完成多目标分类任务(从数据准备到训练测试部署的完整流程)