Go并发编程(三)协程池

文章目录

Go并发编程(三)协程池

本文参考如下博客实现了一个简易的协程池

为什么需要协程池

goroutine 太多仍会导致调度性能下降、GC 频繁、内存暴涨, 引发一系列问题。在面临这样的场景时, 限制 goroutine 的数量、重用 goroutine

实现

实现的基本思路是采用生产者-消费者模型,用来执行任务的goroutine作为消费者,操作任务队列的goroutine是生产者,任务队列使用的是go中的buffer channel

Go并发编程(三)协程池

数据结构定义

任务定义:

// 任务定义
type Task struct {
	Handler func(v ...interface{}) // 任务处理函数
	Params []interface{} 		   // 处理函数参数列表
}

协程池定义:

// 任务池定义
type TaskPool struct {
	Capacity int64 		   		   // 任务池容量
	RunningGoroutine int64 		   // 运行中的goroutine数量
	TaskQueue chan *Task   		   // 任务队列
	Status int64 		     	   // 任务池状态
	sync.Mutex
	PanicHandler func(interface{}) // goroutine异常处理机制
}

协程池状态常量定义

// 协程池状态
const(
	RUNNING = iota
	STOP
)

全局异常定义:

// 池容量非法异常
var ErrInvalidPoolCap = errors.New("task pool capacity invaild")
var ErrPoolAlreadyClosed = errors.New("pool is already go")

新增任务&执行任务

新增任务本质就是做goroutine数量检查,小于协程池容量则新启协程,超过就复用原有协程,协程的回收依赖于GC,任务是直接丢进管道,等待消费的goroutine执行

// 新增任务
func (p *TaskPool) Put(t *Task) error{
	p.Lock()
	defer p.Unlock()

	if p.Status == STOP{
		return ErrPoolAlreadyClosed
	}

	// 如果协程池未满则新启协程
	if p.RunningGoroutine < p.Capacity{
		// 协程池未满,则产生协程
		p.run()
	}
	// 任务入队
	p.TaskQueue <- t

	return nil
}

执行任务其实就是监听channel消费具体的任务,这里采用的是带缓冲区的channel,所以消费生产是非阻塞的

// 从任务队列中取出任务执行
func (pool *TaskPool)run()  {
	// 新增运行中的goroutine
	incRunning(pool)
	go func() {
		// 执行完成后运行中的goroutine--
		defer func() {
			decRunning(pool)
			// goroutine panic
			if r := recover();r != nil{
				if pool.PanicHandler != nil{
					pool.PanicHandler(r);
				} else { // 默认处理
					log.Printf("Worker panic: %s\n", r)
				}
			}
			pool.checkRunningWork()
		}()
		// 具体goroutine执行策略
		for{
			select {
			case task,ok := <- pool.TaskQueue:{
				if !ok{
					// 任务从管道消费失败
					return
				}
				// 执行任务
				task.Handler(task.Params)
			}
			}
		}
	}()
}

goroutine异常处理

如果某一个goroutine抛出panic就会导致整个程序崩溃退出,为了保证程序安全执行,需要对panic进行recover,进行异常处理,异常处理函数用户自定义

		defer func() {
			decRunning(pool)
			// goroutine panic
			if r := recover();r != nil{
				if pool.PanicHandler != nil{
					pool.PanicHandler(r);
				} else { // 默认处理
					log.Printf("Worker panic: %s\n", r)
				}
			}
			pool.checkRunningWork()
		}()

关闭协程池

关闭协程池需要做两个步骤:

  1. 关闭任务进入队列的入口
  2. 执行完任务队列中剩余的任务
// 安全关闭协程池
func (p *TaskPool) CloseTask() error{
	p.Lock()
	defer p.Unlock()

	if p.Status == STOP{
		return ErrPoolAlreadyClosed
	}

	atomic.CompareAndSwapInt64(&p.Status,RUNNING,STOP)

	// 清空任务队列
	for len(p.TaskQueue) > 0 { // 阻塞等待所有任务被 worker 消费
		time.Sleep(1e6) // 防止等待任务清空 cpu 负载突然变大, 这里小睡一下
	}

	return nil
}

使用

func TestMyPool()  {
	pool,err := InitTaskPool(10)
	if err != nil{
		panic(err)
	}
	for i := 0;i < 20;i++{
		time.Sleep(1e6)
		pool.Put(&Task{Handler: func(v ...interface{}) {
			fmt.Print("i = ",i," ")
		},Params: []interface{}{i}})
		fmt.Println("pool running goroutine size: ",pool.GetPoolRunningGSize())
	}

}

Go并发编程(三)协程池

上一篇:Panic(TEE Internal API)


下一篇:Go 练习- 文件读写