从原理到实践:掌握DPDK内存池技术(下)

本文涉及的产品
公网NAT网关,每月750个小时 15CU
简介: 从原理到实践:掌握DPDK内存池技术

2.2rte_eal_memory_init映射大页内存

主进程先映射,将映射后的虚拟地址保存到文件/var/run/.rte_hugepage_info中,从进程读取此文件,以相同的虚拟地址进行映射,保证主从进程以相同的地址访问大页内存,这也是实现进程间传递报文零拷贝的关键。

/* init memory subsystem */
int
rte_eal_memory_init(void)
{
    RTE_LOG(DEBUG, EAL, "Setting up physically contiguous memory...\n");
    const int retval = rte_eal_process_type() == RTE_PROC_PRIMARY ?
            rte_eal_hugepage_init() :
            rte_eal_hugepage_attach();
    if (retval < 0)
        return -1;
    if (internal_config.no_shconf == 0 && rte_eal_memdevice_init() < 0)
        return -1;
    return 0;
}

主进程调用 rte_eal_hugepage_init 进行大页映射,并根据分类保存到memseg中。

/*
 * Prepare physical memory mapping: fill configuration structure with
 * these infos, return 0 on success.
 *  1. map N huge pages in separate files in hugetlbfs
 *  2. find associated physical addr
 *  3. find associated NUMA socket ID
 *  4. sort all huge pages by physical address
 *  5. remap these N huge pages in the correct order
 *  6. unmap the first mapping
 *  7. fill memsegs in configuration with contiguous zones
 */
