1. 概念
本质上是生产者、消费者模型
可以有效的控制goroutine数量,防止暴涨
案例:
生成一个随机数,计算该随机数每一个数字相加的和,例如:123:1+2+3=6
主协程负责生产数据发送到待处理通道中去(发送的不仅仅是数据,而是包含数据的结构体指针)
一个子协程负责监听结果通道,一旦有数据,取出来打印
(取出来的不仅仅是结果数据,而是包含结果数据的结构体指针,而该结果结构体指针内部又嵌入了原生产数据的结构体指针)
还有一个协程池负责相对来说耗时的数据处理操作,从生产通道中读取数据,处理完成后,构建结果结构体指针发送到结果通道中去
package main import ( "fmt" "math/rand" "sync" ) var wg sync.WaitGroup type Job struct { Id int RandNum int } type Result struct { job *Job sum int } func main() { // chan中是要存数据的,整型、字符串、布尔、结构体、只够提指针 jobChan := make(chan *Job, 128) resultChan := make(chan *Result, 128) // 开启一个goroutine从resultChan中取数据打印(消费数据) /* 一直处理使用此方法 go func(result <-chan *Result) { for ret := range result { fmt.Printf("id:%d, randnum:%d, result:%d\n", ret.job.Id, ret.job.RandNum, ret.sum) } wg.Done() }(resultChan) */ // wg.Add(1) go func(result <-chan *Result) { for i := 0; i <10; i++ { ret := <-result fmt.Printf("id: %d, randNum: %d, sum: %d\n", ret.job.Id, ret.job.RandNum, ret.sum) } close(resultChan) wg.Done() }(resultChan) // 开启goroutine池去从jobChan中取数据,处理完后在发送到resultChan中 createPool(64, jobChan, resultChan) // 主goroutine负责生产随机数并发往通道(生产数据) var i int for i < 10 { i++ randNum := rand.Int() job := &Job{ Id: i, RandNum: randNum, } // 结构体发送到通道也是拷贝,如果不想拷贝,可以发送结构体指针到通道中 jobChan <- job } close(jobChan) wg.Wait() } // 创建工作池,参数num:开启几个协程 func createPool(num int, jobChan <-chan *Job, resultChan chan<- *Result) { for i := 0; i < num; i++ { go func(jobChan <-chan *Job, resultChan chan<- *Result) { for job := range jobChan { r_num := job.RandNum // 接收随机值 var sum int // 存取结果值 for r_num != 0 { tmp := r_num % 10 sum += tmp r_num /= 10 } r := &Result{ job: job, sum: sum, } resultChan <- r // 发送到结果通道 } }(jobChan, resultChan) } }