【Orleans开胃菜系列2】连接Connect源码简易分析

【Orleans开胃菜系列2】连接Connect源码简易分析

/**
* prism.js Github theme based on GitHub's theme.
* @author Sam Clarke
*/
code[class*="language-"],
pre[class*="language-"] {
color: #333;
background: none;
font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace;
text-align: left;
white-space: pre;
word-spacing: normal;
word-break: normal;
word-wrap: normal;
line-height: 1.4;

-moz-tab-size: 8;
-o-tab-size: 8;
tab-size: 8;

-webkit-hyphens: none;
-moz-hyphens: none;
-ms-hyphens: none;
hyphens: none;
}

/* Code blocks */
pre[class*="language-"] {
padding: .8em;
overflow: auto;
/* border: 1px solid #ddd; */
border-radius: 3px;
/* background: #fff; */
background: #f5f5f5;
}

/* Inline code */
:not(pre) > code[class*="language-"] {
padding: .1em;
border-radius: .3em;
white-space: normal;
background: #f5f5f5;
}

.token.comment,
.token.blockquote {
color: #969896;
}

.token.cdata {
color: #183691;
}

.token.doctype,
.token.punctuation,
.token.variable,
.token.macro.property {
color: #333;
}

.token.operator,
.token.important,
.token.keyword,
.token.rule,
.token.builtin {
color: #a71d5d;
}

.token.string,
.token.url,
.token.regex,
.token.attr-value {
color: #183691;
}

.token.property,
.token.number,
.token.boolean,
.token.entity,
.token.atrule,
.token.constant,
.token.symbol,
.token.command,
.token.code {
color: #0086b3;
}

.token.tag,
.token.selector,
.token.prolog {
color: #63a35c;
}

.token.function,
.token.namespace,
.token.pseudo-element,
.token.class,
.token.class-name,
.token.pseudo-class,
.token.id,
.token.url-reference .token.variable,
.token.attr-name {
color: #795da3;
}

.token.entity {
cursor: help;
}

.token.title,
.token.title .token.punctuation {
font-weight: bold;
color: #1d3e81;
}

.token.list {
color: #ed6a43;
}

.token.inserted {
background-color: #eaffea;
color: #55a532;
}

.token.deleted {
background-color: #ffecec;
color: #bd2c00;
}

.token.bold {
font-weight: bold;
}

.token.italic {
font-style: italic;
}

/* JSON */
.language-json .token.property {
color: #183691;
}

.language-markup .token.tag .token.punctuation {
color: #333;
}

/* CSS */
code.language-css,
.language-css .token.function {
color: #0086b3;
}

/* YAML */
.language-yaml .token.atrule {
color: #63a35c;
}

code.language-yaml {
color: #183691;
}

/* Ruby */
.language-ruby .token.function {
color: #333;
}

/* Markdown */
.language-markdown .token.url {
color: #795da3;
}

/* Makefile */
.language-makefile .token.symbol {
color: #795da3;
}

.language-makefile .token.variable {
color: #183691;
}

.language-makefile .token.builtin {
color: #0086b3;
}

