GRPC流模式

简单模式

又称为一元 RPC,类似于常规的 http 请求,客户端发送请求,服务端响应请求

 

服务端流模式

stream.proto

syntax = "proto3";

option go_package=".;proto";

service Greeter {
  rpc GetStream(StreamReqData) returns (stream StreamResData) {} // 服务端流模式
  rpc PutStream(stream StreamReqData) returns (StreamResData) {} // 客户端流模式
  rpc AllStream(stream StreamReqData) returns (stream StreamResData) {} // 双向流模式
}

message StreamReqData {
  string data = 1;
}

message StreamResData {
  string data = 1;
}

//protoc --go_out=. --go_opt=paths=source_relative \
//--go-grpc_out=. --go-grpc_opt=paths=source_relative \
//stream.proto

 

server.go

type server struct {
    proto.UnimplementedGreeterServer
}

//GetStream 服务端流模式实现
func (s *server) GetStream(data *proto.StreamReqData, streamServer proto.Greeter_GetStreamServer) error {
    for i := 0; i < 10; i++ {
        _ = streamServer.Send(&proto.StreamResData{
            Data: data.Data+strconv.Itoa(i),
        })
        time.Sleep(time.Second * 2)
    }
    return nil
}

func main() {
    lis,err := net.Listen("tcp",PORT)
    if err != nil {
        panic(err)
    }

    s := grpc.NewServer()

    proto.RegisterGreeterServer(s,&server{})

    err = s.Serve(lis)

    if err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

 

client.go

func main() {
    conn,err := grpc.Dial(":50052",grpc.WithInsecure())
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    c := proto.NewGreeterClient(conn)
    res,_ := c.GetStream(context.Background(),&proto.StreamReqData{
        Data: "看看",
    })

    for {
        a,err := res.Recv()
        if err != nil {
            panic(err)
        }
        fmt.Println(a)
    }
}

 

客户端流模式

server.go

// PutStream 客户端流模式
func (s *server) PutStream(streamServer proto.Greeter_PutStreamServer) error {
    for{
        if a,err := streamServer.Recv(); err != nil {
            fmt.Println(err.Error())
            break
        } else {
            fmt.Println(a)
        }
    }
    return nil
}

 

client.go

func main() {
    conn,err := grpc.Dial(":50052",grpc.WithInsecure())
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    c := proto.NewGreeterClient(conn)
    res,_ := c.PutStream(context.Background())

    for i := 0; i < 10; i++ {
        res.Send(&proto.StreamReqData{
            Data: "我来自客户端"+strconv.Itoa(i),
        })
        time.Sleep(time.Second)
    }
}

 

双向流模式

serer.go

func (s *server) AllStream(streamServer proto.Greeter_AllStreamServer) error {
    wg := sync.WaitGroup{}
    wg.Add(2)

    // 收客户端消息
    go func() {
        for {
            data,_ := streamServer.Recv()
            if data != nil {
                fmt.Println(data.Data)
            }
        }
        wg.Done()
    }()

    // 发客户端消息
    go func() {
        for i := 0; i < 10; i++ {
            _ = streamServer.Send(&proto.StreamResData{
                Data: "我来自服务端" + strconv.Itoa(i),
            })
            time.Sleep(time.Second)
        }
        wg.Done()
    }()
    wg.Wait()
    return nil
}

 

client.go

func main() {
    conn,err := grpc.Dial(":50052",grpc.WithInsecure())
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    c := proto.NewGreeterClient(conn)
    res,_ := c.AllStream(context.Background())

    wg := sync.WaitGroup{}
    wg.Add(2)

    // 收服务端消息
    go func() {
        wg.Done()
        for {
            data,_ := res.Recv()
            if data != nil {
                fmt.Println(data.Data)
            }
        }
    }()

    // 发服务端消息
    go func() {
        for i := 0; i < 10; i++ {
            _ = res.Send(&proto.StreamReqData{
                Data: "我来自客户端" + strconv.Itoa(i),
            })
            time.Sleep(time.Second)
        }
        wg.Done()
    }()
    wg.Wait()
}

 

上一篇:npm i 报错


下一篇:Android Studio 报错:Could not determine the dependencies of task ‘...‘.Installed Build Tools revision