Tcp服务器实现

 

目录

 

基本Tcp服务器

  • Server模块:服务器配置信息、启动、停止、运行、添加路由、路由管理器、链接管理器等。
  • 链接属性配置模块:启动链接、停止链接、获取链接对象等。
  • 消息封装:消息id、长度、内容、及消息的封包与解包等。
  • 基础路由模块:提供路由接口及基类,开启服务前必须继承基类实现重写基类中的方法再注册。
  • 全局配置模块:配置服务器的信息,使用前必须在根目录下配置json文件。
  • 多路由模块:使服务器根据数据不同类型做不同业务,提供路由注册、添加、与删除。
  • 读写分离:服务端读写分离。
  • 任务队列+工作池:服务端默认开启工作池,工作池中协程的数量与任务队列数量相同,任务队列为缓冲信道。
  • 链接管理:提供链接管理功能,链接的添加、删除、查找、清理等。

Tcp服务器实现

简单使用

1.配置json文件,名字为server.json。

{
  "Name":"Demo_Server",
  "Version": "tcp4",
  "Host":"127.0.0.1",
  "TcpPort":"7777",
  "MaxPackSize":"4096",
  "MaxConn":"5000"
}

2.服务端继承重写路由基类方法,业务方法。

3.配置封包与解包。

Tcp服务器实现
package pack

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "sync"
)

type Message struct {
    Msglen uint32

    Msgid uint32

    Msgdata []byte
}


//封包函数
func Pack(len uint32,id uint32,data []byte)([]byte,error) {
    var bufferPool = sync.Pool{
        New:func() interface{}{
            return new(bytes.Buffer)
        },
    }
    //获取一个存放bytes的缓冲区,存储字节序列
    dataBuff := bufferPool.Get().(*bytes.Buffer)
    //将数据长度写入字节流
    err :=  binary.Write(dataBuff,binary.LittleEndian,len)
    checkerr(err)
    //将id写入字节流
    err = binary.Write(dataBuff,binary.LittleEndian,id)
    checkerr(err)
    //将数据内容写入字节流
    err = binary.Write(dataBuff,binary.LittleEndian,data)
    checkerr(err)
    return dataBuff.Bytes(),nil

}
//解包函数
func Unpack(data []byte)(*Message,error){
    //这里可以不需要额外创建一个数据缓冲
    //创建一个io。Reader
    boolBuffer := bytes.NewReader(data)
    msg := &Message{}
    //读取数据长度和id
    err := binary.Read(boolBuffer, binary.LittleEndian, &msg.Msglen)
    checkerr(err)
    err = binary.Read(boolBuffer, binary.LittleEndian, &msg.Msgid)
    checkerr(err)
    //数据包限制
    //if
    //
    //}
    return msg,nil
}

func checkerr(err error){
    if err != nil{
        fmt.Println("数据写入与读取失败")
    }
}
封包与解包

4.创建服务端句柄。

5.注册路由。

6.运行服务器。

服务端示例

MyDemo/tcptest/pack:为封包与解包目录
Zinx_Server/znet为服务源码目录
package main

import (
	"MyDemo/tcptest/pack"
	"Zinx_Server/znet"
	"fmt"
)

type PingRouter struct{
	znet.BaseRouter
}

//Test Handle
func (this *PingRouter)Handle(request znet.IRequest){
	var id uint32 =0
	s := "hello client,i am Handle"
	//将字符串转为字节切片
	data :=  []byte(s)
	//对数据进行封包
	data,_ = pack.Pack(uint32(len(data)),id,data)
	_,err := request.GetConnecttion().GetTcpConnecttion().Write(data)
	if err != nil {
		fmt.Println("call back  ping error")
	}
}

func main(){
	//创建Server句柄
	Server := znet.NerServer()
	//注册路由1:客户端数据类型为1,则由服务端注册的此条路由处理
	//所有注册的类都应该继承BaseRouter类并重写,而BaseRouter类则实现了IRouter接口。
	Server.AddRouter(1,&PingRouter{})
	//注册路由2:客户端数据类型为2,则由服务端注册的此条路由处理
	//Server.AddRouter(2,&test{})
	Server.Run()
}

