go 多线程并发 queue demo

原文链接:Writing worker queues, in Go

1.work.go 

[root@wangjq queue]# cat work.go 
package main

import "time"

type WorkRequest struct {
        Name  string
        Delay time.Duration
}

2.collector.go

[root@wangjq queue]# cat collector.go
package main

import (
        "fmt"
        "net/http"
        "time"
)

// A buffered channel that we can send work requests on.
var WorkQueue = make(chan WorkRequest, 100)

func Collector(w http.ResponseWriter, r *http.Request) {
        // Make sure we can only be called with an HTTP POST request.
        if r.Method != "POST" {
                w.Header().Set("Allow", "POST")
                w.WriteHeader(http.StatusMethodNotAllowed)
                return
        }

        // Parse the delay.
        delay, err := time.ParseDuration(r.FormValue("delay"))
        if err != nil {
                http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
                return
        }

        // Check to make sure the delay is anywhere from 1 to 10 seconds.
        if delay.Seconds() < 1 || delay.Seconds() > 10 {
                http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
                return
        }

        // Now, we retrieve the person's name from the request.
        name := r.FormValue("name")

        // Just do a quick bit of sanity checking to make sure the client actually provided us with a name.
        if name == "" {
                http.Error(w, "You must specify a name.", http.StatusBadRequest)
                return
        }

        // Now, we take the delay, and the person's name, and make a WorkRequest out of them.
        work := WorkRequest{Name: name, Delay: delay}

        // Push the work onto the queue.
        WorkQueue <- work
        fmt.Println("Work request queued")

        // And let the user know their work request was created.
        w.WriteHeader(http.StatusCreated)
        return
}

3.worker.go

[root@wangjq queue]# cat worker.go
package main

import (
        "fmt"
        "time"
)

// NewWorker creates, and returns a new Worker object. Its only argument
// is a channel that the worker can add itself to whenever it is done its
// work.
func NewWorker(id int, workerQueue chan chan WorkRequest) Worker {
        // Create, and return the worker.
        worker := Worker{
                ID:          id,
                Work:        make(chan WorkRequest),
                WorkerQueue: workerQueue,
                QuitChan:    make(chan bool)}

        return worker
}

type Worker struct {
        ID          int
        Work        chan WorkRequest
        WorkerQueue chan chan WorkRequest
        QuitChan    chan bool
}

// This function "starts" the worker by starting a goroutine, that is
// an infinite "for-select" loop.
func (w *Worker) Start() {
        go func() {
                for {
                        // Add ourselves into the worker queue.
                        w.WorkerQueue <- w.Work

                        select {
                        case work := <-w.Work:
                                // Receive a work request.
                                fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, work.Delay.Seconds())

                                time.Sleep(work.Delay)
                                fmt.Printf("worker%d: Hello, %s!\n", w.ID, work.Name)

                        case <-w.QuitChan:
                                // We have been asked to stop.
                                fmt.Printf("worker%d stopping\n", w.ID)
                                return
                        }
                }
        }()
}

// Stop tells the worker to stop listening for work requests.
//
// Note that the worker will only stop *after* it has finished its work.
func (w *Worker) Stop() {
        go func() {
                w.QuitChan <- true
        }()
}

4.dispatcher.go

[root@wangjq queue]# cat dispatcher.go
package main

import "fmt"

var WorkerQueue chan chan WorkRequest

func StartDispatcher(nworkers int) {
        // First, initialize the channel we are going to but the workers' work channels into.
        WorkerQueue = make(chan chan WorkRequest, nworkers)

        // Now, create all of our workers.
        for i := 0; i < nworkers; i++ {
                fmt.Println("Starting worker", i+1)
                worker := NewWorker(i+1, WorkerQueue)
                worker.Start()
        }

        go func() {
                for {
                        select {
                        case work := <-WorkQueue:
                                fmt.Println("Received work requeust")
                                go func() {
                                        worker := <-WorkerQueue

                                        fmt.Println("Dispatching work request")
                                        worker <- work
                                }()
                        }
                }
        }()
}

5.main.go

[root@wangjq queue]# cat main.go
package main

import (
        "flag"
        "fmt"
        "net/http"
)

var (
        NWorkers = flag.Int("n", 4, "The number of workers to start")
        HTTPAddr = flag.String("http", "127.0.0.1:8000", "Address to listen for HTTP requests on")
)

func main() {
        // Parse the command-line flags.
        flag.Parse()

        // Start the dispatcher.
        fmt.Println("Starting the dispatcher")
        StartDispatcher(*NWorkers)

        // Register our collector as an HTTP handler function.
        fmt.Println("Registering the collector")
        http.HandleFunc("/work", Collector)

        // Start the HTTP server!
        fmt.Println("HTTP server listening on", *HTTPAddr)
        if err := http.ListenAndServe(*HTTPAddr, nil); err != nil {
                fmt.Println(err.Error())
        }
}

6.编译

[root@wangjq queue]# go build -o queued *.go

7.运行

[root@wangjq queue]# ./queued -n 5
Starting the dispatcher
Starting worker 1
Starting worker 2
Starting worker 3
Starting worker 4
Starting worker 5
Registering the collector
HTTP server listening on 127.0.0.1:8000

8.测试

[root@wangjq ~]# for i in {1..3}; do curl localhost:8000/work -d name=$USER -d delay=$(expr $i % 11)s; done

9.效果

[root@wangjq queue]# ./queued -n 5
Starting the dispatcher
Starting worker 1
Starting worker 2
Starting worker 3
Starting worker 4
Starting worker 5
Registering the collector
HTTP server listening on 127.0.0.1:8000
Work request queued
Received work requeust
Dispatching work request
worker1: Received work request, delaying for 1.000000 seconds
Work request queued
Received work requeust
Dispatching work request
worker2: Received work request, delaying for 2.000000 seconds
Work request queued
Received work requeust
Dispatching work request
worker4: Received work request, delaying for 3.000000 seconds
worker1: Hello, root!
worker2: Hello, root!
worker4: Hello, root!

 

上一篇:Golang进阶之关于channel全面分析(三)


下一篇:Zynq Axidma linux下驱动axidmatest.c 驱动分析