int
rte_eal_hugepage_init(void)
{
    struct rte_mem_config *mcfg;
    struct hugepage_file *hugepage = NULL, *tmp_hp = NULL;
    struct hugepage_info used_hp[MAX_HUGEPAGE_SIZES];
    uint64_t memory[RTE_MAX_NUMA_NODES];
    unsigned hp_offset;
    int i, j, new_memseg;
    int nr_hugefiles, nr_hugepages = 0;
    void *addr;
    //测试物理地址是否可用,如果参数指定no_hugetlbfs不用大页或者
    //通过虚拟地址不能获取物理地址,则认为物理地址不可用,则设置phys_addrs_available为false。
    test_phys_addrs_available();
    memset(used_hp, 0, sizeof(used_hp));
    //获取全局共享配置 mem_config
    /* get pointer to global configuration */
    mcfg = rte_eal_get_configuration()->mem_config;
    //遍历 num_hugepage_sizes 中大页内存
    /* calculate total number of hugepages available. at this point we haven't
     * yet started sorting them so they all are on socket 0 */
    for (i = 0; i < (int) internal_config.num_hugepage_sizes; i++) {
        /* meanwhile, also initialize used_hp hugepage sizes in used_hp */
        used_hp[i].hugepage_sz = internal_config.hugepage_info[i].hugepage_sz;
        //获取所有大页的个数
        nr_hugepages += internal_config.hugepage_info[i].num_pages[0];
    }
    /*
     * allocate a memory area for hugepage table.
     * this isn't shared memory yet. due to the fact that we need some
     * processing done on these pages, shared memory will be created
     * at a later stage.
     */
    //分配整块内存,用于保存 nr_hugepages 个 struct hugepage_file
    tmp_hp = malloc(nr_hugepages * sizeof(struct hugepage_file));
    if (tmp_hp == NULL)
        goto fail;
    memset(tmp_hp, 0, nr_hugepages * sizeof(struct hugepage_file));
    hp_offset = 0; /* where we start the current page size entries */
    //internal_config.socket_mem[i] 保存的是参数 --socket-mem 1024,1024 指定的每个socket上的内存。
    //复制一份到局部变量 memory 中
    /* make a copy of socket_mem, needed for balanced allocation. */
    for (i = 0; i < RTE_MAX_NUMA_NODES; i++)
        memory[i] = internal_config.socket_mem[i];
    //开始映射大页内存,主要工作为函数前面注释的前6条。
    /* map all hugepages and sort them */
    //遍历当前系统上配置的几种大页内存
    for (i = 0; i < (int)internal_config.num_hugepage_sizes; i ++){
        unsigned pages_old, pages_new;
        struct hugepage_info *hpi;
        /*
         * we don't yet mark hugepages as used at this stage, so
         * we just map all hugepages available to the system
         * all hugepages are still located on socket 0
         */
        hpi = &internal_config.hugepage_info[i];
        //如果为0,说明此种大页没有free可用的,跳过
        if (hpi->num_pages[0] == 0)
            continue;
        /* map all hugepages available */
        //先获取当前此种类型的大页个数
        pages_old = hpi->num_pages[0];
        //映射大页,返回实际成功应该的个数。
        //map_all_hugepages最后一个参数为1,表示第一次映射,将映射后的虚拟地址保存到 hugepg_tbl[i].orig_va,
        //后面再单独分析此函数
        pages_new = map_all_hugepages(&tmp_hp[hp_offset], hpi, memory, 1);
        if (pages_new < pages_old) {
            RTE_LOG(DEBUG, EAL,
                "%d not %d hugepages of size %u MB allocated\n",
                pages_new, pages_old,
                (unsigned)(hpi->hugepage_sz / 0x100000));
            //获取未映射成功的大页个数
            int pages = pages_old - pages_new;
            //更新总大页个数
            nr_hugepages -= pages;
            //保存映射成功的大页
            hpi->num_pages[0] = pages_new;
            //如果映射成功的大页个数为0,直接跳过,开始下一种大页的映射
            if (pages_new == 0)
                continue;
        }
        //物理地址可用,则调用 find_physaddrs 获取映射后的虚拟地址对应的物理地址,
        //并保存到 hugepg_tbl[i].physaddr
        if (phys_addrs_available) {
            /* find physical addresses for each hugepage */
            if (find_physaddrs(&tmp_hp[hp_offset], hpi) < 0) {
                RTE_LOG(DEBUG, EAL, "Failed to find phys addr "
                    "for %u MB pages\n",
                    (unsigned int)(hpi->hugepage_sz / 0x100000));
                goto fail;
            }
        } else {
            /* set physical addresses for each hugepage */
            if (set_physaddrs(&tmp_hp[hp_offset], hpi) < 0) {
                RTE_LOG(DEBUG, EAL, "Failed to set phys addr "
                    "for %u MB pages\n",
                    (unsigned int)(hpi->hugepage_sz / 0x100000));
                goto fail;
            }
        }
        //根据映射后的huge文件(rte_map0等)获取socket,并保存到hugepg_tbl[i].socket_id
        if (find_numasocket(&tmp_hp[hp_offset], hpi) < 0){
            RTE_LOG(DEBUG, EAL, "Failed to find NUMA socket for %u MB pages\n",
                    (unsigned)(hpi->hugepage_sz / 0x100000));
            goto fail;
        }
        //根据物理地址从小到大排序 tmp_hp
        qsort(&tmp_hp[hp_offset], hpi->num_pages[0],
              sizeof(struct hugepage_file), cmp_physaddr);
        //重新映射大页内存,这次最后一个参数为0,会将映射后的虚拟地址保存到 hugepg_tbl[i].final_va,
        //这也是最终的虚拟地址。
        //重新映射的目的是为了尽量找到物理地址和虚拟地址都连续的大页内存。
        /* remap all hugepages */
        if (map_all_hugepages(&tmp_hp[hp_offset], hpi, NULL, 0) !=
            hpi->num_pages[0]) {
            RTE_LOG(ERR, EAL, "Failed to remap %u MB pages\n",
                    (unsigned)(hpi->hugepage_sz / 0x100000));
            goto fail;
        }
        //解除第一次映射的虚拟地址 hugepg_tbl[i].orig_va
        /* unmap original mappings */
        if (unmap_all_hugepages_orig(&tmp_hp[hp_offset], hpi) < 0)
            goto fail;
        //偏移,遍历下一种大页
        /* we have processed a num of hugepages of this size, so inc offset */
        hp_offset += hpi->num_pages[0];
    }
    //如果没有通过参数 -m 或者 --socket-mem 指定内存,则获取当前所有大页内存总和
    if (internal_config.memory == 0 && internal_config.force_sockets == 0)
        internal_config.memory = eal_get_hugepage_mem_size();
    nr_hugefiles = nr_hugepages;
    //清空hugepage_info的大页个数
    /* clean out the numbers of pages */
    for (i = 0; i < (int) internal_config.num_hugepage_sizes; i++)
        for (j = 0; j < RTE_MAX_NUMA_NODES; j++)
            internal_config.hugepage_info[i].num_pages[j] = 0;
    //前面获取了大页所在socket,这里计算每种大页在各个socket上的大页个数
    /* get hugepages for each socket */
    for (i = 0; i < nr_hugefiles; i++) {
        int socket = tmp_hp[i].socket_id;
        /* find a hugepage info with right size and increment num_pages */
        const int nb_hpsizes = RTE_MIN(MAX_HUGEPAGE_SIZES,
                (int)internal_config.num_hugepage_sizes);
        for (j = 0; j < nb_hpsizes; j++) {
            if (tmp_hp[i].size ==
                    internal_config.hugepage_info[j].hugepage_sz) {
                internal_config.hugepage_info[j].num_pages[socket]++;
            }
        }
    }
    //复制参数指定的socket内存到memory
    /* make a copy of socket_mem, needed for number of pages calculation */
    for (i = 0; i < RTE_MAX_NUMA_NODES; i++)
        memory[i] = internal_config.socket_mem[i];
    //计算最后需要的大页个数。
    //前面映射的当前系统上所有可用的大页,但是如果参数指定了内存大小,就有可能用不到所有的
    //大页。所以此函数时根据实际需要返回大页个数。
    //如果参数申请的内存大于当前可用的内存,直接返回-1
    /* calculate final number of pages */
    nr_hugepages = calc_num_pages_per_socket(memory,
            internal_config.hugepage_info, used_hp,
            internal_config.num_hugepage_sizes);
    //没有足够内存
    /* error if not enough memory available */
    if (nr_hugepages < 0)
        goto fail;
    //创建文件 /var/run/.rte_hugepage_info,文件大小为nr_hugefiles * sizeof(struct hugepage_file),
    //用来保存实际使用的大页信息,并将此文件进行mmap映射。
    /* create shared memory */
    hugepage = create_shared_memory(eal_hugepage_info_path(),
            nr_hugefiles * sizeof(struct hugepage_file));
    memset(hugepage, 0, nr_hugefiles * sizeof(struct hugepage_file));
    /*
     * unmap pages that we won't need (looks at used_hp).
     * also, sets final_va to NULL on pages that were unmapped.
     */
    //删除前面映射的但是不需要的大页文件。
    //比如当前系统有10个1G可用大页内存,进行映射后会生成10个1G的文件,
    //但是参数 --socket-mem 只指定了1G内存,则需要删除9个1G的文件。
    if (unmap_unneeded_hugepages(tmp_hp, used_hp,
            internal_config.num_hugepage_sizes) < 0) {
        RTE_LOG(ERR, EAL, "Unmapping and locking hugepages failed!\n");
        goto fail;
    }
    /*
     * copy stuff from malloc'd hugepage* to the actual shared memory.
     * this procedure only copies those hugepages that have final_va
     * not NULL. has overflow protection.
     */
    //这里只将实际需要的大页信息保存到 hugepage
    if (copy_hugepages_to_shared_mem(hugepage, nr_hugefiles,
            tmp_hp, nr_hugefiles) < 0) {
        RTE_LOG(ERR, EAL, "Copying tables to shared memory failed!\n");
        goto fail;
    }
    //如果参数指定了 unlink,则要将映射后大页文件unlink掉。unlink会将文件删除(ls 看不到文件了),
    //但是前面打开了这些文件,实际上不会真正删除(lsof 可以看到)。
    /* free the hugepage backing files */
    if (internal_config.hugepage_unlink &&
        unlink_hugepage_files(tmp_hp, internal_config.num_hugepage_sizes) < 0) {
        RTE_LOG(ERR, EAL, "Unlinking hugepage files failed!\n");
        goto fail;
    }
    /* free the temporary hugepage table */
    free(tmp_hp);
    tmp_hp = NULL;
    //前面完成了大页内存的映射,这里要将他们分别保存到 memseg 中。
    //同时满足下面四个条件的hugepage 放在同一个memseg中.
    //1. 同socket
    //2. hugepage 大小相同
    //3. 物理地址连续
    //4. 虚拟地址连续
    /* first memseg index shall be 0 after incrementing it below */
    j = -1;
    for (i = 0; i < nr_hugefiles; i++) {
        new_memseg = 0;
        /* if this is a new section, create a new memseg */
        //第一个大页内存,肯定是新memseg
        if (i == 0)
            new_memseg = 1;
        //和前一个大页的socket不一样,认为是新的memseg
        else if (hugepage[i].socket_id != hugepage[i-1].socket_id)
            new_memseg = 1;
        //和前一个大页的大小不一样,认为是新的memseg
        else if (hugepage[i].size != hugepage[i-1].size)
            new_memseg = 1;
        //和前一个大页的物理地址不连续,认为是新的memseg
        else if ((hugepage[i].physaddr - hugepage[i-1].physaddr) !=
            hugepage[i].size)
            new_memseg = 1;
        //和前一个大页的虚拟地址不连续,认为是新的memseg
        else if (((unsigned long)hugepage[i].final_va -
            (unsigned long)hugepage[i-1].final_va) != hugepage[i].size)
            new_memseg = 1;
        //将大页信息保存到 memseg中。
        //最坏情况下,每个大页使用有一个memseg。
        if (new_memseg) {
            j += 1;
            if (j == RTE_MAX_MEMSEG)
                break;
            mcfg->memseg[j].iova = hugepage[i].physaddr;
            mcfg->memseg[j].addr = hugepage[i].final_va;
            mcfg->memseg[j].len = hugepage[i].size;
            mcfg->memseg[j].socket_id = hugepage[i].socket_id;
            mcfg->memseg[j].hugepage_sz = hugepage[i].size;
        }
        /* continuation of previous memseg */
        else {
            mcfg->memseg[j].len += mcfg->memseg[j].hugepage_sz;
        }
        hugepage[i].memseg_id = j;
    }
    if (i < nr_hugefiles) {
        RTE_LOG(ERR, EAL, "Can only reserve %d pages "
            "from %d requested\n"
            "Current %s=%d is not enough\n"
            "Please either increase it or request less amount "
            "of memory.\n",
            i, nr_hugefiles, RTE_STR(CONFIG_RTE_MAX_MEMSEG),
            RTE_MAX_MEMSEG);
        goto fail;
    }
    //将大页信息保存到文件 /var/run/.rte_hugepage_info 后,就可以将其解除映射,
    //等待从进程读取此文件即可。
    munmap(hugepage, nr_hugefiles * sizeof(struct hugepage_file));
    return 0;
}


