线程是操作系统能够进行运算调度的最小单位,操作系统线程进一步被封装成托管的Thread对象,手工创建并管理Thread对象已经成为了所能做到的对线程最细粒度的控制了。后来我们有了ThreadPool,可以更加方便地以池化的方式来使用线程。最后,Task诞生,它结合async/await关键字给与我们完美异步编程模式。但这一切让我们的编程体验越来越好,但是离线程的本质越来越远。被系列文章从“执行上下文传播”这个令开发者相对熟悉的角度来聊聊重新认识我们似乎已经很熟悉的主题。
目录
一、ThreadStatic字段或者ThreadLocal<T>对象
二、CallContext
三、支持跨线程传递吗?
四、IllogicalCallContext和LogicalCallContext
五、AsyncLocal<T>
一、ThreadStatic字段或者ThreadLocal<T>对象
本篇文章旨在解决一个问题:对于一个由多个方法组成的调用链,数据如何在上下游方法之间传递。我想很多人首先想到的就是通过方法的参数进行传递,但是作为方法签名重要组成部分的参数列表代表一种“契约”,往往是不能轻易更改的。既然不能通过参数直接进行传递,那么我们需要一个“共享”的数据容器,上游方法将需要传递的数据放到这个容器中,下游方法在使用的时候从该容器中将所需的数据提取出来。
那么这个共享的容器可以是一个静态字段,当然不行, 因为类型的静态字段类似于一个单例对象,它会被多个并发执行的调用链共享。虽然普通的静态字段不行,但是标注了ThreadStaticAttribute特性的静态字段则可以,因为这样的字段是线程独享的。为了方便演示,我们定义了如下一个CallStackContext类型来表示基于某个调用链的上下文,这是一个字典,用于存放任何需要传递的数据。自增的TraceId字段代码当前调用链的唯一标识。当前的CallStackContext上下文通过静态属性Current获取,可以看出它返回标注了ThreadStaticAttribute特性的静态字段_current。
public class CallStackContext : Dictionary<string, object> { [ThreadStatic] private static CallStackContext _current; private static int _traceId = 0; public static CallStackContext Current { get => _current; set => _current = value; } public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
我们通过如下这个CallStack对象创建一个“逻辑”上的调用链。在初始化的时候,CallStack会创建一个CallStackContext对象并将其放进CallContext对象并对静态字段_current进行复制。该字段会在Dispose方法中被置空,此时标志逻辑调用链生命周期的终止。
public class CallStack : IDisposable { public CallStack() => CallStackContext.Current = new CallStackContext(); public void Dispose() => CallStackContext.Current = null; }
我们通过如下的程序来演示针对CallStack和CallStackContext的使用。如代码片段所示,我们利用对象池并发调用Call方法。Call方法内部会依次调用Foo、Bar和Baz三个方法,需要传递的数据体现为一个Guid,我们将当存放在当前CallStackContext中。整个方法Call方法的操作均在创建Callback的using block中执行。
class Program { static void Main() { for (int i = 0; i < 5; i++) { ThreadPool.QueueUserWorkItem(_ => Call()); } Console.Read(); } static void Call() { using (new CallStack()) { CallStackContext.Current["argument"] = Guid.NewGuid(); Foo(); Bar(); Baz(); } } static void Foo() => Trace(); static void Bar() => Trace(); static void Baz() => Trace(); static void Trace([CallerMemberName] string methodName = null) { var threadId = Thread.CurrentThread.ManagedThreadId; var traceId = CallStackContext.Current?.TraceId; var argument = CallStackContext.Current?["argument"]; Console.WriteLine($"Thread: {threadId}; TraceId: {traceId}; Method: {methodName}; Argument:{argument}"); } }
为了验证三个方法获取的数据是否正确,我们让它们调用同一个Trace方法,该方法会在控制台上打印出当前线程ID、调用链标识(TraceId)、方法名和获取到的数据。如下所示的是该演示程序执行后的结果,可以看出置于CallContext中的CallStackContext对象帮助我们很好地完成了针对调用链的数据传递。
既然我们可以使用ThreadStatic静态字段,自然也可以使用ThreadLocal<T>对象来代替。如果希望时候后者,我们只需要将CallStackContext改写成如下的形式即可。
public class CallStackContext : Dictionary<string, object> { private static ThreadLocal<CallStackContext> _current = new ThreadLocal<CallStackContext>(); private static int _traceId = 0; public static CallStackContext Current { get => _current.Value; set => _current.Value = value; } public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
二、CallContext
除使用ThreadStatic字段来传递调用链数据之外,我们还可以使用CallContext。顾名思义,CallContext是专门为调用链创建的上下文,我们首先利用它来实现基于调用链的数据传递。如果采用这种解决方案,上述的CallStack和CallStackContext类型可以改写成如下的形式。如代码片段所示,当前的CallStackContext上下文通过静态属性Current获取,可以看出它是通过调用CallContext的静态方法GetData提取的,传入的类型名称作为存放“插槽”的名称。在初始化的时候,CallStack会创建一个CallStackContext对象并将其放进CallContext对应存储插槽中作为当前上下文,该插槽会在Dispose方法中被释放
public class CallStackContext: Dictionary<string, object> { private static int _traceId = 0; public static CallStackContext Current => CallContext.GetData(nameof(CallStackContext)) as CallStackContext; public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
public class CallStack : IDisposable { public CallStack() => CallContext.SetData(nameof(CallStackContext), new CallStackContext()); public void Dispose() => CallContext.FreeNamedDataSlot(nameof(CallStackContext)); }
三、支持跨线程传递吗?
对于上面演示的实例来说,调用链中的三个方法(Foo、Bar和Baz)均是在同一个线程中执行的,如果出现了跨线程调用,CallContext是否还能帮助我们实现上下文的快线程传递吗?为了验证CallContext跨线程传递的能力,我们将Call方法改写成如下的形式:Call方法直接调用Foo方法,但是Foo方法针对Bar方法的调用,以及Bar方法针对Baz方法的调用均在一个新创建的线程中进行的。
static void Call() { using (new CallStack()) { CallStackContext.Current["argument"] = Guid.NewGuid(); Foo(); } } static void Foo() { Trace(); new Thread(Bar).Start(); } static void Bar() { Trace(); new Thread(Baz).Start(); } static void Baz() => Trace();
再次执行我们我们的程序,不论是采用基于ThreadStatic静态字段,还是采用ThreadLocal<T>对象或者CallContext的解决方法,均会得到如下所示的输出结果。可以看出设置的数据只能在Foo方法中获取到,但是并没有自动传递到异步执行的Bar和Baz方法中。
四、IllogicalCallContext和LogicalCallContext
其实CallContext设置的上下文对象分为IllogicalCallContext和LogicalCallContext两种类型,调用SetData设置的是IllogicalCallContext,它并不具有跨线程传播的能力。如果希望在进行异步调用的时候自动传递到目标线程,必须调用CallContext的LogicalSetData方法设置为LogicalCallContext。所以我们应该将CallStack类型进行如下的改写。
public class CallStack : IDisposable { public CallStack() => CallContext.LogicalSetData(nameof(CallStackContext), new CallStackContext()); public void Dispose() => CallContext.FreeNamedDataSlot(nameof(CallStackContext)); }
与之相对,获取LogicalCallContext对象的方法也得换成LogicalGetData,为此我们将CallStackContext改写成如下的形式。
public class CallStackContext: Dictionary<string, object> { private static int _traceId = 0; public static CallStackContext Current => CallContext.LogicalGetData(nameof(CallStackContext)) as CallStackContext; public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
再次执行我们程序,依然能够得到希望的结果。
除了将设置和提取当前CallStackContext的方式进行修改(GetData=>LogicalGet; SetData=>LogicalSetData)之外,我们还有另一个解决方案,那就是让放存放在CallContext存储槽的数据类型实现ILogicalThreadAffinative接口。该接口没有定义任何成员,实现类型对应的对象将自动视为LogicalCallContext。对于我们的演示实例来说,我们只需要让CallStackContext实现该接口就可以了。
public class CallStackContext: Dictionary<string, object>, ILogicalThreadAffinative { private static int _traceId = 0; public static CallStackContext Current => CallContext.GetData(nameof(CallStackContext)) as CallStackContext; public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
五、AsyncLocal<T>
CallContext并没有被.NET Core继承下来。也就是,只有.NET Framework才提供针对CallContext的支持,.因为我们有更好的选择,那就是AsyncLocal<T>。如果使用AsyncLocal<T>作为存放调用链上下文的容器,我们的
public class CallStackContext: Dictionary<string, object>, ILogicalThreadAffinative { internal static readonly AsyncLocal<CallStackContext> _contextAccessor = new AsyncLocal<CallStackContext>(); private static int _traceId = 0; public static CallStackContext Current => _contextAccessor.Value; public long TraceId { get; } = Interlocked.Increment(ref _traceId); } public class CallStack : IDisposable { public CallStack() => CallStackContext._contextAccessor.Value = new CallStackContext(); public void Dispose() => CallStackContext._contextAccessor.Value = null; }
既然命名为AsyncLocal<T>,自然是支持异步调用。它不仅支持上面演示的直接创建线程的方式,最主要的是支持我们熟悉的await的方式(如下所示)。
class Program { static async Task Main(string[] args) { for (int i = 0; i < 5; i++) { ThreadPool.QueueUserWorkItem(_ => Call()); } Console.Read(); Console.Read(); async Task Call() { using (new CallStack()) { CallStackContext.Current["argument"] = Guid.NewGuid(); await FooAsync(); await BarAsync(); await BazAsync(); } } } static Task FooAsync() => Task.Run(() => Trace()); static Task BarAsync() => Task.Run(() => Trace()); static Task BazAsync() => Task.Run(() => Trace()); static void Trace([CallerMemberName] string methodName = null) { var threadId = Thread.CurrentThread.ManagedThreadId; var traceId = CallStackContext.Current?.TraceId; var argument = CallStackContext.Current?["argument"]; Console.WriteLine($"Thread: {threadId}; TraceId: {traceId}; Method: {methodName}; Argument:{argument}"); } }