五、流程梳理#
5.1、先获取DB实例#
在golang中,要想获取连接,一般我们都得通过下面这段代码获取到DB的封装结构体实例。
通过上面的三个结构体可以看出 DB 、driverConn、Conn的关系如下:
所以我们的代码一般长成下面这样,先获取一个DB结构体的实例,DB结构体中有维护连接池、以及和创建连接,关闭连接协程通信的channel,已经各种配置参数。
上图中浅蓝色部分的 freeConn就是空闲连接池,里面的driver包下的Conn interface就是具体的连接。
/** * MySQL连接相关的逻辑 */ type Conenctor struct { BaseInfo BaseInfo DB *sql.DB } func (c *Conenctor) Open() { // 读取配置 c.loadConfig() dataSource := c.BaseInfo.RootUserName + ":" + c.BaseInfo.RootPassword + "@tcp(" + c.BaseInfo.Addr + ":" + c.BaseInfo.Port + ")/" + c.BaseInfo.DBName db, Err := sql.Open("mysql", dataSource) if Err != nil { common.Error("Fail to opendb dataSource:[%v] Err:[%v]", dataSource, Err.Error()) return } db.SetMaxOpenConns(500) db.SetMaxIdleConns(200) c.DB = db Err = db.Ping() if Err != nil { fmt.Printf("Fail to Ping DB Err :[%v]", Err.Error()) return } }
5.2、流程梳理入口:#
比如我们自己写代码时,可能会搞这样一个方法做增删改
// 插入、更新、删除 func (c *Conenctor) Exec(ctx context.Context, sqlText string, params ...interface{}) (qr *QueryResults) { qr = &QueryResults{} result, err := c.DB.ExecContext(ctx, sqlText, params...) defer HandleException() if err != nil { qr.EffectRow = 0 qr.Err = err common.Error("Fail to exec qurey sqlText:[%v] params:[%v] err:[%v]", sqlText, params, err) return } qr.EffectRow, _ = result.RowsAffected() qr.LastInsertId, _ = result.LastInsertId() return }
主要是使用DB.ExecContext()
执行SQL,获取返回值。
ctx是业务代码传入的上线文,通常是做超时限制使用。
其实这里并不是严格意义上的去执行sql,它其实是通过和MySQL-Server之间建立的连接将sql+params发往MySQL-Server去解析和执行。
进入DB.ExecContext()
主要逻辑如下:exec()
方法的主要功能是:获取连接,发送sql和参数。
- 如果获取一次失败一次,当失败的次数达到sql包预定义的常量maxBadConnRetries的情况下,将会创建新的连接使用
- 未超过maxBadConnRetries,被打上cachedOrNewConn,优先从空闲池中获取连接
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) { var res Result var err error for i := 0; i < maxBadConnRetries; i++ { res, err = db.exec(ctx, query, args, cachedOrNewConn) if err != driver.ErrBadConn { break } } if err == driver.ErrBadConn { return db.exec(ctx, query, args, alwaysNewConn) } return res, err }
跟进exec()
--> db.conn(ctx, strategy)
func (db *DB) exec(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (Result, error) { // 这个strategy就是上一步我们告诉他是创建新连接,还是优先从缓存池中获取连接。 dc, err := db.conn(ctx, strategy) .. }
5.3、获取连接#
跟进conn()
方法
conn方法的返回值是driverConn,也就是我们上面说的数据库连接,作用就是说,跟据传递进来的获取策略,获取数据库连接,如果正常就返回获取到的数据库连接,异常就返回错误err
这张图是conn获取连接的流程图,根据下面这段代码画出来的,注释有写在代码上
// conn returns a newly-opened or cached *driverConn. func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { db.mu.Lock() // 先监测db是否关闭了 if db.closed { db.mu.Unlock() // DB都关闭了,直接返回DBClosed错误,没必要再去获取连接。 return nil, errDBClosed } // 检查用户传递进来的Context是否过期了 select { default: // 如果用户那边使用了ctx.Done(),毫无疑问,会进入这个case中,返回Ctx错误 case <-ctx.Done(): db.mu.Unlock() return nil, ctx.Err() } // 连接被重用的时间,如果为0,表示 理论上这个连接永不过期,一直可以被使用 lifetime := db.maxLifetime // 看一下空闲连接池(他是个slice)是否是还有空闲的连接 numFree := len(db.freeConn) // 如果获取策略是优先从连接池中获取,并且连接池中确实存在空闲的连接,就从freeConn中取连接使用。 if strategy == cachedOrNewConn && numFree > 0 { // 假设空闲池还剩下五条连接:【1,2,3,4,5】 // 取出第一条 conn == 1 conn := db.freeConn[0] // 切片的拷贝,实现remove掉第一个连接的目的。 copy(db.freeConn, db.freeConn[1:]) // 如果db.freeConn[1:]会导致freeConn变小,所以这里是 db.freeConn = db.freeConn[:numFree-1] db.freeConn = db.freeConn[:numFree-1] // 这里获取的连接是driverConn,它其实是对真实连接,driver.Conn的封装。 // 在driver.Conn的基础上多一层封装可以实现在driver.Conn的基础上,加持上状态信息,如下 conn.inUse = true db.mu.Unlock() // 检查是否过期 if conn.expired(lifetime) { conn.Close() return nil, driver.ErrBadConn } // Lock around reading lastErr to ensure the session resetter finished. // 加锁处理,确保这个conn未曾被标记为 lastErr状态。 // 一旦被标记为这个状态说明 ConnectionRestter协程在重置conn的状态时发生了错误。也就是这个连接其实已经坏掉了,不可使用。 conn.Lock() err := conn.lastErr conn.Unlock() // 如果检测到这种错误,driver.ErrBadConn 表示连接不可用,关闭连接,返回错误。 if err == driver.ErrBadConn { conn.Close() return nil, driver.ErrBadConn } return conn, nil } // Out of free connections or we were asked not to use one. If we're not // allowed to open any more connections, make a request and wait. // db.maxOpen > 0 表示当前DB实例允许打开连接 // db.numOpen >= db.maxOpen表示当前DB能打开的连接数,已经大于它能打开的最大连接数,就构建一个request,然后等待获取连接 if db.maxOpen > 0 && db.numOpen >= db.maxOpen { // Make the connRequest channel. It's buffered so that the // connectionOpener doesn't block while waiting for the req to be read. // 构建connRequest这个channel,缓存大小是1 // 用于告诉connectionOpener协程,需要打开一个新的连接。 req := make(chan connRequest, 1) /** nextRequestKeyLocked函数如下: func (db *DB) nextRequestKeyLocked() uint64 { next := db.nextRequest db.nextRequest++ return next } 主要作用就是将nextRequest+1, 至于这个nextRequest的作用我们前面也说过了,它相当于binlog中的next_trx下一个事物的事物id。 言外之意是这个nextRequest递增的(因为这段代码被加了lock)。 看如下的代码中,将这个自增后的nextRequest当返回值返回出去。 然后紧接着将它作为map的key 至于这个map嘛: 在本文一开始的位置,我们介绍了DB结构体有这样一个属性,连接请求的map, key是自增的int64类型的数, 用于唯一标示这个请求分配的 connRequests map[uint64]chan connRequest */ reqKey := db.nextRequestKeyLocked() // 将这个第n个请求对应channel缓存起来,开始等待有合适的机会分配给他连接 db.connRequests[reqKey] = req // 等待数增加,解锁 db.waitCount++ db.mu.Unlock() waitStart := time.Now() // Timeout the connection request with the context. // 进入下面的slice中 select { // 如果客户端传入的上下文超时了,进入这个case case <-ctx.Done(): // Remove the connection request and ensure no value has been sent // on it after removing. // 当上下文超时时,表示上层的客户端代码想断开,意味着在这个方法收到这个信号后需要退出了 // 这里将db的connRequests中的reqKey清除,防止还给他分配一个连接。 db.mu.Lock() delete(db.connRequests, reqKey) db.mu.Unlock() atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) // 这里也会尝试从req channel中获取一下有没有可用的连接 // 如果有的话执行 db.putConn(ret.conn, ret.err, false) ,目的是释放掉这个连接 select { default: case ret, ok := <-req: if ok && ret.conn != nil { // 看到这里只需要知道他是用来释放连接的就ok,继续往下看,稍后再杀回来 db.putConn(ret.conn, ret.err, false) } } //返回ctx异常。 return nil, ctx.Err() // 尝试从 reqchannel 中取出连接 case ret, ok := <-req: atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) // 处理错误 if !ok { return nil, errDBClosed } // 检测连接是否过期了,前面也提到过,DB实例有维护一个参数,maxLifeTime,0表示永不过期 if ret.err == nil && ret.conn.expired(lifetime) { ret.conn.Close() return nil, driver.ErrBadConn } // 健壮性检查 if ret.conn == nil { return nil, ret.err } // Lock around reading lastErr to ensure the session resetter finished. // 检查连接是否可用 ret.conn.Lock() err := ret.conn.lastErr ret.conn.Unlock() if err == driver.ErrBadConn { ret.conn.Close() return nil, driver.ErrBadConn } return ret.conn, ret.err } } // 代码能运行到这里说明上面的if条件没有被命中。 // 换句话说,来到这里说明具备如下条件 // 1:当前DB实例的空闲连接池中已经没有空闲连接了,获取明确指定,不从空闲池中获取连接,就想新建连接。 // 2: 当前DB实例允许打开连接 // 3: DB实例目前打开的连接数还没有到达它能打开的最大连接数的上限。 // 记录当前DB已经打开的连接数+1 db.numOpen++ // optimistically db.mu.Unlock() ci, err := db.connector.Connect(ctx) if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism db.maybeOpenNewConnections() db.mu.Unlock() return nil, err } db.mu.Lock() // 构建一个连接实例,并返回 dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, inUse: true, } db.addDepLocked(dc, dc) db.mu.Unlock() return dc, nil }
5.4、释放连接#
连接被是过后是需要被释放的
释放连接的逻辑封装在DB实例中
db.putConn(ret.conn, ret.err, false)
释放连接的流程图如下:
流程图根据如下的代码画出。
方法详细信息如下:
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { // 释放连接的操作加锁 db.mu.Lock() // debug的信息 if !dc.inUse { if debugGetPut { fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc]) } panic("sql: connection returned that was never out") } if debugGetPut { db.lastPut[dc] = stack() } // 标记driverConn处理不可用的状态 dc.inUse = false for _, fn := range dc.onPut { fn() } dc.onPut = nil // 本方法的入参中有参数err // 当会话获取出这个连接后,发现这个连接过期了、或者被标记上来lastErr时,再调用这个putConn方法时,同时会将这个错误传递进来,然后在这里判断,当出现坏掉的连接时就不直接把这个连接放回空闲连接池了。 if err == driver.ErrBadConn { // Don't reuse bad connections. // Since the conn is considered bad and is being discarded, treat it // as closed. Don't decrement the open count here, finalClose will // take care of that. // 这个方法的作用如下: // 他会去判断当前DB维护的map的容量,也就是前面提到的那种情况:当DB允许打开连接,但是现在的连接数已经达到当前DB允许打开的最大连接数上限了,那么针对接下来想要获取连接的请求的处理逻辑就是,构建一个req channel,放入connRequests这个map中,表示他们正在等待连接的建立。 // 换句话说,这时系统时繁忙的,业务处于高峰,那么问题来了,现在竟然出现了一个坏掉的连接,那为了把对业务线的影响降到最低,是不是得主动新建一个新的连接放到空闲连接池中呢? // db.maybeOpenNewConnections() 函数主要干的就是这个事。 // 方法详情如下 /* func (db *DB) maybeOpenNewConnections() { numRequests := len(db.connRequests) if db.maxOpen > 0 { numCanOpen := db.maxOpen - db.numOpen if numRequests > numCanOpen { numRequests = numCanOpen } } for numRequests > 0 { db.numOpen++ // optimistically numRequests-- if db.closed { return } // 它只是往这个 openerCh channel中写入一个空的结构体,会有专门的协程负责创建连接 db.openerCh <- struct{}{} } } */ db.maybeOpenNewConnections() // 解锁,关闭连接,返回 db.mu.Unlock() dc.Close() return } if putConnHook != nil { putConnHook(db, dc) } // 如果DB已经关闭了,标记 resetSession为 false if db.closed { // Connections do not need to be reset if they will be closed. // Prevents writing to resetterCh after the DB has closed. // 当DB都已经关了,意味着DB里面的连接池都没有了,那当然不需要关闭连接池中的连接了~ resetSession = false } // 如果DB没有关闭的话,进入if代码块 if resetSession { // 将dricerConn中的Conn验证转换为driver.SessionResetter if _, resetSession = dc.ci.(driver.SessionResetter); resetSession { // 在此处锁定driverConn,以便在连接重置之前不会释放。 // 必须在将连接放入池之前获取锁,以防止在重置之前将其取出 dc.Lock() } } // 真正将连接放回空闲连接池中 // 满足connRequest或将driverConn放入空闲池并返回true或false /* func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { // 检测如果DB都关闭块,直接返回flase if db.closed { return false } // 如果DB当前打开的连接数大于DB能打开的最大的连接数,返回false if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } //如果等待获取连接的map中有存货 if c := len(db.connRequests); c > 0 { var req chan connRequest var reqKey uint64 // 取出map中的第一个key for reqKey, req = range db.connRequests { break } // 将这个key,value再map中删除 delete(db.connRequests, reqKey) // Remove from pending requests. // 重新标记这个连接是可用的状态 if err == nil { dc.inUse = true } // 将这个连接放入到 req channel中,给等待连接到会话使用 req <- connRequest{ conn: dc, err: err, } return true // 来到这个if,说明此时没有任何请求在等待获取连接,并且没有发生错误,DB也没有关闭 } else if err == nil && !db.closed { // 比较当前空闲连接池的大小(默认是2) 和 freeConn空闲连接数的数量 // 意思是,如果空闲的连接超出了这个规定的阈值,空闲连接是需要被收回的。 if db.maxIdleConnsLocked() > len(db.freeConn) { // 收回 db.freeConn = append(db.freeConn, dc) db.startCleanerLocked() return true } // 如果空闲连接还没到阈值,保留这个连接当作空闲连接 db.maxIdleClosed++ } // 收回空闲连接返回false return false } */ // 如果将连接成功放入了空闲连接池,或者将连接成功给了等待连接到会话使用,此处返回true // 收回空闲连接返回false // 代码详情就是在上面的这段注释中 added := db.putConnDBLocked(dc, nil) db.mu.Unlock() // 如果 if !added { // 如果DB没有关闭,进入if if resetSession { dc.Unlock() } dc.Close() return } // 重新校验,如果连接关闭了,进入if if !resetSession { return } // 如果负责重置 conn状态的线程阻塞住了,那么标记这个driverConn为lastErr select { default: // If the resetterCh is blocking then mark the connection // as bad and continue on. dc.lastErr = driver.ErrBadConn dc.Unlock() case db.resetterCh <- dc: } }
5.5、connectionOpener#
5.5.1、是什么?
这个connectionOpener是一个工作协程,它会去尝试消费指定的channel,负责创建数据库连接,其实在前面阅读获取连接的逻辑时,有这样的两种情况会阻塞等待
connectionOpener来新创建连接:
第一种:当获取连接的策略是优先从cache连接池中获取出来,但是空闲连接池已经没有空闲的连接了,首先这时DB允许打开连接,但是DB能打开的连接数已经达到了它能打开的连接数的上线,所以得等待有空闲连接出现,或者等有连接被释放后,DB能当前打开的连接数小于了它能打开的连接数的最大值,这时它会被阻塞等待去尝试创建连接。
第二种:获取连接的策略不再是优先从空闲缓冲池中获取连接,直接明了的想获取最一条新连接,同样的此时DB已经打开的连接数大于它能打开连接数的上线,它会被阻塞等待创建连接。
5.5.2、什么时候开启的?
func OpenDB(c driver.Connector) *DB { ctx, cancel := context.WithCancel(context.Background()) db := &DB{ connector: c, openerCh: make(chan struct{}, connectionRequestQueueSize), resetterCh: make(chan *driverConn, 50), lastPut: make(map[*driverConn]string), connRequests: make(map[uint64]chan connRequest), stop: cancel, } // 可以看到他是在DB被实例化时开启的。 go db.connectionOpener(ctx) go db.connectionResetter(ctx) return db }
5.5.3、代码详情
可以看到它一直尝试从db的openerCh中获取内容,而且只要获取到了内容,就会调用方法打开连接。
// Runs in a separate goroutine, opens new connections when requested. func (db *DB) connectionOpener(ctx context.Context) { for { select { case <-ctx.Done(): return // here case <-db.openerCh: db.openNewConnection(ctx) } } }
5.5.4、谁往openerCh中投放消息?
往channl中投放消息的逻辑在db的mayBeOpenNewConnections中
func (db *DB) maybeOpenNewConnections() { // 通过检查这个map的长度来决定是否往opennerCh中投放消息 numRequests := len(db.connRequests) if db.maxOpen > 0 { numCanOpen := db.maxOpen - db.numOpen if numRequests > numCanOpen { numRequests = numCanOpen } } for numRequests > 0 { db.numOpen++ // optimistically numRequests-- if db.closed { return } // 一旦执行了这一步,connectionOpener 就会监听到去创建连接。 db.openerCh <- struct{}{} } }
5.5.5、注意点:
在DB结构体中有这样一个属性
// 连接池的大小,0意味着使用默认的大小2, 小于0表示不使用连接池 maxIdle int // zero means defaultMaxIdleConns; negative means 0
表示空闲连接池默认的大小,如果它为0,表示都没有缓存池,也就意味着会为所有想获取连接的请求创建新的conn,这时也就不会有这个opnerCh,更不会有connectionOpener
5.6、connectionCleaner#
5.6.1、是什么?有啥用?
它同样以一条协程的形式存在,用于定时清理数据库连接池中过期的连接
func (db *DB) startCleanerLocked() { if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil { db.cleanerCh = make(chan struct{}, 1) go db.connectionCleaner(db.maxLifetime) } }
5.6.2、注意点
同样的,DB中存在一个参数:maxLifetime
它表示数据库连接最大的生命时长,如果将它设置为0,表示这个连接永不过期,既然所有的连接永不过期,就不会存在connectionCleaner去定时根据maxLifetime
来定时清理连接。
它的调用时机是:需要将连接放回到连接池时调用。
5.7、connectionRestter#
5.7.1、作用
我们使用获取的连接的封装结构体是driverConn,其实它是会driver包下的Conn连接的又一层封装,目的是增强
driver包下的Conn的,多出来了一些状态。当将使用完毕的连接放入连接池时,就得将这些状态清除掉。
使用谁去清除呢?就是这个go 协程:connectionRestter
当connectionRestter碰到错误时,会将这个conn标记为lastErr,连接使用者在使用连接时会先校验conn的诸多状态,比如出现lastErr,会返回给客户端 badConnErr