Golang chan的任务分发和优雅退出

一、利用chan实现并发协程优雅的退出

1、使用一个专门的退出通道,接收退出的信号。 当启动了多个worker工作协程时,只要main()执行关闭退出通道,每一个worker都会都到信号,进而关闭。

func worker(stopCh <-chan struct{}) {
    go func() {
        defer fmt.Println("worker exit")
        // Using stop channel explicit exit
        for {
            select {
            case <-stopCh:
                fmt.Println("Recv stop signal")
                return
            case <-t.C:
                fmt.Println("Working .")
            }
        }
    }()
    return
}
//main中
close(stopCh)

2、for-range是使用频率很高的结构,常用它来遍历数据,range能够感知channel的关闭,当channel被发送数据的协程关闭时,range就会结束,接着退出for循环。它在并发中的使用场景是:当协程只从1个channel读取数据,然后进行处理,处理后协程退出。下面这个示例程序,当in通道被关闭时,协程可自动退出。

go func(in <-chan int) {
    // Using for-range to exit goroutine
    // range has the ability to detect the close/end of a channel
    for x := range in {
        fmt.Printf("Process %d\n", x)
    }
}(inCh)

3、for-select也是使用频率很高的结构,select提供了多路复用的能力,所以for-select可以让函数具有持续多路处理多个channel的能力。但select没有感知channel的关闭,这引出了2个问题:1)继续在关闭的通道上读,会读到通道传输数据类型的零值,2)继续在关闭的通道上写,将会panic。问题2可使用的原则是,通道只由发送方关闭,接收方不可关闭,即某个写通道只由使用该select的协程关闭,select中就不存在继续在关闭的通道上写数据的问题。

问题1可以使用,ok来检测通道的关闭,使用情况有2种。
第一种:如果某个通道关闭后,需要退出协程,直接return即可。示例代码中,该协程需要从in通道读数据,还需要定时打印已经处理的数量,有2件事要做,所有不能使用for-range,需要使用for-select,当in关闭时,ok=false,我们直接返回。

go func() {
    // in for-select using ok to exit goroutine
    for {
        select {
        case x, ok := <-in:
            if !ok {
                return
            }
            fmt.Printf("Process %d\n", x)
            processedCnt++
        case <-t.C:
            fmt.Printf("Working, processedCnt = %d\n", processedCnt)
        }
    }
}()

第二种:如果某个通道关闭了,不再处理该通道,而是继续处理其他case,退出是等待所有的可读通道关闭。我们需要使用select的一个特征:select不会在nil的通道上进行等待。这种情况,把只读通道设置为nil即可解决。

go func() {
    // in for-select using ok to exit goroutine
    for {
        select {
        case x, ok := <-in1:
            if !ok {
                in1 = nil
            }
            // Process
        case y, ok := <-in2:
            if !ok {
                in2 = nil
            }
            // Process
        case <-t.C:
            fmt.Printf("Working, processedCnt = %d\n", processedCnt)
        }

        // If both in channel are closed, goroutine exit
        if in1 == nil && in2 == nil {
            return
        }
    }
}()
最佳实践
  • 发送协程主动关闭通道,接收协程不关闭通道。技巧:把接收方的通道入参声明为只读,如果接收协程关闭只读协程,编译时就会报错。
  • 协程处理1个通道,并且是读时,协程优先使用for-range,因为range可以关闭通道的关闭自动退出协程。
  • for-select中的x, ok := <-chan可以处理多个读通道关闭,可以关闭当前使用for-select的协程或循环中的特定分支。
  • 主协程显式关闭通道stopCh可以处理主动通知多work协程同时退出的场景。

二、利用chan实现生产者消费者模式

通过定义一个channel 变量接收某种事件,然后通过工作 goroutine 消费执行这个 channel 中的事件。

produce_goroutine->event ->						consumer_goroutine
       					 	 |				|
produce_goroutine->event ->		channel -> 		consumer_goroutine
        					 |				|
produce_goroutine->event ->						consumer_goroutine

生产者消费者的使用场景:
1、生成协程生产数据到管道中
2、消费协程在管道中取数据进行处理

通过捕捉特定信号对程序进行相关处理,当某个信号进行触发的时候,主协程将向各个协程发送退出指令,当数据管道处理完成时,若接收到退出指令 将结束协程的执行。

