PF_RING实现分析(2)

简介: 4、mmap操作:用户态接下来调用 ring->buffer = (char *)mmap(NULL, PAGE_SIZE, PROT_READ|PROT_WRITE, MAP_SHARED, ring->fd, 0); 进行内存映射。同样地,内核调用相应的ring_mmap进行处理。Ring选项结构通过ring_sk宏与sk建立关联:struct ring_opt *pfr = ring_sk(sk);

4、mmap操作
用户态的接下来调用:

                    /* Map PAGE_SIZE bytes of the ring socket (ring->fd) at offset 0,
                     * readable, writable and shared, and keep the address in ring->buffer. */
                    ring->buffer = (char *)mmap(NULL, PAGE_SIZE, PROT_READ|PROT_WRITE,
                            MAP_SHARED, ring->fd, 0);

复制代码

进行内存映射。
同样地,内核调用相应的ring_mmap进行处理。
Ring选项结构通过ring_sk宏与sk 建立关联

/* Fetch the ring option structure associated with the socket via ring_sk() */
struct ring_opt *pfr = ring_sk(sk);
复制代码

pfr->ring_memory 即为分配的环形队列空间。所以,要mmap操作,实际上就是调用remap_pfn_range函数把pfr->ring_memory 映射到用户空间即可。这个函数的原型为:

/**
 * remap_pfn_range - remap kernel memory to userspace
 * @vma: user vma to map to
 * @addr: target user address to start at
 * @pfn: physical address of kernel memory
 * @size: size of map area
 * @prot: page protection flags for this mapping
 *
 *  Note: this is only safe if the mm semaphore is held when called.
 */
int remap_pfn_range(struct vm_area_struct *vma, unsigned long addr,

                unsigned long pfn, unsigned long size, pgprot_t prot)

{
复制代码

关于remap_pfn_range函数的进一步说明,可以参考LDD3,上面有详细说明和现成的例子。

static int ring_mmap(struct file *file,

                 struct socket *sock, struct vm_area_struct *vma)

{
struct sock *sk = sock->sk;
struct ring_opt *pfr = ring_sk(sk); //取得pfr指针,也就是相应取得环形队列的内存空间地址指针
int rc;
unsigned long size = (unsigned long)(vma->vm_end - vma->vm_start);

if(size % PAGE_SIZE) {

if defined(RING_DEBUG)

printk("[PF_RING] ring_mmap() failed: "
       "len is not multiple of PAGE_SIZE\n");

endif

return(-EINVAL);

}

if defined(RING_DEBUG)

printk("[PF_RING] ring_mmap() called, size: %ld bytes\n", size);

endif

if((pfr->dna_device == NULL) && (pfr->ring_memory == NULL)) {

if defined(RING_DEBUG)

printk("[PF_RING] ring_mmap() failed: "
       "mapping area to an unbound socket\n");

endif

return -EINVAL;

}

//dns设备为空,即没有使用dns技术
if(pfr->dna_device == NULL) {

/* if userspace tries to mmap beyond end of our buffer, fail */
//映射空间超限
if(size > pfr->slots_info->tot_mem) {

if defined(RING_DEBUG)

  printk("[PF_RING] ring_mmap() failed: "
         "area too large [%ld > %d]\n",
         size, pfr->slots_info->tot_mem);

endif

  return(-EINVAL);
}

if defined(RING_DEBUG)

printk("[PF_RING] mmap [slot_len=%d]"
       "[tot_slots=%d] for ring on device %s\n",
       pfr->slots_info->slot_len, pfr->slots_info->tot_slots,
       pfr->ring_netdev->name);

endif

    //进行内存映射
if((rc =
     do_memory_mmap(vma, size, pfr->ring_memory, VM_LOCKED,
                    0)) < 0)
  return(rc);

} else {

/* DNA Device */
if(pfr->dna_device == NULL)
  return(-EAGAIN);

switch (pfr->mmap_count) {
case 0:
  if((rc = do_memory_mmap(vma, size,
                           (void *)pfr->dna_device->
                           packet_memory, VM_LOCKED,
                           1)) < 0)
    return(rc);
  break;

case 1:
  if((rc = do_memory_mmap(vma, size,
                           (void *)pfr->dna_device->
                           descr_packet_memory, VM_LOCKED,
                           1)) < 0)
    return(rc);
  break;

case 2:
  if((rc = do_memory_mmap(vma, size,
                           (void *)pfr->dna_device->
                           phys_card_memory,
                           (VM_RESERVED | VM_IO), 2)) < 0)
    return(rc);
  break;

default:
  return(-EAGAIN);
}

pfr->mmap_count++;

}

if defined(RING_DEBUG)

printk("[PF_RING] ring_mmap succeeded\n");

endif

return 0;
}
复制代码

实际上的内存映射工作,是由do_memory_mmap来完成的,这个函数实际上基本就是remap_pfn_range的包裹函数。
不过因为系统支持dna等技术,相应的mode参数有些变化,这里只分析了最基本的方法:mode == 0

static int do_memory_mmap(struct vm_area_struct *vma,

                      unsigned long size, char *ptr, u_int flags, int mode)

{
unsigned long start;
unsigned long page;

/ we do not want to have this area swapped out, lock it /
vma->vm_flags |= flags;
start = vma->vm_start;

while (size > 0) {

int rc;

if(mode == 0) {

if(LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,11))

      //根据地址,计算要映射的页帧
  page = vmalloc_to_pfn(ptr);
  //进行内存映射
  rc = remap_pfn_range(vma, start, page, PAGE_SIZE,
                       PAGE_SHARED);

else

  page = vmalloc_to_page(ptr);
  page = kvirt_to_pa(ptr);
  rc = remap_page_range(vma, start, page, PAGE_SIZE,
                        PAGE_SHARED);

endif

} else if(mode == 1) {
  rc = remap_pfn_range(vma, start,
                       __pa(ptr) >> PAGE_SHIFT,
                       PAGE_SIZE, PAGE_SHARED);
} else {
  rc = remap_pfn_range(vma, start,
                       ((unsigned long)ptr) >> PAGE_SHIFT,
                       PAGE_SIZE, PAGE_SHARED);
}

if(rc) {

if defined(RING_DEBUG)

  printk("[PF_RING] remap_pfn_range() failed\n");

endif

  return(-EAGAIN);
}

start += PAGE_SIZE;
ptr += PAGE_SIZE;
if(size > PAGE_SIZE) {
  size -= PAGE_SIZE;
} else {
  size = 0;
}

}

return(0);
}
复制代码

