Sentinel Go-毫秒级统计数据结构揭秘

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
注册配置 MSE Nacos/ZooKeeper,182元/月
简介: 那么如何存储并统计这一段时间内的指标数据则是核心关键,本文将揭秘 Sentienl-Go 是如何实现的毫秒级指标数据存储与统计。

作者:binbin0325


背景介绍


1.png


随着微服务的流行,服务和服务之间的稳定性变得越来越重要。在 2020 年,Sentinel 社区推出 Sentinel Go 版本,朝着云原生方向演进。Sentinel Go 是一个流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。


无论是流量控制还是熔断降级,实现的核心思想都是通过统计一段时间内的指标数据(请求数/错误数等),然后根据预选设定的阈值判断是否应该进行流量管控


那么如何存储并统计这一段时间内的指标数据则是核心关键,本文将揭秘 Sentienl-Go 是如何实现的毫秒级指标数据存储与统计


固定窗口


在正式介绍之前,先简单介绍一下固定窗口的算法(也叫计数器算法)是实现流量控制比较简单的一种方式。其他常见的还有很多例如滑动时间窗口算法,漏桶算法,令牌桶算法等等。


固定窗口算法一般是通过原子操作将请求在统计周期内进行累加,然后当请求数大于阈值时进行限流。


实现代码:


var (
    counter    int64 //计数
    intervalMs int64 = 1000 //窗口长度(1S)
    threshold  int64 = 2 //限流阈值
    startTime        = time.Now().UnixMilli() //窗口开始时间
)
func main() {
    for i := 0; i < 10; i++ {
       if tryAcquire() {
          fmt.Println("成功请求", time.Now().Unix())
      }
   }
}
func tryAcquire() bool {
    if time.Now().UnixMilli()-atomic.LoadInt64(&startTime) > intervalMs {
       atomic.StoreInt64(&startTime, time.Now().UnixMilli())
       atomic.StoreInt64(&counter, 0)
   }
    return atomic.AddInt64(&counter, 1) <= threshold
}


固定窗口的限流在实现上看起来比较简单容易,但是也有一些问题,最典型的就是“边界”问题。


如下图:统计周期为 1S,限流阈值是 2 的情况下,假设 4 次请求恰好“跨越”了固定的时间窗口,如红色的 1SS 时间窗口所示会有四次请求,明显不符合限流的预期。


2.png


滑动时间窗口


在滑动时间窗口算法中可以解决固定窗口算法的边界问题,在滑动窗口算法中通常有两个比较重要的概念


  • 统计周期:例如想限制 5S 的请求数不能超过 100 次,那么 5S 就是统计周期
  • 窗口(格子)的大小:一个周期内会有多个窗口(格子)进行指标(例如请求数)的统计,长度相等的统计周期,格子的数量越多,统计的越精确 


如下所示:统计周期为 1S,每个周期内分为两个格子,每个格子的长度是 500ms。


3.png


在滑动窗口中统计周期以及窗口的大小,需要根据业务情况进行设定。


统计周期一致,窗口大小不一致:窗口越大统计精准度越低,但并发性能好,越小:统计精准度越高,并发性能随之降低;


统计周期不一致,窗口大小一致:周期越长抗流量脉冲情况越好。


统计结构


下面将详细介绍 Sentinel-Go 是如何使用滑动时间窗口高效的存储和统计指标数据的。


窗口结构


在滑动时间窗口中时间很重要。每个窗口(BocketWrap)的组成是由一个开始时间和一个抽象的统计结构:


type BucketWrap struct {
   // BucketStart represents start timestamp of this statistic bucket wrapper.
   BucketStart uint64
   // Value represents the actual data structure of the metrics (e.g. MetricBucket).
   Value atomic.Value
}


开始时间:当前格子的的起始时间


统计结构:存储指标数据,原子操作并发安全


如下图:统计周期 1S,每个窗口的长度是 200ms。


4.png


指标数据:


  1. pass: 表示到来的数量,即此刻通过 Sentinel-Go 规则的流量数量
  2. block: 表示被拦截的流量数量
  3. complete: 表示完成的流量数量,包含正常结束和异常结束的情况
  4. error: 表示错误的流量数量(熔断场景使用)
  5. rt:单次请求的 request time
  6. total:暂时无用 


原子时间轮


如上:整个统计周期内有多个时间窗口,在 Sentinel-Go 中统计周期是由 slice 实现的,每个元素对应一个窗口。


