译|Monitoring and Tuning the Linux Networking Stack: Sending Data(六)

简介: 译|Monitoring and Tuning the Linux Networking Stack: Sending Data(六)
Transmit Packet Steering(XPS)

Transmit Packet Steering(XPS)是一项特性,允许系统管理员确定哪些 CPU 可以处理设备的哪些传输队列的传输操作。此功能的主要目的是避免在处理传输请求时出现锁争用。使用 XPS 时,还期望获得其他好处,如减少缓存驱逐和避免在 NUMA 机器 上进行远程内存访问。

您可以 查看 XPS 的内核文档 来了解更多关于 XPS 如何工作的信息。我们将在下面研究如何为您的系统调整 XPS,但现在,您需要知道的是,要配置 XPS,系统管理员可以定义一个位图,映射传输队列到 CPU。

上面代码中调用 get_xps_queue 函数将查询此用户指定的映射,以确定应使用哪个传输队列。如果 get_xps_queue 返回 -1,则将改用 skb_tx_hash

skb_tx_hash

如果内核未包含 XPS,或未配置 XPS,或建议的队列不可用(可能是因为用户调整了队列计数),则 skb_tx_hash 接管以确定发送数据到哪个队列。根据传输工作负载,准确了解 skb_tx_hash 工作原理非常重要。 请注意,这段代码已经随着时间的推移进行了调整,因此如果您使用的内核版本与本文档不同,您应该直接查阅您的内核源代码。

让我们看看它是如何工作的,来自 ./include/linux/netdevice.h

/*
 * Returns a Tx hash for the given packet when dev->real_num_tx_queues is used
 * as a distribution range limit for the returned value.
 */
static inline u16 skb_tx_hash(const struct net_device *dev,
                              const struct sk_buff *skb)
{
        return __skb_tx_hash(dev, skb, dev->real_num_tx_queues);
}

代码只是调用 __skb_tx_hash,来自 ./net/core/flow_dissector.c。这个函数中有一些有趣的代码,让我们来看看:

/*
 * Returns a Tx hash based on the given packet descriptor a Tx queues' number
 * to be used as a distribution range.
 */
u16 __skb_tx_hash(const struct net_device *dev, const struct sk_buff *skb,
                  unsigned int num_tx_queues)
{
        u32 hash;
        u16 qoffset = 0;
        u16 qcount = num_tx_queues;
        if (skb_rx_queue_recorded(skb)) {
                hash = skb_get_rx_queue(skb);
                while (unlikely(hash >= num_tx_queues))
                        hash -= num_tx_queues;
                return hash;
        }

函数中的第一个 if 语句是一个有趣的短路。函数名 skb_rx_queue_recorded 有些误导。skb 有一个 queue_mapping 字段,用于 rx 和 tx。无论如何,如果您的系统正在接收数据包,并转发它们到其他地方,则此 if 语句为真。如果不是这种情况,则代码继续。

if (dev->num_tc) {
        u8 tc = netdev_get_prio_tc_map(dev, skb->priority);
        qoffset = dev->tc_to_txq[tc].offset;
        qcount = dev->tc_to_txq[tc].count;
}

要理解这段代码,重要的是要提到程序可以设置套接字发送数据的优先级。这可以使用 setsockoptSOL_SOCKETSO_PRIORITY 级别和 optname 分别完成。有关 SO_PRIORITY 的更多信息,请参阅 socket(7) 手册页

请注意,如果您在应用程序中使用了 setsockopt 选项 IP_TOS 来设置特定套接字发送的 IP 数据包的 TOS 标志(或者如果作为辅助消息传递给 sendmsg 则按每个数据包设置),则内核转换您设置的 TOS 选项为优先级,最终进入 skb->priority

如前所述,某些网络设备支持基于硬件的流量控制系统。如果 num_tc 非零,则表示此设备支持基于硬件的流量控制。

如果该数字非零,则表示此设备支持基于硬件的流量控制。将查询优先级映射,优先级映射映射数据包优先级到基于硬件的流量控制。根据此映射为数据优先级选择适当的流量类别。

接下来,将生成适合流量类别的传输队列范围。它们将确定传输队列。

如果 num_tc 为零(因为网络设备不支持基于硬件的流量控制),则 qcountqoffset 变量分别设置为传输队列数和 0

使用 qcountqoffset,可以计算传输队列的索引:

if (skb->sk && skb->sk->sk_hash)
                hash = skb->sk->sk_hash;
        else
                hash = (__force u16) skb->protocol;
        hash = __flow_hash_1word(hash);
        return (u16) (((u64) hash * qcount) >> 32) + qoffset;
}
EXPORT_SYMBOL(__skb_tx_hash);

