学习迭代器实现C#异步编程——仿async/await(一)

  .NET 4.5的async/await真是个神奇的东西,巧妙异常以致我不禁对其实现充满好奇,但一直难以窥探其门径。不意间读了此篇强文《Asynchronous Programming in C# using Iterators》,犹如醍醐灌顶,茅厕顿开,思路犹如尿崩。美玉不敢独享,故写此篇,将所学中一些思考与诸君共享,期抛砖引玉,擦出一些基情火花……

  强文《Asynchronous Programming in C# using Iterators》出自大牛,大牛眼界高远。故文中所述较为简略,而文中所附代码亦较为晦涩,鄙人驽钝,反复阅读思考数十遍,方品出些味道。故本篇会对原文代码一个最简化的提取,再进行分析。

  强文提到的用迭代器在C#中进行异步编程,最核心的思想就是通过yield return产生一个可IEnumerable<Asyncable>的集合,取出第一个Asyncable,执行Async方法并立即返回,将控制权交给上层调用方,同时Async方法在完成后会回调MoveNext继续遍历之前集合。(本篇提到的最底层的Async方法均是以Begin/End来实现,之前的随笔也说过async/await只是语法糖)

  大概画了个草图意思一下,花了很久时间也没能画得特别清晰明了,请见谅,有好的想法还望赐教。

学习迭代器实现C#异步编程——仿async/await(一)

  接下来我们根据具体的代码来分析,首先看一下Main方法。第一行是一个异步方法,我们期待的结果是第二行的输出在异步方法结束前执行。

        static void Main(string[] args)
{
AsyncMethod("http://www.microsoft.com").Execute(); Console.WriteLine("我先执行,不等你了"); Console.ReadLine();
}

  AsyncMethod方法返回了一个IEnumerable<IAsync>的集合,这里需要注意的是AsyncMethod方法的返回值其实是一个类似状态机的类对象,这个对象本身不会执行内部的代码语句,我们需要一个Execute方法来开始遍历运行这个集合里的代码语句。而AsyncMethod方法里的语句又根据yield return的个数来划分成块,每一次MoveNext方法其实是执行一块的代码。也就是说存在多少个yield return,就会有多少次回调。

  

        static IEnumerable<IAsync> AsyncMethod(string url)
{
WebRequest req = HttpWebRequest.Create(url);
Console.WriteLine("[{0}] starting", url); // asynchronously get the response from http server
Async<WebResponse> response = req.GetResponseAsync();
yield return response; Console.WriteLine("[{0}] got response", url);
Stream resp = response.Result.GetResponseStream(); foreach (var item in resp.ReadToEndAsync())
{
yield return item;
} Console.WriteLine("done");
}

  GetResponseAsync方法看上去很简单,就是封装了一下Beginxx/Endxxx。为什么说看上去简单,后面会提到。AsyncPrimitive就是我们会接触到最底层的Asyncable对象了,本篇一切异步都是基于它来实现的。

        public static Async<WebResponse> GetResponseAsync(this WebRequest req)
{
return new AsyncPrimitive<WebResponse>(req.BeginGetResponse, req.EndGetResponse);
}

  ReadToEndAsync和AsyncMethod方法一样,是建立在可返回Async<T>对象的已有Async方法的基础上。

        public static IEnumerable<IAsync> ReadToEndAsync(this Stream stream)
{
MemoryStream ms = new MemoryStream();
int read = -;
while (read != )
{
byte[] buffer = new byte[];
Async<int> count = stream.ReadAsync(buffer, , );
yield return count; Console.WriteLine("[{0}] got data: {1}", "url", count.Result);
ms.Write(buffer, , count.Result);
read = count.Result;
}
}

  ReadAsync同样是通过Beginxxx/Endxxx来实现异步。他和GetResponseAsync一样是建立在AsyncPrimitive对象上的Async方法。

        public static Async<int> ReadAsync(this Stream stream, byte[] buffer, int offset, int count)
{
return new AsyncPrimitive<int>(
(callback, st) => stream.BeginRead(buffer, offset, count, callback, st),
stream.EndRead);
}

  下面让我们重点来看一下AsyncPrimitive类,你可能已经发现这个类和Task<T>.Factory.FromAsync方法有点相似。可是如果让自己来实现,怕不是想象的那么简单。

    public class AsyncPrimitive<T> : Async<T>
{
Action<Action<T>> func; public AsyncPrimitive(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, T> end)
{
this.func = (cont) => begin(delegate(IAsyncResult res) { cont(end(res)); }, null);
} public override void ExecuteStep(Action cont)
{
func((res) =>
{
result = res;
completed = true;
cont();
});
}
}

  完全由委托、匿名方法和lambda表达式组成的类。在ExecuteStep被调用前,它不会做任何事情,仅仅是构建了一个Action<Action<T>>的委托。那么分析这个类才是本篇最主要的目的,但难点在于这货不是三言两语就能说清楚的,鄙人在晕乎了很久很久以后,将该类翻译如下,去除了所有的匿名方法和lambda表达式。看明白了这个类,就明白了通过迭代器是如何实现异步的。

    public class AsyncPrimitive<T> : Async<T>
{
Action<Action<T>> func; public AsyncPrimitive(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, T> end)
{
this.Begin = begin;
this.End = end; this.func = this.ActionActionT;
} Func<IAsyncResult, T> End { get; set; }
Func<AsyncCallback, object, IAsyncResult> Begin { get; set; }
Action<T> RunInCallback { get; set; }
Action ActionOuter {get;set;} private void Callback(IAsyncResult ar)
{
this.RunInCallback(this.End(ar));
} private void ActionActionT(Action<T> cont)
{
this.RunInCallback = cont;
this.Begin(this.Callback, null);
} private void ActionT(T res)
{
this.result = res;
this.completed = true;
this.ActionOuter();
} public override void ExecuteStep(Action cont)
{
this.ActionOuter = cont;
this.func(this.ActionT);
}
}

  直观的就可以感觉到lambda帮助我们省略了多少代码,在简洁的同时,也增加了些许理解的难度。代码就是最好的注释,我实在没信心去用文字描述这个类如何工作。

  最后补充Execute方法,这个方法真正的开始执行Async方法,大体思路就是遍历集合,但不是通过while循环,而是通过callback来执行下一个MoveNext。

        public static void Execute(this IEnumerable<IAsync> async)
{
AsyncExtensions.Run(async.GetEnumerator());
} internal static void Run(IEnumerator<IAsync> en)
{
if (!en.MoveNext()) return;
en.Current.ExecuteStep
(() => AsyncExtensions.Run(en));
}

  附上可运行的工程供调试用。原文链接开头已给出,原文中也给出了原文代码的下载。推荐都下载比对着看,可能会更有帮助。

  本篇也是初写的时候信心满满,不知道被各位吐槽后会是怎样一副情景……之后还应该还会有第二篇,也许是明天,也许是明年……

代码下载

上一篇:AspectJ的基本使用


下一篇:hive学习笔记——表的基本的操作