嗯,跳过了太多的细节,不过其mmap最核心的东东已经呈现出来。
如果要共享内核与用户空间内存,这倒是个现成的可借鉴的例子。

5、数据包的入队操作

做到这一步,准备工作基本上就完成了。因为PF_RING在初始化中,注册了prot_hook。其func指针指向packet_rcv函数:
当数据报文进入Linux网络协议栈队列时,netif_receive_skb会遍历这些注册的Hook:

/*
 * Excerpt of the kernel's netif_receive_skb(): only the loop over the
 * registered ptype_all hooks is shown; declarations and the rest of the
 * function are omitted.
 */
int netif_receive_skb(struct sk_buff *skb)
{

    /* Walk every packet_type registered on ptype_all */
    list_for_each_entry_rcu(ptype, &ptype_all, list) {
            /* Only hooks bound to null_or_orig, the skb's device or orig_dev */
            if (ptype->dev == null_or_orig || ptype->dev == skb->dev ||
                ptype->dev == orig_dev) {
                    /* Deliver to the previously matched hook, then remember this one */
                    if (pt_prev)
                            ret = deliver_skb(skb, pt_prev, orig_dev);
                    pt_prev = ptype;
            }
    }

}
复制代码

相应的Hook函数得到调用:

/* Invoke the hook function of pt_prev on skb and return its result. */
static inline int deliver_skb(struct sk_buff *skb,

                          struct packet_type *pt_prev,
                          struct net_device *orig_dev)

{

    atomic_inc(&skb->users);        // note: the reference count is incremented here
    return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);

}
复制代码

packet_rcv随之执行环形队列的入队操作:

static int packet_rcv(struct sk_buff skb, struct net_device dev,

                  struct packet_type *pt, struct net_device *orig_dev)

{
int rc;

//忽略本地环回报文
if(skb->pkt_type != PACKET_LOOPBACK) {

      //进一步转向,最后一个参数直接使用-1,从上下文来看,写为RING_ANY_CHANNEL(其实也是-1)似乎可读性更强,
      //这里表示,如果从packet_rcv进入队列,由通道ID是“未指定的”,由skb_ring_handler来处理
rc = skb_ring_handler(skb,
                      (skb->pkt_type == PACKET_OUTGOING) ? 0 : 1,
                      1, -1 /* unknown channel */);

} else

rc = 0;

kfree_skb(skb); //所以,这里要做相应的减少
return(rc);
}
复制代码