在上面介绍了为了解决边界问题,滑动时间窗口统计的过程需要向右滑动。随时时间的推移,无限的向右滑动,势必会让 slice 持续的扩张,导致 slice 的容量“无限”增长。


5.png


为了解决这个问题,在 Sentinel-Go 中实现了一个时间轮的概念,通过固定 slice 长度将过期的时间窗口重置,节省空间。


6.png


如下:原子时间轮数据结构


type AtomicBucketWrapArray struct {
   // The base address for real data array
   base unsafe.Pointer // 窗口数组首元素地址
   // The length of slice(array), it can not be modified.
   length int // 窗口数组的长度
   data   []*BucketWrap //窗口数组
}


  • 初始化


1: 根据当前时间计算出当前时间对应的窗口的 startime,并得到当前窗口对应的位置


// 计算开始时间
func calculateStartTime(now uint64, bucketLengthInMs uint32) uint64 {
   return now - (now % uint64(bucketLengthInMs))
}
// 窗口下标位置
idx := int((now / uint64(bucketLengthInMs)) % uint64(len))


7.png


2:初始化窗口数据结构(BucketWrap)


for i := idx; i <= len-1; i++ {
   ww := &BucketWrap{
      BucketStart: startTime,
      Value:       atomic.Value{},
   }
   ww.Value.Store(generator.NewEmptyBucket())
   ret.data[i] = ww
   startTime += uint64(bucketLengthInMs)
}
for i := 0; i < idx; i++ {
   ww := &BucketWrap{
      BucketStart: startTime,
      Value:       atomic.Value{},
   }
   ww.Value.Store(generator.NewEmptyBucket())
   ret.data[i] = ww
   startTime += uint64(bucketLengthInMs)
}


8.png


3:将窗口数组首元素地址设置到原子时间轮


// calculate base address for real data array
sliHeader := (*util.SliceHeader)(unsafe.Pointer(&ret.data))
ret.base = unsafe.Pointer((**BucketWrap)(unsafe.Pointer(sliHeader.Data)))


如果对unsafe.Pointer和slice熟悉的同学,对于这段代码不难理解。这里通过unsafe.Pointer 将底层 slice 首元素(第一个窗口)地址设置到原子时间轮中。这么做的原因主要是实现对时间轮中的元素(窗口)进行原子无锁的读取和更新


  • 窗口获取&窗口替换


如何在并发安全的情况下读取窗口和对窗口进行替换(时间轮涉及到对窗口更新操作)代码如下:


// 获取对应窗口
func (aa *AtomicBucketWrapArray) get(idx int) *BucketWrap {
   // aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
   // then convert to (*unsafe.Pointer)
   if offset, ok := aa.elementOffset(idx); ok {
      return (*BucketWrap)(atomic.LoadPointer((*unsafe.Pointer)(offset)))
   }
   return nil
}
// 替换对应窗口
func (aa *AtomicBucketWrapArray) compareAndSet(idx int, except, update *BucketWrap) bool {
   // aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
   // then convert to (*unsafe.Pointer)
   // update secondary pointer
   if offset, ok := aa.elementOffset(idx); ok {
      return atomic.CompareAndSwapPointer((*unsafe.Pointer)(offset), unsafe.Pointer(except), unsafe.Pointer(update))
   }
   return false
}
// 获取对应窗口的地址
func (aa *AtomicBucketWrapArray) elementOffset(idx int) (unsafe.Pointer, bool) {
   if idx >= aa.length || idx < 0 {
      logging.Error(errors.New("array index out of bounds"),
         "array index out of bounds in AtomicBucketWrapArray.elementOffset()",
         "idx", idx, "arrayLength", aa.length)
      return nil, false
   }
   basePtr := aa.base
   return unsafe.Pointer(uintptr(basePtr) + uintptr(idx)*unsafe.Sizeof(basePtr)), true
}


获取窗口


  1. 在 get func 中接收根据当前时间计算出的窗口对应下标位置
  2. 根据下标位置在 elementOffset func 中,首先将底层的 slice 首元素地址转换成 uintptr,然后将窗口对应下标*对应的指针字节大小即可以得到对应窗口元素的地址
  3. 将对应窗口地址转换成时间窗口(*BucketWarp)即可


9.png


窗口更新


和获取窗口一样,获取到对应下标位置的窗口地址,然后利用 atomic.CompareAndSwapPointer 进行 cas 更新,将新的窗口指针地址更新到底层数组中。


滑动窗口


