OpenYurt 之 Yurthub 数据过滤框架解析

本文涉及的产品
云原生网关 MSE Higress,422元/月
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: OpenYurt 是业界首个非侵入的边缘计算云原生开源项目,通过边缘自治,云边协同,边缘单元化,边缘流量闭环等能力为用户提供云边一体化的使用体验。在 Openyurt 里边缘网络可以使用数据过滤框架在不同节点池里实现边缘流量闭环能力。

作者:应健健,新华智云计算中心


OpenYurt 是业界首个非侵入的边缘计算云原生开源项目,通过边缘自治,云边协同,边缘单元化,边缘流量闭环等能力为用户提供云边一体化的使用体验。在 Openyurt 里边缘网络可以使用数据过滤框架在不同节点池里实现边缘流量闭环能力。


Yurthub 数据过滤框架解析


Yurthub 本质上是一层 kube-apiserver 的代理,在代理的基础上加了一层 cache,一来保证边缘节点离线的情况下可以使用本地 cache 保证业务稳定性,有效的解决了边缘自治的问题。二来可以降低大量的 list & watch 操作对云上 api 产生一定的负载。


Yurthub 的数据过滤通过节点上的 pod 以及 kubelet 的请求通过 Load Balancer 发送给 kube-apiserver,代理接收到响应消息进行数据过滤处理之后再将过滤后的数据返回给请求方。如果节点是边缘节点会根据请求类型对响应请求体中的资源进行本地缓存,如果是云端节点考虑到网络状态良好不进行本地缓存。

Yurthub 的过滤框架实现原理图:


1.pngimage.gif


Yurthub 目前包含四种过滤规则,通过 addons 请求的 user-agent,resource,verb 判断经过那个过滤器进行相应的数据过滤。


四种过滤规则功能及实现


ServiceTopologyFilter


主要针对 EndpointSlice 资源进行数据过滤, 但 Endpoint Slice 特性需要在 Kubernetes v1.18 或以上版本才能支持,如果在 1.18 版本以下建议使用 endpointsFilter 过滤器。当经过该过滤器首先通过 kubernetes.io/service-name 找到 endpointSlice 资源所对应的 services 资源,之后判断 servces 资源是否存在 openyurt.io/topologyKeys 这个 Annotations,如果存在那么通过这个 Annotations 的值判断数据过滤规则,最后更新 response data 返回给 addons。


Annotations 的值分为两大类:


1、kubernetes.io/hostname:只过滤出相同节点的 endpoint ip


2、openyurt.io/nodepool 或者 kubernetes.io/zone: 通过这个 Annotations 获取对应节点池,最后遍历 endpointSlice 资源,通过 endpointSlice 里的 topology 字段中的 kubernetes.io/hostname 字段在 endpointSlice 对象里找到对应的 Endpoints,之后重组 endpointSlice 里的 Endpoints 后返回给 addons。 


代码实现:


func (fh *serviceTopologyFilterHandler) reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice) *discovery.EndpointSlice {
   var serviceTopologyType string
   // get the service Topology type
   if svcName, ok := endpointSlice.Labels[discovery.LabelServiceName]; ok {
      svc, err := fh.serviceLister.Services(endpointSlice.Namespace).Get(svcName)
      if err != nil {
         klog.Infof("skip reassemble endpointSlice, failed to get service %s/%s, err: %v", endpointSlice.Namespace, svcName, err)
         return endpointSlice
      }
      if serviceTopologyType, ok = svc.Annotations[AnnotationServiceTopologyKey]; !ok {
         klog.Infof("skip reassemble endpointSlice, service %s/%s has no annotation %s", endpointSlice.Namespace, svcName, AnnotationServiceTopologyKey)
         return endpointSlice
      }
   }
   var newEps []discovery.Endpoint
   // if type of service Topology is 'kubernetes.io/hostname'
   // filter the endpoint just on the local host
   if serviceTopologyType == AnnotationServiceTopologyValueNode {
      for i := range endpointSlice.Endpoints {
         if endpointSlice.Endpoints[i].Topology[v1.LabelHostname] == fh.nodeName {
            newEps = append(newEps, endpointSlice.Endpoints[i])
         }
      }
      endpointSlice.Endpoints = newEps
   } else if serviceTopologyType == AnnotationServiceTopologyValueNodePool || serviceTopologyType == AnnotationServiceTopologyValueZone {
      // if type of service Topology is openyurt.io/nodepool
      // filter the endpoint just on the node which is in the same nodepool with current node
      currentNode, err := fh.nodeGetter(fh.nodeName)
      if err != nil {
         klog.Infof("skip reassemble endpointSlice, failed to get current node %s, err: %v", fh.nodeName, err)
         return endpointSlice
      }
      if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
         nodePool, err := fh.nodePoolLister.Get(nodePoolName)
         if err != nil {
            klog.Infof("skip reassemble endpointSlice, failed to get nodepool %s, err: %v", nodePoolName, err)
            return endpointSlice
         }
         for i := range endpointSlice.Endpoints {
            if inSameNodePool(endpointSlice.Endpoints[i].Topology[v1.LabelHostname], nodePool.Status.Nodes) {
               newEps = append(newEps, endpointSlice.Endpoints[i])
            }
         }
         endpointSlice.Endpoints = newEps
      }
   }
   return endpointSlice
}


