Golang SQL连接池梳理 (二)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Golang SQL连接池梳理 (二)

五、流程梳理#


5.1、先获取DB实例#


在golang中,要想获取连接,一般我们都得通过下面这段代码获取到DB的封装结构体实例。


通过上面的三个结构体可以看出 DB 、driverConn、Conn的关系如下:



所以我们的代码一般长成下面这样,先获取一个DB结构体的实例,DB结构体中有维护连接池、以及和创建连接,关闭连接协程通信的channel,已经各种配置参数。


上图中浅蓝色部分的 freeConn就是空闲连接池,里面的driver包下的Conn interface就是具体的连接。


/**
 * MySQL连接相关的逻辑
 */
type Conenctor struct {
  BaseInfo BaseInfo
  DB       *sql.DB
}
func (c *Conenctor) Open() {
  // 读取配置
  c.loadConfig()
  dataSource := c.BaseInfo.RootUserName + ":" + c.BaseInfo.RootPassword + "@tcp(" + c.BaseInfo.Addr + ":" + c.BaseInfo.Port + ")/" + c.BaseInfo.DBName
  db, Err := sql.Open("mysql", dataSource)
  if Err != nil {
    common.Error("Fail to opendb dataSource:[%v] Err:[%v]", dataSource, Err.Error())
    return
  }
  db.SetMaxOpenConns(500)
  db.SetMaxIdleConns(200)
  c.DB = db
  Err = db.Ping()
  if Err != nil {
    fmt.Printf("Fail to Ping DB Err :[%v]", Err.Error())
    return
  }
}


5.2、流程梳理入口:#


比如我们自己写代码时,可能会搞这样一个方法做增删改


// 插入、更新、删除
func (c *Conenctor) Exec(ctx context.Context, 
                         sqlText string,
                         params ...interface{}) (qr *QueryResults) {
  qr = &QueryResults{}
  result, err := c.DB.ExecContext(ctx, sqlText, params...)
  defer HandleException()
  if err != nil {
    qr.EffectRow = 0
    qr.Err = err
    common.Error("Fail to exec qurey sqlText:[%v] params:[%v] err:[%v]", sqlText, params, err)
    return
  }
  qr.EffectRow, _ = result.RowsAffected()
  qr.LastInsertId, _ = result.LastInsertId()
  return
}


主要是使用DB.ExecContext()执行SQL,获取返回值。

ctx是业务代码传入的上线文,通常是做超时限制使用。

其实这里并不是严格意义上的去执行sql,它其实是通过和MySQL-Server之间建立的连接将sql+params发往MySQL-Server去解析和执行。

进入DB.ExecContext()


主要逻辑如下:exec()方法的主要功能是:获取连接,发送sql和参数。

  • 如果获取一次失败一次,当失败的次数达到sql包预定义的常量maxBadConnRetries的情况下,将会创建新的连接使用
  • 未超过maxBadConnRetries,被打上cachedOrNewConn,优先从空闲池中获取连接


func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
   var res Result
   var err error
   for i := 0; i < maxBadConnRetries; i++ {
      res, err = db.exec(ctx, query, args, cachedOrNewConn)
      if err != driver.ErrBadConn {
         break
      }
   }
   if err == driver.ErrBadConn {
      return db.exec(ctx, query, args, alwaysNewConn)
   }
   return res, err
}


跟进exec() --> db.conn(ctx, strategy)


