'SingleFlight-抑制对下游多次重复请求,防止缓存击穿的利器'

简介: 'SingleFlight-抑制对下游多次重复请求,防止缓存击穿的利器'

微信截图_20230627113246.png

初入门径


Package singleflight provides a duplicate function call suppression mechanism.

singleflight包提供了一种抑制重复函数调用的机制


在处理多个goroutine同时调用同一函数时,SingleFlight可以只让一个goroutine去实际调用该函数,等到这个goroutine返回结果时,再将结果返回给其他几个同时调用该函数的goroutine.

这样可以减少并发调用的数量,减少对下游服务的并发重复请求,比较常见的使用场景是用来防止缓存击穿

达到归并回源的效果




使用场景


缓存击穿


如在双11时,维护有一个全局的活动是否结束的key,由运营配置,5分钟过期,重新从数据库里取.

当这个 Key 正好过期失效时, 大量请求会打到数据库上(即缓存击穿).

而用 SingleFlight 来解决缓存击穿问题再合适不过. 只需要只允许这些对同一个 Key 的并发请求中的一个能到数据库中查询,而后这些并发的请求可以共享该结果.

package main
import (
  "errors"
  "fmt"
  "golang.org/x/sync/singleflight"
  "log"
  "sync"
)
var errorNotExist = errors.New("not exist")
func main() {
  var wg sync.WaitGroup
  wg.Add(10)
  //模拟10个并发
  for i := 0; i < 10; i++ {
    go func() {
      defer wg.Done()
      data, err := getData("key")
      if err != nil {
        fmt.Print(err)
        return
      }
      fmt.Println(data)
      fmt.Println("---------")
    }()
  }
  wg.Wait()
}
var g singleflight.Group
//获取数据
func getData(key string) (string, error) {
  data, err := getDataFromCache(key)
  if err == errorNotExist {
    //模拟从db中获取数据
    data, err = getDataFromDB(key)
    if err != nil {
      log.Println(err)
      return "", err
    }
    //TOOD: set cache
  } else if err != nil {
    return "", err
  }
  return data, nil
}
//模拟从cache中获取值,cache中无该值
func getDataFromCache(key string) (string, error) {
  return "", errorNotExist
}
//模拟从数据库中获取值
func getDataFromDB(key string) (string, error) {
  fmt.Printf("get %s from database\n", key)
  return "数据库中的数据", nil
}

执行结果为:

get key from database
数据库中的数据
get key from database
get key from database
数据库中的数据
---------
get key from database
get key from database
数据库中的数据
---------
---------
数据库中的数据
get key from database
数据库中的数据
---------
get key from database
get key from database
数据库中的数据
---------
---------
数据库中的数据
---------
get key from database
数据库中的数据
---------
数据库中的数据
---------
get key from database
数据库中的数据
---------

可以看得到10个请求都走了db. 用singlefligth 包优化一下 getData

//获取数据
func getData(key string) (string, error) {
  data, err := getDataFromCache(key)
  if err == errorNotExist {
    //模拟从db中获取数据
    v, err, _ := g.Do(key, func() (interface{}, error) {
      return getDataFromDB(key)
      //set cache
    })
    if err != nil {
      log.Println(err)
      return "", err
    }
    //TOOD: set cache
    data = v.(string)
  } else if err != nil {
    return "", err
  }
  return data, nil
}

执行结果为:

get key from database
数据库中的数据
---------
数据库中的数据
---------
数据库中的数据
---------
数据库中的数据
---------
数据库中的数据
---------
数据库中的数据
---------
数据库中的数据
---------
数据库中的数据
---------
数据库中的数据
---------
数据库中的数据
---------

可以看得到只有一个请求走到了db,且其他请求也返回了正确的值. 从而可以大大降低DB的压力




源码实现


源码行数不多,加上注释一共212行.


点击查看 golang.org/x/sync/semaphore/semaphore.go源码:

go

复制代码

// Copyright 2013 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.// Package singleflight provides a duplicate function call suppression// mechanism.package singleflight // import "golang.org/x/sync/singleflight"import (
"bytes""errors""fmt""runtime""runtime/debug""sync")// errGoexit indicates the runtime.Goexit was called in// the user given function.var errGoexit = errors.New("runtime.Goexit was called")

