进程-线程-多线程-异步

进程-线程-多线程-异步

一、多线程的本质

  • cpu的计算速度太快了导致硬件跟不上,就是木桶原理(盛水多少取决于最短板)。
  • cpu的计算能力很强,所以cpu可以分“时间片”,一个cpu可以分N+个时间片,每个时间片上跑一个线程(代码流)。cpu按顺序执行时间片,因为cpu太强,线程切换的太快,导致人感觉不出执行的卡顿,感觉上是多线程并发的。
  • 微观角度:一个核同一时刻只能执行一个线程;
  • 宏观角度: 是多线程并发的。
  • cpu分时间片执行线程,线程所需资源的不同会导致有 上下文的切换 ,其实上下文的切换会有性能的损失,但因为cpu计算能力超过其它硬件太多,不损失也是浪费。
  • 扩展:4核8线程—核是物理的核,这里的线程是指虚拟核(一个核虚拟出2个核)。

1、并发多线程的启动、结束顺序

并发线程的启动是无序。
执行相同代码的并发线程的运行时间也不相同,所以结束时间也不同。

a、如何控制多线程的调用顺序

  • 可以用委托提供的BeginInvoke方法,它是异步线程方法,并支持回调函数,用它来控制线程调用顺序。
    代码:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace Thread_Task_Async
{

    public class MoreThread
    {
        public void DoSometing(string name)
        {
            Console.WriteLine($"{name}____{Thread.CurrentThread.ManagedThreadId}");
        }

        public void Do()
        {
            Console.WriteLine($"主线程开始");
            Action<string> action = DoSometing;//无返回值的委托
            IAsyncResult asyncResult = null;//这是BeginInvoke的返回值类型。
            AsyncCallback callback = e =>  //这是回调函数
            {
                Console.WriteLine(object.ReferenceEquals(asyncResult, e));
                Console.WriteLine(e.AsyncState);
                Console.WriteLine($"回调函数完成   {Thread.CurrentThread.ManagedThreadId}");
            };
            //BeginInvoke是异步调用委托,callback是回调函数。
            //BeginInvoke的返回值是回调函数中的参数
            //第三个参数是object类型,它是BeginInvoke的返回值中的字段AsyncState,可以用它传递任何你想给回调函数传的东西。
            asyncResult = action.BeginInvoke("Do", callback, "xxxx");
            Console.WriteLine($"主线程完成");
        }
    }
}

b、阻塞主线程等待子线程

  • 方法一:IAsyncResult类是委托BeginInvoke方法的返回值,IAsyncResult的IsCompleted字段标志这委托是否完成。可以用IsCompleted字段写个循环阻塞主线程。
  • 方法二:IAsyncResult.AsyncWaitHandle.WaitOne();此方法可以阻塞主线程,等待子线程。
  • 方法三:委托的EndInvoke(IAsyncResult)方法可以阻塞主线程,等待子线程。并且该方法带返回值。

二、异步操作的本质

所有的程序最终都会由计算机硬件来执行,所以为了更好的理解异步操作的本质,我们有必要了解一下它的硬件基础。 熟悉电脑硬件的朋友肯定对DMA这个词不陌生,硬盘、光驱的技术规格中都有明确DMA的模式指标,其实网卡、声卡、显卡也是有DMA功能的。DMA就是直接内存访问的意思,也就是说,拥有DMA功能的硬件在和内存进行数据交换的时候可以不消耗CPU资源。只要CPU在发起数据传输时发送一个指令,硬件就开始自己和内存交换数据,在传输完成之后硬件会触发一个中断来通知操作完成。这些无须消耗CPU时间的I/O操作正是异步操作的硬件基础。所以即使在DOS这样的单进程(而且无线程概念)系统中也同样可以发起异步的DMA操作。

三、线程

