.NET Framework 3.5 中引入了语言集成查询 (LINQ),它具有统一的模型,以类型安全方式查询任何 System.Collections.IEnumerable或 System.Collections.Generic.IEnumerable<T> 数据源。关于LINQ函数的使用,可以参考:https://www.cnblogs.com/zhaotianff/p/6236062.html
并行 LINQ (PLINQ) 是 LINQ 模式的并行实现
注意:以下示例代码仅是演示用,其运行速度可能不如等效的顺序LINQ查询快
1、如何进行PLINQ查询
ParallelEnumerable类提供了一个扩展方法AsParallel()可以并行执行LINQ查询
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static void Main(string[] args) 12 { 13 var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 }; 14 15 var result = list.AsParallel().Where(x => x > 30); 16 } 17 } 18 }
2、PLINQ查询的参数
WithDegreeOfParallelism:指定 PLINQ 应用于并行化查询的处理器的最大数量。
WithExecutionMode:指定 PLINQ 应如何并行化查询(即使是当默认行为是按顺序运行查询时)。
WithMergeOptions:提供有关 PLINQ 应如何(如果可能)将并行结果合并回使用线程上的一个序列的提示。
ParallelMergeOptions 枚举值如下:
AutoBuffered | 2 |
利用系统选定大小的输出缓冲区进行合并。 在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。 |
Default | 0 |
使用默认合并类型,即 AutoBuffered。 |
FullyBuffered | 3 |
利用整个输出缓冲区进行合并。 在向查询使用者提供任何结果之前,系统会先累计所有结果。 |
NotBuffered | 1 |
不利用输出缓冲区进行合并。 一旦计算出结果元素,就向查询使用者提供这些元素。 |
WithCancellation:指定 PLINQ 应定期监视请求取消时所提供的取消标记的状态以及取消执行。(这个操作和线程的取消操作是一致的,如果不理解线程中的取消操作,可以访问:https://docs.microsoft.com/zh-cn/dotnet/standard/threading/cancellation-in-managed-threads)
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static void Main(string[] args) 12 { 13 var cts = new System.Threading.CancellationTokenSource(); 14 15 try 16 { 17 var result2 = list.AsParallel().Where(x => x > 30) 18 .WithDegreeOfParallelism(Environment.ProcessorCount) 19 .WithExecutionMode(ParallelExecutionMode.ForceParallelism) 20 .WithMergeOptions(ParallelMergeOptions.Default) 21 .WithCancellation(cts.Token); 22 23 PrintResult(result2); 24 } 25 catch(OperationCanceledException) 26 { 27 Console.WriteLine("并行查询已被取消"); 28 } 29 } 30 31 32 static void PrintResult(IEnumerable<int> collection) 33 { 34 foreach (var item in collection) 35 { 36 Console.WriteLine(item); 37 } 38 } 39 } 40 }
2、PLINQ查询中常用的函数
AsOrdered:指定 PLINQ 应为查询的其余部分保留源序列的排序,或直到例如通过使用 orderby(在 Visual Basic 中为 Order By)子句更改排序为止。
AsUnordered:指定保留源序列的排序不需要查询其余部分的 PLINQ。
AsSequential:指定查询的其余部分应像非并行的 LINQ 查询一样按顺序运行。
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static Func<int, bool> query => x => x > 30;
12 static void Main(string[] args) 13 { 14 var orderResult = list.AsParallel().AsOrdered().Where(query);//asc 15 //list.AsParallel().Where(query).OrderBy(x=>x);//asc 16 //list.AsParallel().Where(query).OrderByDescending(x => x);//desc 17 PrintResult(orderResult); 18 19 var unorderResult = list.AsParallel().AsUnordered().Where(query); 20 PrintResult(unorderResult); 21 22 list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x)); 23 Console.WriteLine("\n\n"); 24 25 var linqResult = list.Where(query); 26 PrintResult(linqResult); 27 28 var pinqSequentialResult = list.AsParallel().AsSequential().Where(query); 29 PrintResult(pinqSequentialResult); 30 31 var forceExecutionMode = list.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).AsSequential().Where(query); 32 PrintResult(forceExecutionMode); 33 } 34 35 36 static void PrintResult(IEnumerable<int> collection) 37 { 38 foreach (var item in collection) 39 { 40 Console.WriteLine(item); 41 } 42 43 Console.WriteLine("\n\n"); 44 } 45 } 46 }