PF_RING实现分析(1)

简介: 内核版本:Linux 2.6.30.9PF_RING版本:4.1.0最近看了一个PF_RING的实现,看了个大概,发上来大家讨论讨论,共同学习。一、什么是PF_RINGPF_RING是一个第三方的内核数据包捕获接口,类似于libpcap。二、为什么需要PF_RING一切为了效率,按照其官方网站上的测试数据,在Linux平台之上,其效率至少高于libpcap 50% - 60%,甚至是一倍。更好的是,PF_RING提供了一个修改版本的libpcap,使之建立在PF_RING接口之上。这样,原来使用libpcap的程序,就可以自然过渡了。

内核版本:Linux 2.6.30.9

PF_RING版本:4.1.0

最近看了一个PF_RING的实现,看了个大概,发上来大家讨论讨论,共同学习。

一、什么是PF_RING
PF_RING是一个第三方的内核数据包捕获接口,类似于libpcap。

二、为什么需要PF_RING
一切为了效率,按照其官方网站上的测试数据,在Linux平台之上,其效率至少高于libpcap 50% - 60%,甚至是一倍。更好的是,PF_RING提供了一个修改版本的libpcap,使之建立在PF_RING接口之上。这样,原来使用libpcap的程序,就可以自然过渡了。

三、声明
1、这不是“零拷贝”,研究“零拷贝”的估计要失望,如果继续看下去的话;
2、这不是包截获接口,如果需要拦截、修改内核数据包,请转向Netfilter;
3、本文只分析了PF_RING最基础的部份。关于DNA、TNAPI,BPF等内容不包含在内。

四、源码的获取
svn co svn. ntop. org/svn/ntop/trunk/PF_RING/

最近好像全流行svn了。

五、编译和使用
接口分为两部份,一个是内核模块,一个是用户态的库
cd my_pf_ring_goes_here
cd kernel
make
sudo insmod ./pf_ring.ko
cd ../userland
make

在源码目录中,关于用户态的库有使用的现成的例子,很容易依葫芦画瓢。后文也会提到用户态库的实现的简单分析,可以两相比照,很容易上手。而且源码目录中有一个PDF文档,有详细的API介绍,建议使用前阅读。

六、实现分析初步
1、核心思路
A、在内核队列层注册Hook,获取数据帧。
B、在内核创建一个环形队列(这也是叫RING的原因),用于存储数据,并使用mmap映射到用户空间。这样,避免用户态的系统调用,也是提高性能的关键所在。
C、创建了一个新的套接字类型PF_RING,用户态通过它与内核通信。

2、模块初始化
模块源码只有一个文件,在目录树kernel/pf_ring.c,嗯,还有一个头文件,在kernel/linux下