package main
import (
	"fmt"
	"git.code.oa.com/gongyi/gongyi_base/log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)
var dataChan chan int
/*************************生产类*************************/
type Producer struct {
	closedChan chan struct{}
	wg sync.WaitGroup
}
func (producer *Producer)Produce() {
	defer producer.wg.Done()
	data := 1
	for { 
		dataChan<-data
		log.Infof("push data:%d succ", data)
		data++
		time.Sleep(time.Second * 1)
		select {
			// 	若关闭了通道,直接退出
			case <-producer.closedChan:
				return
			// 不可阻塞
			default:
				continue
		}
	}
}
func (producer *Producer) Stop() {
	close(producer.closedChan)
	producer.wg.Wait()
	log.Infof("producer has stoped...")
}
/*************************消费类*************************/
type Consumer struct {
	workNo int
	closedChan chan struct{}
	wg sync.WaitGroup
}
func (test* Consumer)Work() {
	defer test.wg.Done()
	for {
		select {
			case data := <-dataChan:
				//process data....
				continue
			case <-test.closedChan:
				log.Infof("worker %d exit...", test.workNo)
				return
		}
	}
}
func (test* Consumer)Stop() {
	close(test.closedChan)
	test.wg.Wait()
	log.Infof("%d has stoped...", test.workNo)
}
/*************************主逻辑*************************/
func main() {
	dataChan = make(chan int, 86400)
	// 创建生产协程,并启动
	producer := Producer{
		closedChan: make(chan struct{}),
	}
	producer.wg.Add(1)
	go producer.Produce()
	// 创建消费协程,并启动
	consumerNumber := 7
	var consumers []Consumer
	for i := 0; i < consumerNumber; i++ {
		workers = append(consumers, Consumer{
			workNo:    i,
			closedChan: make(chan struct{}),
		})
	}
	for i := 0; i < consumerNumber; i++ {
		consumers[i].wg.Add(1)
		go consumers[i].Work()
	}
	// 信号处理
	c := make(chan os.Signal)
	signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
	select {
	case sig := <-c:
		fmt.Printf("Got %s signal. Aborting...\n", sig)
		producer.Stop()
		for i := 0; i < consumerNumber; i++ {
			consumers[i].Stop()
		}
	}
}

三、利用chan实现任务的分发

Golang chan的任务分发和优雅退出

设计一个分发器,当有数据进入分发器后,需要将数据分发到多个处理器处理,每个处理器可以想象为一个协程,处理器在没有数据的时候要阻塞。代码参考

分发器的角色介绍:

Worker: 工作处理者,真正处理业务逻辑的地方,负责接收并处理任务,同时它需要告诉“调度员”是否做好了 接收更多任务的准备。

**WorkerPool:**我们把它比喻为“工人队列”,具体对应GO里的buffered channel of channels。

work: 工作请求

WorkQueue:工作请求队列。chan可以是有缓冲的,进而实现任务队列workqueue。

**步骤一 :**定义工作任务请求(WorkRequest)数据结构。通过dispatcher把任务请求分发给worker。定义work channel的统一接收类型的数据结构。(WorkRequest结构体)。

package main
import "time"
type WorkRequest struct {
  Name  string
  Delay time.Duration
}

步骤二 : Collector(工作收集者)接收客户的任务请求,将请求封装成worker能够理解的 WorkRequest,然后将其添加到全局任务队列的尾部。 Collector需要辨别哪些客户请求才是合理的。具体而言collector需要校验请求类型,必要数据 字段,数据边界等等。collector定义了工作队列workqueue,采用http的形式用于从多个客户处收集任务请求并缓存到workqueue。

package main

import (
  "fmt"
  "net/http"
  "time"
)

// A buffered channel that we can send work requests on.
var WorkQueue = make(chan WorkRequest, 100)

func Collector(w http.ResponseWriter, r *http.Request) {
  // Make sure we can only be called with an HTTP POST request.
  if r.Method != "POST" {
    w.Header().Set("Allow", "POST")
    w.WriteHeader(http.StatusMethodNotAllowed)
    return
  }

  // Parse the delay.
  delay, err := time.ParseDuration(r.FormValue("delay"))
  if err != nil {
    http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
    return
  }
  
  // Check to make sure the delay is anywhere from 1 to 10 seconds.
  if delay.Seconds() < 1 || delay.Seconds() > 10 {
    http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
    return
  }
  
  // Now, we retrieve the person's name from the request.
  name := r.FormValue("name")
  
  // Just do a quick bit of sanity checking to make sure the client actually provided us with a name.
  if name == "" {
    http.Error(w, "You must specify a name.", http.StatusBadRequest)
    return
  }
  
  // Now, we take the delay, and the person's name, and make a WorkRequest out of them.
  work := WorkRequest{Name: name, Delay: delay}
  
  // Push the work onto the queue.
  WorkQueue <- work
  fmt.Println("Work request queued")
  
  // And let the user know their work request was created.
  w.WriteHeader(http.StatusCreated)
  return
}

步骤三: 工作者(worker)需要一个专门的工作通道(Work chan WorkRequest)用于接收dispatcher分发给它的任务请求(WorkRequest)。worker的工作通道是一个不带缓冲的任务通道(WorkRequest channel),用来接收Work Request,之所以不带缓冲是因为一个worker一次只能处理一个任务,dispatcher负责将任务请求分发给空闲等待的worker。另外我们将赋予每个工人 一个ID,以便区分哪一个工人正在处理某项任务。 另外多个worker共享一个worker队列WorkerQueue(WorkerQueue chan chan WorkRequest ),用于将worker空闲时将自身的工作通道(chan WorkRequest)注册到WorkerQueue,从而worker能接收任务请求。并且每个worker共享一个退出通道(QuitChan),用于工作协程的集中优雅退出。

package main

import (
  "fmt"
  "time"
)

// NewWorker creates, and returns a new Worker object. Its only argument
// is a channel that the worker can add itself to whenever it is done its
// work.
func NewWorker(id int, workerQueue chan chan WorkRequest,stopCh chan bool) Worker {
  // Create, and return the worker.
  worker := Worker{
    ID:          id,
    Work:        make(chan WorkRequest),
    WorkerQueue: workerQueue,
    QuitChan:    stopCh}
  
  return worker
}

type Worker struct {
  ID          int
  Work        chan WorkRequest
  WorkerQueue chan chan WorkRequest
  QuitChan    chan bool
}

// This function "starts" the worker by starting a goroutine, that is
// an infinite "for-select" loop.
func (w *Worker) Start() {
    go func() {
      for {
        // Add ourselves into the worker queue.
        w.WorkerQueue <- w.Work
        
        select {
        case work := <-w.Work:
          // Receive a work request.
          fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, work.Delay.Seconds())
          
          time.Sleep(work.Delay)
          fmt.Printf("worker%d: Hello, %s!\n", w.ID, work.Name)
          
        case <-w.QuitChan:
          // We have been asked to stop.
          fmt.Printf("worker%d stopping\n", w.ID)
          return
        }
      }
    }()
}

