eBPF 深度探索: 高效 DNS 监控实现(下)

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: eBPF 深度探索: 高效 DNS 监控实现(下)
从数据包到进程


要获取关于 eBPF 中的进程信息,可以使用以下函数:


bpf_get_current_pid_tgid()bpf_get_current_uid_gid()bpf_get_current_comm(char *buf, int size_of_buf)。当程序被绑定到对某个内核函数调用时(如第一个示例所示),就可以使用它们。UID/GID 应该比较明确,但对于那些以前没有接触过内核操作细节的人来说,还是需要解释一下。在内核中被视为 PID 的东西在用户空间中显示为进程的 thread ID。内核认为用户空间中的 thread group ID 是 PID。类似的,bpf_get_current_comm()返回的不是通常的进程名(可以通过ps命令查看),而是线程名。


好吧,我们总归会拿到进程数据,那怎么将数据传递到用户空间?Table 就是用于此目的,通过BPF_PERF_OUTPUT(event)创建,通过方法event.perf_submit(ctx, data, data_size)传递,并通过b.perf_buffer_poll()轮询接收。在此之后,只要数据可用,就会调用callback()函数,即

b["event"].open_perf_buffer(callback)


下面将详细介绍这一机制,但现在,我们继续从理论上进行分析。我们既可以传输数据,也可以传输数据包本身。但要做到这一点,必须为传输的数据选择一个特定长度的变量。怎么选?直接回答是 512 字节,但并不正确。这一长度并没有考虑 EDNS,而且我们还想正确跟踪基于 TCP 的 DNS 报文。因此我们不得不分配大量的预留空间,而更大的包将会被丢弃,大多数情况下,我们将分配比所需更多的内存。我不喜欢这种方法,幸运的是,还有另一个方法: perf_submit_skb()。除了数据外,它还从缓冲区传输指定字节的数据包。但需要注意,该方法仅适用于网络程序 eBPF: 套接字,XDP。也就是说,我们无法获得有关进程的信息。


幸运的是,可以使用多个 eBPF 程序并互相交换数据!这也可以通过 Table 来实现。声明如下:


BPF_TABLE_PUBLIC("hash", key, val, name, max_elements);



这是为了使其对其他 eBPF 程序可用。在另一个程序中,通过如下代码访问:


BPF_TABLE("extern", key, val, name, max_elements);



因此,即使 5 元组(协议、源地址、源端口、目的地址和目的端口)都一样,也不会丢失数据包,键将是以下结构:


struct port_key {
     u8 proto;
     u32 saddr;
     u32 daddr;
     u16 sport;
     u16 dport;
 };


值是我们想知道的关于这个进程的所有信息:


struct port_val {
     u32 ifindex;
     u32 pid;
     u32 tgid;
     u32 uid;
     u32 gid;
     char comm[64];
 };


ifindex是网络设备,我们将在套接字上运行的另一个程序中填充这个值。在这里,我们用它来将整个结构转移到未来的用户空间。


总结: 当调用内核函数发送数据包时,存储涉及到的进程信息。当数据包出现在网络接口上时(不管是传出的还是传入),检查是否在目的地之间通过这样或那样的协议传输包的任何信息。如果有,就将其与包一起传递给 Python,在那里完成其余工作。


好了,我们已经讨论程序的基本逻辑,接下来开始编程吧!


我的名字是进程


我们从获取相关进程的信息开始。udp_sendmsg()tcp_sendmsg()函数用于发送数据包,两者都将sock结构作为第一个参数。在 eBPF 中有两种方法可以访问所研究函数的实参: 将其指定为函数的形参,或者使用宏PT_REGS_PARMx,其中 x 是实参号。下面将展示这两个选项,这是第一个程序,C_BPF_KPROBE:


// The structure that will be used as the key for 
// eBPF table 'proc_ports':
struct port_key {
    u8 proto;
    u32 saddr;
    u32 daddr;
    u16 sport;
    u16 dport;
};
// The structure that will be stored in the eBPF table 'proc_ports' 
// contains information about the process:
struct port_val {
    u32 ifindex;
    u32 pid;
    u32 tgid;
    u32 uid;
    u32 gid;
    char comm[64];
};
// Public (accessible from other eBPF programs) eBPF table in which 
// information about the process is written. 
// It's read when a packet appears on the socket:
BPF_TABLE_PUBLIC("hash", struct port_key, struct port_val, proc_ports, 20480);
// These are two ways to get access to the function arguments:
//int trace_udp_sendmsg(struct pt_regs *ctx) {
// struct sock *sk = (struct sock *)PT_REGS_PARM1(ctx);
int trace_udp_sendmsg(struct pt_regs *ctx, struct sock *sk) {
    u16 sport = sk->sk_num;
    u16 dport = sk->sk_dport;
    // Processing packets only on port 53.
    // 13568 = ntohs(53);
    if (sport == 13568 || dport == 13568) {
        // Preparing the data:
        u32 saddr = sk->sk_rcv_saddr;
        u32 daddr = sk->sk_daddr;
        u64 pid_tgid = bpf_get_current_pid_tgid();
        u64 uid_gid = bpf_get_current_uid_gid();
        // Forming the key structure.
        // These strange transformations will be explained below.
        struct port_key key = {.proto = 17};
        key.saddr = htonl(saddr);
        key.daddr = htonl(daddr);
        key.sport = sport;
        key.dport = htons(dport);
        // Forming a structure with the process properties:
        struct port_val val = {};
        val.pid = pid_tgid >> 32;
        val.tgid = (u32)pid_tgid;
        val.uid = (u32)uid_gid;
        val.gid = uid_gid >> 32;
        bpf_get_current_comm(val.comm, 64);
        //Writing the value into the eBPF table:
        proc_ports.update(&key, &val);
    }
    return 0;
}


使用tcp_sendmsg也完全一样,唯一的区别是,在结构port_key中,字段proto将等于 6,这两个值(17 和 6)分别是 UDP 和 TCP 的协议号,可以在/etc/protocols文件中查看这些值。


两个bpf_get_current_*函数都返回 64 比特,因此我们分别获取高低 32 比特来提取数据。此外,对于 PID/TGID,我们可以立即以常见的形式获取(例如,对于 PID,写入字段的高 32 位,其中包含内核认为是 TGID 的内容)。


我们接下来看看关键数据结构的转换。在下一节中,我们将在程序中创建一个类似的结构。但我们不是从原子结构sock中获取数据,而是从 eBPF 的__sk_buff中,数据的存储形式为:


__u32 remote_ip4; /* Stored in network byte order */
__u32 local_ip4; /* Stored in network byte order */
__u32 remote_port; /* Stored in network byte order */
__u32 local_port; /* stored in host byte order */


提取到用户空间


我们的第二个程序BPF_SOCK_TEXT将"挂起(hang)"在套接字上,为每个包检查对应进程的信息,并将其和包本身一起传输到用户空间:


// The structure that will be used as the key for
// eBPF table 'proc_ports':
struct port_key {
    u8 proto;
    u32 saddr;
    u32 daddr;
    u16 sport;
    u16 dport;
};
// The structure that will be stored in the eBPF table 'proc_ports',
// Contains information about the process:
struct port_val {
    u32 ifindex;
    u32 pid;
    u32 tgid;
    u32 uid;
    u32 gid;
    char comm[64];
};
// eBPF table from which information about the process is extracted.
// Filled when calling kernel functions udp_sendmsg()/tcp_sendmsg():
BPF_TABLE("extern", struct port_key, struct port_val, proc_ports, 20480);
// Table for transferring data to the user space:
BPF_PERF_OUTPUT(dns_events);
// Look for DNS packets among the data passing through the socket and 
// check if there is any information about the process:
int dns_matching(struct __sk_buff *skb) {
    u8 *cursor = 0;
// Checking the IP protocol:
struct ethernet_t *ethernet = cursor_advance(cursor, sizeof(*ethernet));
     if (ethernet->type == ETH_P_IP) {
        struct ip_t *ip = cursor_advance(cursor, sizeof(*ip));
        u8 proto;
        u16 sport;
        u16 dport;
        // Checking the transport layer protocol:
        if (ip->nextp == IPPROTO_UDP) {
            struct udp_t *udp = cursor_advance(cursor, sizeof(*udp));
            proto = 17;
            // Getting the data about the ports:
            sport = udp->sport;
            dport = udp->dport;
        } else if (ip->nextp == IPPROTO_TCP) {
            struct tcp_t *tcp = cursor_advance(cursor, sizeof(*tcp));
            // We don't need packets where no data is transmitted:
            if (!tcp->flag_psh) {
                return 0;
            }
            proto = 6;
            // Getting the data about the ports:
            sport = tcp->src_port;
            dport = tcp->dst_port;
        } else {
            return 0;
        }
        // If it's a DNS query:
        if (dport == 53 || sport == 53) {
            // Form a key structure:
            struct port_key key = {};
            key.proto = proto;
            if (skb->ingress_ifindex == 0) {
                key.saddr = ip->src;
                key.daddr = ip->dst;
                key.sport = sport;
                key.dport = dport;
            } else {
                key.saddr = ip->dst;
                key.daddr = ip->src;
                key.sport = dport;
                key.dport = sport;
            }
            // By the key, look for a value in the eBPF table:
            struct port_val *p_val;
            p_val = proc_ports.lookup(&key);
            // If no value is found, then we have no information about the 
            // process and there is no point in continuing:
            if (!p_val) {
                return 0;
            }
            // Network device index:
            p_val->ifindex = skb->ifindex;
            // Transmit the structure with the process information along with 
            // skb->len bytes sent to the socket:
            dns_events.perf_submit_skb(skb, skb->len, p_val,
                                       sizeof(struct port_val));
            return 0;
        } //dport == 53 || sport == 53
    } //ethernet->type == ETH_P_IP
return 0;
}


该程序的启动方式与第一个示例相同。我们在数据包中移动指针,从不同级别的协议中收集信息。当前仍然不考虑 IP 头的实际长度,但还是添加了一些新的东西,对于 TCP 包,我们将检查其标志,过滤掉不携带数据的包(SYNACK 等)。


但我们必须恢复键,从而从proc_ports表中获取数据。同时,必须区分流量的方向,毕竟,当我们在表中输入数据时,意味着我们是源。但是对于传入的数据包,源将是远程服务器。为了理解数据包的移动方向,我将ingress_ifindex标识为 0 用于标识输出流量。


提供服务


我们需要通过 Python 做三件事: 将程序加载到内核中,从内核中获取数据,并对其进行处理。


前两个任务很简单。此外,我们已经在第一个例子中考虑了使用 eBPF 的两种方法:


# BPF initialization:
bpf_kprobe = BPF(text=C_BPF_KPROBE)
bpf_sock = BPF(text=BPF_SOCK_TEXT)
# Send UDP:
bpf_kprobe.attach_kprobe(event="udp_sendmsg", fn_name="trace_udp_sendmsg")
# Send TCP:
bpf_kprobe.attach_kprobe(event="tcp_sendmsg", fn_name="trace_tcp_sendmsg")
# Socket:
function_dns_matching = bpf_sock.load_func("dns_matching", BPF.SOCKET_FILTER)
BPF.attach_raw_socket(function_dns_matching, '')


获取数据的代码甚至更短:


bpf_sock["dns_events"].open_perf_buffer(print_dns)
while True:
    try:
        bpf_sock.perf_buffer_poll()
    except KeyboardInterrupt:
        exit()