static int __init ring_init(void)
{
int i, rc;

printk("[PF_RING] Welcome to PF_RING %s ($Revision: 4012 $)\n"

     "(C) 2004-09 L.Deri <[email]deri@ntop.org[/email]>\n", RING_VERSION);
    
    //注册名为PF_RING的新协议

if((rc = proto_register(&ring_proto, )) != )

return(rc);

ring_proto的定义为

if(LINUX_VERSION_CODE > KERNEL_VERSION(2,6,11))

static struct proto ring_proto = {
.name = "PF_RING",
.owner = THIS_MODULE,
.obj_size = sizeof(struct ring_sock),
};

endif

初始化四个链表,它们的作用,后文会分析到:

//初始化四个链表
INIT_LIST_HEAD(&ring_table); / List of all ring sockets. /
INIT_LIST_HEAD(&ring_cluster_list); / List of all clusters /
INIT_LIST_HEAD(&ring_aware_device_list); / List of all devices on which PF_RING has been registered /
INIT_LIST_HEAD(&ring_dna_devices_list); / List of all dna (direct nic access) devices /
device_ring_list是一个指针数组,它的每一个元素对应一个网络设备,后文也会分析它的使用:

/*
For each device, pf_ring keeps a list of the number of
available ring socket slots. So that a caller knows in advance whether
there are slots available (for rings bound to such device)
that can potentially host the packet
*/
for (i = ; i < MAX_NUM_DEVICES; i++)

INIT_LIST_HEAD(&device_ring_list[i]);

//为新协议注册sock
sock_register(&ring_family_ops);
static struct net_proto_family ring_family_ops = {
.family = PF_RING,
.create = ring_create,
.owner = THIS_MODULE,
};
这样,当用户空间创建PF_RING时,例如,

fd = socket(PF_RING, SOCK_RAW, htons(ETH_P_ALL));
ring_create将会被调用

//注册通知链表
register_netdevice_notifier(&ring_netdev_notifier);

/ 工作模式语法检查 /
if(transparent_mode > driver2pf_ring_non_transparent)

transparent_mode = standard_linux_path;

PF_RING一共有三种工作模式:

typedef enum {
standard_linux_path = , / Business as usual /
driver2pf_ring_transparent = 1, / Packets are still delivered to the kernel /
driver2pf_ring_non_transparent = 2 / Packets not delivered to the kernel /
} direct2pf_ring;
第一种最简单,这里仅分析第一种

//输出工作信息参数
printk("[PF_RING] Ring slots %d\n", num_slots);
printk("[PF_RING] Slot version %d\n",

     RING_FLOWSLOT_VERSION);

printk("[PF_RING] Capture TX %s\n",

     enable_tx_capture ? "Yes [RX+TX]" : "No [RX only]");

printk("[PF_RING] Transparent Mode %d\n",

     transparent_mode);

printk("[PF_RING] IP Defragment %s\n",

     enable_ip_defrag ? "Yes" : "No");

printk("[PF_RING] Initialized correctly\n");
num_slots为槽位总数,系统采用数组来实现双向环形队列,它也就代表数组的最大元素。
版本信息:不用多说了。不过我的版本在2.6.18及以下都没有编译成功,后来使用2.6.30.9搞定之。
enable_tx_capture:是否启用发送时的数据捕获,对于大多数应用而言,都是在接收时处理。
enable_ip_defrag:为用户提供一个接口,是否在捕获最重组IP分片。

//创建/proc目录
ring_proc_init();

//注册设备句柄
register_device_handler();

pfring_enabled = 1; //工作标志
return ;
}
register_device_handler注册了一个协议,用于数据包的获取:

/ Protocol hook /
static struct packet_type prot_hook;

void register_device_handler(void) {

    //只有在第一种模式下,才用这种方式接收数据

if(transparent_mode != standard_linux_path) return;

prot_hook.func = packet_rcv;
prot_hook.type = htons(ETH_P_ALL);
dev_add_pack(&prot_hook);
}

void register_device_handler(void) {
if(transparent_mode != standard_linux_path) return;

prot_hook.func = packet_rcv;
prot_hook.type = htons(ETH_P_ALL);
dev_add_pack(&prot_hook);
}
2、创建套接字
Linux的套按字的内核接口,使用了两个重要的数据结构:
struct socket和struct sock,这本来并没有什么,不过令人常常迷惑的是,前者常常被缩写为sock,即:
struct socket *sock;
这样,“sock”就容易造成混淆了。还好,后者常常被缩写为sk……
我这里写sock指前者,sk指后者,如果不小心写混了,请参考上下文区分 。

关于这两个结构的含义,使用等等,可以参考相关资料以获取详细信息,如《Linux情景分析》。我的个人网站www.skynet.org.cn上也分析了Linux socket的实现。可以参考。这里关于socket的进一步信息,就不详细分析了。

这里的创建套接字,内核已经在系统调用过程中,准备好了sock,主要就是分析sk,并为sk指定一系列的操作函数,如bind、mmap、poll等等。
如前所述,套接字的创建,是通过调用ring_create函数来完成的:

static int ring_create(

if(LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,24))

                   struct net *net,

endif

                   struct socket *sock, int protocol)

