Go 9
并发之goroutine
并发和并行的区别
goroutine的启动
将要并发执行的任务包装成一个函数,调用函数的时候前面加上go
关键字,就能够开启一个goroutine去执行该函数的任务
goroutine对应的函数执行完,该goroutine就结束了。
程序启动的时候就会自动创建一个goroutine去执行main函数
main函数结束了,那么程序也就结束了,由该程序启动的所有其他goroutine也都结束了。
goroutine的本质
goroutine的调度模型:GMP
M:N
:把m个goroutine分配给n个操作系统线程
goroutine与操作系统线程(os线程)的区别
goroutine是用户态的线程,比内核态的线程更轻量级一点。初始时只占用2KB的栈空间,可以轻松开启数十万的goroutine也不会崩内存
runtime.GOMAXPROCS
Go1.5之后默认就是操作系统的逻辑核心数,默认跑满CPU
runtime.GOMAXPROCS(1)
:只占用一个核。多用于日志监控等轻量级程序
wooker pool模式
开启一定数量的goroutine
package main
import (
"fmt"
"sync"
"time"
)
// worker pool
var wg sync.WaitGroup
var notice = make(chan struct{}, 5)
func worker(id int, jobs <-chan int, results chan<- int) {
defer wg.Done()
for j := range jobs {
fmt.Printf("worker:%d start job:%d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("worker:%d end job:%d\n", id, j)
results <- j * 2
notice <- struct{}{} // 通知
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 5个任务
go func() {
for j := 0; j < 5; j++ {
jobs <- j
}
close(jobs)
}()
// 开启3个goroutine
wg.Add(3)
for w := 0; w < 3; w++ {
go worker(w, jobs, results)
}
go func() {
for i := 0; i < 5; i++ {
<-notice
}
close(results)
}()
// 输出结果
for x := range results {
fmt.Println(x)
}
// for a := 1; a < 5; a++ {
// <-results
// }
}
sync.WaitGroup
var wg sync.WaitGroup
- wg.Add(1) :计数器+1
- wg.Done() :计数器-1
- wg.Wait() :等
channel
为什么需要channel
通过channel实现多个goroutine之间的通信
CSP
通过通信来共享内存
channel是一种类型,一种引用类型。make函数初始化之后才能使用.(slice,map,channel)
channel的声明:
var ch chan 元素类型
channel的初始化:
ch = make(chan 元素类型, [缓冲区大小])
channel的操作:
- 发送:
ch <- 100
- 接收:
x:= <- ch
- 关闭:
close(ch)
带缓冲区的通道和无缓冲区的通道:
快递员送快递的示例,有缓冲区就是有快递柜
for {
x, ok := <-ch // 再取阻塞
if !ok{ // 什么时候ok=false? ch通道被关闭的时候
break
}
fmt.Println(x, ok)
time.Sleep(time.Second)
}
for x := range ch {
fmt.Println(x)
}
单向通道:
通常是用作函数的参数,只读通道<- chan
和只写通道chan <- int
通道的各种考虑情况:
select多路复用
同一时刻有多个通道要操作的场景下,使用select。
使用select
语句能提高代码的可读性。
- 可处理一个或多个channel的发送/接收操作。
- 如果多个
case
同时满足,select
会随机选择一个。 - 对于没有
case
的select{}
会一直等待,可用于阻塞main函数。
今日内容
同步锁
互斥锁
package main
import (
"fmt"
"sync"
)
// 锁
var x = 0
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
defer wg.Done()
for i := 0; i < 50000; i++ {
lock.Lock()
x++
lock.Unlock()
}
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
读写互斥锁
package main
import (
"fmt"
"sync"
"time"
)
// 读写互斥锁rwlock
var lock sync.Mutex
var rwlock sync.RWMutex
var wg sync.WaitGroup
var x = 0
func read() {
defer wg.Done()
rwlock.RLock()
fmt.Println(x)
time.Sleep(time.Millisecond)
rwlock.RUnlock()
}
func write() {
defer wg.Done()
rwlock.RLock()
x++
time.Sleep(5 * time.Millisecond)
rwlock.RUnlock()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
fmt.Println(time.Since(start))
}
sync包
sync.once
package main
import (
"fmt"
"sync"
)
// sync.once
var wg sync.WaitGroup
var once sync.Once
func f1(ch1 chan<- int) {
defer wg.Done()
for i := 0; i < 100; i++ {
ch1 <- i
}
close(ch1)
}
func f2(ch1 <-chan int, ch2 chan<- int) {
defer wg.Done()
for {
x, ok := <-ch1
if !ok {
break
}
ch2 <- x * x
}
once.Do(func() { close(ch2) }) // 确保某个操作只执行一次
}
func main() {
a := make(chan int, 100)
b := make(chan int, 100)
wg.Add(3)
go f1(a)
go f2(a, b)
go f2(a, b)
wg.Wait()
for ret := range b {
fmt.Println(ret)
}
}
sync.map
版本1:慢
package main
import (
"fmt"
"strconv"
"sync"
)
// sync.map
// Go内置的map不是并发安全的
var (
m = make(map[string]int)
lock sync.Mutex
)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 21; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
lock.Lock()
set(key, n)
lock.Unlock()
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
版本2:快
func main() {
m2 := sync.Map{}
for i := 0; i < 25; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m2.Store(key, n) // 必须使用sync.map内置的store去存值
value, _ := m2.Load(key) // 必须使用sync.map内置的load方法根据key去取值
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
atomic包(原子性)
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// 原子操作
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
// func add() {
// defer wg.Done()
// lock.Lock()
// x++
// lock.Unlock()
// }
func add1() {
atomic.AddInt64(&x, 1)
wg.Done()
}
func main() {
// wg.Add(100000)
// for i := 0; i < 100000; i++ {
// go add1()
// }
// wg.Wait()
// fmt.Println(x)
// 比较并交换
x = 100
ok := atomic.CompareAndSwapInt64(&x, 100, 300) // x是否等于100,若是则改为300,不是则返回false
fmt.Println(ok, x)
}
网络编程
server端
package main
import (
"fmt"
"net"
"strings"
)
// tcp server端
func processConn(conn net.Conn) {
defer conn.Close()
// 3.与客户端通信
var tmp [128]byte
for {
n, err := conn.Read(tmp[:])
if err != nil {
fmt.Printf("read failed, err:%v\n", err)
return
}
msg := string(tmp[:n])
fmt.Println(msg)
reback := strings.ToUpper(msg)
conn.Write([]byte(reback))
}
}
func main() {
// 1.本地端口启动服务
listener, err := net.Listen("tcp", "localhost:9000")
if err != nil {
fmt.Printf("start failed, err:%v\n", err)
return
}
defer listener.Close()
// 2.等待别人来连接
for {
conn, err := listener.Accept()
if err != nil {
fmt.Printf("accept failed, err:%v\n", err)
return
}
go processConn(conn)
}
}
client端
package main
import (
"bufio"
"fmt"
"net"
"os"
)
// tcp client
func main() {
// 1.与server端建立链接
conn, err := net.Dial("tcp", "localhost:9000")
if err != nil {
fmt.Printf("dial failed, err:%v\n", err)
return
}
// 2.发送数据
// var msg string
var tmp [128]byte
reader := bufio.NewReader(os.Stdin)
for {
fmt.Print("发送到服务端")
// fmt.Scan(&msg) // 有空格会有点小问题
text, _ := reader.ReadString('\n') // 读到换行
if text == "quit" {
break
}
conn.Write([]byte(text))
n, err := conn.Read(tmp[:])
if err != nil {
fmt.Printf("read failed, err:%v\n", err)
return
}
fmt.Println(string(tmp[:n]))
}
}
TCP粘包
大端和小端
解决粘包问题
// socket_stick/proto/proto.go
package proto
import (
"bufio"
"bytes"
"encoding/binary"
)
// Encode 将消息编码
func Encode(message string) ([]byte, error) {
// 读取消息的长度,转换成int32类型(占4个字节)
var length = int32(len(message))
var pkg = new(bytes.Buffer)
// 写入消息头
err := binary.Write(pkg, binary.LittleEndian, length)
if err != nil {
return nil, err
}
// 写入消息实体
err = binary.Write(pkg, binary.LittleEndian, []byte(message))
if err != nil {
return nil, err
}
return pkg.Bytes(), nil
}
// Decode 解码消息
func Decode(reader *bufio.Reader) (string, error) {
// 读取消息的长度
lengthByte, _ := reader.Peek(4) // 读取前4个字节的数据
lengthBuff := bytes.NewBuffer(lengthByte)
var length int32
err := binary.Read(lengthBuff, binary.LittleEndian, &length)
if err != nil {
return "", err
}
// Buffered返回缓冲中现有的可读取的字节数。
if int32(reader.Buffered()) < length+4 {
return "", err
}
// 读取真正的消息数据
pack := make([]byte, int(4+length))
_, err = reader.Read(pack)
if err != nil {
return "", err
}
return string(pack[4:]), nil
}
server服务端
package main
import (
"bufio"
"fmt"
"io"
"net"
proto "go.study.com/hina/day01/day09/04nianbao/protocol"
)
// socket_stick/server/main.go
func process(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
recvStr, err := proto.Decode(reader)
// n, err := reader.Read(buf[:])
if err == io.EOF {
break
}
if err != nil {
fmt.Println("read from client failed, err:", err)
break
}
fmt.Println("收到client发来的数据:", recvStr)
}
}
func main() {
listen, err := net.Listen("tcp", "127.0.0.1:30000")
if err != nil {
fmt.Println("listen failed, err:", err)
return
}
defer listen.Close()
for {
conn, err := listen.Accept()
if err != nil {
fmt.Println("accept failed, err:", err)
continue
}
go process(conn)
}
}
client客户端
package main
import (
"fmt"
"net"
proto "go.study.com/hina/day01/day09/04nianbao/protocol"
)
// socket_stick/client/main.go
func main() {
conn, err := net.Dial("tcp", "127.0.0.1:30000")
if err != nil {
fmt.Println("dial failed, err", err)
return
}
defer conn.Close()
for i := 0; i < 20; i++ {
msg := `Hello, Hello. How are you?`
// 调用协议编码数据
b, _ := proto.Encode(msg)
conn.Write([]byte(b))
}
}
udp
服务端server
package main
import (
"fmt"
"net"
"strings"
)
// UDP server
func main() {
conn, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 9000,
})
if err != nil {
fmt.Printf("conn failed, err:%v\n", err)
return
}
defer conn.Close()
// 不需要建立链接,直接发数据
var data [1024]byte
for {
n, addr, err := conn.ReadFromUDP(data[:])
if err != nil {
fmt.Printf("read failed, err:%v\n", err)
return
}
fmt.Println(data[:n])
reply := strings.ToUpper(string(data[:n]))
// 发送数据
conn.WriteToUDP([]byte(reply), addr)
}
}
客户端client
package main
import (
"bufio"
"fmt"
"net"
"os"
)
// UDP client
func main() {
socket, err := net.DialUDP("udp", nil, &net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 9000,
})
if err != nil {
fmt.Printf("socket failed, err:%v\n", err)
return
}
defer socket.Close()
var reply [1024]byte
reader := bufio.NewReader(os.Stdin)
for {
fmt.Print("发送到服务端:")
msg, _ := reader.ReadString('\n')
socket.Write([]byte(msg))
// 收回复的数据
n, _, err := socket.ReadFromUDP(reply[:])
if err != nil {
fmt.Printf("read failed, err:%v\n", err)
return
}
fmt.Println("收到回复信息:", string(reply[:n]))
}
}