/*
 * Mmap all hugepages of hugepage table: it first open a file in
 * hugetlbfs, then mmap() hugepage_sz data in it. If orig is set, the
 * virtual address is stored in hugepg_tbl[i].orig_va, else it is stored
 * in hugepg_tbl[i].final_va. The second mapping (when orig is 0) tries to
 * map contiguous physical blocks in contiguous virtual blocks.
 */
static unsigned
map_all_hugepages(struct hugepage_file *hugepg_tbl, struct hugepage_info *hpi,
          uint64_t *essential_memory __rte_unused, int orig)
{
    int fd;
    unsigned i;
    void *virtaddr;
    void *vma_addr = NULL;
    size_t vma_len = 0;
    //遍历大页
    for (i = 0; i < hpi->num_pages[0]; i++) {
        uint64_t hugepage_sz = hpi->hugepage_sz;
        //如果是第一次映射,保存大页索引,获取大页所在路径
        if (orig) {
            hugepg_tbl[i].file_id = i;
            hugepg_tbl[i].size = hugepage_sz;
            //大页文件所在路径 /mnt/huge/rte_mapx
            eal_get_hugefile_path(hugepg_tbl[i].filepath,
                    sizeof(hugepg_tbl[i].filepath), hpi->hugedir,
                    hugepg_tbl[i].file_id);
            hugepg_tbl[i].filepath[sizeof(hugepg_tbl[i].filepath) - 1] = '\0';
        }
        else if (vma_len == 0) {
            unsigned j, num_pages;
            /* reserve a virtual area for next contiguous
             * physical block: count the number of
             * contiguous physical pages. */
            //找到和当前大页物理地址连续的大页
            for (j = i+1; j < hpi->num_pages[0] ; j++) {
                //前一个大页的物理地址加上大页大小不等于当前页的物理地址,
                //说明这两个大页物理地址不连续。
                if (hugepg_tbl[j].physaddr !=
                    hugepg_tbl[j-1].physaddr + hugepage_sz)
                    break;
            }
            num_pages = j - i;
            vma_len = num_pages * hugepage_sz;
            /* get the biggest virtual memory area up to
             * vma_len. If it fails, vma_addr is NULL, so
             * let the kernel provide the address. */
            //如果物理地址连续,则判断虚拟地址是否也可以连续
            vma_addr = get_virtual_area(&vma_len, hpi->hugepage_sz);
            if (vma_addr == NULL)
                vma_len = hugepage_sz;
        }
        //打开大页文件
        /* try to create hugepage file */
        fd = open(hugepg_tbl[i].filepath, O_CREAT | O_RDWR, 0600);
        if (fd < 0) {
            RTE_LOG(DEBUG, EAL, "%s(): open failed: %s\n", __func__,
                    strerror(errno));
            goto out;
        }
        //第一次映射,vma_addr为NULL,让kernel返回合适的虚拟地址
        /* map the segment, and populate page tables,
         * the kernel fills this segment with zeros */
        virtaddr = mmap(vma_addr, hugepage_sz, PROT_READ | PROT_WRITE,
                MAP_SHARED | MAP_POPULATE, fd, 0);
        if (virtaddr == MAP_FAILED) {
            RTE_LOG(DEBUG, EAL, "%s(): mmap failed: %s\n", __func__,
                    strerror(errno));
            close(fd);
            goto out;
        }
        //第一次映射,将返回的虚拟地址保存到 orig_va
        if (orig) {
            hugepg_tbl[i].orig_va = virtaddr;
        }
        else {//第二次映射,将返回的虚拟地址保存到 final_va
            hugepg_tbl[i].final_va = virtaddr;
        }
        /* set shared flock on the file. */
        if (flock(fd, LOCK_SH | LOCK_NB) == -1) {
            RTE_LOG(DEBUG, EAL, "%s(): Locking file failed:%s \n",
                __func__, strerror(errno));
            close(fd);
            goto out;
        }
        close(fd);
        //下一个大页映射的虚拟地址为当前虚拟地址加大页大小,保证
        //所有大页的虚拟地址连续
        vma_addr = (char *)vma_addr + hugepage_sz;
        vma_len -= hugepage_sz;
    }
out:
    return i;
}