{
struct sock *sk;
struct ring_opt *pfr;
int err;

if defined(RING_DEBUG)

printk("[PF_RING] ring_create()\n");

endif

/ 权限验证 ? /
if(!capable(CAP_NET_ADMIN))

return -EPERM;

//协议簇验证
if(sock->type != SOCK_RAW)

return -ESOCKTNOSUPPORT;

//协议验证
if(protocol != htons(ETH_P_ALL))

return -EPROTONOSUPPORT;

err = -ENOMEM;

// 分配sk
// options are.

if(LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,11))

sk = sk_alloc(PF_RING, GFP_KERNEL, 1, NULL);

else

if(LINUX_VERSION_CODE < KERNEL_VERSION(2,6,24))

// BD: API changed in 2.6.12, ref:
// [url]http://svn.clkao.org/svnweb/linux/revision/?rev=28201[/url]
sk = sk_alloc(PF_RING, GFP_ATOMIC, &ring_proto, 1);

else

sk = sk_alloc(net, PF_INET, GFP_KERNEL, &ring_proto);

endif

endif

//分配失败
if(sk == NULL)

goto out;

//这里很重要,设定sock的ops,即对应用户态的bind、connect诸如此类操作的动作
sock->ops = &ring_ops;

//初始化sock结构(即sk)各成员,并设定与套接字socket(即sock)的关联
sock_init_data(sock, sk);

if(LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,11))

sk_set_owner(sk, THIS_MODULE);

endif

err = -ENOMEM;

define ring_sk_datatype(__sk) ((struct ring_opt *)__sk)

define ring_sk(__sk) ((__sk)->sk_protinfo)

//作者喜欢用小写的宏
//这里分配一个struct ring_opt结构,这个结构比较重要,富贵论坛记录了ring的选项信息。
在sk中,使用sk_protinfo成员指向之,这样就建立了sock->sk->ring_opt的关联。可以通过套接字很容易获取Ring的信息。
ring_sk(sk) = ring_sk_datatype(kmalloc(sizeof(*pfr), GFP_KERNEL));
//分配失败
if(!(pfr = ring_sk(sk))) {

sk_free(sk);
goto out;

}

//初始化各成员
memset(pfr, 0, sizeof(*pfr));
//激活标志
pfr->ring_active = 0; / We activate as soon as somebody waits for packets /
//通道ID
pfr->channel_id = RING_ANY_CHANNEL;
//RING的每个槽位的桶的大小,其用来存储捕获的数据帧,这个值,用户态也可以使用setsocketopt来调整
pfr->bucket_len = DEFAULT_BUCKET_LEN;
//过滤器hash桶
pfr->handle_hash_rule = handle_filtering_hash_bucket;
//初始化等待队列
init_waitqueue_head(&pfr->ring_slots_waitqueue);
//初始化RING的锁
rwlock_init(&pfr->ring_index_lock);
rwlock_init(&pfr->ring_rules_lock);
//初始化使用计数器
atomic_set(&pfr->num_ring_users, 0);
INIT_LIST_HEAD(&pfr->rules);
//设定协议簇
sk->sk_family = PF_RING;
//设定sk的destuct函数
sk->sk_destruct = ring_sock_destruct;

//sk入队
ring_insert(sk);

if defined(RING_DEBUG)

printk("[PF_RING] ring_create() - created\n");

endif

return(0);
out:
return err;
在模块初始化中,初始化过四个链表。其中一个是ring_table,ring_insert将刚刚创建的套接字插入其中。其封装引进了一个struct ring_element 结构:

/*

  • ring_insert()

*

  • store the sk in a new element and add it
  • to the head of the list.

*/
static inline void ring_insert(struct sock *sk)
{
struct ring_element *next;
struct ring_opt *pfr;

if defined(RING_DEBUG)

printk("[PF_RING] ring_insert()\n");

endif

//分配element
next = kmalloc(sizeof(struct ring_element), GFP_ATOMIC);
if(next != NULL) {

//记录sk
next->sk = sk;
write_lock_bh(&ring_mgmt_lock);
//入队
list_add(&next->list, &ring_table);
write_unlock_bh(&ring_mgmt_lock);

} else {

if(net_ratelimit())
  printk("[PF_RING] net_ratelimit() failure\n");

}

//累计使用计数器
ring_table_size++;
//ring_proc_add(ring_sk(sk));
//记录进程PID
pfr = (struct ring_opt *)ring_sk(sk);
pfr->ring_pid = current->pid;
}
复制代码

3 、分配队列空间
用户态在创建了套接字后,接下来就调用bind函数,绑定套接字,而PF_RING实际做的就是为RING分配相应的空间。也就是说,一个套接字,都有一个与之对应的RING。这样,有多个进程同时使用PF_RING,也没有问题:

            sa.sa_family   = PF_RING;
            snprintf(sa.sa_data, sizeof(sa.sa_data), "%s", device_name);
            rc = bind(ring->fd, (struct sockaddr *)&sa, sizeof(sa));

复制代码

因为前一步创建套接字时,为sk指定了其ops:

static struct proto_ops ring_ops = {
.family = PF_RING,
.owner = THIS_MODULE,

/ Operations that make no sense on ring sockets. /
.connect = sock_no_connect,
.socketpair = sock_no_socketpair,
.accept = sock_no_accept,
.getname = sock_no_getname,
.listen = sock_no_listen,
.shutdown = sock_no_shutdown,
.sendpage = sock_no_sendpage,
.sendmsg = sock_no_sendmsg,

/ Now the operations that really occur. /
.release = ring_release,
.bind = ring_bind,
.mmap = ring_mmap,
.poll = ring_poll,
.setsockopt = ring_setsockopt,
.getsockopt = ring_getsockopt,
.ioctl = ring_ioctl,
.recvmsg = ring_recvmsg,
};
复制代码

这样,当bing系统调用触发时,ring_bind函数将被调用:

  • Bind to a device */

static int ring_bind(struct socket sock, struct sockaddr sa, int addr_len)
{
struct sock *sk = sock->sk;
struct net_device *dev = NULL;

if defined(RING_DEBUG)

printk("[PF_RING] ring_bind() called\n");

endif

/*

  • Check legality

*/
if(addr_len != sizeof(struct sockaddr))

return -EINVAL;

if(sa->sa_family != PF_RING)

return -EINVAL;

if(sa->sa_data == NULL)

return -EINVAL;

/ Safety check: add trailing zero if missing /
sa->sa_data[sizeof(sa->sa_data) - 1] = '\0';

if defined(RING_DEBUG)

printk("[PF_RING] searching device %s\n", sa->sa_data);

endif

if((dev = __dev_get_by_name(

if(LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,24))

                           &init_net,

endif

                           sa->sa_data)) == NULL) {

if defined(RING_DEBUG)

printk("[PF_RING] search failed\n");

endif

return(-EINVAL);

} else

return(packet_ring_bind(sk, dev));

}
复制代码

在做了一些必要的语法检查后,函数转向packet_ring_bind:

/*

  • We create a ring for this socket and bind it to the specified device

*/
static int packet_ring_bind(struct sock sk, struct net_device dev)
{
u_int the_slot_len;
u_int32_t tot_mem;
struct ring_opt *pfr = ring_sk(sk);
// struct page page, page_end;

if(!dev)

return(-1);

if defined(RING_DEBUG)

printk("[PF_RING] packet_ring_bind(%s) called\n", dev->name);

endif

/ *


    • *
    • FlowSlotInfo *
    • *
  • * <-+
    • FlowSlot * |
  • * |
    • FlowSlot * |
  • * +- num_slots
    • FlowSlot * |
  • * |
    • FlowSlot * |
  • * <-+

*

  • * /

     //计算每一个槽位所需的内存空间

the_slot_len = sizeof(u_char) / flowSlot.slot_state /

ifdef RING_MAGIC

+ sizeof(u_char)

endif

+ sizeof(struct pfring_pkthdr)
+ pfr->bucket_len /* flowSlot.bucket */ ;

/*

    对于槽位空间的计算,有意思的是

typedef struct flowSlot {

ifdef RING_MAGIC

u_char magic; / It must alwasy be zero /

endif

u_char slot_state; / 0=empty, 1=full /
u_char bucket; / bucket[bucketLen] /
} FlowSlot;
对照结构定义和上面的计算公式:
1、作者好像把magic和slog_state的顺序给搞反了,不过还好,它们都是u_char,对结果不影响
2、bucket,桶的大小,这个桶就是拿来装要捕获的数据包了,虽然它在结构中定义是一个成员,事实上,
它由两个部份组成,一个是包的首部信息,这个结构的定义同libpcap很接近。另一个才是包的空间。
*/

    //总共的环形队列内存所需空间,包含一个队列控制信息FlowSlotInfo和若干个(由变量num_slots决定)槽位空间

tot_mem = sizeof(FlowSlotInfo) + num_slots * the_slot_len;

//确保按整数页分配,mmap也要求这样
if(tot_mem % PAGE_SIZE)

tot_mem += PAGE_SIZE - (tot_mem % PAGE_SIZE);

    //分配内存空间

pfr->ring_memory = rvmalloc(tot_mem);

if(pfr->ring_memory != NULL) {

printk("[PF_RING] successfully allocated %lu bytes at 0x%08lx\n",
       (unsigned long)tot_mem, (unsigned long)pfr->ring_memory);

} else {

printk("[PF_RING] ERROR: not enough memory for ring\n");
return(-1);

}

// memset(pfr->ring_memory, 0, tot_mem); // rvmalloc does the memset already
//初始化各成员

    //内存指定,因为分配的内存开始部份是sizeof(FlowSlotInfo),所以可以做这样的强制转换,很容易互相取值

pfr->slots_info = (FlowSlotInfo *) pfr->ring_memory;
//跳过控制信息,指向槽位指针.事实上,它就是一个一维数组了,可以计算出合适的索引值,取到数组(RING)中的任意槽位值
pfr->ring_slots = (char *)(pfr->ring_memory + sizeof(FlowSlotInfo));

//版本信息
pfr->slots_info->version = RING_FLOWSLOT_VERSION;
//登记单个槽的大小
pfr->slots_info->slot_len = the_slot_len;
//登记bucket大小,从前面特别注释的bucket的分配看,bucket_len这个大小并不代表bucket成员的实际大小——它不包含struct pfring_pkthdr
pfr->slots_info->data_len = pfr->bucket_len;
//登记实际分配到的槽位数量,这里不用num_slots,难道是怕rvmalloc偷吃?
pfr->slots_info->tot_slots =

(tot_mem - sizeof(FlowSlotInfo)) / the_slot_len;

//登记实际分配的内存总数
pfr->slots_info->tot_mem = tot_mem;
//采样速率??
pfr->slots_info->sample_rate = 1;

printk("[PF_RING] allocated %d slots slot_len=%d\n",

     pfr->slots_info->tot_slots, pfr->slots_info->slot_len,
     pfr->slots_info->tot_mem);

ifdef RING_MAGIC

{

int i;

for (i = 0; i < pfr->slots_info->tot_slots; i++) {
  unsigned long idx = i * pfr->slots_info->slot_len;
  FlowSlot *slot = (FlowSlot *) & pfr->ring_slots[idx];
  slot->magic = RING_MAGIC_VALUE;
  slot->slot_state = 0;
}

}

endif

//这些控制变量可以在环的入队操作中看到它们的作用
pfr->sample_rate = 1; / No sampling /
pfr->insert_page_id = 1, pfr->insert_slot_id = 0;
pfr->rules_default_accept_policy = 1, pfr->num_filtering_rules = 0;
ring_proc_add(ring_sk(sk), dev);

//记录与之相应的设备信息,例如,如果在eth0上打开了5 个PF_RING, bind
//被调用5次,分配了5个环形队列空间。eth0上随之分配5个elem,它们指向与
//之对应的ring,然后根据设备索引号民全部加入至了device_ring_list
//当有数据报文从指定接口进入时,可以很容易地在device_ring_list中找到相应的设备
//然后再遍历链表,再找到与之相应的ring
if(dev->ifindex < MAX_NUM_DEVICES) {

device_ring_list_element *elem;

/* printk("[PF_RING] Adding ring to device index %d\n", dev->ifindex); */

elem = kmalloc(sizeof(device_ring_list_element), GFP_ATOMIC);
if(elem != NULL) {
  elem->the_ring = pfr;
  INIT_LIST_HEAD(&elem->list);
  write_lock(&ring_list_lock);
  list_add(&elem->list, &device_ring_list[dev->ifindex]);
  write_unlock(&ring_list_lock);
  /* printk("[PF_RING] Added ring to device index %d\n", dev->ifindex); */
}

}

/*

IMPORTANT
Leave this statement here as last one. In fact when
the ring_netdev != NULL the socket is ready to be used.

*/
pfr->ring_netdev = dev;

return(0);
}
复制代码

