linux网络实现分析(3)——数据包的发送(IP层到链路层)

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介:

二层(链路层)数据包发送过程分析

        当上层准备好一个包之后,交给链路层,链路层数据包发送主要通过dev_queue_xmit函数处理。数据包的发送可分为两种,一种是正常的传输流程,即通过网卡驱动,另一种是通过软中断(见注3)。为了理解方便,首先看一下dev_queue_xmi函数的整体调用关系图。


     dev_queue_xmit

    本函数用来将带发送的skb加入一个dev的队列(Queue),调用这个函数前必须设置好skb的device和priority,本函数可以在中断上下文中被调用。

返回值:

      返回非0(正数或负数)表示函数出错,返回0表示成功,但是并不表示数据包被成功发送出去,因为数据包可能因为限速等原因被丢掉

    函数执行后传入的skb将被释放,所以如果想控制数据包,实现对skb的重传时需要增加skb的引用计数。

    当调用此函数时中断必须是打开的,因为BH enable必须要求IRQ enable,否则会造成死锁

int dev_queue_xmit(struct sk_buff *skb)

{

    struct net_device *dev = skb->dev;

    struct netdev_queue *txq;

    struct Qdisc *q;

    int rc = -ENOMEM;

 

    /* GSO will handle the following emulations directly. */

    if (netif_needs_gso(dev, skb))

        goto gso;

 

    if (skb_has_frags(skb) &&

        !(dev->features & NETIF_F_FRAGLIST) &&

        __skb_linearize(skb))

        goto out_kfree_skb;

//如果skb有分片但是发送设备不支持分片,或分片中有分片在高端内存但发送设备不支持DMA,需要将所有段重新组合成一个段 ,这里__skb_linearize其实就是__pskb_pull_tail(skb, skb->data_len),这个函数基本上等同于pskb_may_pull  pskb_may_pull的作用就是检测skb对应的主buf中是否有足够的空间来pulllen长度,如果不够就重新分配skb并将frags中的数据拷贝入新分配的主buff中,而这里将参数len设置为skb->datalen 也就是会将所有的数据全部拷贝到主buff中,以这种方式完成skb的线性化。

    if (skb_shinfo(skb)->nr_frags &&

        (!(dev->features & NETIF_F_SG) || illegal_highdma(dev, skb)) &&

        __skb_linearize(skb))

        goto out_kfree_skb;

//如果数据包没有被计算校验和并且发送设备不支持这个协议的校验,则在此进行校验和的计算(注1)。如果上面已经线性化了一次,这里的__skb_linearize就会直接返回,注意区别fragsfrag_list,前者是将多的数据放到单独分配的页面中,sk_buff只有一个。而后者则是连接多个sk_buff 

    if (skb->ip_summed == CHECKSUM_PARTIAL) {

        skb_set_transport_header(skb, skb->csum_start -

                          skb_headroom(skb));

        if (!dev_can_checksum(dev, skb) && skb_checksum_help(skb))

            goto out_kfree_skb;

    }

 

gso:

//关闭软中断,禁止cpu抢占

    rcu_read_lock_bh();

//选择一个发送列,如果设备提供了select_queue就使用它,否选择一个,里只是Linux核多列的实,但是要真正的使用都列,需要网卡支持多列才可以,一般的网卡都只有一个列。在alloc_etherdev分配net_device是,设置列的个

txq = dev_pick_tx(dev, skb);

// netdev_queue构上取设备的qdisc 

    q = rcu_dereference(txq->qdisc);

//如果该设备有队列可用,就调用__dev_xmit_skb

    if (q->enqueue) {

        rc = __dev_xmit_skb(skb, q, dev, txq);

        goto out;

    }

//下面的处理是在有发送列的情况,设备一般有发送列:如lotunnle我们所要做的就是直接调用驱动的hard_start_xmit将它发送出去  如果发送失败就直接丢弃,因为没有队列可以保存它  

    if (dev->flags & IFF_UP) { //确定设备是否开启

        int cpu = smp_processor_id(); /* ok because BHs are off */

 

        if (txq->xmit_lock_owner != cpu) {//是否在同一个cpu

            HARD_TX_LOCK(dev, txq, cpu);

            if (!netif_tx_queue_stopped(txq)) {//确定队列是运行状态

                rc = NET_XMIT_SUCCESS;

                if (!dev_hard_start_xmit(skb, dev, txq)) {

                    HARD_TX_UNLOCK(dev, txq);

                    goto out;

                }

            }

            HARD_TX_UNLOCK(dev, txq);

            if (net_ratelimit())

                printk(KERN_CRIT "Virtual device %s asks to "

                       "queue packet!\n", dev->name);

        } else {// txq->xmit_lock_owner == cpu的情况,说明发生递归

            if (net_ratelimit())

                printk(KERN_CRIT "Dead loop on virtual device "

                       "%s, fix it urgently!\n", dev->name);

        }

    }

 

    rc = -ENETDOWN;

    rcu_read_unlock_bh();

out_kfree_skb:

    kfree_skb(skb);

    return rc;

out:

    rcu_read_unlock_bh();

    return rc;

}


 __dev_xmit_skb

