C#的变迁史 - C# 4.0 之并行处理篇

  前面看完了Task对象,这里再看一下另一个息息相关的对象Parallel。

Parallel对象

  Parallel对象封装了能够利用多核并行执行的多线程操作,其内部使用Task来分装多线程的任务并试图将它们分配到不同的内核中并行执行。请注意“试图”这个词,Parallel对象相当具有智能性,当它判断任务集并没有从并行运行中受益,就会选择按顺序运行。这样的做法是因为并非所有的项目都适合使用并行开发,创建过多并行任务可能会损害程序的性能,降低运行效率。

  Parallel对象是静态类,它主要有3个静态方法:Invoke,For,ForEach。针对这3个方法,该对象也提供了多种不同的重载方法,使用起来相当的简单。先看一个简单的例子:

static void Main(string[] args)
{
Parallel.Invoke(
()=>Console.WriteLine("1st task!"),
()=>Console.WriteLine("2nd task!"));
}

  这个例子中的两个任务就是并行执行的,所以结果可能是第一个先完成,也可能是第二个先输出结果。是不是超级简单?有没有使用一下Parallel对象的冲动?

  下面这个网上的例子验证了一下运行时间上并行计算的优越性:

private const int count = ;
private static void M1()
{
Console.WriteLine("M1 is busy now");
for (int i = ; i < count; i++)
;
Console.WriteLine("M1 is Done");
}
private static void M2()
{
Console.WriteLine("M2 is busy now");
for (int i = ; i < count; i++)
;
Console.WriteLine("M2 is Done");
}
static void Main(string[] args)
{
// 顺序执行
DateTime start1 = DateTime.Now;
M1();
M2();
Console.WriteLine(DateTime.Now - start1); // 并行执行
DateTime start2 = DateTime.Now;
Parallel.Invoke(M1, M2);
Console.WriteLine(DateTime.Now - start2);
}

  在不同的机器上,得到的结果可能不同,但是基本上所有的多核机器上得到的结果一定是并行执行的时候耗时比较短,例子比较简单,但是道理确实很直接。

  通常来说,对于一个程序,性能提升的关键是将可以并行执行的同步程序改成并行执行。这个上面的例子也反应了修改后的效果。此外,对于程序来说,循环是影响复杂度的最直接的因素,这个我们看看教科书上计算算法时间复杂度的算法就知道了,所以提升循环的执行效率往往是提升程序效率的关键一步。Parallel对象充分考虑到了这一点,提供了循环的并行版本。

例子一:For循环。

static void Main(string[] args)
{
for (int i = ; i < ; i++) Console.Write("{0} ", i);
Console.WriteLine("by serial");
Parallel.For(, , (n) => Console.Write("{0} ", n));
Console.WriteLine("by parallel");
}

  从输出的结果你可以很容易发现后面的结果顺序完全是不固定的,这是并行的特征。

例子二:ForEach循环

static void Main(string[] args)
{
int [] a = {,,,,,,,,};
foreach (var n in a) Console.Write("{0} ",n);
Console.WriteLine("by serial");
Parallel.ForEach(a, (n) => Console.Write("{0} ", n));
Console.WriteLine("by parallel");
}

  结果也很明显,就不多说了。

  通过上面的两个例子,其实我们就能发现一些问题:

1. 顺序要求严格的操作不能使用Parallel对象的方法,这个原因很简单。

2. 并不是所有的for语句都可以用并行处理来实行,只有在循环开始前循环的次数已确定的情况下可以采用并行处理。同理,do语句和while语句也不能采用并行处理。因为所谓“并行”就是在判定为“循环结束”之前,首先要把将要执行的循环实现分配好。

  好了,既然是对循环的并行处理,那就避不开break与continue的问题,也就是循环的主动中止问题。

循环的主动中止

  在Parallel对象中,也可以主动中止循环的执行:调用ParallelLoopState实例的Stop方法和Break方法,可以停止和中断当前循环的执行。其中,

1. Break 告知 Parallel 循环应在系统方便的时候尽早停止执行当前迭代之外的迭代,当前迭代之前的迭代任然会完成。
2. Stop 告知 Parallel 循环应在系统方便的时候尽早停止执行,不管其他的线程执行到什么程度。
  通常使用Stop会立即停止循环,使用Break却会执行完毕当前迭代次序前面的迭代后停止循环。例如,对于从 0 到 1000 并行迭代的 for 循环,如果从第 100 此迭代开始调用 Break,则低于 100 的所有迭代仍会运行,从 101 到 1000 的迭代则不一定会执行,注意是“不一定”,因为是并行执行的,说不定某些次序在后面的迭代已经执行了。看一下例子:

static void Main(string[] args)
{
DemoStop();
DemoBreak();
} /// <summary>
/// 中断Stop
/// </summary>
static void DemoStop()
{
List<int> data = new List<int>(){ , , , , , , , , , };
Parallel.For(, data.Count, (i, LoopState) =>
{
if (i > )
LoopState.Stop();
Thread.Sleep();
Console.WriteLine(i);
});
Console.WriteLine("Stop执行结束。");
}
/// <summary>
/// 中断Break
/// </summary>
static void DemoBreak()
{
List<int> data = new List<int>() { , , , , , , , , , };
Parallel.ForEach(data, (i, LoopState) =>
{
if (i > )
LoopState.Break();
Thread.Sleep();
Console.WriteLine(i);
});
Console.WriteLine("Break执行结束。");
}

  运行一下,对比结果,细细体会一下输出的结果,我想你就会清楚Stop方法与Break方法的区别。

  当然了,前面讲的使用CancellationTokenSource取消线程的方式这里任然是适用的,不过需要通过ParallelOptions传给Parallel对象对应的重载方法。ParallelOptions对象还可以配置其他的一些参数,比如最大的并行数量(其实就是使用的最大内核数量)等等。看一个简单的例子:

CancellationTokenSource token = new CancellationTokenSource();
Task.Factory.StartNew(() =>
{
Thread.Sleep();
token.Cancel();
Console.WriteLine("Token Cancelled.");
}); ParallelOptions loopOptions = new ParallelOptions()
{
CancellationToken = token.Token,
MaxDegreeOfParallelism =
}; try
{
Parallel.For(, Int64.MaxValue, loopOptions, i =>
{
Console.WriteLine("i={0},thread id={1}", i, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep();
});
}
catch (OperationCanceledException)
{
Console.WriteLine("Exception...");
}

  讨论完了各种正常情况,下面来看一下不正常的情况:异常问题。

异常问题

  和普通的for/foreach中发生异常的表现一样,Parallel循环中的任何异常都会使整个循环终止,不过由于整个循环是分核同时进行的,因此整个循环不会立即终止,这个很好理解。循环中停止前所有的异常都会被封装在AggregateException的InnerExceptions中。捕获这些异常的方式很简单,使用try/catch就可以了,看一下下面的代码:

try
{
Parallel.For(, , (i) =>
{
throw new Exception(i.ToString());
});
}
catch (AggregateException ae)
{
foreach (var exp in ae.InnerExceptions)
{
Console.WriteLine(exp.Message);
}
}

  这段代码将会输出0-4的子集(也有可能是0-4全部输出,因为5个线程都很快)。

  不过,与Parallel.For和ForEach不一样的是,Parallel.Invoke总是会把所有任务都执行完,然后把所有的异常包装在AggregateException中。其实道理与上面的循环是一样的,都是把应该执行的任务执行完,来看这段代码:

try
{
Parallel.Invoke(() => { throw new Exception(""); },
() => { Thread.Sleep(); throw new Exception(""); },
() => { Thread.Sleep(); throw new Exception(""); });
}
catch (AggregateException ae)
{
foreach (var ex in ae.InnerExceptions)
{
Console.WriteLine(ex.Message);
}
}

  结果会输出:3 2 1。

  除此以外,Task.WaitAll和Parallel.Invoke是类似,任何一个(或多个)Task的异常不会影响任何其他Task的执行。

try
{
var t1 = Task.Factory.StartNew(() =>
{
Thread.Sleep();
throw new Exception("");
}); var t2 = Task.Factory.StartNew(() =>
{
Thread.Sleep();
throw new Exception("");
}); Task.WaitAll(t1, t2);
}
catch (AggregateException ae)
{
foreach (var exp in ae.InnerExceptions)
{
Console.WriteLine(exp.Message);
}
}

  这段代码会输出:1 2。

  两个异常都会在AggregateException中的InnerExceptions属性中。不过很显然异常的顺序与上一个例子有点不同,这个需要注意一点。

  其实,在新的.NET类库中,不仅通过增加Parallel对象来增强并行处理的能力,而且在Linq语句中也有相应的增强,那就是PLinq。

PLinq简介

  PLINQ也就是Parallel Linq,它的使用方法是非常简单。
  下例本身没有什么太大意义,只不过是找出“2”,然后输出:

using System;
using System.Linq;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
int[] ar = { , , };
var q1 = from n in ar
where n ==
select n;
foreach (var n in q1)
{
Console.WriteLine("found {0}", n);
}
}
}

如果把上例改成用并行处理,只要在查询表达式中追加AsParallel方法就可以了:

var q1 = from n in ar.AsParallel()
where n ==
select n;

函数形式也是一样的。例如下面这个查询表达式:

var q1 = ar.Where((c) => c == );

改成并行执行也就是插入AsParallel方法就可以了:

var q1 = ar.AsParallel().Where((c) => c == );

  使用PLINQ是如此的简单,只要用一个方法就可以用并行来处理查询表达式了。但是,正如前面所讲的并行计算并不是适用于任何场合的灵丹妙药,它也有不太适用的场合:
1. 在大量使用查询表达式的时候,并不是每一句查询表达式都是性能瓶颈的关键,如果每一个查询表达式都插入AsParallel方法,不会带来太大好处,在浪费时间的同时,代码的可读性也降低了。
2. 插入AsParallel方法后,结果会发生变化,这个自然很好理解,因为并行执行了嘛,顺序得不到保证,所以与顺序有关的操作是适合使用同步操作的,并行执行就可能导致问题。

  其实AsParallel方法只是PLinq的基本入口点,在System.Linq.ParallelEnumerable类中,包含了并行查询的大部分其他有用的方法,比如:AsSequential(指定查询的其余部分应像非并行 LINQ 查询一样按顺序运行),AsOrdered(指定 PLINQ 应保留查询的其余部分的源序列排序,直到例如通过使用 orderby子句更改排序为止),AsUnordered(指定查询的其余部分的 PLINQ 不需要保留源序列的排序)等等方法。这个查看一下MSDN就可以了,使用起来还是比较方便的。也可查看博客园中的一些详细的文章,比如:http://www.cnblogs.com/leslies2/archive/2012/02/07/2320914.html

  并行计算就简单总结这些了,铭记一点:并行执行的任务要保证是顺序无关的,独立的。

上一篇:C#的变迁史 - C# 3.0篇


下一篇:C#的变迁史 - C# 4.0 之线程安全集合篇