linux网络实现分析(3)——数据包的发送(IP层到链路层)-阿里云开发者社区

开发者社区> 开发与运维> 正文

linux网络实现分析(3)——数据包的发送(IP层到链路层)

简介:

二层(链路层)数据包发送过程分析

        当上层准备好一个包之后,交给链路层,链路层数据包发送主要通过dev_queue_xmit函数处理。数据包的发送可分为两种,一种是正常的传输流程,即通过网卡驱动,另一种是通过软中断(见注3)。为了理解方便,首先看一下dev_queue_xmi函数的整体调用关系图。

86cacc0f907f1832817f5bc7bb4b6b5bdd2d16c2

    dev_queue_xmit

    本函数用来将带发送的skb加入一个dev的队列(Queue),调用这个函数前必须设置好skb的device和priority,本函数可以在中断上下文中被调用。

返回值:

      返回非0(正数或负数)表示函数出错,返回0表示成功,但是并不表示数据包被成功发送出去,因为数据包可能因为限速等原因被丢掉

    函数执行后传入的skb将被释放,所以如果想控制数据包,实现对skb的重传时需要增加skb的引用计数。

    当调用此函数时中断必须是打开的,因为BH enable必须要求IRQ enable,否则会造成死锁

int dev_queue_xmit(struct sk_buff *skb)

{

    struct net_device *dev = skb->dev;

    struct netdev_queue *txq;

    struct Qdisc *q;

    int rc = -ENOMEM;

 

    /* GSO will handle the following emulations directly. */

    if (netif_needs_gso(dev, skb))

        goto gso;

 

    if (skb_has_frags(skb) &&

        !(dev->features & NETIF_F_FRAGLIST) &&

        __skb_linearize(skb))

        goto out_kfree_skb;

//如果skb有分片但是发送设备不支持分片,或分片中有分片在高端内存但发送设备不支持DMA,需要将所有段重新组合成一个段 ,这里__skb_linearize其实就是__pskb_pull_tail(skb, skb->data_len),这个函数基本上等同于pskb_may_pull  pskb_may_pull的作用就是检测skb对应的主buf中是否有足够的空间来pulllen长度,如果不够就重新分配skb并将frags中的数据拷贝入新分配的主buff中,而这里将参数len设置为skb->datalen 也就是会将所有的数据全部拷贝到主buff中,以这种方式完成skb的线性化。

    if (skb_shinfo(skb)->nr_frags &&

        (!(dev->features & NETIF_F_SG) || illegal_highdma(dev, skb)) &&

        __skb_linearize(skb))

        goto out_kfree_skb;

//如果数据包没有被计算校验和并且发送设备不支持这个协议的校验,则在此进行校验和的计算(注1)。如果上面已经线性化了一次,这里的__skb_linearize就会直接返回,注意区别fragsfrag_list,前者是将多的数据放到单独分配的页面中,sk_buff只有一个。而后者则是连接多个sk_buff 

    if (skb->ip_summed == CHECKSUM_PARTIAL) {

        skb_set_transport_header(skb, skb->csum_start -

                          skb_headroom(skb));

        if (!dev_can_checksum(dev, skb) && skb_checksum_help(skb))

            goto out_kfree_skb;

    }

 

gso:

//关闭软中断,禁止cpu抢占

    rcu_read_lock_bh();

//选择一个发送列,如果设备提供了select_queue就使用它,否选择一个,里只是Linux核多列的实,但是要真正的使用都列,需要网卡支持多列才可以,一般的网卡都只有一个列。在alloc_etherdev分配net_device是,设置列的个

txq = dev_pick_tx(dev, skb);

// netdev_queue构上取设备的qdisc 

    q = rcu_dereference(txq->qdisc);

//如果该设备有队列可用,就调用__dev_xmit_skb

    if (q->enqueue) {

        rc = __dev_xmit_skb(skb, q, dev, txq);

        goto out;

    }

//下面的处理是在有发送列的情况,设备一般有发送列:如lotunnle我们所要做的就是直接调用驱动的hard_start_xmit将它发送出去  如果发送失败就直接丢弃,因为没有队列可以保存它  

    if (dev->flags & IFF_UP) { //确定设备是否开启

        int cpu = smp_processor_id(); /* ok because BHs are off */

 

        if (txq->xmit_lock_owner != cpu) {//是否在同一个cpu

            HARD_TX_LOCK(dev, txq, cpu);

            if (!netif_tx_queue_stopped(txq)) {//确定队列是运行状态

                rc = NET_XMIT_SUCCESS;

                if (!dev_hard_start_xmit(skb, dev, txq)) {

                    HARD_TX_UNLOCK(dev, txq);

                    goto out;

                }

            }

            HARD_TX_UNLOCK(dev, txq);

            if (net_ratelimit())

                printk(KERN_CRIT "Virtual device %s asks to "

                       "queue packet!\n", dev->name);

        } else {// txq->xmit_lock_owner == cpu的情况,说明发生递归

            if (net_ratelimit())

                printk(KERN_CRIT "Dead loop on virtual device "

                       "%s, fix it urgently!\n", dev->name);

        }

    }

 

    rc = -ENETDOWN;

    rcu_read_unlock_bh();

out_kfree_skb:

    kfree_skb(skb);

    return rc;

out:

    rcu_read_unlock_bh();

    return rc;

}


 __dev_xmit_skb