但数据处理将更加繁琐。尽管有现成模块,我们还是决定自己解析协议头。首先,我想自己弄清楚这是如何发生的(最后,尽管在当前情况下正确处理 IP 包头的长度没有意义,因为头域有额外选项的包将在 eBPF 中被丢弃),其次是减少对模块的依赖。然而,对于直接解析 DNS,我仍然(到目前为止)使用现成模块,DNS 结构比 IP/TCP 稍微复杂一些,需要另一个模块(ctypes)来处理 C 数据类型。


def print_dns(cpu, data, size):
    import ctypes as ct
    class SkbEvent(ct.Structure):
        _fields_ = [
            ("ifindex", ct.c_uint32),
            ("pid", ct.c_uint32),
            ("tgid", ct.c_uint32),
            ("uid", ct.c_uint32),
            ("gid", ct.c_uint32),
            ("comm", ct.c_char * 64),
            ("raw", ct.c_ubyte * (size - ct.sizeof(ct.c_uint32 * 5) - ct.sizeof(ct.c_char * 64)))
        ]
    # We get our 'port_val' structure and also the packet itself in the 'raw' field:
    sk = ct.cast(data, ct.POINTER(SkbEvent)).contents
    # Protocols:
    NET_PROTO = {6: "TCP", 17: "UDP"}
    # eBPF operates on thread names.
    # Sometimes they coincide with process names, but often not.
    # So we try to get the process name by its PID:
    try:
        with open(f'/proc/{sk.pid}/comm', 'r') as proc_comm:
            proc_name = proc_comm.read().rstrip()
    except:
        proc_name = sk.comm.decode()
    # Get the name of the network interface by index:
    ifname = if_indextoname(sk.ifindex)
    # The length of the Ethernet frame header is 14 bytes:
    ip_packet = bytes(sk.raw[14:])
    # The length of the IP packet header is not fixed due to the arbitrary
    # number of parameters.
    # Of all the possible IP header we are only interested in 20 bytes:
    (length, _, _, _, _, proto, _, saddr, daddr) = unpack('!BBHLBBHLL', ip_packet[:20])
    # The direct length is written in the second half of the first byte (0b00001111 = 15):
    # len_iph = length & 15
    # Length is written in 32-bit words, convert it to bytes:
    # len_iph = len_iph * 4
    # Convert addresses from numbers into IPs, assembling it into octets:
    saddr = ".".join(map(str, [saddr >> 24 & 0xff, saddr >> 16 & 0xff, saddr >> 8 & 0xff, saddr & 0xff]))
    daddr = ".".join(map(map(str, [daddr >> 24 & 0xff, daddr >> 16 & 0xff, daddr >> 8 & 0xff, daddr & 0xff]))
    # If the transport layer protocol is UDP:
    if proto == 17:
        udp_packet = ip_packet[len_iph:]
        (sport, dport) = unpack('!HH', udp_packet[:4])
        # UDP datagram header length is 8 bytes:
        dns_packet = udp_packet[8:]
    # If the transport layer protocol is TCP:
    elif proto == 6:
        tcp_packet = ip_packet[len_iph:]
        # TCP packet header length is also not fixed due to the optional
        # options. Of the entire TCP header, we are only interested in the data up to the 13th
        # byte (header length):
        (sport, dport, _, length) = unpack('!HHQB', tcp_packet[:13])
        # The direct length is written in the first half (4 bits):
        len_tcph = length >> 4
        # Length is written in 32-bit words, converted to bytes:
        len_tcph = len_tcph * 4
        # That's the tricky part.
        # I don't know where I went wrong or why I need a 2 byte offset,
        # but it's necessary because the DNS packet doesn't start until after it:
        dns_packet = tcp_packet[len_tcph + 2:]
    # other protocols are not handled:
    else:
        return
    # DNS data decoding:
    dns_data = dnslib.DNSRecord.parse(dns_packet)
    # Resource record types:
    DNS_QTYPE = {1: "A", 28: "AAAA"}
    # Query:
    If dns_data.header.qr == 0:
        # We are only interested in A (1) and AAAA (28) records:
        for q in dns_data.questions:
            If q.qtype == 1 or q.qtype == 28:
                print(f'COMM={proc_name} PID={sk.pid} TGID={sk.tgid} DEV={ifname} PROTO={NET_PROTO[proto]} SRC={saddr} DST={daddr} SPT={sport} DPT={dport} UID={sk.uid} GID={sk.gid} DNS_QR=0 DNS_NAME={q.qname} DNS_TYPE={DNS_QTYPE[q.qtype]}')
    # Response:
    elif dns_data.header.qr == 1:
        # We are only interested in A (1) and AAAA (28) records:
        For rr in dns_data.rr:
            If rr.rtype == 1 or rr.rtype == 28:
                print(f'COMM={proc_name} PID={sk.pid} TGID={sk.tgid} DEV={ifname} PROTO={NET_PROTO[proto]} SRC={saddr} DST={daddr} SPT={sport} DPT={dport} UID={sk.uid} GID={sk.gid} DNS_QR=1 DNS_NAME={rr.rname} DNS_TYPE={DNS_QTYPE[rr.rtype]} DNS_DATA={rr.rdata}')
    else:
        print('Invalid DNS query type.')


最后


启动应用程序 Python 代码,在另一个控制台中用dig工具发起请求。


# dig @1.1.1.1 google.com +tcp



如果正确执行,程序输出应该是这样的:


# python3 final_code_eBPF_dns.py
The program is running. Press Ctrl-C to abort.
COMM=dig PID=10738 TGID=10739 DEV=ens18 PROTO=TCP SRC=192.168.44.3 DST=1.1.1.1 SPT=57915 DPT=53 UID=0 GID=0 DNS_QR=0 DNS_NAME=google.com. DNS_TYPE=A
COMM=dig PID=10738 TGID=10739 DEV=ens18 PROTO=TCP SRC=1.1.1.1 DST=192.168.44.3 SPT=53 DPT=57915 UID=0 GID=0 DNS_QR=1 DNS_NAME=google.com. DNS_TYPE=A DNS_DATA=142.251.12.101
COMM=dig PID=10738 TGID=10739 DEV=ens18 PROTO=TCP SRC=1.1.1.1 DST=192.168.44.3 SPT=53 DPT=57915 UID=0 GID=0 DNS_QR=1 DNS_NAME=google.com. DNS_TYPE=A DNS_DATA=142.251.12.113
COMM=dig PID=10738 TGID=10739 DEV=ens18 PROTO=TCP SRC=1.1.1.1 DST=192.168.44.3 SPT=53 DPT=57915 UID=0 GID=0 DNS_QR=1 DNS_NAME=google.com. DNS_TYPE=A DNS_DATA=142.251.12.102
COMM=dig PID=10738 TGID=10739 DEV=ens18 PROTO=TCP SRC=1.1.1.1 DST=192.168.44.3 SPT=53 DPT=57915 UID=0 GID=0 DNS_QR=1 DNS_NAME=google.com. DNS_TYPE=A DNS_DATA=142.251.12.139
COMM=dig PID=10738 TGID=10739 DEV=ens18 PROTO=TCP SRC=1.1.1.1 DST=192.168.44.3 SPT=53 DPT=57915 UID=0 GID=0 DNS_QR=1 DNS_NAME=google.com. DNS_TYPE=A DNS_DATA=142.251.12.100
COMM=dig PID=10738 TGID=10739 DEV=ens18 PROTO=TCP SRC=1.1.1.1 DST=192.168.44.3 SPT=53 DPT=57915 UID=0 GID=0 DNS_QR=1 DNS_NAME=google.com. DNS_TYPE=A DNS_DATA=142.251.12.138


到此为止,我们已经创建了一个有用的应用程序,可以显示系统中所有的 DNS 查询。希望上面的解释足够详细,这样如果你对编写 eBPF 程序感兴趣,可以更容易开始。这段代码已经帮助我更好的了解服务器上发生的事情,以下链接可以获取完整代码。


完整代码


结论


这段代码还可以做得更好吗?当然可以!首先,应该增加对 IPv6 的支持。其次,不要再依赖 IP 头的固定长度,而是要对其进行解析。我拒绝使用 Python 库来处理数据包,不是没有原因的,在 C 语言中,仍然需要手动操作。第三,用 C 语言重写代码也很好,可以完全放弃 Python,当然还要增加几行 JSON 输出的代码,这样在以后开发 UI 仪表盘时会更方便。这将导致第四点,对 DNS 数据包的手动分析。最后,最诱人的一点是停止查看端口(因为也许 DNS 数据包并不总是通过 53 端口),并尝试分析每个数据包,在其中寻找那些符合 DNS 格式的数据包,这将使我们即使在非标准的端口上也能检测到数据包。

目录
相关文章
|
2天前
|
运维 监控 DataWorks
DataWorks 稳定性保障全解析:深入监控与资源调配
DataWorks 的稳定性保障体系涵盖精细监控与资源调配,确保企业数据业务高效、稳定运行。监控模块包括资源、任务和质量监控,及时预警并处理异常;资源调配策略则针对集成、调度、数据服务及计算资源进行科学配置,保障数据同步、任务优先级和高并发需求。通过全方位的监控和合理的资源配置,DataWorks 为企业筑牢数据根基,助力数字化转型。
23 10
|
5天前
|
存储 监控 算法
企业内网监控系统中基于哈希表的 C# 算法解析
在企业内网监控系统中,哈希表作为一种高效的数据结构,能够快速处理大量网络连接和用户操作记录,确保网络安全与效率。通过C#代码示例展示了如何使用哈希表存储和管理用户的登录时间、访问IP及操作行为等信息,实现快速的查找、插入和删除操作。哈希表的应用显著提升了系统的实时性和准确性,尽管存在哈希冲突等问题,但通过合理设计哈希函数和冲突解决策略,可以确保系统稳定运行,为企业提供有力的安全保障。
|
2月前
|
SQL 监控 安全
员工上网行为监控软件:SQL 在数据查询监控中的应用解析
在数字化办公环境中,员工上网行为监控软件对企业网络安全和管理至关重要。通过 SQL 查询和分析数据库中的数据,企业可以精准了解员工的上网行为,包括基础查询、复杂条件查询、数据统计与分析等,从而提高网络管理和安全防护的效率。
31 0
|
7月前
|
存储 JSON 监控
Elasticsearch索引监控全面解析
Elasticsearch索引监控全面解析
134 0
|
6月前
|
传感器 数据采集 运维
ERP系统中的生产线监控与异常处理解析
【7月更文挑战第25天】 ERP系统中的生产线监控与异常处理解析
239 8
|
6月前
|
传感器 数据采集 监控
ERP系统中的生产过程监控与质量管理解析
【7月更文挑战第25天】 ERP系统中的生产过程监控与质量管理解析
287 0
ERP系统中的生产过程监控与质量管理解析
|
7月前
|
消息中间件 监控 Java
「布道师系列文章」宝兰德徐清康解析 Kafka 和 AutoMQ 的监控
本文由北京宝兰德公司解决方案总监徐清康撰写,探讨了Kafka和AutoMQ集群的监控。
251 2
「布道师系列文章」宝兰德徐清康解析 Kafka 和 AutoMQ 的监控
|
7月前
|
监控 NoSQL MongoDB
深入MongoDB监控:全面解析命令、实用示例与最佳实践
深入MongoDB监控:全面解析命令、实用示例与最佳实践
211 0
|
8月前
|
监控 API 数据安全/隐私保护
屏幕监控软件开发指南:C++实现原理解析
在当今数字化时代,屏幕监控软件成为了企业管理和个人隐私保护的重要工具。本文将深入探讨如何使用C++语言实现屏幕监控软件,并解析其实现原理。我们将通过多个代码示例来说明其工作方式,最后将介绍如何将监控到的数据自动提交到网站。
207 3

相关产品

  • 云解析DNS
  • 推荐镜像

    更多