_dev_xmit_skb函数主要做两件事情:

(1)如果流控对象为空的,试图直接发送数据包。

(2)如果流控对象不空,将数据包加入流控对象,并运行流控对象。

static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,

                 struct net_device *dev,

                 struct netdev_queue *txq)

{

    spinlock_t *root_lock = qdisc_lock(q);//见注2

    int rc;

 

    spin_lock(root_lock);   //qdisc

    if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {//判断队列是否失效

        kfree_skb(skb);

        rc = NET_XMIT_DROP;

    } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&

           !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {

        /*

         * This is a work-conserving queue; there are no old skbs

         * waiting to be sent out; and the qdisc is not running -

         * xmit the skb directly.

         */

        __qdisc_update_bstats(q, skb->len);

        if (sch_direct_xmit(skb, q, dev, txq, root_lock))

            __qdisc_run(q);

        else

            clear_bit(__QDISC_STATE_RUNNING, &q->state);

 

        rc = NET_XMIT_SUCCESS;

    } else {

        rc = qdisc_enqueue_root(skb, q);

        qdisc_run(q);

    }

    spin_unlock(root_lock);

 

    return rc;

}

 

    qdisc_run

有两个时机将会调用qdisc_run():

1.__dev_xmit_skb()

2. 软中断服务线程NET_TX_SOFTIRQ

static inline void qdisc_run(struct Qdisc *q)

{

    if (!test_and_set_bit(__QDISC_STATE_RUNNING, &q->state))//将队列设置为运行状态

        __qdisc_run(q);

}

    __qdisc_run

void __qdisc_run(struct Qdisc *q)

{

    unsigned long start_time = jiffies;

 

    while (qdisc_restart(q)) { //返回值大于0,说明流控对象非空

        /*如果发现本队列运行的时间太长了,将会停止队列的运行,并将队列加入output_queue链表头

         * Postpone processing if  (延迟处理)

         * 1. another process needs the CPU;

         * 2. we've been doing it for too long.

         */

        if (need_resched() || jiffies != start_time) { //已经不允许继续运行本流控对象

            __netif_schedule(q); //将本qdisc加入每cpu变量softnet_dataoutput_queue链表中

            break;

        }

    }

  //清除队列的运行标识

    clear_bit(__QDISC_STATE_RUNNING, &q->state);

}

环调qdisc_restart发送,下面个函qdisc_restart是真正发送据包的函,它从列上取下一个,然后尝试将它发送出去,若发送失败则一般是重新入

此函返回值:发送成功返回剩余列长度,发送失败时返回0(若发送成功且剩余列长度0也返回0

 

    qdisc_restart

__QDISC_STATE_RUNNING状态保证同一时刻只有一个cpu在处理这个qdisc,qdisc_lock(q)用来保证对这个队列的顺序访问。

通常netif_tx_lock使用来保本设备驱动的顺序(独占)访问的,qdisc_lock(q)用来保证qdisc的顺序访问,这两个是互斥的,获得其中一个必须释放另一个。

static inline int qdisc_restart(struct Qdisc *q)

{

    struct netdev_queue *txq;

    struct net_device *dev;

    spinlock_t *root_lock;

    struct sk_buff *skb;

 

    /* Dequeue packet */

    skb = dequeue_skb(q); //一开始就调用dequeue函数

    if (unlikely(!skb))

        return 0;  //返回0说明队列是空的或者被限制

 

    root_lock = qdisc_lock(q);

    dev = qdisc_dev(q);

    txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));

 

    return sch_direct_xmit(skb, q, dev, txq, root_lock); //用于发送数据包

}

    sch_direct_xmit

   发送一个skb,将队列置为__QDISC_STATE_RUNNING状态,保证只有一个cpu运行这个函数,返回0表示队列为空或者发送受限,大于0表示队列非空。