// A panicError is an arbitrary value recovered from a panic// with the stack trace during the execution of given function.type panicError struct {
	value interface{}	stack []byte}// Error implements error interface.func(p *panicError) Error() string {
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)}funcnewPanicError(v interface{})error {
	stack := debug.Stack()// The first line of the stack trace is of the form "goroutine N [status]:"// but by the time the panic reaches Do the goroutine may no longer exist// and its status will have changed. Trim out the misleading line.if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {		stack = stack[line+1:]	}return &panicError{value: v, stack: stack}}// call is an in-flight or completed singleflight.Do calltype call struct {
	wg sync.WaitGroup// These fields are written once before the WaitGroup is done// and are only read after the WaitGroup is done.// 函数的返回值,在 wg 返回前只会写入一次	val interface{}	err error// forgotten indicates whether Forget was called with this call's key// while the call was still in flight.// 使用调用了 Forgot 方法	forgotten bool// These fields are read and written with the singleflight// mutex held before the WaitGroup is done, and are read but// not written after the WaitGroup is done.// 统计调用次数以及返回的 channel	dups  int	chans []chan<- Result}// Group represents a class of work and forms a namespace in// which units of work can be executed with duplicate suppression.type Group struct {
	mu sync.Mutex       // protects m	m  map[string]*call // lazily initialized}// Result holds the results of Do, so they can be passed// on a channel.type Result struct {
	Val    interface{}	Err    error	Shared bool}// Do executes and returns the results of the given function, making// sure that only one execution is in-flight for a given key at a// time. If a duplicate comes in, the duplicate caller waits for the// original to complete and receives the same results.// The return value shared indicates whether v was given to multiple callers.func(g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()// 前面提到的懒加载if g.m == nil {		g.m = make(map[string]*call)	}// 会先去看 key 是否已经存在if c, ok := g.m[key]; ok {// 如果存在就会解锁		c.dups++ // 存在相同的key, 增加计数		g.mu.Unlock()// 然后等待 WaitGroup 执行完毕,只要一执行完,所有的 wait 都会被唤醒		c.wg.Wait() //等待这个key对应的fn调用完成// 这里区分 panic 错误和 runtime 的错误,避免出现死锁,后面可以看到为什么这么做if e, ok := c.err.(*panicError); ok {panic(e)		} elseif c.err == errGoexit {			runtime.Goexit()		}return c.val, c.err, true// 返回fn调用的结果	}// 如果没有找到这个 key 就 new call	c := new(call)  // 不存在key, 是第一个请求, 创建一个call结构体// 然后调用 waitgroup 这里只有第一次调用会 add 1,其他的都会调用 wait 阻塞掉// 所以这要这次调用返回,所有阻塞的调用都会被唤醒	c.wg.Add(1)	g.m[key] = c //加入到映射表中	g.mu.Unlock()// 然后调用 doCall 去执行	g.doCall(c, key, fn) // 调用方法return c.val, c.err, c.dups > 0}// DoChan is like Do but returns a channel that will receive the// results when they are ready.//// The returned channel will not be closed.// Do chan 和 Do 类似,其实就是一个是同步等待,一个是异步返回,主要实现上:// 如果调用 DoChan 会给 call.chans 添加一个 channel 这样等第一次调用执行完毕之后就会循环向这些 channel 写入数据func(g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1)	g.mu.Lock()if g.m == nil {		g.m = make(map[string]*call)	}if c, ok := g.m[key]; ok {		c.dups++		c.chans = append(c.chans, ch)		g.mu.Unlock()return ch	}	c := &call{chans: []chan<- Result{ch}}	c.wg.Add(1)	g.m[key] = c	g.mu.Unlock()go g.doCall(c, key, fn)return ch}// doCall handles the single call for a key.// 这个方法的实现有意思,使用了两个 defer 巧妙的将 runtime 的错误// 和我们传入 function 的 panic 区别开来// 避免了由于传入的 function panic 导致的死锁func(g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false	recovered := false// use double-defer to distinguish panic from runtime.Goexit,// more details see https://golang.org/cl/134395// 第一个 defer 检查 runtime 错误deferfunc() {// the given function invoked runtime.Goexit// 如果既没有正常执行完毕,又没有 recover 那就说明需要直接退出了if !normalReturn && !recovered {			c.err = errGoexit		}		c.wg.Done()		g.mu.Lock()defer g.mu.Unlock()// 如果已经 forgot 过了,就不要重复删除这个 key 了if !c.forgotten {delete(g.m, key)		}if e, ok := c.err.(*panicError); ok {// In order to prevent the waiting channels from being blocked forever,// needs to ensure that this panic cannot be recovered.// 如果返回的是 panic 错误,为了避免 channel 死锁,我们需要确保这个 panic 无法被恢复iflen(c.chans) > 0 {gopanic(e)select {} // Keep this goroutine around so that it will appear in the crash dump.			} else {panic(e)			}		} elseif c.err == errGoexit {// Already in the process of goexit, no need to call again// 已经准备退出了,也就不用做其他操作了		} else {// Normal return// 正常情况下向 channel 写入数据for _, ch := range c.chans {				ch <- Result{c.val, c.err, c.dups > 0}			}		}	}()// 使用一个匿名函数来执行func() {deferfunc() {if !normalReturn {// Ideally, we would wait to take a stack trace until we've determined// whether this is a panic or a runtime.Goexit.//// Unfortunately, the only way we can distinguish the two is to see// whether the recover stopped the goroutine from terminating, and by// the time we know that, the part of the stack trace relevant to the// panic has been discarded.// 如果 panic 了我们就 recover 掉,然后 new 一个 panic 的错误// 后面在上层重新 panicif r := recover(); r != nil {					c.err = newPanicError(r)				}			}		}()// 如果 fn 没有 panic 就会执行到这一步,如果 panic 了就不会执行到这一步// 所以可以通过这个变量来判断是否 panic 了		c.val, c.err = fn()		normalReturn = true	}()if !normalReturn {		recovered = true	}}// Forget tells the singleflight to forget about a key.  Future calls// to Do for this key will call the function rather than waiting for// an earlier call to complete.// 用于手动释放某个 key 下次调用就不会阻塞等待了func(g *Group) Forget(key string) {
	g.mu.Lock()if c, ok := g.m[key]; ok {		c.forgotten = true	}delete(g.m, key)	g.mu.Unlock()}