客户端示例:

package main

import (
	"MyDemo/tcptest/pack"
	"fmt"
	"net"
	"time"
)

func main() {
	//1.创建链接远程链接服务器,得到一个conn链接
	conn, err := net.Dial("tcp", "127.0.0.1:7777")
	if err != nil {
		fmt.Println("client start err,exit!")
		return
	}
	i := 1
	var id uint32 = 1
	for {
		//2.调用链接Write写数据
		s := "Hello Server ,i am clent2"
		//将字符串转为字节切片
		data :=  []byte(s)
		//对数据进行封包
		data,_ = pack.Pack(uint32(len(data)),id,data)
		_, err := conn.Write(data)
		if err != nil {
			fmt.Println("write conn err", err)
			return
		}

		buf := make([]byte, 512)
		_,err = conn.Read(buf)
		//解包tcp数据
		msg,_ := pack.Unpack(buf)
		fmt.Printf("recv msg id:%d\n", msg.Msgid)
		fmt.Printf("recv msg len:%d\n", msg.Msglen)
		fmt.Printf("recv buf:%s\n",string(buf[8:8+msg.Msglen]))
		if err != nil {
			fmt.Println("read buf err")
			return
		}
		//fmt.Printf("Server call back:%s,cnt = %d\n", buf, cnt)
		i++
		time.Sleep(5 *time.Second)
	}
}

服务端运行:

Tcp服务器实现

 

 

源码

文件目录

Tcp服务器实现

 

 

 server.go

Tcp服务器实现
package znet

import (
    "fmt"
    "net"
)
type IServer interface {
    //启动
    Start()
    //停止
    Stop()
    //运行
    Run()
    //路由功能:给当前服务注册一个路由方法,供客户端链接处理使用
    AddRouter(index uint32,router IRouter)
    //得到路由管理器
    GetRoutermanger()IRoutermanger
    //得到链接管理器
    GetConnManager()IConnManager
}

//IServer接口的实现,定义一个Server的服务模块
type Server struct{
    //服务器的名称
    Name string
    //IP版本
    IPversion string
    //服务器的IP
    IP string
    //服务器的端口
    Port string
    //最大链接数
    MaxConn int64
    //路由管理器
    Routermange IRoutermanger
    //链接管理器
    ConnManager IConnManager
    //工作池
    Pool *WorkerPool
}

//启动服务器
func (s *Server)Start(){
        s.Pool.StartWorkPool()
        //在这里使用go协程让服务端阻塞接收客户端的消息为异步。
        //1.获取一个tcp的addr
        addr, err := net.ResolveTCPAddr(s.IPversion, fmt.Sprintf("%s:%s", s.IP, s.Port))
        if err != nil {
            fmt.Println("resolve tcp addr error:", err)
            return
        }
        //2.监听服务器的地址
        listenner, err := net.ListenTCP(s.IPversion, addr)
        if err != nil {
            fmt.Printf("listen:%s err:%s", s.IPversion, err)
            return
        }
        fmt.Println("Start Zinx serversucc:", s.Name)

        var cid uint32
        cid = 0
        go func() {
        //3.阻塞的等待客户端链接,触处理客户端链接业务
        for {
            //如果有客户端链接过来,阻塞会返回
            conn, err := listenner.AcceptTCP()
            if err != nil {
                fmt.Println("Accept err", err)
                continue
            }
            //判断是否超过最大链接数
            if int64(s.ConnManager.GetConnLen()) >= s.MaxConn{
                err := conn.Close()
                if err != nil{
                    fmt.Println("拒绝用户失败!")
                }
            }else{
                //将每个客户端链接与服务端路由模块进行绑定
                //c:封装每个客户端链接信息,绑定链接、id、及服务器的路由管理器
                Conn := NewConnection(s,conn,cid,s.Routermange)
                cid++
                //启动链接进行业务处理
                s.Pool.Put(Conn)
            }
        }
    }()


}
//停止服务器
func (s *Server)Stop(){
    s.ConnManager.ClearConn()
    //将服务器的资源、状态或者已经开辟的链接信息回收

}
//运行服务器
func (s *Server)Run(){
    //启动Server的服务功能
    s.Start()

    //做一些启动服务器之后的业务//

    //阻塞状态
    select {}
}
//添加路由功能
func (s *Server)AddRouter(index uint32,router IRouter){
    s.Routermange.AddRouter(index,router)
    fmt.Println("Add Router Succ!!!")
}