int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,

            struct net_device *dev, struct netdev_queue *txq,

            spinlock_t *root_lock)

{

    int ret = NETDEV_TX_BUSY;

 

    spin_unlock(root_lock);// release qdisc,因为后面要获取设备锁

   // 调用__netif_tx_lockà spin_lock(&txq->_xmit_lock,,保证设备驱动的独占访问

    HARD_TX_LOCK(dev, txq, smp_processor_id());

    if (!netif_tx_queue_stopped(txq) && //设备没有被停止,且发送队列没有被冻结

        !netif_tx_queue_frozen(txq))

        ret = dev_hard_start_xmit(skb, dev, txq); //发送数据包

    HARD_TX_UNLOCK(dev, txq);  // 调用__netif_tx_unlock

 

    spin_lock(root_lock);

 

    switch (ret) {

    case NETDEV_TX_OK:  //如果设备成功将数据包发送出去

        ret = qdisc_qlen(q); //返回剩余的队列长度

        break;

 

    case NETDEV_TX_LOCKED: //获取设备锁失败

        ret = handle_dev_cpu_collision(skb, txq, q);

        break;

 

    default: //设备繁忙,重新入队发送(利用softirq

        if (unlikely (ret != NETDEV_TX_BUSY && net_ratelimit()))

            printk(KERN_WARNING "BUG %s code %d qlen %d\n",

                   dev->name, ret, q->q.qlen);

        ret = dev_requeue_skb(skb, q);

        break;

    }

 

    if (ret && (netif_tx_queue_stopped(txq) ||

            netif_tx_queue_frozen(txq)))

        ret = 0;

 

    return ret;

}

 

    dev_hard_start_xmit

int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,

            struct netdev_queue *txq)

{

    const struct net_device_ops *ops = dev->netdev_ops;

    int rc;

 

if (likely(!skb->next)) {

//从这里可以看出,对于每一个发送的包也会发给ptype_all一份,  packet套接字创建时对于protoETH_P_ALL的会在ptype_all中注册一个成员,因此对于协议号为ETH_P_ALLpacket套接字来说,发送和接受的数据都能收到

        if (!list_empty(&ptype_all))

            dev_queue_xmit_nit(skb, dev);

 

        if (netif_needs_gso(dev, skb)) {

            if (unlikely(dev_gso_segment(skb)))

                goto out_kfree_skb;

            if (skb->next)

                goto gso;

        }

 

       //如果发送设备不需要skb->dst,则在此将其释放

        if (dev->priv_flags & IFF_XMIT_DST_RELEASE)

            skb_dst_drop(skb);

       //调用设备注册的发送函数,即dev->netdev_ops-> ndo_start_xmit(skb, dev)

        rc = ops->ndo_start_xmit(skb, dev);

        if (rc == NETDEV_TX_OK)

txq_trans_update(txq);

        return rc;

    }

 

gso:

……

}

    dev_queue_xmit_nit

static void dev_queue_xmit_nit(struct sk_buff *skb, struct net_device *dev)

{

    struct packet_type *ptype;

#ifdef CONFIG_NET_CLS_ACT

    if (!(skb->tstamp.tv64 && (G_TC_FROM(skb->tc_verd) & AT_INGRESS)))

        net_timestamp(skb); //记录该数据包输入的时间戳

#else

    net_timestamp(skb);

#endif

 

    rcu_read_lock();

    list_for_each_entry_rcu(ptype, &ptype_all, list) {

        /* Never send packets back to the socket they originated from */

       //遍历ptype_all链表,查找所有符合输入条件的原始套接口,并循环将数据包输入到满足条件的套接口

        if ((ptype->dev == dev || !ptype->dev) &&

            (ptype->af_packet_priv == NULL ||

(struct sock *)ptype->af_packet_priv != skb->sk)) {

       //由于该数据包是额外输入到这个原始套接口的,因此需要克隆一个数据包

            struct sk_buff *skb2 = skb_clone(skb, GFP_ATOMIC);

            if (!skb2)

                break;

 

            /* skb->nh should be correctly(确保头部偏移正确)

               set by sender, so that the second statement is

               just protection against buggy protocols.

             */

            skb_reset_mac_header(skb2);

 

            if (skb_network_header(skb2) < skb2->data ||

                skb2->network_header > skb2->tail) {

                if (net_ratelimit())//net_ratelimit用来保证网络代码中printk的频率

                    printk(KERN_CRIT "protocol %04x is "

                           "buggy, dev %s\n",

                           skb2->protocol, dev->name);

                skb_reset_network_header(skb2); //重新设置L3头部偏移

            }

 

            skb2->transport_header = skb2->network_header;

            skb2->pkt_type = PACKET_OUTGOING;

            ptype->func(skb2, skb->dev, ptype, skb->dev);//调用协议(ptype_all)接受函数

        }

    }

    rcu_read_unlock();

}

 

