.Net多线程编程—System.Threading.Tasks.Parallel
System.Threading.Tasks.Parallel类提供了Parallel.Invoke,Parallel.For,Parallel.ForEach这三个静态方法。
1 Parallel.Invoke
尽可能并行执行所提供的每个操作,除非用户取消了操作。
方法:
1)public static void Invoke(params Action[] actions);
2)public static void Invoke(ParallelOptions parallelOptions,
params Action[] actions);
参数:
parallelOptions:一个对象,用于配置此操作的行为。
Actions:要执行的操作数组
异常:
对方法1:
System.ArgumentNullException: actions 参数为 null。
System.AggregateException:当 actions 数组中的任何操作引发异常时引发的异常。
System.ArgumentException:actions数组包含 null 个元素。
对方法2除上述异常外还包括:
System.OperationCanceledException:parallelOptions 设置了System.Threading.CancellationToken。
System.ObjectDisposedException:在 parallelOptions 中与 System.Threading.CancellationToken 关联的System.Threading.CancellationTokenSource已被释放。
说明:
1)Invoke方法只有在actions全部执行完才会返回,即使在执行过程中出现异常也会完成。
2)不能保证actions中的所有操作同时执行。比如actions大小为4,但硬件线程数为2,那么同时运行的操作数最多为2。
3)actions中的操作并行的运行且与顺序无关,若编写与运行顺序有关的并发代码,应选择其他方法。
4)如果使用Invoke加载多个操作,多个操作运行时间迥异,总的运行时间以消耗时间最长操作为基准,这会导致很多逻辑内核长时间处于空闲状态。
5)受限的并行可扩展性,这源于Invoke所调用的委托数目是固定的。
2 Parallel.For
可能会并行运行迭代,可以监视和操作循环的状态。Parallel.For有多个重载的方法,下面列举部分方法。
方法:
1)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body);
2)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body);
3)public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body);
4)public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);
参数:
fromInclusive:开始索引(含)。
toExclusive:结束索引(不含)。
body:将被每个迭代调用一次的委托。
parallelOptions:一个对象,用于配置此操作的行为。
localInit:一个委托,用于返回每个任务的本地数据的初始状态。
localFinally:一个委托,用于对每个任务的本地状态执行一个最终操作。
返回结果:
ParallelLoopResult :包含有关已完成的循环部分的信息。
异常:
System.ArgumentNullException:body 参数为 null,或 localInit 参数为 null,或 localFinally 参数为 null,或 parallelOptions 参数为 null。 System.AggregateException:包含在所有线程上引发的全部单个异常的异常。
对于方法3)和4)除包含以上异常外还包括:
System.OperationCanceledException:在 parallelOptions 设置了参数 System.Threading.CancellationToken。
System.ObjectDisposedException:在 parallelOptions 中与 System.Threading.CancellationToken 关联的 System.Threading.CancellationTokenSource已被释放。
说明:
1)不支持浮点和步进。
2)无法保证迭代的执行顺序。
3)如果fromInclusive大于或等于toExclusive,方法立即返回而不会执行任何迭代。
4)对于body参数中含有的ParallelLoopState实例,其作用为提早中断并行循环。
5)只有在迭代全部完成以后才会返回结果,否则循环将一直阻塞。
3 Parallel.ForEach
方法
1)public static ParallelLoopResult ForEach(IEnumerable<TSource> source, Action<TSource> body);
2)public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body);
3)public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body);
参数:
source:数据源
body:将被每个迭代调用一次的委托。
parallelOptions:一个对象,用于配置此操作的行为。
返回结果:
ParallelLoopResult :包含有关已完成的循环部分的信息。
异常:
System.ArgumentNullException:source 参数为 null。-或- 方body 参数为 null。
System.AggregateException:包含了所有线程上引发的全部单个异常。
对于方法2)还包括:
System.OperationCanceledException:在 parallelOptions 设置了参数 System.Threading.CancellationToken。
System.ObjectDisposedException:在 parallelOptions 中与 System.Threading.CancellationToken 关联的 System.Threading.CancellationTokenSource已被释放。
对于3)包括的异常为:
System.ArgumentNullException:source 参数为 null。-或- 方body 参数为 null。
System.InvalidOperationException:source 分区程序中的 System.Collections.Concurrent.Partitioner<TSource>.SupportsDynamicPartitions 属性返回 false。或 在 source 分区程序中的任何方法返回 null 时引发异常。或在source 分区程序中的 System.Collections.Concurrent.Partitioner<TSource>.GetPartitions(System.Int32)方法不返回正确数目的分区。
说明:
1)对于body参数中含有的ParallelLoopState实例,其作用为提早中断并行循环。
2)Parallel.ForEach方法不保证执行顺序,它不像foreach循环那样总是顺序执行。
3)对于方法3)中的source,它的类型是Partitioner<TSource>。可以使用Partitioner.Create方法创建分区,该方法的几个重整方法为:
l public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive);
l public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize);
fromInclusive为范围下限(含),toExclusive为范围下限(不含),rangeSize为每个子范围的大小。
使用Partitioner创建的子范围大小默认大约是计算机内核的三倍,而当使用rangeSize指定范围大小时,那么子范围大小为指定值。
4)只有在迭代全部完成以后才会返回结果,否则循环将一直阻塞。
4 ParallelOptions
定义:
存储选项,用于配置 System.Threading.Tasks.Parallel 类的方法。
ParallelOptions属性:
1)public CancellationToken CancellationToken { get; set; }
获取或设置传播有关应取消操作的通知。
2)public int MaxDegreeOfParallelism { get; set; }
获取或设置此 ParallelOptions 实例所允许的最大并行度。
3)public TaskScheduler TaskScheduler { get; set; }
获取或设置与此 System.Threading.Tasks.ParallelOptions 实例关联的 System.Threading.Tasks.TaskScheduler
说明:
1)通过设置CancellationToken来取消并行循环,当前正在运行的迭代会执行完,然后抛出System.OperationCanceledException类型的异常。
2)TPL的方法总是会试图利用所有可用内核以达到最好的效果,但是很可能.NET Framework内部使用的启发式算法所得到的注入和使用的线程数比实际需要的多(通常都会高于硬件线程数,这样会更好地支持CPU和I/O混合型的工作负载)。
通常将最大并行度设置为小于等于逻辑内核数。如果设置为等于逻辑内核数,那么要确保不会影响其他程序的执行。设置为小于逻辑内核数是为了有空闲内核来处理其他紧急的任务。
用途:
1)从循环外部取消并行循环
2)指定并行度
3)指定自定义任务调度程序
5 ParallelLoopState
定义:
可使并行循环迭代与其他迭代交互。 此类的实例由 Parallel 类提供给每个循环;不能在用户代码中创建实例。
方法:
1)Break()方法:通知并行循环在执行完当前迭代之后尽快停止执行,可确保低索引步骤完成。且可确保正在执行的迭代继续运行直到完成。
2)Stop()方法:通知并行循环尽快停止执行。对于尚未运行的迭代不能会尝试执行低索引迭代。不保证所有已运行的迭代都执行完。
用途:提早退出并行循环。
说明:
1)不能同时在同一个并行循环中同时使用Break和Stop。
2)Stop比Break更常用。break语句用在并行循环中的效果和用在串行循环中不同。Break用在并行循环中,委托的主体方法在每次迭代的时候被调用,退出委托的主体方法对并行循环的执行没有影响。Stop停止循环比Break快。
6 ParallelLoopResult结构
定义:
并行循环运行结果的信息。
属性:
1)public bool IsCompleted { get; }
如果该循环已运行完成(该循环的所有迭代均已执行,并且该循环没有收到提前结束的请求),则为 true;否则为 false。
2)public long? LowestBreakIteration { get; }
返回一个表示从中调用 Break 语句的最低迭代的整数
用途:判断当并行循环结束时,是否因调用了break方法或stop方法而提前退出并行循环,或所有迭代均已执行。
判断依据:
条件 |
|
IsCompleted |
运行完成 |
!IsCompleted && LowestBreakIteration==null |
使用了Stop语句而提前终止 |
!IsCompleted && LowestBreakIteration!=null |
使用了Break语句而提前终止 |
7 捕获并行循环中的异常
原则:
1)异常优先于从循环外部取消和使用Break()方法或Stop()方法提前退出并行循环。
2)并行循环体抛出一个未处理的异常,并行循环就不能再开始新的迭代。
3)默认情况下当某次迭代抛出一个未处理异常,那么正在执行的迭代如果没抛出异常,正在执行的迭代会执行完。当所有迭代都执行完(有可能其他的迭代在执行的过程中也抛出异常),并行循环将在调用它的线程中抛出异常。
并行循环运行的过程中,可能有多个迭代抛出异常,所以一般使用AggregateException来捕获异常。AggregateException继承自Exception。为了防止仅使用AggregateException未能捕获某些异常,使用AggregateException的同时还要使用Exception。
8 使用模式
8.1 Parallel.Invoke
1 public static void DemonstrateInvoke()
2 {
3 //使用Lambda
4 Parallel.Invoke(
5 () =>
6 {
7 //具体操作1
8 },
9 () =>
10 {
11 //具体操作2
12 });
13
14 //不使用lambda
15 Parallel.Invoke(Operation1, Operation2);
16 }
17
18 private static void Operation1()
19 {
20 //具体操作1
21 }
22
23 private static void Operation2()
24 {
25 //具体操作2
26 }
8.2 Parallel.For
1 串行循环:
2 int toExclusive = ...;
3 for(int i =0;i<=toExclusive;i++){};
4
5 对应的并行循环:
6 Parallel.For(0, toExclusive+1, (i) =>
7 {
8 //具体操作
9 });
8.3 Parallel.ForEach
1 一般用法
2 IEnumerable<string> coll = ...;
3 Parallel.ForEach(coll,(str)=>
4 {
5 //具体操作
6 });
7
8 基于分区的模式
9 优化分区数,使其最接近系统逻辑内核数:
10 子分区范围 = 对“待处理集合大小/系统逻辑内核数”取整+1。
11 int logicalCores =...;
12 IEnumerable<string> collP = ...;
13 int fromInclusive = ...;
14 int toExclusiv = ...;
15 int rangeSize = (int)((toExclusiv-fromInclusive )/logicalCores) +1;
16 Parallel.ForEach(Partitioner.Create(fromInclusive, toExclusiv, rangeSize), range =>
17 {
18 for (int i = range.Item1; i < range.Item2; i++)
19 {
20 //使用集合:collection[i]
21 }
22 });
8.4 从循环外部取消并行循环
注意:不使用IsCancellationRequested或ThrowIfCancellationRequested()的情况下无法捕获类型为AggregateException的异常。
1)对于Parallel.For
使用IsCancellationRequested属性
1 public static void CancelFromExternal()
2 {
3 CancellationTokenSource cts = new CancellationTokenSource();
4 //其他操作...
5
6 //异步执行Operation方法
7 Task.Factory.StartNew(()=>{Operation(cts);});
8 //异步执行condition的计算过程
9 Task.Factory.StartNew(()=>{
10 bool condition = ...;
11 if (condition) cts.Cancel();
12 }
13
14 //其他操作...
15 }
16
17 private static void Operation(CancellationTokenSource cts)
18 {
19 CancellationToken ct = cts.Token;
20 ParallelOptions op = new ParallelOptions { CancellationToken = ct };
21 int toExclusive = ...;
22 Parallel.For(0, toExclusive, op, (i) =>
23 {
24
25 //其他操作...
26
27 //return只对当前子线程有效
28 if (ct.IsCancellationRequested)
29 { return; }
30
31 //其他操作...
32 });
33 }
使用ThrowIfCancellationRequested()方法抛出异常
将上面的并行循环部分替换为下面的代码:
1 Parallel.For(0, toExclusive, op, (i) =>
2 {
3
4 //其他操作...
5
6 ct.ThrowIfCancellationRequested();
7
8 //其他操作...
9 });
不使用IsCancellationRequested和ThrowIfCancellationRequested()方法
将Operation方法中的涉及到IsCancellationRequested和ThrowIfCancellationRequested()方法的代码去掉
2)对于Parallel.ForEach
使用IsCancellationRequested属性
1 public static void CancelFromExternal()
2 {
3 //同1)中CancelFromExternal方法
4 }
5
6 private static void Operation(CancellationTokenSource cts)
7 {
8 CancellationToken ct = cts.Token;
9 ParallelOptions op = new ParallelOptions { CancellationToken = ct };
10 IEnumerable<string> coll = new List<string> { "str1", "str2" };
11 Parallel.ForEach(coll, op,(str, loopState) =>
12 {
13 //其他操作...
14
15 //return只对当前子线程有效
16 if (ct.IsCancellationRequested)
17 { return; }
18
19 //其他操作...
20 });
21 }
使用ThrowIfCancellationRequested()方法抛出异常
将Operation方法中的:
if (ct.IsCancellationRequested)
{ return; }
替换为:
ct.ThrowIfCancellationRequested();
不使用IsCancellationRequested和ThrowIfCancellationRequested()方法
将Operation方法中的涉及到IsCancellationRequested和ThrowIfCancellationRequested()方法的代码去掉
8.5 指定并行度
1 int maxDegreeOfParallelism = Environment.ProcessorCount - 1;
2 ParallelOptions op = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
3 IEnumerable<string> coll = new List<string> { };
4 Parallel.ForEach(coll, op ,(str) =>
5 {
6 //具体操作
7 });
8.6 提早退出并行循环
1)对于Parallel.For
1 int toExclusive = 10;
2 Parallel.For(0, toExclusive, (i, loopState) =>
3 {
4 //其他操作...
5 //计算condition
6 bool condition = ...;
7 if (condition)
8 {
9 loopState.Break();//或使用loopState.Stop
10 return;
11 }
12
13 //其他操作
14 });
2)对于Parallel.ForEach
1 IEnumerable<string> coll = new List<string> {"str1","str2" };
2 Parallel.ForEach(coll, (str, loopState) =>
3 {
4 //其他操作...
5
6 //计算condition
7 bool condition = ...;
8 if (condition)
9 {
10 loopState.Break();//或使用loopState.Stop
11 return;
12 }
13
14 //其他操作
15
16 });
9 异常处理模式
基本形式
在确保使用AggregateException 能够捕捉到所有的异常时,可以省去catch(Exception e)的部分。
1 try
2 {
3 //Do something
4 }
5 catch(AggregateException e)
6 {
7 Foreach(Exception ex in e.InnerExceptions)
8 {
9 //Do something
10 }
11 }
12 catch(Exception e)
13 {
14 //Do something
15 }
为上述并行循环使用模式添加异常处理机制
一种方式是把并行循环放入try块中,另一种方式是在每次迭代的过程中捕获异常。