victoriaMetrics库之布隆过滤器

简介: victoriaMetrics库之布隆过滤器

victoriaMetrics库之布隆过滤器

代码路径:/lib/bloomfilter

概述

victoriaMetrics的vmstorage组件会接收上游传递过来的指标,在现实场景中,指标或瞬时指标的数量级可能会非常恐怖,如果不限制缓存的大小,有可能会由于cache miss而导致出现过高的slow insert


为此,vmstorage提供了两个参数:maxHourlySeriesmaxDailySeries,用于限制每小时/每天添加到缓存的唯一序列。


唯一序列指表示唯一的时间序列,如metrics{label1="value1",label2="value2"}属于一个时间序列,但多条不同值的metrics{label1="value1",label2="value2"}属于同一条时间序列。victoriaMetrics使用如下方式来获取时序的唯一标识:

func getLabelsHash(labels []prompbmarshal.Label) uint64 {
  bb := labelsHashBufPool.Get()
  b := bb.B[:0]
  for _, label := range labels {
    b = append(b, label.Name...)
    b = append(b, label.Value...)
  }
  h := xxhash.Sum64(b)
  bb.B = b
  labelsHashBufPool.Put(bb)
  return h
}

限速器的初始化

victoriaMetrics使用了一个类似限速器的概念,限制每小时/每天新增的唯一序列,但与普通的限速器不同的是,它需要在序列级别进行限制,即判断某个序列是否是新的唯一序列,如果是,则需要进一步判断一段时间内缓存中新的时序数目是否超过限制,而不是简单地在请求层面进行限制。

hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour)
dailySeriesLimiter = bloomfilter.NewLimiter(*maxDailySeries, 24*time.Hour)

下面是新建限速器的函数,传入一个最大(序列)值,以及一个刷新时间。该函数中会:

  1. 初始化一个限速器,限速器的最大元素个数为maxItems
  2. 则启用了一个goroutine,当时间达到refreshInterval时会重置限速器
func NewLimiter(maxItems int, refreshInterval time.Duration) *Limiter {
  l := &Limiter{
    maxItems: maxItems,
    stopCh:   make(chan struct{}),
  }
  l.v.Store(newLimiter(maxItems)) //1
  l.wg.Add(1)
  go func() {
    defer l.wg.Done()
    t := time.NewTicker(refreshInterval)
    defer t.Stop()
    for {
      select {
      case <-t.C:
        l.v.Store(newLimiter(maxItems))//2
      case <-l.stopCh:
        return
      }
    }
  }()
  return l
}