Ø  环回设备

对于环回设备loopback,设备的ops->ndo_start_xmit被初始化为loopback_xmit函数。

static const struct net_device_ops loopback_ops = {

    .ndo_init      = loopback_dev_init,

    .ndo_start_xmit= loopback_xmit,

    .ndo_get_stats = loopback_get_stats,

};

drivers/net/loopback.c

static netdev_tx_t loopback_xmit(struct sk_buff *skb,

                 struct net_device *dev)

{

    struct pcpu_lstats *pcpu_lstats, *lb_stats;

    int len;

 

    skb_orphan(skb);

 

    skb->protocol = eth_type_trans(skb, dev);

 

    /* it's OK to use per_cpu_ptr() because BHs are off */

    pcpu_lstats = dev->ml_priv;

    lb_stats = per_cpu_ptr(pcpu_lstats, smp_processor_id());

 

    len = skb->len;

    if (likely(netif_rx(skb) == NET_RX_SUCCESS)) { //直接调用了netif_rx进行了接收处理

        lb_stats->bytes += len;

        lb_stats->packets++;

    } else

        lb_stats->drops++;

 

    return NETDEV_TX_OK;

}

 

注:

1. CHECKSUM_PARTIAL表示使用硬件checksum ,L4头的校已经完,并且已经加入uh->check字段中,只需要设备算整个头4头的校值。

2. 整个数据包发送逻辑中会涉及到三个用于互斥访问的代码:

1spinlock_t *root_lock = qdisc_lock(q);

2test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)

3__netif_tx_lockà spin_lock(&txq->_xmit_lock)

     其中(1)(3)分别对应一个spinlock,(2)对应一个队列状态。在了解代码中如何使用这三个同步方法时,首先看一下相关数据结构的关系,如下。


    图中绿色部分表示(1)(3)两处spinlock。首先看(1)处对应的代码:

static inline spinlock_t *qdisc_lock(struct Qdisc *qdisc)

{

    return &qdisc->q.lock;

}

所以root_lock是用于控制qdiscskb队列访问的锁,当需要对skb队列进行enqueue、dequeue、requeue时,就需要加锁。

__QDISC_STATE_RUNNING标志用于保证一个流控对象(qdisc不会同时被多个cpu访问。

而(3)处的spinlock,即struct netdev_queue中的_xmit_lock,则用于保证dev的注册函数的互斥访问,即deriver的同步。

另外,内核代码注释中写到,(1)和(3)是互斥的,获得(1)处的锁时必须先保证释放(3)处的锁,反之亦然,为什么要这样还没有想明白。。。。

3. 已经有了dev_queue_xmit什么需要断来发送呢?

可以看到在dev_queue_xmitskb行了一些处理(比如合并成一个包,算校和等),处理完的skb是可以直接发送的了,这时dev_queue_xmit也会先skbskb一般都是在个函中入的),并且qdisc_run尝试发送,但是有可能发送失这时skb重新入,并且自己直接返回。

