UCX-UCT统一通信传输层3-服务端和客户端调用栈详解(及相关)_源码解读

简介: 主流程(服务端或客户端): 1. 主函数中解析命令行参数(parse_cmd), 设置默认服务端口2. 初始化上下文(ucs_async_context_create, 异步事件上下文用于管理定时器和FD通知), 在其中, 初始化多生产者/多消费者队列(ucs_mpmc_queue_init), 初始化非阻塞异步轮询器(ucs_async_poll_init), 初始化可重入自旋锁上下文等3. 创建工人(uct_worker_create), 工人代表着 progress 的引擎。 可以在应用程序中创建多个进度引擎,例如供多个线程使用4. 根据入参查找期望的传输层(dev_tl_loo

主流程


主流程(服务端或客户端):1.主函数中解析命令行参数(parse_cmd), 设置默认服务端口2.初始化上下文(ucs_async_context_create, 异步事件上下文用于管理定时器和FD通知), 在其中, 初始化多生产者/多消费者队列(ucs_mpmc_queue_init),初始化非阻塞异步轮询器(ucs_async_poll_init), 初始化可重入自旋锁上下文等3.创建工人(uct_worker_create), 工人代表着 progress 的引擎。 可以在应用程序中创建多个进度引擎,例如供多个线程使用4.根据入参查找期望的传输层(dev_tl_lookup, 由最小延迟决定要使用的设备和传输)5.设置回调(uct_iface_set_am_handler), 设置服务端接收到客户端数据后的回调6.建立socket连接(connect_common), 服务端监听端口, 等待客户端发起socket连接7. 客户端连接服务端后,两边交换地址(sendrecv, 先通过socket发送和接收长度, 然后发送和接收地址, 交换地址)8.创建端点(uct_ep_create),获取端点地址(uct_ep_get_address),连接对等端点(uct_ep_connect_to_ep, 内部通过 ibv_modify_qp 设置QP状态机建立QP连接)9. 连接建立后,客户端调用短消息(do_am_short)/缓冲区(do_am_bcopy)/零拷贝(do_am_zcopy)发送数据10.显示驱动工人推进(uct_worker_progress, 该例程显式地处理任何未完成的通信操作和活动消息请求, 底层通过poll网卡完成事件,ibv_poll_cq)11.资源销毁(uct_ep_destroy,free其他资源等)

启动服务端源码解读

启动服务端:gdb: examples/uct_hello_world.c -> int main(int argc, char **argv)  if(parse_cmd(argc, argv,&cmd_args))-> int parse_cmd(int argc, char *const argv[], cmd_args_t *args)    args->server_port   =13337  status =ucs_async_context_create(UCS_ASYNC_MODE_THREAD_SPINLOCK,&async)-> 创建异步执行上下文, 分配并初始化异步执行上下文。 这可用于确保安全的事件传递,模式为线程自旋锁(可重入), 在异步对象上, 初始化多生产者和多消费者队列     async =ucs_malloc(sizeof(*async),"async context")-> 内部分配, 并记录内存分配信息       void*ptr =malloc(size)    status =ucs_async_context_init(async, mode)      ucs_trace_func("async=%p", async)-> 跟踪方法       status =ucs_mpmc_queue_init(&async->missed)-> 初始化多生产者/多消费者队列         ucs_queue_head_init(&mpmc->queue)        pthread_spin_init(&lock->lock, lock_flags)-> 初始化自旋锁,标志:PTHREAD_PROCESS_SHAREDPTHREAD_PROCESS_PRIVATE      status =ucs_async_method_call(mode, context_init, async)->.context_init       = ucs_async_poll_init -> 异步轮询初始化, 不阻塞       async->last_wakeup =ucs_get_time()-> asm volatile("rdtsc":"=a"(low),"=d"(high)) 汇编获取时间 ->UCS/ARCH/INFO:如果无法从CPU型号中读取x86 TSC值,请不要从/proc/cpuinfo中读取测量的CPU频率,因为它只能代表核心频率而不是TSC频率。 相反,通过一个短循环进行测量,当频率测量收敛或达到 1ms 时间限制时停止         ucs_async_thread_spinlock_ops.context_init(async)-> ucs_async_thread_spinlock_init -> 可重入自旋锁上下文初始化   status =uct_worker_create(async,UCS_THREAD_MODE_SINGLE,&if_info.worker)->创建工人(独立资源)->UCS_CLASS_DEFINE_NAMED_NEW_FUNC(uct_worker_create ->用宏初始化工人(类似面向对象实例化)-> 创建一个工作对象。 工人代表着progress的引擎。 可以在应用程序中创建多个进度引擎,例如供多个线程使用。 Transports 可以为每个 Worker 分配单独的通信资源,以便每个 Worker 都可以独立于其他 Worker 进行操作 -> 声明/定义一个创建类实例的函数, 初始化工人私有worker, 传输层链表     ucs_class_t *cls =&uct_priv_worker_t_class -> typedef struct uct_priv_worker     obj =ucs_class_malloc(cls)    ucs_class_t *_cls =&uct_priv_worker_t_class     uct_priv_worker_t_init ->staticUCS_CLASS_INIT_FUNC(uct_priv_worker_t ->staticUCS_CLASS_INIT_FUNC(uct_priv_worker_t -> 初始化父类以及当前类的传输链表       staticUCS_CLASS_INIT_FUNC(uct_worker_t)        ucs_callbackq_init(&self->progress_q);        ucs_vfs_obj_add_dir(NULL, self,"uct/worker/%p", self)  status =dev_tl_lookup(&cmd_args,&if_info)-> 查找期望的传输层, 动态加载, 由最小延迟决定要使用的设备和传输, 实参为地址指针, 形参为指针     status =uct_query_components(&components,&num_components)-> 查询组件列表。 获取当前系统上可用的传输组件列表, 得到8个组件,为每个组件添加vfs对象:uct/component/组件名       UCS_MODULE_FRAMEWORK_DECLARE(uct)-> 声明一个“框架”,它是可加载模块的特定集合的上下文。 通常特定框架中的模块提供相同内部接口的替代实现         static ucs_init_once_t ucs_framework_init_once_uct ={{{0,0,0,0,0,0,0,{0,0}}},0}-> 互斥锁+初始化标记       UCS_MODULE_FRAMEWORK_LOAD(uct,0)->void ucs_load_modules 加载所有模块, self, tcp, sysv, posix, ib, rdmacm, cma, knem,...      [in] – _flags 模块加载标志,参见 ucs_module_load_flags_t 框架中的模块由 dlopen() 加载。 模块的共享库名称为:“lib<framework>_<module>.so.<version>”,其中: -<framework> 是框架名称 -<module> 是模块名称。 框架中所有模块的列表由自动生成的 config.h 文件中的预处理器宏 <framework>_MODULES 定义,例如:#define foo_MODULES ":bar1:bar2"-<version> 是模块的共享库版本,由 libtool 生成。 它是从当前库(libucs) 的完整路径中提取的。 在以下位置搜索模块共享库(按优先级顺序): 1.当前共享库(libucs) 目录内的“ucx”子目录 2. ${libdir}/ucx,其中 ${libdir} 是 库的安装目录 请注意,如果 libucs 是从其安装路径加载的,则(1)(2) 是同一位置。 仅当 libucs 被移动或从构建目录运行时,路径才会不同,在这种情况下,优先使用“本地库”而不是“已安装的”库。[in] – _name 框架名称(作为令牌)         ucs_load_modules("uct", uct_MODULES,&ucs_framework_init_once_uct,0)-> 加载框架, 模块           ucs_module_loader_init_paths -> 找到路径,然后动态加载:ucs_module_load_one(framework, module_name, flags)->ucs_module_init(module_path, dl)-> 找到初始化方法名,: module_init_name,动态打开模块: dl =dlopen(module_path, mode)            fullpath =realpath(module_path, buffer)            init_func =(init_func_t)ucs_module_dlsym_shallow module_init_name -> 找到全局初始化函数入口, 一般都没有           modules_str =ucs_strdup(modules,"modules_list")-> 从内存跟踪表中查找字符串           module_name =strtok_r(modules_str,":",&saveptr)->每次取冒号分割的第一个字符串分段(字符串分割函数 strtok_r )          ucs_module_loader_add_dl_dir             动态库路径:0x6070b0"/home/xb/project/ucx/src/ucs/.libs/libucs.so.0"            dladdr((void*)&ucs_module_loader_state,&dl_info)->利用dladdr来获得so自身的路径(ucs_module_loader_state)            ucs_module_loader_state.srch_path[ucs_module_loader_state.srchpath_cnt++]= path -> 记录动态库位置           ucs_module_loader_add_install_dir           ucs_module_global_init -> 查找动态库入库函数地址: addr =dlsym(dl, symbol)-> status =init_func()            voidUCS_F_CTORuct_ib_init()              uct_component_register(&uct_ib_component)-> 注册组件               uct_tl_register(&uct_ib_component, uct_ib_tls[i])-> 注册所有IB传输层       ucs_list_for_each uct_components_list 8个组件 -> ucs_vfs_obj_add_dir -> ucs_vfs_node_add 虚拟文件系统     component_attr.md_resources = alloca -> alloca - 分配自动释放的内存     status =uct_component_query(components[cmpt_index],&component_attr)-> 查询网卡, 拷贝内存域数据资源       status = component->query_md_resources(component,&resources,&num_resources);-> uct_md_query_single_md_resource -> 调用每个组件的查询内存域资源接口       UCS_MODULE_FRAMEWORK_LOAD(uct_ib,0)-> 调用查询内存域资源接口中会加载对应的动态库     status =uct_md_config_read(components[cmpt_index],NULL,NULL,&md_config);-> 读取内存域配置       status =uct_config_read(&bundle,&component->md_config, env_prefix)        status =ucs_config_parser_fill_opts(config_bundle->data, entry, full_prefix,0)          ucs_config_parser_set_default_values(opts, entry->table)-> ucs_config_sscanf_table           ucs_config_parser_get_sub_prefix(env_prefix,&sub_prefix)          ucs_config_parse_config_files()          ucs_config_apply_config_vars -> 应用环境变量, 以及自定义前缀的环境变量     for 迭代内存域资源     uct_md_open -> 重要函数, 打开内存域       status = component->md_open(component, md_name, config,&md)-> ucs_status_t uct_ib_md_open ->IB实现的内存域打开函数         ib_device_list =ibv_get_device_list(&num_devices)-> 获取所有网卡列表, 获取设备列表,比如4个网口(网卡设备), 可通过 ibdev2netdev 查询rdma网口映射         ibv_fork_init -> 核心原理: 通过对所有已注册的MR所在内存页打MADV_DONTFORK标记,创建子进程后,MR所在内存页不会触发COW拷贝,避免了前面所说的COW带来网卡DMA内存地址不一致的问题,但会引入额外的内存记录和查找开销(降低性能)        status = uct_ib_ops[i]->ops->open(ib_device, md_config,&md)->static ucs_status_t uct_ib_mlx5_devx_md_open           vhca_id -> 虚拟主机通道适配器ID          ctx =uct_ib_mlx5_devx_open_device(ibv_device)-> 通过创建完成队列和事件通道来检查网卡设备是否支持事件通道             ctx =mlx5dv_open_device(ibv_device,&dv_attr)-> verbs_open_device -> rdma-core             ibv_create_cq(ctx,1,NULL,NULL,0)            ibv_destroy_cq(cq)            event_channel = mlx5dv_devx_create_event_channel             mlx5dv_devx_destroy_event_channel(event_channel)          md =ucs_derived_of(uct_ib_md_alloc(sizeof(*md),"ib_mlx5_devx_md", ctx)          status =uct_ib_mlx5_check_uar(md)-> 用户访问区域             uct_ib_mlx5_devx_uar_init               uct_ib_mlx5_devx_alloc_uar                 mlx5dv_devx_alloc_uar             uct_ib_mlx5_devx_uar_cleanup           md->mkey_tag =0;-> 使用间接密钥使 MR 无效           uct_ib_mlx5_devx_mr_lru_init(md)          status =uct_ib_device_query(dev, ibv_device)            uct_ib_query_device               ibv_get_device_name               ret =ibv_query_device_ex(ctx,NULL, attr)                ret = vctx->query_device_ex(context, input, attr,sizeof(*attr))            ibv_query_port             ucs_topo_resolve_sysfs_path ->  "/sys/devices/pci0000:15/0000:15:04.0/0000:17:00.0"->PCI地址             ucs_topo_get_sysfs_dev             uct_ib_device_set_pci_id             ucs_topo_get_pci_bw -> 获取PCI带宽               effective_bw =(p->bw_gbps *1e9/8.0)* width *((double)p->encoding / p->decoding)* link_utilization;-> 计算带宽           ret =mlx5dv_devx_general_cmd(ctx,in,sizeof(in), out,sizeof(out))-> ucs_status_t uct_ib_mlx5_devx_general_cmd -> rdma-core -> 通过 devx 接口发出通用命令, 介绍 DEVX 对象及其 DVAPI:创建/修改/读取/销毁。 还添加了 DEVX 通用命令 API,以便能够直接从固件读取 CAP,参考: https://patchwork.kernel.org/project/linux-rdma/patch/1539190590-31186-2-git-send-email-yishaih@mellanox.com/          status =uct_ib_mlx5_devx_query_lag(md,&lag_state)UCT_IB_MLX5_CMD_OP_QUERY_LAG->链路汇聚(bonding),参考: https://docs.nvidia.com/networking/display/bluefielddpuosv385/link+aggregation           md->port_select_mode =uct_ib_mlx5_devx_query_port_select(md)            uct_ib_mlx5_devx_general_cmd UCT_IB_MLX5_CMD_OP_QUERY_LAG          uct_ib_mlx5_is_xgvmi_alias_supported(ctx)-> 跨越GVMI(XGVMI)-DPU 可以代表主机常驻内存启动 RDMA 操作,仅当数据源自或目标为 DPU 内存时才涉及 DPU 内存, Guest VMID 主机_虚机ID          uct_ib_mlx5_devx_check_odp(md, md_config, cap)->按需分页(ODP) 是一种可以缓解内存注册缺点的技术。 应用程序不再需要确定地址空间的底层物理页,并跟踪映射的有效性。 相反,当页面不存在时,HCA 向操作系统请求最新的转换,并且操作系统使由于不存在页面或映射更改而不再有效的转换无效。 ODP 不支持连续页。ODP 可以进一步分为 2 个子类:显式 ODP 和隐式 ODP。显式 ODP 在显式 ODP 中,应用程序仍然注册内存缓冲区以进行通信,但此操作用于定义 IO 的访问控制而不是 pin-down 页面。 ODP内存区域(MR) 在注册时不需要具有有效的映射。 隐式 ODP 在隐式 ODP 中,为应用程序提供了一个特殊的内存密钥,该密钥代表其完整的地址空间。 所有引用该键的 IO 访问(受限于与该键关联的访问权限)不需要注册任何虚拟地址范围。 有关 ODP的更多信息,请参阅了解按需寻呼(ODP) 社区帖子           uct_ib_mlx5_devx_general_cmd           dev->atomic_align =ucs_rounddown_pow2(arg_size)-> 向下取整对齐            uct_ib_md_open_common(&md->super, ibv_device, md_config)-> ucs_status_t uct_ib_md_open_common             uct_ib_device_init -> 初始化IB设备               uct_ib_device_get_locality -> 获取cpu位置top               ucs_sys_fcntl_modfl O_NONBLOCK-> 设置fd为非阻塞模式                 oldfl =fcntl(fd,F_GETFL)              ucs_async_set_event_handler uct_ib_async_event_handler -> 异步事件处理                 ucs_async_method_call(mode, add_event_fd, async, event_fd, events)                          ucs_async_thread_add_event_fd                     ucs_async_thread_start(&thread)                    ucs_event_set_add                       epoll_ctl(event_set->event_fd,EPOLL_CTL_ADD, fd,&raw_event)                    ucs_async_pipe_push(&thread->wakeup)                      ret =write(p->write_fd,&dummy,sizeof(dummy))->0通知对端             uct_ib_md_parse_subnet_prefix -> 添加UCT_IB_SUBNET_PREFIX=fe80::以按subnet_prefix过滤IB端口             uct_ib_check_gpudirect_driver /sys/kernel/mm/memory_peers/nv_mem/version             uct_ib_check_gpudirect_driver /dev/kfd             uct_ib_md_check_dmabuf(md)                               ibv_reg_dmabuf_mr(md->pd,0,ucs_get_page_size(),0, bad_fd,UCT_IB_MEM_ACCESS_FLAGS)            uct_ib_md_set_pci_bw(md, md_config)-> 修复:PCI 速度和系统设备 ID 设置不正确, 先从系统获取, 没找到则从底层设备获取           uct_ib_mlx5_md_port_counter_set_id_init(md)            ucs_carray_for_each(counter_set_id, md->port_counter_set_ids,sizeof(md->port_counter_set_ids))-> 遍历c语言数组           ucs_mpool_params_reset(&mp_params)-> 重置内存池参数           mp_params.ops             =&uct_ib_mlx5_dbrec_ops -> 设置池操作表           ucs_mpool_init(&mp_params,&md->dbrec_pool)            VALGRIND_CREATE_MEMPOOL(mp,0,0)-> valgrind内存池           uct_ib_mlx5_md_buf_alloc(md,ucs_get_page_size(),0,&md->zero_buf,&md->zero_mem,0,"zero umem");            先对齐内存             madvise(buf, size,MADV_DONTFORK)->MADV_DONTFORK在执行fork(2)后,子进程不允许使用此范围的页面。这样是为了避免COW机制导致父进程在写入页面时更改页面的物理位置             mlx5dv_devx_umem_reg(md->super.dev.ibv_context, buf, size, access_mode)-> 注册或取消注册由 devx 接口使用的用户内存。 寄存器动词公开 UMEMDEVX 对象,用于 DMA 的用户内存注册。 用于注册用户存储器的 API 将用户地址、长度和访问标志作为输入,并向用户提供一个对象作为输出,该对象保存由固件返回到该已注册存储器的 UMEMID。 用户将在使用此内存而不是物理地址列表的设备直接命令中使用该 UMEMID,例如在 mlx5dv_devx_obj_create 上创建 QP          uct_ib_md_parse_relaxed_order(&md->super, md_config, ksm_atomic)          uct_ib_mlx5_devx_init_flush_mr(md)            uct_ib_reg_mr(&md->super, md->zero_buf,UCT_IB_MD_FLUSH_REMOTE_LENGTH,&params,UCT_IB_MEM_ACCESS_FLAGS,&md->flush_mr);-> 当 access_flags 包含 IBV_ACCESS_ON_DEMAND时,ibv_reg_mr() 可能会失败并出现 EAGAIN。 这意味着预取由于与失效冲突而失败               UCS_PROFILE_CALL_ALWAYS(ibv_reg_mr, md->pd, address, length, access_flags)-> 调用verbs接口注册内存               ibv_reg_dmabuf_mr -> 注册内存的另一种方式               uct_ib_md_print_mem_reg_err_msg -> 如果内存为空, 打印错误                 UCS_STRING_BUFFER_ONSTACK(msg,256)-> 声明一个字符串缓冲区,该缓冲区使用现有字符串作为后备存储。 此类字符串缓冲区不会分配额外的内存,也不必进行清理,并且它还可以用于在作为函数参数传递的现有 C 字符串缓冲区上构建字符串。                 uct_ib_memlock_limit_msg(&msg, err)                  ucs_sys_get_effective_memlock_rlimit                     getrlimit(RLIMIT_MEMLOCK,&limit_info)-> 获取内存限制                 ucs_string_buffer_cstr(&msg)                  c_str =ucs_array_begin(&strb->str)->返回字符串数组第一个元素                    ((&strb->str)->buffer)            uct_ib_mlx5_devx_reg_ksm_data_addr               uct_ib_mlx5_alloc_mkey_inbox(list_size,&in)              uct_ib_mlx5_devx_reg_ksm(md, atomic, iova, length, list_size ->UCT/IB/MLX5:内存注册流程部分重构,使用params struct进行多线程注册(从上层传递),使用params struct注册flush_mr,改进调试日志记录                 UCT_IB_MLX5DV_SET(create_mkey_in,in, opcode,UCT_IB_MLX5_CMD_OP_CREATE_MKEY)-> 往结构体中插入一个值                 mkc =UCT_IB_MLX5DV_ADDR_OF(create_mkey_in,in, memory_key_mkey_entry)-> 限制所有条目的缓冲区大小以提高性能。 将密钥长度的虚拟地址(KLM) 与固定内存大小关联时使用 KSM                mlx5dv_devx_obj_create         static ucs_status_t uct_self_md_open -> self模块实现的打开内存域       uct_md_vfs_init(component, md, md_name)-> 用vfs节点表示组件和内存域     uct_config_release(md_config)    uct_md_query(iface_p->md,&iface_p->md_attr)      status =uct_md_attr_v2_init(md,&md_attr_v2)->static ucs_status_t uct_self_md_query -> ucs_status_t    uct_ib_md_query         md_attr->access_mem_types          =UCS_BIT(UCS_MEMORY_TYPE_HOST)-> 内存访问的类型         ucs_sys_cpuset_copy(&md_attr->local_cpus,&md->dev.local_cpus)->for->CPU_ISSET(c, src)-> 拷贝CPU参数       uct_md_attr_from_v2(md_attr,&md_attr_v2)->用源(参数2)赋值给目的(参数1),并用内存拷贝(memcpy)cpu和组件名     uct_md_query_tl_resources(iface_p->md,&tl_resources,&num_tl_resources)-> 查询传输层资源。 该例程查询 uct_md_h“内存域”以获取可用的通信资源         uct_self_query_tl_devices | uct_dc_mlx5_query_tl_devices             uct_ib_device_query_ports             uct_ib_device_port_check                 uct_ib_device_port_attr         ucs_realloc     for-> 遍历可用的传输层并找出合适的传输层名             status =init_iface(tl_resources[tl_index].dev_name, tl_resources[tl_index].tl_name, cmd_args->func_am_type, iface_p)-> 按传输层名初始化接口,入参为: 网卡设备名,传输层名,活动消息类型, 接口       status =uct_md_iface_config_read(iface_p->md, tl_name,NULL,NULL,&config)-> 读取并填充配置       status =uct_iface_open(iface_p->md, iface_p->worker,&params, config,&iface_p->iface)-> 打开通信接口         uct_find_tl(md->component, params->mode.device.tl_name)-> 查找传输层         status = tl->iface_open ->staticUCS_CLASS_DEFINE_NEW_FUNC(uct_rc_verbs_iface_t ->staticUCS_CLASS_INIT_FUNC(uct_rc_verbs_iface_t <-.iface_open         =UCS_CLASS_NEW_FUNC_NAME(_iface_class)<-UCT_TL_DEFINE_ENTRY->宏展开得到(uct_rc_verbs_iface_t_new)            init_attr.qp_type               =IBV_QPT_RC-> 设置QP属性             UCS_CLASS_CALL_SUPER_INIT(uct_rc_iface_t ->执行父类的构造函数(初始化)-> uct_rc_verbs_iface_t_init ->UCS_CLASS_INIT_FUNC(uct_rc_iface_t                 UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t ->UCS_CLASS_INIT_FUNC(uct_ib_iface_t                     preferred_cpu =ucs_cpu_set_find_lcs(&cpu_mask)                    UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t                         uct_base_iface_t_init(&self->super, _myclass->superclass,->UCS_CLASS_INIT_FUNC(uct_base_iface_t                             UCS_CLASS_CALL_SUPER_INIT(uct_iface_t, ops)                                uct_iface_t_init(&self->super, _myclass->superclass ->UCS_CLASS_INIT_FUNC(uct_iface_t, uct_iface_ops_t *ops)                                    self->ops =*ops;                            UCT_CB_FLAGS_CHECK((params->field_mask                             self->internal_ops      = internal_ops ->UCS_CLASS_INIT_FUNC(uct_base_iface_t, uct_iface_ops_t *ops,                            uct_worker_progress_init(&self->prog)                            uct_iface_set_stub_am_handler(self, id);                                iface->am[id].cb    = uct_iface_stub_am_handler                             UCS_STATS_NODE_ALLOC(&self->stats -> ucs_status_t ucs_stats_node_alloc                                 ucs_stats_node_new(cls,&node)                                ucs_stats_node_initv(node, cls, name, ap)                                    ucs_stats_name_check(cls->name)                                    ucs_vsnprintf_safe(node->name,UCS_STAT_NAME_MAX, name, ap)                                        vsnprintf(buf, size, fmt, ap)                                        buf[size -1]='\0'-> 字符串数组补0                                    ucs_list_head_init(&node->children[UCS_STATS_INACTIVE_CHILDREN])                                ucs_stats_filter_node_new(node->cls,&filter_node)                                ucs_stats_node_add(node, parent, filter_node)                                    ucs_list_add_tail(&parent->children[UCS_STATS_ACTIVE_CHILDREN],&node->list)                                    ucs_stats_add_to_filter(node, filter_node)                    uct_ib_device_find_port(dev, params->mode.device.dev_name,&port_num)                    uct_ib_iface_set_path_mtu(self, config)                        (IBV_DEV_ATTR(dev, vendor_id)==0x02c9)-> #  define IBV_DEV_ATTR(_dev, _attr)        ((_dev)->dev_attr._attr)-> 常用操作定义为宏                     uct_ib_iface_init_pkey(self, config)                        ibv_query_pkey(dev->ibv_context, iface->config.port_num, pkey_index,&port_pkey)                        pkey =ntohs(port_pkey)                    uct_ib_iface_init_gid_info(self, config)                        uct_ib_iface_init_roce_gid_info(iface, cfg_gid_index)                            uct_ib_device_select_gid(dev, port_num,&iface->gid_info)-> 选择要使用的最佳 gid 并在 RoCE 端口上设置其信息 - gid 索引、RoCE 版本和地址家族                                 uct_ib_device_query_gid_info                                     ibv_query_gid(ctx, port_num, gid_index,&info->gid)                                    uct_ib_device_get_addr_family(&info->gid, gid_index)                                        const uint32_t addr_last_bits = raw->s6_addr32[2]^htonl(0x0000ffff)                                        uct_ib_device_is_addr_ipv4_mcast(raw, addr_last_bits)                                            return(raw->s6_addr32[0]==htonl(0xff0e0000))&&!(raw->s6_addr32[1]| addr_last_bits)-> 编码IPv4多播地址                                 uct_ib_device_test_roce_gid_index ->UCT/IB:使用 ibv_create_ah() 测试本地 gid 的有效性 在某些情况下,可能无法为 RoCE 设备正确配置网络地址,因此在选择要使用的默认 GID 索引时,我们希望跳过 GID 表中的条目 无法创建AH                                    ibv_create_ah(ucs_container_of(dev, uct_ib_md_t, dev)->pd,&ah_attr)                                    ibv_destroy_ah(ah)                            或 uct_ib_device_query_gid_info                         uct_ib_iface_init_roce_addr_prefix(iface, config)                            ucs_sockaddr_inet_addr_size(gid_info->roce_info.addr_family,&addr_size)                            uct_ib_device_get_roce_ndev_name                             ucs_netif_get_addr -> 获取给定接口的地址和网络掩码                             ucs_sockaddr_get_inet_addr                             ucs_count_ptr_trailing_zero_bits -> 计算缓冲区末尾有多少位等于零                             ucs_debug failed to detect RoCE subnet mask prefix on ->UCT:使用子网掩码确定 RoCE IP 的可达性                         uct_ib_device_query_gid                     uct_ib_iface_init_lmc(self, config)                        uct_ib_iface_port_attr(iface)->lmc ->LID mask control 当多个LID被分配给当前端口时使用                     uct_ib_iface_set_num_paths(self, config)->UCT/IB:为 RoCE LAG(链路聚合,bond相关)IBLMC 实现多路径                         iface->num_paths =uct_ib_iface_roce_lag_level(iface)-> 计算bond后的实际带宽, 端口数                             ucs_netif_bond_ad_num_ports(ndev_name)-> 获取绑定设备的活动 802.3ad 端口数。 如果设备不是绑定设备,或者未启用802.3ad,则返回1                                ucs_read_file_number(&ad_num_ports,1,"%s/%s", bond_path,UCS_BOND_NUM_PORTS_FILE);->                          (uct_ib_iface_port_attr(iface)->active_speed ==UCT_IB_SPEED_NDR))                    self->comp_channel =ibv_create_comp_channel(dev->ibv_context)-> 支持中断通知                     ucs_sys_fcntl_modfl(self->comp_channel->fd,O_NONBLOCK,0)-> 设置事件通道fd为非阻塞模式                     self->ops->create_cq(self,UCT_IB_DIR_TX-> ucs_status_t uct_ib_verbs_create_cq                         ibv_cq_ex_to_cq(ibv_create_cq_ex(dev->ibv_context,&cq_attr))                    self->ops->create_cq(self,UCT_IB_DIR_RX                    self->addr_size  =uct_ib_iface_address_size(self)->UCT/GTESTCM 上的客户端-服务器 - 框架和 CM 基本功能。 重构 IB 地址函数,为在 qp-less 模式下通过 rdmacm 建立连接做准备,添加基本 gtest 来测试基本 CM(连接管理器)功能                         returnuct_ib_address_size(&params)-> 根据地址打包标记判断                 uct_iface_set_async_event_params(params,&self->async.event_cb,&self->async.event_arg)                self->super.release_desc.cb = uct_ud_iface_release_desc -> 启用异步推进                 ucs_ptr_array_init(&self->eps,"ud_eps")-> 初始化数组, 先清空结构体字段, 然后设置数组名                 uct_ud_iface_create_qp(self, config)-> 创建队列对,设置类型(UD不可靠数据报),                      qp_init_attr.qp_type             =IBV_QPT_UD                    ops->create_qp(&self->super,&qp_init_attr,&self->qp)-> ucs_status_t uct_ib_iface_create_qp                         ibv_create_qp                     qp_attr.qp_state   =IBV_QPS_INIT                    ibv_modify_qp(self->qp,&qp_attr,IBV_QP_STATE|IBV_QP_PKEY_INDEX|IBV_QP_PORT|IBV_QP_QKEY)                    qp_attr.qp_state =IBV_QPS_RTR                    ibv_modify_qp(self->qp,&qp_attr,IBV_QP_STATE)                    qp_attr.qp_state =IBV_QPS_RTS                    ibv_modify_qp(self->qp,&qp_attr,IBV_QP_STATE|IBV_QP_SQ_PSN)                uct_ib_iface_recv_mpool_init(&self->super,&config->super, params,"ud_recv_skb",&self->rx.mp)-> 创建接收描述符的内存池                     grow =1024                    uct_iface_param_am_alignment(params, iface->config.seg_size,-> 根据接口参数中提供的用户配置初始化 AM 数据对齐及其偏移量                         *align        =UCS_SYS_CACHE_LINE_SIZE-> cacheline 缓存行对齐                     uct_iface_mpool_init uct_ib_iface_recv_desc_init                         ucs_mpool_params_reset(&mp_params)                        uct_iface_mpool_config_copy(&mp_params, config)                        ucs_mpool_init(&mp_params, mp)-> ucs_status_t ucs_mpool_init                             mp->data =ucs_malloc(sizeof(*mp->data)+ params->priv_size,"mpool_data")                            mp->data->grow_factor     = params->grow_factor -> 增长因子                             VALGRIND_CREATE_MEMPOOL(mp,0,0);-> Memcheck:内存错误检测器,https://valgrind.org/docs/manual/mc-manual.html, 该请求将地址池注册为内存池的锚地址。 它还提供了 rzB 大小,指定放置在从池中分配的块周围的 redzone 应该有多大。 最后,它提供了一个 is_zeroed 参数,用于指定分配时池的块是否归零                 ucs_mpool_grow(&self->rx.mp, self->rx.available)->UCT/IB:减少接收缓冲区的内存消耗。 仅当接口上启用进度时才分配接收缓冲区。 对于 UDDC,分配少量接收缓冲区,以便能够处理新的传入连接, 给内存池增加指定数量的元素                     data->ops->chunk_alloc(mp,&chunk_size,&ptr)->UCS/UCT/TEST:重构和优化内存池基础设施 ->UCS_PROFILE_FUNC_ALWAYS(ucs_status_t, uct_iface_mp_chunk_alloc                         uct_iface_mem_alloc(&iface->super, length,UCT_MD_MEM_ACCESS_LOCAL_READ  |UCT_MD_MEM_ACCESS_LOCAL_WRITE|UCT_MD_MEM_FLAG_LOCK,ucs_mpool_name(mp),&mem)-> 分配可用于零拷贝通信的内存。 分配可用于特定传输接口上的零复制数据传输或远程访问的内存区域                             uct_md_query(iface->md,&md_attr)                            params.mem_type        =UCS_MEMORY_TYPE_HOST                            uct_mem_alloc(length, alloc_methods, num_alloc_methods,&params, mem)-> 为零拷贝通信和远程访问分配内存。 分配可能已注册的内存, 统一内存分配层?                                uct_mem_alloc_check_params(length, methods, num_methods, params)                                uct_md_mem_alloc(md,&alloc_length,&address,                            uct_md_mem_reg(iface->md, mem->address, mem->length, flags,                         ucs_mpool_chunk_elems(mp, chunk)                    ucs_mpool_num_elems_per_chunk(mp, chunk, chunk_size)                    for(i =0; i < chunk->num_elems;++i)                        ucs_mpool_add_to_freelist(mp, elem)                            VALGRIND_MAKE_MEM_DEFINED(tail, sizeof *tail)->UCS/MPOOL:在调试模式下添加 FIFO 行为选项,而不是 LIFOFIFO 模式对于调试很有用,因为 mpool 对象不像 LIFO 那样被回收。 LIFO 的性能更好,因为它减少了缓存未命中和 mpool get/put 开销。 设置 UCS_MPOOL_FIFO=y 启用此模式                     VALGRIND_MAKE_MEM_NOACCESS(chunk +1, chunk_size -sizeof(*chunk))                uct_iface_mpool_init(&self->super.super,&self->tx.mp,  uct_ud_iface_send_skb_init             uct_rc_am_hdr_fill(&self->am_inl_hdr.rc_hdr,0)            status = uct_iface_mpool_init -> 创建地址控制器和原子内存池             uct_rc_verbs_iface_init_inl_wrs -> 初始化可靠连接接口上的工作请求和内联请求,设置RDMA操作码                 iface->inl_am_wr.opcode         =IBV_WR_SEND;                iface->inl_am_wr.send_flags     =IBV_SEND_INLINE;                iface->inl_rwrite_wr.opcode     =IBV_WR_RDMA_WRITE;                iface->inl_rwrite_wr.send_flags =IBV_SEND_SIGNALED|IBV_SEND_INLINE;            status =uct_rc_init_fc_thresh(&config->super,&self->super)            status = uct_rc_iface_qp_create                 UCS_PROFILE_CALL_ALWAYS(ibv_create_qp_ex, 或                 UCS_PROFILE_CALL_ALWAYS(ibv_create_qp             uct_ib_destroy_qp(qp)        ucs_vfs_obj_add_dir(worker,*iface_p,"iface/%p",*iface_p)        ucs_vfs_obj_add_sym_link(*iface_p, md,"memory_domain")        ucs_vfs_obj_set_dirty(*iface_p, uct_iface_vfs_refresh)      uct_iface_progress_enable(iface_p->iface -> uct_base_iface_progress_enable_cb -> uct_rc_verbs_iface_common_progress_enable         uct_rc_verbs_iface_common_prepost_recvs(iface)          uct_rc_verbs_iface_post_recv_common             uct_rc_verbs_iface_post_recv_always               uct_ib_iface_prepare_rx_wrs                 UCT_TL_IFACE_GET_RX_DESC(&iface->super, mp, desc,break)-> ucs_mpool_get_inline                   ucs_mpool_get_grow                     ucs_mpool_grow -> chunk_alloc -> uct_iface_mp_chunk_alloc                       uct_iface_mem_alloc                     ...                    ibv_reg_mr_iova2               ibv_post_srq_recv         uct_rc_verbs_iface_progress(search)-> 驱动接口运转         ucs_callbackq_add_safe       status =uct_iface_query(iface_p->iface,&iface_p->iface_attr)-> uct_rc_verbs_iface_query         status = uct_rc_iface_query           status = uct_ib_iface_query -> ucs_status_t uct_ib_iface_query             staticconst uint8_t ib_port_widths[]={[1]=1,[2]=4,[4]=8,[8]=12,[16]=2};            uct_base_iface_query(&iface->super, iface_attr)            active_width =uct_ib_iface_port_attr(iface)->active_width             caseUCT_IB_SPEED_EDR-> 网卡速度               iface_attr->latency.c =600e-9              signal_rate           =25.78125e9              encoding              =64.0/66.0            num_path   =uct_ib_iface_is_roce(iface)            uct_ib_device_has_pci_atomics(dev))        iface_attr->latency.m +=1e-9-> 1ns 纳秒         iface_attr->overhead   =75e-9        iface_attr->ep_addr_len = uct_ib_md_is_flush_rkey_valid   status =uct_iface_set_am_handler(if_info.iface, id, hello_world,&cmd_args.func_am_type,0)-> 设置服务端接收到客户端数据后的回调     iface->am[id].cb    = cb   connect_common     ret =getaddrinfo(server, service,&hints,&res)    sockfd =socket(t->ai_family, t->ai_socktype, t->ai_protocol)    setsockopt     bind, listen, accept -> 等待客户端发起socket连接   status =uct_iface_get_device_address(if_info.iface, own_dev)  sendrecv(oob_sock, own_dev, if_info.iface_attr.device_addr_len,(void**)&peer_dev)-> 先通过socket发送和接收长度, 然后发送和接收地址, 交换地址   uct_iface_is_reachable 检查地址是否可达 -> uct_ib_iface_is_reachable_v2 ->static int uct_ib_iface_dev_addr_is_reachable -> ucs_test_all_flags     device_addr =(const uct_ib_address_t*)UCS_PARAM_VALUE-> 如果设置了字段掩码中的标志,则有条件地返回参数值。 否则,返回默认值   status =uct_ep_create(&ep_params,&ep)-> 创建新端点 ->UCS_CLASS_DEFINE_NEW_FUNC(uct_rc_verbs_ep_t, uct_ep_t,const uct_ep_params_t *)  ucs_vfs_obj_set_dirty(params->iface, uct_iface_vfs_refresh)    ucs_vfs_global_init()->UCS_INIT_ONCE(&ucs_vfs_init_once) 单例     ucs_vfs_node_find_by_obj -> ucs_vfs_kh_find -> klib -> kh_get -> 独立且轻量级c库: https://github.com/attractivechaos/klib   ucs_status_t uct_rc_verbs_ep_get_address   status =uct_ep_connect_to_ep(ep, peer_dev, peer_ep)-> uct_rc_verbs_ep_connect_to_ep_v2     status =uct_rc_iface_qp_connect(iface, ep->qp, qp_num,&ah_attr, path_mtu)-> ucs_status_t uct_rc_iface_qp_connect       qp_attr.qp_state              =IBV_QPS_RTR-> 设备qp       ret =ibv_modify_qp(qp,&qp_attr, qp_attr_mask);-> 设置qp状态机       ...      qp_attr.qp_state              =IBV_QPS_RTS      ret =ibv_modify_qp(qp,&qp_attr, qp_attr_mask)      ucs_debug("connected rc qp 0x%x on "-> 打印debug日志 -> connected rc qp 0x1a91b on mlx5_0:1/RoCE to lid 49152(+0) sl 0 remote_qp 0x1a91a mtu 1024 timer 18x7 rnr 13x7 rd_atom 16  uct_worker_progress(if_info.worker)->UCT 显示驱动工人推进。 该例程显式地处理任何未完成的通信操作和活动消息请求 -> uct_rc_verbs_iface_progress     count =uct_rc_verbs_iface_poll_rx_common(iface)-> ibv_poll_cq       uct_rc_verbs_iface_post_recv_common(iface,0)    return count +uct_rc_verbs_iface_poll_tx(iface)-> always_inline       UCT_RC_VERBS_IFACE_FOREACH_TXWQE(&iface->super, i, wc, num_wcs)-> 遍历宏       uct_rc_txqp_completion_desc       ucs_arbiter_group_schedule -> 安排一个小组进行仲裁。 如果该组已经存在,则该操作将无效       uct_rc_verbs_update_tx_res(&iface->super, ep, count)        uct_rc_txqp_available_add         uct_rc_iface_update_reads ->RDMA_READ 积分释放回 RC iface。 RDMA_READ 积分在完成回调中释放,但不会释放到 RC iface 以避免 OOO 发送。 否则,如果读取信用是唯一缺少的资源并在完成回调中释放,则即使挂起队列不为空,下一个完成回调也将能够发送         uct_rc_iface_add_cq_credits ->UCT/IB:修复错误处理后清除待处理请求的问题       ucs_arbiter_dispatch(&iface->super.tx.arbiter,1, uct_rc_ep_process_pending,NULL)-> 在仲裁器中调度工作元素。 对于每个组,只要回调返回 REMOVE_ELEMNEXT_GROUP,最多会调度 per_group 工作元素。 然后,对下一组执行相同的操作,直到仲裁器变空或回调返回 STOP。 如果一个组没有元素,或其回调返回 REMOVE_GROUP,则它将被删除,直到使用 ucs_arbiter_group_schedule() 将其放回到仲裁器上   if(barrier(oob_sock, progress_worker, if_info.worker))->TCP/TEST:修复 ucp_hello_world 同时 ep close 的问题   out_free_ep -> 释放资源   ...server:while(desc_holder ==NULL)uct_worker_progress(if_info.worker)->static inline unsigned ucs_callbackq_dispatch -> count +=cb(elem->arg)  static unsigned uct_rc_verbs_iface_progress(void*arg)<- self->super.progress     count =uct_rc_verbs_iface_poll_rx_common(iface)      uct_rc_verbs_iface_handle_am         uct_iface_invoke_am           status = handler->cb(handler->arg, data, length, flags)-> hello_world -> 服务端收到数据后回调             print_strings uct_iface_progress_enable   uct_iface_progress ->return iface->ops.iface_progress(iface)    uct_rc_iface_do_progress <-.iface_progress       return iface->progress(iface)       typedef struct ucs_mpmc_queue, 多生产者多消费者线程安全队列。 在“良好”场景中,每次推/拉都是一个原子操作 typedef struct uct_tl_resource_desc, 通信资源描述符。 资源描述符是表示网络资源的对象。 资源描述符可以表示独立的通信资源(例如HCA端口、网络接口)或多个资源(例如多个网络接口或通信端口)。 它还可以表示通过单个物理网络接口定义的虚拟通信资源 md_resources, 内存域资源数组。 使用时,应在使用指向数组的指针调用 @ref uct_component_query 之前对其进行初始化,该数组足够大以容纳所有内存域资源条目。 调用后,该数组将填充现有内存域资源的信息。 为了分配该数组,您可以调用@ref uct_component_query两次:第一次将仅通过在field_mask中指定@ref UCT_COMPONENT_ATTR_FIELD_MD_RESOURCE_COUNT来获取所需的条目数量。 然后,可以为数组分配返回的条目数,并传递给对 @ref uct_component_query 的第二次调用,这次将 field_mask 设置为 @ref UCT_COMPONENT_ATTR_FIELD_MD_RESOURCES connect_common uct_iface_get_device_address sendrecv status =uct_ep_create(&ep_params,&ep)uct_rc_verbs_ep_connect_to_ep_v2

客户端流程

client: 客户端流程[root@node63 ucx]# /home/xb/project/ucx/examples/.libs/lt-uct_hello_world -d mlx5_0:1-t rc_verbs -n 172.17.29.63-z...if(connect(sockfd, t->ai_addr, t->ai_addrlen)==0) examples/uct_hello_world.c -> int main(int argc, char **argv)if(parse_cmd(argc, argv,&cmd_args))-> 解析命令行参数 ucs_async_context_create uct_worker_create dev_tl_lookup status =uct_iface_set_am_handler(if_info.iface, id, hello_world -> 设置接收到数据后的回调 connect_common uct_iface_get_device_address uct_iface_get_address uct_ep_create sendrecv char *str =(char *)mem_type_malloc(cmd_args.test_strlen)->分配16字节的字符(待发送的数据), malloc | cudaMalloc | cudaMallocManaged generate_test_string -> 生成测试字符串   memcpy(dst, src, count)-> 拷贝16字节的字符串到strdo_am_zcopy(&if_info, ep, id,&cmd_args, str)-> str -> char *buf 字符串地址转为buf地址   uct_md_mem_reg -> uct_ib_mlx5_devx_mem_reg ->将buf地址进行内存注册得到内存控制器(memh->address = address)    uct_ib_memh_alloc(&md->super, length,        memh =ucs_calloc(1, memh_base_size +(mr_size * num_mrs),"ib_memh")    uct_ib_mlx5_devx_reg_mr(md, memh, address,        access_flags &IBV_ACCESS_ON_DEMAND-> 按需注册         uct_ib_reg_mr(&md->super, address, length, params, access_flags,        *lkey_p = memh->mrs[mr_type].super.ib->lkey         *rkey_p = memh->mrs[mr_type].super.ib->rkey         uct_ib_mem_prefetch(&md->super,&memh->super, address, length)-> 支持内存预取             UCS_PROFILE_CALL(ibv_advise_mr -> 提供有关 MR 中地址范围的建议     mr =UCS_PROFILE_CALL_ALWAYS(ibv_reg_mr, md->pd, address, length, access_flags)-> 注册该段内存   iov.buffer = buf -> buf转iov   iov.memh   = memh -> 空指针   iov.stride =0;  iov.count  =1;  ...if(barrier(oob_sock, progress_worker, if_info.worker))...ucs_status_t do_am_short   UCT_INLINE_API ucs_status_t uct_ep_am_short     ucs_status_t uct_rc_verbs_ep_am_short       uct_rc_verbs_iface_fill_inl_am_sge       uct_rc_verbs_ep_post_send         ibv_post_send

零拷贝调用栈

do_am_bcopy   uct_ep_am_bcopy     uct_rc_verbs_ep_am_bcopy       UCT_RC_IFACE_GET_TX_AM_BCOPY_DESC      ucs_mpool_get_inline         status =uct_rc_verbs_ep_post_send_desc(ep,&wr, desc, send_flags |IBV_SEND_SOLICITED,UCT_IB_MAX_ZCOPY_LOG_SGE(&iface->super.super));        UCT_RC_VERBS_FILL_DESC_WR(wr, desc)-> 将描述填充到工作请求, 新建sge与wr通过sgl关联         uct_rc_verbs_ep_post_send(iface, ep, wr, send_flags, max_log_sge)          ucs_assertv(ep->qp->state ==IBV_QPS_RTS-> 队列对状态必须是准备发送           uct_rc_iface_tx_moderation           uct_rc_ep_fm           uct_ib_log_post_send           ret =ibv_post_send(ep->qp, wr,&bad_wr);-> 提交wr到qp的发送队列, 触发服务端收到消息后打印, print_strings           uct_rc_verbs_txqp_posted ->DC(动态连接队列对 Dynamically Connected(DC) QPs)传输类型做准备, cq流控信用, 状态计数器等         uct_rc_txqp_add_send_op_sn -> 提交发送后, 将io描述, 按序号sn插入outstanding队列, 因为在轮询完成时,我们可获得完成的数量(而不是基于完成的零索引)           ucs_queue_push(&txqp->outstanding,&op->queue) ucs_status_t do_am_zcopy -> 零拷贝   uct_md_mem_reg -> uct_md_mem_reg_v2 -> md->ops->mem_reg ->UCS_PROFILE_CALL_ALWAYS(ibv_reg_mr -> 注册内存 -> 分析函数调用, 当 access_flags 包含 IBV_ACCESS_ON_DEMAND时,ibv_reg_mr() 可能会失败并出现 EAGAIN。 这意味着预取由于与失效冲突而失败     __ibv_reg_mr   comp.uct_comp.func   = zcopy_completion_cb -> 完成回调   do...while(status ==UCS_ERR_NO_RESOURCE)-> 没资源时退出循环    status =uct_ep_am_zcopy(ep, id,NULL,0,&iov,1,0,(uct_completion_t *)&comp)-> ep->iface->ops.ep_am_zcopy     ucs_status_t uct_rc_mlx5_ep_am_zcopy | ucs_status_t uct_rc_verbs_ep_am_zcopy -> 可靠连接verbs端点的活动消息零拷贝       struct ibv_sge sge[UCT_IB_MAX_IOV];      UCT_CHECK_IOV_SIZE-> 检查向量IO个数,应该不超过网卡最大发送(SGE-1), 第一个sge给header使用       UCT_RC_CHECK_AM_ZCOPY-> 检查零拷贝参数       UCT_RC_CHECK_RES_AND_FC-> 检查资源和流控参数       UCT_RC_IFACE_GET_TX_AM_ZCOPY_DESC-> 从内存池获取发送端零拷贝内存描述 ->UCT/RC:使用恒定的标头大小,无论 TM 状态如何       sge_cnt =uct_ib_verbs_sge_fill_iov(sge +1, iov, iovcnt)-> 填充IO(将iov设置到sge上), 通过 uct_iov_t 中提供的数据填充 ibv_sge 数据结构 该函数避免复制零长度的 IOV        sge[sge_it].addr   =(uintptr_t)(iov[iov_it].buffer)-> buffer地址转sge地址         sge[sge_it].lkey =uct_ib_memh_get_lkey(iov[iov_it].memh)-> 设置本地键       UCT_RC_VERBS_FILL_AM_ZCOPY_WR_IOV-> 准备工作请求wr, 将sge首地址和数量设置到wr,rdma操作码为发送(双边,IBV_WR_SEND)        wr.sg_list = sge; wr.num_sge =(sge_cnt +1); wr.opcode =(typeof(wr.opcode))IBV_WR_SEND;      UCT_TL_EP_STAT_OP-> 统计发送状态       uct_rc_verbs_ep_post_send_desc | status =uct_rc_mlx5_ep_zcopy_post(ep,MLX5_OPCODE_SEND, iov, iovcnt, 0ul, id, header, header_length,0,0, 0ul,0,0,MLX5_WQE_CTRL_SOLICITED, uct_rc_ep_send_op_completion_handler,0, comp);        ----------- rc_mlx5_iface 迈络思可靠连接接口的提交实现         uct_rc_mlx5_txqp_dptr_post_iov           uct_rc_mlx5_am_hdr_fill           uct_ib_mlx5_inline_copy           uct_rc_mlx5_common_post_send             uct_ib_mlx5_post_send ->UCT/IB/MLX5:修复 TXWQ 溢出检查,当 WQE 恰好在 hw_ci+qp_length 结束时,当前检查失败,即使这是有效的情况,将溢出检查重构为特定断言并添加注释来解释它们,移动检查 函数到 C 文件,因为它不是快速路径               num_bb  =ucs_div_round_up(wqe_size,MLX5_SEND_WQE_BB)-> 向上取大的               uct_ib_mlx5_txwq_validate               ucs_memory_cpu_store_fence -> 内存屏障 -> asm volatile("":::"memory")-> 防止编译器重新排序指令               *wq->dbrec =htonl(sw_pi += num_bb)-> 写门铃记录, volatile uint32_t           *dbrec;              ucs_memory_bus_store_fence()              ucs_likely uct_ib_mlx5_bf_copy                 uct_ib_mlx5_bf_copy_bb ->UCP/UCT/IB:修复UCT中的线程模式,当以“序列化”模式创建 IB 接口时,请确保刷新写入组合缓冲区,以避免在另一个线程使用同一 MMIO 寄存器时数据损坏                   UCS_WORD_COPY(uint64_t, dst, uint64_t, src,MLX5_SEND_WQE_BB)->UCT/IB:为 DM 字节复制循环解决无效的 GCC 矢量化问题 GCC 可以生成“movdqa”指令,该指令假定源缓冲区与 16 字节对齐,但是源缓冲区是由用户提供的,并且可能未对齐。 将源缓冲区类型声明为严格未对齐,以防止 GCC 进行无效矢量化             uct_rc_txqp_posted         uct_rc_txqp_add_send_comp       UCT_RC_UPDATE_FC-> 更新流控   uct_worker_progress


零拷贝内存示意图



创建队列对调用栈(ibv_create_qp)

(gdb) b ibv_create_qp #0  0x00007ffff65347b0inibv_create_qp() from /lib64/libibverbs.so.1#1  0x00007ffff6778a58inuct_rc_verbs_can_create_qp(ctx=<optimized out>, pd=0x60d3f0) at rc/verbs/rc_verbs_iface.c:556#2  0x00007ffff6778c23inuct_rc_verbs_query_tl_devices(md=0x60efd0, tl_devices_p=0x7fffffffd630, num_tl_devices_p=0x7fffffffd620) at rc/verbs/rc_verbs_iface.c:581#3  0x00007ffff792864finuct_md_query_tl_resources(md=0x60efd0, resources_p=resources_p@entry=0x7fffffffd7b0, num_resources_p=num_resources_p@entry=0x7fffffffd790) at base/uct_md.c:94#4  0x000000000040248cindev_tl_lookup(cmd_args=cmd_args@entry=0x7fffffffda00, iface_p=iface_p@entry=0x7fffffffdac0) at uct_hello_world.c:363#5  0x000000000040193binmain(argc=<optimized out>, argv=<optimized out>) at uct_hello_world.c:611(gdb)c(gdb) bt #0  0x00007ffff65347b0inibv_create_qp() from /lib64/libibverbs.so.1#1  0x00007ffff675db21inibv_create_qp_ex(qp_init_attr_ex=0x7fffffffd558, context=<optimized out>) at /usr/include/infiniband/verbs.h:3016#2  uct_ib_iface_create_qp(iface=iface@entry=0x6160f0, attr=attr@entry=0x7fffffffd520, qp_p=qp_p@entry=0x7fffffffd4e0) at base/ib_iface.c:1024#3  0x00007ffff677238binuct_rc_iface_qp_create(iface=iface@entry=0x6160f0, qp_p=qp_p@entry=0x7fffffffd4e0, attr=attr@entry=0x7fffffffd520, max_send_wr=<optimized out>, srq=<optimized out>)    at rc/base/rc_iface.c:838#4  0x00007ffff67794edinuct_rc_verbs_iface_t_init(tl_config=0x60e130, params=<optimized out>, worker=<optimized out>, tl_md=<optimized out>, _init_count=0x7fffffffd4d0,      _myclass=0x7ffff69fc760<uct_rc_verbs_iface_t_class>, self=0x6160f0) at rc/verbs/rc_verbs_iface.c:357#5  uct_rc_verbs_iface_t_new(arg0=<optimized out>, arg1=<optimized out>, arg2=<optimized out>, arg3=0x60e130, obj_p=0x7fffffffdcd8) at rc/verbs/rc_verbs_iface.c:461#6  0x00007ffff7928b5binuct_iface_open(md=0x60efd0, worker=0x607560, params=params@entry=0x7fffffffd820, config=0x60e130, iface_p=iface_p@entry=0x7fffffffdcd8) at base/uct_md.c:250#7  0x0000000000402738ininit_iface(iface_p=0x7fffffffdac0, func_am_type=FUNC_AM_SHORT, tl_name=0x616010"rc_verbs", dev_name=0x61601a"mlx5_0:1") at uct_hello_world.c:271#8  dev_tl_lookup(cmd_args=cmd_args@entry=0x7fffffffda00, iface_p=iface_p@entry=0x7fffffffdac0) at uct_hello_world.c:383#9  0x000000000040193binmain(argc=<optimized out>, argv=<optimized out>) at uct_hello_world.c:611

创建完成队列(ibv_create_cq)

创建完成队列:#0  0x00007ffff65345c0inibv_create_cq() from /lib64/libibverbs.so.1#1  0x00007ffff67694dcinuct_ib_mlx5_devx_open_device(ibv_device=ibv_device@entry=0x60de40) at mlx5/dv/ib_mlx5dv_md.c:947#2  0x00007ffff676cfd8inuct_ib_mlx5_devx_md_open(ibv_device=0x60de40, md_config=0x607750, p_md=0x7fffffffd5e0) at mlx5/dv/ib_mlx5dv_md.c:1111#3  0x00007ffff6760d04inuct_ib_md_open(component=<optimized out>, md_name=0x7fffffffd680"mlx5_0", uct_md_config=0x607750, md_p=0x7fffffffd640) at base/ib_md.c:1051#4  0x00007ffff792852dinuct_md_open(component=0x7ffff69faf40<uct_ib_component>, md_name=0x7fffffffd680"mlx5_0", config=<optimized out>, md_p=md_p@entry=0x7fffffffddc0) at base/uct_md.c:61#5  0x0000000000402427indev_tl_lookup(cmd_args=cmd_args@entry=0x7fffffffda00, iface_p=iface_p@entry=0x7fffffffdac0) at uct_hello_world.c:352#6  0x000000000040193binmain(argc=<optimized out>, argv=<optimized out>) at uct_hello_world.c:611 #0  0x00007ffff65345c0inibv_create_cq() from /lib64/libibverbs.so.1#1  0x00007ffff6778a39inuct_rc_verbs_can_create_qp(ctx=<optimized out>, pd=0x60d3f0) at rc/verbs/rc_verbs_iface.c:546#2  0x00007ffff6778c23inuct_rc_verbs_query_tl_devices(md=0x60efd0, tl_devices_p=0x7fffffffd630, num_tl_devices_p=0x7fffffffd620) at rc/verbs/rc_verbs_iface.c:581#3  0x00007ffff792864finuct_md_query_tl_resources(md=0x60efd0, resources_p=resources_p@entry=0x7fffffffd7b0, num_resources_p=num_resources_p@entry=0x7fffffffd790) at base/uct_md.c:94#4  0x000000000040248cindev_tl_lookup(cmd_args=cmd_args@entry=0x7fffffffda00, iface_p=iface_p@entry=0x7fffffffdac0) at uct_hello_world.c:363#5  0x000000000040193binmain(argc=<optimized out>, argv=<optimized out>) at uct_hello_world.c:611 #0  0x00007ffff65347b0inibv_create_qp() from /lib64/libibverbs.so.1#1  0x00007ffff675db21inibv_create_qp_ex(qp_init_attr_ex=0x7fffffffd558, context=<optimized out>) at /usr/include/infiniband/verbs.h:3016#2  uct_ib_iface_create_qp(iface=iface@entry=0x6160f0, attr=attr@entry=0x7fffffffd520, qp_p=qp_p@entry=0x7fffffffd4e0) at base/ib_iface.c:1024#3  0x00007ffff677238binuct_rc_iface_qp_create(iface=iface@entry=0x6160f0, qp_p=qp_p@entry=0x7fffffffd4e0, attr=attr@entry=0x7fffffffd520, max_send_wr=<optimized out>, srq=<optimized out>)    at rc/base/rc_iface.c:838#4  0x00007ffff67794edinuct_rc_verbs_iface_t_init(tl_config=0x60e130, params=<optimized out>, worker=<optimized out>, tl_md=<optimized out>, _init_count=0x7fffffffd4d0,      _myclass=0x7ffff69fc760<uct_rc_verbs_iface_t_class>, self=0x6160f0) at rc/verbs/rc_verbs_iface.c:357#5  uct_rc_verbs_iface_t_new(arg0=<optimized out>, arg1=<optimized out>, arg2=<optimized out>, arg3=0x60e130, obj_p=0x7fffffffdcd8) at rc/verbs/rc_verbs_iface.c:461#6  0x00007ffff7928b5binuct_iface_open(md=0x60efd0, worker=0x607560, params=params@entry=0x7fffffffd820, config=0x60e130, iface_p=iface_p@entry=0x7fffffffdcd8) at base/uct_md.c:250#7  0x0000000000402738ininit_iface(iface_p=0x7fffffffdac0, func_am_type=FUNC_AM_SHORT, tl_name=0x616010"rc_verbs", dev_name=0x61601a"mlx5_0:1") at uct_hello_world.c:271#8  dev_tl_lookup(cmd_args=cmd_args@entry=0x7fffffffda00, iface_p=iface_p@entry=0x7fffffffdac0) at uct_hello_world.c:383#9  0x000000000040193binmain(argc=<optimized out>, argv=<optimized out>) at uct_hello_world.c:611

处理异步事件的线程线程函数

处理异步事件的线程线程函数(调用栈)ucs_async_thread_startucs_pthread_create(&thread->thread_id, ucs_async_thread_func, thread,"async");    staticvoid*ucs_async_thread_func(void*arg)    status =ucs_event_set_wait(thread->event_set ucs_async_thread_ev_handler       event_set_handler(events[i].data.ptr, io_events, arg)        status =ucs_async_dispatch_handlers(&fd,1, events)          ucs_async_handler_dispatch(handler, events)            ucs_async_handler_invoke(handler, events)              handler->cb(handler->id, events, handler->arg)                uct_ib_async_event_handler 异步事件处理                   ibv_get_async_event(dev->ibv_context,&ibevent)                  uct_ib_handle_async_event(dev,&event)-> 处理异步事件                     switch(event->event_type)                    ...                    UCS_STATS_UPDATE_COUNTER(dev->stats,UCT_IB_DEVICE_STAT_ASYNC_EVENT,+1)                  ibv_ack_async_event(&ibevent)


其他参考

UCT笔记(随时更新)

https://github.com/ssbandjl/ucx/blob/master/category/uct_readme


晓兵

博客: https://logread.cn | https://blog.csdn.net/ssbandjl | https://cloud.tencent.com/developer/user/5060293/articles

DAOS汇总: https://cloud.tencent.com/developer/article/2344030

WX_PUB_SUB: 云原生云

WX: ssbandjl


相关实践学习
阿里云图数据库GDB入门与应用
图数据库(Graph Database,简称GDB)是一种支持Property Graph图模型、用于处理高度连接数据查询与存储的实时、可靠的在线数据库服务。它支持Apache TinkerPop Gremlin查询语言,可以帮您快速构建基于高度连接的数据集的应用程序。GDB非常适合社交网络、欺诈检测、推荐引擎、实时图谱、网络/IT运营这类高度互连数据集的场景。 GDB由阿里云自主研发,具备如下优势: 标准图查询语言:支持属性图,高度兼容Gremlin图查询语言。 高度优化的自研引擎:高度优化的自研图计算层和存储层,云盘多副本保障数据超高可靠,支持ACID事务。 服务高可用:支持高可用实例,节点故障迅速转移,保障业务连续性。 易运维:提供备份恢复、自动升级、监控告警、故障切换等丰富的运维功能,大幅降低运维成本。 产品主页:https://www.aliyun.com/product/gdb
目录
相关文章
|
8月前
|
Java Maven
【Netty 网络通信】启动通信服务端
【1月更文挑战第9天】【Netty 网络通信】启动通信服务端
|
7月前
|
缓存 网络协议 Linux
c++实战篇(三) ——对socket通讯服务端与客户端的封装
c++实战篇(三) ——对socket通讯服务端与客户端的封装
159 0
|
8月前
|
设计模式 监控 网络协议
socket通信处于网络协议那一层和两种接收发送消息方式
socket通信处于网络协议那一层和两种接收发送消息方式
102 2
|
8月前
|
前端开发 Java Maven
【Netty 网络通信】启动客户端连接服务端实现通信
【1月更文挑战第9天】【Netty 网络通信】启动客户端连接服务端实现通信
|
Dubbo Java 应用服务中间件
dubbo协议下的单一长连接与多线程并发如何协同工作
dubbo协议下的单一长连接与多线程并发如何协同工作
242 0
dubbo协议下的单一长连接与多线程并发如何协同工作
|
网络安全 开发工具 数据安全/隐私保护
建立一个流式的RPC通信 | 青训营笔记
建立一个流式的RPC通信 | 青训营笔记
233 0
|
网络协议 Linux
Linux网络编程服务端的创建
Linux网络编程服务端的创建
112 0
|
Python
python实现两台不同主机之间进行通信(客户端和服务端)——Socket
通过Python进行Socket网络编程 (做一个聊天程序) 可以实现在不同的主机(电脑)之间进行通话。
780 0
|
XML 安全 网络协议