提供一个资源池,类似于数据库连接池的功能;资源池在 go 1.11.1 中有官方实现:sync/pool.go
一、资源池
import "log" package pool import ( "sync" "io" "errors" "log" ) // 声明池类结构体 type Pool struct { // 锁 lock sync.Mutex // 池中存储的资源 resources chan io.Closer // 资源创建工厂函数 factory func() (io.Closer, error) // 池是否已经被关闭 closed bool } // 创建池类实例的工厂函数 // 工厂函数名通常使用 New 名字 func New(fn func() (io.Closer, error), size int) (*Pool, error) { if size <= 0 { return nil, errors.New("size too small"); } return &Pool{ resources: make(chan io.Closer, size), factory: fn, }, nil } // 从池中获取一个资源 func (p *Pool) Acquire() (io.Closer, error) { // select - default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default // 当然,如果没有 default,那么还是要阻塞在 <-p.resources 上的 select { // 检查是否有空闲的资源 case r, ok := <-p.resources: log.Println("Acquire:", "Shared Resource") if !ok { return nil, errors.New("pool already closed") } return r, nil default: log.Println("Acquire:", "New Resource") // 调用资源创建函数创建资源 return p.factory() } } // 将一个使用后的资源放回池里 func (p *Pool) Release(r io.Closer) { // 注意:Release 和 Close 使用的是同一把锁,就是说二者同时只能执行一个,防止资源池已经关闭了,release 还向资源池放资源 // 向一个已经关闭的 channel 发送消息,会发生 panic: send on closed channel p.lock.Lock() defer p.lock.Unlock() // 如果池已经被关闭,销毁这个资源 if p.closed { r.Close() return } select { // 试图将这个资源放入队列 case p.resources <- r: log.Println("Release:", "In Queue") default: log.Println("Release:", "Closing") r.Close() } } // 关闭资源池,并关闭所有现有的资源 func (p *Pool) Close() { p.lock.Lock() defer p.lock.Unlock() if p.closed { return } p.closed = true // 在清空通道里的资源之前,将通道关闭 close(p.resources) // 关闭资源 for r := range p.resources { r.Close() } }
select – default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default;当然,如果没有 default,那么还是要阻塞在 <-p.resources
二、具体的资源类
import ( "io" "log" "sync/atomic" ) package db import ( "log" "io" "sync/atomic" ) // 给每个连接分配一个独一无二的id var idCounter int32 // 资源 - 数据库连接 type DBConnection struct { ID int32 } // dbConnection 实现了 io.Closer 接口 // 关闭资源 func (conn *DBConnection) Close() error { log.Println("conn closed") return nil } // 创建一个资源 - dbConnection func CreateConn() (io.Closer, error) { id := atomic.AddInt32(&idCounter, 1) log.Println("Create conn, id:", id) return &DBConnection{ ID: id, }, nil }
三、使用资源池
package main
import (
"sync"
"github.com/zhaojigang/pool/pool"
"github.com/zhaojigang/pool/db"
"log"
"time"
"math/rand"
)
const (
maxGoroutines = 5 // 要使用的goroutine的数量
pooledResources = 2 // 池中的资源的数量
)
func performQuery(query int, p *pool.Pool) {
// 1. 获取连接
conn, err := p.Acquire()
if err != nil {
log.Println("acquire conn error, ", err)
return
}
// 使用结束后,释放链接
defer p.Release(conn)
// 该 log 模拟对连接的使用
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]\n", query, conn.(*db.DBConnection).ID)
}
func main() {
var waitGroup sync.WaitGroup
waitGroup.Add(maxGoroutines)
// 1. 创建一个 Pool
p, err := pool.New(db.CreateConn, pooledResources)
if err != nil {
log.Println("create Pool error")
}
// 2. 开启 goroutine 执行任务
for query := 0; query < maxGoroutines; query++ {
// 每个goroutine需要自己复制一份要、查询值的副本,
// 不然所有的查询会共享同一个查询变量,即所有的 goroutine 最后的 query 值都是3
go func(q int) {
performQuery(q, p)
waitGroup.Done()
}(query)
//time.Sleep(1000*time.Millisecond) // 用于测试从 resources channel 中获取资源
}
// 3. 关闭连接池
waitGroup.Wait()
p.Close()
log.Println("pool closed - main")
}
在高并发的创建 go routine 的情况下,从 pool.go # Acquire 方法中可以看到,大家可能都还没有 Release 资源,此时都会创建资源,资源在一瞬间会大量增加,在实际系统中,需要根据需求,做一些措施,例如:提前创建好资源放入池中,go routine 都从池中取资源,资源不够就等待,使用完之后就放入池中,防止资源意外关闭,还可以启用后台线程监控等。
近期评论