针对这个资源池管理的一步步都实现了,而且做了详细的讲解,下面就看下整个示例代码,方便理解。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
package commonimport (
"errors"
"io"
"sync"
"log"
)
/
/
一个安全的资源池,被管理的资源必须都实现io.Close接口
type
Pool struct {
m sync.Mutex
res chan io.Closer
factory func() (io.Closer, error)
closed
bool
}var ErrPoolClosed
=
errors.New(
"资源池已经被关闭。"
)
/
/
创建一个资源池func New(fn func() (io.Closer, error), size uint) (
*
Pool, error) {
if
size <
=
0
{
return
nil, errors.New(
"size的值太小了。"
)
}
return
&Pool{
factory: fn,
res: make(chan io.Closer, size),
}, nil}
/
/
从资源池里获取一个资源func (p
*
Pool) Acquire() (io.Closer,error) {
select {
case r,ok :
=
<
-
p.res:
log.Println(
"Acquire:共享资源"
)
if
!ok {
return
nil,ErrPoolClosed
}
return
r,nil
default:
log.Println(
"Acquire:新生成资源"
)
return
p.factory()
}}
/
/
关闭资源池,释放资源func (p
*
Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if
p.closed {
return
}
p.closed
=
true
/
/
关闭通道,不让写入了
close(p.res)
/
/
关闭通道里的资源
for
r:
=
range
p.res {
r.Close()
}}func (p
*
Pool) Release(r io.Closer){
/
/
保证该操作和Close方法的操作是安全的
p.m.Lock()
defer p.m.Unlock()
/
/
资源池都关闭了,就省这一个没有释放的资源了,释放即可
if
p.closed {
r.Close()
return
}
select {
case p.res <
-
r:
log.Println(
"资源释放到池子里了"
)
default:
log.Println(
"资源池满了,释放这个资源吧"
)
r.Close()
}
}
|
好了,资源池管理写好了,也知道资源池是如何实现的啦,现在我们看看如何使用这个资源池,模拟一个数据库连接池吧。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
package mainimport (
"flysnow.org/hello/common"
"io"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
)const (
/
/
模拟的最大goroutine
maxGoroutine
=
5
/
/
资源池的大小
poolRes
=
2
)func main() {
/
/
等待任务完成
var wg sync.WaitGroup
wg.Add(maxGoroutine)
p, err :
=
common.New(createConnection, poolRes)
if
err !
=
nil {
log.Println(err)
return
}
/
/
模拟好几个goroutine同时使用资源池查询数据
for
query :
=
0
; query < maxGoroutine; query
+
+
{
go func(q
int
) {
dbQuery(q, p)
wg.Done()
}(query)
}
wg.Wait()
log.Println(
"开始关闭资源池"
)
p.Close()}
/
/
模拟数据库查询func dbQuery(query
int
, pool
*
common.Pool) {
conn, err :
=
pool.Acquire()
if
err !
=
nil {
log.Println(err)
return
}
defer pool.Release(conn)
/
/
模拟查询
time.Sleep(time.Duration(rand.Intn(
1000
))
*
time.Millisecond)
log.Printf(
"第%d个查询,使用的是ID为%d的数据库连接"
, query, conn.(
*
dbConnection).
ID
)}
/
/
数据库连接
type
dbConnection struct {
ID
int32
/
/
连接的标志}
/
/
实现io.Closer接口func (db
*
dbConnection) Close() error {
log.Println(
"关闭连接"
, db.
ID
)
return
nil}var idCounter int32
/
/
生成数据库连接的方法,以供资源池使用func createConnection() (io.Closer, error) {
/
/
并发安全,给数据库连接生成唯一标志
id
:
=
atomic.AddInt32(&idCounter,
1
)
return
&dbConnection{
id
}, nil
}
|
这时我们测试使用资源池的例子,首先定义了一个结构体dbConnection,它只有一个字段,用来做唯一标记。然后dbConnection实现了io.Closer接口,这样才可以使用我们的资源池。
createConnection函数对应的是资源池中的factory字段,用来创建数据库连接dbConnection的,同时为其赋予了一个为止的标志。
接着我们就同时开了 5 个goroutine,模拟并发的数据库查询dbQuery,查询方法里,先从资源池获取可用的数据库连接,用完后再释放。
这里我们会创建 5 个数据库连接,但是我们设置的资源池大小只有 2 ,所以再释放了 2 个连接后,后面的 3 个连接会因为资源池满了而释放不了,一会我们看下输出的打印信息就可以看到。
最后这个资源连接池使用完之后,我们要关闭资源池,使用资源池的Close方法即可。
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 第2个查询,使用的是ID为4的数据库连接
2017/04/17 22:25:08 资源释放到池子里了
2017/04/17 22:25:08 第4个查询,使用的是ID为1的数据库连接
2017/04/17 22:25:08 资源释放到池子里了
2017/04/17 22:25:08 第3个查询,使用的是ID为5的数据库连接
2017/04/17 22:25:08 资源池满了,释放这个资源吧
2017/04/17 22:25:08 关闭连接 5
2017/04/17 22:25:09 第1个查询,使用的是ID为3的数据库连接
2017/04/17 22:25:09 资源池满了,释放这个资源吧
2017/04/17 22:25:09 关闭连接 3
2017/04/17 22:25:09 第0个查询,使用的是ID为2的数据库连接
2017/04/17 22:25:09 资源池满了,释放这个资源吧
2017/04/17 22:25:09 关闭连接 2
2017/04/17 22:25:09 开始关闭资源池
2017/04/17 22:25:09 关闭连接 4
2017/04/17 22:25:09 关闭连接 1
到这里,我们已经完成了一个资源池的管理,并且进行了使用测试。
资源对象池的使用比较频繁,因为我们想把一些对象缓存起来,以便使用,这样就会比较高效,而且不会经常调用GC,为此Go为我们提供了原生的资源池管理,防止我们重复造轮子,这就是sync.Pool,我们看下刚刚我们的例子,如果用sync.Pool实现。
package mainimport (
"log"
"math/rand"
"sync"
"sync/atomic"
"time")const (
//模拟的最大goroutine
maxGoroutine = 5)func main() {
//等待任务完成
var wg sync.WaitGroup
wg.Add(maxGoroutine)
p:=&sync.Pool{
New:createConnection,
}
//模拟好几个goroutine同时使用资源池查询数据
for query := 0; query < maxGoroutine; query++ {
go func(q int) {
dbQuery(q, p)
wg.Done()
}(query)
}
wg.Wait()}//模拟数据库查询
func dbQuery(query int, pool *sync.Pool) {
conn:=pool.Get().(*dbConnection)
defer pool.Put(conn)
//模拟查询
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.ID)}//数据库连接
type dbConnection struct {
ID int32//连接的标志}//实现io.Closer接口
func (db *dbConnection) Close() error {
log.Println("关闭连接", db.ID)
return nil}var idCounter int32//生成数据库连接的方法,以供资源池使用
func createConnection() interface{} {
//并发安全,给数据库连接生成唯一标志
id := atomic.AddInt32(&idCounter, 1)
return &dbConnection{ID:id}
}
进行微小的改变即可,因为系统库没有提供New这类的工厂函数,所以我们使用字面量创建了一个sync.Pool,注意里面的New字段,这是一个返回任意对象的方法,类似我们自己实现的资源池中的factory字段,意思都是一样的,都是当没有可用资源的时候,生成一个。
这里我们留意到系统的资源池是没有大小限制的,也就是说默认情况下是无上限的,受内存大小限制。
资源的获取和释放对应的方法是Get和Put,也很简洁,返回任意对象interface{}。
2017/04/17 22:42:43 第0个查询,使用的是ID为2的数据库连接
2017/04/17 22:42:43 第2个查询,使用的是ID为5的数据库连接
2017/04/17 22:42:43 第4个查询,使用的是ID为1的数据库连接
2017/04/17 22:42:44 第3个查询,使用的是ID为4的数据库连接
2017/04/17 22:42:44 第1个查询,使用的是ID为3的数据库连接
关于系统的资源池,我们需要注意的是它缓存的对象都是临时的,也就说下一次GC的时候,这些存放的对象都会被清除掉。