/* Bash */
.language-bash .token.keyword {
color: #0086b3;
}html body{font-family:"Helvetica Neue",Helvetica,"Segoe UI",Arial,freesans,sans-serif;font-size:16px;line-height:1.6;color:#333;background-color:#fff;overflow:initial;box-sizing:border-box;word-wrap:break-word}html body>:first-child{margin-top:0}html body h1,html body h2,html body h3,html body h4,html body h5,html body h6{line-height:1.2;margin-top:1em;margin-bottom:16px;color:#000}html body h1{font-size:2.25em;font-weight:300;padding-bottom:.3em}html body h2{font-size:1.75em;font-weight:400;padding-bottom:.3em}html body h3{font-size:1.5em;font-weight:500}html body h4{font-size:1.25em;font-weight:600}html body h5{font-size:1.1em;font-weight:600}html body h6{font-size:1em;font-weight:600}html body h1,html body h2,html body h3,html body h4,html body h5{font-weight:600}html body h5{font-size:1em}html body h6{color:#5c5c5c}html body strong{color:#000}html body del{color:#5c5c5c}html body a:not([href]){color:inherit;text-decoration:none}html body a{color:#08c;text-decoration:none}html body a:hover{color:#00a3f5;text-decoration:none}html body img{max-width:100%}html body>p{margin-top:0;margin-bottom:16px;word-wrap:break-word}html body>ul,html body>ol{margin-bottom:16px}html body ul,html body ol{padding-left:2em}html body ul.no-list,html body ol.no-list{padding:0;list-style-type:none}html body ul ul,html body ul ol,html body ol ol,html body ol ul{margin-top:0;margin-bottom:0}html body li{margin-bottom:0}html body li.task-list-item{list-style:none}html body li>p{margin-top:0;margin-bottom:0}html body .task-list-item-checkbox{margin:0 .2em .25em -1.8em;vertical-align:middle}html body .task-list-item-checkbox:hover{cursor:pointer}html body blockquote{margin:16px 0;font-size:inherit;padding:0 15px;color:#5c5c5c;border-left:4px solid #d6d6d6}html body blockquote>:first-child{margin-top:0}html body blockquote>:last-child{margin-bottom:0}html body hr{height:4px;margin:32px 0;background-color:#d6d6d6;border:0 none}html body table{margin:10px 0 15px 
0;border-collapse:collapse;border-spacing:0;display:block;width:100%;overflow:auto;word-break:normal;word-break:keep-all}html body table th{font-weight:bold;color:#000}html body table td,html body table th{border:1px solid #d6d6d6;padding:6px 13px}html body dl{padding:0}html body dl dt{padding:0;margin-top:16px;font-size:1em;font-style:italic;font-weight:bold}html body dl dd{padding:0 16px;margin-bottom:16px}html body code{font-family:Menlo,Monaco,Consolas,'Courier New',monospace;font-size:.85em !important;color:#000;background-color:#f0f0f0;border-radius:3px;padding:.2em 0}html body code::before,html body code::after{letter-spacing:-0.2em;content:"\00a0"}html body pre>code{padding:0;margin:0;font-size:.85em !important;word-break:normal;white-space:pre;background:transparent;border:0}html body .highlight{margin-bottom:16px}html body .highlight pre,html body pre{padding:1em;overflow:auto;font-size:.85em !important;line-height:1.45;border:#d6d6d6;border-radius:3px}html body .highlight pre{margin-bottom:0;word-break:normal}html body pre code,html body pre tt{display:inline;max-width:initial;padding:0;margin:0;overflow:initial;line-height:inherit;word-wrap:normal;background-color:transparent;border:0}html body pre code:before,html body pre tt:before,html body pre code:after,html body pre tt:after{content:normal}html body p,html body blockquote,html body ul,html body ol,html body dl,html body pre{margin-top:0;margin-bottom:16px}html body kbd{color:#000;border:1px solid #d6d6d6;border-bottom:2px solid #c7c7c7;padding:2px 4px;background-color:#f0f0f0;border-radius:3px}@media print{html body{background-color:#fff}html body h1,html body h2,html body h3,html body h4,html body h5,html body h6{color:#000;page-break-after:avoid}html body blockquote{color:#5c5c5c}html body pre{page-break-inside:avoid}html body table{display:table}html body img{display:block;max-width:100%;max-height:100%}html body pre,html body 
code{word-wrap:break-word;white-space:pre}}.markdown-preview{width:100%;height:100%;box-sizing:border-box}.markdown-preview .pagebreak,.markdown-preview .newpage{page-break-before:always}.markdown-preview pre.line-numbers{position:relative;padding-left:3.8em;counter-reset:linenumber}.markdown-preview pre.line-numbers>code{position:relative}.markdown-preview pre.line-numbers .line-numbers-rows{position:absolute;pointer-events:none;top:1em;font-size:100%;left:0;width:3em;letter-spacing:-1px;border-right:1px solid #999;-webkit-user-select:none;-moz-user-select:none;-ms-user-select:none;user-select:none}.markdown-preview pre.line-numbers .line-numbers-rows>span{pointer-events:none;display:block;counter-increment:linenumber}.markdown-preview pre.line-numbers .line-numbers-rows>span:before{content:counter(linenumber);color:#999;display:block;padding-right:.8em;text-align:right}.markdown-preview .mathjax-exps .MathJax_Display{text-align:center !important}.markdown-preview:not([for="preview"]) .code-chunk .btn-group{display:none}.markdown-preview:not([for="preview"]) .code-chunk .status{display:none}.markdown-preview:not([for="preview"]) .code-chunk .output-div{margin-bottom:16px}.scrollbar-style::-webkit-scrollbar{width:8px}.scrollbar-style::-webkit-scrollbar-track{border-radius:10px;background-color:transparent}.scrollbar-style::-webkit-scrollbar-thumb{border-radius:5px;background-color:rgba(150,150,150,0.66);border:4px solid rgba(150,150,150,0.66);background-clip:content-box}html body[for="html-export"]:not([data-presentation-mode]){position:relative;width:100%;height:100%;top:0;left:0;margin:0;padding:0;overflow:auto}html body[for="html-export"]:not([data-presentation-mode]) .markdown-preview{position:relative;top:0}@media screen and (min-width:914px){html body[for="html-export"]:not([data-presentation-mode]) .markdown-preview{padding:2em calc(50% - 457px)}}@media screen and (max-width:914px){html body[for="html-export"]:not([data-presentation-mode]) 
.markdown-preview{padding:2em}}@media screen and (max-width:450px){html body[for="html-export"]:not([data-presentation-mode]) .markdown-preview{font-size:14px !important;padding:1em}}@media print{html body[for="html-export"]:not([data-presentation-mode]) #sidebar-toc-btn{display:none}}html body[for="html-export"]:not([data-presentation-mode]) #sidebar-toc-btn{position:fixed;bottom:8px;left:8px;font-size:28px;cursor:pointer;color:inherit;z-index:99;width:32px;text-align:center;opacity:.4}html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] #sidebar-toc-btn{opacity:1}html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc{position:fixed;top:0;left:0;width:300px;height:100%;padding:32px 0 48px 0;font-size:14px;box-shadow:0 0 4px rgba(150,150,150,0.33);box-sizing:border-box;overflow:auto;background-color:inherit}html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc::-webkit-scrollbar{width:8px}html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc::-webkit-scrollbar-track{border-radius:10px;background-color:transparent}html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc::-webkit-scrollbar-thumb{border-radius:5px;background-color:rgba(150,150,150,0.66);border:4px solid rgba(150,150,150,0.66);background-clip:content-box}html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc a{text-decoration:none}html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc ul{padding:0 1.6em;margin-top:.8em}html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc li{margin-bottom:.8em}html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .md-sidebar-toc ul{list-style-type:none}html 
body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .markdown-preview{left:300px;width:calc(100% - 300px);padding:2em calc(50% - 457px - 150px);margin:0;box-sizing:border-box}@media screen and (max-width:1274px){html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .markdown-preview{padding:2em}}@media screen and (max-width:450px){html body[for="html-export"]:not([data-presentation-mode])[html-show-sidebar-toc] .markdown-preview{width:100%}}html body[for="html-export"]:not([data-presentation-mode]):not([html-show-sidebar-toc]) .markdown-preview{left:50%;transform:translateX(-50%)}html body[for="html-export"]:not([data-presentation-mode]):not([html-show-sidebar-toc]) .md-sidebar-toc{display:none}
/* Please visit the URL below for more information: */
/* https://shd101wyy.github.io/markdown-preview-enhanced/#/customize-css */

简要说明

// Connection code: obtain a client from StartClientWithRetries; the using block disposes it on exit.
using (var client = await StartClientWithRetries())
{ }

从方法看,只是一个简单的、允许重试的启动客户端的方法。追踪进去会发现关于重试逻辑的实践,Socket编程的实践,基于内存的消息队列的实践,依赖注入。在看源码的基础上,最好能配合一些理论书籍来看。理论指导实践,实践反馈理论,才是技术成长的步骤。

这篇文章只涉及Connect所引用方法的部分说明,一步一步来加深理解。
本来我是打算把orleans研究透之后再来写一篇,但看了一周之后,发现connect里面调用了很多类,每个类又有很多方法,这样下去没有尽头,到最终估计什么也写不成。

分析源码本来就是循序渐进的过程,也是一个熟悉框架/原理/实践的过程。直接跳过这个步骤,必然损失良多。所以这部分就叫开胃菜吧。在查看connect的过程中,会接触到越来越多的知识。

本篇暂不涉及数据持久化,主要依赖.netcore内置方法操纵内存实现。

您会接触到的扩展知识

扩展知识之Timer&TimerQueue
Timer

Timer
在设置的间隔后生成事件,并提供生成重复事件的选项
TimerQueue
时间队列

扩展知识之信号量
SemaphoreSlim
SemaphoreSlim 实现

//信号量
SemaphoreSlim
表示Semaphore的轻量级替代,它限制了可以同时访问资源或资源池的线程数
>> Release 释放
>> Wait 等待
信号量有两种类型:本地信号量和命名系统信号量。前者只在应用程序内部可见;后者在整个操作系统中可见,适用于进程间同步。SemaphoreSlim是Semaphore类的轻量级替代,它不使用Windows内核信号量。与Semaphore类不同,SemaphoreSlim类不支持命名系统信号量,只能用作本地信号量。在单个应用程序内进行同步时,推荐使用SemaphoreSlim。

扩展知识之BlockingCollection
BlockingCollection介绍
利用BlockingCollection实现生产者和消费者队列

BlockingCollection
为实现 IProducerConsumerCollection<T> 的线程安全集合提供阻塞和限制功能。
>> Take
>> Add
有这个类型,

扩展知识之Interlocked
Interlocked

Interlocked为多个线程共享的变量提供原子操作。
>>Add
>>Decrement以原子操作的形式递减指定变量的值并存储结果。
>>Increment以原子操作的形式递增指定变量的值并存储结果
>>Exchange
>>CompareExchange
>>Read
个人想法:和Redis的Increment/Decrement类似,部分情况下可以取代Redis的increment/decrement,提高速度。

扩展知识之SpinWait
SpinWait
两阶段提交
Monitor

SpinWait
为基于旋转的等待提供支持。
SpinWait是一种值类型,这意味着低级代码可以使用SpinWait而不必担心不必要的分配开销。SpinWait通常不适用于普通应用程序。在大多数情况下,您应该使用.NET Framework提供的同步类,例如Monitor
>> SpinOnce

扩展知识之Queue&Stack
Queue
Stack

Queue<T>
表示先进先出(FIFO)的对象集合,此类将通用队列实现为循环数组。存储在Queue<T>中的对象在一端插入并从另一端移除。
>Enqueue
>Dequeue
>Peek
Stack<T>
表示具有相同指定类型的实例的可变大小后进先出(LIFO)集合。
>Push
>Pop
>Peek
ConcurrentQueue<T>
表示线程安全的先进先出(FIFO)的对象集合
ConcurrentStack<T>
表示线程安全的后进先出(LIFO)集合
如果需要按照与存入集合时相同的顺序访问信息,请使用Queue<T>;如果需要按相反的顺序访问信息,请使用Stack<T>;如果需要同时从多个线程访问该集合,请使用ConcurrentQueue<T>或ConcurrentStack<T>。

扩展知识之Task
TaskCompletionSource
基于Task的异步模式--全面介绍

TaskCompletionSource表示未绑定到委托的Task <TResult>的生产者端,通过Task属性提供对使用者端的访问。

扩展知识之线程安全的集合
System.Collections.Concurrent
ConcurrentDictionary
ConcurrentDictionary 对决 Dictionary+Locking

System.Collections.Concurrent提供了多个线程安全的集合类;当多个线程并发访问集合时,应使用这些类来代替System.Collections和System.Collections.Generic命名空间中的对应类型。
但是,通过当前集合实现的其中一个接口访问的成员(包括扩展方法)不保证是线程安全的,并且可能需要由调用者同步。
ConcurrentDictionary:表示可以由多个线程同时访问的键/值对的线程安全集合
ConcurrentDictionary <TKey,TValue>类上的操作都是原子操作并且是线程安全的,唯一的例外是接受委托的方法,即AddOrUpdate和GetOrAdd。对于字典的修改和写入操作,ConcurrentDictionary <TKey,TValue>使用细粒度锁定来确保线程安全。(对字典的读取操作是以无锁方式执行的。)但是,这些方法的委托在锁外部调用,以避免在锁定下执行未知代码时可能出现的问题。因此,这些委托执行的代码不受该操作原子性的保证。

扩展知识之网络编程
Socket微软官方文档
Socket博客园

Socket 类提供一组丰富的方法和属性进行网络通信
TCP协议
>BeginConnect
>EndConnect
>BeginSend
>EndSend
>BeginReceive
>EndReceive
>BeginAccept
>EndAccept
UDP协议
>BeginSendTo
>EndSendTo
>BeginReceiveFrom
>EndReceiveFrom

扩展知识之线程通知:
AutoResetEvent
ManualResetEvent
ManualResetEventSlim

AutoResetEvent允许线程通过信号相互通信。通常,当线程需要对资源的独占访问时,可以使用此类。
>Set释放线程
>WaitOne等待线程
ManualResetEvent
通知一个或多个等待线程发生了事件
ManualResetEventSlim
当等待时间预期非常短,并且事件未跨越进程边界时,您可以使用此类以获得比ManualResetEvent更好的性能

扩展知识之依赖注入:
ActivatorUtilities
扩展.net-使用.netcore进行依赖注入

服务可以通过两种机制来解析:
IServiceProvider
ActivatorUtilities – 允许在依赖关系注入容器中创建没有服务注册的对象。 ActivatorUtilities 用于面向用户的抽象,例如标记帮助器、MVC 控制器、SignalR 集线器和模型绑定器。
>ActivatorUtilities.CreateInstance
>ActivatorUtilities.GetServiceOrCreateInstance

Client连接代码。

// Connection code: obtain a client from StartClientWithRetries, run the client work,
// then wait for a key press before the using block disposes the client.
using (var client = await StartClientWithRetries())
{
await DoClientWork(client);
Console.ReadKey();
}

重点分析StartClientWithRetries

  • UseLocalhostClustering 用来配置连接参数:端口/ClusterId/ServiceId等。 配置一个连接本地silo的客户端,也有其他类型的如: UseServiceProviderFactory,UseStaticClustering

  • ConfigureLogging配置日志参数扩展阅读

  • Build用来注册默认服务和构建容器,扩展了解依赖注入知识。微软自带Microsoft.Extensions.DependencyInjection库

/// <summary>
/// Builds a cluster client configured for a local silo and connects it,
/// passing <c>RetryFilter</c> to <c>Connect</c> so failed attempts can be retried.
/// </summary>
/// <returns>The connected client. The caller owns it and is expected to dispose it.</returns>
private static async Task<IClusterClient> StartClientWithRetries()
{
    // Reset the shared attempt counter (presumably consumed by RetryFilter - confirm).
    attempt = 0;

    IClusterClient client = new ClientBuilder()
        .UseLocalhostClustering()
        .Configure<ClusterOptions>(options =>
        {
            options.ClusterId = "dev";
            options.ServiceId = "HelloWorldApp";
        })
        .ConfigureLogging(logging => logging.AddConsole())
        .Build();

    try
    {
        await client.Connect(RetryFilter);
    }
    catch
    {
        // A client whose connection attempt failed cannot be reused (Connect reports
        // "This instance must be disposed"), and no caller holds a reference to it yet,
        // so release it here before propagating the failure.
        client.Dispose();
        throw;
    }

    Console.WriteLine("Client successfully connect to silo host");
    return client;
}

先来看下connect

这里的LockAsync内部用了SemaphoreSlim.Wait,需要扩展了解一下它和lock的区别,以及本地信号量和系统信号量的区别。
这里用state来维护生命周期

/// <summary>
/// Connects this client: starts the runtime client (when it is an
/// <c>OutsideRuntimeClient</c>) and then the client lifecycle, tracking progress in <c>state</c>.
/// </summary>
/// <param name="retryFilter">Optional filter handed to the runtime client's <c>Start</c>.</param>
/// <exception cref="InvalidOperationException">
/// The state is already <c>Starting</c>, i.e. an earlier attempt did not reach <c>Started</c>.
/// </exception>
public async Task Connect(Func<Exception, Task<bool>> retryFilter = null)
{
    // First check happens before waiting on the lock.
    this.ThrowIfDisposedOrAlreadyInitialized();

    using (await initLock.LockAsync().ConfigureAwait(false))
    {
        // Checked again now that the lock is held.
        this.ThrowIfDisposedOrAlreadyInitialized();

        bool priorAttemptFailed = state == LifecycleState.Starting;
        if (priorAttemptFailed)
        {
            throw new InvalidOperationException("A prior connection attempt failed. This instance must be disposed.");
        }

        state = LifecycleState.Starting;

        if (runtimeClient is OutsideRuntimeClient outsideClient)
        {
            await outsideClient.Start(retryFilter).ConfigureAwait(false);
        }

        await clusterClientLifecycle.OnStart().ConfigureAwait(false);

        // Only reached when both start steps completed without throwing.
        state = LifecycleState.Started;
    }
}

看下orc.Start

 /// <summary>
 /// Runs <c>StartInternal</c> through <c>Task.Run</c> and logs a "started" message once it completes.
 /// </summary>
 /// <param name="retryFilter">Optional retry filter forwarded to <c>StartInternal</c>.</param>
 public async Task Start(Func<Exception, Task<bool>> retryFilter = null)
 {
     // Startup deliberately goes through Task.Run so it executes on the default scheduler
     // instead of capturing the caller's synchronization context; this avoids problems
     // (such as deadlocks) caused by running on the client's context/scheduler.
     Task startup = Task.Run(() => this.StartInternal(retryFilter));
     await startup.ConfigureAwait(false);

     string clientDescription = CurrentActivationAddress.ToString() + ", client GUID ID: " + handshakeClientId;
     logger.Info(
         ErrorCode.ProxyClient_StartDone,
         "{0} Started OutsideRuntimeClient with Global Client ID: {1}",
         BARS,
         clientDescription);
 }

重要的StartInternal

gateways获取网关列表
transport用来维护客户端消息管理。
RunClientMessagePump用来处理接收分发消息。

 // Client startup sequence. In the order written below:
 //   1. initialize the gateway list provider and require at least one gateway (with retries);
 //   2. create and start the ClientMessageCenter ("transport");
 //   3. queue a background task that keeps calling RunClientMessagePump;
 //   4. fetch the grain type resolver (with retries) and create the timer that refreshes it;
 //   5. start client statistics and run StreamingInitialize (with retries).
 // retryFilter is handed to the ExecuteWithRetries local function declared at the end of the method.
 private async Task StartInternal(Func<Exception, Task<bool>> retryFilter)
{
// Initialize the gateway list provider, since information from the cluster is required to successfully
// initialize subsequent services.
// A one-element array is used so the lambda below can record that initialization already
// succeeded; a retried execution then skips InitializeGatewayListProvider.
var initializedGatewayProvider = new[] {false};
await ExecuteWithRetries(async () =>
{
if (!initializedGatewayProvider[0])
{
await this.gatewayListProvider.InitializeGatewayListProvider();
initializedGatewayProvider[0] = true;
} var gateways = await this.gatewayListProvider.GetGateways();
// An empty gateway list is logged and thrown as SiloUnavailableException,
// which ExecuteWithRetries then offers to retryFilter.
if (gateways.Count == 0)
{
var gatewayProviderType = this.gatewayListProvider.GetType().GetParseableName();
var err = $"Could not find any gateway in {gatewayProviderType}. Orleans client cannot initialize.";
logger.Error(ErrorCode.GatewayManager_NoGateways, err);
throw new SiloUnavailableException(err);
}
},
retryFilter); var generation = -SiloAddress.AllocateNewGeneration(); // Client generations are negative
// The message center is created through ActivatorUtilities: the three explicit arguments are
// supplied here and the remaining constructor parameters come from this.ServiceProvider.
transport = ActivatorUtilities.CreateInstance<ClientMessageCenter>(this.ServiceProvider, localAddress, generation, handshakeClientId);
transport.Start();
CurrentActivationAddress = ActivationAddress.NewActivationAddress(transport.MyAddress, handshakeClientId); listeningCts = new CancellationTokenSource();
var ct = listeningCts.Token;
listenForMessages = true; // Keeping this thread handling it very simple for now. Just queue task on thread pool.
// The queued task loops until listenForMessages is false or ct is cancelled; an exception
// escaping RunClientMessagePump is logged and the pump is entered again.
Task.Run(
() =>
{
while (listenForMessages && !ct.IsCancellationRequested)
{
try
{
RunClientMessagePump(ct);
}
catch (Exception exc)
{
logger.Error(ErrorCode.Runtime_Error_100326, "RunClientMessagePump has thrown exception", exc);
}
}
},
ct).Ignore(); await ExecuteWithRetries(
async () => this.GrainTypeResolver = await transport.GetGrainTypeResolver(this.InternalGrainFactory),
retryFilter); this.typeMapRefreshTimer = new AsyncTaskSafeTimer(
this.logger,
RefreshGrainTypeResolver,
null,
this.typeMapRefreshInterval,
this.typeMapRefreshInterval); ClientStatistics.Start(transport, clientId); await ExecuteWithRetries(StreamingInitialize, retryFilter); async Task ExecuteWithRetries(Func<Task> task, Func<Exception, Task<bool>> shouldRetry)
// Local function: runs task until it completes without throwing. On an exception it asks
// shouldRetry; a false answer rethrows, a true answer loops again. There is no attempt limit
// or delay here; any limiting is up to shouldRetry.
{
while (true)
{
try
{
await task();
return;
}
// The filter is false when shouldRetry is null, so in that case the exception is not caught here.
catch (Exception exception) when (shouldRetry != null)
{
var retry = await shouldRetry(exception);
if (!retry) throw;
}
}
}
}

重点关注下StartInternal里面ClientMessageCenter的初始化

用来处理消息分发等,也涉及网关部分调用。

 /// <summary>
 /// Initializes the client message center: stores the injected services, creates the
 /// gateway manager, the inbound message collection, the gateway connection table and the
 /// grain buckets, and registers the connected-gateway-count statistic.
 /// The center is created in the not-running state.
 /// </summary>
 public ClientMessageCenter(
     IOptions<GatewayOptions> gatewayOptions,
     IOptions<ClientMessagingOptions> clientMessagingOptions,
     IPAddress localAddress,
     int gen,
     GrainId clientId,
     IGatewayListProvider gatewayListProvider,
     SerializationManager serializationManager,
     IRuntimeClient runtimeClient,
     MessageFactory messageFactory,
     IClusterConnectionStatusListener connectionStatusListener,
     ExecutorService executorService,
     ILoggerFactory loggerFactory,
     IOptions<NetworkingOptions> networkingOptions,
     IOptions<StatisticsOptions> statisticsOptions)
 {
     // Injected services and settings.
     this.loggerFactory = loggerFactory;
     this.openConnectionTimeout = networkingOptions.Value.OpenConnectionTimeout;
     this.SerializationManager = serializationManager;
     this.executorService = executorService;
     lockable = new object();

     // Local address uses port 0 together with the supplied generation.
     var localEndpoint = new IPEndPoint(localAddress, 0);
     MyAddress = SiloAddress.New(localEndpoint, gen);
     ClientId = clientId;
     this.RuntimeClient = runtimeClient;
     this.messageFactory = messageFactory;
     this.connectionStatusListener = connectionStatusListener;
     Running = false;

     // Gateway bookkeeping and inbound message collection.
     GatewayManager = new GatewayManager(gatewayOptions.Value, gatewayListProvider, loggerFactory);
     PendingInboundMessages = new BlockingCollection<Message>();
     gatewayConnections = new Dictionary<Uri, GatewayConnection>();
     numMessages = 0;
     var bucketCount = clientMessagingOptions.Value.ClientSenderBuckets;
     grainBuckets = new WeakReference[bucketCount];

     logger = loggerFactory.CreateLogger<ClientMessageCenter>();
     if (logger.IsEnabled(LogLevel.Debug))
     {
         logger.Debug("Proxy grain client constructed");
     }

     // Statistic: number of gateway connections currently reporting IsLive,
     // counted while holding the lock on the connection table.
     IntValueStatistic.FindOrCreate(
         StatisticNames.CLIENT_CONNECTED_GATEWAY_COUNT,
         () =>
         {
             lock (gatewayConnections)
             {
                 return gatewayConnections.Values.Count(connection => connection.IsLive);
             }
         });

     statisticsLevel = statisticsOptions.Value.CollectionLevel;
     if (statisticsLevel.CollectQueueStats())
     {
         queueTracking = new QueueTrackingStatistic("ClientReceiver", statisticsOptions);
     }
 }

关注下StartInternal的RunClientMessagePump

WaitMessage里面利用了BlockingCollection.Take

 /// <summary>
 /// Receives application messages from <c>transport</c> and dispatches them until
 /// <c>listenForMessages</c> becomes false or the wait is cancelled: responses go to
 /// <c>ReceiveResponse</c>, one-way messages and requests go to <c>localObjects.Dispatch</c>.
 /// </summary>
 /// <param name="ct">Token passed to <c>WaitMessage</c>; a cancelled wait yields null and ends the loop.</param>
 private void RunClientMessagePump(CancellationToken ct)
 {
     incomingMessagesThreadTimeTracking?.OnStartExecution();
     try
     {
         while (listenForMessages)
         {
             var message = transport.WaitMessage(Message.Categories.Application, ct);
             if (message == null)
             {
                 // The wait was cancelled.
                 break;
             }

             // When we receive the first message, we update the clientId for this client
             // because it may have been modified to include the cluster name.
             if (!firstMessageReceived)
             {
                 firstMessageReceived = true;
                 if (!handshakeClientId.Equals(message.TargetGrain))
                 {
                     clientId = message.TargetGrain;
                     transport.UpdateClientId(clientId);
                     CurrentActivationAddress = ActivationAddress.GetAddress(transport.MyAddress, clientId, CurrentActivationAddress.Activation);
                 }
                 else
                 {
                     clientId = handshakeClientId;
                 }
             }

             switch (message.Direction)
             {
                 case Message.Directions.Response:
                     {
                         ReceiveResponse(message);
                         break;
                     }
                 case Message.Directions.OneWay:
                 case Message.Directions.Request:
                     {
                         this.localObjects.Dispatch(message);
                         break;
                     }
                 default:
                     logger.Error(ErrorCode.Runtime_Error_100327, $"Message not supported: {message}.");
                     break;
             }
         }
     }
     finally
     {
         // Pair every OnStartExecution with an OnStopExecution, including when the loop
         // exits through an exception, so start/stop tracking stays balanced.
         incomingMessagesThreadTimeTracking?.OnStopExecution();
     }
 }

RunClientMessagePump里面的ReceiveResponse

这里主要是对response做一些判断处理。

/// <summary>
/// Routes a response message to the callback registered under its id.
/// Rejections of type <c>DuplicateRequest</c> are ignored; a response with no
/// registered callback is logged as a warning.
/// </summary>
/// <param name="response">The response message received from the transport.</param>
public void ReceiveResponse(Message response)
{
    if (logger.IsEnabled(LogLevel.Trace))
    {
        logger.Trace("Received {0}", response);
    }

    // Ignore duplicate requests.
    bool isDuplicateRejection =
        response.Result == Message.ResponseTypes.Rejection
        && response.RejectionType == Message.RejectionTypes.DuplicateRequest;
    if (isDuplicateRejection)
    {
        return;
    }

    if (!callbacks.TryGetValue(response.Id, out CallbackData callbackData))
    {
        logger.Warn(ErrorCode.Runtime_Error_100011, "No callback for response message: " + response);
        return;
    }

    // We need to import the RequestContext here as well.
    // Unfortunately, it is not enough, since CallContext.LogicalGetData will not flow "up"
    // from task completion source into the resolved task.
    // RequestContextExtensions.Import(response.RequestContextData);
    callbackData.DoCallback(response);
}
// CallbackData.DoCallback: completes this callback at most once for the given response.
// A transient rejection that shared.ShouldResend accepts returns early and leaves the callback
// incomplete. Otherwise the callback is marked completed and unregistered under the lock, and
// the response callback itself is invoked after the lock is released.
public void DoCallback(Message response)
{
// Unlocked early exit; the same condition is checked again under the lock below.
if (this.IsCompleted)
return;
var requestStatistics = this.shared.RequestStatistics;
// NOTE(review): lock (this) uses the instance itself as the lock object, so any code holding a
// reference to this object could take the same lock; a private lock object would avoid that.
lock (this)
{
if (this.IsCompleted)
return; if (response.Result == Message.ResponseTypes.Rejection && response.RejectionType == Message.RejectionTypes.Transient)
{
// Returns without completing when ShouldResend answers true for this message.
if (this.shared.ShouldResend(this.Message))
{
return;
}
} this.IsCompleted = true;
if (requestStatistics.CollectApplicationRequestsStats)
{
this.stopwatch.Stop();
} this.shared.Unregister(this.Message);
} if (requestStatistics.CollectApplicationRequestsStats)
{
requestStatistics.OnAppRequestsEnd(this.stopwatch.Elapsed);
} // do callback outside the CallbackData lock. Just not a good practice to hold a lock for this unrelated operation.
this.shared.ResponseCallback(response, this.context);
} //this.shared.Unregister(this.Message);

RunClientMessagePump里面的消息分发Dispatch(message)

这里面用ConcurrentDictionary<GuidId, LocalObjectData>来判断ObserverId是否存在,不存在移除。
如果存在,利用Queue的Enqueue将消息插入队列。

如果启动成功,异步调用LocalObjectMessagePumpAsync,然后利用Queue的Dequeue取出队首(最早入队)的消息,
调用本地对象上对应的方法;对于非OneWay的消息,再调用SendResponseAsync来发送响应

// Drains objectData.Messages one message at a time: each non-expired message is deserialized into
// an InvokeMethodRequest and invoked on the local object. A caught invocation exception is passed
// to ReportException; otherwise a response is sent unless the message is one-way.
// The loop ends, with objectData.Running set to false, once the queue is found empty.
private async Task LocalObjectMessagePumpAsync(LocalObjectData objectData)
{
while (true)
{
try
{
Message message;
// The emptiness check, the Running flag update and the dequeue all happen under the queue's lock.
lock (objectData.Messages)
{
if (objectData.Messages.Count == 0)
{
objectData.Running = false;
break;
} message = objectData.Messages.Dequeue();
} if (ExpireMessageIfExpired(message, MessagingStatisticsGroup.Phase.Invoke))
continue; RequestContextExtensions.Import(message.RequestContextData);
var request = (InvokeMethodRequest)message.GetDeserializedBody(this.serializationManager);
var targetOb = (IAddressable)objectData.LocalObject.Target;
object resultObject = null;
Exception caught = null;
try
{
// NOTE(review): the original wording here ("exceptions thrown within this scope are not
// considered to be thrown from user code and not from runtime code") contradicts itself.
// Presumably exceptions in this scope are treated as coming from user code, since they are
// captured and reported below - confirm.
var resultPromise = objectData.Invoker.Invoke(targetOb, request);
if (resultPromise != null) // it will be null for one way messages
{
resultObject = await resultPromise;
}
}
catch (Exception exc)
{
// the exception needs to be reported in the log or propagated back to the caller.
caught = exc;
} if (caught != null)
this.ReportException(message, caught);
else if (message.Direction != Message.Directions.OneWay)
this.SendResponseAsync(message, resultObject);
}
catch (Exception)
{
// ignore, keep looping.
// NOTE(review): anything thrown outside the inner try (expiry check, deserialization,
// reporting, sending the response) is dropped here without being logged.
}
}
}

SendResponseAsync经过序列化,DeepCopy,赋值各种请求参数等各种操作以后,来到最关键的部分
transport.SendMessage

第一步先获取活动的网关(silo),如没有则建立GatewayConnection
第二步启动Connection

Connect--调用socket创建连接
Start--GatewayClientReceiver间接调用Socket来接收消息,

 // Picks (or creates) a GatewayConnection for msg and queues msg on it.
 // Selection, in order:
 //   1. msg.TargetSilo is set and is among the live gateways -> that gateway;
 //   2. the target grain is a system target, or msg is unordered -> round robin over live gateways;
 //   3. otherwise -> the bucket chosen from the target grain's hash, so a grain keeps one connection.
 // If the message center is not running the message is logged and ignored. If no gateway is
 // available the message is rejected. If a newly created connection is not live after Start, or
 // QueueRequest throws InvalidOperationException, the message goes to RejectOrResend.
 public void SendMessage(Message msg)
{
GatewayConnection gatewayConnection = null;
bool startRequired = false; if (!Running)
{
this.logger.Error(ErrorCode.ProxyClient_MsgCtrNotRunning, $"Ignoring {msg} because the Client message center is not running");
return;
} // If there's a specific gateway specified, use it
// NOTE(review): the live-gateway membership test runs before the lock is taken, and
// ToGatewayUri() is evaluated a second time inside the branch.
if (msg.TargetSilo != null && GatewayManager.GetLiveGateways().Contains(msg.TargetSilo.ToGatewayUri()))
{
Uri addr = msg.TargetSilo.ToGatewayUri();
lock (lockable)
{
if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
{
gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, executorService, this.loggerFactory, this.openConnectionTimeout);
gatewayConnections[addr] = gatewayConnection;
if (logger.IsEnabled(LogLevel.Debug)) logger.Debug("Creating gateway to {0} for pre-addressed message", addr);
startRequired = true;
}
}
}
// For untargeted messages to system targets, and for unordered messages, pick a next connection in round robin fashion.
else if (msg.TargetGrain.IsSystemTarget || msg.IsUnordered)
{
// Get the cached list of live gateways.
// Pick a next gateway name in a round robin fashion.
// See if we have a live connection to it.
// If Yes, use it.
// If not, create a new GatewayConnection and start it.
// If start fails, we will mark this connection as dead and remove it from the GetCachedLiveGatewayNames.
lock (lockable)
{
// numMessages is the round-robin counter; unchecked lets it wrap around instead of throwing on overflow.
int msgNumber = numMessages;
numMessages = unchecked(numMessages + 1);
IList<Uri> gatewayNames = GatewayManager.GetLiveGateways();
int numGateways = gatewayNames.Count;
if (numGateways == 0)
{
RejectMessage(msg, "No gateways available");
logger.Warn(ErrorCode.ProxyClient_CannotSend, "Unable to send message {0}; gateway manager state is {1}", msg, GatewayManager);
return;
}
Uri addr = gatewayNames[msgNumber % numGateways];
if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
{
gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout);
gatewayConnections[addr] = gatewayConnection;
if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.ProxyClient_CreatedGatewayUnordered, "Creating gateway to {0} for unordered message to grain {1}", addr, msg.TargetGrain);
startRequired = true;
}
// else - Fast path - we've got a live gatewayConnection to use
}
}
// Otherwise, use the buckets to ensure ordering.
else
{
var index = msg.TargetGrain.GetHashCode_Modulo((uint)grainBuckets.Length);
lock (lockable)
{
// Repeated from above, at the declaration of the grainBuckets array:
// Requests are bucketed by GrainID, so that all requests to a grain get routed through the same bucket.
// Each bucket holds a (possibly null) weak reference to a GatewayConnection object. That connection instance is used
// if the WeakReference is non-null, is alive, and points to a live gateway connection. If any of these conditions is
// false, then a new gateway is selected using the gateway manager, and a new connection established if necessary.
var weakRef = grainBuckets[index];
if ((weakRef != null) && weakRef.IsAlive)
{
gatewayConnection = weakRef.Target as GatewayConnection;
}
if ((gatewayConnection == null) || !gatewayConnection.IsLive)
{
var addr = GatewayManager.GetLiveGateway();
if (addr == null)
{
RejectMessage(msg, "No gateways available");
logger.Warn(ErrorCode.ProxyClient_CannotSend_NoGateway, "Unable to send message {0}; gateway manager state is {1}", msg, GatewayManager);
return;
}
if (logger.IsEnabled(LogLevel.Trace)) logger.Trace(ErrorCode.ProxyClient_NewBucketIndex, "Starting new bucket index {0} for ordered messages to grain {1}", index, msg.TargetGrain);
if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
{
gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout);
gatewayConnections[addr] = gatewayConnection;
if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.ProxyClient_CreatedGatewayToGrain, "Creating gateway to {0} for message to grain {1}, bucket {2}, grain id hash code {3}X", addr, msg.TargetGrain, index,
msg.TargetGrain.GetHashCode().ToString("x"));
startRequired = true;
}
// The bucket is re-pointed at whichever connection was selected, whether reused or newly created.
grainBuckets[index] = new WeakReference(gatewayConnection);
}
}
// The next line closes the else branch. A connection created above is started here,
// after the lock on lockable has been released.
} if (startRequired)
{
gatewayConnection.Start(); if (!gatewayConnection.IsLive)
{
// if failed to start Gateway connection (failed to connect), try sending this msg to another Gateway.
RejectOrResend(msg);
return;
}
} try
{
gatewayConnection.QueueRequest(msg);
if (logger.IsEnabled(LogLevel.Trace)) logger.Trace(ErrorCode.ProxyClient_QueueRequest, "Sending message {0} via gateway {1}", msg, gatewayConnection.Address);
}
catch (InvalidOperationException)
{
// This exception can be thrown if the gateway connection we selected was closed since we checked (i.e., we lost the race)
// If this happens, we reject if the message is targeted to a specific silo, or try again if not
RejectOrResend(msg);
}
}
 // GatewayConnection.Connect: opens the TCP socket to the gateway, making up to
 // ClientMessageCenter.CONNECT_RETRY_COUNT attempts while holding Lockable.
 // Returns without connecting when the message center is not running, when this connection is
 // no longer live, or when the current socket is already connected. If every attempt fails, or
 // the gateway is no longer in the live gateway list, the connection is marked dead.
 public void Connect()
{
if (!MsgCenter.Running)
{
if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_MsgCtrNotRunning, "Ignoring connection attempt to gateway {0} because the proxy message center is not running", Address);
return;
} // Yes, we take the lock around a Sleep. The point is to ensure that no more than one thread can try this at a time.
// There's still a minor problem as written -- if the sending thread and receiving thread both get here, the first one
// will try to reconnect. eventually do so, and then the other will try to reconnect even though it doesn't have to...
// Hopefully the initial "if" statement will prevent that.
lock (Lockable)
{
if (!IsLive)
{
if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_DeadGateway, "Ignoring connection attempt to gateway {0} because this gateway connection is already marked as non live", Address);
return; // if the connection is already marked as dead, don't try to reconnect. It has been doomed.
} for (var i = 0; i < ClientMessageCenter.CONNECT_RETRY_COUNT; i++)
{
try
{
if (Socket != null)
{
if (Socket.Connected)
return; MarkAsDisconnected(Socket); // clean up the socket before reconnecting.
}
// new DateTime() is the default DateTime value; lastConnect differing from it means
// an earlier attempt already stored a timestamp.
if (lastConnect != new DateTime())
{
// We already tried at least once in the past to connect to this GW.
// If we are no longer connected to this GW and it is no longer in the list returned
// from the GatewayProvider, consider directly this connection dead.
if (!MsgCenter.GatewayManager.GetLiveGateways().Contains(Address))
break; // Wait at least ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY before reconnection tries
// NOTE(review): despite its name, this local holds the difference of two DateTime values
// (a time span), not a number of milliseconds.
var millisecondsSinceLastAttempt = DateTime.UtcNow - lastConnect;
if (millisecondsSinceLastAttempt < ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY)
{
var wait = ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY - millisecondsSinceLastAttempt;
if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_PauseBeforeRetry, "Pausing for {0} before trying to connect to gateway {1} on trial {2}", wait, Address, i);
Thread.Sleep(wait);
}
}
// Record the attempt time before connecting, so a failed attempt also counts toward the delay above.
lastConnect = DateTime.UtcNow;
Socket = new Socket(Silo.Endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
Socket.EnableFastpath();
SocketManager.Connect(Socket, Silo.Endpoint, this.openConnectionTimeout);
NetworkingStatisticsGroup.OnOpenedGatewayDuplexSocket();
MsgCenter.OnGatewayConnectionOpen();
SocketManager.WriteConnectionPreamble(Socket, MsgCenter.ClientId); // Identifies this client
Log.Info(ErrorCode.ProxyClient_Connected, "Connected to gateway at address {0} on trial {1}.", Address, i);
return;
}
catch (Exception ex)
{
// Any failure in this attempt is logged, the socket is marked disconnected, and the loop moves on to the next trial.
Log.Warn(ErrorCode.ProxyClient_CannotConnect, $"Unable to connect to gateway at address {Address} on trial {i} (Exception: {ex.Message})");
MarkAsDisconnected(Socket);
}
}
// Failed too many times -- give up
MarkAsDead();
}
}