只是发送列中的skb以及放已经发送的skb,它无需再skb线性化或者校和处理另外在列被停止的情况下,dev_queue_xmit仍然可以把包加入列,但是不能发送样在列被醒的候就需要通过软断来发送停止期间积压的包而言之,dev_queue_xmitskb做些最后的处理并且第一次尝试发送,前者发送失或者发完的包发送出去。(其实发送断还有一个作用,就是放已经发送的包,因某些情况下发送是在硬件中中完成的,了提高硬件中处理效率,核提供一种方式将释skb放到行,这时只要dev_kfree_skb_irq,它skb加入softnet_datacompletion_queue中,然后启发送net_tx_action会在completion_queue中的skb全部放掉)

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
2月前
|
域名解析 存储 网络协议
深入解析网络通信关键要素:IP 协议、DNS 及相关技术
本文详细介绍了IP协议报头结构及其各字段的功能,包括版本、首部长度、服务类型、总长度、标识、片偏移、标志、生存时间(TTL)、协议、首部检验和等内容。此外,还探讨了IP地址的网段划分、特殊IP地址的应用场景,以及路由选择的大致流程。最后,文章简要介绍了DNS协议的作用及其发展历史,解释了域名解析系统的工作原理。
106 5
深入解析网络通信关键要素:IP 协议、DNS 及相关技术
|
2天前
|
网络协议 网络安全 数据安全/隐私保护
计算机网络概念:网关,DHCP,IP寻址,ARP欺骗,路由,DDOS等
【10月更文挑战第27天】计算机主机网关的作用类似于小区传达室的李大爷,负责将内部网络的请求转发到外部网络。当小区内的小不点想与外面的小明通话时,必须通过李大爷(网关)进行联系。网关不仅帮助内部设备与外部通信,还负责路由选择,确保数据包高效传输。此外,网关还参与路由表的维护和更新,确保网络路径的准确性。
11 2
|
24天前
|
Web App开发 资源调度 网络协议
Linux系统之部署IP工具箱MyIP
【10月更文挑战第5天】使用Docker部署Radicale日历和联系人应用Linux系统之部署IP工具箱MyIP
62 1
Linux系统之部署IP工具箱MyIP
|
11天前
|
存储 缓存 Ubuntu
配置网络接口的“IP”命令10个
【10月更文挑战第18天】配置网络接口的“IP”命令10个
35 0
|
20天前
|
运维 安全 网络协议
Python 网络编程:端口检测与IP解析
本文介绍了使用Python进行网络编程的两个重要技能:检查端口状态和根据IP地址解析主机名。通过`socket`库实现端口扫描和主机名解析的功能,并提供了详细的示例代码。文章最后还展示了如何整合这两部分代码,实现一个简单的命令行端口扫描器,适用于网络故障排查和安全审计。
21 0
|
2月前
|
缓存 网络协议 网络架构
网络抓包分析【IP,ICMP,ARP】以及 IP数据报,MAC帧,ICMP报和ARP报的数据报格式
本文详细介绍了如何使用网络抓包工具Wireshark进行网络抓包分析,包括以太网v2 MAC帧、IP数据报、ICMP报文和ARP报文的格式,以及不同网络通信的过程。文章通过抓包分析展示了IP数据报、ICMP数据报和ARP数据报的具体信息,包括MAC地址、IP地址、ICMP类型和代码、以及ARP的硬件类型、协议类型、操作类型等。通过这些分析,可以更好地理解网络协议的工作机制和数据传输过程。
网络抓包分析【IP,ICMP,ARP】以及 IP数据报,MAC帧,ICMP报和ARP报的数据报格式
|
2月前
|
网络协议 网络虚拟化
接收网络包的过程——从硬件网卡解析到IP
【9月更文挑战第18天】这段内容详细描述了网络包接收过程中机制。当网络包触发中断后,内核处理完这批网络包,会进入主动轮询模式,持续处理后续到来的包,直至处理间隙返回其他任务,从而减少中断次数,提高处理效率。此机制涉及网卡驱动初始化时注册轮询函数,通过软中断触发后续处理,并逐步深入内核网络协议栈,最终到达TCP层。整个接收流程分为多个层次,包括DMA技术存入Ring Buffer、中断通知CPU、软中断处理、以及进入内核网络协议栈等多个步骤。
|
24天前
|
网络协议 网络架构
【第三期】计算机网络常识/网络分层模型与数据包封装传输过程
【第三期】计算机网络常识/网络分层模型与数据包封装传输过程
41 0
|
2月前
|
网络协议 安全 数据安全/隐私保护
动静态IP的网络协议有什么不同
IP地址分为静态和动态两种分配方式。静态IP地址由管理员手动分配,确保设备具有固定且唯一的网络标识,适用于服务器等关键设备。动态IP地址则通过DHCP服务器自动分配,提供更高的灵活性和管理效率,适合个人电脑和移动设备。两者在网络配置、管理和安全性方面各有优劣,需根据具体应用场景进行选择。静态IP地址虽稳定但配置复杂,安全性较低;动态IP地址配置简单,安全性更高,能有效防止针对固定IP的攻击。
|
2月前
|
存储 传感器 Linux
STM32微控制器为何不适合运行Linux系统的分析
总的来说,虽然技术上可能存在某些特殊情况下将Linux移植到高端STM32微控制器上的可能性,但从资源、性能、成本和应用场景等多个方面考虑,STM32微控制器不适合运行Linux系统。对于需要运行Linux的应用,更适合选择ARM Cortex-A系列处理器的开发平台。
203 0