从进程调用 rte_eal_hugepage_attach 映射和主进程相同的虚拟地址。

/*
 * This creates the memory mappings in the secondary process to match that of
 * the server process. It goes through each memory segment in the DPDK runtime
 * configuration and finds the hugepages which form that segment, mapping them
 * in order to form a contiguous block in the virtual memory space
 */
int
rte_eal_hugepage_attach(void)
{
    //获取全局共享内存配置
    const struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
    struct hugepage_file *hp = NULL;
    unsigned num_hp = 0;
    unsigned i, s = 0; /* s used to track the segment number */
    unsigned max_seg = RTE_MAX_MEMSEG;
    off_t size = 0;
    int fd, fd_zero = -1, fd_hugepage = -1;
    if (aslr_enabled() > 0) {
        RTE_LOG(WARNING, EAL, "WARNING: Address Space Layout Randomization "
                "(ASLR) is enabled in the kernel.\n");
        RTE_LOG(WARNING, EAL, "   This may cause issues with mapping memory "
                "into secondary processes\n");
    }
    test_phys_addrs_available();
    //打开 /dev/zero,用来测试虚拟地址是否可用
    fd_zero = open("/dev/zero", O_RDONLY);
    if (fd_zero < 0) {
        RTE_LOG(ERR, EAL, "Could not open /dev/zero\n");
        goto error;
    }
    //打开文件 /var/run/.rte_hugepage_info
    fd_hugepage = open(eal_hugepage_info_path(), O_RDONLY);
    if (fd_hugepage < 0) {
        RTE_LOG(ERR, EAL, "Could not open %s\n", eal_hugepage_info_path());
        goto error;
    }
    //主进程已经将需要的大页进行映射,并保存到了 mem_config->memseg[]中,
    //遍历memseg,将主进程保存到memseg的虚拟地址在从进程映射,查看是否能
    //映射成功,如果不能成功,则报错返回,说明不能和主进程使用相同的地址。
    /* map all segments into memory to make sure we get the addrs */
    for (s = 0; s < RTE_MAX_MEMSEG; ++s) {
        void *base_addr;
        /*
         * the first memory segment with len==0 is the one that
         * follows the last valid segment.
         */
        if (mcfg->memseg[s].len == 0)
            break;
        /*
         * fdzero is mmapped to get a contiguous block of virtual
         * addresses of the appropriate memseg size.
         * use mmap to get identical addresses as the primary process.
         */
        base_addr = mmap(mcfg->memseg[s].addr, mcfg->memseg[s].len,
                 PROT_READ,
                 MAP_PRIVATE,
                 fd_zero, 0);
        if (base_addr == MAP_FAILED ||
            base_addr != mcfg->memseg[s].addr) {
            max_seg = s;
            if (base_addr != MAP_FAILED) {
                /* errno is stale, don't use */
                RTE_LOG(ERR, EAL, "Could not mmap %llu bytes "
                    "in /dev/zero at [%p], got [%p] - "
                    "please use '--base-virtaddr' option\n",
                    (unsigned long long)mcfg->memseg[s].len,
                    mcfg->memseg[s].addr, base_addr);
                munmap(base_addr, mcfg->memseg[s].len);
            } else {
                RTE_LOG(ERR, EAL, "Could not mmap %llu bytes "
                    "in /dev/zero at [%p]: '%s'\n",
                    (unsigned long long)mcfg->memseg[s].len,
                    mcfg->memseg[s].addr, strerror(errno));
            }
            if (aslr_enabled() > 0) {
                RTE_LOG(ERR, EAL, "It is recommended to "
                    "disable ASLR in the kernel "
                    "and retry running both primary "
                    "and secondary processes\n");
            }
            goto error;
        }
    }
    //获取文件 /var/run/.rte_hugepage_info 的实际大小
    size = getFileSize(fd_hugepage);
    //映射此文件
    hp = mmap(NULL, size, PROT_READ, MAP_PRIVATE, fd_hugepage, 0);
    if (hp == MAP_FAILED) {
        RTE_LOG(ERR, EAL, "Could not mmap %s\n", eal_hugepage_info_path());
        goto error;
    }
    //计算保存的大页个数
    num_hp = size / sizeof(struct hugepage_file);
    RTE_LOG(DEBUG, EAL, "Analysing %u files\n", num_hp);
    //再次遍历 memseg
    s = 0;
    while (s < RTE_MAX_MEMSEG && mcfg->memseg[s].len > 0){
        void *addr, *base_addr;
        uintptr_t offset = 0;
        size_t mapping_size;
        /*
         * free previously mapped memory so we can map the
         * hugepages into the space
         */
        //解除到 /dev/zero 的映射
        base_addr = mcfg->memseg[s].addr;
        munmap(base_addr, mcfg->memseg[s].len);
        //找到memseg中的大页进行映射
        /* find the hugepages for this segment and map them
         * we don't need to worry about order, as the server sorted the
         * entries before it did the second mmap of them */
        for (i = 0; i < num_hp && offset < mcfg->memseg[s].len; i++) {
            if (hp[i].memseg_id == (int)s){
                fd = open(hp[i].filepath, O_RDWR);
                if (fd < 0) {
                    RTE_LOG(ERR, EAL, "Could not open %s\n",
                        hp[i].filepath);
                    goto error;
                }
                mapping_size = hp[i].size;
                addr = mmap(RTE_PTR_ADD(base_addr, offset),
                        mapping_size, PROT_READ | PROT_WRITE,
                        MAP_SHARED, fd, 0);
                close(fd); /* close file both on success and on failure */
                if (addr == MAP_FAILED ||
                        addr != RTE_PTR_ADD(base_addr, offset)) {
                    RTE_LOG(ERR, EAL, "Could not mmap %s\n",
                        hp[i].filepath);
                    goto error;
                }
                offset+=mapping_size;
            }
        }
        RTE_LOG(DEBUG, EAL, "Mapped segment %u of size 0x%llx\n", s,
                (unsigned long long)mcfg->memseg[s].len);
        s++;
    }
    /* unmap the hugepage config file, since we are done using it */
    munmap(hp, size);
    close(fd_zero);
    close(fd_hugepage);
    return 0;
}