EndpointsFilter


针对 endpoints 资源进行相应的数据过滤,首先判断 endpoint 是否存在对应的 service,通过 node 的 label: apps.openyurt.io/nodepool 获取节点池,之后获取节点池下的所有节点,遍历 endpoints.Subsets 下的资源找出同一个节点池的 Ready pod address 以及 NotReady pod address 重组成新的 endpoints 之后返回给 addons。


func (fh *endpointsFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints {
   svcName := endpoints.Name
   _, err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName)
   if err != nil {
      klog.Infof("skip reassemble endpoints, failed to get service %s/%s, err: %v", endpoints.Namespace, svcName, err)
      return endpoints
   }
   // filter the endpoints on the node which is in the same nodepool with current node
   currentNode, err := fh.nodeGetter(fh.nodeName)
   if err != nil {
      klog.Infof("skip reassemble endpoints, failed to get current node %s, err: %v", fh.nodeName, err)
      return endpoints
   }
   if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
      nodePool, err := fh.nodePoolLister.Get(nodePoolName)
      if err != nil {
         klog.Infof("skip reassemble endpoints, failed to get nodepool %s, err: %v", nodePoolName, err)
         return endpoints
      }
      var newEpSubsets []v1.EndpointSubset
      for i := range endpoints.Subsets {
         endpoints.Subsets[i].Addresses = filterValidEndpointsAddr(endpoints.Subsets[i].Addresses, nodePool)
         endpoints.Subsets[i].NotReadyAddresses = filterValidEndpointsAddr(endpoints.Subsets[i].NotReadyAddresses, nodePool)
         if endpoints.Subsets[i].Addresses != nil || endpoints.Subsets[i].NotReadyAddresses != nil {
            newEpSubsets = append(newEpSubsets, endpoints.Subsets[i])
         }
      }
      endpoints.Subsets = newEpSubsets
      if len(endpoints.Subsets) == 0 {
         // this endpoints has no nodepool valid addresses for ingress controller, return nil to ignore it
         return nil
      }
   }
   return endpoints
}


MasterServiceFilter

针对 services 下的域名进行 ip 以及端口替换,这个过滤器的场景主要在于边缘端的 pod 无缝使用 InClusterConfig 访问集群资源。


func (fh *masterServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
   list, err := fh.serializer.Decode(b)
   if err != nil || list == nil {
      klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of masterServiceFilterHandler, %v", err)
      return b, nil
   }
   // return data un-mutated if not ServiceList
   serviceList, ok := list.(*v1.ServiceList)
   if !ok {
      return b, nil
   }
   // mutate master service
   for i := range serviceList.Items {
      if serviceList.Items[i].Namespace == MasterServiceNamespace && serviceList.Items[i].Name == MasterServiceName {
         serviceList.Items[i].Spec.ClusterIP = fh.host
         for j := range serviceList.Items[i].Spec.Ports {
            if serviceList.Items[i].Spec.Ports[j].Name == MasterServicePortName {
               serviceList.Items[i].Spec.Ports[j].Port = fh.port
               break
            }
         }
         klog.V(2).Infof("mutate master service into ClusterIP:Port=%s:%d for request %s", fh.host, fh.port, util.ReqString(fh.req))
         break
      }
   }
   // return the mutated serviceList
   return fh.serializer.Encode(serviceList)
}


DiscardCloudService


该过滤器针对两种 service 其中的一种类型是 LoadBalancer,因为边缘端无法访问 LoadBalancer 类型的资源,所以该过滤器会将这种类型的资源直接过滤掉。另外一种是针对 kube-system 名称空间下的 x-tunnel-server-internal-svc,这个 services 主要存在 cloud 节点用于访问 yurt-tunnel-server,对于 edge 节点会直接过滤掉该 service。