GatewayConnection的Start会调用到GatewayClientReceiver的Run方法,利用BlockingCollection的Add方法把消息添加到PendingInboundMessages,而之前的RunClientMessagePump里面transport.WaitMessage方法正是通过PendingInboundMessages.Take()来获取消息,至此形成了闭环。

 /// <summary>
 /// Receive loop: until <c>Cts</c> is cancelled, fills the receive buffer, decodes every
 /// complete message it contains and queues each one on the message center.
 /// On any exception the buffer is reset, a warning is logged and the exception is rethrown.
 /// </summary>
 protected override void Run()
 {
     try
     {
         while (!Cts.IsCancellationRequested)
         {
             int received = FillBuffer(buffer.BuildReceiveBuffer());
             if (received != 0)
             {
                 buffer.UpdateReceivedData(received);

                 // One read may contain several messages; decode until none is left.
                 while (buffer.TryDecodeMessage(out Message decoded))
                 {
                     gatewayConnection.MsgCenter.QueueIncomingMessage(decoded);
                     if (Log.IsEnabled(LogLevel.Trace))
                     {
                         Log.Trace("Received a message from gateway {0}: {1}", gatewayConnection.Address, decoded);
                     }
                 }
             }
         }
     }
     catch (Exception ex)
     {
         buffer.Reset();
         Log.Warn(ErrorCode.ProxyClientUnhandledExceptionWhileReceiving, $"Unexpected/unhandled exception while receiving: {ex}. Restarting gateway receiver for {gatewayConnection.Address}.", ex);
         throw;
     }
 }

关注SafeTimerBase类

Orleans用于处理定时或延时回调作业。

总结

创建一个简单的connect,里面有这么多沟沟渠渠,但本质上来说,最底层是利用Socket套接字来实现通信。在Socket的基础之上,又封装维护了一层GatewayConnection和GatewayClientReceiver来实现网关(Silo)的操作,比如重试/监控/熔断等,再结合Timer,Queue,BlockingCollection,Task,ConcurrentDictionary,Interlocked等知识,构建一个可用的通信框架。
说来容易几句话,实现起来都是泪。

如果您完全熟悉异步编程,并行编程,Socket网络编程。又对分布式/微服务理论有较深的理解,那么orleans实现机制,对您来说可能是相对容易。

本期结束,下期更精彩!

上一篇:关于C#联接数据库是出现'未在本地计算机上注册'错误的解决办法


下一篇:Neural Pathways of Interaction Mediating the Central Control of Autonomic Bodily State 自主神经系统-大脑调节神经通路