rte_eal_memzone_init

虽然函数名字是memzone初始化,但更多的是初始化 malloc_heap。

/*
 * Init the memzone subsystem
 */
int
rte_eal_memzone_init(void)
{
    struct rte_mem_config *mcfg;
    const struct rte_memseg *memseg;
    /* get pointer to global configuration */
    mcfg = rte_eal_get_configuration()->mem_config;
    //从进程不用执行
    /* secondary processes don't need to initialise anything */
    if (rte_eal_process_type() == RTE_PROC_SECONDARY)
        return 0;
    memseg = rte_eal_get_physmem_layout();
    if (memseg == NULL) {
        RTE_LOG(ERR, EAL, "%s(): Cannot get physical layout\n", __func__);
        return -1;
    }
    rte_rwlock_write_lock(&mcfg->mlock);
    //清空 memzone 个数
    /* delete all zones */
    mcfg->memzone_cnt = 0;
    memset(mcfg->memzone, 0, sizeof(mcfg->memzone));
    rte_rwlock_write_unlock(&mcfg->mlock);
    //初始化 heap
    return rte_eal_malloc_heap_init();
}
int
rte_eal_malloc_heap_init(void)
{
    struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
    unsigned ms_cnt;
    struct rte_memseg *ms;
    if (mcfg == NULL)
        return -1;
    //遍历 memseg,按照memseg中的socket插入malloc_heaps中
    for (ms = &mcfg->memseg[0], ms_cnt = 0;
            (ms_cnt < RTE_MAX_MEMSEG) && (ms->len > 0);
            ms_cnt++, ms++) {
        malloc_heap_add_memseg(&mcfg->malloc_heaps[ms->socket_id], ms);
    }
    return 0;
}

将memseg放入 malloc_heap,首尾各分配一个 malloc_elem,后者指向前者,将前者插入free_head

/*
 * Expand the heap with a memseg.
 * This reserves the zone and sets a dummy malloc_elem header at the end
 * to prevent overflow. The rest of the zone is added to free list as a single
 * large free block
 */
static void
malloc_heap_add_memseg(struct malloc_heap *heap, struct rte_memseg *ms)
{
    /* allocate the memory block headers, one at end, one at start */
    //memseg的首地址作为第一个 malloc_elem
    struct malloc_elem *start_elem = (struct malloc_elem *)ms->addr;
    //memseg的尾地址减去malloc_elem大小作为最后一个 malloc_elem
    struct malloc_elem *end_elem = RTE_PTR_ADD(ms->addr, ms->len - MALLOC_ELEM_OVERHEAD);
    end_elem = RTE_PTR_ALIGN_FLOOR(end_elem, RTE_CACHE_LINE_SIZE);
    //首尾malloc_elem相减得出第一个elem的大小
    const size_t elem_size = (uintptr_t)end_elem - (uintptr_t)start_elem;
    //初始化第一个 elem,状态为free,表示可以被分配
    malloc_elem_init(start_elem, heap, ms, elem_size);
        elem->heap = heap;
        elem->ms = ms;
        elem->prev = NULL;
        memset(&elem->free_list, 0, sizeof(elem->free_list));
        elem->state = ELEM_FREE;
        elem->size = size;
        elem->pad = 0;
        set_header(elem);
        set_trailer(elem);
    //初始化最后一个 elem,并指向前一个elem,状态为busy,表示永远不会被分配走
    malloc_elem_mkend(end_elem, start_elem);
        malloc_elem_init(elem, prev->heap, prev->ms, 0);
        elem->prev = prev;
        elem->state = ELEM_BUSY; /* mark busy so its never merged */
    //根据第一个elem的大小插入对应的free_head
    malloc_elem_free_list_insert(start_elem);
        size_t idx;
        //根据size计算idx
        idx = malloc_elem_free_list_index(elem->size - MALLOC_ELEM_HEADER_LEN);
        elem->state = ELEM_FREE;
        LIST_INSERT_HEAD(&elem->heap->free_head[idx], elem, free_list);
    //保存此heap可分配的总内存大小
    heap->total_size += elem_size;
}

