singleflight解决缓存击穿与源码分析

简介: singleflight解决缓存击穿与源码分析

缓存击穿


在高并发的场景下,出现同一时刻有大量的请求同时查询一个key时,恰好在这个时候这个缓存key过期失效了,那么这些大量的请求转发到数据库DB上,高并发的对这一个key进行请求,就像水滴石穿一样,瞬间请求量给到DB,可能会把DB打死。


解决办法:

  1. 让热key永远不过期,定期去刷新数据就可以了。
  2. 为了避免出现缓存击穿的情况,可以在第一个请求去查询数据库的时候对他加一个互斥锁,其余的查询请求都会被阻塞住,直到锁被释放,后面的线程进来发现已经有缓存了,就直接走缓存,从而保护数据库。但是也是由于它会阻塞其他的线程,颗粒很大,此时系统吞吐量会下降。需要结合实际的业务去考虑是否要这么做。
  3. singleflight的设计思路,也会使用互斥锁,但是相对于方法二的加锁粒度会更细。将一组相同的请求合并成一个请求,使用map存储,只会有一个请求到达mysql,使用sync.waitgroup进行同步,对所有的请求返回相同的结果。

模拟演示

被击穿

package main
import (
  "errors"
  "log"
  "sync"
)
var errorNotExist = errors.New("not exist")
func main() {
  var wg sync.WaitGroup
  //模拟10个并发
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      data, err := getData("key")
      if err != nil {
        log.Print(err)
        return
      }
      log.Println(data)
    }()
  }
  wg.Wait()
}
//获取数据
func getData(key string) (string, error) {
  data, err := getDataFromCache(key)
  if err == errorNotExist {
    //模拟从db中获取数据
    data, err = getDataFromDB(key)
    if err != nil {
      log.Println(err)
      return "", err
    }
    //TODO set cache
  } else if err != nil {
    return "", err
  }
  return data, nil
}
//模拟从cache中获取值,cache中无该值
func getDataFromCache(key string) (string, error) {
  return "", errorNotExist
}
//模拟从数据库中获取值
func getDataFromDB(key string) (string, error) {
  log.Printf("get %s from database", key)
  return "data", nil
}

其中通过 getData(key) 获取数据, 逻辑是

  1. 先尝试从cache中获取
  2. 如果cache中不存在就从db中获取


模拟了10个并发请求,来同时调用 getData 函数,执行结果如下

2022/01/19 21:03:04 get key from database
2022/01/19 21:03:04 data
2022/01/19 21:03:04 get key from database
2022/01/19 21:03:04 data
2022/01/19 21:03:04 get key from database
2022/01/19 21:03:04 data
2022/01/19 21:03:04 get key from database
2022/01/19 21:03:04 data
2022/01/19 21:03:04 get key from database
2022/01/19 21:03:04 data
2022/01/19 21:03:04 get key from database
2022/01/19 21:03:04 data
2022/01/19 21:03:04 get key from database
2022/01/19 21:03:04 data
2022/01/19 21:03:04 get key from database
2022/01/19 21:03:04 data
2022/01/19 21:03:04 get key from database
2022/01/19 21:03:04 data
2022/01/19 21:03:04 get key from database
2022/01/19 21:03:04 data
进程 已完成,退出代码为 0

可以看得到10个请求都是走的db,因为cache中不存在该值

singleflight解决

对getData稍微改动一下

package main
import (
  "errors"
  "golang.org/x/sync/singleflight"
  "log"
  "sync"
)
var errorNotExist = errors.New("not exist")
var g singleflight.Group
func main() {
  var wg sync.WaitGroup
  //模拟10个并发
  for i := 0; i < 100; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      data, err := getData("key")
      if err != nil {
        log.Print(err)
        return
      }
      log.Println(data)
    }()
  }
  wg.Wait()
}
//获取数据
func getData(key string) (string, error) {
  data, err := getDataFromCache(key)
  if err == errorNotExist {
    //模拟从db中获取数据
    value, err, _ := g.Do(key, func() (interface{}, error) {
      return getDataFromDB(key)
      //set cache
    })
    if err != nil {
      log.Println(err)
      return "", err
    }
    //TODO: set cache
    data = value.(string)
  } else if err != nil {
    return "", err
  }
  return data, nil
}
//模拟从cache中获取值,cache中无该值
func getDataFromCache(key string) (string, error) {
  return "", errorNotExist
}
//模拟从数据库中获取值
func getDataFromDB(key string) (string, error) {
  log.Printf("get %s from database", key)
  return "data", nil
}

执行结果如下,可以看得到只有一个请求进入的DB,其他的请求也正常返回了值,从而保护了DB。