static int skb_ring_handler(struct sk_buff *skb, //要捕获的数据包

                        u_char recv_packet,                                                                //数据流方向,>0表示是进入(接收)方向
                        u_char real_skb /* 1=real skb, 0=faked skb */ ,
                        short channel_id)                                                                //通道ID

{
struct sock *skElement;
int rc = 0, is_ip_pkt;
struct list_head *ptr;
struct pfring_pkthdr hdr;
int displ;
struct sk_buff *skk = NULL;
struct sk_buff *orig_skb = skb;

ifdef PROFILING

uint64_t rdt = _rdtsc(), rdt1, rdt2;

endif

//skb合法检查,包括数据流的方向
if((!skb) / Invalid skb /

  ||((!enable_tx_capture) && (!recv_packet))) {
/*
  An outgoing packet is about to be sent out
  but we decided not to handle transmitted
  packets.
*/
return(0);

}

if defined(RING_DEBUG)

if(1) {

struct timeval tv;

skb_get_timestamp(skb, &tv);
printk
  ("[PF_RING] skb_ring_handler() [skb=%p][%u.%u][len=%d][dev=%s][csum=%u]\n",
   skb, (unsigned int)tv.tv_sec, (unsigned int)tv.tv_usec,
   skb->len,
   skb->dev->name == NULL ? "<NULL>" : skb->dev->name,
   skb->csum);

}

endif

    //如果通道ID未指定,<a rel="dofollow" href="https://www.fgba.net/" title="富贵论坛"><span style="color: rgba(38, 38, 38, 1)">富贵论坛</span></a>根据进入的报文设备索引,设定之

if(LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,21))

if(channel_id == RING_ANY_CHANNEL / Unknown channel / )

channel_id = skb->iif;        /* Might have been set by the driver */

endif

if defined (RING_DEBUG)

/ printk("[PF_RING] channel_id=%d\n", channel_id); /

endif

ifdef PROFILING

rdt1 = _rdtsc();

endif

if(recv_packet) {

/* Hack for identifying a packet received by the e1000 */
if(real_skb)
  displ = SKB_DISPLACEMENT;
else
  displ = 0;        /* Received by the e1000 wrapper */

} else

displ = 0;
    

//解析数据报文,并判断是否为IP报文
is_ip_pkt = parse_pkt(skb, displ, &hdr);

//分片处理,是一个可选的功能项,事实上,对大多数包捕获工具而言,它们好像都不使用底层库来完成这一功能
/ (de)Fragmentation <fusco@ntop.org> /
if(enable_ip_defrag

  && real_skb && is_ip_pkt && recv_packet && (ring_table_size > 0)) {
} else {

if defined (RING_DEBUG)

    printk("[PF_RING] Do not seems to be a fragmented ip_pkt[iphdr=%p]\n",
           iphdr);

endif

  }
}

}

//按惯例,在报文的捕获首部信息中记录捕获的时间戳
/ BD - API changed for time keeping /

if(LINUX_VERSION_CODE < KERNEL_VERSION(2,6,14))

if(skb->stamp.tv_sec == 0)

do_gettimeofday(&skb->stamp);

hdr.ts.tv_sec = skb->stamp.tv_sec, hdr.ts.tv_usec = skb->stamp.tv_usec;

elif(LINUX_VERSION_CODE < KERNEL_VERSION(2,6,22))

if(skb->tstamp.off_sec == 0)

__net_timestamp(skb);

hdr.ts.tv_sec = skb->tstamp.off_sec, hdr.ts.tv_usec =

skb->tstamp.off_usec;

else / 2.6.22 and above /

if(skb->tstamp.tv64 == 0)

__net_timestamp(skb);

hdr.ts = ktime_to_timeval(skb->tstamp);

endif

//除了时间,还有长度,熟悉libpcap的话,这些操作应该很眼熟
hdr.len = hdr.caplen = skb->len + displ;

/ Avoid the ring to be manipulated while playing with it /
read_lock_bh(&ring_mgmt_lock);

