C#(99):多线程锁:Mutex互斥体,Semaphore信号量,Monitor监视器,lock,原子操作InterLocked
目录
一、Mutex类
“mutex”是术语“互相排斥(mutually exclusive)”的简写形式,也就是互斥量。互斥量跟临界区中提到的Monitor很相似,只有拥有互斥对象的线程才具有访问资源的权限,由于互斥对象只有一个,因此就决定了任何情况下此共享资源都不会同时被多个线程所访问。当前占据资源的线程在任务处理完后应将拥有的互斥对象交出,以便其他线程在获得后得以访问资源。互斥量比临界区复杂,因为使用互斥不仅仅能够在同一应用程序不同线程中实现资源的安全共享,而且可以在不同应用程序的线程之间实现对资源的安全共享。.Net中mutex由Mutex类来表示。
二、Mutex的用途
Mutex并不适合于有相互消息通知的同步;另一方面局部Mutex应该被Monitor/lock所取代;而跨应用程序的、相互消息通知的同步由EventWaiteHandle/AutoResetEvent/ManualResetEvent承担更合适。所以,Mutex在.net中应用的场景似乎不多。不过,Mutex有个最常见的用途:用于控制一个应用程序只能有一个实例运行。系统依靠这个name属性来标识唯一的Mutex。
private static Mutex mutex = null; //设为Static成员,是为了在整个程序生命周期内持有Mutex static void Main() { bool firstInstance; mutex = new Mutex(true, @"Global\MutexSampleApp", out firstInstance); try { if (firstInstance) { Console.WriteLine("我们是第一个实例!"); } else { Console.WriteLine("警告,已有实例运行!"); return; } } finally { //只有第一个实例获得控制权,因此只有在这种情况下才需要ReleaseMutex,否则会引发异常。 if (firstInstance) { mutex.ReleaseMutex(); } mutex.Close(); mutex = null; } }
三、Semaphore信号量
1、简介
Semaphore是操作系统中用于控制线程同步互斥的信号量。在编写多线程的程序时,可以使用Semaphore信号量来协调多线程并行,使各个线程能够合理地共享资源,保证程序正确运行。
2、初始化
初始化Semaphore可当做开启了一个线程池,initialCount代表剩余空位,maximumCount代表最大容量。示例如下,当前空位为0,最大容量为1:
Semaphore sem = new Semaphore(0, 1);
3、WaitOne()和Release()
Semaphore常用的方法有两个WaitOne()和Release()。
使用WaitOne()方法相当于等待出现退出的线程,而使用Release()方法为让一个线程退出。
假设initialCount和maximumCount都为5,开始的时候线程池有5个空位置,且总共只有5个位置,当需要并行的线程数量超过5个时,首先使用WaitOne()方法等待,发现有空位就依次进去,每进去一个空位减1,直到进去5个线程之后,空位(initialCount)为0,这时候后面的线程就一直等待,直到有线程调用了Release()方法,主动退出线程池,空位加1,在等待的线程才能继续进入线程池。
下面的代码示例创建一个信号量, 其最大计数为 3, 初始计数为零。 该示例启动五个线程, 这会阻止等待信号量。 主线程使用Release(Int32)方法重载将信号量计数增加到其最大值, 从而允许三个线程进入信号量。 每个线程使用Thread.Sleep方法等待一秒, 以模拟工作, 然后Release()调用方法重载以释放信号量。 每次释放信号灯时, 都将显示以前的信号量计数。 控制台消息跟踪信号量使用。 每个线程的模拟工作时间间隔略有增加, 使输出更易于读取。
private static Semaphore _pool; private static int _padding; public static void Main() { _pool = new Semaphore(0, 3); for (int i = 1; i <= 5; i++)//创建并启动五个线程。 { Thread t = new Thread(new ParameterizedThreadStart(Worker)); t.Start(i);// 启动线程,传递数字。 } Thread.Sleep(500); Console.WriteLine("Main thread calls Release(3)."); _pool.Release(3);//调用Release(3)会使信号量计数回其最大值,并允许等待的线程进入信号量,一次最多三个。 Console.WriteLine("Main thread exits."); } private static void Worker(object num) { Console.WriteLine("Thread {0} begins " + "and waits for the semaphore.", num); _pool.WaitOne(); //填充间隔,使输出更加有序。 int padding = Interlocked.Add(ref _padding, 100); Console.WriteLine("Thread {0} enters the semaphore.", num); Thread.Sleep(1000 + padding); Console.WriteLine("Thread {0} releases the semaphore.", num); Console.WriteLine("Thread {0} previous semaphore count: {1}", num, _pool.Release()); }
四、Monitor类
当多个线程公用一个对象时,也会出现和公用代码类似的问题,这就需要用到 System.Threading 中的 Monitor 类,我们可以称之为监视器,Monitor 提供了使线程共享资源的方案。
Monitor 类可以锁定一个对象,一个线程只有得到这把锁才可以对该对象进行操作。 对象锁机制保证了在可能引起混乱的情况下,一个时刻只有一个线程可以访问这个对象。Monitor 必须和一个具体的对象相关联。
下面代码说明了使用 Monitor 锁定一个对象的情形:
// 表示对象的先进先出集合 Queue oQueue = new Queue(); try { // 现在 oQueue 对象只能被当前线程操纵了 Monitor.Enter(oQueue); // do something...... } catch { } finally { // 释放锁 Monitor.Exit(oQueue); }
如上所示, 当一个线程调用 Monitor.Enter() 方法锁定一个对象时,这个对象就归它所有了,其它线程想要访问这个对象,只有等待它使用 Monitor.Exit() 方法释放锁。为了保证线程最终都能释放锁,你可以把 Monitor.Exit() 方法写在 try-catch-finally 结构中的 finally 代码块里。(lock 关键字就是这个步骤的语法糖)
任何一个被 Monitor 锁定的对象,内存中都保存着与它相关的一些信息:
- 现在持有锁的线程的引用
- 一个预备队列,队列中保存了已经准备好获取锁的线程
- 一个等待队列,队列中保存着当前正在等待这个对象状态改变的队列的引用
当拥有对象锁的线程准备释放锁时,它使用 Monitor.Pulse() 方法通知等待队列中的第一个线程,于是该线程被转移到预备队列中,当对象锁被释放时,在预备队列中的线程可以立即获得对象锁。
1、典型的生产者与消费者实例
下面是一个展示如何使用 lock 关键字和 Monitor 类来实现线程的同步和通讯的例子。在本例中,生产者线程和消费者线程是交替进行的,生产者写入一个数,消费者立即读取并且显示(注释中介绍了该程序的精要所在)。
/// <summary> /// 测试类 /// </summary> public class MonitorSample { public static void Main(String[] args) { // 一个标志位,如果是 0 表示程序没有出错,如果是 1 表明有错误发生 int result = 0; // 下面使用 cell 初始化 CellProd 和 CellCons 两个类,生产和消费次数均为 20 次 Cell cell = new Cell(); CellProd prod = new CellProd(cell, 20); CellCons cons = new CellCons(cell, 20); Thread producer = new Thread(new ThreadStart(prod.ThreadRun)); Thread consumer = new Thread(new ThreadStart(cons.ThreadRun)); // 生产者线程和消费者线程都已经被创建,但是没有开始执行 try { producer.Start(); consumer.Start(); producer.Join(); consumer.Join();//等待这两个线程结束才往下执行 Console.ReadLine(); } catch (ThreadStateException e) { // 当线程因为所处状态的原因而不能执行被请求的操作 Console.WriteLine(e); result = 1; } catch (ThreadInterruptedException e) { // 当线程在等待状态的时候中止 Console.WriteLine(e); result = 1; } // 尽管 Main() 函数没有返回值,但下面这条语句可以向父进程返回执行结果 Environment.ExitCode = result; } } /// <summary> /// 生产者 /// </summary> public class CellProd { /// <summary> /// 被操作的 Cell 对象 /// </summary> Cell cell; /// <summary> /// 生产者生产次数,初始化为 1 /// </summary> int quantity = 1; public CellProd(Cell box, int request) { cell = box; quantity = request; } public void ThreadRun() { for (int looper = 1; looper <= quantity; looper++) { // 生产者向操作对象写入信息 cell.WriteToCell(looper); } } } /// <summary> /// 消费者 /// </summary> public class CellCons { Cell cell; int quantity = 1; public CellCons(Cell box, int request) { cell = box; quantity = request; } public void ThreadRun() { int valReturned; for (int looper = 1; looper <= quantity; looper++) { valReturned = cell.ReadFromCell(); // 消费者从操作对象中读取信息 } } } /// <summary> /// 被操作的对象 /// </summary> public class Cell { /// <summary> /// Cell 对象里的内容 /// </summary> int cellContents; /// <summary> /// 状态标志: 为 true 时可以读取,为 false 则正在写入 /// </summary> bool readerFlag = false; public int ReadFromCell() { lock (this) { if (!readerFlag) { try { // 等待 WriteToCell 方法中调用 Monitor.Pulse()方法 Monitor.Wait(this); } catch (SynchronizationLockException e) { Console.WriteLine(e); } catch (ThreadInterruptedException e) { Console.WriteLine(e); } } // 开始消费行为 Console.WriteLine("Consume: {0}", cellContents); Console.WriteLine(); // 重置 readerFlag 标志,表示消费行为已经完成 readerFlag = false; Monitor.Pulse(this);// 通知 WriteToCell()方法(该方法在另外一个线程中执行,等待中) } return cellContents; } public void WriteToCell(int n) { lock (this) { if (readerFlag) { try { Monitor.Wait(this); } catch (SynchronizationLockException e) { // 当同步方法(指Monitor类除Enter之外的方法)在非同步的代码区被调用 Console.WriteLine(e); } catch (ThreadInterruptedException e) { // 当线程在等待状态的时候中止 Console.WriteLine(e); } } cellContents = n; Console.WriteLine("Produce: {0}", cellContents); readerFlag = true; Monitor.Pulse(this); // 通知另外一个线程中正在等待的 ReadFromCell() 方法 } } }
五、Lock
C# 提供了一个关键字 lock,它可以把一段代码定义为互斥段(critical section),互斥段在一个时刻内只允许一个线程进入执行,而其他线程必须等待。
在C#中,关键字 lock 的定义:lock(expression)
{statement_block}
expression 代表你希望跟踪的对象,通常是对象引用。如果你想保护一个类的实例,你可以使用 this;如果你想保护一个静态变量(如互斥代码段在一个静态方法内部),一般使用锁定一个私有的static 成员变量就可以了。而 statement_block 就是互斥段的代码,这段代码在一个时刻内只可能被一个线程执行。
NET在一些集合类中(比如ArrayList,HashTable,Queue,Stack)已经提供了一个供lock使用的对象SyncRoot,用Reflector工具查看了SyncRoot属性的代码,在Array中,该属性只有一句话:return this,这样和lock array的当前实例是一样的。
Lock 语法简单易用。其本质是针对 Monitor.Enter() 和 Monitor.Exit() 的封装,是一个语法糖!
static internal Thread[] threads = new Thread[10]; public static void Main() { Account acc = new Account(100); for (int i = 0; i < 10; i++) { Thread t = new Thread(new ThreadStart(acc.DoTransactions)); threads[i] = t; threads[i].Name = i.ToString(); threads[i].Start(); //10个线程同时启动 } } internal class Account { int balance;internal Account(int initial) { balance = initial; } internal void DoTransactions() { for (int i = 0; i < 100; i++) { int amount = new Random().Next(-100, 100); lock (this) { Console.WriteLine("当前线程:" + Thread.CurrentThread.Name + " 余额:" + balance.ToString() + " 数量:" + amount); if (balance >= amount) { Thread.Sleep(5); balance = balance - amount; } else { Console.WriteLine("当前线程:" + Thread.CurrentThread.Name + " 不能交易,余额不足:" + balance.ToString()); return; } } } } }
六、InterLocked(相当于lock,对整数)
在C#中,赋值和简单的数字运算都不是原子型操作。 在多线程环境下,我们可以通过使用System.Threading.Interlocked类来实现原子型操作当个数据,使用它比使用Monitor类跟简单。
使用.NET提供的Interlocked类可以对一些数据进行原子操作,看起来似乎跟lock锁一样,但它并不是lock锁,它的原子操作是基于CPU本身的,非阻塞的,所以要比lock的效率高。
1、Interlocked类主要方法
- Read() 安全读取数值,相等于int a=b
- Add() 安全相加一个数值,相当于 a = a + 3
- Increment() 安全递加1,相当于 i++。返回递增后的值。
- Decrement()安全递减1,相当于 i--Exchange() 安全交换数据,返回递减后的值。
- CompareExchange() 安全比较两个值是不是相等。如果相等,将第三个值与其中一个值交换。
2、实例
例一:
void Main() { TestIncrementUnSafe(); TestIncrementSafe(); } private int value1 = 0; public void TestIncrementUnSafe() { for (int i = 0; i < 5; i++) { Thread t = new Thread(IncrementValue1); t.Name = "t1 " + i; t.Start(); } Thread.Sleep(2000); //value maybe 500000 Console.WriteLine("value1 = " + value1); } private void IncrementValue1() { for (int i = 0; i < 1000000; i++) { value1++; } } private int value2 = 0; public void TestIncrementSafe() { for (int i = 0; i < 5; i++) { Thread t = new Thread(IncrementValue2); t.Name = "t2 " + i; t.Start(); } Thread.Sleep(2000); //value should be 500000 Console.WriteLine("value2 = " + value2); } private void IncrementValue2() { for (int i = 0; i < 1000000; i++) { Interlocked.Increment(ref value2); } }
运行结果
value1 = 4612592
value2 = 5000000
例二、
void Main() { TestExchangeSafe(); TestCompareExchangeSafe(); } private int value3 = 0; public void TestExchangeSafe() { for (int i = 0; i < 5; i++) { Thread t = new Thread(ExchangeValue3); t.Name = "t2 " + i; t.Start(); } Thread.Sleep(2000); //value should be 83 Console.WriteLine("value3 = " + value3); } private void ExchangeValue3() { Interlocked.Exchange(ref value3, 83); } private int value4 = 0; public void TestCompareExchangeSafe() { for (int i = 0; i < 5; i++) { Thread t = new Thread(ExchangeValue3); t.Name = "t2 " + i; t.Start(); } Thread.Sleep(2000); //value should be 99 or 0 Console.WriteLine("value4 = " + value4); } private void ExchangeValue4() { //if value4=0, set value4=99 Interlocked.CompareExchange(ref value4, 99, 0); }
运行结果:
value3 = 83
value4 = 0