1、Thread

  • Thread是C#推出的最早的封装了操作线程的类。
  • Thread中的Suspend()方法功能是线程挂起,C#现已经弃用,因为此方法在线程被挂起后仍然保持对资源的占有,使其它线程无法访问。
  • Thread中的Resume()方法的功能是唤醒线程,C#现已弃用。
  • Thread中的Abort()方法的功能是销毁,方式是抛出异常。不建议用。(主线程告诉子线程Abort时,需要时间,所以子线程停止运行会不及时,并且有些动作不能停止。)
  • Thread.ResetAbort();//取消异常。
  • Thread.Join();//当前线程等待thread完成。
  • Thread.Sleep(100);//当前线程休息100ms,线程在等待时cpu会把时间片交出去,也就是说cpu的这个时间片可以做别的事,但是内存是被占有的。
  • Thread.IsBackground = true;//true为 后台线程 ,false为 前台线程 ,默认为false。
  • Thread.Priority = ThreadPriority.Highest;// 设置线程优先级,CPU会给优先级高的线程分配时间片。

2、ThreadPool

  • DotNetFramework 2.0推出的
  • ThreadPool.QueueUserWorkItem(); // 把用户的工作队列进线程池。
  • ThreadPool.GetMaxThreads(); // 获取最大线程数,和最大I/O线程数
  • ThreadPool.GetMinThreads(); // 获取最小线程数,和最小I/O线程数
  • ThreadPool.SetMaxThreads() // 设置最大线程数,和最大I/O线程数
  • ThreadPool.SetMinThreads() // 设置最小线程数,和最小I/O线程数
  • ThreadPool没有提供阻塞主线程等待子线程完成的方法。
    可以用ManualResetEvent类,它有一个bool值的字段A,一个WaitOne()方法、Set()方法、Reset()方法。
    //A为false时,线程执行到WaitOne方法时,会被等待。
    //A为true时,线程执行到WaitOne方法时,可以通过。
    //Set()方法,会将A赋值为true。
    //Reset()方法,会将A赋值为flase。
    根据此类的特性,可以用它来实现“阻塞主线程等待子线程完成”的功能。
  • 回调函数可以自己模拟出来。
    做法:将两个委托传递给一个方法,此方法开一个线程顺序执行这两个委托。
  • 对ThreadPool进行设置,对Task和TaskFactory是生效,因为它们是基于ThreadPool的。

3、Task

  • DotNetFramework 3.0 的产物
  • Task 是基于ThreadPool的
  • Task.Run(委托) // 线程的启动方式。
  • Task(委托).Start() // 线程的启动方式
  • Task.WaitAll(Task数组) // 等待所有线程完成
  • Task.WaitAny(Task数组) // 等待任一线程完成
  • Task.WhenAll(Task数组) // 等待所有线程完成,并且返回值为Task,作用是返回Task后可以使用链式编程。
  • Task.WhenAny(Task数组) //同上
  • Task.ContinueWith(委托) // 此委托也是异步线程调用的(它也相当与回调函数)。
  • 如何控制线程的并发数量,不要用设置ThreadPool的线程池线程数量来控制。 用下面的方法。
    如下代码:
 private void button2_Click(object sender, EventArgs e)
        {
            Console.WriteLine("22222222222222222");
            List<int> list = new List<int>();
            for(int i =0; i<10000;i++)
            {
                list.Add(i);
            }
            Action<int> action = i =>
            {
                Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}");
                Thread.Sleep(new Random(i).Next(100, 300));
            };
            List<Task> taskList = new List<Task>();
            foreach(var i in list)
            {
                var k = i;
                taskList.Add(Task.Run(() => action.Invoke(k)));
                if(taskList.Count > 10)
                {
                    Task.WaitAny(taskList.ToArray());
                    taskList = taskList.Where(t => t.Status != TaskStatus.RanToCompletion).ToList();
                }
            }
            Task.WhenAll(taskList.ToArray());
            Console.WriteLine("22222222222222222");
        }
  • 建议:把Task线程放在List 里比较好控制。
  • Task.Delay(2000) // 延迟2秒,它和Sleep的区别是Delay是不卡界面的,也就是说Delay是另起了一个线程。
    Task.Delay(2000).ContinueWith(委托) // 用户在执行一个操作后延迟2秒,在执行委托里的操作。

