golang总结-并发

2.7 并发编程

go协程

golang 通过一个go关键字就可以开启一个协程。

func main() {
//两个交错输出
go sayHello()
go sayHello2()
time.Sleep(time.Second * 3) //阻塞主线程
} func sayHello() {
for i := 0; i < 30; i++ {
fmt.Println("hello world")
}
} func sayHello2() {
for i := 0; i < 30; i++ {
fmt.Println("你好中国")
}
}
//通过sync.WaitGroup来等待所有线程完成
package main import (
"fmt"
"sync"
) func main() {
var w = &sync.WaitGroup{}
w.Add(2)
go sayEn(w)
go sayZh(w)
w.Wait()
} func sayEn(w *sync.WaitGroup) {
for i := 0; i < 30; i++ {
fmt.Println("hello world")
}
w.Done() //每当这个方法完成则减少1
} func sayZh(w *sync.WaitGroup) {
for i := 0; i < 30; i++ {
fmt.Println("中国你好")
}
w.Done() //每当这个方法完成则减少1
}

go管道

管道的定义:

//无缓冲管道
flag := make(chan bool)
//有缓冲管道
data := make(chan int, 10)
//向管道中添加值
data <- 10
//从管道中取值
agr := <- data
<- data //也可以直接释放值,不用变量接收

1. 通过go实现同步

package main

import (
"fmt"
) func main() {
w1, w2 := make(chan bool), make(chan bool)
go sayEn_chan(w1)
go sayZh_chan(w2)
<- w1 //阻塞,直到chan 可以取出数据
<- w2
} func sayEn_chan(w chan bool) {
for i := 0; i < 30; i++ {
fmt.Println("hello world")
}
w <- true //方法完成写入通道
} func sayZh_chan(w chan bool) {
for i := 0; i < 30; i++ {
fmt.Println("中国你好")
}
w <- true
}

2. 正确处理累加

package main

import (
"fmt"
"sync/atomic"
) var (
count int64
) func main() {
w1, w2 := make(chan bool), make(chan bool)
go add(w1)
go add(w2)
<- w1 //阻塞,直到chan 可以取出数据
<- w2
fmt.Println(count)
} func add(w chan bool) {
for i := 0; i < 5000; i++ {
atomic.AddInt64(&count, 1)
}
w <- true
}

3. 通道实现数据共享

package main

import (
"fmt"
"math/rand"
"sync"
) var wg sync.WaitGroup func main() {
count := make(chan int)
wg.Add(2)
go player("张三", count)
go player("李四", count)
//发球
count <- 1
wg.Wait() //阻塞等待2个线程完成
} func player(name string, count chan int) {
defer wg.Done() for {
i, ok := <-count if !ok { //通道关闭
fmt.Printf("运动员 %s 赢了\n", name)
return
} tmp := rand.Intn(100)
if tmp % 13 == 0 { //没有接到球
fmt.Printf("运动员 %s 输了\n", name)
close(count)
return
}
fmt.Printf("运动员 %s 击球 %d \n", name , i)
i ++
count <- i
}
}

4. 缓冲管道

package main

import (
"fmt"
"sync"
"time"
) var (
numberTasks = 10
workers = 4
) var wg2 sync.WaitGroup func main() {
wg2.Add(workers)
tasks := make(chan int, numberTasks) for i := 0; i < workers; i++ {
go work(tasks, i)
} for j := 1; j <= numberTasks; j++ {
tasks <- j
}
close(tasks) wg2.Wait()
} func work(tasks chan int, worker int) {
defer wg2.Done()
for {
task, ok := <- tasks
if !ok {
fmt.Printf("任务完成,工号:%d\n", worker)
return
}
fmt.Printf("工号:%d, 开始工作:%d\n", worker, task)
time.Sleep(time.Microsecond * 100)
fmt.Printf("工号:%d, 完成工作:%d\n", worker, task) } }

5. select

select 的特点是:不会阻塞,哪个管道有值,我取哪个。所以,下面当运行到go的时候,a,b还没有添值,所以只能选择defaul运行,这里可以把defualt部分和b<-2去掉,select会被阻塞,直到a<-1执行

func main() {
a := make(chan int)
b := make(chan int)
go func() {
b <- 2
time.Sleep(time.Second * 3)
a <- 1
}()
select {
case <- a:
fmt.Println("a")
case <- b:
fmt.Println("b")
time.Sleep(time.Second * 3)
default:
fmt.Println("hello world")
}
}

6. runner并发模型

package runner

import (
"errors"
"os"
"os/signal"
"time"
) type Runner struct {
interrupt chan os.Signal complete chan error timeout <-chan time.Time //声明一个只读的管道 tasks []func(int)
} var ErrorTimeout = errors.New("receive timeout") var ErrorInterrupt = errors.New("interrupt error") func New(duration time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(duration),
}
} func (r *Runner) Add(tasks...func(int)) {
r.tasks = append(r.tasks, tasks...)
} func (r *Runner) getInterrupt() bool {
select {
case <-r.interrupt:
signal.Stop(r.interrupt)
return true
default:
return false
}
} func (r *Runner) run() error {
for id, task := range r.tasks {
if r.getInterrupt() {
return ErrorInterrupt
}
task(id)
}
return nil
} func (r *Runner) Start() error {
signal.Notify(r.interrupt, os.Interrupt)
go func() {
r.complete <- r.run()
}() select {
case err := <- r.complete:
return err
case <- r.timeout:
return ErrorTimeout
}
}

测试

package main

import (
"gorounting/runner"
"log"
"os"
"time"
) const (
timeout = 4 * time.Second
) func main() {
log.Println("任务开始")
ru := runner.New(timeout)
ru.Add(createTask(), createTask(), createTask(), createTask()) if err := ru.Start(); err != nil {
switch err {
case runner.ErrorInterrupt:
log.Println("系统被中断")
os.Exit(1)
case runner.ErrorTimeout:
log.Println("系统超时")
os.Exit(2) }
}
log.Println("程序结束") } func createTask() func(int) {
return func(id int) {
log.Printf("process-task #%d\n", id)
time.Sleep(time.Duration(id) * time.Second )
}
}
上一篇:js复习---string


下一篇:angular2--Tour of Heroes学习和分析--路由