Go 09锁、sync、网络编程

Go 9

并发之goroutine

并发和并行的区别

goroutine的启动

将要并发执行的任务包装成一个函数,调用函数的时候前面加上go关键字,就能够开启一个goroutine去执行该函数的任务

goroutine对应的函数执行完,该goroutine就结束了。

程序启动的时候就会自动创建一个goroutine去执行main函数

main函数结束了,那么程序也就结束了,由该程序启动的所有其他goroutine也都结束了。

goroutine的本质

goroutine的调度模型:GMP

M:N:把m个goroutine分配给n个操作系统线程

goroutine与操作系统线程(os线程)的区别

goroutine是用户态的线程,比内核态的线程更轻量级一点。初始时只占用2KB的栈空间,可以轻松开启数十万的goroutine也不会崩内存

runtime.GOMAXPROCS

Go1.5之后默认就是操作系统的逻辑核心数,默认跑满CPU

runtime.GOMAXPROCS(1):只占用一个核。多用于日志监控等轻量级程序

wooker pool模式

开启一定数量的goroutine

package main

import (
	"fmt"
	"sync"
	"time"
)

// worker pool
var wg sync.WaitGroup
var notice = make(chan struct{}, 5)

func worker(id int, jobs <-chan int, results chan<- int) {
	defer wg.Done()
	for j := range jobs {
		fmt.Printf("worker:%d start job:%d\n", id, j)
		time.Sleep(time.Second)
		fmt.Printf("worker:%d end job:%d\n", id, j)
		results <- j * 2
		notice <- struct{}{} // 通知
	}
}

func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// 5个任务
	go func() {
		for j := 0; j < 5; j++ {
			jobs <- j
		}
		close(jobs)
	}()

	// 开启3个goroutine
	wg.Add(3)
	for w := 0; w < 3; w++ {
		go worker(w, jobs, results)
	}

	go func() {
		for i := 0; i < 5; i++ {
			<-notice
		}
		close(results)
	}()

	// 输出结果

	for x := range results {
		fmt.Println(x)
	}

	// for a := 1; a < 5; a++ {
	// 	<-results
	// }
}

sync.WaitGroup

var wg sync.WaitGroup

  • wg.Add(1) :计数器+1
  • wg.Done() :计数器-1
  • wg.Wait() :等

channel

为什么需要channel

通过channel实现多个goroutine之间的通信

CSP通过通信来共享内存

channel是一种类型,一种引用类型。make函数初始化之后才能使用.(slice,map,channel)

channel的声明:

var ch chan 元素类型

channel的初始化:

ch = make(chan 元素类型, [缓冲区大小])

channel的操作:

  • 发送:ch <- 100
  • 接收:x:= <- ch
  • 关闭:close(ch)

带缓冲区的通道和无缓冲区的通道:

快递员送快递的示例,有缓冲区就是有快递柜

for {
		x, ok := <-ch // 再取阻塞
		if !ok{  // 什么时候ok=false? ch通道被关闭的时候
			break
		}
		fmt.Println(x, ok)
		time.Sleep(time.Second)
	}
for x := range ch {
		fmt.Println(x)
	}

单向通道:

通常是用作函数的参数,只读通道<- chan和只写通道chan <- int

通道的各种考虑情况:

Go 09锁、sync、网络编程

select多路复用

同一时刻有多个通道要操作的场景下,使用select。

使用select语句能提高代码的可读性。

  • 可处理一个或多个channel的发送/接收操作。
  • 如果多个case同时满足,select会随机选择一个。
  • 对于没有caseselect{}会一直等待,可用于阻塞main函数。

今日内容

同步锁

互斥锁

package main

import (
	"fmt"
	"sync"
)

// 锁

var x = 0
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
	defer wg.Done()
	for i := 0; i < 50000; i++ {
		lock.Lock()
		x++
		lock.Unlock()
	}
}

func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

读写互斥锁

package main

import (
	"fmt"
	"sync"
	"time"
)

// 读写互斥锁rwlock

var lock sync.Mutex
var rwlock sync.RWMutex
var wg sync.WaitGroup
var x = 0

func read() {
	defer wg.Done()
	rwlock.RLock()
	fmt.Println(x)
	time.Sleep(time.Millisecond)
	rwlock.RUnlock()
}

func write() {
	defer wg.Done()
	rwlock.RLock()
	x++
	time.Sleep(5 * time.Millisecond)
	rwlock.RUnlock()
}