func (db *DB) exec(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (Result, error) {
  // 这个strategy就是上一步我们告诉他是创建新连接,还是优先从缓存池中获取连接。
  dc, err := db.conn(ctx, strategy)
  ..
}


5.3、获取连接#


跟进conn()方法

conn方法的返回值是driverConn,也就是我们上面说的数据库连接,作用就是说,跟据传递进来的获取策略,获取数据库连接,如果正常就返回获取到的数据库连接,异常就返回错误err


这张图是conn获取连接的流程图,根据下面这段代码画出来的,注释有写在代码上



// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
  db.mu.Lock()
  // 先监测db是否关闭了
  if db.closed {
    db.mu.Unlock()
    // DB都关闭了,直接返回DBClosed错误,没必要再去获取连接。
    return nil, errDBClosed
  }
  // 检查用户传递进来的Context是否过期了
  select {
  default:
  // 如果用户那边使用了ctx.Done(),毫无疑问,会进入这个case中,返回Ctx错误  
  case <-ctx.Done():
    db.mu.Unlock()
    return nil, ctx.Err()
  }
  // 连接被重用的时间,如果为0,表示 理论上这个连接永不过期,一直可以被使用
  lifetime := db.maxLifetime
  // 看一下空闲连接池(他是个slice)是否是还有空闲的连接
  numFree := len(db.freeConn)
  // 如果获取策略是优先从连接池中获取,并且连接池中确实存在空闲的连接,就从freeConn中取连接使用。
  if strategy == cachedOrNewConn && numFree > 0 {
    // 假设空闲池还剩下五条连接:【1,2,3,4,5】
    // 取出第一条 conn == 1
    conn := db.freeConn[0]
    // 切片的拷贝,实现remove掉第一个连接的目的。
    copy(db.freeConn, db.freeConn[1:])
    // 如果db.freeConn[1:]会导致freeConn变小,所以这里是 db.freeConn = db.freeConn[:numFree-1]
    db.freeConn = db.freeConn[:numFree-1]
    // 这里获取的连接是driverConn,它其实是对真实连接,driver.Conn的封装。
    // 在driver.Conn的基础上多一层封装可以实现在driver.Conn的基础上,加持上状态信息,如下
    conn.inUse = true
    db.mu.Unlock()
    // 检查是否过期
    if conn.expired(lifetime) {
      conn.Close()
      return nil, driver.ErrBadConn
    }
    // Lock around reading lastErr to ensure the session resetter finished.
    // 加锁处理,确保这个conn未曾被标记为 lastErr状态。
    // 一旦被标记为这个状态说明 ConnectionRestter协程在重置conn的状态时发生了错误。也就是这个连接其实已经坏掉了,不可使用。
    conn.Lock()
    err := conn.lastErr
    conn.Unlock()
    // 如果检测到这种错误,driver.ErrBadConn 表示连接不可用,关闭连接,返回错误。
    if err == driver.ErrBadConn {
      conn.Close()
      return nil, driver.ErrBadConn
    }
    return conn, nil
  }
  // Out of free connections or we were asked not to use one. If we're not
  // allowed to open any more connections, make a request and wait.
  // db.maxOpen > 0 表示当前DB实例允许打开连接
  // db.numOpen >= db.maxOpen表示当前DB能打开的连接数,已经大于它能打开的最大连接数,就构建一个request,然后等待获取连接
  if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
    // Make the connRequest channel. It's buffered so that the
    // connectionOpener doesn't block while waiting for the req to be read.
    // 构建connRequest这个channel,缓存大小是1
    // 用于告诉connectionOpener协程,需要打开一个新的连接。
    req := make(chan connRequest, 1)
    /**
      nextRequestKeyLocked函数如下:
      func (db *DB) nextRequestKeyLocked() uint64 {
        next := db.nextRequest
        db.nextRequest++
        return next
      }
      主要作用就是将nextRequest+1,
      至于这个nextRequest的作用我们前面也说过了,它相当于binlog中的next_trx下一个事物的事物id。
      言外之意是这个nextRequest递增的(因为这段代码被加了lock)。
      看如下的代码中,将这个自增后的nextRequest当返回值返回出去。
      然后紧接着将它作为map的key
      至于这个map嘛:
      在本文一开始的位置,我们介绍了DB结构体有这样一个属性,连接请求的map, key是自增的int64类型的数,
      用于唯一标示这个请求分配的
      connRequests map[uint64]chan connRequest 
     */
    reqKey := db.nextRequestKeyLocked()
    // 将这个第n个请求对应channel缓存起来,开始等待有合适的机会分配给他连接
    db.connRequests[reqKey] = req
    // 等待数增加,解锁
    db.waitCount++
    db.mu.Unlock()
    waitStart := time.Now()
    // Timeout the connection request with the context.
    // 进入下面的slice中
    select {
    // 如果客户端传入的上下文超时了,进入这个case
    case <-ctx.Done():
      // Remove the connection request and ensure no value has been sent
      // on it after removing.
      // 当上下文超时时,表示上层的客户端代码想断开,意味着在这个方法收到这个信号后需要退出了
      // 这里将db的connRequests中的reqKey清除,防止还给他分配一个连接。
      db.mu.Lock()
      delete(db.connRequests, reqKey)
      db.mu.Unlock()
      atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
      // 这里也会尝试从req channel中获取一下有没有可用的连接
      // 如果有的话执行 db.putConn(ret.conn, ret.err, false) ,目的是释放掉这个连接
      select {
      default:
      case ret, ok := <-req:
        if ok && ret.conn != nil {
          // 看到这里只需要知道他是用来释放连接的就ok,继续往下看,稍后再杀回来
          db.putConn(ret.conn, ret.err, false)
        }
      }
      //返回ctx异常。
      return nil, ctx.Err()
    // 尝试从 reqchannel 中取出连接
    case ret, ok := <-req:
      atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
      // 处理错误
      if !ok {
        return nil, errDBClosed
      }
      // 检测连接是否过期了,前面也提到过,DB实例有维护一个参数,maxLifeTime,0表示永不过期
      if ret.err == nil && ret.conn.expired(lifetime) {
        ret.conn.Close()
        return nil, driver.ErrBadConn
      }
      // 健壮性检查
      if ret.conn == nil {
        return nil, ret.err
      }
      // Lock around reading lastErr to ensure the session resetter finished.
      // 检查连接是否可用
      ret.conn.Lock()
      err := ret.conn.lastErr
      ret.conn.Unlock()
      if err == driver.ErrBadConn {
        ret.conn.Close()
        return nil, driver.ErrBadConn
      }
      return ret.conn, ret.err
    }
  }
  // 代码能运行到这里说明上面的if条件没有被命中。
  // 换句话说,来到这里说明具备如下条件
  // 1:当前DB实例的空闲连接池中已经没有空闲连接了,获取明确指定,不从空闲池中获取连接,就想新建连接。
  // 2: 当前DB实例允许打开连接
  // 3: DB实例目前打开的连接数还没有到达它能打开的最大连接数的上限。
  // 记录当前DB已经打开的连接数+1
  db.numOpen++ // optimistically
  db.mu.Unlock()
  ci, err := db.connector.Connect(ctx)
  if err != nil {
    db.mu.Lock()
    db.numOpen-- // correct for earlier optimism
    db.maybeOpenNewConnections()
    db.mu.Unlock()
    return nil, err
  }
  db.mu.Lock()
  // 构建一个连接实例,并返回
  dc := &driverConn{
    db:        db,
    createdAt: nowFunc(),
    ci:        ci,
    inUse:     true,
  }
  db.addDepLocked(dc, dc)
  db.mu.Unlock()
  return dc, nil
}


