文章目录
Go并发编程(三)协程池
本文参考如下博客实现了一个简易的协程池
为什么需要协程池
goroutine 太多仍会导致调度性能下降、GC 频繁、内存暴涨, 引发一系列问题。在面临这样的场景时, 限制 goroutine 的数量、重用 goroutine
实现
实现的基本思路是采用生产者-消费者模型,用来执行任务的goroutine作为消费者,操作任务队列的goroutine是生产者,任务队列使用的是go中的buffer channel
数据结构定义
任务定义:
// 任务定义
type Task struct {
Handler func(v ...interface{}) // 任务处理函数
Params []interface{} // 处理函数参数列表
}
协程池定义:
// 任务池定义
type TaskPool struct {
Capacity int64 // 任务池容量
RunningGoroutine int64 // 运行中的goroutine数量
TaskQueue chan *Task // 任务队列
Status int64 // 任务池状态
sync.Mutex
PanicHandler func(interface{}) // goroutine异常处理机制
}
协程池状态常量定义
// 协程池状态
const(
RUNNING = iota
STOP
)
全局异常定义:
// 池容量非法异常
var ErrInvalidPoolCap = errors.New("task pool capacity invaild")
var ErrPoolAlreadyClosed = errors.New("pool is already go")
新增任务&执行任务
新增任务本质就是做goroutine数量检查,小于协程池容量则新启协程,超过就复用原有协程,协程的回收依赖于GC,任务是直接丢进管道,等待消费的goroutine执行
// 新增任务
func (p *TaskPool) Put(t *Task) error{
p.Lock()
defer p.Unlock()
if p.Status == STOP{
return ErrPoolAlreadyClosed
}
// 如果协程池未满则新启协程
if p.RunningGoroutine < p.Capacity{
// 协程池未满,则产生协程
p.run()
}
// 任务入队
p.TaskQueue <- t
return nil
}
执行任务其实就是监听channel消费具体的任务,这里采用的是带缓冲区的channel,所以消费生产是非阻塞的
// 从任务队列中取出任务执行
func (pool *TaskPool)run() {
// 新增运行中的goroutine
incRunning(pool)
go func() {
// 执行完成后运行中的goroutine--
defer func() {
decRunning(pool)
// goroutine panic
if r := recover();r != nil{
if pool.PanicHandler != nil{
pool.PanicHandler(r);
} else { // 默认处理
log.Printf("Worker panic: %s\n", r)
}
}
pool.checkRunningWork()
}()
// 具体goroutine执行策略
for{
select {
case task,ok := <- pool.TaskQueue:{
if !ok{
// 任务从管道消费失败
return
}
// 执行任务
task.Handler(task.Params)
}
}
}
}()
}
goroutine异常处理
如果某一个goroutine抛出panic就会导致整个程序崩溃退出,为了保证程序安全执行,需要对panic进行recover,进行异常处理,异常处理函数用户自定义
defer func() {
decRunning(pool)
// goroutine panic
if r := recover();r != nil{
if pool.PanicHandler != nil{
pool.PanicHandler(r);
} else { // 默认处理
log.Printf("Worker panic: %s\n", r)
}
}
pool.checkRunningWork()
}()
关闭协程池
关闭协程池需要做两个步骤:
- 关闭任务进入队列的入口
- 执行完任务队列中剩余的任务
// 安全关闭协程池
func (p *TaskPool) CloseTask() error{
p.Lock()
defer p.Unlock()
if p.Status == STOP{
return ErrPoolAlreadyClosed
}
atomic.CompareAndSwapInt64(&p.Status,RUNNING,STOP)
// 清空任务队列
for len(p.TaskQueue) > 0 { // 阻塞等待所有任务被 worker 消费
time.Sleep(1e6) // 防止等待任务清空 cpu 负载突然变大, 这里小睡一下
}
return nil
}
使用
func TestMyPool() {
pool,err := InitTaskPool(10)
if err != nil{
panic(err)
}
for i := 0;i < 20;i++{
time.Sleep(1e6)
pool.Put(&Task{Handler: func(v ...interface{}) {
fmt.Print("i = ",i," ")
},Params: []interface{}{i}})
fmt.Println("pool running goroutine size: ",pool.GetPoolRunningGSize())
}
}