三、内存分配

3.1将内存固定到NUMA节点

当分配常规内存时,理论上,它可以被分配到RAM中的任何位置。这在单CPU系统上没有什么问题,但是许多DPDK用户是在支持非统一内存访问 (NUMA) 的多CPU系统上运行应用的。对于NUMA来说,所有内存都是不同的:某一个CPU对一些内存的访问(如不在该CPU所属NUMA NODE上的内存)将比其他内存访问花费更长的时间,这是由于它们相对于执行所述内存访问的CPU所在的物理位置不同。

进行常规内存分配时,通常无法控制该内存分配到哪里,因此如果DPDK在这样的系统上使用常规内存,就可能会导致以下的情况:在一个CPU上执行的线程却在无意中访问属于非本地NUMA节点的内存。

理想的NUMA节点分配

虽然这种跨NUMA节点访问在所有现代操作系统上都比较少有,因为这样的访问都是都是NUMA感知的,而且即使没有DPDK还是有方法能对内存实施NUMA定位。但是DPDK带来的不仅仅是NUMA感知,事实上,整个DPDK API的构建都旨在为每个操作提供明确的NUMA感知。如果不明确请求NUMA节点访问(其中所述结构必须位于内存中),通常无法分配给定的DPDK数据结构。

DPDK API提供的这种明确的NUMA感知有助于确保用户应用在每个操作中都能考虑到NUMA感知;换句话说,DPDK API可以减少写出编写性能差的代码的可能性。

硬件、物理地址和直接内存存取(DMA)

DPDK被认为是一组用户态的网络包输入/输出库,到目前为止,它基本上保持了最初的任务声明。但是,电脑上的硬件不能处理用户空间的虚拟地址,因为它不能感知任何用户态的进程和其所分配到的用户空间虚拟地址。相反,它只能访问真实的物理地址上的内存,也就是CPU、RAM和系统所有其他的部分用来相互通信的地址。

出于对效率的考量,现代硬件几乎总是使用直接内存存取(DMA)事务。通常,为了执行一个DMA事务,内核需要参与创建一个支持DMA的存储区域,将进程内虚拟地址转换成硬件能够理解的真实物理地址,并启动DMA事务。这是大多数现代操作系统中输入输出的工作方式;然而,这是一个耗时的过程,需要上下文切换、转换和查找操作,这不利于高性能输入/输出。

DPDK的内存管理以一种简单的方式解决了这个问题。每当一个内存区域可供DPDK使用时,DPDK就通过询问内核来计算它的物理地址。由于DPDK使用锁定内存,通常以大页的形式,底层内存区域的物理地址预计不会改变,因此硬件可以依赖这些物理地址始终有效,即使内存本身有一段时间没有使用。然后,DPDK会在准备由硬件完成的输入/输出事务时使用这些物理地址,并以允许硬件自己启动DMA事务的方式配置硬件。这使DPDK避免不必要的开销,并且完全从用户空间执行输入/输出。

IOMMU和IOVA

默认情况下,任何硬件都可以访问整个系统,因此它可以在任何地方执行DMA 事务。这有许多安全隐患。例如,流氓和/或不可信进程(包括在VM (虚拟机)内运行的进程)可能使用硬件设备来读写内核空间,和几乎其他任何存储位置。为了解决这个问题,现代系统配备了输入输出内存管理单元(IOMMU)。这是一种硬件设备,提供DMA地址转换和设备隔离功能,因此只允许特定设备执行进出特定内存区域(由IOMMU指定)的DMA 事务,而不能访问系统内存地址空间的其余部分。

由于IOMMU的参与,硬件使用的物理地址可能不是真实的物理地址,而是IOMMU分配给硬件的(完全任意的)输入输出虚拟地址(IOVA)。一般来说,DPDK社区可以互换使用物理地址和IOVA这两个术语,但是根据上下文,这两者之间的区别可能很重要。例如,DPDK 17.11和更新的DPDK长期支持(LTS)版本在某些情况下可能根本不使用实际的物理地址,而是使用用户空间虚拟地址(甚至完全任意的地址)来实现DMA。IOMMU负责地址转换,因此硬件永远不会注意到两者之间的差异。

IOMMU将物理地址重新映射到IOVA地址的示例

根据DPDK的初始化方式,IOVA地址可能代表也可能不代表实际的物理地址,但有一点始终是正确的:DPDK知道底层内存布局,因此可以利用这一点。例如,它可以以创建IOVA连续虚拟区域的方式映射页面,或者甚至利用IOMMU来重新排列内存映射,以使内存看起来IOVA连续,即使底层物理内存可能不连续。

因此,这种对底层物理内存区域的感知是DPDK工具包中的又一个利器。大多数数据结构不关心IOVA地址,但当它们关心时,DPDK为软件和硬件提供了利用物理内存布局的工具,并针对不同的用例进行优化

请注意,IOMMU不会自行设置任何映射。相反,平台、硬件和操作系统必须进行配置,来使用IOMMU。这种配置说明超出了本系列文章的范围,但是在DPDK文档和其他地方有相关说明。一旦系统和硬件设置为使IOMMU,DPDK就可以使用IOMMU为DPDK分配的任何内存区域设置DMA映射。使用IOMMU是运行DPDK的推荐方法,因为这样做更安全,并且它提供了可用性优势。