func (s *Server)GetRoutermanger()IRoutermanger{
    return s.Routermange
}

func (s *Server)GetConnManager()IConnManager{
    return s.ConnManager
}

//初始化Server模块方法
func NerServer() *Server{
    config := NewConfig()
    s := &Server{
        Name :config.Name,
        IPversion:config.Version,
        IP:config.Host,
        Port:config.TcpPort,
        MaxConn:config.MaxConn,
        Routermange:NewRoutermange(),
        ConnManager:NewConnManger(),
        Pool:NewWorkerPool(2,10),
    }
    return s
}
View Code

connection.go

Tcp服务器实现
package znet

import (
    "fmt"
    "net"
)

type IConnection interface {
    //启动链接:让当前链接开始工作
    Start()
    //停止链接:结束当前链接的工作
    Stop()
    //获取当前链接的绑定socket conn
    GetTcpConnecttion() *net.TCPConn
    //获取当前链接模块的ID
    GetConnID() uint32
    //获取远程客户端tpc的状态、ip、port
    GetRemoteAddr() net.Addr
    //发送数据、将数据发送给远程的客户端
    Send(data []byte) error
}
//链接模块
type Connection struct {
    //当前链接绑定的服务端
    Server IServer
    //当前链接的socket tcp套接字
    Conn *net.TCPConn
    //链接的ID
    ConnID uint32

    //当前的链接状态
    ConnStatus bool

    //告知当前链接已经退出的/停止的channel
    ExitChan chan bool

    //路由管理器
    Routermange IRoutermanger

    //读协程与写协程之间数据信道
    done chan IRequest
}


func (c *Connection)StartReader() {
    go c.Reader()
    go c.Write()
}

//读
func (c *Connection)Reader() {
    fmt.Println("Reader Goroutine is running")
    defer fmt.Println("connID=",c.ConnID,"Reader is exit,remote addr is",c.GetRemoteAddr().String())
    for{
        buf := make([]byte,1024)
        _,err := c.Conn.Read(buf)
        if err != nil{
            fmt.Printf("recv buf err %s\n",err)
            c.ExitChan <-true //通知Write,客户端断开链接
            break
        }
        c.ExitChan <- false //如果客户端没有断开链接,向信道发送信息,继续执行解包
        //将数据解包,得到结构体对象
        // msg:封装数据长度、数据id(类型)、数据内容
        msg := NewMessage(buf)
        //将当前链接信息和请求数据封装
        req :=NewRequest(c,msg)
        //将解包后的数据传入信道,数据传入信道后会也会阻塞,直到信道中的数据被取走
        c.done <- req
    }
}
//写
func (c *Connection)Write(){
    fmt.Println("Writer Goroutine is running")
    for{
        //信道阻塞,只有从信道中接收到信息才能往下执行
        status := <- c.ExitChan
        if status{
            c.Stop()
            break
        } else{
            req := <-c.done
            //执行注册的路由方法,每次将req传入
            go func(request IRequest){
                //获取数据类型,查询路由表执行不同的路由
                id := req.Getdata().Getmsgid()
                c.Routermange.IndexRouter(id).Handle(request)
            }(req)
        }
    }
}

