static class MyParallel { //4.0及以上用Task, Task的背后的实现也是使用了线程池线程 //static List<Task> tasks = new List<Task>(); //4.0以下用Thread static List<Thread> tasks = new List<Thread>(); //队列 static Queue<int[]> datas = new Queue<int[]>(); public static void For(int start, int end, Action<int> action) { //逻辑处理器数量 var taskAllCount = Environment.ProcessorCount*2; #region 每次处理的量 int pageSize = end / taskAllCount; if (pageSize == 0) { pageSize = 1; } #endregion #region 分配处理范围 var totalCount = end; for (int pageIndex = 0; totalCount >= 0; pageIndex++) { int pageStart = pageIndex * pageSize; int pageEnd = pageIndex * pageSize + pageSize; datas.Enqueue(new int[] { pageStart, pageEnd }); //入队 totalCount -= pageSize; } #endregion #region 根据逻辑处理器数量创建任务,并启动 int i = 0; while (tasks.Count < taskAllCount) { tasks.Add(new Thread(() => { while (datas.Count > 0) { var one = new int[] { end, end }; lock (datas) { one = datas.Dequeue(); } for (int j = one.First(); j < end && j < one.Last(); j++) { action(j); } System.Threading.Thread.Sleep(10); } })); tasks[i].IsBackground = true; tasks[i].Start(); i++; } #endregion //等待所有任务结束 foreach (var task in tasks) { task.Join(); task.Abort(); } tasks.Clear(); //Task.WaitAll(tasks.ToArray()); } }
性能测试
var list = new List<int>(1000); for (int i = 0; i < 1000; i++) { list.Add(i); } string startTime = DateTime.Now.ToString(); int ii = 0; MyParallel.For(0, list.Count, x => { Thread.Sleep(10); Console.WriteLine($"{x.ToString()},{ Interlocked.Increment(ref ii)}"); }); Console.WriteLine("Parallel.For " + startTime + "," + DateTime.Now); Console.ReadKey(); startTime = DateTime.Now.ToString(); foreach (var item in list) { Thread.Sleep(10); Console.WriteLine(item.ToString()); } Console.WriteLine("for" + startTime + "," + DateTime.Now); Console.ReadKey();