2022/01/19 21:16:48 get key from database
2022/01/19 21:16:48 data
2022/01/19 21:16:48 data
2022/01/19 21:16:48 data
2022/01/19 21:16:48 data
2022/01/19 21:16:48 data
2022/01/19 21:16:48 data
2022/01/19 21:16:48 data
2022/01/19 21:16:48 data
2022/01/19 21:16:48 data
2022/01/19 21:16:48 data
进程 已完成,退出代码为 0

源码分析

//Do方法,传入key,以及回调函数,如果key相同,fn方法只会执行一次,同步等待
//返回值v:表示fn执行结果
//返回值err:表示fn的返回的err
//第三个返回值shared:表示是否是真实fn返回的还是从保存的map[key]返回的,也就是共享的
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) 
//DoChan方法类似Do方法,只是返回的是一个chan
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result 
//Forget方法控制key关联的值是否失效,默认以上两个方法只要fn方法执行完成后,内部维护的fn的值也删除(即并发结束后就失效了)
func (g *Group) Forget(key string) 

singleflight的数据结构:

type Group struct {
  mu sync.Mutex       // 互斥锁,保证并发安全
  m  map[string]*call // 存储相同的请求,key是相同的请求,value保存返回的信息。
}
type call struct {
  wg sync.WaitGroup
  // 存储返回值
  val interface{}
  // 存储返回的错误信息
  err error
  // 标识别是否调用了Forgot方法
  forgotten bool
  // 统计相同请求的次数
  dups int
  // 使用DoChan方法使用,用channel进行通知
  chans []chan<- Result
}
// Dochan方法时使用
type Result struct {
  Val    interface{} // 存储返回值
  Err    error       // 存储返回的错误信息
  Shared bool        // 标示结果是否是共享结果
}
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
  // 代码块加锁
  g.mu.Lock()
  // map进行懒加载
  if g.m == nil {
    // map初始化
    g.m = make(map[string]*call)
  }
  // 判断是否有相同请求
  if c, ok := g.m[key]; ok {
    // 相同请求次数+1
    c.dups++
    // 解锁就好了,只需要等待执行结果了,不会有写入操作了
    g.mu.Unlock()
    // 已有请求在执行,只需要等待就好了
    c.wg.Wait()
    // 区分panic错误和runtime错误
    if e, ok := c.err.(*panicError); ok {
      panic(e)
    } else if c.err == errGoexit {
      runtime.Goexit()
    }
    return c.val, c.err, true
  }
  // 之前没有这个请求,则需要new一个指针类型
  c := new(call)
  // sync.WaitGroup的用法,只有一个请求运行,其他请求等待,所以只需要add(1)
  c.wg.Add(1)
  // m赋值
  g.m[key] = c
  // 没有写入操作了,解锁即可
  g.mu.Unlock()
  // 唯一的请求该去执行函数了
  g.doCall(c, key, fn)
  return c.val, c.err, c.dups > 0
}
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
  // 标识是否正常返回
  normalReturn := false
  // 标识别是否发生panic
  recovered := false
  defer func() {
    // 通过这个来判断是否是runtime导致直接退出了
    if !normalReturn && !recovered {
      // 返回runtime错误信息
      c.err = errGoexit
    }
    c.wg.Done()
    g.mu.Lock()
    defer g.mu.Unlock()
    // 防止重复删除key
    if !c.forgotten {
      delete(g.m, key)
    }
    // 检测是否出现了panic错误
    if e, ok := c.err.(*panicError); ok {
      // 如果是调用了dochan方法,为了channel避免死锁,这个panic要直接抛出去,不能recover住,要不就隐藏错误了
      if len(c.chans) > 0 {
        go panic(e) // 开一个写成panic
        select {}   // 保持住这个goroutine,这样可以将panic写入crash dump
      } else {
        panic(e)
      }
    } else if c.err == errGoexit {
      // runtime错误不需要做任何时,已经退出了
    } else {
      // 正常返回的话直接向channel写入数据就可以了
      for _, ch := range c.chans {
        ch <- Result{c.val, c.err, c.dups > 0}
      }
    }
  }()
  // 使用匿名函数目的是recover住panic,返回信息给上层
  func() {
    defer func() {
      if !normalReturn {
        // 发生了panic,我们recover住,然后把错误信息返回给上层
        if r := recover(); r != nil {
          c.err = newPanicError(r)
        }
      }
    }()
    // 执行函数
    c.val, c.err = fn()
    // fn没有发生panic
    normalReturn = true
  }()
  // 判断执行函数是否发生panic
  if !normalReturn {
    recovered = true
  }
}
//异步返回
// 入参数:key:标识相同请求,fn:要执行的函数
// 出参数:<- chan 等待接收结果的channel
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
  // 初始化channel
  ch := make(chan Result, 1)
  g.mu.Lock()
  // 懒加载
  if g.m == nil {
    g.m = make(map[string]*call)
  }
  // 判断是否有相同的请求
  if c, ok := g.m[key]; ok {
    //相同请求数量+1
    c.dups++
    // 添加等待的chan
    c.chans = append(c.chans, ch)
    g.mu.Unlock()
    return ch
  }
  c := &call{chans: []chan<- Result{ch}}
  c.wg.Add(1)
  g.m[key] = c
  g.mu.Unlock()
  // 开一个写成调用
  go g.doCall(c, key, fn)
  // 返回这个channel等待接收数据
  return ch
}
// 释放某个 key 下次调用就不会阻塞等待了
func (g *Group) Forget(key string) {
  g.mu.Lock()
  if c, ok := g.m[key]; ok {
    c.forgotten = true
  }
  delete(g.m, key)
  g.mu.Unlock()
}