在原子时间轮中提供了对窗口读取以及更新的操作。那么在什么时机触发更新以及如何滑动?


  • 滑动


所谓滑动就是根据当前时间找到整个统计周期的所有窗口中的数据。例如在限流场景下,我们需要获取统计周期内的所有 pass 的流量,从而来判断当前流量是否应该被限流。


核心代码:


// 根据当前时间获取周期内的所有窗口
func (m *SlidingWindowMetric) getSatisfiedBuckets(now uint64) []*BucketWrap {
   start, end := m.getBucketStartRange(now)
   satisfiedBuckets := m.real.ValuesConditional(now, func(ws uint64) bool {
      return ws >= start && ws <= end
   })
   return satisfiedBuckets
}
// 根据当前时间获取整个周期对应的窗口的开始时间和结束时间
func (m *SlidingWindowMetric) getBucketStartRange(timeMs uint64) (start, end uint64) {
   curBucketStartTime := calculateStartTime(timeMs, m.real.BucketLengthInMs())
   end = curBucketStartTime
   start = end - uint64(m.intervalInMs) + uint64(m.real.BucketLengthInMs())
   return
}
// 匹配符合条件的窗口
func (la *LeapArray) ValuesConditional(now uint64, predicate base.TimePredicate) []*BucketWrap {
   if now <= 0 {
      return make([]*BucketWrap, 0)
   }
   ret := make([]*BucketWrap, 0, la.array.length)
   for i := 0; i < la.array.length; i++ {
      ww := la.array.get(i)
      if ww == nil || la.isBucketDeprecated(now, ww) || !predicate(atomic.LoadUint64(&ww.BucketStart)) {
         continue
      }
      ret = append(ret, ww)
   }
   return ret
}


如下图所示:统计周期=1000ms(跨两个格子),now=1300 时 计算出 start=500,end=1000


10.png


那么在计算周期内的 pass 数量时,会根据如下条件遍历格子,也就会找到开始时间是 500 和 1000 的两个格子,那么统计的时候 1000 的这个格子中的数据自然也会被统计到。(当前时间 1300,在 1000 的这个格子中)


satisfiedBuckets := m.real.ValuesConditional(now, func(ws uint64) bool { 
     return ws >= start && ws <= end 
   })


  • 更新


每次流量经过时都会进行相应的指标存储,在存储时会先获取对应的窗口,然后会根据窗口的开始时间进行对比,如果过期则进行窗口重置。


如下图:根据窗口开始时间匹配发现 0 号窗口已过期。


11.png


如下图:重置窗口的开始时间和统计指标。


12.png


核心代码:


func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
   // 计算当前时间对应的窗口下标
   idx := la.calculateTimeIdx(now)
   // 计算当前时间对应的窗口的开始时间
   bucketStart := calculateStartTime(now, la.bucketLengthInMs)
   for {
     // 获取旧窗口
      old := la.array.get(idx)
      // 如果旧窗口==nil则初始化(正常不会执行这部分代码)
      if old == nil {
         newWrap := &BucketWrap{
            BucketStart: bucketStart,
            Value:       atomic.Value{},
         }
         newWrap.Value.Store(bg.NewEmptyBucket())
         if la.array.compareAndSet(idx, nil, newWrap) {
            return newWrap, nil
         } else {
            runtime.Gosched()
         }
      // 如果本次计算的开始时间等于旧窗口的开始时间,则认为窗口没有过期,直接返回
      } else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
         return old, nil
      //  如果本次计算的开始时间大于旧窗口的开始时间,则认为窗口过期尝试重置
      } else if bucketStart > atomic.LoadUint64(&old.BucketStart) {
         if la.updateLock.TryLock() {
            old = bg.ResetBucketTo(old, bucketStart)
            la.updateLock.Unlock()
            return old, nil
         } else {
            runtime.Gosched()
         }
        ......
      } 
}


总结


通过上面的介绍可以了解到在 Sentienl-Go 中实现底层指标的统计代码量并不多,本质是通过“时间轮”进行指标的数据统计和存储,在时间轮中借鉴 slice 的底层实现利用 unsafe.Pointer 和 atomic 配合对时间轮进行无锁的原子操作,极大的提升了性能。


Sentinel-GO 整体的数据结构图:


13.png


作者介绍:

张斌斌(Github账号:binbin0325,公众号:柠檬汁Code),Sentinel-Golang Committer 、ChaosBlade Committer 、 Nacos PMC 、Apache Dubbo-Go Committer。目前主要关注于混沌工程、中间件以及云原生方向。