最后,返回适当的队列索引到 __netdev_pick_tx

恢复 __dev_queue_xmit

此时,已选择适当的传输队列。__dev_queue_xmit 可以继续:

q = rcu_dereference_bh(txq->qdisc);
#ifdef CONFIG_NET_CLS_ACT
        skb->tc_verd = SET_TC_AT(skb->tc_verd, AT_EGRESS);
#endif
        trace_net_dev_queue(skb);
        if (q->enqueue) {
                rc = __dev_xmit_skb(skb, q, dev, txq);
                goto out;
        }

它首先获得与此队列相关联的排队规则的引用。回想一下,我们之前看到,对于单个传输队列设备,默认值是 pfifo_fast qdisc,而对于多队列设备,它是 mq qdisc。

接下来,如果在内核中启用了数据包分类 API,则代码会为传出数据分配一个流量分类“决定”。接下来,检查排队规则是否有方法将数据排队。像 noqueue qdisc 这样的一些排队规则没有队列。如果有队列,则代码调用 __dev_xmit_skb 来继续处理要传输的数据。之后,执行跳转到此函数的结尾。我们稍后将看一下 __dev_xmit_skb。现在,让我们看看如果没有队列会发生什么,从一个非常有用的注释开始:

/* The device has no queue. Common case for software devices:
   loopback, all the sorts of tunnels...
   Really, it is unlikely that netif_tx_lock protection is necessary
   here.  (f.e. loopback and IP tunnels are clean ignoring statistics
   counters.)
   However, it is possible, that they rely on protection
   made by us here.
   Check this and shot the lock. It is not prone from deadlocks.
   Either shot noqueue qdisc, it is even simpler 8)
 */
if (dev->flags & IFF_UP) {
        int cpu = smp_processor_id(); /* ok because BHs are off */

正如注释所示,唯一可以拥有不带队列的 qdisc 的设备是环回设备和隧道设备。 如果设备当前已启动,则保存当前 CPU。 它用于下一项检查,这有点棘手,让我们来看看:

if (txq->xmit_lock_owner != cpu) {
        if (__this_cpu_read(xmit_recursion) > RECURSION_LIMIT)
                goto recursion_alert;

此处有两个分支:该设备队列上的传输锁是否由该 CPU 拥有。 如果是,则在此处检查为每个 CPU 分配的计数器变量 xmit_recursion,以确定计数是否超过 RECURSION_LIMIT。 一个程序可能试图发送数据,并在代码中的这个地方被抢占。 调度程序可以选择另一个程序来运行。 如果第二个程序也试图发送数据并运行到这里。 因此,xmit_recursion 计数器防止超过RECURSION_LIMIT 程序此处竞争传输数据。 让我们继续:

HARD_TX_LOCK(dev, txq, cpu);
                        if (!netif_xmit_stopped(txq)) {
                                __this_cpu_inc(xmit_recursion);
                                rc = dev_hard_start_xmit(skb, dev, txq);
                                __this_cpu_dec(xmit_recursion);
                                if (dev_xmit_complete(rc)) {
                                        HARD_TX_UNLOCK(dev, txq);
                                        goto out;
                                }
                        }
                        HARD_TX_UNLOCK(dev, txq);
                        net_crit_ratelimited("Virtual device %s asks to queue packet!\n",
                                             dev->name);
                } else {
                        /* Recursion is detected! It is possible,
                         * unfortunately
                         */
recursion_alert:
                        net_crit_ratelimited("Dead loop on virtual device %s, fix it urgently!\n",
                                             dev->name);
                }
        }

