一、概念
Intel® DPDK全称Intel Data Plane Development Kit,是intel提供的数据平面开发工具集,为Intel architecture(IA)处理器架构下用户空间高效的数据包处理提供库函数和驱动的支持,它不同于Linux系统以通用性设计为目的,而是专注于网络应用中数据包的高性能处理。目前已经验证可以运行在大多数Linux操作系统上。DPDK使用了BSDLicense,极大的方便了企业在其基础上来实现自己的协议栈或者应用。目前出现了很多基于 dpdk 的高性能网络框架,OVS 和 VPP 是常用的数据面框架,mTCP 和 f-stack 是常用的用户态协议栈。
需要强调的是,DPDK应用程序是运行在用户空间上利用自身提供的数据平面库来收发数据包,绕过了Linux内核协议栈对数据包处理过程。Linux内核将DPDK应用程序看作是一个普通的用户态进程,包括它的编译、连接和加载方式和普通程序没有什么两样。如下图2所示DPDK包处理流程绕过了内核直接到用户层进行处理,区别于传统的数据包先到内核最后再到用户层。
图1 传统网络包处理路径 图2 DPDK数据包处理路径
Kni(Kernel NIC Interface)内核网卡接口,是DPDK允许用户态和内核态交换报文的解决方案,例如DPDK的协议栈是专门处理DNS报文,其余的报文通过KNI接口返回给内核来处理。KNI模拟了一个虚拟的网口,提供dpdk的应用程序和linux内核之间通讯。用于DPDK和内核的交互,kni接口允许报文从用户态接收后转发到内核协议栈去。
DPDK的包全部在用户空间使用内存池管理,内核空间与用户空间的内存交互不用进行拷贝,只做控制权转移。DPDK的主要对外函数接口都以rte_作为前缀,抽象化函数接口是典型软件设计思路,rte是指runtime environment,eal是指environmentabstraction layer。下图3是kni的mbuf使用流程图,可以看出报文的流向,因为报文在代码中其实就是一个个内存指针。其中rx_q右边是用户态,左边是内核态。最后通过调用netif_rx()将报文送入linux协议栈,这其中需要将dpdk的mbuf转换成skb_buf。当linux向kni端口发送报文时,调用回调函数kni_net_tx(),然后报文经过转换之后发送到端口上。rte_pktmbut_free()把内存重新释放到mbuf内存池中。
图3 数据包通过KNI的流程
二、DPDK的Helloworld代码示例
HelloWorld是最基础的入门程序,代码简短,功能也不复杂。它建立了一个多核(线程)运行的基础环境,每个线程会打印“hello from core #”,core # 是由操作系统管理的。
int main(int argc, char **argv) { int ret; unsigned lcore_id; ret = rte_eal_init(argc, argv); ) rte_panic("Cannot init EAL\n"); /* call lcore_hello() on every slave lcore */ RTE_LCORE_FOREACH_SLAVE(lcore_id) { rte_eal_remote_launch(lcore_hello, NULL, lcore_id); } /* call it on master lcore too */ lcore_hello(NULL); rte_eal_mp_wait_lcore(); ; }
对于HelloWorld这个实例,最需要的参数是“-c ”,线程掩码(coremask)指定了需要参与运行的线程(核)集合。rte_eal_init本身所完成的工作是复杂的,它读取入口参数,解析并保存作为DPDK运行的系统信息,依赖这些信息,构建一个针对包处理设计的运行环境。主要动作分解为:配置初始化-->内存初始化-->内存池初始化-->队列初始化-->告警初始化-->中断初始化-->PCI初始化-->定时器初始化-->检测内存本地化(NUMA)-->插件初始化-->主线程初始化-->轮询设备初始化-->建立主从线程通道-->将从线程设置在等待模式-->PCI设备的探测与初始化...对于DPDK库的使用者,这些操作已经被EAL封装起来,接口清晰。如果需要对DPDK进行深度定制,二次开发,需要仔细研究内部操作,详见官方网站https://doc.dpdk.org/guides/prog_guide/。
还不熟悉的朋友,这里可以先领取一份dpdk新手学习资料包(入坑不亏):
DPDK面向多核设计,程序会试图独占运行在逻辑核(lcore)上。Main函数里重要部分是启动多核运行环境,RTE_LCORE_FOREACH_SLAVE(lcore_id)如名所示,遍历所有EAL指定可以使用的lcore,然后通过rte_eal_remote_launch在每个lcore上,启动被指定的线程。
int rte_eal_remote_launch(int (*f)(void *), void *arg, unsignedslave_id);
第一个参数是从线程,是被征召的线程,第二个参数是传给从线程的参数,第三个参数是指定的逻辑核,从线程会执行在这个core上。具体来说,int rte_eal_remote_launch(lcore_hello,NULL, lcore_id);参数lcore_id指定了从线程ID,运行入口函数lcore_hello.
运行函数lcore_hello,它读取自己的逻辑核编号(lcore_id), 打印出“hellofrom core #”
static int lcore_hello(__attribute__((unused)) void *arg) { unsigned lcore_id; lcore_id = rte_lcore_id(); printf("hello from core %u\n", lcore_id); ; }
以上仅是个简单示例,在真实的DPDK处理场景中,该处理函数会是一个循环运行的处理过程。
DPDK也有自身的劣势,对于低负荷的场景不建议使用DPDK:
- 内核栈转移至用户层增加了开发成本.
- 低负荷服务器不实用,会造成内核空转.
三、DPDK库函数
3.1 EAL库
EAL环境适配层
环境抽象层(EAL)负责获得对底层资源(如硬件和内存空间)的访问。对于应用程序和其他库来说,使用这个通用接口可以不用关系具体操作系统的环境细节。rte_eal_init初始化例程负责决定如何分配操作系统的这些资源(即内存空间、设备、定时器、控制台等等)。
EAL(Environment Abstraction Layer,环境抽象层)对 DPDK 的运行环境(e.g. Linux 操作系统)进行初始化,并为上层应用(用户态 DPDK App)提供了一个通用接口,隐藏了与底层库与设备打交道的相关细节。EAL 主要实现了 DPDK 运行的初始化工作,包括:HugePage 内存分配、NUMA 亲和性、CPU 绑定、Memory 划分、Buffer 划分、Ring 队列分配、原子性无锁操作等,并通过 UIO 或 VFIO 技术将 PCI/PCIe 设备地址映射到用户空间,方便了用户态的 DPDK App 调用。
- DPDK App 的加载和启动:将 DPDK App 和 DPDK Lib 链接成一个独立的进程,并以指定的方式加载。
- NUMA 亲和性与 CPU 绑定:将 DPDK App 的执行单元(进程、线程)绑定到特定的 Core 上。
- 内存分配:EAL 实现了不同区域的内存分配,例如:为 PCI 设备接口提供了物理内存。
- PCI 地址(BAR)抽象:EAL 提供了对 PCI 地址空间的访问接口。
- 跟踪调试功能(Debug):日志信息,堆栈打印、异常挂起等等。
- 通用功能:提供了标准 libc,不提供的自旋锁、原子计数器等。
- CPU Feature 标识功能:用于决定 CPU 运行时的一些特殊功能,决定当前 CPU 所支持的特性,以便编译对应的二进制文件。
- 中断处理:提供接口作为中断注册/解注册的回调函数。
- 告警功能:提供接口用于设置特定环境下的告警。
Linux环境下的EAL
在 Linux 用户空间中,DPDK App 通过 pthread 库作为一个用户态的进程运行。PCI 设备的信息和 BAR 地址空间通过 /sys 内核接口及内核模块,例如:uio_pci_generic 或 igb_uio 来发现并注册的。
正如 Linux 内核文档中对 UIO 的描述,PCI 设备的存储器空间信息是通过 mmap 重新映射到用户态的,相对于传统的 read/write 调用少了一次数据拷贝(内核缓存 => 用户态缓冲)的过程。EAL 通过对 hugetlb 使用 mmap 接口来实现 hugetlbfs 文件系统的空间映射到用户进程的虚拟内存地址空间。这部分内存暴露给 DPDK 服务层,如:Mempool Library。最后,DPDK 通过设置 CPU 绑定和 NUMA 亲和性调用,将每个执行单元(进程、线程)分配给特定的逻辑核,以 User-level 等级运行。
另外,DPDK 的定时器是通过 CPU 的 TSC(时间戳计数器)或者通过 mmap 调用内核的 HPET 系统接口来实现的。
DPDK App 的初始化和运行
DPDK App 的初始化从 glibc 的 start() 开始执行,检查也在初始化过程中被执行,用于保证配置文件所选择的处理器架构宏定义是当前 CPU 所支持的,然后才开始调用 DPDK App 的 main()。Master/logic Core 的初始化和运行时在 rte_eal_init() 上执行的,包括:对 pthread 库的调用。初始化流程如下图所示:
- rte_eal_init() 的初始化,包括:内存区间、Ring、内存池、lPM 表或 HASH 表等,必须作为整个 DPDK App 初始化的一部分,在 Master Core 上完成。创建和初始化这些对象的函数不是多线程安全的,但是,一旦初始化完成后,这些对象本身是线程安全的。
- 多进程支持:Linux 上运行的 DPDK App 支持多进程运行模式。
- 内存分配:连续的物理内存分配是通过 hugetlbfs 内核文件系统来实现的。 EAL 提供了相应的接口(函数)用于申请指定名字的、连续的内存空间。 这个接口同时会将这段连续空间的地址返回给用户程序。内存申请是使用 rte_malloc() 接口来完成的,是 hugetlbfs 文件系统支持的调用。
- PCI 设备访问:EAL 使用了 Linux 内核提供的文件系统 /sys/bus/pci 来扫描 PCI Bus(总线)上的内容。通过 uio_pci_generic(Linux 内核提供的原生 UIO 模块)或 igb_uio(DPDK 提供的 UIO 内核模块)提供的 /dev/uioX 设备文件,以及 /sys 下对应的资源文件(resource0…N)用于访问 PCI 设备。
- 逻辑核与共享内存(变量):逻辑核就是处理器的逻辑单元,即:线程(Thread),线程之间的交互默认通过共享内存(变量)来完成。共享内存是 Linux 上的一种 IPC(Inter-Process Communication,进程间通信)技术,提供了进程间通信的方法。实际上是通过线程局部存储技术 TLS 来实现的,它提供了每个线程访问本地存储的功能。
- 日志:EAL 提供了日志信息接口。 默认的,Linux 的应用程序(包括 DPDK App)的日志信息被发送到 syslog 中或打印到 concole 上。DPDK App 也支持用户可以通过使用不同的日志机制来代替上述方式。
- 跟踪与调试功能(DEBUG):Glibc 提供了一些调试函数用于打印应用程序的堆栈信息。rte_panic() 可以产生一个 SIG_ABORT 信号,这个信号可以触发产生用于 GBD 调试的 core 文件,所以,我们可以通过 gdb 指令来加载并调试 DPDK App。
- CPU Feature 识别:EAL 提供了 rte_cpu_get_feature() 接口来查询 CPU 的状态信息,包括 CPU 的特征信息,以此来决定 DPDK App 是否可以在该 CPU 上运行。
- PCI 设备的黑白名单:EAL 的 PCI 设备黑名单功能用于让 DPDK 忽略指定的 NIC 端口,使用 PCI 设备的地址描述符(Domain:Bus:Device:Function)来对端口进行标记。
- MISC 功能:包括锁和原子操作(x86 架构)。
内存分配
在 Linux 中,所有的物理内存都通过一个内存描述符表进行管理,且每个描述符指向一块连续的物理内存。通常,物理内存区块之间很可能是不连续的,所以 DPDK 内存区块分配器的作用就是保证分配到一块连续的物理内存。内存分配可以从指定的地址开始,也使用使用对齐的方式来进行分配(默认是 Cache Line 大小对齐),对齐一般是以 2 的次幂来进行的,并且不小于 64 字节对齐。
实际上,连续的物理内存分配是通过 hugetlbfs 内核文件系统来实现的。 EAL 提供了相应的接口(函数)用于申请指定名字的、连续的内存空间。 这个接口同时会将这段连续空间的地址返回给用户程序。内存申请是使用 rte_malloc() 接口来完成的,是 hugetlbfs 文件系统支持的调用。所以,内存区块可以是 2M 或是 1G 大小的内存页。
这些内存区块会使用一个名称进行唯一标识,通过名字访问一个内存区块会返回对应内存区块的描述符。rte_memzone 描述符也存在 DPDK 的配置结构体中,通过 rte_eal_get_configuration() 接口来获取。
注意,通常的,rte_malloc() 内存分配不应该在数据面处理逻辑中进行,因为相对于基于池(Mempool 库)的分配速度要慢,并且在分配和释放的过程中也使用了锁操作。所以 rte_malloc() 内存分配通常在控制逻辑的配置代码中使用。
rte_malloc() 可以传入一个对齐参数,数值必须是 2 的次幂,表示分配对齐参数乘以倍数的内存区域。在 NUMA 多处理器系统中,默认的,对 rte_malloc() 的调用会从调用该函数的 Core 所在的 Socket 上分配内存。此外,DPDK 也提供了另一组 API,允许在指定的 NUMA node 上显式的分配内存。
Malloc 库内部使用了两种数据结构类型:
- struct malloc_heap:用于在每个 CPU Socket 上跟踪可用内存空间。
- struct malloc_elem:Malloc 库内部用于追踪分配和释放空间的基本要素。
Structure: malloc_heap
- 数据结构 malloc_heap 用于管理每个 Socket 上的可用内存空间。
- NOTE:malloc_heap 并不会跟踪已使用的内存块。
在 Malloc 库内部,每个 NUMA node 都有一个堆结构,这允许我们在线程运行的 NUMA node 上为线程分配内存,但这也只是一种具有优先级的 “弱限制” 而已。
malloc_heap 结构及其关键字段和功能描述如下:
- lock:需要使用锁(Lock)来同步对堆的访问。例如:当使用链表来跟踪堆中的可用空间,我们就需要一个锁来防止多个线程同时处理该链表。
- free_head:指向这个堆的空闲结点链表中的第一个元素。
Structure: malloc_elem
数据结构 malloc_elem 用作各种内存块的通用头结构。它以三种不同的使用方式:
- 作为一个释放/申请内存的头部(正常使用)
- 作为内存的内部填充(Pad)头部
- 作为内存的结尾标记
结构中重要的字段和使用方法如下所述:
heap:这个指针指向了该内存块从哪个堆申请。它被用于正常的内存块,当他们被释放时,将新释放的块添加到堆的空闲列表中。
prev:这个指针用于指向紧跟这当前 memseg 的头元素。当释放一个内存块时,该指针用于引用上一个内存块,检查上一个块是否也是空闲。如果空闲,则将两个空闲块合并成一个大块。
next_free:这个指针用于将空闲块列表连接在一起。它用于正常的内存块,在 malloc() 接口中用于找到一个合适的空闲块申请出来,在 free() 函数中用于将内存块添加到空闲链表。
state:该字段可以有三个可能值:FREE、BUSY 或 PAD。前两个是指示正常内存块的分配状态,后者用于指示元素结构是在块开始填充结束时的虚拟结构,即,由于对齐限制,块内的数据开始的地方不在块本身的开始处。在这种情况下,Pad 头用于定位块的实际 malloc 元素头。对于结尾的结构,这个字段总是 BUSY,它确保没有元素在释放之后搜索超过 memseg 的结尾以供其它块合并到更大的空闲块。
pad:这个字段为块开始处的填充长度。在正常块头部情况下,它被添加到头结构的结尾,以给出数据区的开始地址,即在 malloc 上传回的地址。在填充虚拟头部时,存储相同的值,并从虚拟头部的地址中减去实际块头部的地址。
size:数据块的大小,包括头部本身。对于结尾结构,这个大小需要指定为 0,虽然从未使用。对于正在释放的正常内存块,使用此大小值替代 next 指针,以标识下一个块的存储位置,在 FREE 情况下,可以合并两个空闲块。
申请内存
在 EAL 初始化时,所有 memseg 都被设置为 malloc_heap 的一部分。此设置包括在 BUSY 状态的末尾放置一个结构体,如果启用了 CONFIG_RTE_MALLOC_DEBUG,则该结构体可能包含一个 sentinel 成员,并为每个 memseg 在开始处放置一个具有 FREE 的 malloc_elem 头部。FREE 元素被添加到 malloc_heap 的空闲列表中。
当 DPDK App 调用 rte_malloc 时,rte_malloc 首先为调用线程索引 lcore_config 结构,并确定该线程的 NUMA node。NUMA node 将作为参数传给 heap_alloc(),用于索引 malloc_heap 数组。参与索引参数还有:大小、类型、对齐方式和边界参数等。
函数 heap_alloc() 将扫描堆的空闲链表,尝试找到一个适用于所请求的大小、对齐方式和边界约束的内存块。当已经识别出合适的空闲元素时,将计算要返回给用户的指针,并且在该指针之前的内存的高速缓存行填充一个 malloc_elem 头部。由于对齐和边界约束,在元素的开头和结尾可能会有空闲的空间,这将导致了下列行为:
检查尾随空间。如果尾部空间足够大,例如:>128 字节,那么空闲元素将被分割。否则,仅仅忽略它(浪费空间)。
检查元素开始处的空间。如果起始处的空间很小,例如:<=128 字节,那么使用填充头,这部分空间被浪费。但是,如果空间很大,那么空闲元素将被分割。
从现有元素的末尾分配内存的优点是:不需要调整空闲链表,空闲链表中现有元素仅调整大小指针,并且后面的元素使用 prev 指针重定向到新创建的元素位置。
释放内存
释放内存,将指向数据区开始的指针传递给 free() 函数。从该指针中减去 malloc_elem 结构的大小,以获得内存块的元素头部。如果这个头部类型是 PAD,那么进一步减去 pad 头部的长度,以获得整个块的正确元素头。
从这个元素头中,我们获得指向块所分配的堆的指针及必须被释放的位置,以及指向前一个元素的指针, 并且通过 size 字段,可以计算下一个元素的指针。这意味着我们永远不会有两个相邻的 FREE 内存块,因为他们总是会被合并成一个大的块。
多线程支持
在 DPDK 的术语中,lcore 描述 EAL thread,本质是一个 Linux/FreeBSD pthread。EAL pthreads 由 EAL 创建和管理,用于执行 rte_eal_remote_launch() 回调的任务(Task)函数。EAL pthread 从逻辑上又可以分为 Master、Slave 两种类型,前者做管理相关的,而后者是真正处理业务的线程。
每个 EAL pthread 都有一个 _lcore_id 作为其 TLS(Thread Local Storage,线程本地存储)的唯一标识,并且由于 EAL pthreads 通常和 CPU 是 1:1 的绑定关系,所以 lcore_id 通常就是 CPU 的 ID。但是,当使用多个 pthread 时,EAL pthread 和物理 CPU 之间的绑定关系就未必总是 1:1 的了。EAL pthread 也有可能与一个 CPU 集合关联,这是的 lcore_id 将与 CPU ID 不同。
我们可以通过 DPDK App 的命令行参数 --lcores 进行设定:
-–lcores=’<lcore_set>[@cpu_set][,<lcore_set>[@cpu_set],...]
DPDK 为线程操作提供了两个接口 rte_thread_set_affinity() 和 rte_pthread_get_affinity()。当他们在线程上下文中被调用时,将获取或设置线程 TLS,包括 _cpuset 和 _socket_id:
- _cpuset:存储了与线程相关联的 CPU 位图(BitMap)。
- _socket_id:存储了 CPU set 所在的 NUMA node。如果 CPU set 中的 CPU 属于不同的 NUMA 节点,_socket_id 将设置为 SOCKET_ID_ANY。
DPDK App 通常会进行 CPU 绑核以避免切换开销。这显然是有利于性能提升的,但同时也会缺乏灵活性。
从性能的角度出发,我们应该从操作系统层面为 DPDK App 隔离出专用的 CPU,这是基于 Linux 的 cgroup 来实现的。以下是 cgroup 的简单示例:在同一个 CPU 上两个线程 t0、t1 用于执行数据包 I/O,并期望只有 50% 的 CPU 消耗在数据包 IO 操作上。
mkdir /sys/fs/cgroup/cpu/pkt_io mkdir /sys/fs/cgroup/cpuset/pkt_io echo $cpu > /sys/fs/cgroup/cpuset/cpuset.cpus echo $t0 > /sys/fs/cgroup/cpu/pkt_io/tasks echo $t0 > /sys/fs/cgroup/cpuset/pkt_io/tasks echo $t1 > /sys/fs/cgroup/cpu/pkt_io/tasks echo $t1 > /sys/fs/cgroup/cpuset/pkt_io/tasks cd /sys/fs/cgroup/cpu/pkt_io echo 100000 > pkt_io/cpu.cfs_period_us echo 50000 > pkt_io/cpu.cfs_quota_us
同时,也可以为 DPDK App 的每个线程或进程绑定一个核心。甚至还可以通过对 BIOS 的电源管理进行设置,让 CPU 处于高性能的工作效率,不过这也会带来 CPU 的损耗以及电费成本的提高。所以用户应该根据自己的实际需求来对 DPDK App 的性能进行优化。
从灵活性的角度出发,用户可以设置 DPDK App 的线程 CPU 亲和性指向一个 CPU 集合而不是绑定到某个单一的 CPU 了。例如:
用户态中断处理
主线程的用户态中断和警告处理:EAL 会创建一个主线程(Master Core)用于轮询 UIO 设备的描述文件以检测中断事件,通过 EAL 提供的函数可以为特定的中断事件注册/解注册一个回调函数(中断处理函数),回调函数在主线程中被异步调用。当然,EAL 也支持像物理网卡中断那样定时的调用回调函数。
需要注意的是,DPDK 实现了基于轮询方式的 PMD(Poll Mode Drivers)网卡驱动,内核态的 UIO Driver(e.g. igb_uio)屏蔽了网卡发出的中断信号,然后由用户态的 PMD Driver 采用主动轮询的方式。所以 DPDK App 的用户态中断处理区别于传统物理网卡的软硬中断。在 DPDK 的 PMD 中,主线程只会对链路状态的改变触发的中断进行处理,例如:网卡的打开和关闭。除了链路状态通知仍必须采用中断方式以外,均使用无中断方式直接操作 PCI 网卡设备的接收和发送队列。这与传统的内核协议栈每接受一个数据包都要触发一次中断完全不同(先抛开 NAPI 不谈)。
RX 中断事件:PMD 提供的报文收发程序并不只限制于单存的轮询机制。为了缓解小吞吐量场景中轮询模式对 CPU 资源的浪费,所以,PMD 还实现了 “暂停轮询并等待唤醒事件” 的设计,即 Interrupt DPDK(中断 DPDK)模式。
Interrupt DPDK 的原理和 NAPI 很像,就是 PMD 在没数据包需要处理时自动进入睡眠,改为中断通知,接收到收包中断信号后,激活主动轮询。这就是所谓的链路状态中断通知。并且 Interrupt DPDK 还可以和其他进程共享一个 CPU Core,但 DPDK 进程仍具有更高的调度优先级。
EAL 提供了这种事件驱动模式相关的 rte_eth_dev_rx_intr_* 接口,来实现控制、使能、关闭。当 PMD 不支持 RX 中断时,这些 API 会返回失败。Intr_conf.rxq 标识用于打开每个设备的 RX 中断。
以 Linux 上运行的 DPDK App 为例,其实现依赖于 epoll 技术。每个线程可以监控一个 epoll 实例,而在实例中可以添加所有需要的 wake-up 事件的文件描述符。事件文件描述符创建并根据 UIO/VFIO 的规范来映射到指定的中断向量上。EAL 初始化过程中,完成了中断向量和事件文件描述符之间的映射关系,同时为每个 PCI 设备初始化中断向量和队列之间的映射关系。这样一来,EAL 实际上并不知道在指定向量上发生的中断,这是由设备驱动来负责执行后面的映射的。
3.2 Ring库
环形缓冲区支持队列管理。rte_ring并不是具有无限大小的链表,它具有如下属性:
- 先进先出(FIFO)
- 最大大小固定,指针存储在表中
无锁实现
- 多消费者或单消费者出队操作
- 多生产者或单生产者入队操作
- 批量出队 - 如果成功,将指定数量的元素出队,否则什么也不做
- 批量入队 - 如果成功,将指定数量的元素入队,否则什么也不做
- 突发出队 - 如果指定的数目出队失败,则将最大可用数目对象出队
- 突发入队 - 如果指定的数目入队失败,则将最大可入队数目对象入队
- 单生产者入队:
单消费者出队
rte_ring_tailq保存rte_ring链表
创建ring后会将其插入共享内存链表rte_ring_tailq,以便主从进程都可以访问。
//定义队列头结构 struct rte_tailq_elem_head TAILQ_HEAD(rte_tailq_elem_head, rte_tailq_elem); //声明全局变量rte_tailq_elem_head,类型为struct rte_tailq_elem_head, //相当于是链表头,用来保存本进程注册的队列 /* local tailq list */ static struct rte_tailq_elem_head rte_tailq_elem_head = TAILQ_HEAD_INITIALIZER(rte_tailq_elem_head); //调用EAL_REGISTER_TAILQ在main函数前注册rte_ring_tailq到全局变量rte_tailq_elem_head。 #define RTE_TAILQ_RING_NAME "RTE_RING" static struct rte_tailq_elem rte_ring_tailq = { .name = RTE_TAILQ_RING_NAME, }; EAL_REGISTER_TAILQ(rte_ring_tailq)
调用rte_eal_tailq_update遍历链表rte_tailq_elem_head上的节点,将节点中的head指向 struct rte_mem_ring->tailq_head[]数组中的一个tailq_head,此head又作为另一个链表头。比如注册的rte_ring_tailq节点,其head专门用来保存创建的rte_ring(将rte_ring作为struct rte_tailq_entry的data,将struct rte_tailq_entry插入head)。前面说过struct rte_mem_ring->tailq_head存放在共享内存中,主从进程都可以访问,这样对于rte_ring来说,主从进程都可以创建/访问ring。 相关的数据结构如下图所示:
创建ring
调用函数rte_ring_create创建ring,它会申请一块memzone的内存,大小为struct rte_ring结构加上count个void类型指针,内存结构如下:
然后将ring中生产者和消费者的头尾指向0,最后将ring作为struct rte_tailq_entry的data插入共享内存链表,这样主从进程都可以访问此ring。
/** * An RTE ring structure. * * The producer and the consumer have a head and a tail index. The particularity * of these index is that they are not between 0 and size(ring). These indexes * are between 0 and 2^32, and we mask their value when we access the ring[] * field. Thanks to this assumption, we can do subtractions between 2 index * values in a modulo-32bit base: that's why the overflow of the indexes is not * a problem. */ struct rte_ring { /* * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI * compatibility requirements, it could be changed to RTE_RING_NAMESIZE * next time the ABI changes */ char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */ //flags有如下三个值: //RING_F_SP_ENQ创建单生产者, //RING_F_SC_DEQ创建单消费者, //RING_F_EXACT_SZ int flags; /**< Flags supplied at creation. */ //memzone内存管理的底层结构,用来分配内存 const struct rte_memzone *memzone; /**< Memzone, if any, containing the rte_ring */ //size为ring大小,值和RING_F_EXACT_SZ有关,如果指定了flag //RING_F_EXACT_SZ,则size为rte_ring_create的参数count的 //向上取2次方,比如count为15,则size就为16。如果没有指定 //flag,则count必须是2的次方,此时size等于count uint32_t size; /**< Size of ring. */ //mask值为size-1 uint32_t mask; /**< Mask (size-1) of ring. */ //capacity的值也和RING_F_EXACT_SZ有关,如果指定了, //则capacity为rte_ring_create的参数count,如果没指定, //则capacity为size-1 uint32_t capacity; /**< Usable size of ring */ //生产者位置,包含head和tail,head代表着下一次生产时的起 //始位置。tail代表消费者可以消费的位置界限,到达tail后就无 //法继续消费,通常情况下生产完成后tail = head,意味着刚生 //产的元素皆可以被消费 /** Ring producer status. */ struct rte_ring_headtail prod __rte_aligned(PROD_ALIGN); // 消费者位置,也包含head和tail,head代表着下一次消费时的 //起始位置。tail代表生产者可以生产的位置界限,到达tail后就 //无法继续生产,通常情况下消费完成后,tail =head,意味着 //刚消费的位置皆可以被生产 /** Ring consumer status. */ struct rte_ring_headtail cons __rte_aligned(CONS_ALIGN); };
下面看一下在函数rte_ring_create中ring是如何被创建的:
/* create the ring */ struct rte_ring * rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags) { char mz_name[RTE_MEMZONE_NAMESIZE]; struct rte_ring *r; struct rte_tailq_entry *te; const struct rte_memzone *mz; ssize_t ring_size; int mz_flags = 0; struct rte_ring_list* ring_list = NULL; const unsigned int requested_count = count; int ret; //(tailq_entry)->tailq_head 的类型应该是 struct rte_tailq_entry_head, //但是返回的却是 struct rte_ring_list,因为 rte_tailq_entry_head 和 rte_ring_list 定义都是一样的, //可以认为是等同的。 #define RTE_TAILQ_CAST(tailq_entry, struct_name) \ (struct struct_name *)&(tailq_entry)->tailq_head ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); /* for an exact size ring, round up from count to a power of two */ if (flags & RING_F_EXACT_SZ) count = rte_align32pow2(count + 1); //获取需要的内存大小,包括结构体 struct rte_ring 和 count 个指针 ring_size = rte_ring_get_memsize(count); ssize_t sz; sz = sizeof(struct rte_ring) + count * sizeof(void *); sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); #define RTE_RING_MZ_PREFIX "RG_" snprintf(mz_name, sizeof(mz_name), "%s%s", RTE_RING_MZ_PREFIX, name); //分配 struct rte_tailq_entry,用来将申请的ring挂到共享链表ring_list中 te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0); rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); //申请memzone, /* reserve a memory zone for this ring. If we can't get rte_config or * we are secondary process, the memzone_reserve function will set * rte_errno for us appropriately - hence no check in this this function */ mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id, mz_flags, __alignof__(*r)); if (mz != NULL) { //memzone的的addr指向分配的内存,ring也从此内存开始 r = mz->addr; /* no need to check return value here, we already checked the * arguments above */ rte_ring_init(r, name, requested_count, flags); //将ring保存到链表entry中 te->data = (void *) r; r->memzone = mz; //将链表entry插入链表ring_list TAILQ_INSERT_TAIL(ring_list, te, next); } else { r = NULL; RTE_LOG(ERR, RING, "Cannot reserve memory\n"); rte_free(te); } rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); return r; } int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, unsigned flags) { int ret; /* compilation-time checks */ RTE_BUILD_BUG_ON((sizeof(struct rte_ring) & RTE_CACHE_LINE_MASK) != 0); RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) & RTE_CACHE_LINE_MASK) != 0); RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & RTE_CACHE_LINE_MASK) != 0); /* init the ring structure */ memset(r, 0, sizeof(*r)); ret = snprintf(r->name, sizeof(r->name), "%s", name); if (ret < 0 || ret >= (int)sizeof(r->name)) return -ENAMETOOLONG; r->flags = flags; r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP; r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC; if (flags & RING_F_EXACT_SZ) { r->size = rte_align32pow2(count + 1); r->mask = r->size - 1; r->capacity = count; } else { if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) { RTE_LOG(ERR, RING, "Requested size is invalid, must be power of 2, and not exceed the size limit %u\n", RTE_RING_SZ_MASK); return -EINVAL; } r->size = count; r->mask = count - 1; r->capacity = r->mask; } //初始时,生产者和消费者的首尾都为0 r->prod.head = r->cons.head = 0; r->prod.tail = r->cons.tail = 0; return 0; }
入队操作
DPDK提供了如下几个api用来执行入队操作,它们最终都会调用__rte_ring_do_enqueue来实现,所以重点分析函数__rte_ring_do_enqueue。
//多生产者批量入队。入队个数n必须全部成功,否则入队失败。调用者明确知道是多生产者 rte_ring_mp_enqueue_bulk //单生产者批量入队。入队个数n必须全部成功,否则入队失败。调用者明确知道是单生产者 rte_ring_sp_enqueue_bulk //批量入队。入队个数n必须全部成功,否则入队失败。调用者不用关心是不是单生产者 rte_ring_enqueue_bulk //多生产者批量入队。入队个数n不一定全部成功。调用者明确知道是多生产者 rte_ring_mp_enqueue_burst //单生产者批量入队。入队个数n不一定全部成功。调用者明确知道是单生产者 rte_ring_sp_enqueue_burst //批量入队。入队个数n不一定全部成功。调用者不用关心是不是单生产者 rte_ring_enqueue_burst
__rte_ring_do_enqueue主要做了三个事情:
- a. 移动生产者head,此处在多生产者下可能会有冲突,需要使用cas操作循环检测,只有自己能移动head时才行。
- b. 执行入队操作,将obj插入ring,从老的head开始,直到新head结束。
- c. 更新生产者tail,只有这样消费者才能看到最新的消费对象。
其参数r指定了目标ring。 参数obj_table指定了入队对象。 参数n指定了入队对象个数。 参数behavior指定了入队行为,有两个值RTE_RING_QUEUE_FIXED和RTE_RING_QUEUE_VARIABLE,前者表示入队对象必须一次性全部成功,后者表示尽可能多的入队。 参数is_sp指定了是否为单生产者模式,默认为多生产者模式。
static __rte_always_inline unsigned int __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, int is_sp, unsigned int *free_space) { uint32_t prod_head, prod_next; uint32_t free_entries; //先移动生产者的头指针,prod_head保存移动前的head,prod_next保存移动后的head n = __rte_ring_move_prod_head(r, is_sp, n, behavior, &prod_head, &prod_next, &free_entries); if (n == 0) goto end; //&r[1]指向存放对象的内存。 //从prod_head开始,将n个对象obj_table插入ring的prod_head位置 ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); rte_smp_wmb(); //更新生产者tail update_tail(&r->prod, prod_head, prod_next, is_sp); end: if (free_space != NULL) *free_space = free_entries - n; return n; }
__rte_ring_move_prod_head用来使用cas操作更新生产者head。
static __rte_always_inline unsigned int __rte_ring_move_prod_head(struct rte_ring *r, int is_sp, unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *free_entries) { const uint32_t capacity = r->capacity; unsigned int max = n; int success; do { /* Reset n to the initial burst count */ n = max; //获取生产者当前的head位置 *old_head = r->prod.head; /* add rmb barrier to avoid load/load reorder in weak * memory model. It is noop on x86 */ rte_smp_rmb(); const uint32_t cons_tail = r->cons.tail; /* * The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * *old_head > cons_tail). So 'free_entries' is always between 0 * and capacity (which is < size). */ //获取空闲 entry 个数 *free_entries = (capacity + cons_tail - *old_head); //如果入队的对象个数大于空闲entry个数,则如果入队要求固定大小,则入队失败,返回0,否则 //只入队空闲entry个数的对象 /* check that we have enough room in ring */ if (unlikely(n > *free_entries)) n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *free_entries; if (n == 0) return 0; //当前head位置加上入队对象个数获取新的生产者head *new_head = *old_head + n; //如果是单生产者,直接更新生产者head,并返回1 if (is_sp) r->prod.head = *new_head, success = 1; else //如果是多生产者,需要借助函数rte_atomic32_cmpset,比较old_head和r->prod.head是否相同, //如果相同,则将r->prod.head更新为new_head,并返回1,退出循环, //如果不相同说明有其他生产者更新head了,返回0,继续循环。 success = rte_atomic32_cmpset(&r->prod.head, *old_head, *new_head); } while (unlikely(success == 0)); return n; }
ENQUEUE_PTRS定义了入队操作。
/* the actual enqueue of pointers on the ring. * Placed here since identical code needed in both * single and multi producer enqueue functions */ #define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \ unsigned int i; \ const uint32_t size = (r)->size; \ uint32_t idx = prod_head & (r)->mask; \ obj_type *ring = (obj_type *)ring_start; \ //idx+n 大于 size,说明入队n个对象后,ring还没满,还没翻转 if (likely(idx + n < size)) { \ //一次循环入队四个对象 for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \ ring[idx] = obj_table[i]; \ ring[idx+1] = obj_table[i+1]; \ ring[idx+2] = obj_table[i+2]; \ ring[idx+3] = obj_table[i+3]; \ } \ //还有剩余不满四个对象,则在switch里入队 switch (n & 0x3) { \ case 3: \ ring[idx++] = obj_table[i++]; /* fallthrough */ \ case 2: \ ring[idx++] = obj_table[i++]; /* fallthrough */ \ case 1: \ ring[idx++] = obj_table[i++]; \ } \ } else { \ //入队n个对象,会导致ring满,发生翻转, //则先入队idx到size的位置, for (i = 0; idx < size; i++, idx++)\ ring[idx] = obj_table[i]; \ //再翻转回到ring起始位置,入队剩余的对象 for (idx = 0; i < n; i++, idx++) \ ring[idx] = obj_table[i]; \ } \ } while (0)
最后更新生产者tail。
static __rte_always_inline void update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, uint32_t single) { /* * If there are other enqueues/dequeues in progress that preceded us, * we need to wait for them to complete */ if (!single) //多生产者时,必须等到其他生产者入队成功,再更新自己的tail while (unlikely(ht->tail != old_val)) rte_pause(); ht->tail = new_val; }
出队操作
DPDK提供了如下几个api用来执行出队操作,它们最终都会调用__rte_ring_do_dequeue来实现,所以重点分析函数__rte_ring_do_dequeue。
//多消费者批量出队。出队个数n必须全部成功,否则出队失败。调用者明确知道是多消费者 rte_ring_mc_dequeue_bulk //单消费者批量出队。出队个数n必须全部成功,否则出队失败。调用者明确知道是单消费者 rte_ring_sc_dequeue_bulk //批量出队。出队个数n必须全部成功,否则出队失败。调用者不用关心是不是单消费者 rte_ring_dequeue_bulk //多消费者批量出队。出队个数n不一定全部成功。调用者明确知道是多消费者 rte_ring_mc_dequeue_burst //单消费者批量出队。出队个数n不一定全部成功。调用者明确知道是单消费者 rte_ring_sc_dequeue_burst //批量出队。出队个数n不一定全部成功。调用者不用关心是不是单消费者 rte_ring_dequeue_burst
__rte_ring_do_dequeue主要做了三个事情:
a. 移动消费者head,此处在多消费者下可能会有冲突,需要使用cas操作循环检测,只有自己能移动head时才行。 b. 执行出队操作,将ring中的obj插入obj_table,从老的head开始,直到新head结束。 c. 更新消费者tail,只有这样生成者才能进行生产。
其参数r指定了目标ring。 参数obj_table指定了出队对象出队后存放位置。 参数n指定了入队对象个数。 参数behavior指定了出队行为,有两个值RTE_RING_QUEUE_FIXED和RTE_RING_QUEUE_VARIABLE,前者表示出队对象必须一次性全部成功,后者表示尽可能多的出队。 参数is_sp指定了是否为单消费者模式,默认为多消费者模式。
static __rte_always_inline unsigned int __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, unsigned int n, enum rte_ring_queue_behavior behavior, int is_sc, unsigned int *available) { uint32_t cons_head, cons_next; uint32_t entries; //先移动消费者head,成功后,cons_head为老的head,cons_next为新的head, //两者之间的部分为此次可消费的对象 n = __rte_ring_move_cons_head(r, is_sc, n, behavior, &cons_head, &cons_next, &entries); if (n == 0) goto end; //执行出队操作,从老的cons_head开始出队n个对象 DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); rte_smp_rmb(); //更新消费者tail,和前面更新生产者head代码相同 update_tail(&r->cons, cons_head, cons_next, is_sc); end: if (available != NULL) *available = entries - n; return n; }
__rte_ring_move_cons_head用来使用cas操作更新消费者head。
static __rte_always_inline unsigned int __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *entries) { unsigned int max = n; int success; /* move cons.head atomically */ do { /* Restore n as it may change every loop */ n = max; //取出当前head位置 *old_head = r->cons.head; /* add rmb barrier to avoid load/load reorder in weak * memory model. It is noop on x86 */ rte_smp_rmb(); //生产者tail减去消费者head为可消费的对象个数。 //因为head和tail都是无符号32位类型,即使生产者tail比消费者head //小,也能正确得出结果,不用担心溢出。 const uint32_t prod_tail = r->prod.tail; /* The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * cons_head > prod_tail). So 'entries' is always between 0 * and size(ring)-1. */ *entries = (prod_tail - *old_head); //要求出队对象个数大于实际可消费对象个数 /* Set the actual entries for dequeue */ if (n > *entries) //此时如果behavior为RTE_RING_QUEUE_FIXED,表示必须满足n,满足不了就一个都不出队,返回0, //如果不为RTE_RING_QUEUE_FIXED,则尽可能多的出队 n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; if (unlikely(n == 0)) return 0; //当前head加上n即为新的消费者head *new_head = *old_head + n; if (is_sc) //如果单消费者,直接更新head即可,返回1 r->cons.head = *new_head, success = 1; else //多消费者,需要借用rte_atomic32_cmpset更新head success = rte_atomic32_cmpset(&r->cons.head, *old_head, *new_head); } while (unlikely(success == 0)); return n; }
ring是否满或者是否为空
函数rte_ring_full用来判断ring是否满 static inline int rte_ring_full(const struct rte_ring *r) { return rte_ring_free_count(r) == 0; } static inline unsigned rte_ring_free_count(const struct rte_ring *r) { return r->capacity - rte_ring_count(r); }
函数rte_ring_empty用来判断ring是否为空
static inline int rte_ring_empty(const struct rte_ring *r) { return rte_ring_count(r) == 0; }
判断ring是否为空或者是否满都需要调用rte_ring_count获取当前ring中已使用的个数。
static inline unsigned rte_ring_count(const struct rte_ring *r) { uint32_t prod_tail = r->prod.tail; uint32_t cons_tail = r->cons.tail; uint32_t count = (prod_tail - cons_tail) & r->mask; return (count > r->capacity) ? r->capacity : count; }