//启动链接:让当前链接开始准备工作
func (c *Connection)Start(){
    fmt.Println("Start Conn,ConnID:",c.ConnID)
    //TODO 启动从当前链接写数据的业务
    go c.StartReader()
}
//停止链接:结束当前链接的工作
func (c *Connection)Stop(){
    fmt.Println("Stop Conn,ConnID:",c.ConnID)
    if c.ConnStatus == true{
        return
    }
    //设置状态
    c.ConnStatus = false
    //关闭链接
    c.Conn.Close()
    c.Server.GetConnManager().Remove(c.ConnID)
    //回收资源
    close(c.ExitChan)
    close(c.done)
}
//获取当前链接的绑定socket conn
func (c *Connection)GetTcpConnecttion() *net.TCPConn{
    return c.Conn
}
//获取当前链接模块的ID
func (c *Connection)GetConnID() uint32{
    return c.ConnID
}
//获取远程客户端tpc的状态、ip、port
func (c *Connection)GetRemoteAddr() net.Addr{
    return c.Conn.RemoteAddr()
}
//发送数据、将数据发送给远程的客户端
func (c *Connection)Send(data []byte) error{
    _,err := c.Conn.Write([]byte(fmt.Sprintf("Hello")))
    return  err
}

//此方法用来初始化每一个客户端链接
func NewConnection(Server IServer,conn *net.TCPConn,ConnID uint32,Routermange IRoutermanger)*Connection{
    c := & Connection{
        Server:Server,
        Conn:conn,
        ConnID:ConnID,
        ConnStatus:false,
        Routermange:Routermange,
        ExitChan:make(chan bool,1),
        done:make(chan IRequest),
    }
    Server.GetConnManager().Add(c.ConnID,c)
    return c
}
View Code

connectmager.go

Tcp服务器实现
package znet

import (
    "errors"
    "sync"
)

type IConnManager interface {
    Add(connID uint32,conn IConnection)
    Remove(connID uint32)
    GetConn(connID uint32)(IConnection,error)
    GetConnLen() int
    ClearConn()
}
type ConnManger struct {
    ConnMangerMap   map[uint32]IConnection
    connLock  *sync.RWMutex
}
//添加
func (M *ConnManger)Add(connID uint32,conn IConnection){
    M.connLock.Lock() //加写锁
    defer M.connLock.Unlock()//解写锁
    M.ConnMangerMap[connID] = conn

}
//删除
func (M *ConnManger)Remove(connID uint32){
    M.connLock.Lock() //加写锁
    defer M.connLock.Unlock()//解写锁
    delete(M.ConnMangerMap,connID)
}
//得到链接
func (M *ConnManger)GetConn(connID uint32)(IConnection,error){
    M.connLock.RLock() //加读锁
    defer M.connLock.RUnlock()//解读锁
    if conn,ok := M.ConnMangerMap[connID];ok{
        return conn,nil
    }else{
        return nil,errors.New("connnection not Found!")
    }

}
//得到链接个数
func (M *ConnManger)GetConnLen()int{
    return len(M.ConnMangerMap)
}

func (M *ConnManger)ClearConn(){
    M.connLock.Lock() //加写锁
    defer M.connLock.Unlock()//解写锁
    for ConnID,conn := range M.ConnMangerMap{
        //停止链接
        conn.Stop()
        //删除链接
        delete(M.ConnMangerMap,ConnID)
    }
}

func NewConnManger() *ConnManger{
    return &ConnManger{
        make(map[uint32]IConnection),
        new(sync.RWMutex),
    }
}
View Code

datapack.go

Tcp服务器实现
package znet

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

type IDatapack interface {
    //获取包的头长度方法
    GetHead() uint32
    //封包
    Pack(msg IMessage)([]byte,error)
    //解包
    Unpack(data []byte)(Message,error)
}

type Datapack struct {}

func (dp *Datapack)GetHead() uint32 {
    //4字节长度+4字节长度
    return 8

}