3.2内存分配和管理

DPDK不使用常规内存分配函数,如malloc()。相反,DPDK管理自己的内存。更具体地说,DPDK分配大页并在此内存中创建一个堆(heap)并将其提供给用户应用程序并用于存取应用程序内部的数据结构。

使用自定义内存分配器有许多优点。最明显的一个是终端应用程序的性能优势:DPDK创建应用程序要使用的内存区域,并且应用程序可以原生支持大页、NUMA节点亲和性、对DMA地址的访问、IOVA连续性等等性能优势,而无需任何额外的开发。

DPDK内存分配总是在CPU高速缓存行(cache line)的边界上对齐,每个分配的起始地址将是系统高速缓存行大小的倍数。这种方法防止了许多常见的性能问题,例如未对齐的访问和错误的数据共享,其中单个高速缓存行无意中包含(可能不相关的)多个内核同时访问的数据。对于需要这种对齐的用例(例如,分配硬件环结构),也支持任何其他二次幂值 (当然> =高速缓存行大小)。

DPDK中的任何内存分配也是线程安全的。这意味着在任何CPU核心上发生的任何分配都是原子的,不会干扰任何其他分配。这可能看起来很无足轻重 (毕竟,常规glibc内存分配例程通常也是线程安全的),但是一旦在多处理环境中考虑,它的重要性就会变得更加清晰。

DPDK支持特定风格的协同多处理,其中主进程管理所有DPDK资源,多个辅助进程可以连接到主进程,并共享由主进程管理的资源的访问。

DPDK的共享内存实现不仅通过映射不同进程中的相同资源 (类似于shmget () 机制) 来实现,还通过复制另一个进程中主进程的地址空间来实现。因此,由于两个进程中的所有内容都位于相同的地址,指向DPDK内存对象的任何指针都将跨进程工作,无需任何地址转换。这对于跨进程传递数据时的性能非常重要。

640.jpg

操作系统和DPDK分配器的比较


malloc_heap_alloc

对外提供API的最底层实现是malloc_heap,它提供了函数malloc_heap_alloc用来从heap中分配内存。

/*
 * Main function to allocate a block of memory from the heap.
 * It locks the free list, scans it, and adds a new memseg if the
 * scan fails. Once the new memseg is added, it re-scans and should return
 * the new element after releasing the lock.
 */
void *
malloc_heap_alloc(struct malloc_heap *heap,
        const char *type __attribute__((unused)), size_t size, unsigned flags,
        size_t align, size_t bound)
{
    struct malloc_elem *elem;
    size = RTE_CACHE_LINE_ROUNDUP(size);
    align = RTE_CACHE_LINE_ROUNDUP(align);
    //分配内存都要先加锁
    rte_spinlock_lock(&heap->lock);
    //先根据请求的内存大小判断是否有这么多可用内存
    elem = find_suitable_element(heap, size, flags, align, bound);
    if (elem != NULL) {
        //有可用内存,则将memseg进行分割
        elem = malloc_elem_alloc(elem, size, align, bound);
        /* increase heap's count of allocated elements */
        heap->alloc_count++;
    }
    rte_spinlock_unlock(&heap->lock);
    return elem == NULL ? NULL : (void *)(&elem[1]);
}

rte_memzone_reserve

rte_memzone_reserve用来从heap中分配一个内存,可用指定长度和socket。、

/*
 * Return a pointer to a correctly filled memzone descriptor. If the
 * allocation cannot be done, return NULL.
 */
const struct rte_memzone *
rte_memzone_reserve(const char *name, size_t len, int socket_id,
            unsigned flags)
{
    return rte_memzone_reserve_thread_safe(name, len, socket_id,
                           flags, RTE_CACHE_LINE_SIZE, 0);
}

rte_mempool_create

rte_mempool_create用来申请内存保存固定大小的对象,会申请多个memzone,其中一个用于存放struct rte_mempool,其余的一个或者多个memzone存放固定大小的对象。后面介绍的mbuf会作为固定大小的对象存储在mempool中。

rte_pktmbuf_pool_create

rte_mbuf用来存放报文,它是在应用启动前调用rte_pktmbuf_pool_create申请好的内存,后面申请和释放只是指针的操作。

name: mempool的名字。

n: mempool中存放obj的个数。

cache_size: 每个cpu缓存obj的最大个数。

priv_size: mbuf结构后面的内存,可用来存放应用的私有数据。

data_room_size: mbuf中存放报文的空间大小。