_dev_xmit_skb函数主要做两件事情:

(1)如果流控对象为空的,试图直接发送数据包。

(2)如果流控对象不空,将数据包加入流控对象,并运行流控对象。

static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,

                 struct net_device *dev,

                 struct netdev_queue *txq)

{

    spinlock_t *root_lock = qdisc_lock(q);//见注2

    int rc;

 

    spin_lock(root_lock);   //qdisc

    if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {//判断队列是否失效

        kfree_skb(skb);

        rc = NET_XMIT_DROP;

    } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&

           !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {

        /*

         * This is a work-conserving queue; there are no old skbs

         * waiting to be sent out; and the qdisc is not running -

         * xmit the skb directly.

         */

        __qdisc_update_bstats(q, skb->len);

        if (sch_direct_xmit(skb, q, dev, txq, root_lock))

            __qdisc_run(q);

        else

            clear_bit(__QDISC_STATE_RUNNING, &q->state);

 

        rc = NET_XMIT_SUCCESS;

    } else {

        rc = qdisc_enqueue_root(skb, q);

        qdisc_run(q);

    }

    spin_unlock(root_lock);

 

    return rc;

}

 

   qdisc_run

有两个时机将会调用qdisc_run():

1.__dev_xmit_skb()

2. 软中断服务线程NET_TX_SOFTIRQ

static inline void qdisc_run(struct Qdisc *q)

{

    if (!test_and_set_bit(__QDISC_STATE_RUNNING, &q->state))//将队列设置为运行状态

        __qdisc_run(q);

}

   __qdisc_run

void __qdisc_run(struct Qdisc *q)

{

    unsigned long start_time = jiffies;

 

    while (qdisc_restart(q)) { //返回值大于0,说明流控对象非空

        /*如果发现本队列运行的时间太长了,将会停止队列的运行,并将队列加入output_queue链表头

         * Postpone processing if  (延迟处理)

         * 1. another process needs the CPU;

         * 2. we've been doing it for too long.

         */

        if (need_resched() || jiffies != start_time) { //已经不允许继续运行本流控对象

            __netif_schedule(q); //将本qdisc加入每cpu变量softnet_dataoutput_queue链表中

            break;

        }

    }

  //清除队列的运行标识

    clear_bit(__QDISC_STATE_RUNNING, &q->state);

}

环调qdisc_restart发送,下面个函qdisc_restart是真正发送据包的函,它从列上取下一个,然后尝试将它发送出去,若发送失败则一般是重新入

此函返回值:发送成功返回剩余列长度,发送失败时返回0(若发送成功且剩余列长度0也返回0

 

   qdisc_restart

__QDISC_STATE_RUNNING状态保证同一时刻只有一个cpu在处理这个qdisc,qdisc_lock(q)用来保证对这个队列的顺序访问。

通常netif_tx_lock使用来保本设备驱动的顺序(独占)访问的,qdisc_lock(q)用来保证qdisc的顺序访问,这两个是互斥的,获得其中一个必须释放另一个。

static inline int qdisc_restart(struct Qdisc *q)

{

    struct netdev_queue *txq;

    struct net_device *dev;

    spinlock_t *root_lock;

    struct sk_buff *skb;

 

    /* Dequeue packet */

    skb = dequeue_skb(q); //一开始就调用dequeue函数

    if (unlikely(!skb))

        return 0;  //返回0说明队列是空的或者被限制

 

    root_lock = qdisc_lock(q);

    dev = qdisc_dev(q);

    txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));

 

    return sch_direct_xmit(skb, q, dev, txq, root_lock); //用于发送数据包

}

   sch_direct_xmit

   发送一个skb,将队列置为__QDISC_STATE_RUNNING状态,保证只有一个cpu运行这个函数,返回0表示队列为空或者发送受限,大于0表示队列非空。