/* 前面在创建sk时,已经看过ring_insert的入队操作了,现在要检查它的成员

  • 它们的关系是,通过ring_table的成员,获取到element,它里面封装了sk,

*通过ring_sk宏,就可以得到ring_opt指针
*/
list_for_each(ptr, &ring_table) {

struct ring_opt *pfr;
struct ring_element *entry;

entry = list_entry(ptr, struct ring_element, list);

skElement = entry->sk;
pfr = ring_sk(skElement);
    
    //看来要加入社团,条件还是满多的,pfr不能为空,未指定集群cluster_id,槽位不能为空,方向要正确,绑定的网络设备
    //得对上号
    //另一种可能就是对bonding的支持,如果设备是从属设备,则应校验其主设备
if((pfr != NULL)
    && (pfr->cluster_id == 0 /* No cluster */ )
    && (pfr->ring_slots != NULL)
    && is_valid_skb_direction(pfr->direction, recv_packet)
    && ((pfr->ring_netdev == skb->dev)
        || ((skb->dev->flags & IFF_SLAVE)
            && (pfr->ring_netdev == skb->dev->master)))) {
  /* We've found the ring where the packet can be stored */
  /* 从新计算捕获帧长度,是因为可能因为巨型帧的出现——超过了桶能容纳的长度 */
  int old_caplen = hdr.caplen;        /* Keep old lenght */
  hdr.caplen = min(hdr.caplen, pfr->bucket_len);
  /* 入队操作 */
  add_skb_to_ring(skb, pfr, &hdr, is_ip_pkt, displ, channel_id);
  hdr.caplen = old_caplen;
  rc = 1;        /* Ring found: we've done our job */
}

}

/ [2] Check socket clusters /
list_for_each(ptr, &ring_cluster_list) {

ring_cluster_element *cluster_ptr;
struct ring_opt *pfr;

cluster_ptr = list_entry(ptr, ring_cluster_element, list);

if(cluster_ptr->cluster.num_cluster_elements > 0) {
  u_int skb_hash = hash_pkt_cluster(cluster_ptr, &hdr);

  skElement = cluster_ptr->cluster.sk[skb_hash];

  if(skElement != NULL) {
    pfr = ring_sk(skElement);

    if((pfr != NULL)
        && (pfr->ring_slots != NULL)
        && ((pfr->ring_netdev == skb->dev)
            || ((skb->dev->flags & IFF_SLAVE)
                && (pfr->ring_netdev ==
                    skb->dev->master)))
        && is_valid_skb_direction(pfr->direction, recv_packet)
        ) {
      /* We've found the ring where the packet can be stored */
      add_skb_to_ring(skb, pfr, &hdr,
                      is_ip_pkt, displ,
                      channel_id);
      rc = 1;        /* Ring found: we've done our job */
    }
  }
}

}

read_unlock_bh(&ring_mgmt_lock);

ifdef PROFILING

rdt1 = _rdtsc() - rdt1;

endif

ifdef PROFILING

rdt2 = _rdtsc();

endif

/ Fragment handling /
if(skk != NULL)

kfree_skb(skk);

if(rc == 1) {

if(transparent_mode != driver2pf_ring_non_transparent) {
  rc = 0;
} else {
  if(recv_packet && real_skb) {

if defined(RING_DEBUG)

    printk("[PF_RING] kfree_skb()\n");

endif

    kfree_skb(orig_skb);
  }
}

}

ifdef PROFILING

rdt2 = _rdtsc() - rdt2;
rdt = _rdtsc() - rdt;

if defined(RING_DEBUG)

printk

("[PF_RING] # cycles: %d [lock costed %d %d%%][free costed %d %d%%]\n",
 (int)rdt, rdt - rdt1,
 (int)((float)((rdt - rdt1) * 100) / (float)rdt), rdt2,
 (int)((float)(rdt2 * 100) / (float)rdt));

endif

endif

//printk("[PF_RING] Returned %d\n", rc);
return(rc); / 0 = packet not handled /
}

上面跳过了对cluster(集群)的分析。PF_RING允许同时对多个接口捕获报文,而不只是一个,这就是集群。看一下它用户态的注释就一目了然了:

                    /* Syntax
                    ethX@1,5       channel 1 and 5
                    ethX@1-5       channel 1,2...5
                    ethX@1-3,5-7   channel 1,2,3,5,6,7
                    */

复制代码

进一步的入队操作,是通过add_skb_to_ring来完成的:

/*
 * Excerpt of add_skb_to_ring(): the filtering logic is omitted and only the
 * final hand-off to add_pkt_to_ring() is shown (offset and mem come from
 * the omitted part).
 */
static int add_skb_to_ring(struct sk_buff *skb,

                       struct ring_opt *pfr,
                       struct pfring_pkthdr *hdr,
                       int is_ip_pkt, int displ, short channel_id)

