原文链接:Writing worker queues, in Go
1.work.go
// [root@wangjq queue]# cat work.go

package main

import "time"

// WorkRequest is one unit of work travelling through the queue: the name
// that will be greeted and the delay to wait before greeting it.
type WorkRequest struct {
	Name  string
	Delay time.Duration
}
2.collector.go
[root@wangjq queue]# cat collector.go package main import ( "fmt" "net/http" "time" ) // A buffered channel that we can send work requests on. var WorkQueue = make(chan WorkRequest, 100) func Collector(w http.ResponseWriter, r *http.Request) { // Make sure we can only be called with an HTTP POST request. if r.Method != "POST" { w.Header().Set("Allow", "POST") w.WriteHeader(http.StatusMethodNotAllowed) return } // Parse the delay. delay, err := time.ParseDuration(r.FormValue("delay")) if err != nil { http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest) return } // Check to make sure the delay is anywhere from 1 to 10 seconds. if delay.Seconds() < 1 || delay.Seconds() > 10 { http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest) return } // Now, we retrieve the person's name from the request. name := r.FormValue("name") // Just do a quick bit of sanity checking to make sure the client actually provided us with a name. if name == "" { http.Error(w, "You must specify a name.", http.StatusBadRequest) return } // Now, we take the delay, and the person's name, and make a WorkRequest out of them. work := WorkRequest{Name: name, Delay: delay} // Push the work onto the queue. WorkQueue <- work fmt.Println("Work request queued") // And let the user know their work request was created. w.WriteHeader(http.StatusCreated) return }
3.worker.go
[root@wangjq queue]# cat worker.go package main import ( "fmt" "time" ) // NewWorker creates, and returns a new Worker object. Its only argument // is a channel that the worker can add itself to whenever it is done its // work. func NewWorker(id int, workerQueue chan chan WorkRequest) Worker { // Create, and return the worker. worker := Worker{ ID: id, Work: make(chan WorkRequest), WorkerQueue: workerQueue, QuitChan: make(chan bool)} return worker } type Worker struct { ID int Work chan WorkRequest WorkerQueue chan chan WorkRequest QuitChan chan bool } // This function "starts" the worker by starting a goroutine, that is // an infinite "for-select" loop. func (w *Worker) Start() { go func() { for { // Add ourselves into the worker queue. w.WorkerQueue <- w.Work select { case work := <-w.Work: // Receive a work request. fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, work.Delay.Seconds()) time.Sleep(work.Delay) fmt.Printf("worker%d: Hello, %s!\n", w.ID, work.Name) case <-w.QuitChan: // We have been asked to stop. fmt.Printf("worker%d stopping\n", w.ID) return } } }() } // Stop tells the worker to stop listening for work requests. // // Note that the worker will only stop *after* it has finished its work. func (w *Worker) Stop() { go func() { w.QuitChan <- true }() }
4.dispatcher.go
[root@wangjq queue]# cat dispatcher.go package main import "fmt" var WorkerQueue chan chan WorkRequest func StartDispatcher(nworkers int) { // First, initialize the channel we are going to but the workers' work channels into. WorkerQueue = make(chan chan WorkRequest, nworkers) // Now, create all of our workers. for i := 0; i < nworkers; i++ { fmt.Println("Starting worker", i+1) worker := NewWorker(i+1, WorkerQueue) worker.Start() } go func() { for { select { case work := <-WorkQueue: fmt.Println("Received work requeust") go func() { worker := <-WorkerQueue fmt.Println("Dispatching work request") worker <- work }() } } }() }
5.main.go
[root@wangjq queue]# cat main.go package main import ( "flag" "fmt" "net/http" ) var ( NWorkers = flag.Int("n", 4, "The number of workers to start") HTTPAddr = flag.String("http", "127.0.0.1:8000", "Address to listen for HTTP requests on") ) func main() { // Parse the command-line flags. flag.Parse() // Start the dispatcher. fmt.Println("Starting the dispatcher") StartDispatcher(*NWorkers) // Register our collector as an HTTP handler function. fmt.Println("Registering the collector") http.HandleFunc("/work", Collector) // Start the HTTP server! fmt.Println("HTTP server listening on", *HTTPAddr) if err := http.ListenAndServe(*HTTPAddr, nil); err != nil { fmt.Println(err.Error()) } }
6.编译
[root@wangjq queue]# go build -o queued *.go
7.运行
[root@wangjq queue]# ./queued -n 5
Starting the dispatcher
Starting worker 1
Starting worker 2
Starting worker 3
Starting worker 4
Starting worker 5
Registering the collector
HTTP server listening on 127.0.0.1:8000
8.测试
[root@wangjq ~]# for i in {1..3}; do curl localhost:8000/work -d name=$USER -d delay=$(expr $i % 11)s; done
9.效果
[root@wangjq queue]# ./queued -n 5
Starting the dispatcher
Starting worker 1
Starting worker 2
Starting worker 3
Starting worker 4
Starting worker 5
Registering the collector
HTTP server listening on 127.0.0.1:8000
Work request queued
Received work requeust
Dispatching work request
worker1: Received work request, delaying for 1.000000 seconds
Work request queued
Received work requeust
Dispatching work request
worker2: Received work request, delaying for 2.000000 seconds
Work request queued
Received work requeust
Dispatching work request
worker4: Received work request, delaying for 3.000000 seconds
worker1: Hello, root!
worker2: Hello, root!
worker4: Hello, root!