func main() {
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go write()
	}
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go read()
	}
	wg.Wait()
	fmt.Println(time.Since(start))
}

sync包

sync.once

package main

import (
	"fmt"
	"sync"
)

// sync.once

var wg sync.WaitGroup
var once sync.Once

func f1(ch1 chan<- int) {
	defer wg.Done()
	for i := 0; i < 100; i++ {
		ch1 <- i
	}
	close(ch1)
}

func f2(ch1 <-chan int, ch2 chan<- int) {
	defer wg.Done()
	for {
		x, ok := <-ch1
		if !ok {
			break
		}
		ch2 <- x * x
	}
	once.Do(func() { close(ch2) }) // 确保某个操作只执行一次
}

func main() {
	a := make(chan int, 100)
	b := make(chan int, 100)
	wg.Add(3)
	go f1(a)
	go f2(a, b)
	go f2(a, b)
	wg.Wait()
	for ret := range b {
		fmt.Println(ret)
	}
}

sync.map

版本1:慢

package main

import (
	"fmt"
	"strconv"
	"sync"
)

// sync.map

// Go内置的map不是并发安全的
var (
	m    = make(map[string]int)
	lock sync.Mutex
)

func get(key string) int {
	return m[key]
}

func set(key string, value int) {
	m[key] = value
}

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 21; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			lock.Lock()
			set(key, n)
			lock.Unlock()
			fmt.Printf("k=:%v,v:=%v\n", key, get(key))
			wg.Done()
		}(i)
	}
	wg.Wait()
}

版本2:快

func main() {
	m2 := sync.Map{}

	for i := 0; i < 25; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			m2.Store(key, n)         // 必须使用sync.map内置的store去存值
			value, _ := m2.Load(key) // 必须使用sync.map内置的load方法根据key去取值
			fmt.Printf("k=:%v,v:=%v\n", key, value)
			wg.Done()
		}(i)
	}
	wg.Wait()
}

atomic包(原子性)

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// 原子操作

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

// func add() {
// 	defer wg.Done()
// 	lock.Lock()
// 	x++
// 	lock.Unlock()
// }

func add1() {
	atomic.AddInt64(&x, 1)
	wg.Done()
}

func main() {
	// wg.Add(100000)
	// for i := 0; i < 100000; i++ {
	// 	go add1()
	// }
	// wg.Wait()
	// fmt.Println(x)

	// 比较并交换
	x = 100
	ok := atomic.CompareAndSwapInt64(&x, 100, 300) // x是否等于100,若是则改为300,不是则返回false
	fmt.Println(ok, x)
}

网络编程

server端

package main

import (
	"fmt"
	"net"
	"strings"
)

// tcp server端

func processConn(conn net.Conn) {
    defer conn.Close()
    // 3.与客户端通信
	var tmp [128]byte
	for {

		n, err := conn.Read(tmp[:])
		if err != nil {
			fmt.Printf("read failed, err:%v\n", err)
			return
		}
		msg := string(tmp[:n])
		fmt.Println(msg)
		reback := strings.ToUpper(msg)
		conn.Write([]byte(reback))
	}
}

func main() {
	// 1.本地端口启动服务
	listener, err := net.Listen("tcp", "localhost:9000")
	if err != nil {
		fmt.Printf("start failed, err:%v\n", err)
		return
	}
    defer listener.Close()
	// 2.等待别人来连接
	for {
		conn, err := listener.Accept()
		if err != nil {
			fmt.Printf("accept failed, err:%v\n", err)
			return
		}
		go processConn(conn)

	}
}

client端

package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
)

// tcp client

func main() {
	// 1.与server端建立链接
	conn, err := net.Dial("tcp", "localhost:9000")
	if err != nil {
		fmt.Printf("dial failed, err:%v\n", err)
		return
	}
	// 2.发送数据
	// var msg string
	var tmp [128]byte
	reader := bufio.NewReader(os.Stdin)
	for {
		fmt.Print("发送到服务端")
		// fmt.Scan(&msg)  // 有空格会有点小问题
		text, _ := reader.ReadString('\n') // 读到换行
		if text == "quit" {
			break
		}
		conn.Write([]byte(text))
		n, err := conn.Read(tmp[:])
		if err != nil {
			fmt.Printf("read failed, err:%v\n", err)
			return
		}
		fmt.Println(string(tmp[:n]))
	}
}

TCP粘包

大端和小端

解决粘包问题

// socket_stick/proto/proto.go
package proto