{

  // add_skb_to_ring is fairly complex because it has to deal with the filters.
  // For more about PF_RING filters see http://luca.ntop.org/Blooms.pdf
  // They are not discussed in detail here; perhaps in a later instalment.
  
  // The final enqueue is done by calling add_pkt_to_ring.
  add_pkt_to_ring(skb, pfr, hdr, displ, channel_id,
                  offset, mem);        

}
复制代码

/*
 * add_pkt_to_ring - copy one packet into the next free slot of a ring.
 *
 * @skb:        packet to copy; may be NULL, in which case only the header
 *              (and plugin data, if any) is stored
 * @pfr:        destination ring
 * @hdr:        capture header; hdr->caplen is clamped to the bucket size
 * @displ:      displacement; the copy starts at offset -displ in the skb
 * @channel_id: channel of the packet, or RING_ANY_CHANNEL
 * @offset:     length of the plugin data placed after the header
 * @plugin_mem: plugin data, or NULL
 *
 * Does nothing if the ring is inactive or the channel does not match.
 * If no free slot is available the packet is counted as lost. After a
 * successful insert, sleepers on pfr->ring_slots_waitqueue are woken up.
 */
static void add_pkt_to_ring(struct sk_buff *skb,
                            struct ring_opt *pfr,
                            struct pfring_pkthdr *hdr,
                            int displ, short channel_id,
                            int offset, void *plugin_mem)
{
  char *ring_bucket;
  int idx;
  FlowSlot *theSlot;

#if defined(RING_DEBUG)
  printk("[PF_RING] --> add_pkt_to_ring(len=%d) pfr->channel_id=%d channel_id=%d\n",
         hdr->len, pfr->channel_id, channel_id);
#endif

  /* Check the activation flag */
  if(!pfr->ring_active)
    return;

  if((pfr->channel_id != RING_ANY_CHANNEL)
     && (channel_id != RING_ANY_CHANNEL)) {
    /* Build the mask only for a known channel: shifting by
     * RING_ANY_CHANNEL (-1) would be undefined behaviour */
    int32_t the_bit = 1 << channel_id;

    if((pfr->channel_id & the_bit) != the_bit)
      return; /* Wrong channel */
  }

  /* Write lock */
  write_lock_bh(&pfr->ring_index_lock);
  /* Current insert index */
  idx = pfr->slots_info->insert_idx;
  /* After the increment idx designates the next insert position */
  idx++;
  /* Slot that will receive this packet */
  theSlot = get_insert_slot(pfr);
  /* Total packet counter */
  pfr->slots_info->tot_pkts++;

  /* No room: account the loss and bail out */
  if((theSlot == NULL) || (theSlot->slot_state != 0)) {
    /* No room left */
    pfr->slots_info->tot_lost++;
    write_unlock_bh(&pfr->ring_index_lock);
    return;
  }

  /* Bucket of the selected slot */
  ring_bucket = &theSlot->bucket;

  /* Plugin data, if any, is stored right after the packet header */
  if((plugin_mem != NULL) && (offset > 0))
    memcpy(&ring_bucket[sizeof(struct pfring_pkthdr)], plugin_mem, offset);

  if(skb != NULL) {
    /* Recompute the captured length */
    hdr->caplen = min(pfr->bucket_len - offset, hdr->caplen);

    if(hdr->caplen > 0) {
#if defined(RING_DEBUG)
      printk("[PF_RING] --> [caplen=%d][len=%d][displ=%d][parsed_header_len=%d][bucket_len=%d][sizeof=%d]\n",
             hdr->caplen, hdr->len, displ,
             hdr->parsed_header_len, pfr->bucket_len,
             (int)sizeof(struct pfring_pkthdr));
#endif

      /*
       * Copy the captured packet after the two leading fields: the pkthdr
       * header and the plugin data of length offset. This is one data
       * copy, which a perfectionist would dislike; but PF_RING aims to be
       * a generic interface, and avoiding the copy ("zero copy") would
       * mean patching every NIC driver individually.
       */
      skb_copy_bits(skb, -displ,
                    &ring_bucket[sizeof(struct pfring_pkthdr) + offset], hdr->caplen);
    } else {
      if(hdr->parsed_header_len >= pfr->bucket_len) {
        static u_char print_once = 0;

        if(!print_once) {
          printk("[PF_RING] WARNING: the bucket len is [%d] shorter than the plugin parsed header [%d]\n",
                 pfr->bucket_len, hdr->parsed_header_len);
          print_once = 1;
        }
      }
    }
  }

  /* Store the header */
  memcpy(ring_bucket, hdr, sizeof(struct pfring_pkthdr)); /* Copy extended packet header */

  /* idx was advanced above: wrap to 0 at the end of the ring, otherwise
   * store it as the new insert index */
  if(idx == pfr->slots_info->tot_slots)
    pfr->slots_info->insert_idx = 0;
  else
    pfr->slots_info->insert_idx = idx;

#if defined(RING_DEBUG)
  printk("[PF_RING] ==> insert_idx=%d\n", pfr->slots_info->insert_idx);
#endif

  /* Insert counter */
  pfr->slots_info->tot_insert++;
  /* Mark the slot ready: user space may now fetch it */
  theSlot->slot_state = 1;
  write_unlock_bh(&pfr->ring_index_lock);

  /* User space may have found the queue empty and be waiting in poll();
   * wake it up */
  /* wakeup in case of poll() */
  if(waitqueue_active(&pfr->ring_slots_waitqueue))
    wake_up_interruptible(&pfr->ring_slots_waitqueue);
}
复制代码