// Stop tells the worker to stop listening for work requests.
//
// Note that the worker will only stop *after* it has finished its work.
func (w *Worker) Stop() {
  go func() {
    w.QuitChan <- true
  }()
}

步骤四: dispatch任务分发的实现 ,dispatcher声明了全局的工作者队列(WorkerQueue),它用来接收每个空闲worker注册的工作通道(WorkRequest channel)。StartDispatcher方法里根据需要的worker数量,初始化worker队列。然后我们创建所需数量的 worker实例并“启动”它们。 最后一段代码,起了一个匿名协程,负责从全局任务队列(work queue)获取任务请求 (WorkRequest);然后在另一个匿名协程中,从全局工人队列(worker queue)“唤醒”一个 worker;最后将任务请求分发给worker。我们通过另外起一 个协程来“唤醒”worker并分配任务的原因在于,该操作(worker := <-WorkerQueue)是有可能阻塞的,将其异步化可以确保任务队列不会堆积。

package main
import "fmt"
var WorkerQueue chan chan WorkRequest
var stopCh chan bool
func StartDispatcher(nworkers int) {
  // First, initialize the channel we are going to but the workers' work channels into.
  WorkerQueue = make(chan chan WorkRequest, nworkers)
  stopCh = make(chan bool)
  // Now, create all of our workers.
  for i := 0; i<nworkers; i++ {
    fmt.Println("Starting worker", i+1)
    worker := NewWorker(i+1, WorkerQueue,stopCh)
    worker.Start()
  }
  
  go func() {
    for {
      select {
      case work := <-WorkQueue:
        fmt.Println("Received work requeust")
        go func() {
          worker := <-WorkerQueue
          
          fmt.Println("Dispatching work request")
          worker <- work
        }()
      }
    }
  }()
}

步骤五: 聚合组件处理请求。main函数是分发器的运行入口,允许客户指定worker的数量,以及HTTP server需要监听的地址。这两个命令行参数是可选的,我们提供了默认值。

package main

import (
  "flag"
  "fmt"
  "net/http"
)

var (
  NWorkers = flag.Int("n", 4, "The number of workers to start")
  HTTPAddr = flag.String("http", "127.0.0.1:8000", "Address to listen for HTTP requests on")
)

func main() {
  // Parse the command-line flags.
  flag.Parse()
  
  // Start the dispatcher.
  fmt.Println("Starting the dispatcher")
  StartDispatcher(*NWorkers)
  
  // Register our collector as an HTTP handler function.
  fmt.Println("Registering the collector")
  http.HandleFunc("/work", Collector)
    
  go func(){
 	 // Start the HTTP server!
  	 fmt.Println("HTTP server listening on", *HTTPAddr)
  	if err := http.ListenAndServe(*HTTPAddr, nil); err != nil {
   		 fmt.Println(err.Error())
     }
  }()
   // 信号处理
  c := make(chan os.Signal)
  signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
  select {
	case sig := <-c:
	  fmt.Printf("Got %s signal. Aborting...\n", sig)
      close(stopCh)
  }
}
上一篇:【博客413】Go 优雅关闭channel


下一篇:Go channel的使用场景