最近代码里面写了一个缓存,有了一个简单的想法:
通常我们会有一个Cache(例如:MemoryCache)去缓存一些对象,那么当这个缓存项过期时,可能同时有很多线程都需要这个缓存项,那么就会有并发的去加载的情况发生,当然,如果这个加载时间并不长的话,那也没什么问题,但是如果加载过程本身比较慢,又比较消耗资源的话,恐怕就比较杯具了。
那么如果能让Cache不命中时,加载过程能串行,那么也有个问题,不通的缓存项在加载过程中其实不需要串行,这样整体的效率又会下降。
于是,我思考了一种既能每个资源串行加载,又能保证不同资源并行加载的方式。
简单的看就是:
这里用到了一个loader task,问题是这个loader task也需要线程安全,这可也不是一件轻松的事情,好在我们有TPL,Task<T>类除了Dispose方法在其它成员都是线程安全的,这自然也就包括了Result属性。
然后可能有人会有些疑问:
第一个问题,会不会用另一个线程去加载
答案是不确定,在线程池空闲的情况下,确实会用其他线程去加载。
第二个问题,在线程池忙碌的时候,会不会等其他任务,而导致更慢,或者严重的导致死锁
答案是不会,甚至说,这里用task,本质上是期待线程池处于一个忙碌的状态,这样可以防止加载过程跑到其他线程上去
至于什么,我们可以看下面这段代码:
1 ThreadPool.SetMinThreads(4, 100); 2 ThreadPool.SetMaxThreads(4, 100); 3 var dict = new Dictionary<string, Task<int>>(); 4 for (int i = 0; i < 8; i++) 5 { 6 ThreadPool.QueueUserWorkItem(_ => 7 { 8 Console.WriteLine("Thread:{0}, starting...", Thread.CurrentThread.ManagedThreadId); 9 Thread.Sleep(500); 10 bool ownTask = false; 11 Task<int> task; 12 try 13 { 14 lock (dict) 15 { 16 if (!dict.TryGetValue("foo", out task)) 17 { 18 ownTask = true; 19 task = Task.Factory.StartNew(() => 20 { 21 Thread.Sleep(100); 22 return Thread.CurrentThread.ManagedThreadId; 23 }); 24 dict["foo"] = task; 25 } 26 } 27 Console.WriteLine("Thread:{0}, own task:{1}, waiting result...", Thread.CurrentThread.ManagedThreadId, ownTask); 28 Console.WriteLine("Thread:{0}, own task:{1}, result:{2}", Thread.CurrentThread.ManagedThreadId, ownTask, task.Result); 29 } 30 finally 31 { 32 // add to cache 33 if (ownTask) 34 { 35 lock (dict) 36 { 37 dict.Remove("foo"); 38 } 39 } 40 } 41 }); 42 } 43 Thread.Sleep(2000); 44 lock (dict) 45 { 46 Console.WriteLine("dict count:{0}", dict.Count); 47 }
我们会发现即使线程池的线程全部被占满的情况下,这段代码也不会卡住,相反,会使用ownTask为true的那个线程来同步执行loader task,这是依赖于TPL中的任务窃取功能。
简单的说,这个功能就是:如果TPL发现需要等待一个任务的执行完成,并且这个任务并没有开始执行时,把这个任务从任务队列中窃取过来,同步执行。
到这里,本篇随笔也已经完成了。