槽位的计算:

在ring_bind函数中,分配空间后,使用ring_slots做为槽位指针。事实上,这里要计算槽位,就是通过索引号 * 槽位长度来得到:
static inline FlowSlot get_insert_slot(struct ring_opt pfr)
{
if(pfr->ring_slots != NULL) {

FlowSlot *slot =
  (FlowSlot *) & (pfr->
                  ring_slots[pfr->slots_info->insert_idx *
                             pfr->slots_info->slot_len]);

if defined(RING_DEBUG)

printk
  ("[PF_RING] get_insert_slot(%d): returned slot [slot_state=%d]\n",
   pfr->slots_info->insert_idx, slot->slot_state);

endif

return(slot);

} else {

if defined(RING_DEBUG)

printk("[PF_RING] get_insert_slot(%d): NULL slot\n",
       pfr->slots_info->insert_idx);

endif

return(NULL);

}
}

目录
相关文章
|
1月前
|
API
FFmpeg中AVPacket、AVFrame结构的基本使用
FFmpeg中AVPacket和AVFrame结构的内存分配、释放和引用计数处理,以及如何避免内存泄漏。
31 3
|
1月前
|
Shell 网络安全 PHP
thewall-文件包含-CAP_DAC_READ_SEARCH
thewall-文件包含-CAP_DAC_READ_SEARCH
18 2
|
2月前
|
Linux Windows
[收藏] Ring0 Call Ring3
[收藏] Ring0 Call Ring3
|
存储
av_register_all分析
av_register_all分析
87 0
av_register_all分析
nobuffer与av_read_frame的关系
nobuffer与av_read_frame的关系
80 0
AVPacket结构体内几个变量分析
AVPacket结构体内几个变量分析
53 0
AVPacket结构体内几个变量分析
|
索引
av_find_best_stream
av_find_best_stream
140 0
av_find_best_stream
|
存储 编解码 Android开发
【Android FFMPEG 开发】FFMPEG 读取音视频流中的数据到 AVPacket ( 初始化 AVPacket 数据 | 读取 AVPacket )
【Android FFMPEG 开发】FFMPEG 读取音视频流中的数据到 AVPacket ( 初始化 AVPacket 数据 | 读取 AVPacket )
248 0
|
存储 Linux API
PF_RING实现分析(1)
内核版本:Linux 2.6.30.9 PF_RING版本:4.1.0 最近看了一个PF_RING的实现,看了个大概,发上来大家讨论讨论,共同学习。 一、什么是PF_RING PF_RING是一个第三方的内核数据包捕获接口,类似于libpcap。 二、为什么需要PF_RING 一切为了效率,按照其官方网站上的测试数据,在Linux平台之上,其效率至少高于libpcap 50% - 60%,甚至是一倍。更好的是,PF_RING提供了一个修改版本的libpcap,使之建立在PF_RING接口之上。这样,原来使用libpcap的程序,就可以自然过渡了。
694 0
PF_RING实现分析(3)
刚刚接触PF_RING,学习了,O(∩_∩)O 有个问题请教一下九贱前辈:内核中的PACKET_MMAP跟PF_RING有什么不同呢? 感觉PACKET_MMAP跟transparent_mode=0时的PF_RING原理上是一样的( ⊙ o ⊙ )啊...?
166 0