提供一个 go routine 池,每个 go routine 循环阻塞等待从任务池中执行任务;外界使用者不断的往任务池里丢任务,则 go routine 池中的多个 go routine 会并发的处理这些任务
一、worker/workPool.go
import "sync" type Worker interface { Task() } type Pool struct { wg sync.WaitGroup // 工作池 taskPool chan Worker } func New(maxGoroutineNum int) *Pool { // 1. 初始化一个 Pool p := Pool{ taskPool: make(chan Worker), } p.wg.Add(maxGoroutineNum) // 2. 创建 maxGoroutineNum 个 goroutine,并发的从 taskPool 中获取任务 for i := 0; i < maxGoroutineNum; i++ { go func() { for task := range p.taskPool { // 阻塞获取,一旦没有任务,阻塞在这里 - 无缓冲 channel // 3. 执行任务 task.Task() } p.wg.Done() }() } return &p } // 提交任务到worker池中 func (p *Pool) Run(worker Worker) { p.taskPool <- worker } func (p *Pool) Shutdown() { // 关闭通道 close(p.taskPool) p.wg.Wait() }
二、namePrinter/namePrinter.go
import ( "fmt" "time" ) type NamePrinter struct { Name string } func (np *NamePrinter) Task() { fmt.Println(np.Name) time.Sleep(time.Second) }
三、main.go
import ( "github.com/zhaojigang/worker/worker" "sync" "github.com/zhaojigang/worker/namePrinter" ) var names = []string{ "steve", "bob", } func main() { // 1. 启动两个 goroutine,等待执行任务 p := worker.New(2) var wg sync.WaitGroup wg.Add(3 * len(names)) // 2. 创建 worker,扔到 goroutine 池中 for i := 0; i < 3; i++ { for _, namex := range names { worker := namePrinter.NamePrinter{ Name:namex, } go func() { p.Run(&worker) wg.Done() }() } } // 3. 等待添加任务完毕 wg.Wait() // 4. 关闭 goroutine 池 p.Shutdown() }