gRPC之Server Streaming

gRPC是什么?
简单来说,gRPC是google开发的一款开源RPC通讯框架,支持request/reponse模式、client stream模式、server stream模式和双向模式。
(双向(bi-directional)模式,感觉和传统的web socket很像)

现实场景中我们经常会遇到client端的一个请求,会在server端执行多个长耗时的任务,client端只能等待server全部完成。如果我们要在client实时端显示server端的进度,该如何实现呢? 本文将使用.net core+gRPC的server stream给出实现代码。

vscode安装语法扩展
gRPC之Server Streaming

1) .net core创建server端gRPC服务
dotnet new grpc -o server
cd server
dotnet new gitignore

添加gRPC定义文件:charging.proto

syntax = "proto3";

option csharp_namespace = "server";

package charging;

service Charging {
    rpc ReportProgress (ProgressRequest) returns (stream ProgressResponse);
}

message ProgressRequest {
    bool continue = 1;
}

message ProgressResponse {
    int32 percentage = 1;
}

打开server.csproj,加入以下配置

<ItemGroup>
    <Protobuf Include="Protos\charging.proto" GrpcServices="Server" />
  </ItemGroup>

运行dotnet build,在obj文件夹下生成Charging代码

gRPC之Server Streaming

Services文件夹下添加一个新的服务:ChargingService.cs

using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;

namespace server
{
    public class ChargingService : Charging.ChargingBase
    {
        public override async Task ReportProgress(
              ProgressRequest request, 
              IServerStreamWriter<ProgressResponse> responseStream, 
              ServerCallContext context)
        {
            int percentage = 0;           
            try
            {                
                while (!context.CancellationToken.IsCancellationRequested)
                {
                    Thread.Sleep(5000);
                    percentage += 20;
                    await responseStream.WriteAsync(new ProgressResponse { 
                     Percentage = percentage
                    });
                }
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
            {
                Console.WriteLine("Operation Cancelled.");
            }

            Console.WriteLine("Processing Complete.");
        }
    }
}

打开startup.cs,加入

endpoints.MapGrpcService<ChargingService>();

建立Client端
参考: https://docs.microsoft.com/en-us/aspnet/core/tutorials/grpc/grpc-start?view=aspnetcore-6.0&tabs=visual-studio-code

dotnet new console -o client
cd client

dotnet add client.csproj package Grpc.Net.Client
dotnet add client.csproj package Google.Protobuf
dotnet add client.csproj package Grpc.Tools

把server端的proto文件copy到client端,修改namespace

option csharp_namespace = "client";

csproj加入Protobuf定义

<ItemGroup>
    <Protobuf Include="Protos\charging.proto" GrpcServices="Client" />
  </ItemGroup>

dotnet build 生成ChargingClient对象
最后program.cs加入以下代码

using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Net.Client;
using client;

namespace client
{
    class Program
    {
        static void Main(string[] args)
        {
            GetServerProgress();
        }

        static void GetServerProgress()
        {
            using var channel = GrpcChannel.ForAddress("https://localhost:5001");
            var client = new client.Charging.ChargingClient(channel); //GrpcGreeterClient.Greeter.GreeterClient(channel);
            var request = new client.ProgressRequest { Continue = true };
            var cancellationToken = new CancellationTokenSource();

            try
            {
                AsyncServerStreamingCall<ProgressResponse> response = 
                    client.ReportProgress(request, 
                      cancellationToken: cancellationToken.Token);

                // loop through each object from the ResponseStream
                while (response.ResponseStream.MoveNext().Result)
                {
                    // fetch the object currently pointed
                    var current = response.ResponseStream.Current;
                    if(current.Percentage >= 100)
                    {
                        cancellationToken.Cancel();
                    }

                    // print it
                    Console.WriteLine($"{current.Percentage}%");
                }
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
            {
                Console.WriteLine("Operation Cancelled.");
            }

            Console.ReadLine();
        }
    }
}

server端dotnet run,
Client端dotnet run

gRPC之Server Streaming

只是这种模式必须通过Client发送CalcellationToken来结束,而结束时会引发exception。 还没发现有更优雅的结束方式。

代码:
https://github.com/992990831/gRPC

参考:
https://docs.microsoft.com/en-us/aspnet/core/grpc/services?view=aspnetcore-6.0
https://referbruv.com/blog/posts/implementing-stream-based-communication-with-grpc-and-aspnet-core

上一篇:Mysql UDF提权方法


下一篇:debug 3的配置