5.4、释放连接#


连接被是过后是需要被释放的

释放连接的逻辑封装在DB实例中


db.putConn(ret.conn, ret.err, false)


释放连接的流程图如下:



流程图根据如下的代码画出。

方法详细信息如下:


func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
  // 释放连接的操作加锁
  db.mu.Lock()
  // debug的信息
  if !dc.inUse {
    if debugGetPut {
      fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
    }
    panic("sql: connection returned that was never out")
  }
  if debugGetPut {
    db.lastPut[dc] = stack()
  }
  // 标记driverConn处理不可用的状态
  dc.inUse = false
  for _, fn := range dc.onPut {
    fn()
  }
  dc.onPut = nil
  // 本方法的入参中有参数err
  // 当会话获取出这个连接后,发现这个连接过期了、或者被标记上来lastErr时,再调用这个putConn方法时,同时会将这个错误传递进来,然后在这里判断,当出现坏掉的连接时就不直接把这个连接放回空闲连接池了。
  if err == driver.ErrBadConn {
    // Don't reuse bad connections.
    // Since the conn is considered bad and is being discarded, treat it
    // as closed. Don't decrement the open count here, finalClose will
    // take care of that.
    // 这个方法的作用如下:
    // 他会去判断当前DB维护的map的容量,也就是前面提到的那种情况:当DB允许打开连接,但是现在的连接数已经达到当前DB允许打开的最大连接数上限了,那么针对接下来想要获取连接的请求的处理逻辑就是,构建一个req channel,放入connRequests这个map中,表示他们正在等待连接的建立。
    // 换句话说,这时系统时繁忙的,业务处于高峰,那么问题来了,现在竟然出现了一个坏掉的连接,那为了把对业务线的影响降到最低,是不是得主动新建一个新的连接放到空闲连接池中呢?
    //  db.maybeOpenNewConnections() 函数主要干的就是这个事。
    //  方法详情如下
    /*
      func (db *DB) maybeOpenNewConnections() {
          numRequests := len(db.connRequests)
          if db.maxOpen > 0 {
            numCanOpen := db.maxOpen - db.numOpen
          if numRequests > numCanOpen {
            numRequests = numCanOpen
          }
      }
      for numRequests > 0 {
            db.numOpen++ // optimistically
            numRequests--
            if db.closed {
              return
            }
          // 它只是往这个 openerCh channel中写入一个空的结构体,会有专门的协程负责创建连接
          db.openerCh <- struct{}{}
      }
    }
    */
    db.maybeOpenNewConnections()
    //  解锁,关闭连接,返回
    db.mu.Unlock()
    dc.Close()
    return
  }
  if putConnHook != nil {
    putConnHook(db, dc)
  }
  // 如果DB已经关闭了,标记 resetSession为 false
  if db.closed {
    // Connections do not need to be reset if they will be closed.
    // Prevents writing to resetterCh after the DB has closed.
    // 当DB都已经关了,意味着DB里面的连接池都没有了,那当然不需要关闭连接池中的连接了~
    resetSession = false
  }
  // 如果DB没有关闭的话,进入if代码块
  if resetSession {
    // 将dricerConn中的Conn验证转换为driver.SessionResetter
    if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
      // 在此处锁定driverConn,以便在连接重置之前不会释放。
      // 必须在将连接放入池之前获取锁,以防止在重置之前将其取出
      dc.Lock()
    }
  }
  // 真正将连接放回空闲连接池中
  // 满足connRequest或将driverConn放入空闲池并返回true或false
  /*
    func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
        // 检测如果DB都关闭块,直接返回flase
        if db.closed {
          return false
        }
        // 如果DB当前打开的连接数大于DB能打开的最大的连接数,返回false
        if db.maxOpen > 0 && db.numOpen > db.maxOpen {
          return false
        }
        //如果等待获取连接的map中有存货
     if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        // 取出map中的第一个key
        for reqKey, req = range db.connRequests {
          break
        }
        // 将这个key,value再map中删除
        delete(db.connRequests, reqKey) // Remove from pending requests.
        // 重新标记这个连接是可用的状态
        if err == nil {
          dc.inUse = true
        }
        // 将这个连接放入到 req channel中,给等待连接到会话使用
        req <- connRequest{
          conn: dc,
          err:  err,
        }
        return true
    // 来到这个if,说明此时没有任何请求在等待获取连接,并且没有发生错误,DB也没有关闭
    } else if err == nil && !db.closed {
        // 比较当前空闲连接池的大小(默认是2) 和 freeConn空闲连接数的数量
        // 意思是,如果空闲的连接超出了这个规定的阈值,空闲连接是需要被收回的。
        if db.maxIdleConnsLocked() > len(db.freeConn) {
          // 收回
          db.freeConn = append(db.freeConn, dc)
          db.startCleanerLocked()
          return true
        }
        // 如果空闲连接还没到阈值,保留这个连接当作空闲连接
        db.maxIdleClosed++
    }   
        // 收回空闲连接返回false
        return false
}
  */
  // 如果将连接成功放入了空闲连接池,或者将连接成功给了等待连接到会话使用,此处返回true
  // 收回空闲连接返回false
  // 代码详情就是在上面的这段注释中
  added := db.putConnDBLocked(dc, nil)
  db.mu.Unlock()
  // 如果
  if !added {
    // 如果DB没有关闭,进入if
    if resetSession {
      dc.Unlock()
    }
    dc.Close()
    return
  }
  // 重新校验,如果连接关闭了,进入if
  if !resetSession {
    return
  }
  // 如果负责重置 conn状态的线程阻塞住了,那么标记这个driverConn为lastErr
  select {
  default:
    // If the resetterCh is blocking then mark the connection
    // as bad and continue on.
    dc.lastErr = driver.ErrBadConn
    dc.Unlock()
  case db.resetterCh <- dc:
  }
}


