Go:54---TCP通信(附TCP黏包)

一、net包

二、TCP通信演示

TCP服务端

  • Go语言中提供了goroutine的概念,服务端可以为每个客户端都开启一个goroutine去执行
  • TCP服务端程序的处理流程:
    • 监听端口
    • 接收客户端请求建立链接
    • 创建goroutine处理链接。
  • 使用net包实现的TCP服务端代码如下:
package main

import (
    "fmt"
    "net"
    "bufio"
)

// 处理客户端请求函数
func handler(conn net.Conn) {
    defer conn.Close()

    for {
        // 先接收数据
        reader := bufio.NewReader(conn)
        var buf [128]byte
        n, err := reader.Read(buf[:])
        if err != nil {
            fmt.Println("read failed, err: ", err)
            return
        }
        recvStr := string(buf[:n])
        fmt.Println("收到客户端数据: ", recvStr)

        // 接收到之后返回给客户端
        conn.Write([]byte(recvStr))
    }
}

func main() {
    // 1.监听
    listen, err := net.Listen("tcp", "127.0.0.1:9999")
    if err != nil {
        fmt.Println("listen failed, err: ", err)
        return
    }

    // 循环接收客户端的连接
    for {
        conn, err := listen.Accept()
        if err != nil {
            fmt.Println("accept faied, err: ", err)
            return
        }
        
        go handler(conn)
    }
}

TCP客户端

  • 一个TCP客户端进行TCP通信的流程如下:
    • 建立与服务端的链接
    • 进行数据收发
    • 关闭链接
  • 使用net包实现的TCP客户端代码如下:
package main

import (
    "fmt"
    "net"
    "os"
    "bufio"
    "strings"
)

func main() {
    // 拨号连接
    conn, err := net.Dial("tcp", "127.0.0.1:9999")
    if err != nil {
        fmt.Println("connect failed, err: ", err)
        return
    }
    defer conn.Close()

    // 循环读写数据
    inputReader := bufio.NewReader(os.Stdin)
    for {
        // 先发送数据
        input, _ := inputReader.ReadString('\n')
        inputInfo := strings.Trim(input, "\r\n")
        if strings.EqualFold(inputInfo, "quit") {
            fmt.Println("quit!")
            return
        }
        _, err := conn.Write([]byte(inputInfo))
        if err != nil {
            fmt.Println("write failed, err: ", err)
            return
        }
        
        // 再接收数据
        buf := [512]byte{}
        n, err := conn.Read(buf[:])
        if err != nil {
            fmt.Println("read failed, err: ", err)
            return
        }
        fmt.Println(string(buf[:n]))
    }
}
  • 中间启动服务端,左右两侧开启客户端,可以看到实验成功

Go:54---TCP通信(附TCP黏包)

三、解决TCP黏包

黏包演示

  • 服务端代码:其监听并接收客户端的连接,连接之后可以获取客户端发送过来的消息
package main

import (
	"io"
	"fmt"
	"net"
	"bufio"
)

func handler(conn net.Conn) {
	defer conn.Close()

	reader := bufio.NewReader(conn)
	var buf [1024]byte

	for {
		n, err := reader.Read(buf[:])
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println("recv failed, err: ", err)
			return
		}
		recvStr := string(buf[:n])
		fmt.Println("recv: ", recvStr)
	}
}


func main() {
	// 1.监听
	listen, err := net.Listen("tcp", "127.0.0.1:9999")
	if err != nil {
		fmt.Println("listen failed, err: ", err)
		return
	}
	defer listen.Close()

	// 循环接受客户端的连接
	for {
		conn, err := listen.Accept()
		if err != nil {
			fmt.Println("accept failed, err: ", err)
			return
		}
		go handler(conn)
	}
}
  • 客户端:客户端比较简单,其与服务端建立连接,然后使用for循环迅速发送10条数据
package main

import (
    "fmt"
    "net"
)

func main() {
    // 拨号连接服务端
    conn, err := net.Dial("tcp", "127.0.0.1:9999")
    if err != nil {
        fmt.Println("dial failed, err: ", err)
        return
    }
    defer conn.Close()

    for i := 0; i < 20; i++ {
        msg := "Hello Word, hello world!!!"
        conn.Write([]byte(msg))
    }
}
  • 先启动服务端:

Go:54---TCP通信(附TCP黏包)

  • 再启动客户端:可以看到虽然我们客户端是显式的发送20条数据,但是服务端再接收的时候缺只接收了2次

Go:54---TCP通信(附TCP黏包)

  • 从上面可以看出客户端的数据黏在一起了