int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,

            struct net_device *dev, struct netdev_queue *txq,

            spinlock_t *root_lock)

{

    int ret = NETDEV_TX_BUSY;

 

    spin_unlock(root_lock);// release qdisc,因为后面要获取设备锁

   // 调用__netif_tx_lockà spin_lock(&txq->_xmit_lock,,保证设备驱动的独占访问

    HARD_TX_LOCK(dev, txq, smp_processor_id());

    if (!netif_tx_queue_stopped(txq) && //设备没有被停止,且发送队列没有被冻结

        !netif_tx_queue_frozen(txq))

        ret = dev_hard_start_xmit(skb, dev, txq); //发送数据包

    HARD_TX_UNLOCK(dev, txq);  // 调用__netif_tx_unlock

 

    spin_lock(root_lock);

 

    switch (ret) {

    case NETDEV_TX_OK:  //如果设备成功将数据包发送出去

        ret = qdisc_qlen(q); //返回剩余的队列长度

        break;

 

    case NETDEV_TX_LOCKED: //获取设备锁失败

        ret = handle_dev_cpu_collision(skb, txq, q);

        break;

 

    default: //设备繁忙,重新入队发送(利用softirq

        if (unlikely (ret != NETDEV_TX_BUSY && net_ratelimit()))

            printk(KERN_WARNING "BUG %s code %d qlen %d\n",

                   dev->name, ret, q->q.qlen);

        ret = dev_requeue_skb(skb, q);

        break;

    }

 

    if (ret && (netif_tx_queue_stopped(txq) ||

            netif_tx_queue_frozen(txq)))

        ret = 0;

 

    return ret;

}

 

   dev_hard_start_xmit

int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,

            struct netdev_queue *txq)

{

    const struct net_device_ops *ops = dev->netdev_ops;

    int rc;

 

if (likely(!skb->next)) {

//从这里可以看出,对于每一个发送的包也会发给ptype_all一份,  packet套接字创建时对于protoETH_P_ALL的会在ptype_all中注册一个成员,因此对于协议号为ETH_P_ALLpacket套接字来说,发送和接受的数据都能收到

        if (!list_empty(&ptype_all))

            dev_queue_xmit_nit(skb, dev);

 

        if (netif_needs_gso(dev, skb)) {

            if (unlikely(dev_gso_segment(skb)))

                goto out_kfree_skb;

            if (skb->next)

                goto gso;

        }

 

       //如果发送设备不需要skb->dst,则在此将其释放

        if (dev->priv_flags & IFF_XMIT_DST_RELEASE)

            skb_dst_drop(skb);

       //调用设备注册的发送函数,即dev->netdev_ops-> ndo_start_xmit(skb, dev)

        rc = ops->ndo_start_xmit(skb, dev);

        if (rc == NETDEV_TX_OK)

txq_trans_update(txq);

        return rc;

    }

 

gso:

……

}

   dev_queue_xmit_nit

static void dev_queue_xmit_nit(struct sk_buff *skb, struct net_device *dev)

{

    struct packet_type *ptype;

#ifdef CONFIG_NET_CLS_ACT

    if (!(skb->tstamp.tv64 && (G_TC_FROM(skb->tc_verd) & AT_INGRESS)))

        net_timestamp(skb); //记录该数据包输入的时间戳

#else

    net_timestamp(skb);

#endif

 

    rcu_read_lock();

    list_for_each_entry_rcu(ptype, &ptype_all, list) {

        /* Never send packets back to the socket they originated from */

       //遍历ptype_all链表,查找所有符合输入条件的原始套接口,并循环将数据包输入到满足条件的套接口

        if ((ptype->dev == dev || !ptype->dev) &&

            (ptype->af_packet_priv == NULL ||

(struct sock *)ptype->af_packet_priv != skb->sk)) {

       //由于该数据包是额外输入到这个原始套接口的,因此需要克隆一个数据包

            struct sk_buff *skb2 = skb_clone(skb, GFP_ATOMIC);

            if (!skb2)

                break;

 

            /* skb->nh should be correctly(确保头部偏移正确)

               set by sender, so that the second statement is

               just protection against buggy protocols.

             */

            skb_reset_mac_header(skb2);

 

            if (skb_network_header(skb2) < skb2->data ||

                skb2->network_header > skb2->tail) {

                if (net_ratelimit())//net_ratelimit用来保证网络代码中printk的频率

                    printk(KERN_CRIT "protocol %04x is "

                           "buggy, dev %s\n",

                           skb2->protocol, dev->name);

                skb_reset_network_header(skb2); //重新设置L3头部偏移

            }

 

            skb2->transport_header = skb2->network_header;

            skb2->pkt_type = PACKET_OUTGOING;

            ptype->func(skb2, skb->dev, ptype, skb->dev);//调用协议(ptype_all)接受函数

        }

    }

    rcu_read_unlock();

}

 