5.5、connectionOpener#

5.5.1、是什么?


这个connectionOpener是一个工作协程,它会去尝试消费指定的channel,负责创建数据库连接,其实在前面阅读获取连接的逻辑时,有这样的两种情况会阻塞等待


connectionOpener来新创建连接:

第一种:当获取连接的策略是优先从cache连接池中获取出来,但是空闲连接池已经没有空闲的连接了,首先这时DB允许打开连接,但是DB能打开的连接数已经达到了它能打开的连接数的上线,所以得等待有空闲连接出现,或者等有连接被释放后,DB能当前打开的连接数小于了它能打开的连接数的最大值,这时它会被阻塞等待去尝试创建连接。


第二种:获取连接的策略不再是优先从空闲缓冲池中获取连接,直接明了的想获取最一条新连接,同样的此时DB已经打开的连接数大于它能打开连接数的上线,它会被阻塞等待创建连接。



5.5.2、什么时候开启的?


func OpenDB(c driver.Connector) *DB {
  ctx, cancel := context.WithCancel(context.Background())
  db := &DB{
    connector:    c,
    openerCh:     make(chan struct{}, connectionRequestQueueSize),
    resetterCh:   make(chan *driverConn, 50),
    lastPut:      make(map[*driverConn]string),
    connRequests: make(map[uint64]chan connRequest),
    stop:         cancel,
  }
  // 可以看到他是在DB被实例化时开启的。
  go db.connectionOpener(ctx)
  go db.connectionResetter(ctx)
  return db
}


