C# 实现Parallel.For

/// <summary>
/// A minimal, thread-based stand-in for <c>Parallel.For</c> that works on
/// runtimes without the Task Parallel Library.
/// </summary>
static class MyParallel
{
    /// <summary>
    /// Invokes <paramref name="action"/> once for every index in the half-open
    /// range [<paramref name="start"/>, <paramref name="end"/>), spreading the
    /// work over dedicated background threads, and blocks until all of them finish.
    /// </summary>
    /// <param name="start">First index (inclusive).</param>
    /// <param name="end">Last index (exclusive). Nothing runs when <c>end &lt;= start</c>.</param>
    /// <param name="action">
    /// Callback executed for each index. It is called concurrently from several
    /// threads and in no particular order. Exceptions it throws are not caught here;
    /// they surface on the worker thread that raised them.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public static void For(int start, int end, Action<int> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        if (end <= start)
        {
            return;
        }

        // Oversubscribe the logical processors 2x, as the original implementation did.
        int maxWorkers = Environment.ProcessorCount * 2;

        // Use long arithmetic so that (end - start) and the shared cursor cannot overflow.
        long total = (long)end - start;
        long chunkSize = Math.Max(1L, total / maxWorkers);
        long chunkCount = (total + chunkSize - 1) / chunkSize;

        // Never start more threads than there are chunks to process.
        int workerCount = (int)Math.Min(maxWorkers, chunkCount);

        // Shared cursor: each worker atomically claims the next [chunkStart, chunkEnd) slice.
        // All state is local to this call, so concurrent or nested For calls do not interfere.
        long next = start;

        ThreadStart work = () =>
        {
            while (true)
            {
                long chunkEnd = Interlocked.Add(ref next, chunkSize);
                long chunkStart = chunkEnd - chunkSize;
                if (chunkStart >= end)
                {
                    break; // every index has already been claimed
                }

                if (chunkEnd > end)
                {
                    chunkEnd = end; // the last chunk may be shorter
                }

                for (long index = chunkStart; index < chunkEnd; index++)
                {
                    action((int)index);
                }
            }
        };

        var workers = new List<Thread>(workerCount);
        for (int w = 0; w < workerCount; w++)
        {
            var worker = new Thread(work) { IsBackground = true };
            workers.Add(worker);
            worker.Start();
        }

        // Wait for all workers to finish. No Abort is needed after Join: the thread has
        // already ended, and Thread.Abort is unsupported on .NET Core / .NET 5+.
        foreach (var worker in workers)
        {
            worker.Join();
        }
    }
}

 

性能测试

            // Workload: the integers 0..999, one simulated 10 ms job per element.
            const int itemCount = 1000;
            var numbers = new List<int>(itemCount);
            while (numbers.Count < itemCount)
            {
                numbers.Add(numbers.Count);
            }

            // --- Parallel run -------------------------------------------------
            string startedAt = DateTime.Now.ToString();
            int completed = 0;

            MyParallel.For(0, numbers.Count, x =>
            {
                Thread.Sleep(10);
                // Print the index together with a running count of finished jobs.
                Console.WriteLine($"{x},{Interlocked.Increment(ref completed)}");
            });

            Console.WriteLine($"Parallel.For {startedAt},{DateTime.Now}");
            Console.ReadKey();

            // --- Sequential baseline ------------------------------------------
            startedAt = DateTime.Now.ToString();

            for (int position = 0; position < numbers.Count; position++)
            {
                Thread.Sleep(10);
                Console.WriteLine(numbers[position].ToString());
            }

            Console.WriteLine($"for{startedAt},{DateTime.Now}");
            Console.ReadKey();

 

上一篇:jvm.concurrent.parallel.collection.map


下一篇:如何使用 jMeter Parallel Controller - 并行控制器以及一些常犯的错误