Do方法:


接收一个字符串Key和一个待调用的函数,会返回调用函数的结果和错误. 使用Do方法时,会根据提供的Key判断是否去真正调用fn函数.同一个 key,在同一时间只有第一次调用Do方法时才会去执行fn函数,其他并发的请求会等待调用的执行结果.

Do方法的执行逻辑是每次调用Do方法都会先去获取互斥锁,随后判断在映射表里是否已经有Key对应的fn函数调用信息的call结构体。

当不存在时,证明是这个Key的第一次请求,那么会初始化一个call结构体指针,增加SingleFlight内部持有的sync.WaitGroup计数器到1。释放互斥锁,然后阻塞的等待doCall方法执行fn函数的返回结果 当存在时,增加call结构体内代表fn重复调用次数的计数器dups,释放互斥锁,然后使用WaitGroup等待fn函数执行完成。


call结构体的val 和 err 两个字段只会在 doCall方法中执行fn有返回结果后才赋值,所以当 doCall方法 和 WaitGroup.Wait返回时,函数调用的结果和错误会返回给Do方法的所有调用者。

doCall方法会去实际调用fn函数,因为call结构体初始化后forgotten字段的默认值是false,fn调用有返回后,会把对应的Key删掉。这样这轮请求都返回后,下一轮使用同一的Key的请求会重新调用执行一次fn函数。


DoChan方法:


类似Do方法,只不过是异步调用.它会返回一个通道,等fn函数执行完,产生了结果后,就能从这个 chan 中接收这个结果.

它的执行逻辑和Do方法类似,唯一不同的是调用者不用阻塞等待调用的返回, DoChan方法会创建一个chan Result通道返回给调用者,调用者通过这个通道就能接受到fn函数的结果。这个chan Result通道,在返回给调用者前会先放到call结构体的维护的通知队列里,待fn函数返回结果后DoChan方法会把结果发送给通知队列中的每个通道。