限速器只有一个核心函数Add,当vmstorage接收到一个指标之后,会(通过getLabelsHash计算该指标的唯一标识(h),然后调用下面的Add函数来判断该唯一标识是否存在于缓存中。


如果当前存储的元素个数大于等于允许的最大元素,则通过过滤器判断缓存中是否已经存在该元素;否则将该元素直接加入过滤器中,后续允许将该元素加入到缓存中。

func (l *Limiter) Add(h uint64) bool {
  lm := l.v.Load().(*limiter)
  return lm.Add(h)
}
func (l *limiter) Add(h uint64) bool {
  currentItems := atomic.LoadUint64(&l.currentItems)
  if currentItems >= uint64(l.f.maxItems) {
    return l.f.Has(h)
  }
  if l.f.Add(h) {
    atomic.AddUint64(&l.currentItems, 1)
  }
  return true
}

上面的过滤器采用的是布隆过滤器,核心函数为HasAdd,分别用于判断某个元素是否存在于过滤器中,以及将元素添加到布隆过滤器中。


过滤器的初始化函数如下,bitsPerItem是个常量,值为16。bitsCount统计了过滤器中的总bit数,每个bit表示某个值的存在性。bits以64bit为单位的(后续称之为slot,目的是为了在bitsCount中快速检索目标bit)。计算bits时加上63的原因是为了四舍五入向上取值,比如当maxItems=1时至少需要1个unit64的slot。

func newFilter(maxItems int) *filter {
  bitsCount := maxItems * bitsPerItem
  bits := make([]uint64, (bitsCount+63)/64)
  return &filter{
    maxItems: maxItems,
    bits:     bits,
  }
}

为什么bitsPerItem为16?这篇文章给出了如何计算布隆过滤器的大小。在本代码中,k为4(hashesCount),期望的漏失率为0.003(可以从官方的filter_test.go中看出),则要求总存储和总元素的比例为15,为了方便检索slot(64bit,为16的倍数),将之设置为16。

if p > 0.003 {
    t.Fatalf("too big false hits share for maxItems=%d: %.5f, falseHits: %d", maxItems, p, falseHits)
  }

下面是过滤器的Add操作,目的是在过滤器中添加某个元素。Add函数中没有使用多个哈希函数来计算元素的哈希值,转而改变同一个元素的值,然后对相应的值应用相同的哈希函数,元素改变的次数受hashesCount的限制。

  1. 获取过滤器的完整存储,并转换为以bit单位
  2. 将元素h转换为byte数组,便于xxhash.Sum64计算
  3. 后续将执行hashesCount次哈希,降低漏失率
  4. 计算元素h的哈希
  5. 递增元素h,为下一次哈希做准备
  6. 取余法获取元素的bit范围
  7. 获取元素所在的slot(即uint64大小的bit范围)
  8. 获取元素所在的slot中的bit位,该位为1表示该元素存在,为0表示该元素不存在
  9. 获取元素所在bit位的掩码
  10. 加载元素所在的slot的数值
  11. 如果w & mask结果为0,说明该元素不存在,
  12. 将元素所在的slot(w)中的元素所在的bit位(mask)置为1,表示添加了该元素
  13. 由于Add函数可以并发访问,因此bits[i]有可能被其他操作修改,因此需要通过重新加载(14)并通过循环来在bits[i]中设置该元素的存在性
func (f *filter) Add(h uint64) bool {
  bits := f.bits
  maxBits := uint64(len(bits)) * 64 //1
  bp := (*[8]byte)(unsafe.Pointer(&h))//2
  b := bp[:]
  isNew := false
  for i := 0; i < hashesCount; i++ {//3
    hi := xxhash.Sum64(b)//4
    h++ //5
    idx := hi % maxBits //6
    i := idx / 64 //7
    j := idx % 64 //8
    mask := uint64(1) << j //9
    w := atomic.LoadUint64(&bits[i])//10
    for (w & mask) == 0 {//11
      wNew := w | mask //12
      if atomic.CompareAndSwapUint64(&bits[i], w, wNew) {//13
        isNew = true//14
        break
      }
      w = atomic.LoadUint64(&bits[i])//14
    }
  }
  return isNew
}

看懂了Add函数,Has就相当简单了,它只是Add函数的缩减版,无需设置bits[i]

func (f *filter) Has(h uint64) bool {
  bits := f.bits
  maxBits := uint64(len(bits)) * 64
  bp := (*[8]byte)(unsafe.Pointer(&h))
  b := bp[:]
  for i := 0; i < hashesCount; i++ {
    hi := xxhash.Sum64(b)
    h++
    idx := hi % maxBits
    i := idx / 64
    j := idx % 64
    mask := uint64(1) << j
    w := atomic.LoadUint64(&bits[i])
    if (w & mask) == 0 {
      return false
    }
  }
  return true
}

总结

由于victoriaMetrics的过滤器采用的是布隆过滤器,因此它的限速并不精准,在源码条件下, 大约有3%的偏差。但同样地,由于采用了布隆过滤器,降低了所需的内存以及相关计算资源。此外victoriaMetrics的过滤器实现了并发访问。

过滤器为了支持并发访问,使用atomic来实现数值存储和加载以及数值变更等操作。


在大流量场景中,如果需要对请求进行相对精准的过滤,可以考虑使用布隆过滤器,降低所需要的资源,但前提是过滤的结果能够忍受一定程度的漏失率。

目录
相关文章
|
5月前
|
存储 NoSQL Redis
详解布隆过滤器的原理、使用场景和注意事项
详解布隆过滤器的原理、使用场景和注意事项
229 0
|
3月前
|
存储 缓存 NoSQL
详解布隆过滤器原理与实现
详解布隆过滤器原理与实现
|
5月前
|
XML 监控 大数据
基于Guava布隆过滤器优化海量字符串去重策略
**Guava Bloom Filter实践:** 在大数据场景下,利用布隆过滤器进行高效字符串去重。Guava提供易用的BloomFilter实现,通过添加Guava依赖,设定预期元素数和误报率来创建过滤器。尽管可能产生误报,但不会漏报,常用于初期快速判断。添加元素,使用`mightContain`查询,若可能存在,再用精确数据结构确认。优化涉及选择哈希函数、调整误报率和避免重复添加。
|
5月前
|
存储 算法 安全
基于Guava布隆过滤器的海量字符串高效去重实践
基于Guava布隆过滤器的海量字符串高效去重实践
|
6月前
|
存储 算法 C++
【C++高阶(六)】哈希的应用--位图&布隆过滤器
【C++高阶(六)】哈希的应用--位图&布隆过滤器
|
6月前
|
算法 NoSQL Redis
一文搞懂布隆过滤器(BloomFilter)
一文搞懂布隆过滤器(BloomFilter)
338 0
|
6月前
|
存储 数据采集 缓存
解密布隆过滤器:数据领域的魔法阵
解密布隆过滤器:数据领域的魔法阵
101 0
|
6月前
|
存储 NoSQL Java
什么是布隆过滤器?如何实现布隆过滤器?
什么是布隆过滤器?如何实现布隆过滤器?
130 0
|
存储 数据采集 缓存
布隆过滤器:原理与应用
在日常生活和工作中,我们经常需要处理海量的数据,筛选出有用的信息。这个时候,布隆过滤器(Bloom Filter)就派上了用场。
187 1
布隆过滤器:原理与应用
|
数据采集 缓存 NoSQL
干货 | 使用布隆过滤器实现高效缓存
本文主要描述,使用布隆过滤实现高效缓存。文中采用数组做为缓存,如果需要高并发命中,则需将文中的数组换成Redis数据库。
干货 | 使用布隆过滤器实现高效缓存