4、TaskFactory

  • DotNetFramework 4.0 的产物
  • TaskFactory.StartNew(委托) // 线程的启动方式。
  • TaskFactory.ContinueWhenAll(Task数组,回调委托) // 它是Task.WhenAny和Task.ContinueWith的结合。

5、Parallel

  • 并行编程
  • 是在Task的基础上做的封装
  • Parallel,主线程参与计算,节约了一个线程。
  • Parallel.Invoke(委托数组) // 启动线程,主线程参与计算
  • Parallel.For(0,5, i => {…} ) //执行5个线程,主线程 参与计算
  • Parallel.ForEach(object[] object , i => {…} ) // 执行线程,主线程 参与计算
  • ParallelOptions.MaxDegreeOfParallelism = 3 // 控制并发数量
  • Parallel支持结束多线程的本次线程和结束Parallel所有线程
    如下代码:
    Stop()和Break()不能同时存在于一个线程中。不推荐用这两个方法。
    需要注意的是Parallel没有浪费主线程,而Break()是结束当前线程,如果碰巧该Break()结束的是主线程,那么其他线程也会被干掉。
private void button3_Click(object sender, EventArgs e)
        {
            ParallelOptions parallelOptions = new ParallelOptions();
            parallelOptions.MaxDegreeOfParallelism = 3;
            Parallel.For(0, 40, parallelOptions, (i, state) =>
            {
                Console.WriteLine($"111111111111111111111111111111----{Thread.CurrentThread.ManagedThreadId}");
                //if (i == 2)
                //{
                //    Console.WriteLine("线程取消,当前任务结束");
                //    state.Break();//当前这次线程结束
                //    return;//有return代表此代码之后的操作不执行。 
                //}
                if (i == 20)
                {
                    Console.WriteLine("线程取消,当前任务结束");
                    state.Stop();   //  结束Parallel所有线程
                    return;//有return代表此代码之后的操作不执行。 
                }
                Console.WriteLine($"22222222222222222222222222----{Thread.CurrentThread.ManagedThreadId}");
            });
        }

6、async await 语法糖

  • 机制: 在async标记的方法中,以 await为界限;await之前的代码由主线程执行,await之后的代码其实被封装成了委托回调函数。该委托会在await task执行完成之后被调用,该委托也是线程的。
  • 主线程遇到await就会返回做自己的事情。
  • async await 是用状态机模式实现的。await task(委托)是一块,await 之前是一块,await之后是一块。主线程先执行await之前的代码,主线程遇到await启动子线程之后就从方法体里返回了,去做自己的事了。由子线程执行await task委托里的代码;子线程执行完之后;await之后的代码就会由任意线程执行。
  • async里的三块代码是顺序执行的。

四、异常处理

  • 异步线程里面的异常是被吞掉了,因为主线程执行完之后已经脱离try catch的范围了。
  • 如果想抓到线程里的异常,就需要await 线程。
  • AggregateException是专门处理线程的异常类。
  • 异常处理的建议:线程里不允许出现异常,自己处理好。

五、线程取消

  • task是外部无法中止的(你只能告诉OS,我要中止这个线程,而OS在接到线程中止命令后什么时候相应,你无法知道),因为线程是OS的资源,你无法掌控线程是什么时候取消的。(可能会有延迟)
  • Thread.Abort不靠谱,Abort是告诉OS取消这个线程,它有可能马上停止,也有可能有延迟(是不可控制的)。
  • 举个例子:
    你有个操作是写数据库的。Abort告诉OS停止后,OS可能有延迟,在这延迟的这段时间,cpu就有可能已经执行到数据库去写数据了。这就与我的意愿相违背了。(注意:数据库是基于http请求的,不可取消)
  • 线程取消的方法:
    原理:要让线程自己停止自己,声明一个多线程的公共访问变量,想让其他线程停止时,就改变这个公共变量的值,其他线程不断的检测它,根据值判断是否抛出 异常。
  • CancellationTokenSource此类时微软封装的标志任务是否取消,Cancel方法可使cancellationToken变量标志取消,IsCancellationRequested可以读取this是否标志取消
  • 代码参考:
private void button4_Click(object sender, EventArgs e)
        {
            try
            {
                List<Task> taskList = new List<Task>();
                TaskFactory taskFactory = new TaskFactory();
                CancellationTokenSource cts = new CancellationTokenSource();
                for (int i = 0; i < 40; i++)
                {
                    string name = String.Format($"this my work{i}");
                    Action<object> act = t =>
                    {
                        try
                        {
                            Thread.Sleep(2000);
                            if(t.ToString().Equals("this my work11"))
                            {
                                throw new Exception(string.Format($"{t} 执行失败"));
                            }
                            if (t.ToString().Equals("this my work12"))
                            {
                                throw new Exception(string.Format($"{t} 执行失败"));
                            }
                            if (cts.IsCancellationRequested)
                            {
                                Console.WriteLine($"{t}放弃执行");
                                return;
                            }
                            else
                            {
                                Console.WriteLine($"{t}执行成功");
                            }
                        }
                        catch (Exception ex)
                        {
                            cts.Cancel();
                            Console.WriteLine(ex.Message);
                        }
                    };
                    taskList.Add(taskFactory.StartNew(act, name, cts.Token));
                }
                Task.WaitAll(taskList.ToArray());
            }
            catch(AggregateException ae)
            {
                foreach(var item in ae.InnerExceptions)
                {
                    Console.WriteLine(item.Message);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

六、线程安全

  • 多线程访问公共变量时可能会出问题。
  • 线程安全问题分析:
    代码:
private void button5_Click(object sender, EventArgs e)
        {
            #region 此代码存在线程安全问题
            TaskFactory taskFactory = new TaskFactory();
            List<Task> taskList = new List<Task>();
            int TotalCount = 0;
            List<int> intList = new List<int>();
            for (int i = 0; i < 1000; i++)
            {
                int newI = i;
                taskList.Add(taskFactory.StartNew(() =>
                {
                    TotalCount += 1;//TotalCount是公共资源,多线程用此资源存在问题(有可能两个线程同时获取到此资源进行+1,这样就少加了一个1).
                    intList.Add(newI);//这个同理
                }));
            }
            Task.WaitAll(taskList.ToArray());
            Console.WriteLine($"{TotalCount}");
            Console.WriteLine($"{taskList.Count}");
            Console.WriteLine($"{intList.Count}"); 
            #endregion
        }
  • 三种解决方法:
    第一种: lock解决,因为只有一个线程可以进去,没有并发,所以解决了问题,但是牺牲了性能。
  1. 用的时候注意尽量缩小lock的范围。
  2. 声明锁,必须是引用类型,值类型无效, 用private可以保证其他类不会锁此资源,static保证全局唯一性(也可以不声明)。
  3. 锁不可以声明为string类型,虽然它是引用类型,但是因为它用了享元模式。
private static readonly object ClickLock = new object();//这是微软推荐的锁的写法。
private void button5_Click(object sender, EventArgs e)
        {
            #region lock解决
            {
                TaskFactory taskFactory = new TaskFactory();
                List<Task> taskList = new List<Task>();
                int TotalCount = 0;
                List<int> intList = new List<int>();
                for (int i = 0; i < 1000; i++)
                {
                    int newI = i;
                    taskList.Add(taskFactory.StartNew(() =>
                    {
                        lock (ClickLock)
                        {
                            TotalCount += 1;
                            intList.Add(newI);
                        }
                        
                    }));
                }
                Task.WaitAll(taskList.ToArray());
                Console.WriteLine($"{TotalCount}");
                Console.WriteLine($"{taskList.Count}");
                Console.WriteLine($"{intList.Count}");
            }
            #endregion
        }

第二种: 安全队列 ConcurrentQueue,用一个线程去完成操作(不止单线程,还用了其他技术)。
第三种: 使用多线程,注意不要冲突,进行数据拆分,避免冲突。也是最推荐的一种。

进程-线程-多线程-异步进程-线程-多线程-异步 Core松 发布了8 篇原创文章 · 获赞 0 · 访问量 110 私信 关注
上一篇:activiti流程动态创建


下一篇:业务序号重排序