具体实现代码
package work
import "sync"
// 协程工作池接口,使用工作池的work必须实现该接口
type Worker interface {
Task()
}
// 工作池结构,work为无缓存管道,wg控制纤程执行完成
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
// 创建一个指定纤程的worker池
func New(maxGoroutines int) *Pool {
p := &Pool{
work: make(chan Worker), // 初始化无缓存通道,用来存储work
}
p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines; i++ {
go func() {
for w := range p.work { // 使用range从通道中取work,当通道中没有值时堵塞,通道关闭时循环结束
w.Task()
}
p.wg.Done()
}()
}
return p
}
// 运行一个worker
func (p *Pool) Run(w Worker) {
p.work <- w // 向通道中写入worker,如有空闲纤程将运行,如暂无空闲纤程将堵塞
}
// 关闭worker池
func (p *Pool) Shutdown() {
close(p.work) // 关闭通道,各纤程执行完当前任务各自退出
p.wg.Wait() // 等待所有纤程运行完毕
}