一 问题描述:
由来:公司有个功能需要被大量请求,并且中间涉及到多个不同的语言组成(c++/java/c#等),就决定使用grpc来做rpc服务。我是做c#的当然使用grpc for c# 来处理。这里涉及到一个问题,这个底层服务耗费性能,并且只是在一定时间内被大量请求,所以运维启用监视,当单个容器使用过多时候,便增加新pod,然后通过k8s自己的负载均衡进行协调。大体流程:
注:1.pod1,pod2是grpcserver
2.pod会根据容器检测自动启用新容器
3.api层只负责转发和记录当前请求内容,不做io处理,所以只需要启用一个站点,便可以支撑所有请求,(即使真的太多,那么重启一个服务也是毫无难度的,类似于nginx)
二:问题产生
从理论来看这种属于最简单的流式调用,没有任何问题。一个pod处理不过来,那么便多启用几个pod处理增加速度。一切都很好。但是由于k8s负载均衡只是做个中转,然后因为grpc的http2.0长连接导致只要连接一个pod,那么在使用当前grpcclient的情况下,一直指向一个pod无法释放,导致在巅峰期,出现一个pod累成狗,其他pod看热闹的情况发生。
最终导致pod1一直重启。然后连接到pod2,然后pod2挂掉,持续如此
三:问题原因
由于grpc使用的http2.0长连接(注意与http1.0的长连接,即连接复用one by one方式不一样),是多个请求可同时在一个连接上并行执行
通过tcpdump抓包可以看出来,44026与6001之间多次连接传输数据,并且即使6001没有回传数据,44026也会传输新请求给6001.这就是http2.0的连接并行
四:解决问题的方式:
1.最简单的方式是在api层与k8s之间通过一个nginx来处理长连接,但是此处将grpc的长连接强制改为短连接了,此方法pass掉
2.将k8s连接到pod中的方式改为通过k8s自己的负载均衡处理。但是使用的是阿x云服务器,阿x云不提供当前方案,自己的运维也不愿意在线上折腾,所以此方案pass
3.最傻的解决方式,将api层做负载,每次启动2个pod,一个api的pod一个grpc的pod,然后在api层做负载,让api每次使用同一个长连接。大概如下
如图所示,就知道这种方案有多傻是多傻。但是由于线上紧急,所以使用了此种方式进行处理
5.产生新的问题
由于只有c#出现这个问题,所以有些人啊,一直在那里说c#垃圾,c#不如java,我那个气啊。这不行,不争口香争口气。刚好乘着过年好好捋一捋grpc的代码,看到底啥情况,那么上github看源码
https://github.com/grpc/grpc-dotnet.git,上面就是官方提供的grpc的连接,一步步来看,首先看我们注入的解决方式
1.先看创建过程AddGrpcClient<TClient>
public static void AddGrpcClient<TClient>(this IServiceCollection services, Action<GrpcClientFactoryOptions> configureClient) where TClient : class { var name = TypeNameHelper.GetTypeDisplayName(typeof(TClient), fullName: false); services.Configure(name, configureClient); services.TryAddSingleton<GrpcClientFactory, DefaultGrpcClientFactory>(); services.TryAddSingleton<GrpcCallInvokerFactory>(); services.TryAddSingleton<DefaultClientActivator<TClient>>(); services.TryAddSingleton(new GrpcClientMappingRegistry()); Action<IServiceProvider, HttpClient> configureTypedClient = (s, httpClient) => { var os = s.GetRequiredService<IOptionsMonitor<GrpcClientFactoryOptions>>(); var clientOptions = os.Get(name); httpClient.BaseAddress = clientOptions.Address; httpClient.Timeout = Timeout.InfiniteTimeSpan; }; services .AddHttpClient(name, configureTypedClient) .ConfigurePrimaryHttpMessageHandler(() => { var handler = new HttpClientHandler(); return handler; }); services.AddTransient<TClient>(s => { var clientFactory = s.GetRequiredService<GrpcClientFactory>(); return clientFactory.CreateClient<TClient>(name); }); }View Code
整理出最核心代码,可以发现,生成GrpcClient过程中还是基于HttpClient.这些是注入过程,其中看到一个关键的注入方式
Services.AddTransient<TClient>(s => { var clientFactory = s.GetRequiredService<GrpcClientFactory>(); return clientFactory.CreateClient<TClient>(builder.Name); });
可以看见,当我注入Greet.GreetClient时候,在ioc获取的时候 是基于Transient来获取的
2.在看看创建GrpcClient过程,通过上面的注入方式获取GrpcClientFactory来获取Client:
services.TryAddSingleton<GrpcClientFactory, DefaultGrpcClientFactory>();
再来看看DefaultGrpcClientFactory的CreateClient
public override TClient CreateClient<TClient>(string name) where TClient : class { var defaultClientActivator = _serviceProvider.GetService<DefaultClientActivator<TClient>>(); var clientFactoryOptions = _clientFactoryOptionsMonitor.Get(name); var httpClient = _httpClientFactory.CreateClient(name); var callInvoker = _callInvokerFactory.CreateCallInvoker(httpClient, name, clientFactoryOptions); if (clientFactoryOptions.Creator != null) { var c = clientFactoryOptions.Creator(callInvoker); if (c is TClient client) { return client; } } else { return defaultClientActivator.CreateClient(callInvoker); } }View Code
有个DefaultClientActivator<TClient>用来生成TClient
private readonly static Func<ObjectFactory> _createActivator = () => ActivatorUtilities.CreateFactory(typeof(TClient), new Type[] { typeof(CallInvoker), });
var activator = LazyInitializer.EnsureInitialized(ref _activator,ref _initialized,ref _lock,_createActivator);
这个方法查看了注释,是用来通过Create a delegate that will instantiate a type with constructor arguments provided directly and/or from an System.IServiceProvider.
也就是通过注入IServiceProvider创建一个基于CallInvoker对象生成的Client,但是这点也是我比较奇怪的地方。都已经提供了创建对象的arguments了,为什么还需要通过IServiceProvider来获取注入的参数,暂时没有去看这方面的源码,我就不去猜想这种实现的差异,反正这里目的是创建一个Client,在看看Callinvoke的实现方式
var clientFactoryOptions = _clientFactoryOptionsMonitor.Get(name); var httpClient = _httpClientFactory.CreateClient(name); var callInvoker = _callInvokerFactory.CreateCallInvoker(httpClient, name, clientFactoryOptions);
这里面的代码都比较熟悉,通过IOption注入的GrpcClientFactoryOptions,注入的HttpClient,最后关键点的是CallInvoke
var channelOptions = new GrpcChannelOptions(); channelOptions.HttpClient = httpClient; channelOptions.LoggerFactory = _loggerFactory; if (clientFactoryOptions.ChannelOptionsActions.Count > 0) { foreach (var applyOptions in clientFactoryOptions.ChannelOptionsActions) { applyOptions(channelOptions); } } var address = clientFactoryOptions.Address ?? httpClient.BaseAddress; var channel = GrpcChannel.ForAddress(address, channelOptions); var httpClientCallInvoker = channel.CreateCallInvoker();
可以很清晰的看出来,是通过GrpcChannel.ForAddress(address, channelOptions);来解析所有的参数,只不过有个CallInvoke来当做解析点,在与官方提供的Grpc点比较
var handler = new HttpClientHandler(); handler.ClientCertificates.Add(cert); var httpClient = new HttpClient(handler); var channel = GrpcChannel.ForAddress("https://localhost:5001/", new GrpcChannelOptions { HttpClient = httpClient }); var grpc = new Greeter.GreeterClient(channel); var response = await grpc.SayHelloAsync(new HelloRequest { Name = "Bob" });
也就是换个方式来实现new Client的步骤。这就是所有的Grpc生成的源码