5.5.3、代码详情


可以看到它一直尝试从db的openerCh中获取内容,而且只要获取到了内容,就会调用方法打开连接。


// Runs in a separate goroutine, opens new connections when requested.
func (db *DB) connectionOpener(ctx context.Context) {
  for {
    select {
    case <-ctx.Done():
      return
    // here  
    case <-db.openerCh:
      db.openNewConnection(ctx)
    }
  }
}


5.5.4、谁往openerCh中投放消息?


往channl中投放消息的逻辑在db的mayBeOpenNewConnections中


func (db *DB) maybeOpenNewConnections() {
  // 通过检查这个map的长度来决定是否往opennerCh中投放消息
  numRequests := len(db.connRequests)
  if db.maxOpen > 0 {
    numCanOpen := db.maxOpen - db.numOpen
    if numRequests > numCanOpen {
      numRequests = numCanOpen
    }
  }
  for numRequests > 0 {
    db.numOpen++ // optimistically
    numRequests--
    if db.closed {
      return
    }
    // 一旦执行了这一步,connectionOpener 就会监听到去创建连接。
    db.openerCh <- struct{}{}
  }
}


5.5.5、注意点:


在DB结构体中有这样一个属性


// 连接池的大小,0意味着使用默认的大小2, 小于0表示不使用连接池
  maxIdle           int    // zero means defaultMaxIdleConns; negative means 0