注意事项

在使用singleflight时需要自己写回调函数,但是如果回调函数里面一直循环,那么相同请求就会阻塞在那里,需要特别注意


value, err, _ := g.Do(key, func() (interface{}, error) {
  for{
  }
})

singleflight Do方法原理总结

在Do方法中主要是通过waitgroup来控制的,主要流程如下:

  1. 设置一个map,如果key不存在,则创建一个*call,存入map中
  2. 如果已经在调用中key已存在map中,则wg.Wait等待第一个去DB的请求回来
  3. 第一个去DB的执行回调函数结束之后,在doCall方法中执行wg.Done,此时所有wg.Wait的协程都将继续执行


DoChan方法也是类似的逻辑,只是返回的是一个chan。

目录
相关文章
|
28天前
|
存储 缓存 监控
缓存击穿、缓存穿透、缓存雪崩 3大问题,如何彻底解决?
【10月更文挑战第8天】在分布式系统中,缓存的使用极大地提高了系统的性能和响应速度。然而,缓存击穿、缓存穿透和缓存雪崩是三个常见的缓存相关问题,它们可能导致系统性能下降,甚至引发系统崩溃。本文将深入探讨这三个问题的成因、影响以及彻底的解决方案。
61 1
|
1月前
|
缓存 NoSQL 关系型数据库
redis和缓存及相关问题和解决办法 什么是缓存预热、缓存穿透、缓存雪崩、缓存击穿
本文深入探讨了Redis缓存的相关知识,包括缓存的概念、使用场景、可能出现的问题(缓存预热、缓存穿透、缓存雪崩、缓存击穿)及其解决方案。
161 0
redis和缓存及相关问题和解决办法 什么是缓存预热、缓存穿透、缓存雪崩、缓存击穿
|
3月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
1月前
|
消息中间件 缓存 NoSQL
大数据-49 Redis 缓存问题中 穿透、雪崩、击穿、数据不一致、HotKey、BigKey
大数据-49 Redis 缓存问题中 穿透、雪崩、击穿、数据不一致、HotKey、BigKey
50 2
|
2月前
|
存储 缓存 NoSQL
解决Redis缓存击穿问题的技术方法
解决Redis缓存击穿问题的技术方法
63 2
|
3月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解、如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
这篇文章介绍了如何在SpringBoot项目中整合Redis,并探讨了缓存穿透、缓存雪崩和缓存击穿的问题以及解决方法。文章还提供了解决缓存击穿问题的加锁示例代码,包括存在问题和问题解决后的版本,并指出了本地锁在分布式情况下的局限性,引出了分布式锁的概念。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解、如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
|
2月前
|
缓存 NoSQL 前端开发
16)缓存雪崩、缓存击穿、缓存穿透
16)缓存雪崩、缓存击穿、缓存穿透
32 0
|
3月前
|
缓存 数据库
缓存穿透和击穿
【8月更文挑战第16天】
43 0
缓存穿透和击穿
|
3月前
|
缓存 NoSQL Redis
一天五道Java面试题----第九天(简述MySQL中索引类型对数据库的性能的影响--------->缓存雪崩、缓存穿透、缓存击穿)
这篇文章是关于Java面试中可能会遇到的五个问题,包括MySQL索引类型及其对数据库性能的影响、Redis的RDB和AOF持久化机制、Redis的过期键删除策略、Redis的单线程模型为何高效,以及缓存雪崩、缓存穿透和缓存击穿的概念及其解决方案。
|
3月前
|
存储 缓存 NoSQL
基于SpringBoot+Redis解决缓存与数据库一致性、缓存穿透、缓存雪崩、缓存击穿问题
这篇文章讨论了在使用SpringBoot和Redis时如何解决缓存与数据库一致性问题、缓存穿透、缓存雪崩和缓存击穿问题,并提供了相应的解决策略和示例代码。
77 0