为什么会出现黏包?

  • 主要原因就是tcp数据传递模式是流模式,在保持长连接的时候可以进行多次的收和发
  • “粘包”可发生在发送端也可发生在接收端:
    • 由Nagle算法造成的发送端的粘包:Nagle算法是一种改善网络传输效率的算法。简单来说就是当我们提交一段数据给TCP发送时,TCP并不立刻发送此段数据,而是等待一小段时间看看在等待期间是否还有要发送的数据,若有则会一次把这两段数据发送出去。
    • 接收端接收不及时造成的接收端粘包:TCP会把接收到的数据存在自己的缓冲区中,然后通知应用层取数据。当应用层由于某些原因不能及时的把TCP的数据取出来,就会造成TCP缓冲区中存放了几段数据。

缓慢发送数据避免黏包

  • 从上面的演示案例可以看出,出现黏包是因为客户端发送数据过快,一种比较简单的做法是减缓客户端数据的发送,下面我们让客户端发送一次数据休眠一小会儿,这种做法不是解决黏包的办法,此处我们只是为了演示而已
  • 服务端代码不变,只要编辑客户端代码即可,更新之后的客户端代码如下
package main

import (
    "fmt"
    "net"
    "time"
)

func main() {
    // 拨号连接服务端
    conn, err := net.Dial("tcp", "127.0.0.1:9999")
    if err != nil {
        fmt.Println("dial failed, err: ", err)
        return
    }
    defer conn.Close()

    for i := 0; i < 20; i++ {
        msg := "Hello Word, hello world!!!"
        conn.Write([]byte(msg))
        // 每次发送完数据休眠一小会儿
        time.Sleep(time.Second)
    }
}
  •  先启动服务端:

Go:54---TCP通信(附TCP黏包)

  • 再启动客户端:可以看到服务端顺序的接收到了客户端的数据,没有出现黏包现象

Go:54---TCP通信(附TCP黏包)

通过编解码解决黏包问题

  • 创建proto包:定义两个函数,分别对数据进行编码和解码
package proto

import (
	"bufio"
	"bytes"
	"encoding/binary"
)

// Encode 将消息编码
func Encode(message string) ([]byte, error) {
	// 读取消息的长度,转换成int32类型(占4个字节)
	var length = int32(len(message))
	var pkg = new(bytes.Buffer)
	// 写入消息头
	err := binary.Write(pkg, binary.LittleEndian, length)
	if err != nil {
		return nil, err
	}
	// 写入消息实体
	err = binary.Write(pkg, binary.LittleEndian, []byte(message))
	if err != nil {
		return nil, err
	}
	return pkg.Bytes(), nil
}

// Decode 解码消息
func Decode(reader *bufio.Reader) (string, error) {
	// 读取消息的长度
	lengthByte, _ := reader.Peek(4) // 读取前4个字节的数据
	lengthBuff := bytes.NewBuffer(lengthByte)
	var length int32
	err := binary.Read(lengthBuff, binary.LittleEndian, &length)
	if err != nil {
		return "", err
	}
	// Buffered返回缓冲中现有的可读取的字节数。
	if int32(reader.Buffered()) < length+4 {
		return "", err
	}

	// 读取真正的消息数据
	pack := make([]byte, int(4+length))
	_, err = reader.Read(pack)
	if err != nil {
		return "", err
	}
	return string(pack[4:]), nil
}
  • 重构服务端代码如下
package main

import (
    "fmt"
    "io"
    "net"
    "bufio"
    "proto"
)

func process(conn net.Conn) {
	defer conn.Close()
	reader := bufio.NewReader(conn)
	for {
		msg, err := proto.Decode(reader)
		if err == io.EOF {
			return
		}
		if err != nil {
			fmt.Println("decode msg failed, err:", err)
			return
		}
		fmt.Println("收到client发来的数据:", msg)
	}
}

func main() {

	listen, err := net.Listen("tcp", "127.0.0.1:30000")
	if err != nil {
		fmt.Println("listen failed, err:", err)
		return
	}
	defer listen.Close()
	for {
		conn, err := listen.Accept()
		if err != nil {
			fmt.Println("accept failed, err:", err)
			continue
		}
		go process(conn)
	}
}
  • 重构客户端代码如下
package main

import (
    "fmt"
    "net"
    "proto"
)

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:30000")
	if err != nil {
		fmt.Println("dial failed, err", err)
		return
	}
	defer conn.Close()
	for i := 0; i < 20; i++ {
		msg := `Hello, Hello. How are you?`
		data, err := proto.Encode(msg)
		if err != nil {
			fmt.Println("encode msg failed, err:", err)
			return
		}
		conn.Write(data)
	}
}
  • 测试如下:
    • 左侧运行服务端,右侧运行客户端
    • proto包是自己定义的,运行的时候自己导入正确的路径

Go:54---TCP通信(附TCP黏包) 

上一篇:spark执行优化——依赖上传到HDFS(spark.yarn.jar和spark.yarn.archive的使用)


下一篇:定义新运算(YZOJ-1062)