func (dp *Datapack)Pack(msg IMessage)([]byte,error) {
    //创建一个存放bytes的缓冲
    dataBuff := bytes.NewBuffer([]byte{})
    //将数据长度写入字节流
    err :=  binary.Write(dataBuff,binary.LittleEndian,msg.Getmsglen())
    checkerr(err)
    //将id写入字节流
    err = binary.Write(dataBuff,binary.LittleEndian,msg.Getmsgid())
    checkerr(err)
    //将数据内容写入字节流
    err = binary.Write(dataBuff,binary.LittleEndian,msg.Getmsg())
    checkerr(err)
    return dataBuff.Bytes(),nil

}
func (dp *Datapack)Unpack(data []byte)(*Message,error){
    //这里可以不需要额外创建一个数据缓冲
    //创建一个io。Reader
    boolBuffer := bytes.NewReader(data)
    msg := &Message{}
    //读取数据长度和id
    err := binary.Read(boolBuffer, binary.LittleEndian, &msg.Msglen)
    checkerr(err)
    err = binary.Read(boolBuffer, binary.LittleEndian, &msg.Msgid)
    checkerr(err)
    //数据包限制
    //if
    //
    //}
    return msg,nil
}

func NewDatapack()*Datapack{
    return &Datapack{}
}

func checkerr(err error){
    if err != nil{
        fmt.Println("数据写入与读取失败")
    }
}
View Code

message.go

Tcp服务器实现
package znet

import (
    "fmt"
)

type IMessage interface {
    //获取消息的内容
    Getmsg() []byte
    //获取消息的长度
    Getmsglen() uint32
    //获取消息的ID
    Getmsgid() uint32
    //设置消息的ID
    Setmsgid(id uint32)
    //设置消息的内容
    Setmsg(data []byte)
}

type Message struct {
    //TLV格式数据
    //消息长度
    Msglen uint32
    //消息ID
    Msgid uint32
    //消息内容
    data []byte
}

func (M *Message)Getmsg() []byte{
    return M.data
}
func (M *Message)Getmsglen() uint32{
    return M.Msglen
}
func (M *Message)Getmsgid() uint32{
    return M.Msgid
}
func (M *Message)Setmsgid(id uint32){
    M.Msgid = id
}
func (M *Message)Setmsg(data []byte){
    M.data = data
}

func NewMessage(data []byte)*Message{
    pack := NewDatapack()
    msg,_ := pack.Unpack(data)
    fmt.Printf("recv buf:%s\n",string(data[8:8+msg.Msglen]))
    msg.data = data[8:8+msg.Msglen]
    return msg
}
View Code

request.go

Tcp服务器实现
package znet


type IRequest interface {
    //得到请求数据封装
    Getdata() IMessage
    //得到当前链接封装
    GetConnecttion() IConnection
}

type Request struct {
    //已经和客户端建立链接对象
     conn IConnection
     //客户端请求数据
     Msg IMessage
}

//得到请求数据
func (r *Request)Getdata()IMessage{
    return r.Msg
}
//得到当前链接
func (r *Request)GetConnecttion() IConnection{
    return r.conn
}
//初始化Request对象
func NewRequest(conn IConnection,Msg IMessage) *Request{
    r := &Request{
        conn:conn,
        Msg:Msg,
    }
    return r
}
View Code

router.go

Tcp服务器实现
package znet


type IRouter interface {
    //Hook机制:其主要思想是提前在可能增加功能的地方埋好(预设)一个钩子,这个钩子并没有实际的意义,当我们需要重新修改或者增加这个地方的逻辑的时候,把扩展的类或者方法挂载到这个点即可。
    //在处理conn业务之前的钩子方法Hook
    PreHandle(request IRequest)
    //在处理conn业务主方法Hook
    Handle(request IRequest)
    //在处理conn业务之后的钩子方法Hook
    PostHandle(request IRequest)
}

//基类实现所有接口方法,但不是不具体写死,然后子类继承基类,重写基类方法
type BaseRouter struct{}
//在处理conn业务之前的钩子方法Hook
func (br *BaseRouter)PreHandle(request IRequest){}
//在处理conn业务主方法Hook
func (br *BaseRouter)Handle(request IRequest){}
//在处理conn业务之后的钩子方法Hook
func (br *BaseRouter)PostHandle(request IRequest){}
View Code

routermange.go

Tcp服务器实现
package znet

type IRoutermanger interface {
    //返回索引对应的路由
    IndexRouter(index uint32)IRouter
    //添加路由
    AddRouter(index uint32, router IRouter)
    ////删除路由
    DeleteRouter(index uint32)
}