import (
	"bufio"
	"bytes"
	"encoding/binary"
)

// Encode 将消息编码
func Encode(message string) ([]byte, error) {
	// 读取消息的长度,转换成int32类型(占4个字节)
	var length = int32(len(message))
	var pkg = new(bytes.Buffer)
	// 写入消息头
	err := binary.Write(pkg, binary.LittleEndian, length)
	if err != nil {
		return nil, err
	}
	// 写入消息实体
	err = binary.Write(pkg, binary.LittleEndian, []byte(message))
	if err != nil {
		return nil, err
	}
	return pkg.Bytes(), nil
}

// Decode 解码消息
func Decode(reader *bufio.Reader) (string, error) {
	// 读取消息的长度
	lengthByte, _ := reader.Peek(4) // 读取前4个字节的数据
	lengthBuff := bytes.NewBuffer(lengthByte)
	var length int32
	err := binary.Read(lengthBuff, binary.LittleEndian, &length)
	if err != nil {
		return "", err
	}
	// Buffered返回缓冲中现有的可读取的字节数。
	if int32(reader.Buffered()) < length+4 {
		return "", err
	}

	// 读取真正的消息数据
	pack := make([]byte, int(4+length))
	_, err = reader.Read(pack)
	if err != nil {
		return "", err
	}
	return string(pack[4:]), nil
}

server服务端

package main

import (
	"bufio"
	"fmt"
	"io"
	"net"

	proto "go.study.com/hina/day01/day09/04nianbao/protocol"
)

// socket_stick/server/main.go

func process(conn net.Conn) {
	defer conn.Close()
	reader := bufio.NewReader(conn)
	for {
		recvStr, err := proto.Decode(reader)
		// n, err := reader.Read(buf[:])
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println("read from client failed, err:", err)
			break
		}
		fmt.Println("收到client发来的数据:", recvStr)
	}
}

func main() {

	listen, err := net.Listen("tcp", "127.0.0.1:30000")
	if err != nil {
		fmt.Println("listen failed, err:", err)
		return
	}
	defer listen.Close()
	for {
		conn, err := listen.Accept()
		if err != nil {
			fmt.Println("accept failed, err:", err)
			continue
		}
		go process(conn)
	}
}

client客户端

package main

import (
	"fmt"
	"net"

	proto "go.study.com/hina/day01/day09/04nianbao/protocol"
)

// socket_stick/client/main.go

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:30000")
	if err != nil {
		fmt.Println("dial failed, err", err)
		return
	}
	defer conn.Close()
	for i := 0; i < 20; i++ {
		msg := `Hello, Hello. How are you?`
		// 调用协议编码数据
		b, _ := proto.Encode(msg)
		conn.Write([]byte(b))
	}
}

udp

服务端server

package main

import (
	"fmt"
	"net"
	"strings"
)

// UDP server

func main() {
	conn, err := net.ListenUDP("udp", &net.UDPAddr{
		IP:   net.IPv4(127, 0, 0, 1),
		Port: 9000,
	})
	if err != nil {
		fmt.Printf("conn failed, err:%v\n", err)
		return
	}
	defer conn.Close()
	// 不需要建立链接,直接发数据
	var data [1024]byte
	for {
		n, addr, err := conn.ReadFromUDP(data[:])
		if err != nil {
			fmt.Printf("read failed, err:%v\n", err)
			return
		}
		fmt.Println(data[:n])
		reply := strings.ToUpper(string(data[:n]))
		// 发送数据
		conn.WriteToUDP([]byte(reply), addr)
	}
}

客户端client

package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
)

// UDP client

func main() {
	socket, err := net.DialUDP("udp", nil, &net.UDPAddr{
		IP:   net.IPv4(127, 0, 0, 1),
		Port: 9000,
	})
	if err != nil {
		fmt.Printf("socket failed, err:%v\n", err)
		return
	}
	defer socket.Close()
	var reply [1024]byte
	reader := bufio.NewReader(os.Stdin)
	for {
		fmt.Print("发送到服务端:")
		msg, _ := reader.ReadString('\n')
		socket.Write([]byte(msg))
		// 收回复的数据
		n, _, err := socket.ReadFromUDP(reply[:])
		if err != nil {
			fmt.Printf("read failed, err:%v\n", err)
			return
		}
		fmt.Println("收到回复信息:", string(reply[:n]))
	}
}

上一篇:Go并发编程(三)context&waitgroup


下一篇:golang sync WaitGroup