/* helper to create a mbuf pool */
struct rte_mempool *
rte_pktmbuf_pool_create(const char *name, unsigned n,
    unsigned cache_size, uint16_t priv_size, uint16_t data_room_size,
    int socket_id)
{
    struct rte_mempool *mp;
    struct rte_pktmbuf_pool_private mbp_priv;
    const char *mp_ops_name;
    unsigned elt_size;
    int ret;
    if (RTE_ALIGN(priv_size, RTE_MBUF_PRIV_ALIGN) != priv_size) {
        RTE_LOG(ERR, MBUF, "mbuf priv_size=%u is not aligned\n",
            priv_size);
        rte_errno = EINVAL;
        return NULL;
    }
    //mempool中一个对象的大小
    elt_size = sizeof(struct rte_mbuf) + (unsigned)priv_size +
        (unsigned)data_room_size;
    mbp_priv.mbuf_data_room_size = data_room_size;
    mbp_priv.mbuf_priv_size = priv_size;
    //创建mempool结构,并插入共享链表rte_mempool_tailq
    mp = rte_mempool_create_empty(name, n, elt_size, cache_size,
         sizeof(struct rte_pktmbuf_pool_private), socket_id, 0);
    if (mp == NULL)
        return NULL;
    mp_ops_name = rte_eal_mbuf_default_mempool_ops();
    ret = rte_mempool_set_ops_byname(mp, mp_ops_name, NULL);
    if (ret != 0) {
        RTE_LOG(ERR, MBUF, "error setting mempool handler\n");
        rte_mempool_free(mp);
        rte_errno = -ret;
        return NULL;
    }
    rte_pktmbuf_pool_init(mp, &mbp_priv);
//申请n个obj所占内存,如果一个memzone不满足,会申请多个memzone。将申请的memzone信息保存到rte_mempool_memhdr中,并插入mp->mem_list。同时将每个memzone按obj大小分成n份(每份就相当于是一个mbuf),将每份的地址又保存到rte_mempool_objhdr,并插入mp->elt_list,然后将每份的地址入队到mp->pool_data(rte_ring)。
    ret = rte_mempool_populate_default(mp);
    if (ret < 0) {
        rte_mempool_free(mp);
        rte_errno = -ret;
        return NULL;
    }
    //调用rte_pktmbuf_init初始化mbuf
    rte_mempool_obj_iter(mp, rte_pktmbuf_init, NULL);
    return mp;
}

初始化后,mempool内存结构如下、

其中rte_mbuf的内存结构如下

rte_pktmbuf_alloc

rte_pktmbuf_alloc用来从指定的mempool中获取一个mbuf,优先从当前cpu的cache里取,如果cache中没了再从pool里取。

static inline struct rte_mbuf *rte_pktmbuf_alloc(struct rte_mempool *mp)
{
    struct rte_mbuf *m;
    if ((m = rte_mbuf_raw_alloc(mp)) != NULL)
        rte_pktmbuf_reset(m);
    return m;
}

3.3内存池

DPDK也有一个内存池管理器,在整个DPDK中广泛用于管理大型对象池,对象大小固定。它的用途很多——包输入/输出、加密操作、事件调度和许多其他需要快速分配或解除分配固定大小缓冲区的用例。DPDK内存池针对性能进行了高度优化,并支持可选的线程安全(如果用户不需要线程安全,则无需为之付费)和批量操作,所有这些都会导致每个缓冲区的分配或空闲操作周期计数达到两位数以下。

也就是说,即使DPDK内存池的主题出现在几乎所有关于DPDK内存管理的讨论中,从技术上讲,内存池管理器是一个建立在常规DPDK内存分配器之上的库。它不是标准DPDK内存分配工具的一部分,它的内部工作与DPDK内存管理例程完全分离 (并且非常不同) 。因此,这超出了本系列文章的范围。但是,有关DPDK内存池管理器库的更多信息可以在DPDK文档中找到。

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
相关文章
|
28天前
|
C++
【C++】深入解析C/C++内存管理:new与delete的使用及原理(二)
【C++】深入解析C/C++内存管理:new与delete的使用及原理
|
28天前
|
编译器 C++ 开发者
【C++】深入解析C/C++内存管理:new与delete的使用及原理(三)
【C++】深入解析C/C++内存管理:new与delete的使用及原理
|
28天前
|
存储 C语言 C++
【C++】深入解析C/C++内存管理:new与delete的使用及原理(一)
【C++】深入解析C/C++内存管理:new与delete的使用及原理
|
2月前
|
KVM 虚拟化
KVM的热添加技术之内存
文章介绍了KVM虚拟化技术中如何通过命令行调整虚拟机内存配置,包括调小和调大内存的步骤,以及一些相关的注意事项。
68 4
KVM的热添加技术之内存
|
3月前
|
监控 算法 Java
Java内存管理:垃圾收集器的工作原理与调优实践
在Java的世界里,内存管理是一块神秘的领域。它像是一位默默无闻的守护者,确保程序顺畅运行而不被无用对象所困扰。本文将带你一探究竟,了解垃圾收集器如何在后台无声地工作,以及如何通过调优来提升系统性能。让我们一起走进Java内存管理的迷宫,寻找提高应用性能的秘诀。
|
3月前
|
安全 Java 开发者
Java 内存模型解析与实践
在Java的世界中,理解内存模型对于编写高效、线程安全的代码至关重要。本文将深入探讨Java内存模型的核心概念,并通过实例分析其对并发编程的影响,旨在为读者提供一套实用的策略和思考方式来优化多线程应用的性能与安全性。
62 0
|
2月前
|
监控 算法 Java
深入理解Java中的垃圾回收机制在Java编程中,垃圾回收(Garbage Collection, GC)是一个核心概念,它自动管理内存,帮助开发者避免内存泄漏和溢出问题。本文将探讨Java中的垃圾回收机制,包括其基本原理、不同类型的垃圾收集器以及如何调优垃圾回收性能。通过深入浅出的方式,让读者对Java的垃圾回收有一个全面的认识。
本文详细介绍了Java中的垃圾回收机制,从基本原理到不同类型垃圾收集器的工作原理,再到实际调优策略。通过通俗易懂的语言和条理清晰的解释,帮助读者更好地理解和应用Java的垃圾回收技术,从而编写出更高效、稳定的Java应用程序。
|
25天前
|
SQL 安全 算法
ChatGPT高效提问—prompt实践(漏洞风险分析-重构建议-识别内存泄漏)
ChatGPT高效提问—prompt实践(漏洞风险分析-重构建议-识别内存泄漏)
30 0
|
28天前
|
程序员 编译器 数据处理
【C语言】深度解析:动态内存管理的机制与实践
【C语言】深度解析:动态内存管理的机制与实践
|
2月前
ARM64技术 —— MMU处于关闭状态时,内存访问是怎样的?
ARM64技术 —— MMU处于关闭状态时,内存访问是怎样的?