文章参考:

《golang unsafe.Pointer 使用原则以及 uintptr 隐藏的坑》

https://louyuting.blog.csdn.net/article/details/103826830

相关文章
|
4月前
|
人工智能 自然语言处理 算法
Go语言统计字符串中每个字符出现的次数 — 简易频率分析器
本案例实现一个字符统计程序,支持中文、英文及数字,可统计用户输入文本中各字符的出现次数,并以整洁格式输出。内容涵盖应用场景、知识点讲解、代码实现与拓展练习,适合学习文本分析及Go语言基础编程。
|
1月前
|
存储 监控 算法
基于 Go 语言跳表结构的局域网控制桌面软件进程管理算法研究
针对企业局域网控制桌面软件对海量进程实时监控的需求,本文提出基于跳表的高效管理方案。通过多级索引实现O(log n)的查询、插入与删除性能,结合Go语言实现并发安全的跳表结构,显著提升进程状态处理效率,适用于千级进程的毫秒级响应场景。
144 15
|
2月前
|
存储 监控 算法
企业电脑监控系统中基于 Go 语言的跳表结构设备数据索引算法研究
本文介绍基于Go语言的跳表算法在企业电脑监控系统中的应用,通过多层索引结构将数据查询、插入、删除操作优化至O(log n),显著提升海量设备数据管理效率,解决传统链表查询延迟问题,实现高效设备状态定位与异常筛选。
120 3
|
2月前
|
存储 Java 编译器
对比Java学习Go——程序结构与变量
本节对比了Java与Go语言的基础结构,包括“Hello, World!”程序、代码组织方式、入口函数定义、基本数据类型及变量声明方式。Java强调严格的面向对象结构,所有代码需置于类中,入口方法需严格符合`public static void main(String[] args)`格式;而Go语言结构更简洁,使用包和函数组织代码,入口函数为`func main()`。两种语言在变量声明、常量定义、类型系统等方面也存在显著差异,体现了各自的设计哲学。
|
4月前
|
存储 安全 算法
Go语言泛型-泛型对代码结构的优化
Go语言自1.18版本引入泛型,极大提升了代码的通用性与可维护性。通过泛型,开发者可以减少重复代码、提高类型安全性,并增强程序的复用性和可读性。本文详细介绍了泛型在数据结构、算法及映射功能中的应用,展示了其在优化代码结构方面的优势。同时,Go编译器对泛型代码进行类型推导,确保运行时性能不受影响。合理使用泛型,有助于构建更加灵活高效的程序。
|
4月前
|
消息中间件 存储 算法
Go语言实战案例-自定义队列结构
本案例讲解如何使用 Go 语言通过结构体和切片实现自定义队列结构,涵盖入队、出队、查看队头元素及判空等基本操作,并提供完整示例代码与运行结果,适合初学者学习队列数据结构的实现与应用。
|
4月前
|
存储 算法 安全
Go语言实战案例-自定义栈结构
本案例详解如何用Go语言自定义栈结构,涵盖栈的压栈、弹栈、查看栈顶等基本操作,适合初学者掌握数据结构与算法基础。
|
存储 Go 容器
深入探究Go语言中的数据结构
深入探究Go语言中的数据结构
296 3
|
7月前
|
算法 Go
【LeetCode 热题100】深入理解二叉树结构变化与路径特性(力扣104 / 226 / 114 / 543)(Go语言版)
本博客深入探讨二叉树的深度计算、结构变换与路径分析,涵盖四道经典题目:104(最大深度)、226(翻转二叉树)、114(展开为链表)和543(二叉树直径)。通过递归与遍历策略(前序、后序等),解析每题的核心思路与实现方法。结合代码示例(Go语言),帮助读者掌握二叉树相关算法的精髓。下一讲将聚焦二叉树构造问题,欢迎持续关注!
192 10
|
9月前
|
Go C语言
Go语言入门:分支结构
本文介绍了Go语言中的条件语句,包括`if...else`、`if...else if`和`switch`结构,并通过多个练习详细解释了它们的用法。`if...else`用于简单的条件判断;`if...else if`处理多条件分支;`switch`则适用于基于不同值的选择逻辑。特别地,文章还介绍了`fallthrough`关键字,用于优化重复代码。通过实例如判断年龄、奇偶数、公交乘车及成绩等级等,帮助读者更好地理解和应用这些结构。
169 15

热门文章

最新文章