在线程安全1中,我介绍了线程同步的意义和一种实现线程同步的方法:volatile。volatile关键字属于原子操作的一种,若对一个关键字使用volatile,很多时候会显得很“浪费”,因为只有在并发访问的情况下才需要“易变”读写,单线程访问时并不需要。在命名空间System.Threading命名空间中提供了InterLock类,该类中提供了一些原子方法。本文来介绍如何使用这些方法。
- 互锁
在抢占式系统中,一个线程在执行到任何阶段都有可能被其他线程“打断”,原子操作能够保证该操作不被其他线程打断,其他线程的“打断”只可能发生在该操作的之前或之后。InterLock类中的方法就是”原子“式的,最常用的方法有:
public static class InterLock{
// return (++location)
public static int Increment(ref int location);
// return(--location)
public static int Decrement(ref int location);
// return(location += value)
//注意,也可能是负数,从而实现减法运算。
public static int Add(ref int location, int value)
// int old = location; location = value; return old;
public static int Exchange(ref int location, int value);
//old = location1;
//if(location1 = comparand) location1 = value;
//return old;
public static int CompareExchange(ref int location1,
int value, int comparand);
...
}
《CLR via C#》的作者在书中说他喜欢Interlocked中的方法,因为它不但很快,而且不会阻塞线程。为了验证InterLock真的很快,我们对变量进行一百万次的写,与Volatile.Write()来进行对比,看看是否真的快。
static void Main(string[] args){
TestInterLock();
Console.ReadLine();
}
private static volatile int v = ;
private static int n = ;
static void TestInterLock(){
Stopwatch sw = Stopwatch.StartNew();
for (int i = ; i < ; i++)
v++;
Console.WriteLine("volatile write 1000000 times takes:{0}", sw.ElapsedMilliseconds);
sw = Stopwatch.StartNew();
for (int i = ; i < ; i++)
Interlocked.Increment(ref n);
Console.WriteLine("InterLock write 1000000 times takes:{0}", sw.ElapsedMilliseconds);
n = ;
sw = Stopwatch.StartNew();
for (int i = ; i < ; i++)
n++;
Console.WriteLine("n++ write 1000000 times takes:{0}", sw.ElapsedMilliseconds);
}
运行结果如下:
volatile write 1000000 times takes:2
InterLock write 1000000 times takes:8
n++ write 1000000 times takes:2
我运行了好几次,结果会有些出入,但是大部分的结果都是volatile的写入速度和原生的n++的速度是一样的。InterLock也确实如Jeffrey Richter所说,很快,只是没有volatile关键字修饰的变量的读写快。这是在非并发情况下,下面来看一下并发情况下是否还是很快。
static void TestInterLock1(){
Stopwatch sw = Stopwatch.StartNew();
Task[] t2 = new[] { new Task(() => {
for (int i = ; i < ; i++)
Interlocked.Increment(ref n);
}), new Task(() => {
for (int i = ; i < ; i++)
Interlocked.Increment(ref n);}) };
Task[] t1 = new[] { new Task(() => {
for (int i = ; i < ; i++)
v++;
}), new Task(() => {
for (int i = ; i < ; i++)
v++;}) };
Task.WhenAll(t1)
.ContinueWith(t => Console.WriteLine("volatile write 2000000 times takes:{0}", sw.ElapsedMilliseconds));
Task.WhenAll(t2)
.ContinueWith(t => Console.WriteLine("InterLock write 2000000 times takes:{0}", sw.ElapsedMilliseconds));
t2[].Start();
t1[].Start();
t1[].Start();
t2[].Start();
}
先看运行结果,
volatile write 2000000 times takes:94
InterLock write 2000000 times takes:101
多次运行,运行时间会有不同,但是在并发情况下,volatile的写入和InterLock的写入速度几乎相同。上述代码写的如此丑陋,而不是直接写Task.Run(),是为了保证初始化部分都运行完成后,再Start(),且两个任务的先后顺序进行了打乱,最大限度减少误差。可以看到并发情况下,volatile和InterLock几乎一样,且在InterLock中的方法要比Volatile的功能要全,但是在串行时,Volatile的性能要比InterLock要好。结论是,若只对变量读写,没有替换或者其他复杂操作时,可以使用volatile关键字,但是一些复杂操作,需要原子操作时,就得使用InterLock中的方法了,如果使用volatile关键字修饰的变量来进行交换的话,很难保证原子性,只有引入锁才能保证线程同步。且InterLock中提供了几个重载方法,能够接受object类型,还有泛型版本。
可以利用InterLock来实现一个简单的自旋锁,代码如下:
public struct SimpleSpinLock{
private int m_Lock; public void Enter(){
while (true){
if (Interlocked.Exchange(ref m_Lock, ) == ) return;
//此处可以添加黑科技
}
} public void Leave(){
Volatile.Write(ref m_Lock, );
}
} //下面是如何使用SimpleSpinLock的例子
public class Simple{
private SimpleSpinLock m_lock = new SimpleSpinLock(); public void AccessResource(){
m_lock.Enter();
//执行某些程序,只有一个线程可以进入这里
m_lock.Leave();
}
}
这个简单的自旋锁在一个线程调用Enter()时,其他线程在调用m_lock.Enter()方法时,if (Interlocked.Exchange(ref m_lock, 1) == 0)会失败,因为该方法会将m_lock和1交换,并返回旧值,在已有线程调用m_lock.Enter()时,m_lock的旧值是1,因此该方法会在whle(true)处自旋,不断尝试获得锁。该锁的问题是,该线程没有被阻塞(挂起),而是一直在占用CPU资源,其他需要CPU资源的线程无法运行(可以在while内,我加注释的地方,加入”黑科技“来尝试解决此问题。其思路是在线程自旋的过程中,立刻交出CPU资源,可通过调用Thread.Sleep(0)或者Thread.Yield()来实现。尝试获得锁的线程交出时间片,这样当前获得锁的线程能够有更多的资源来运行程序,从而运行结束并交出锁,具体细节在这里不展开)。因此,自旋锁只适合那些运行非常快的方法。
- Interlocked Anything 模式
Interlocked中全部是原子性操作,那是否提供了一个方法,该方法可以接受一个委托,保证该委托在运行时是原子的。答案是没有,但是可以利用Interlocked.CompareExchange来自己实现一个。我们先来看一下利用CompareExchange来实现原子性的Maximum方法。
public static int Maximum(ref int target, int value){
int currentValue = target, startValue, desireValue;
do{
startValue = currentValue;
//可以在此处添加任何想要保证“原子性”的操作,此处是求最大值。
desireValue = Math.Max(startValue, value);
//注意,此处有可能被其他线程抢占,也有可能target的值被修改,因此if()语句会出问题,要使用Interlocked的方法。
//if (startValue = target) target = desireValue;
currentValue = Interlocked.CompareExchange(ref target, desireValue, startValue);
}
//若在此时target已被修改,则重新计算最大值
while (startValue != currentValue);
return currentValue;
}
此方法的思路是:从CPU将target的值读到寄存器中,到计算最大值结束,期间的任何时间target都有可能被其他线程修改。因此保证原子性就被转换成保证计算最大值时,target的值没有变过,如果变过,就重新计算。因此,在最开始的时候,startValue=currentValue,currentValue是开始计算时target的值。然后求得最大值,并保存到desireValue中。注意,此时target有可能被修改,因此调用CompareExchange方法,该方法会将target与startValue比较,如果此时两值相等,那相当于我之前说的,target从开始到最后没有改变,那么这个最大值是准确的,并将target的旧值付给currentValue,最后如果startValue==currentValue,则计算完成,否则继续循环。
《CLR via C#》的作者Jeff很喜欢上面的方法,他在实际开发中,都是使用上面的方法,并对其进行了包装,使之能够支持Interlocked Anything。其原理就是在desireValue=Math.Max()处替换成其他方法,只要在返回结果时保证旧值和最开始读取的值一致就可以。我们来看一下他的封装:
delegate int Morpher<TResult, in TArgument>(int startValue, TArgument argument, out TResult result); static TResult Morph<TResult, TArgumen>(ref int target, TArgumen argumen, Morpher<TResult, TArgumen> morpher){
TResult mophorResult;
int currentValue = target, desireValue, startValue;
do{
startValue = currentValue;
desireValue = morpher(startValue, argumen, out mophorResult);
currentValue = Interlocked.CompareExchange(ref target, desireValue, startValue);
}
while (currentValue != startValue);
return mophorResult;
}
说实话,我并不能非常好了理解这个封装,并不是不能理解做法,而是不能确定此方法到底能不能实现效果,那我们来测试一下。测试的基本思路是对一个变量执行1000次的result+=10。分别是不带线程同步的和利用Morph方法对result+=10的方法进行互锁,保证其原子性。我省去了Morpher和Morph的声明部分。之所以要在DelayAdd和DelayAdd1方法中调用ThreadSleep(20),是为了模拟当在运行较长的方法时,Morph方法是否还能够保证该方法运行的原子性。
static void Main(string[] args){
Test(new TestAction(Add), "Add");
Test(new TestAction(MorphAdd), "MorphAdd");
Console.ReadLine();
}
//DelayAdd1的变种,使之能够符合Morpher的签名
static int DelayAdd(int startValue, int argument, out int result){
Thread.Sleep();
result = startValue + argument;
return result;
} static void DelayAdd1(int argument, ref int result){
Thread.Sleep();
result += argument;
}
//测试的不具有线程同步的方法。
static void Add(ref int result){
DelayAdd1(, ref result);
}
//具有线程同步的方法。
static void MorphAdd(ref int result){
Morph(ref result, , new Morpher<int, int>(DelayAdd));
}
//要测试的委托签名
delegate void TestAction(ref int result);
//公共测试方法
static void Test(TestAction action, string actionName){
int result = ;
var tList = new Task[];
for (int i = ; i < ; i++)
tList[i] = Task.Run(() =>
{
action(ref result);
});
Task.WhenAll(tList).GetAwaiter().OnCompleted(
() => Console.WriteLine("{0}, Result is {1}", actionName, result));
}
运行,得到的结果是:
Add, Result is 8440
MorphAdd, Result is 10000
运行1000次result += 10,普通的Add不能够得到正确结果,但是MorphAdd可以。这是因为在1000次的Add中,某几个Add是同时调用的,result+=10在同一时间调用了多次,因此有156次的Add因为并行而被吞噬了。值得注意的是,MorphAdd方法因为需要线程同步,因此执行时间要慢很多。但是这些付出是值得的,因为这保证了结果的正确。
上述例子证明了Morph方法能够保证委托的原子性,且该方法既不会阻塞线程也不会长时间的自旋,推荐大家在实际中使用该方法。
本文中,我先介绍了Interlocked类中较常用的方法,以及Interlocked.Increment()方法与volatile关键字的对比,结论是虽然将变量设置为所有读写都是“易变的”看起来很浪费,但是该关键字能够保证在单线程时几乎没有性能损失,大部分情况下和原生的读写是一样的速度,且volatile比Interlocked类中提供的写要快2-4倍,但是在并发状态下其性能和volatile关键字是没有差别的。之后我介绍了用Interlocked类中的方法来实现简单的自旋锁,该锁的优点是在非竟态情况下非常快,但是在竟态情况下,未获得锁的线程会一直处于自旋状态,白白浪费CPU。最后介绍了《CLR via C#》书中提到的Interlocked Anything的方法(文中其他的知识点大多也是提取自《CLR via C#》),并测试了该方法确实可以保证委托的原子性,且不会阻塞线程,没有锁,不会造成死锁。至此线程同步中互锁构造就讲完了,后面我会给大家介绍内核构造的信号量和其他锁。