代码的其余部分首先尝试获取传输锁。检查要使用的设备的传输队列,以查看是否停止传输。如果没有,则增加 xmit_recursion 变量,并传递数据到更靠近设备的位置进行传输。我们稍后会更详细地看到 dev_hard_start_xmit。完成后,释放锁并打印警告。

另外,如果当前 CPU 是传输锁所有者,或者如果达到了 RECURSION_LIMIT,则不进行传输,但会打印警告。函数中剩余的代码设置错误码并返回。

由于我们对真实以太网设备感兴趣,因此让我们继续沿着前面 __dev_xmit_skb 为那些设备所采用的代码路径。

__dev_xmit_skb

现在我们从 ./net/core/dev. c 进入 __dev_xmit_skb,并配备了排队规则、网络设备和传输队列引用:

static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
                                 struct net_device *dev,
                                 struct netdev_queue *txq)
{
        spinlock_t *root_lock = qdisc_lock(q);
        bool contended;
        int rc;
        qdisc_pkt_len_init(skb);
        qdisc_calculate_pkt_len(skb, q);
        /*
         * Heuristic to force contended enqueues to serialize on a
         * separate lock before trying to get qdisc main lock.
         * This permits __QDISC_STATE_RUNNING owner to get the lock more often
         * and dequeue packets faster.
         */
        contended = qdisc_is_running(q);
        if (unlikely(contended))
                spin_lock(&q->busylock);

这段代码首先使用 qdisc_pkt_len_initqdisc_calculate_pkt_len 计算 qdisc 稍后将使用的数据的准确长度。 这对于基于硬件的发送卸载(例如 UDP 分段卸载,如我们之前所看到的)的 skb 是必要的,因为需要考虑在分段发生时添加的附加报头。

接下来,使用一把锁来帮助减少 qdisc 主锁(稍后我们将看到第二把锁)的竞争。 如果 qdisc 当前正在运行,则其他试图传输的程序将竞争 qdisc 的 busylock。 使得运行中的 qdisc 处理数据包,并与较少数量的程序竞争第二把主锁。 该技巧减少了竞争者的数量,从而增加了吞吐量。 你可以在 这里 阅读描述这一点的原始提交消息。 接下来,主锁被占用:

spin_lock(root_lock);

现在,我们接近一个 if 语句,它处理 3 种可能的情况:

  1. qdisc 已停用。
  2. qdisc 允许数据包绕过排队系统,且没有其他数据包要发送,且 qdisc 当前未运行。 qdisc 变为 “工作节省” qdisc ,允许数据包绕过 —— 换句话说,流量整形目的的 qdisc 不延迟数据包传输。
  3. 所有其他情况。

让我们来看看在这些情况下会发生什么,从停用的 qdisc 开始:

if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
        kfree_skb(skb);
        rc = NET_XMIT_DROP;

这是直截了当的。 如果 qdisc 已停用,请释放数据并设置返回码为 NET_XMIT_DROP。 接下来,qdisc 允许数据包旁路,没有其他未完成的数据包,且 qdisc 当前未运行:

} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
           qdisc_run_begin(q)) {
        /*
         * This is a work-conserving queue; there are no old skbs
         * waiting to be sent out; and the qdisc is not running -
         * xmit the skb directly.
         */
        if (!(dev->priv_flags & IFF_XMIT_DST_RELEASE))
                skb_dst_force(skb);
        qdisc_bstats_update(q, skb);
        if (sch_direct_xmit(skb, q, dev, txq, root_lock)) {
                if (unlikely(contended)) {
                        spin_unlock(&q->busylock);
                        contended = false;
                }
                __qdisc_run(q);
        } else
                qdisc_run_end(q);
        rc = NET_XMIT_SUCCESS;

这个 if 语句有点棘手。 如果以下所有条件均为 true,则整个语句的计算结果为真:

  1. q->flags & TCQ_F_CAN_BYPASS:qdisc 允许数据包绕过排队系统。 这对于“工作节省”的 qdisc 是 true;即,出于流量整形目的而不延迟数据包传输的 qdisc 被认为是 “工作节省” 的,并且允许数据包绕过。 pfifo_fast qdisc 允许数据包绕过排队系统。
  2. !qdisc_qlen(q):qdisc 的队列中没有等待传输的数据。
  3. qdisc_run_begin(p):此函数调用设置 qdisc 的状态为 “running” 并返回 true,如果 qdisc 已经在运行则返回 false。

如果上述所有值均为 true,则:

  • 检查 IFF_XMIT_DST_RELEASE 标志。 如果启用,此标志表示允许内核释放 skb 的目标缓存结构。 此函数中的代码检查标志是否被禁用,并强制对该结构进行引用计数。
  • qdisc_bstats_update 增加 qdisc 发送的字节数和数据包数。
  • sch_direct_xmit 尝试发送数据包。 我们将很快深入研究 sch_direct_xmit,因为它也用于较慢的代码路径中。

在两种情况下检查 sch_direct_xmit 的返回值:

  1. 队列不为空(返回 > 0 )。在这种情况下,会释放防止其他程序争用的锁,并调用__qdisc_run 重新启动 qdisc 处理。
  2. 队列为空(返回 0)。在这种情况下,调用 qdisc_run_end 关闭 qdisc 处理。

在这两种情况下,返回值 NET_XMIT_SUCCESS 都被设置为返回码。 还不算太糟。 让我们看看最后一个分支,即捕获所有情况:

} else {
        skb_dst_force(skb);
        rc = q->enqueue(skb, q) & NET_XMIT_MASK;
        if (qdisc_run_begin(q)) {
                if (unlikely(contended)) {
                        spin_unlock(&q->busylock);
                        contended = false;
                }
                __qdisc_run(q);
        }
}

在所有其他情况下:

  1. 调用 skb_dst_force 强制增加 skb 的目标缓存引用计数。
  2. 调用 qdisc 的 enqueue 函数排队数据到 qdisc。 存储返回码。
  3. 调用 qdisc_run_begin(p) 标记 qdisc 为正在运行。 如果尚未运行,则释放 busylock 并调用 __qdisc_run(p) 来启动 qdisc 处理。

然后,该函数释放一些锁,并返回返回码:

spin_unlock(root_lock);
if (unlikely(contended))
        spin_unlock(&q->busylock);
return rc;
目录
相关文章
|
5月前
|
监控 Linux
Linux的epoll用法与数据结构data、event
Linux的epoll用法与数据结构data、event
59 0
|
运维 监控 网络协议
译|llustrated Guide to Monitoring and Tuning the Linux Networking Stack: Receiving Data
译|llustrated Guide to Monitoring and Tuning the Linux Networking Stack: Receiving Data
120 0
|
11月前
|
Linux
linux下的内存查看(virt,res,shr,data的意义)
linux下的内存查看(virt,res,shr,data的意义)
155 0
|
SQL 存储 缓存
译|Monitoring and Tuning the Linux Networking Stack: Sending Data(十)
译|Monitoring and Tuning the Linux Networking Stack: Sending Data(十)
318 1
|
SQL 缓存 监控
译|Monitoring and Tuning the Linux Networking Stack: Sending Data(十一)
译|Monitoring and Tuning the Linux Networking Stack: Sending Data(十一)
131 0
|
缓存 监控 Linux
译|Monitoring and Tuning the Linux Networking Stack: Sending Data(九)
译|Monitoring and Tuning the Linux Networking Stack: Sending Data(九)
204 0
|
监控 Linux 调度
译|Monitoring and Tuning the Linux Networking Stack: Sending Data(八)
译|Monitoring and Tuning the Linux Networking Stack: Sending Data(八)
90 0
|
5天前
|
Linux Python Perl
Linux命令删除文件里的字符串
Linux命令删除文件里的字符串
17 7
|
5天前
|
Shell Linux
Linux shell编程学习笔记82:w命令——一览无余
Linux shell编程学习笔记82:w命令——一览无余
|
7天前
|
Linux Perl
Linux之sed命令
Linux之sed命令
下一篇
无影云桌面