I. Gracefully exiting concurrent goroutines with channels
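1. Use a dedicated exit channel to receive the stop signal. When several worker goroutines are running, main() only has to close the exit channel: every worker receives the signal and exits.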
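```go
// Needs imports: "fmt", "time"
func worker(stopCh <-chan struct{}) {
	go func() {
		defer fmt.Println("worker exit")
		// Ticker that drives the periodic work (t was undefined in the original snippet)
		t := time.NewTicker(time.Second)
		defer t.Stop()
		// Using stop channel explicit exit
		for {
			select {
			case <-stopCh:
				fmt.Println("Recv stop signal")
				return
			case <-t.C:
				fmt.Println("Working .")
			}
		}
	}()
}

// In main:
close(stopCh)
```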
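2. for-range
for-range is a very commonly used construct, typically for iterating over data. range can detect that a channel has been closed: once the sending goroutine closes the channel and any buffered values have been received, range ends and the for loop exits. In concurrent code it fits the case where a goroutine reads from just one channel, processes each value, and then exits. In the example below, the goroutine exits automatically when the in channel is closed.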
```go
go func(in <-chan int) {
	// Using for-range to exit goroutine
	// range has the ability to detect the close/end of a channel
	for x := range in {
		fmt.Printf("Process %d\n", x)
	}
}(inCh)
```
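3. for-select
for-select is also very common. select provides multiplexing, so for-select lets a function keep handling several channels at once. However, select does not by itself detect that a channel has been closed. This leads to two problems. First, receiving from a closed channel keeps succeeding immediately with the zero value of the channel's element type. Second, sending on a closed channel panics. For problem 2, the rule is that a channel is closed only by its sender, never by a receiver. If a channel that the select writes to is closed only by the goroutine running that select, the select can never write to a closed channel.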
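Problem 1 can be solved by receiving with `, ok` to detect that the channel is closed. There are two ways to use it.
Case one: if the goroutine should exit once a particular channel is closed, simply return. In the example, the goroutine must read data from the in channel and also periodically print how many items it has processed. Because it has two things to do, it cannot use for-range and needs for-select. When in is closed, ok is false and we return.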
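```go
go func() {
	t := time.NewTicker(time.Second) // periodic progress report
	defer t.Stop()
	processedCnt := 0
	// in for-select using ok to exit goroutine
	for {
		select {
		case x, ok := <-in:
			if !ok {
				return // in has been closed
			}
			fmt.Printf("Process %d\n", x)
			processedCnt++
		case <-t.C:
			fmt.Printf("Working, processedCnt = %d\n", processedCnt)
		}
	}
}()
```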
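Case two: if a channel is closed, stop handling that channel but keep handling the other cases. The goroutine exits only after all of the channels it reads from have been closed. This relies on a property of select: a receive from a nil channel blocks forever, so select never chooses a case on a nil channel. Setting the closed receive channel to nil is therefore enough.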
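```go
go func() {
	t := time.NewTicker(time.Second)
	defer t.Stop()
	processedCnt := 0
	// in for-select, set a closed channel to nil to disable its case
	for {
		select {
		case x, ok := <-in1:
			if !ok {
				in1 = nil // select never picks a nil channel again
				break     // leaves the select, not the for loop
			}
			fmt.Printf("Process in1: %d\n", x)
			processedCnt++
		case y, ok := <-in2:
			if !ok {
				in2 = nil
				break
			}
			fmt.Printf("Process in2: %d\n", y)
			processedCnt++
		case <-t.C:
			fmt.Printf("Working, processedCnt = %d\n", processedCnt)
		}
		// If both in channels are closed, goroutine exit
		if in1 == nil && in2 == nil {
			return
		}
	}
}()
```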
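Best practices
- The sending goroutine closes the channel; receiving goroutines do not. Tip: declare the receiver's channel parameter as receive-only (`<-chan T`). If the receiving goroutine then tries to close it, compilation fails.
- When a goroutine only reads from one channel, prefer for-range, because range exits automatically when the channel is closed.
- In for-select, `x, ok := <-ch` handles the closing of several receive channels. It can either end the goroutine running the for-select or disable one specific branch of the loop.
- Having the main goroutine explicitly close a stopCh channel covers the case where many worker goroutines must be told to exit at the same time.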
II. Producer-consumer with channels
Define a channel variable that receives events of some kind, then have worker goroutines consume and execute the events from that channel.
```text
produce_goroutine -> event --\                  /--> consumer_goroutine
produce_goroutine -> event ----> channel ----------> consumer_goroutine
produce_goroutine -> event --/                  \--> consumer_goroutine
```
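How the producer-consumer pattern is used here:
1. The producer goroutine puts data into the channel.
2. Consumer goroutines take data from the channel and process it.

The program also catches specific OS signals. When one of them arrives, the main goroutine tells the producer and each consumer to exit. A consumer stops as soon as it receives the exit instruction. Because select chooses randomly among ready cases, values still buffered in the data channel at that moment may be left unprocessed.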
```go
package main

import (
	"fmt"
	"log" // standard library log instead of the original internal logging package
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

var dataChan chan int

/*************************Producer*************************/
type Producer struct {
	closedChan chan struct{}
	wg         sync.WaitGroup
}

func (producer *Producer) Produce() {
	defer producer.wg.Done()
	data := 1
	for {
		dataChan <- data
		log.Printf("push data:%d succ", data)
		data++
		time.Sleep(time.Second)
		select {
		// Exit once the close channel has been closed
		case <-producer.closedChan:
			return
		// Must not block
		default:
		}
	}
}

func (producer *Producer) Stop() {
	close(producer.closedChan)
	producer.wg.Wait()
	log.Printf("producer has stopped...")
}

/*************************Consumer*************************/
type Consumer struct {
	workNo     int
	closedChan chan struct{}
	wg         sync.WaitGroup
}

func (c *Consumer) Work() {
	defer c.wg.Done()
	for {
		select {
		case data := <-dataChan:
			// process data...
			log.Printf("worker %d got data:%d", c.workNo, data)
		case <-c.closedChan:
			log.Printf("worker %d exit...", c.workNo)
			return
		}
	}
}

func (c *Consumer) Stop() {
	close(c.closedChan)
	c.wg.Wait()
	log.Printf("%d has stopped...", c.workNo)
}

/*************************Main logic*************************/
func main() {
	dataChan = make(chan int, 86400)
	// Create the producer goroutine and start it
	producer := &Producer{
		closedChan: make(chan struct{}),
	}
	producer.wg.Add(1)
	go producer.Produce()
	// Create the consumer goroutines and start them
	// (pointers, so each Consumer and its WaitGroup is never copied)
	consumerNumber := 7
	consumers := make([]*Consumer, 0, consumerNumber)
	for i := 0; i < consumerNumber; i++ {
		consumers = append(consumers, &Consumer{
			workNo:     i,
			closedChan: make(chan struct{}),
		})
	}
	for _, c := range consumers {
		c.wg.Add(1)
		go c.Work()
	}
	// Signal handling: signal.Notify does not block when sending, so the
	// channel is buffered; SIGKILL cannot be caught, so it is not listed.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	sig := <-sigCh
	fmt.Printf("Got %s signal. Aborting...\n", sig)
	producer.Stop()
	for _, c := range consumers {
		c.Stop()
	}
}
```
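III. Task dispatch with channels
Design a dispatcher. When data enters the dispatcher, it must be distributed to several processors. Each processor can be thought of as a goroutine, and a processor blocks while it has no data. Reference code is given below.
The roles in the dispatcher:
- **Worker:** does the real business logic. It receives and processes tasks, and it must tell the "dispatcher" whether it is ready to accept more tasks.
- **WorkerPool:** think of it as a "queue of workers". In Go it is a buffered channel of channels (named `WorkerQueue` in the code, of type `chan chan WorkRequest`).
- **work:** a work request.
- **WorkQueue:** the queue of work requests. A channel can be buffered, and a buffered channel is how the task queue WorkQueue is implemented.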
**Step 1:** Define the data structure of a work request (WorkRequest). The dispatcher hands work requests to workers, so the work channels need one common element type: the WorkRequest struct.
```go
package main

import "time"

type WorkRequest struct {
	Name  string
	Delay time.Duration
}
```
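**Step 2:** The Collector receives task requests from clients. It wraps each one into a WorkRequest that workers understand and appends it to the tail of the global task queue. The Collector must decide which client requests are valid. Concretely, it checks the request method, the required data fields, value ranges, and so on. The collector defines the work queue WorkQueue, collects task requests from multiple clients over HTTP, and buffers them in WorkQueue.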
```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// A buffered channel that we can send work requests on.
var WorkQueue = make(chan WorkRequest, 100)

func Collector(w http.ResponseWriter, r *http.Request) {
	// Make sure we can only be called with an HTTP POST request.
	if r.Method != http.MethodPost {
		w.Header().Set("Allow", http.MethodPost)
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	// Parse the delay.
	delay, err := time.ParseDuration(r.FormValue("delay"))
	if err != nil {
		http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
		return
	}
	// Check to make sure the delay is anywhere from 1 to 10 seconds.
	if delay.Seconds() < 1 || delay.Seconds() > 10 {
		http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
		return
	}
	// Now, we retrieve the person's name from the request.
	name := r.FormValue("name")
	// Just do a quick bit of sanity checking to make sure the client actually provided us with a name.
	if name == "" {
		http.Error(w, "You must specify a name.", http.StatusBadRequest)
		return
	}
	// Now, we take the delay, and the person's name, and make a WorkRequest out of them.
	work := WorkRequest{Name: name, Delay: delay}
	// Push the work onto the queue.
	WorkQueue <- work
	fmt.Println("Work request queued")
	// And let the user know their work request was created.
	w.WriteHeader(http.StatusCreated)
}
```
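**Step 3:** Each worker needs its own work channel (`Work chan WorkRequest`) on which it receives the work requests (WorkRequest) that the dispatcher assigns to it. This work channel is unbuffered because a worker handles only one task at a time. The dispatcher is responsible for handing requests to workers that are idle and waiting. Each worker is also given an ID, so we can tell which worker is handling a given task. All workers share one worker queue (`WorkerQueue chan chan WorkRequest`). When a worker is idle, it registers its own work channel there, and that is how it becomes eligible to receive a request. Finally, all workers share one quit channel (`QuitChan`), used to shut every worker goroutine down gracefully from one place.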
```go
package main

import (
	"fmt"
	"time"
)

// NewWorker creates and returns a new Worker. workerQueue is the channel the
// worker adds its own work channel to whenever it is idle; stopCh is the quit
// channel shared by all workers.
func NewWorker(id int, workerQueue chan chan WorkRequest, stopCh chan bool) Worker {
	// Create, and return the worker.
	worker := Worker{
		ID:          id,
		Work:        make(chan WorkRequest),
		WorkerQueue: workerQueue,
		QuitChan:    stopCh,
	}
	return worker
}

type Worker struct {
	ID          int
	Work        chan WorkRequest
	WorkerQueue chan chan WorkRequest
	QuitChan    chan bool
}

// Start "starts" the worker by launching a goroutine that runs an
// infinite for-select loop.
func (w *Worker) Start() {
	go func() {
		for {
			// Add ourselves into the worker queue.
			w.WorkerQueue <- w.Work
			select {
			case work := <-w.Work:
				// Receive a work request.
				fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, work.Delay.Seconds())
				time.Sleep(work.Delay)
				fmt.Printf("worker%d: Hello, %s!\n", w.ID, work.Name)
			case <-w.QuitChan:
				// We have been asked to stop.
				fmt.Printf("worker%d stopping\n", w.ID)
				return
			}
		}
	}()
}

// Stop tells the worker to stop listening for work requests.
//
// Note that the worker will only stop *after* it has finished its current work.
// Because QuitChan is shared by all workers, one send stops exactly one worker
// (not necessarily w); closing the channel stops all of them.
func (w *Worker) Stop() {
	go func() {
		w.QuitChan <- true
	}()
}
```
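**Step 4:** Implementing the dispatcher. The dispatcher declares the global worker queue (WorkerQueue), which receives the work channel (WorkRequest channel) that each idle worker registers. StartDispatcher initializes the worker queue with a capacity equal to the requested number of workers, then creates that many Worker instances and "starts" them. The last part of the code launches an anonymous goroutine that takes work requests (WorkRequest) from the global work queue (WorkQueue). For each request, another anonymous goroutine takes an idle worker from the global worker queue (WorkerQueue) and hands the request to it. The hand-off runs in its own goroutine because `worker := <-WorkerQueue` can block when no worker is idle. Doing it asynchronously keeps the dispatcher draining WorkQueue, so requests do not pile up there.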
```go
package main

import "fmt"

var WorkerQueue chan chan WorkRequest
var stopCh chan bool

func StartDispatcher(nworkers int) {
	// First, initialize the channel we are going to put the workers' work channels into.
	WorkerQueue = make(chan chan WorkRequest, nworkers)
	stopCh = make(chan bool)
	// Now, create all of our workers.
	for i := 0; i < nworkers; i++ {
		fmt.Println("Starting worker", i+1)
		worker := NewWorker(i+1, WorkerQueue, stopCh)
		worker.Start()
	}
	go func() {
		for {
			select {
			case work := <-WorkQueue:
				fmt.Println("Received work request")
				// Wait for an idle worker in a separate goroutine so that
				// this loop keeps draining WorkQueue.
				go func() {
					worker := <-WorkerQueue
					fmt.Println("Dispatching work request")
					worker <- work
				}()
			}
		}
	}()
}
```
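**Step 5:** Wiring the components together to handle requests. The main function is the entry point of the dispatcher. It lets the user set the number of workers and the address the HTTP server listens on. Both command-line flags are optional and have defaults (4 workers, 127.0.0.1:8000).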
```go
package main

import (
	"flag"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"
)

var (
	NWorkers = flag.Int("n", 4, "The number of workers to start")
	HTTPAddr = flag.String("http", "127.0.0.1:8000", "Address to listen for HTTP requests on")
)

func main() {
	// Parse the command-line flags.
	flag.Parse()
	// Start the dispatcher.
	fmt.Println("Starting the dispatcher")
	StartDispatcher(*NWorkers)
	// Register our collector as an HTTP handler function.
	fmt.Println("Registering the collector")
	http.HandleFunc("/work", Collector)
	go func() {
		// Start the HTTP server!
		fmt.Println("HTTP server listening on", *HTTPAddr)
		if err := http.ListenAndServe(*HTTPAddr, nil); err != nil {
			fmt.Println(err.Error())
		}
	}()
	// Signal handling: signal.Notify does not block when sending, so the
	// channel is buffered; SIGKILL cannot be caught, so it is not listed.
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	sig := <-c
	fmt.Printf("Got %s signal. Aborting...\n", sig)
	// Closing the shared quit channel tells every worker to stop.
	close(stopCh)
}
```