表示空闲连接池默认的大小,如果它为0,表示都没有缓存池,也就意味着会为所有想获取连接的请求创建新的conn,这时也就不会有这个opnerCh,更不会有connectionOpener


5.6、connectionCleaner#

5.6.1、是什么?有啥用?


它同样以一条协程的形式存在,用于定时清理数据库连接池中过期的连接


func (db *DB) startCleanerLocked() {
  if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
    db.cleanerCh = make(chan struct{}, 1)
    go db.connectionCleaner(db.maxLifetime)
  }
}


5.6.2、注意点


同样的,DB中存在一个参数:maxLifetime

它表示数据库连接最大的生命时长,如果将它设置为0,表示这个连接永不过期,既然所有的连接永不过期,就不会存在connectionCleaner去定时根据maxLifetime来定时清理连接。


它的调用时机是:需要将连接放回到连接池时调用。


5.7、connectionRestter#

5.7.1、作用


我们使用获取的连接的封装结构体是driverConn,其实它是会driver包下的Conn连接的又一层封装,目的是增强

driver包下的Conn的,多出来了一些状态。当将使用完毕的连接放入连接池时,就得将这些状态清除掉。


使用谁去清除呢?就是这个go 协程:connectionRestter

当connectionRestter碰到错误时,会将这个conn标记为lastErr,连接使用者在使用连接时会先校验conn的诸多状态,比如出现lastErr,会返回给客户端 badConnErr

相关文章
|
7月前
|
SQL 关系型数据库 MySQL
mysqldiff - Golang 针对 MySQL 数据库表结构的差异 SQL 工具
Golang 针对 MySQL 数据库表结构的差异 SQL 工具。https://github.com/camry/mysqldiff
104 7
|
SQL JSON Java
知识分享之Golang——在Goland中快速基于JSON或SQL创建struct
知识分享之Golang篇是我在日常使用Golang时学习到的各种各样的知识的记录,将其整理出来以文章的形式分享给大家,来进行共同学习。欢迎大家进行持续关注。 知识分享系列目前包含Java、Golang、Linux、Docker等等。
488 0
知识分享之Golang——在Goland中快速基于JSON或SQL创建struct
|
SQL Java Linux
知识分享之Golang——使用gorm时进行执行自定义SQL的几种方式
知识分享之Golang篇是我在日常使用Golang时学习到的各种各样的知识的记录,将其整理出来以文章的形式分享给大家,来进行共同学习。欢迎大家进行持续关注。 知识分享系列目前包含Java、Golang、Linux、Docker等等。
595 0
知识分享之Golang——使用gorm时进行执行自定义SQL的几种方式
|
SQL 网络协议 关系型数据库
Golang SQL连接池梳理 (一)
Golang SQL连接池梳理 (一)
355 0
|
SQL 网络协议 安全
Golang SQL连接池梳理 (三)
Golang SQL连接池梳理 (三)
268 0
|
SQL 监控 Java
Java Spring Boot 2.0实战MyBatis连接池阿里Druid与SQL性能监控
阿里开源数据库连接池组件Druid非常强大,,本次课程一起学习如何在最新的Java Spring Boot 2.0和MyBatis系统中集成阿里开源的连接池Druid,以及SQL性能监控,生产环境必备利器。
73316 0
|
SQL druid Java
【直播预告】:Java Spring Boot开发实战系列课程【第14讲】:Spring Boot 2.0实战MyBatis连接池阿里Druid与SQL性能监控
阿里开源数据库连接池组件Druid非常强大,,本次课程一起学习如何在最新的Java Spring Boot 2.0和MyBatis系统 中集成阿里开源的连接池Druid,以及SQL性能监控,生产环境必备利器。
66494 0