func (fh *discardCloudServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
   list, err := fh.serializer.Decode(b)
   if err != nil || list == nil {
      klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of discardCloudServiceFilterHandler %v", err)
      return b, nil
   }
   serviceList, ok := list.(*v1.ServiceList)
   if ok {
      var svcNew []v1.Service
      for i := range serviceList.Items {
         nsName := fmt.Sprintf("%s/%s", serviceList.Items[i].Namespace, serviceList.Items[i].Name)
         // remove lb service
         if serviceList.Items[i].Spec.Type == v1.ServiceTypeLoadBalancer {
            if serviceList.Items[i].Annotations[filter.SkipDiscardServiceAnnotation] != "true" {
               klog.V(2).Infof("load balancer service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
               continue
            }
         }
         // remove cloud clusterIP service
         if _, ok := cloudClusterIPService[nsName]; ok {
            klog.V(2).Infof("clusterIP service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
            continue
         }
         svcNew = append(svcNew, serviceList.Items[i])
      }
      serviceList.Items = svcNew
      return fh.serializer.Encode(serviceList)
   }
   return b, nil
}


过滤框架现状


目前的过滤框架比较僵硬,将资源过滤硬编码至代码中,只能是已注册的资源才能进行相应的过滤,为了解决这个问题,需要对过滤框架进行相应的改造。


解决方案


方案一:


使用参数或者环境变量的形式自定义过滤配置,但是这种方式有以下弊端:

1、配置复杂需要将所以需要自定义的配置写入到启动参数或者读取环境变量 例如下格式:


--filter_serviceTopology=coredns/endpointslices#list,kube-proxy/services#list;watch --filter_endpointsFilter=nginx-ingress-controller/endpoints#list;watch

2、无法热更新,每次修改配置都需要重启 Yurthub 生效。


方案二:


1、使用 configmap 的形式自定义过滤配置降低配置复杂度配置格式(user-agent/resource#list,watch) 多个资源通过逗号隔开。如下所示:


filter_endpoints: coredns/endpoints#list;watch,test/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""
filter_masterservice: ""

2、利用 Informer 机制保证配置实时生效


综合以上两点在 OpenYurt 中我们选择了解决方案二。


开发过程中遇到的问题

在边缘端 Informer watch 的 api 地址是 Yurthub 的代理地址,那么 Yurthub 在启动代理端口之前都是无法保证 configmap 的数据是正常的。如果在启动完成之后 addons 的请求先于 configmap 数据更新 这个时候会导致数据在没有过滤的情况下就返回给了 addons,这样会导致很多预期以外的问题。


为了解决这个问题 我们需要在 apporve 中加入 WaitForCacheSync 保证数据同步完成之后才能返回相应的过滤数据,但是在 apporve 中加入 WaitForCacheSync 也直接导致 configmap 进行 watch 的时候也会被阻塞,所以需要在 WaitForCacheSync 之前加入一个白名单机制,当 Yurthub 使用 list & watch 访问 configmap 的时候我们直接不进行数据过滤,相应的代码逻辑如下:


func (a *approver) Approve(comp, resource, verb string) bool {
   if a.isWhitelistReq(comp, resource, verb) {
      return false
   }
   if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok {
      panic("wait for configMap cache sync timeout")
   }
   a.Lock()
   defer a.Unlock()
   for _, requests := range a.nameToRequests {
      for _, request := range requests {
         if request.Equal(comp, resource, verb) {
            return true
         }
      }
   }
   return false
}


总结


1、通过上述的扩展能力可以看出,YurtHub 不仅仅是边缘节点上的带有数据缓存能力的反向代理。而是对 Kubernetes 节点应用生命周期管理加了一层新的封装,提供边缘计算所需要的核心管控能力。


2、YurtHub 不仅仅适用于边缘计算场景,其实可以作为节点侧的一个常备组件,适用于使用 Kubernetes 的任意场景。相信这也会驱动 YurtHub 向更高性能,更高稳定性发展。


点击此处,立即了解 OpenYurt 项目!

相关文章
|
7月前
|
人工智能 API 开发者
HarmonyOS Next~鸿蒙应用框架开发实战:Ability Kit与Accessibility Kit深度解析
本书深入解析HarmonyOS应用框架开发,聚焦Ability Kit与Accessibility Kit两大核心组件。Ability Kit通过FA/PA双引擎架构实现跨设备协同,支持分布式能力开发;Accessibility Kit提供无障碍服务构建方案,优化用户体验。内容涵盖设计理念、实践案例、调试优化及未来演进方向,助力开发者打造高效、包容的分布式应用,体现HarmonyOS生态价值。
329 27
|
7月前
|
人工智能 自然语言处理 搜索推荐
ViDoRAG:开源多模态文档检索框架,多智能体推理+图文理解精准解析文档
ViDoRAG 是阿里巴巴通义实验室联合中国科学技术大学和上海交通大学推出的视觉文档检索增强生成框架,基于多智能体协作和动态迭代推理,显著提升复杂视觉文档的检索和生成效率。
345 8
ViDoRAG:开源多模态文档检索框架,多智能体推理+图文理解精准解析文档
|
7月前
|
数据采集 JSON 数据可视化
JSON数据解析实战:从嵌套结构到结构化表格
在信息爆炸的时代,从杂乱数据中提取精准知识图谱是数据侦探的挑战。本文以Google Scholar为例,解析嵌套JSON数据,提取文献信息并转换为结构化表格,通过Graphviz制作技术关系图谱,揭示文献间的隐秘联系。代码涵盖代理IP、请求头设置、JSON解析及可视化,提供完整实战案例。
436 4
JSON数据解析实战:从嵌套结构到结构化表格
|
7月前
|
JSON 监控 网络协议
Bilibili直播信息流:连接方法与数据解析
本文详细介绍了自行实现B站直播WebSocket连接的完整流程。解析了基于WebSocket的应用层协议结构,涵盖认证包构建、心跳机制维护及数据包解析步骤,为开发者定制直播数据监控提供了完整技术方案。
|
7月前
|
机器学习/深度学习 人工智能 Java
Java机器学习实战:基于DJL框架的手写数字识别全解析
在人工智能蓬勃发展的今天,Python凭借丰富的生态库(如TensorFlow、PyTorch)成为AI开发的首选语言。但Java作为企业级应用的基石,其在生产环境部署、性能优化和工程化方面的优势不容忽视。DJL(Deep Java Library)的出现完美填补了Java在深度学习领域的空白,它提供了一套统一的API,允许开发者无缝对接主流深度学习框架,将AI模型高效部署到Java生态中。本文将通过手写数字识别的完整流程,深入解析DJL框架的核心机制与应用实践。
373 3
|
9月前
|
存储 搜索推荐 大数据
数据大爆炸:解析大数据的起源及其对未来的启示
数据大爆炸:解析大数据的起源及其对未来的启示
422 15
数据大爆炸:解析大数据的起源及其对未来的启示
|
7月前
|
机器学习/深度学习 JSON 算法
淘宝拍立淘按图搜索API接口系列的应用与数据解析
淘宝拍立淘按图搜索API接口是阿里巴巴旗下淘宝平台提供的一项基于图像识别技术的创新服务。以下是对该接口系列的应用与数据解析的详细分析
|
9月前
|
JSON 前端开发 搜索推荐
关于商品详情 API 接口 JSON 格式返回数据解析的示例
本文介绍商品详情API接口返回的JSON数据解析。最外层为`product`对象,包含商品基本信息(如id、name、price)、分类信息(category)、图片(images)、属性(attributes)、用户评价(reviews)、库存(stock)和卖家信息(seller)。每个字段详细描述了商品的不同方面,帮助开发者准确提取和展示数据。具体结构和字段含义需结合实际业务需求和API文档理解。
|
8月前
|
Java API 数据处理
深潜数据海洋:Java文件读写全面解析与实战指南
通过本文的详细解析与实战示例,您可以系统地掌握Java中各种文件读写操作,从基本的读写到高效的NIO操作,再到文件复制、移动和删除。希望这些内容能够帮助您在实际项目中处理文件数据,提高开发效率和代码质量。
178 4
|
8月前
|
数据采集 监控 搜索推荐
深度解析淘宝商品详情API接口:解锁电商数据新维度,驱动业务增长
淘宝商品详情API接口,是淘宝开放平台为第三方开发者提供的一套用于获取淘宝、天猫等电商平台商品详细信息的应用程序接口。该接口涵盖了商品的基本信息(如标题、价格、图片)、属性参数、库存状况、销量评价、物流信息等,是电商企业实现商品管理、市场分析、营销策略制定等功能的得力助手。

推荐镜像

更多
  • DNS