多线程 基础(Multithreading)
一些基本的关于线程和与其相关的概念
1.系统资源管理器
管理进程 3.线程 System.Threading.Thread
也称控制点,由控制点进入Main函数,逐步执行程序 4.多线程
多个控制点同时执行 5.线程池 System.Threading.ThreadPool
可以手动创建一个线程(使用Thread),也可以通过线程池(ThreadPool)获取一个线程,两种方式都可以创建一个线程。线程池是分配线程的管理器,线程池假定所有的线程都应该是短时间运行的,这样它以此创建可控制数量的线程,以避免因过多分配线程造成资源消耗和过于漫长的把时间浪费在时间分片上。也即,如果执行的程序并不需要消耗大量的资源和时间,也不希望自己手动去管理线程,那么久不要使用Thread创建线程,而是通过从ThreadPool去获取由线程池为我们创建并分配好的线程 6.调度器 System.Threading.Task.TaskScheduler
调度器从线程池获取线程并负责对任务进行调度、重复使用和资源清理。创建线程时,调度器将从线程池中取出处于闲置状态的线程,但如果是长时间运行的线程,那么调度器不会从线程池中取线程,它会创建专用线程来处理这种需要长时间运行的工作。这样做的好处显而易见,如果长时间运行的工作也由线程池负责,那么大量内存资源和时间分片时切换的时间会被这种线程占尽致使其它短时间运行的线程可能得不到执行或执行效率下降。为此,可以在创建需要长时间运行的Task任务时指定TaskContinuationOptions.LongRunning选项以通知调度器开辟专用线程来维持和控制它 7.任务 System.Threading.Tasks
工作的内容,为了生成某些结果,任务运行在线程中,即线程是执行任务的对象。任务由Task表示,不同类型的任务则由Task的派生Task<T> 表示 8.进程 System.Diagnostics.Process
一个应用程序至少有一个进程,进程在一小块内存中保存着应用的资源,它是应用程序的实例。而一个进程里至少有一个线程,线程就是进程的执行路径,多个线程就表示一个进程有多条执行路径。重点是:在一个进程里多个线程共享着它们隶属的进程所在的内存单元,也即A线程操作的结果B线程也是可以访问,因为多线程的操作始终是在一个进程的内存中执行的,所以多线程并发时可能会导致代码的逻辑出现问题。多线程的意义在于它们可以在一个进程中同时执行任务,共享同一块内存,提高了完成任务的速度,节约了时间。多线程之间可以并发执行,也可以相互终结 9.同步
比如A线程执行完之后B线程才开始执行,也即B必须等待A完成处理后它才会执行任务 10.异步
A线程、B线程同时执行任务 11.时间分片
操作系统通过时间分片来模拟多线程的并行执行,它用极快的速度从A线程切换到B线程,每个线程在执行的那段时期被称为时间片段或量子,在进程里切换线程的执行称为上下文切换 12.并发
具备处理多个任务的能力,比如一个任务暂停后去处理另一个任务,另一个任务完成后继续处理前面暂停的任务,这看起来似乎是同时在处理多个任务,但这种快速切换线程达到看上去似乎是多个线程同时在执行的效果实际上是靠时间分片技术。除非cpu个数与线程数对等,否则算不上是并行,而是看起来像是并行行为实际上是交替执行任务的时间分片 13.并行
真正具备同时处理多个任务的能力,它是靠多个cpu(多核)运行多个线程从而达到真正的同时执行任务 14.串行
完全遵守一个任务一个任务的执行逻辑,没有时间分片 15.任务的原子性
在编程的世界里,原子是不可再分割的最小单位,原子性操作即不可分割的最小单位的操作,比如64位操作系统处理一个long类型(位)的变量赋值,这个操作就是原子性的。因为它可以一次性填充64位的二进制数据到栈上,属于一步完成,不会发生断裂。而假如给一个128位的decimal类型的变量赋值,这个操作会被分成两步,第一步填充64位二进制数到栈上,第二步再填充64位二进制数到栈,这个操作就不是原子性的。大多数操作都不是原子性的,比如一个线程调用某个方法,方法内部有多个表达式,即便每个表达式是原子性的,但它们组合在一起就可以说该方法是非原子性的操作 16.竞态条件
假设A和B同时在网上买票,票只有一张,但他们同时下单付款,请求发送到服务端,服务端开启两个线程处理这两个任务,但你根本无法知晓计算机在什么时候开始时间分片,也即有可能会发生这样的情况:当A和B同时付款后,突然切换到B线程,B成功买到票,而A失败,或者正好相反,这些结果是我们无法提前预测掌控的。为了防止非原子性的操作,C#提供Lock语句将多个针对同一目标的线程阻塞起来,然后队列执行,每处理一个线程就为其上锁,处理完成再解锁,接着再处理下一个线程的操作 17.什么时候使用多线程?
.程序需要同时并发/并行执行多个任务
.程序需要等待远程返回结果,同时本地代码还得继续执行
.解决延迟(当用户使用网络导入功能导入大量数据时,如果发生网络延迟,他可能想要点击取消或尝试其它操作)
直接操纵线程的API(System.Threading.Thread)
通过Thread类来表示在CRL中非托管的线程,这个对象代表了应用程序的控制点,通过给Thread类的构造函数传递一个委托即可,该委托用于代理执行任务。
namespace ConsoleApp1
{
class Program
{
public const int i = ;
public static void Dowork( )
{
for (int c = ; c < i; c++)
{
Console.Write( '+' );
}
}
static void Main( string[] args )
{
//Main就是一个线程入口(控制)点,所以它是一个线程
//在Main中创建了另一个新的线程
//可以将Main看成主线程
//ThreadStart start = Dowork;
//Thread thread = new Thread(start);
//或
Thread thread = new Thread( Dowork );
thread.Start( ); //开启新线程
thread.Join( );//阻止主线程执行直到本线程执行完毕再执行主线程
for (int h = ; h < i; h++)
{
Console.Write( '-' );
}
//或
//Thread thread = new Thread( ( ) =>
//{
// for (int c = 0; c < i; c++)
// {
// Console.Write( '+' );
// }
//} );
}
}
}
Thread类
//方法
Thread(ParameterizedThreadStart parameterizedThread | ThreadStart threadStart)
//创建线程实例,参数可以是两种委托中的任意一种,前者带一个object参数,后者无参。
Join(Int Millisecond | TimeSpan time)
//阻止其它线程的执行直到本线程(调用Join方法的线程)执行完毕,参数可以指定最多等待当前线程执行多少时间,过期则阻止变成失效
Abort()
//立即终止线程,如果线程没有执行完毕却粗暴的终止就有可能会引发异常 //属性
IsBackground
//设置或获取线程是否是后台线程,后台线程的意思是即使进程关闭,后台线程也不会退出直到它自己执行完毕,前台线程则相反,进程必须等待线程全部退出才能终止
Priority
//设置或获取线程优先级,优先级越高的线程,时间分片会优先考虑它。值为ThreadPriority枚举,可能的值:AboveNormal | BelowNormal | Highest | Lowest | Normal
IsAlive
//获取线程是否还活着
ThreadState
//获取线程的更多状态信息,返回一个ThreadState标志枚举 //静态方法
Thread.Sleep(int Millisecond | TimeSpan time)
//使正在执行的当前线程进入暂停,然后至少在指定时间内操作系统不为线程分配时间。即使参数设为0,线程也会暂停,然后经过0毫秒唤醒。但唤醒时间不保证,假设设为10000毫秒,则唤醒时间根本无法预料,我们唯一知道的只是线程会沉睡至少10000毫秒
//假设有一个A线程的异步调用正在工作,你寄希望于B线程等待A完成任务后再执行,你可能会调用Sleep方法让B线程沉睡。但此方法作为线程同步的滥用方法应予以摈弃,因为你不知道它什么时候可以被唤醒,此方法只能作为有意图的人工延迟被使用
Thread.SpinWait(int Millisecond | TimeSpan time)
//在CPU运转的周期内,如果在参数指定的时间内就进入内核队列等待(自旋),超过时间则执行线程,性能高于Sleep //静态属性
//指示线程等待参数指定的时间后再执行
Thread.CurrentThread.ManagedThreadId
//获取当前正在执行的线程的托管ID
利用线程池间接操纵线程
重复性的创建、启动一个新线程的开销很大,而且创建太多的线程会使cpu不停地在时间分片上切换,耗费大量时间在切换任务上。而将异步任务交由线程池对象,由线程池去创建、管理线程,就可以达到重复使用闲置线程的目的,线程池只分配一定份额的线程数,然后对它们进行统一调度,这样就可以节省存开销。线程池的缺点:1.假如池中的线程已经被占尽,那么其他异步任务将被阻塞直到完成任务的线程回到池中接受新任务。2.因为你不能直接使用Thread或Task操纵线程,这些都由线程池来管理,所以如果需要处理花费以天计数的耗时任务或需要同步的任务则线程池就不合适。
using System.Threading; namespace ConsoleApp1
{
class Program
{
public const int i=1000; public static void Dowork(object parameter)
{
for (int c = 0; c < i; c++)
{
Console.Write('+');
}
}
static void Main(string[] args)
{
ThreadPool.QueueUserWorkItem(Dowork, '+');
for (int h = 0; h < i; h++)
{
Console.Write('-');
}
}
}
}
使用TPL(Task Parallel Library)任务并行扩展库
.NET4.0提供了新的并行编程模型TPL,TPL也是利用线程池来管理线程,它向外界公开一个Task的类来表示异步任务。Task.Run(Action action)方法接收一个Action委托进行异步任务的执行,委托与任务(Task)的区别在于,控制流从调用者处进入委托,要等待委托执行完毕才会将控制权返回到调用者处,然后继续下一行代码的执行,而Task任务则是封装一个委托,控制流从调用者处进入Task后会立即将控制权返回到调用者处,这样多个任务得以并行执行且互不影响。
//三种方式创建异步Task,后两种直接开启任务的运行,第一种需要手动调用Start方法
//第一种
Task task = new Task(() => { TestRun.RunTask(); });
task.Start();
//第二种
Task task = Task.Run(() => { TestRun.RunTask(); });
//第三种
Task task = Task.Factory.StartNew(() => { TestRun.RunTask(); });
//Task.Run是Task.Factory.StartNew的简化版,但后者可以指定一些参数更精确的掌控任务的行为。
Task类
如果异步任务需要返回结果,则可以使用Task<T>,泛型类型参数T就是返回的结果类型。通过在主线程中轮询异步任务的IsCompleted来获得异步是否执行完毕以便获取执行的返回结果。
//属性
CurrentId
//获取异步任务的 ID,这对调试有帮助,一个线程上运行一个任务,线程有它的ID,同样的,任务也有它的ID //方法
Wait(int Time | TimeSpan timespan)
//阻止(阻塞)其它任务的执行直到本任务(调用Wait方法的任务)执行完毕,参数可以指定最多等待当前任务执行多少时间,过期则阻止变成失效,如果在等待的时间内完成任务则返回true,否则返回false ContinueWith()
//ContinueWith方法返回一个Task实例,如果ContinueWith方法的委托返回某种类型,那么ContinueWith最终会用一个Task实例包装委托的返回类型,比如委托返回string,则ContinueWith方法返回Task<string>
//如果委托返回Task<string>,则ContinueWith方法返回Task<Task<string>> Unwrap()
//提取被ContinueWith方法包装过的Task实例(比如ContinueWith方法返回一个Task<Task<string>>,提取后Unwrap方法返回Task<string>)
//打印网页大小的例子:参看本页面:C#5.0新的异步任务关键字 //属性
IsCompleted
//异步任务是否已经执行完成,对于其Status属性返回的TaskStatus枚举值是RanToCompletion、Faulted、Canceled中的任何一个,则IsCompleted=true Result
//获取异步任务的返回值,此属性会阻塞其它线程,如Task<string>实例.Result==string实例 Status
//获取异步任务的详细状态,一个TaskStatus 枚举。可能的值如下:
//1.RanToCompletion:任务成功完成
//2.Faulted:任务出现异常,也算完成任务
//3.Running:任务正在执行,尚未完成
//4.Canceled:任务已经取消,也算完成任务
//5.Created:任务已初始化,但尚未列入激活计划
//6.WaitingForActivation:任务正在等待线程池列入激活计划
//7.WaitingToRun:任务已列入激活计划,但尚未开始执行
//8.WaitingForChildrenToComplete:任务已完成,正在隐式的等待附加的子任务完成 AsyncState
//在创建Task时附加到Task对象上的其它数据,通过参数state指定,用于需要时可通过AsyncState属性存取
var task = Task.Factory.StartNew(s => "119", "火警");
Console.WriteLine(task.AsyncState); //静态方法
Task.WaitAll(Task[] tasks)
//阻止非参数指定的任务的执行直到参数指定的任务全部执行完毕 Task.WaitAny(Task[] tasks)
//阻止非参数指定的任务的执行直到参数指定的任务中的任意一个执行完毕,也即只要参数指定的任意一个任务已经完毕则不再等待 Delay(int Time | TimeSpan timespan,[CancellationToken token])
//延时执行后面的任务,如果CancellationToken存在且在规定时间内取消令牌为true则取消任务,此方法会自动监视CancellationToken,不需要手动测试IsCancellationRequested
//例子:
Stopwatch stopwatch = new Stopwatch();
Task task = Task.Run(() => { Console.WriteLine("5秒后会执行延续任务……"); });
stopwatch.Start();
Task.Delay().ContinueWith(a => { Console.WriteLine($"经过了{stopwatch.ElapsedMilliseconds}毫秒后延续任务已执行……"); });
stopwatch.Stop();
Console.Read(); Task task2 = Task.Run(async () => {
Console.WriteLine("5秒后会执行延续任务……");
stopwatch.Restart();
await Task.Delay();
Console.WriteLine($"经过了{stopwatch.ElapsedMilliseconds}毫秒后延续任务已执行……");
stopwatch.Stop();
});
Console.Read();
轮询的例子:
using System.Threading; namespace ConsoleApp1
{
public class PiCalculator
{
public static string Calculate()
{
Thread.Sleep(10000);
return "异步任务已经完成……";
}
} class Program
{
static void Main(string[] args)
{
//Task<string> task = new Task<string>(()=> PiCalculator.Calculate());
//task.Start();
//Task<string> task = Task.Run(() => PiCalculator.Calculate());
Task<string> task = Task.Factory.StartNew<string>(() => PiCalculator.Calculate()); //新线程的异步任务
string rotate = @"↘↓ ↙";
for (int c = 0; c <= 1000000; c++) //主线程的遍历任务
{
if (task.IsCompleted) //轮询新线程
{
Console.WriteLine(task.Result); //取异步任务返回值
break;
}
else
{
for (int f = 0; f < rotate.Length; f++)
{
Console.Write($"{rotate[f]}");
Console.Write("\b"); //退格删除状态标
Console.Write("\b");
}
}
}
}
}
}
线程任务
子任务
分为两种,第一种是分离的子任务,第二种是附加的子任务。分离与附加的区别在于以下几点:
1.父子同步(附加子任务)、父子异步(分离子任务)
2.父任务会等待子任务完成(附加子任务)、父任务不会等待子任务完成(分离子任务)
3.子任务的状态、异常等都会冒泡到父任务层面上(附加子任务)、子任务的状态、异常等不会冒泡到父任务层面上(分离子任务)
分离子任务
在主线程写的第一个Task就称为父任务,在第一个Task之后再创建的Task统统称为分离子任务,分离子任务顾名思义表示它独立于父任务执行,父子关系是并发的,而非同步。
Task parent = Task.Factory.StartNew(()=>Console.WriteLine("parent"));
Task child = Task.Factory.StartNew(()=>Console.WriteLine("child"));
Task.WaitAll(parent, child); //打印结果有可能child在前也可能在后
Console.WriteLine();
附加子任务
如果需要子任务与父任务同步,则使用附加子任务,附加子任务必须在父任务的委托中定义,通过为子任务指定TaskCreationOptions.AttachedToParent选项明确表示子任务将附加到父任务上同步执行。注意Task.Run方法默认阻止子任务的附加,如果需要附加子任务则父任务不要使用Run方法,改用工厂方法。附加子任务和父任务之间的关系是一个层次结构,也即离包含子任务最近的那一层任务就是子任务的父任务。
Task parent = Task.Factory.StartNew
(
() => {
Console.WriteLine("parent");
Task child = Task.Factory.StartNew(() => Console.WriteLine("child"), TaskCreationOptions.AttachedToParent);
}
);
Task.WaitAll(parent); //打印结果是parent,然后是child,同步执行
Console.WriteLine();
延续任务
延续任务没有父子关系,此处只描述为在前面的是先驱任务,在后面的是延续任务。在一个异步任务上延续更多的任务,这需要使用Task对象的ContinueWith方法,可以将调用了ContinueWith方法的Task看成是先驱任务或延续任务,这取决于该调用该方法后是否继续调用了ContinueWith方法,先驱任务和延续任务之间的关系是一个层次结构,也即最后一个ContinueWith指定的延续任务的先驱任务是紧靠在它之前的那个任务。ContinueWith方法接收的委托是在先驱执行完毕后的延续。ContinueWith方法可以是一连串的链式调用。连续任务不是子任务,它属于先驱任务的一部分。但是,也可以通过TaskContinuationOptions.AttachedToParent将延续任务转换为先驱任务的子任务。
Task taskA = Task.Run(() => Console.WriteLine("异步开始……"));
Task taskB = taskA.ContinueWith(a => Console.WriteLine("延续A……"));
Task taskC = taskA.ContinueWith(a => Console.WriteLine("延续A……"));
Task.WaitAll(taskB, taskC);
Console.WriteLine("完成……"); //taskA执行完成后将并发执行taskB,taskC,但两者的执行顺序是不确定的 Task parent = Task.Factory.StartNew(() => Console.WriteLine("parent"));
Task continuation = parent.ContinueWith(p => Console.WriteLine("child continuation"), TaskContinuationOptions.AttachedToParent);//将延续任务转换为子任务
Task.WaitAll(parent);
Console.WriteLine();
控制异步任务行为的两个枚举
1.TaskCreationOptions枚举
TaskCreationOptions 枚举可能的值
//None:默认,指定应使用默认行为
//RunContinuationsAsynchronously:强制异步执行添加到当前任务的延续任务
//AttachedToParent:指定将父任务中包含的任务附加到任务层次结构中的父级,使其成为子任务
//DenyChildAttach:父任务中指定此选项则会阻止子任务使用AttachedToParent选项附加到父任务上
//LongRunning:向TaskScheduler提示,这是一个长时间运行的任务,请不要从线程池调用线程处理任务,而是分配更大的内存来调度任务
//PreferFairness:异步任务无法保证谁最先开始执行,此选项可以提示 TaskScheduler 以一种尽可能公平的方式安排任务,这意味着较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行
2.TaskContinuationOptions枚举
这是一个比较重要的与异步任务有关的位标志枚举,可以使用位操作符合并多个选项,它用于控制延续任务的行为。
TaskContinuationOptions 枚举可能的值
//None:默认,先驱任务一旦完成则继续执行延续任务,而无视先驱任务的状态
//PreferFairness:异步任务无法保证谁最先开始执行,此选项可以提示 TaskScheduler 以一种尽可能公平的方式安排任务,这意味着较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行
//LongRunning:通知线程池的任务调度器对象,这可能是一个I/O受限的高延迟任务,调度器可处理已经队列的其他任务。应尽量少用此项。
//AttachedToParent:指定将某个延续任务附加到任务层次结构中的父级,使其成为子任务
//DenyChildAttach:父任务中指定此选项则会阻止子任务使用AttachedToParent选项附加到父任务上
//一般通过在父任务类构造函数中或Task.Factory.StartNew方法中指定 TaskCreationOptions.DenyChildAttach选项即可成功阻止子任务的附加
//DenyChildAttach:任何延续任务(ContinueWith)如果被指定为附加到父任务的子任务,则引发异常
//OnlyOnRanToCompletion:如果先驱任务成功完成,才会执行此延续任务,否则不。也即,先驱任务未成功完成,延续任务就不会执行
//NotOnRanToCompletion:如果先驱任务成功完成,则不会执行此延续任务,否则不。也即,先驱任务未成功完成,延续任务就会执行
//OnlyOnFaulted:如果先驱任务没有成功完成,才会执行此延续任务,否则不。也即,先驱任务成功完成,延续任务就不会执行
//NotOnFaulted:如果先驱任务没有成功完成,则不会执行此延续任务,否则不。也即,先驱任务成功完成,延续任务就会执行
//OnlyOnCanceled:如果先驱任务已经被取消,则执行此延续任务,否则不。也即,先驱任务没有被取消,延续任务就不会执行
//NotOnCanceled:如果先驱任务已经被取消,则不会执行此延续任务,否则不。也即,先驱任务没有被取消,延续任务就会执行
//特别注意:任何试图为任务手动捕获异常的行为都将被视为任务正常完成,为了正确掌握先驱任务是否正常完成,应考虑手动捕获Wait方法而不是在任务内部使用try catch
// 特别注意:延续任务如果指定了TaskContinuationOptions的以On或NotOn开头的选项,则先驱任务在不满足延续任务的枚举选项且延续任务在Wait的时候会引发异常
//例子:
static void Run(string num)
{
int n = Convert.ToInt32(num);
}
static void Main(string[] args)
{
Task taskA = Task.Run(() => Run("123"));
Task taskB = taskA.ContinueWith(a => Console.WriteLine("先驱任务正常完成,我才会执行"), TaskContinuationOptions.OnlyOnRanToCompletion);
try { taskB.Wait(); }
catch (Exception ex) { }
}
取消任务
如果对线程任务粗暴的使用类似Abort,那么无法预知程序如此武断的结束会不会造成异常的发生,为了避免这种情况,可以使用CancellationTokenSource类,此类表示取消任务的中间人,它提供Token属性返回一个表示取消标识的CancellationToken对象,你可以在任务执行的方法内部使用CancellationToken对象的IsCancellationRequested属性,该属性是一个指示器,它将根据CancellationTokenSource对象是否调用了Cancel方法判断是否接到取消任务的请求,如果是则不再执行任务。这是通过在任务执行的方法内部使用循环轮询的方式测试IsCancellationRequested属性是否为真,这样就避免了任务半途而废造成的异常。注:如果父任务在其子任务开始前取消了自身,则子任务将永远不会开始。 如果父任务在其子任务已经开始后才取消了自身,则子任务将完成运行。
CancellationTokenSource
//属性
Token
//获取一个取消任务的标识,返回一个CancellationToken对象
//CancellationToken对象具有IsCancellationRequested属性,IsCancellationRequested属性是一个指示器,它根据CancellationTokenSource对象的Cancel方法判断是否接到取消任务的请求 //方法
Cancel
//取消任务,这会使CancellationTokenSource.Token.IsCancellationRequested=true
取消任务的例子
class Program
{
static void PrintStr()
{
string[] strAry = { "a","b","c","d" };
Action<int> action = (i) => { Console.Write(i); };
for (int c=;c<;c++)
{
action(c);
for (int z = ; z < ; z++)
{
action(z);
}
}
}
static void WriteValue(CancellationToken token)
{
string pSection = string.Empty;
while (!token.IsCancellationRequested) //一些内置的供异步任务调用的方法,在取消任务的时候不需要你做判断,但自定义的供异步调用的方法需要手动判断
{
PrintStr();
}
} static IEnumerable<string> YiedTest()
{
string[] strAry = { "a", "b", "c", "d" };
for (var c = ; c < strAry.Length; c++)
{
yield return strAry[c];
}
}
static void Main(string[] args)
{
string cancelOperator = "************************************";
Console.WriteLine("可以随时按Enter键取消任务");
CancellationTokenSource tokenSource = new CancellationTokenSource();//创建负责提供'取消标识'的CancellationTokenSource对象
tokenSource.Token.Register(() => { Console.WriteLine("取消任务事件被触发……"); }); //注册取消事件
Task task = Task.Run(() => { WriteValue(tokenSource.Token); }, tokenSource.Token);
Console.ReadLine();//阻塞主线程,同时异步任务会继续执行
tokenSource.Cancel();//按下了Enter键,主线程被激活,此时要取消异步任务的执行
Console.WriteLine(cancelOperator);
task.Wait();
Console.WriteLine("\n你已经取消了任务,任务优雅结束");
}
}
如图所示,取消任务的命令发出后,打印了一串*号,接着任务再执行了一次才结束,这就避免了中途粗暴的结束任务造成异常。
取消任务时抛出异常
可以使用两种方式在任务执行的方法内部抛出异常,修改上面取消任务的代码:
static void WriteValue(CancellationToken token)
{
string pSection = string.Empty;
while (!token.IsCancellationRequested)
{
pSection =string.Join("",Print.PrintNum());
Console.Write(pSection);
}
//while循环的轮询是基于没有取消任务,否则循环会终止从而执行异常的抛出
token.ThrowIfCancellationRequested(); //第一种
throw new OperationCanceledException(token);//第二种
}
C#5.0新的异步任务关键字
Task类的Unwrap方法可以提取由ContinueWith方法返回的被封装的返回值,下面给出一个延续异步任务的例子,但代码比较杂乱,不好理解,如下:
class Program
{
/// <summary>
/// 异步任务未执行完成之前打印句点
/// </summary>
static void PrintSymbol()
{
List<string> SymbolList = new List<string> { ".", ".", ".", ".", "." };
int c = ;
Console.Write("稍等");
while (true)
{
c++;
Thread.Sleep();
string[] argsArray = System.Environment.GetCommandLineArgs();
if (c == )
{
Console.Clear();
c = ;
Console.Write("稍等");
}
Thread.Sleep();
Console.Write(GetWaitingSymbol(SymbolList).First());
}
}
/// <summary>
/// 句点迭代器
/// </summary>
/// <param name="list"></param>
/// <returns></returns>
static IEnumerable<string> GetWaitingSymbol(List<string> list)
{
for (var c = ; c < list.Count; c++)
{
yield return list[c];
}
}
/// <summary>
/// 异步任务计算远程网页的大小
/// </summary>
/// <param name="url"></param>
/// <returns></returns>
static Task WriteWebRequestSizeAsync(string url)
{
StreamReader reader = null;
WebRequest web = WebRequest.Create(url);
//发起异步请求
Task task = web.GetResponseAsync().ContinueWith(a =>
{
WebResponse response = a.Result; //获得Response对象
reader = new StreamReader(response.GetResponseStream()); //将数据流存入StreamReader
return reader.ReadToEndAsync();//此处用了ReadToEndAsync方法是因为该方法所处于的ContinueWith必须返回一个有返回值的Task<string>,否则它只能返回Task实例
}).Unwrap().ContinueWith(a => //本来可以将reader读取的数据直接进行处理后输出,此处故意多写了一个ContinueWith是为了演示Unwrap方法
{ //前一个ContinueWith返回一个Task<string>实例,在该实例上调用了最后一个ContinueWith方法,其委托中的a参数就是Task<string>实例
//然后调用Result取出返回值用于处理后输出网页大小
if (reader != null)
{
reader.Dispose();
string text = a.Result;
UnicodeEncoding encoding = new UnicodeEncoding();
byte[] bytes = encoding.GetBytes(text.ToCharArray());
Console.WriteLine($"网页大小:{ bytes.Length / }kb");
}
});
return task;
} static void Main(string[] args)
{
string url = "http://gsoogle.com";
Task task = WriteWebRequestSizeAsync(url);
try
{
while (!task.Wait()) //循环等待,每次循环都等待异步任务1秒
{
PrintSymbol();
}
}
catch (AggregateException ex)
{
ex.Handle(eachException => { Console.WriteLine(eachException.GetType().Name); Console.WriteLine(eachException.Message); return true; }); //如果返回false则会重新引发异常
}
}
}
改写上面的例子,将使用新的关键字来简化代码量,提高可阅读性:
/// <summary>
/// 异步任务计算远程网页的大小
/// </summary>
/// <param name="url"></param>
/// <returns></returns>
static async Task WriteWebRequestSizeAsync(string url)
{
WebRequest web = WebRequest.Create(url);
WebResponse response =await web.GetResponseAsync();
StreamReader reader = new StreamReader(response.GetResponseStream());
string text =await reader.ReadToEndAsync();
UnicodeEncoding encoding = new UnicodeEncoding();
byte[] bytes = encoding.GetBytes(text.ToCharArray());
Console.WriteLine($"网页大小:{ bytes.Length / 1024}kb");
} static void Main(string[] args)
{
string url = "https://www.google.com/";
Task task = WriteWebRequestSizeAsync(url);
try
{
while (!task.Wait(1000)) //循环等待,每次循环都等待异步任务1秒
{
PrintSymbol();
}
}
catch (AggregateException ex)
{
ex.Handle(eachException => { Console.WriteLine(eachException.Message); return true; }); //如果返回false则会重新引发异常
}
}
async关键字
C#5.0提供了一个async关键字和一个await操作符来简化延续任务的复杂度,这提高代码的可阅读性。被async修饰的方法表示为一个可能需要异步执行的方法,该方法只能返回三种类型:void、Task或Task<T> ,如果外部需要async修饰的方法返回Task以便可以得到异步对象的相关信息,那么只需要声明该方法会返回Task即可(return Task是不必须的),而如果外部需要异步方法返回Task<T>,则必须return一个T供外部获取,如:
public async Task<string> Test() { return "hello"; }
Task<string> task = Test();
string r = task.Result; //hello
调用async修饰的方法并不表示该方法会异步执行,实际上这种方法是同步执行,这种方法与普通方法没有任何区别,它将在调用它的线程上执行而非在异步线程上执行,方法执行它内部的代码逻辑时遇到异步任务后才会开启异步执行,所以无论如何async修饰的方法存粹是一个同步方法,因为它始终运行在调用它的线程上而非另一个异步的线程上,真正异步执行的其实是该方法体内部的实现存在异步任务,下面的例子中的GetResponseAsync和ReadToEndAsync才是异步执行的任务。
await操作符
要使用await操作符,则该方法必须被修饰为async,await操作符的行为也并非是阻塞线程等待异步任务的完成,实际上它的作用就是为将那些异步任务ContinueWith成一个整体的异步任务的延续链条,简化使用ContinueWith方法的复杂度而已。真正阻塞线程的其实就是那些异步执行的方法,await只是将它们串联成一个整体的异步任务,如:
WebRequest web = WebRequest.Create(url);
WebResponse response = await web.GetResponseAsync(); //GetResponseAsync阻塞了当前的线程
StreamReader reader = new StreamReader(response.GetResponseStream());
string text = await reader.ReadToEndAsync(); //ReadToEndAsync阻塞了当前线程,await的工作仅仅是ContinueWith了这两个异步方法
异步Lambda
简化函数,直接用当做委托使用的Lambda表达式替换掉WriteWebRequestSizeAsync方法,同样的该Lambda委托并非就是异步执行,它在调用它的线程上执行而非异步,直到Lambda主体代码开启了异步任务,才称其为一个异步的Lambda,注意异步Lambda与异步任务同样只能返回void、Task或Task<T>。
static void Main(string[] args)
{
string url = "https://www.google.com/";
Func<string,Task> lambda = async (urlPath) => {
WebRequest web = WebRequest.Create(urlPath);
WebResponse response = await web.GetResponseAsync();
StreamReader reader = new StreamReader(response.GetResponseStream());
string text = await reader.ReadToEndAsync();
UnicodeEncoding encoding = new UnicodeEncoding();
byte[] bytes = encoding.GetBytes(text.ToCharArray());
Console.WriteLine($"网页大小:{ bytes.Length / 1024}kb");
};
Task task = lambda(url); try
{
while (!task.Wait(1000)) //循环等待,每次循环都等待异步任务1秒
{
PrintSymbol();
}
}
catch (AggregateException ex)
{
ex.Handle(eachException => { Console.WriteLine(eachException.Message); return true; }); //如果返回false则会重新引发异常
}
}
有返回值的异步Lambda
Func<string, Task<string>> GetInRoleUserName = async ( userID ) =>
{
AppUser appUser = await UserManager.FindByIdAsync ( userID );
string userName = appUser.UserName;
return userName;
};
//GetInRoleUserName方法返回的是Task<T>
//提取Task<T>中的T类型实例
string[] names= await Task.WhenAll ( GetInRoleUserName("1") );
并行迭代 System.Threading.Task.Parallel
在计算机拥有多个CPU的情况下,并行迭代在执行大量计算时的运算效率会显著提升。手动捕获异常时,可以将Parallel的方法调用放进try块,捕获AggregateException异常包。
//方法
//For(int fromIncludesive, int toExclusive,[ParallelOptions] Action<int counter, ParallelLoopState state> lambda)
//ParallelOptions:控制并行行为的对象,可以创建这个对象并初始化其某些属性来精确控制并行迭代
//counter:计数器
//ParallelLoopState:是可选的迭代状态对象,可以在并行迭代时随时中断(break,与for循环的continue相同)当前迭代或停止(stop)所有迭代
//该方法会调用委托toExclusive-1次,因为是并行任务,所以其执行顺序是不能被保证的。而调度器会自动计算需要多少个线程来执行并行迭代最合适
//最终该方法会返回一个ParallelLoopState表示迭代执行的状态,ParallelLoopState的IsCompleted获取并行迭代是否全部执行完成,如果因为内部使用了条件来中断或停止迭代,则此属性会返回false,LowestBreakIteration表示最低迭代次数(只有在有中断或停止的条件下,此属性才会返回值)
//例子:
//该方法会调用委托10次,计数器i在并行前已经恰当的分配了值,所以每个任务为数组的某个元素赋值时不会发生同时访问同一个元素且赋值两次的情况。
string[] nameArray = new string[];
Parallel.For(, , (i) =>
{
nameArray[i] = $"卫星{i}号";
Console.WriteLine($"值:{nameArray[i]},线程ID:{Thread.CurrentThread.ManagedThreadId},任务ID:{Task.CurrentId}");
});
//break的例子:
Parallel.For(, , (i, state) =>
{
nameArray[i] = $"卫星{i}号";
if (i > )
{
state.Break();
return;//必须加return清理资源
}
Console.WriteLine($"值:{nameArray[i]},线程ID:{Thread.CurrentThread.ManagedThreadId},任务ID:{Task.CurrentId}");
});
//或:
Parallel.For(, , (i) =>
{
nameArray[i] = $"卫星{i}号";
if (i < )
{
Console.WriteLine($"值:{nameArray[i]},线程ID:{Thread.CurrentThread.ManagedThreadId},任务ID:{Task.CurrentId}");
}
});
//例子:使用ParallelOptions控制最大并行任务的数量
ParallelOptions parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = };
Parallel.For(, , parallelOptions, (i) => { /*……*/ }); ForEach<T>(IEnumerable < T > foreachSource, Action < T, ParallelLoopState > lambda)
//并行迭代集合 Invoke(params Action[] lambdaList)
//并行多个无返回值的任务
//例子:
Parallel.Invoke(() => Console.WriteLine("TaskA"), () => Console.WriteLine("TaskB")); }}}
取消并行迭代
与异步任务一样,用CancellationTokenSource来取消任务,但需要一个额外操作,就是将CancellationToken对象在ParallelOptions 中初始化,然后在并行迭代时提供这个ParallelOptions即可,不需要手动判断CancellationToken的IsCancellationRequested。
ParallelOptions parallelOptions = new ParallelOptions { CancellationToken = tokenSource.Token };
Parallel.For(, , parallelOptions,(i) => { Console.WriteLine(i)}); //ParallelOptions会自动轮询检测CancellationToken的IsCancellationRequested
并行Linq(PLinq Parallel Linq)
只需在集合上调用AsParallel方法就可以开启集合的并行查询。
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
MyEntity.DrugsErpEntities dbcontext = new MyEntity.DrugsErpEntities();
var list=dbcontext.UserManagers.Where(user => user.LoginName.Contains("a")).ToList();
stopwatch.Stop();
Console.WriteLine($"Linq查询耗时:{stopwatch.ElapsedMilliseconds}");
stopwatch.Restart();
var plist= dbcontext.UserManagers.AsParallel().Where(user => user.LoginName.Contains("a")).ToList();
stopwatch.Stop();
Console.WriteLine($"PLinq并行查询耗时:{stopwatch.ElapsedMilliseconds}");
//Linq并行方法
AsParallel()
//为集合启用并行化查询,该方法返回一个ParallelQuery<T>,ParallelQuery<T> 从IEnumerable<T>派生,而ParallelEnumerable对ParallelQuery<T> 扩展出了Linq查询方法
//不要盲目的使用PLNQ,PLINQ会对输入元素进行分区,增加了算法的复杂性,会有一定性能的消耗,通俗的讲就是非并行的Linq方法消耗的时间很短,要小于多线程之间频繁切换的时间。
//例子:
var pResultList = list.AsParallel().Where(s => s.Length == ).ToList(); WithExecutionMode(ParallelExecutionMode parallelExecutionMode)
//并行查询的执行模式,在默认情况下,执行PLINQ查询的时候,.NET机制会尽量避免高开销的并行化算法,这些算法有可能会将执行的性能降低到比串行执行的性能还要低。
//所以对一个可能会高消耗的集合调用AsParallel时可能.NET机制并不会开启并行执行而是串行,通过设置ParallelExecutionMode.ForceParallelism可强制并行查询
//例子
var pResultList = list.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).Where(s => s.Length == ).ToList();
取消并行Linq任务
static List<string> Test(CancellationToken token)
{
List<string> list = new List<string>
{
"8","2","3","4","5", "8","2","3","4","5", "8","2","3","4","5", "8","2","3","4","5"
};
return list.AsParallel().WithCancellation(token).Where(s => s.Length == ).ToList(); //WithCancellation方法会自动轮询检测CancellationToken的IsCancellationRequested
}
static void Main(string[] args)
{
CancellationTokenSource tokenSource = new CancellationTokenSource();
Task task = Task.Run(()=> Test(tokenSource.Token),tokenSource.Token); Console.ReadLine();
tokenSource.Cancel();
Console.WriteLine("……");
try { task.Wait(); }
catch (AggregateException ex) { }
finally { Console.WriteLine("The End"); }
}
线程安全(多线程并发同步)
假设多个线程并行访问一个全局资源,它们同时修改该资源的数据,那么就会发生竞争甚至出现混乱,这种混乱就可以称为非安全的线程任务。考虑下面的代码:
static int size = ;
static void Main(string[] args)
{
Task task = Task.Run(()=>
{
for (var c = ; c < int.MaxValue; c++)
{
size--;
}
});
for (var c = ; c < int.MaxValue; c++)
{
size++;
} task.Wait();
Console.WriteLine($"size:{size}");
//两个任务并行执行,它们都在争夺size字段,尝试修改它的值。我们无法确知最终size的值是多少,也即值可能不符合我们的预期。
}
一个64位的操作系统能原子性地读写一个64位的变量,但如果有一个decimal(128位)类型的变量被操作系统读写,那么该变量就不是原子性的。假设该变量同时被两个线程访问,当A线程正在为该资源赋值,但是刚在栈上填充了64位的时候突然切换到了B线程,那么A线程的操作就是不完整的,它在填充二进制数据的时候被打断,致使以后的读取出现问题。所以多线程对同一个全局变量的操作必须确保是同步的、队列的。全局变量就是一个唯一的资源,多线程并行访问它就会出现竟态导致逻辑混乱,出现非预期的结果。而局部变量则可以无视多线程的竟态,因为多线程中的局部变量都有自己的一个版本,各自操作自己的变量是互不影响的。假设两个线程发起对方法Test的调用,两个线程都想修改Test所在的类的某个字段A,A字段就是一个全局资源,而Test方法的参数和其内部的变量都属于局部,对这些变量进行修改不存在线程间的竞争,而这些线程要修改A字段,则会发生竞争。
注意一个事件、委托可能是全局资源,多线程操作它们时可能会发生下面的情况:
class Program
{
static Action LambdaEventHandler = () => { Console.WriteLine("hello world"); };
static string RunLambda()
{
if (LambdaEventHandler != null)
{
Thread.Sleep(); //模拟沉睡,这段时间内主线程会在500毫秒后率先被唤醒,然后将LambdaEventHandler设为null
LambdaEventHandler(); //调用失败,抛出异常
}
return "OK";
}
static void Main(string[] args)
{
Task<string> task = Task.Run(() => { return RunLambda(); });
Thread.Sleep();
LambdaEventHandler = null;
string result=task.Result;//阻塞线程
Console.WriteLine("end");
}
}
解决办法是将全局资源拷贝一个副本
static string RunLambda()
{
Action resultLambdaEventHandler = LambdaEventHandler;//将全局资源先赋给局部变量,建立了副本
if (LambdaEventHandler != null)
{
Thread.Sleep(); //模拟沉睡,这段时间内主线程会在500毫秒后率先被唤醒,然后将LambdaEventHandler设为null
resultLambdaEventHandler(); //调用成功
}
return "OK";
}
使用Monitor类使并发线程同步
这是一个监控线程的类,它提供方法锁定某个维持不变的量,同时它会监视即将获取全局资源的线程,每次只会有一个线程能访问到资源,其它线程将被阻塞。具体做法是先将Monitor的两个方法用try、finally包装起来。而对全局资源的访问应放在Monitor的两个方法之间。Monitor的Enter方法的第一个参数必须是一个静态只读的对象,这个对象就是锁的目标,该目标必须是维持不变的一个量。静态可以保证该对象在内存中只有一个,具有唯一性。而只读是为了保证该对象不会被多线程任务任意修改。锁住这个目标,其状态不发生改变,才能使上锁和解锁成为可能。上锁之后线程会阻塞,确保了一次只有一个线程能进入Enter方法之后的代码。第二个参数需要一个描述是否释放锁的token(一个bool值),这个token的作用就是在线程完成对资源的访问后或者线程访问资源时出现异常后能全身而退,能使Monitor正确释放锁,否则就会造成死锁。因为如果线程在操作资源时出现了异常,那么Monitor不知道如何处理,所以将Monitor的Exit方法放到finally块中,这样,无论线程操作资源时是否发生异常,finally都会执行,此时只需要测试token是否为true就能确知Monior是否已经为参数1的对象加了锁,如果已经加锁则释放锁即可。可以使用Monitor或lock,这两种方式实际上差不多,lock的底层实现运用的就是Monitor。
锁的目标
多个线程要访问某个全局资源,那么它们要请求的锁的目标必须是同一个对象,也即多个线程请求锁定的这个对象在内存中的地址必须是一样的,否则,锁是无效的。
锁的目标不能是值类型、this,typeof(Animal),string,锁这些对象锁不上。this不是原子性的,它可能有多个实例,如果另一个线程Monitor.Enter了另一个this实例,则两个this指向的内存地址是不同的。它们对某个资源的访问就会发生竞争。Monitor.Enter(typeof(Animal))同样如此,该方法返回一个Type实例,两个线程都Monitor.Enter(typeof(Animal)),那么两个线程锁定的也不是同一个对象,Monitor.Enter("AA")不被锁定机制支持,所以锁定字符也不行。Monitor.Enter(100)也不可以,因为Monitor.Enter会装箱值类型使其变成对象类型,当另一个线程Monitor.Enter该值类型时,又会发生装箱,两次装箱得到的结果肯定不是同一个对象。如果使用lock会编译错误,如果使用Monitor则运行时错误。锁定的对象按照官方的推荐,应是private static readonly的对象。这是因为private可保证该对象不会被外部调用修改,造成多个线程对该对象申请锁时失效,比如线程a锁定了一个非private的对象,线程b修改了该对象,重新new了一个新对象赋值给它,而此时b线程如果要锁定该对象,那么a线程锁定的对象和b线程锁定的对象在内存中就不是同一个对象,锁就无效,无法实现同步。static可以保证该对象在内存中只有唯一的一个实例,readonly可保证对象不被修改,三个修饰符恰好可以确保该对象不会被任意修改、保证了它的唯一性。
class Program
{
private static readonly object obj = new object();
static int size = ;
static void Main(string[] args)
{ Task task = Task.Run(() =>
{
for (var c = ; c < int.MaxValue; c++)
{
bool tokenForLock = false; //是否加锁的状态标
try
{
Monitor.Enter(obj, ref tokenForLock); //为obj加锁后,代码至Exit方法之前的逻辑,其它线程无法访问。调用Enter方法后,状态标会自动设为true并传出
size--;
}
finally
{
if (tokenForLock)
{
Monitor.Exit(obj); /*释放排它锁*/
}
}
}
}); for (var c = ; c < int.MaxValue; c++)
{
bool tokenForLock = false; //是否加锁的状态标
try
{
Monitor.Enter(obj, ref tokenForLock);
size++;
}
finally
{
if (tokenForLock)
{
Monitor.Exit(obj); /*释放排它锁*/
}
}
}
task.Wait();
Console.WriteLine($"size:{size}"); //值为0
}
}
使用lock关键字使并发线程同步
lock关键字简化了Monitor的代码量,它表示请求为某个对象加锁,它在内存中查找该对象,查看该对象是否已经上锁,如果没有,则为其加锁,否则一直阻塞当前线程直到拥有锁的线程释放锁,再为其加锁。使用lock不需要token,也不需要try、finally,以下代码可以替换使用Monitor的代码
class Program
{
static readonly object obj = new object();
static int size = ;
static void Main(string[] args)
{ Task task = Task.Run(() =>
{
for (var c = ; c < int.MaxValue; c++)
{
lock (obj) //为obj加锁,lock代码块的逻辑其它线程无法访问,lock表达的意思是:在内存中查找obj,查看其是否已经上锁,如果已经上锁则阻塞当前线程,等待obj释放锁,如果obj还没有被加锁或它的锁已经被释放,则立即为其加锁。
{
size--;
}
}
}); for (var c = ; c < int.MaxValue; c++)
{
lock(obj)
{
size++;
}
}
task.Wait();
Console.WriteLine($"size:{size}"); //值为0
}
}
使用Interlocked类使并发线程同步
可以使用Interlocked对象使线程同步。Interlocked对象不会为线程上锁,它是通过为多个线程共享的资源数据提供独占的原子性操作从而使并发线程同步执行成为可能。(为多个线程共享的变量提供原子操作),由于Interlocked是使用的硬件原语,其效率自然也比加锁高得多,但它仅仅用于修改一些可能发生多线程竞争的全局资源的简单操作。
//方法
CompareExchange(ref object[int | long | IntPtr] location, object[int | long | IntPtr] newValue, object[int | long | IntPtr] globalData)
//以原子操作的形式修改全局资源location的值,比如一个受多个线程竞争的资源,每个线程都要修改它的值,利用循环和此方法可以使线程同步修改其值,不出现混乱
//location是一个会遭到多线程竞争的全局资源,globalData是首个线程入栈时location的值
//如果location==globalData,则将location修改为newValue,这证明没有其他线程竞争
//如果location!=globalData,则不做任何操作,这证明已经有其它线程在竞争
//返回location被CompareExchange方法修改前的值,如果它没有被方法操作(修改),则返回当前的值
//CompareExchange就是为了阻止多线程同时并行,使他们有序的、一前一后的执行 Increment(ref int[long] location)
//以原子操作的形式递增location参数 Decrement(ref int[long] location)
//以原子操作的形式递减location参数
//例子:同步多线程执行循环递增
static int size = ; //全局资源
static void Test()
{
for (var c = ; c < ; c++)
{
Interlocked.Increment(ref size);
}
}
static void Main(string[] args)
{
Task task = Task.Run(() =>
{
Test();
});
Test();
task.Wait();
Console.WriteLine($"size:{size}"); //两个线程会同步执行,最终结果值为20
} Add(ref int[long] location, int[long] value)
//以原子操作的形式设置location=value+location,并返回location
//例子:
class Program
{
static int size = ;
static void Main(string[] args)
{ Task task = Task.Run(() =>
{
Interlocked.Add(ref size, );
}); Interlocked.Add(ref size, );
task.Wait();
Console.WriteLine($"size:{size}"); //值为3
}
}
下面的例子描述了线程竞争导致的结果不符合预期的情况:
class ThreadSafe
{
public int Age = ;
public string Test(int age)
{
Age = age;
Thread.Sleep(); //多线程竞争资源,导致结果不符合预期,但处理速度太快有可能看不到效果,所以此处沉睡一下,
return $"年龄:{Age}";
}
}
class Program
{
static void Main(string[] args)
{
ThreadSafe s1 = new ThreadSafe();
Task<string> task = Task.Run(() => s1.Test());
string result2 = s1.Test();
//以下两个打印结果只可能同时是18或同时是32,假设先执行了主线程,然后主线程睡眠5秒,同时task线程修改了Age的值(18)再沉睡,然后又切换到主线程,所以主线程和task都返回18,
Console.WriteLine(task.Result);
Console.WriteLine(result2);
}
}
使用Interlocked的CompareExchange可以解决以上问题:
class ThreadSafe
{
public int Age = ; //要处理的全局资源,为了防止多个线程修改Age,必须使Age的赋值是原子性的,利用循环和Interlocked.CompareExchange方法为Age赋值就可以保证线程的同步
public string Test(int age)
{
do
{
Age = ;
}
while (Interlocked.CompareExchange(ref Age, age, )!=);
return $"年龄:{Age}";
}
//线程A入栈(假设A传递的age参数的值是32),do块中将Age重置为0,CompareExchange方法测试Age是否是最开始的初始值0,如果是则将age参数(局部变量)赋值给Age,Age=32
//假设在return之前,线程调度器突然切换到了B线程(假设B线程传递的age参数的值是18),那么因为Age是全局资源,所以它的值是被A线程已经修改过的值,值为32,如果此时直接return Age则不符合预期
//正是因为使用了CompareExchange方法为Age赋值,再通过do循环来重置Age的初始值,所以当B线程执行While判断时,Age为32,Age与初始值0不相等,则CompareExchange方法不会将局部变量age(此时是18)赋给Age
//这样,CompareExchange方法返回已经被A线程修改过的Age的值32,32!=0,这满足了循环的条件,所以开始执行do块,do块重置了Age的值,再通过while测试,Age与0相等,所以将局部变量age的值18赋给了Age,最终return Age
}
class Program
{
static void Main(string[] args)
{
ThreadSafe s1 = new ThreadSafe();
Task<string> task = Task.Run(() => s1.Test());
string result2 = s1.Test(); Console.WriteLine(result2 );
Console.WriteLine(task.Result);
}
}
使用Mutex类使并发线程同步
几乎与Monitor、lock的机制完全一样,但它是内核级别的,可以用于进程间的同步。可以使用它来阻止用户重复多次开启一个实例。也即一次只运行一个实例的进程。此类从WaitHandle派生,WaitHandle提供了基本的线程同步方法。
WaitOne(int millisecond, [bool exit])
//阻止当前线程,直到WaitHandle实例收到信号,也即直到这个线程成功得到互斥锁。相当于检测每个入栈的线程,阻塞它们,直到其中一个分配到一把锁,如果前面的线程没有释放锁,则此方法永不返回,永久阻塞
//millisecond:如果带时间,则在规定的时间内阻止当前线程直到前面的线程释放锁,如果前面的线程没有释放锁,则返回false,否则返回true
//exit:设为true时,如果在规定的时间内不能得到互斥锁,则强制线程并行(直接进入独占区与其他线程竞争资源),否则不
//例子:
private static Mutex mutex = new Mutex();
static void Run(object taskName)
{
string name = taskName.ToString();
Console.WriteLine($"{name} 正在请求互斥锁……\n");
mutex.WaitOne(); //阻塞当前线程,为其申请互斥锁,等待前面的线程释放锁的信号,如果锁被释放则允许当前线程往下执行,否则永久等待
Console.WriteLine($"{name} 申请互斥成功,已经进入了独占区……\n");
Thread.Sleep();
Console.WriteLine($"{name} 正在离开保护区……\n");
mutex.ReleaseMutex();//释放互斥锁
Console.WriteLine($"{name} 已经释放了互斥锁……\n");
}
static void Main(string[] args)
{
Task task1 = Task.Factory.StartNew((s) => { Run(s); }, "1号");
Task task2 = Task.Factory.StartNew((s) => { Run(s); }, "2号");
Console.ReadLine();
}
//终止
if (!mutex.WaitOne()) //如果100毫秒内拿不到互斥权限则终止当前线程
{
Thread.CurrentThread.Abort();
} ReleaseMutex()
//释放当前线程的锁一次,也即如果是循环锁,则需要循环调用此方法解锁
死锁(dead lock)
死锁的三种可能
1.A对象被申请上锁后,接下来执行的独占代码出现异常,致使锁无法解开,锁死。上面介绍的多线程同步的技术已经解决了这个问题,它们都是线程安全的技术。但纵然Monitor和lock可以确保线程正确执行完成或发生异常时通过finally释放锁,但它们无法确保在独占代码里发生线程持续占用资源不退出甚至死循环的情况,这样,finally不会执行,死锁就会发生。
2.在函数递归时,如果函数为某对象加了锁,递归会继续请求该锁,这很有可能造成死锁,因为前面一层已经锁上了对象,递归执行时会继续请求该锁,这样,前面的锁未释放,后面的锁是前面代码的执行逻辑,后面请求锁时,会卡死,不能继续执行,造成死锁。这种情况称为递归死锁,而Monitor或lock或Mutex的机制支持递归锁,也即在同一个线程中这不会发生死锁,代码逻辑不受任何影响,Monitor和lock只限制不同线程之间的互斥,同一个线程之内,都不受限制。
3.某个线程锁上了A对象,接下来执行的独占代码又请求为B对象上锁,另一个线程锁上了B对象后,接下来执行的独占代码又请求为A对象上锁,这样两个线程的独占代码都在等待对方释放它们正在申请的加锁对象,造成两个线程的独占代码无法继续往下执行,不能执行则无法释放第一层的锁,从而进入死循环,锁死。
并发同步集合(System.Collections.Concurrent)
以下集合支持并发时同步,使用时不必担心并发时线程的竞争。
BlockingCollection<T>
//线程同步的列表集合
ConcurrentBag<T>
//线程同步的无序列表集合
ConcurrentDictionary<TKey, TValue>
//线程同步的字典集合
ConcurrentQueue<T>
//线程同步的先进先出的单项队列集合类
ConcurrentStack<T>
//线程同步的后进先出的栈集合
线程本地存储(System.Collections.Concurrent)
如果你希望多个线程访问某个全局资源时,该资源可以变成每个独立线程的局部变量,那么可以使用隔离机制, 线程本地存储ThreadLocal类可以将全局资源转化为线程的专属变量从而不必担心多线程的竞争。但需要注意,这个机制根同步没有什么关系,因为全局资源一旦转为局部变量,那么多线程对该资源的访问就不存在竞争,也没有必要使用加锁去同步该资源,因为它仅仅是一个局部变量,仅仅与当前线程有关系。
//不使用ThreadLocal:
public static string Name { get; set; }
static void Main(string[] args)
{
Task task = Task.Run(() =>
{
Name = "sam";
Console.WriteLine($"name:{Name}");
}); Name = "korn";
task.Wait();
Console.WriteLine($"name:{Name}");
//如果没有ThreadLocal的隔离机制,那么主线程和异步任务并发执行,主线程设置了Name=korn,然后主线程会阻塞,异步任务打印sam后,主线程接着也会打印sam。
}
//使用ThreadLocal
static ThreadLocal<string> threadLocal = new ThreadLocal<string>(() => ""); //创建一个本地存储的全局资源,该资源默认值是空字符
public static string Name { get { return threadLocal.Value; } set { threadLocal.Value = value; } } //全局资源通过本地存储来存取
static void Main(string[] args)
{
Task task = Task.Run(() =>
{
Name = "sam";
Console.WriteLine($"name:{Name}"); //sam
}); Name = "korn";
task.Wait();
Console.WriteLine($"name:{Name}"); //korn
或使用ThreadStaticAttribute特性替代ThreadLocal,ThreadStaticAttribute特性只能应用在字段上。另外,虽然ThreadStaticAttribute和ThreadLocal的实现机制是一样的,但ThreadStaticAttribute标注的字段如果在声明时就赋了值,比如初始化为120,那么120这个值只有主线程会得到,其它线程不会得到这个初始值,其它线程总是为其初始化为该字段的类型所对应的默认值。
[ThreadStatic]
private static string name;
public static string Name { get { return name; } set { name = value; } }
多线程的异常捕获
不要尝试用try包含异步任务的Start方法,因为控制会立即从异步任务返回到主线程。所以在异步任务的异常还未发生时,控制已经立即返回主线程,也即控制立即会离开try块,此时,异常的捕获就是失败的。任何线程上的异常都必须手动捕获,否则Clr会终止程序。
AggregateException 异常集合
可以使用异常包(异常集合)去捕获所有线程任务的异常信息,再调用异常信息的Handle方法,通过一个委托来迭代异常信息。
static void Main(string[] args)
{
Task taskA = Task.Run(()=> { throw new InvalidOperationException(); });
Task taskB = Task.Run(() => { throw new DllNotFoundException(); });
try
{
Task.WaitAll(taskA, taskB);
}
catch (AggregateException ex)
{
ex.Handle(eachException => { Console.WriteLine(eachException.Message); return true; }); //如果返回false则会重新引发异常
}
}
Task.Exception异常集合
在确定任务必然出现异常的情况下,可以使用Task的Exception.Handle方法,Task的Exception也是一个异常集合,Handle接收一个委托,可以输出异常信息。
Task task = Task.Factory.StartNew(() => { Run("ss"); });
Task taskB = task.ContinueWith(a => { Console.WriteLine("hello"); },TaskContinuationOptions.OnlyOnFaulted); try
{
//说明taskB已经正确执行,则task必然有异常
taskB.Wait();
//输出task的异常
task.Exception.Handle(eachException => { Console.WriteLine(eachException.Message); return true; });
}
catch (Exception ex)
{
//说明taskB有异常,而task没有异常
}
AppDomain.CurrentDomain.UnhandledException异常事件
class Program
{
static Stopwatch clock = new Stopwatch();
static void Message(string text)
{
Console.WriteLine($"当前线程ID:{Thread.CurrentThread.ManagedThreadId},{clock.ElapsedMilliseconds}:0000:{text}");
}
static void Delay(int Milliseconds)
{
Message($"睡眠{Milliseconds}毫秒");
Thread.Sleep(Milliseconds);
Message("唤醒");
}
static void Main(string[] args)
{
try
{
clock.Start(); //计时
AppDomain.CurrentDomain.UnhandledException += (objSender, eventArgs) => { Message("事件被触发……"); Delay(); }; //注册异常事件
Thread thread = new Thread(() => { Message("即将抛出异常……"); throw new Exception(); }); //触发异常事件
thread.Start();
Delay();
//主线程ID是1,它将先执行Delay(2000);接着打印睡眠2000毫秒,再使主线程沉睡2秒,在主线程苏醒后,接下来thread.Start()将执行
//该线程ID是3,它主动将一个异常抛出,在抛出异常之前会进入Message方法打印相关的信息,打印完成后异常在被手动抛出之前会先触发UnhandledException事件
//UnhandledException事件调用Message方法打印相关的信息,打印完成后调用Delay方法使thread沉睡4秒后苏醒,再次打印相关信息后因为没有catch,所以最终异常会直接抛出
}
finally
{ }
}
}