Forget方法:


在SingleFlight中删除一个Key. 这样一来,之后这个Key的Do方法调用会执行fn函数,而不是等待前一个未完成的fn 函数的结果.




注意事项


  • 一个阻塞,全员等待
  • 一个出错,全部出错


即 "一荣俱荣,一损俱损"


Go并发编程(十二) Singleflight

Go Singleflight导致死锁问题分析




官方库或知名项目中的使用


项目中有大量使用,场景基本都是用于防止缓存击穿.


另外,

net标准库里使用的lookupGroup结构,将对相同域名的DNS记录查询合并成一个查询.

net库提供的DNS记录查询方法LookupIp, 使用lookupGroup这个SingleFlight进行合并查询的相关操作(使用的是异步查询的方法DoChan)

微信截图_20230627113432.png

net库的 h2_hundle.go ,以及[golang.org/x/net/http2/client_conn_pool.go],都试图用SingleFlight来优化现有代码

微信截图_20230627113443.png

Docker之前的某个版本,/docker/builder/fscache/fscache.go中有使用到SingleFlight

微信截图_20230627113530.png

目录
相关文章
|
3月前
|
存储 缓存 监控
缓存击穿、缓存穿透、缓存雪崩 3大问题,如何彻底解决?
【10月更文挑战第8天】在分布式系统中,缓存的使用极大地提高了系统的性能和响应速度。然而,缓存击穿、缓存穿透和缓存雪崩是三个常见的缓存相关问题,它们可能导致系统性能下降,甚至引发系统崩溃。本文将深入探讨这三个问题的成因、影响以及彻底的解决方案。
119 1
|
3月前
|
缓存 NoSQL 关系型数据库
redis和缓存及相关问题和解决办法 什么是缓存预热、缓存穿透、缓存雪崩、缓存击穿
本文深入探讨了Redis缓存的相关知识,包括缓存的概念、使用场景、可能出现的问题(缓存预热、缓存穿透、缓存雪崩、缓存击穿)及其解决方案。
241 0
redis和缓存及相关问题和解决办法 什么是缓存预热、缓存穿透、缓存雪崩、缓存击穿
|
5月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
2月前
|
缓存 NoSQL 数据库
缓存穿透、缓存击穿和缓存雪崩及其解决方案
在现代应用中,缓存是提升性能的关键技术之一。然而,缓存系统也可能遇到一系列问题,如缓存穿透、缓存击穿和缓存雪崩。这些问题可能导致数据库压力过大,甚至系统崩溃。本文将探讨这些问题及其解决方案。
|
4月前
|
缓存 JavaScript 中间件
优化Express.js应用程序性能:缓存策略、请求压缩和路由匹配
在开发Express.js应用时,采用合理的缓存策略、请求压缩及优化路由匹配可大幅提升性能。本文介绍如何利用`express.static`实现缓存、`compression`中间件压缩响应数据,并通过精确匹配、模块化路由及参数化路由提高路由处理效率,从而打造高效应用。
204 15
|
3月前
|
缓存 JavaScript CDN
一次js请求一般情况下有哪些地方会有缓存处理?
一次js请求一般情况下有哪些地方会有缓存处理?
46 4
|
3月前
|
消息中间件 缓存 NoSQL
大数据-49 Redis 缓存问题中 穿透、雪崩、击穿、数据不一致、HotKey、BigKey
大数据-49 Redis 缓存问题中 穿透、雪崩、击穿、数据不一致、HotKey、BigKey
69 2
|
4月前
|
存储 缓存 NoSQL
解决Redis缓存击穿问题的技术方法
解决Redis缓存击穿问题的技术方法
82 2
|
5月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解、如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
这篇文章介绍了如何在SpringBoot项目中整合Redis,并探讨了缓存穿透、缓存雪崩和缓存击穿的问题以及解决方法。文章还提供了解决缓存击穿问题的加锁示例代码,包括存在问题和问题解决后的版本,并指出了本地锁在分布式情况下的局限性,引出了分布式锁的概念。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解、如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
|
4月前
|
缓存 NoSQL 前端开发
16)缓存雪崩、缓存击穿、缓存穿透
16)缓存雪崩、缓存击穿、缓存穿透
45 0