这个函数中,最重要的三点:
1、整个空间的详细构成,作者画了一个简单的草图,清晰明了。
2、如果取得某个槽位。
3、device_ring_list链表的使用。

一些作者有详细注释的地方,我就不再重重了。

这一步进行完了后,就有一块内存了(系统将其看成一个数组),用来存储捕获的数据帧。接下来要做的事情。就是把它映射到用户态。

目录
相关文章
|
4月前
|
Linux Windows
[收藏] Ring0 Call Ring3
[收藏] Ring0 Call Ring3
|
存储
av_register_all分析
av_register_all分析
107 0
av_register_all分析
AVPacket结构体内几个变量分析
AVPacket结构体内几个变量分析
66 0
AVPacket结构体内几个变量分析
|
前端开发 rax Shell
[PWN][高级篇]ROP-ret2libc-32/64位实例 (共四个)(上)
[PWN][高级篇]ROP-ret2libc-32/64位实例 (共四个)
836 0
[PWN][高级篇]ROP-ret2libc-32/64位实例 (共四个)(上)
|
网络协议 BI 调度
NR PRACH(五) type1 RA(4-step)基本过程
无线通信,最重要的前提是建立接收端和发射端之间的时间同步。
FreeType 使用FT_MEM_ALLOC/FT_FREE内存操作
FreeType 使用FT_MEM_ALLOC/FT_FREE内存操作
124 0
|
NoSQL Shell
[PWN][高级篇]利用ROP-ret2Syscall突破NX保护(下)
[PWN][高级篇]利用ROP-ret2Syscall突破NX保护
145 0
[PWN][高级篇]利用ROP-ret2Syscall突破NX保护(下)
|
安全 Shell
[PWN][高级篇]利用ROP-ret2Syscall突破NX保护(上)
[PWN][高级篇]利用ROP-ret2Syscall突破NX保护
226 0
[PWN][高级篇]利用ROP-ret2Syscall突破NX保护(上)
PF_RING实现分析(3)
刚刚接触PF_RING,学习了,O(∩_∩)O 有个问题请教一下九贱前辈:内核中的PACKET_MMAP跟PF_RING有什么不同呢? 感觉PACKET_MMAP跟transparent_mode=0时的PF_RING原理上是一样的( ⊙ o ⊙ )啊...?
172 0
|
网络协议 Linux API
PF_RING实现分析(2)
4、mmap操作 用户态的接下来调用: ring->buffer = (char *)mmap(NULL, PAGE_SIZE, PROT_READ|PROT_WRITE, MAP_SHARED, ring->fd, 0); 复制代码 进行内存映射。 同样地,内核调用相应的ring_mmap进行处理。 Ring选项结构通过ring_sk宏与sk 建立关联 struct ring_opt *pfr = ring_sk(sk); 复制代码
436 0