Ø  环回设备

对于环回设备loopback,设备的ops->ndo_start_xmit被初始化为loopback_xmit函数。

static const struct net_device_ops loopback_ops = {

    .ndo_init      = loopback_dev_init,

    .ndo_start_xmit= loopback_xmit,

    .ndo_get_stats = loopback_get_stats,

};

drivers/net/loopback.c

static netdev_tx_t loopback_xmit(struct sk_buff *skb,

                 struct net_device *dev)

{

    struct pcpu_lstats *pcpu_lstats, *lb_stats;

    int len;

 

    skb_orphan(skb);

 

    skb->protocol = eth_type_trans(skb, dev);

 

    /* it's OK to use per_cpu_ptr() because BHs are off */

    pcpu_lstats = dev->ml_priv;

    lb_stats = per_cpu_ptr(pcpu_lstats, smp_processor_id());

 

    len = skb->len;

    if (likely(netif_rx(skb) == NET_RX_SUCCESS)) { //直接调用了netif_rx进行了接收处理

        lb_stats->bytes += len;

        lb_stats->packets++;

    } else

        lb_stats->drops++;

 

    return NETDEV_TX_OK;

}

 

注:

1. CHECKSUM_PARTIAL表示使用硬件checksum ,L4头的校已经完,并且已经加入uh->check字段中,只需要设备算整个头4头的校值。

2. 整个数据包发送逻辑中会涉及到三个用于互斥访问的代码:

1spinlock_t *root_lock = qdisc_lock(q);

2test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)

3__netif_tx_lockà spin_lock(&txq->_xmit_lock)

     其中(1)(3)分别对应一个spinlock,(2)对应一个队列状态。在了解代码中如何使用这三个同步方法时,首先看一下相关数据结构的关系,如下。

23560ed0c021e54d1796d5222568ada44abdc449

    图中绿色部分表示(1)(3)两处spinlock。首先看(1)处对应的代码:

static inline spinlock_t *qdisc_lock(struct Qdisc *qdisc)

{

    return &qdisc->q.lock;

}

所以root_lock是用于控制qdiscskb队列访问的锁,当需要对skb队列进行enqueue、dequeue、requeue时,就需要加锁。

__QDISC_STATE_RUNNING标志用于保证一个流控对象(qdisc不会同时被多个cpu访问。

而(3)处的spinlock,即struct netdev_queue中的_xmit_lock,则用于保证dev的注册函数的互斥访问,即deriver的同步。

另外,内核代码注释中写到,(1)和(3)是互斥的,获得(1)处的锁时必须先保证释放(3)处的锁,反之亦然,为什么要这样还没有想明白。。。。

3. 已经有了dev_queue_xmit什么需要断来发送呢?

可以看到在dev_queue_xmitskb行了一些处理(比如合并成一个包,算校和等),处理完的skb是可以直接发送的了,这时dev_queue_xmit也会先skbskb一般都是在个函中入的),并且qdisc_run尝试发送,但是有可能发送失这时skb重新入,并且自己直接返回。

只是发送列中的skb以及放已经发送的skb,它无需再skb线性化或者校和处理另外在列被停止的情况下,dev_queue_xmit仍然可以把包加入列,但是不能发送样在列被醒的候就需要通过软断来发送停止期间积压的包而言之,dev_queue_xmitskb做些最后的处理并且第一次尝试发送,前者发送失或者发完的包发送出去。(其实发送断还有一个作用,就是放已经发送的包,因某些情况下发送是在硬件中中完成的,了提高硬件中处理效率,核提供一种方式将释skb放到行,这时只要dev_kfree_skb_irq,它skb加入softnet_datacompletion_queue中,然后启发送net_tx_action会在completion_queue中的skb全部放掉)

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
开发与运维
使用钉钉扫一扫加入圈子
+ 订阅

集结各类场景实战经验,助你开发运维畅行无忧

其他文章