type Routermange struct {
    //路由映射
    Routermap map[uint32]IRouter
    //负责worker取任务的消息队列

    //业务工作worker池的worker数量


}
//返回索引对应的路由,服务端通过消息绑定的ID(类型),可以执行对应不同的注册路由函数
func (R * Routermange)IndexRouter(index uint32)IRouter{
    return R.Routermap[index]
}
//添加路由
func (R * Routermange)AddRouter(index uint32,router IRouter){
    R.Routermap[index] = router
}
////删除路由
func (R * Routermange)DeleteRouter(index uint32){
    delete(R.Routermap,index)
}
//初始化路由
func NewRoutermange()*Routermange{
    return &Routermange{
        make(map[uint32]IRouter),
    }
}
View Code

config.go

Tcp服务器实现
package znet

import (
    "encoding/json"
    "fmt"
    "os"
    "strconv"
)

type Config struct {
    Name  string
    Version string
    Host   string
    TcpPort string
    MaxPackSize int64
    MaxConn   int64

}

func (c *Config)setname(Name string){
    c.Name = Name
}
func (c *Config)setVersion(Version string){
    c.Version = Version
}

func (c *Config)setHost(Host string){
    c.Host = Host
}

func (c *Config)setTcpPort(TcpPort string){
    c.TcpPort = TcpPort
}
func (c *Config)setMaxConn(MaxConn   int64){
    c.MaxConn = MaxConn
}

func (c *Config)setMaxPackSize(MaxPackSize   int64){
    c.MaxPackSize = MaxPackSize
}


func NewConfig() *Config {
    //打开json文件
    srcFile,err := os.Open("D:/Go/test/src/Zinx_Server/znet/server.json")
    if err != nil{
        fmt.Println("文件打开失败,err=",err)
        return nil
    }
    defer srcFile.Close()
    //创建接收数据的map类型数据
    datamap := make(map[string]string)
    //创建解码器
    decoder := json.NewDecoder(srcFile)
    //解码
    err = decoder.Decode(&datamap)
    if err != nil{
        fmt.Println("解码失败,err:",err)
        return nil
    }
    MaxConn,err := strconv.ParseInt(datamap["MaxConn"], 10, 64)
    MaxPackSize,err := strconv.ParseInt(datamap["MaxPackSize"],10,64)
    c := &Config{
        datamap["Name"],
        datamap["Version"],
        datamap["Host"],
        datamap["TcpPort"],
        MaxPackSize,
        MaxConn,
    }
    return c
}
View Code

WorkerPool.go

Tcp服务器实现
package znet




//限制
//1.worker工作池的任务队列的最大值
//2.任务队列中任务的最大数量
//协程池
type WorkerPool struct {
    cap            int
    tasksSize   int
    TaskQueue []chan IConnection //信道集合

}

//启动一个worker工作池,开启工作池只能发生一次
func (W *WorkerPool)StartWorkPool(){
    //根据任务队列的大小,分别开启worker,每个worker用go来承载,每一个worker对应一个任务队列
    for i:=0;i<W.cap;i++{
        //为每个worker开辟缓冲信道(任务队列)
        W.TaskQueue[i] = make(chan IConnection,W.tasksSize)
        //启动worker,阻塞等待任务从channel中到来
        go W.StartOneWorker(i,W.TaskQueue[i])
    }
}

func (W *WorkerPool)StartOneWorker(id int,taskqueue chan IConnection){

    for{
        select {
        case request :=<- taskqueue:
            //如果有消息过来,则处理业务
            request.Start()
        default:
            continue
        }
    }
}
func (W *WorkerPool)Put(Connection IConnection){
    index :=  Connection.GetConnID()%uint32(W.cap)
    W.TaskQueue[index] <- Connection
}

func  NewWorkerPool(cap int,len int)*WorkerPool{
    return &WorkerPool{
        cap:cap,
        tasksSize:len,
        TaskQueue:make([]chan IConnection,cap),
    }
}
View Code

 

上一篇:2018-7-18 STM32 开发基础


下一篇:【STM32H7教程】第24章 STM32H7的Cache解读(非常重要)