C# 队列Queue,ConcurrentQueue,BlockingCollection 并发控制lock,Monitor,信号量Semaphore

什么是队列?

队列Queues,是一种遵循先进先出的原则的集合,在.netCore中微软给我们提供了很多个类,就目前本人所知的有三种,分别是标题提到的:Queue、ConcurrentQueue、BlockingCollection,下面分别介绍一下。

我想学习过数据结构应该很清楚,如果没有仔细了解,只要记住队列是一个先进先出的列表即可,列表中可以是线程,可以是预备执行的函数的入口,可以是地址,可以是数据,在C#中,Queue<T> 类可以实现队列,这一个类可以简单的让我们完成数据的插入和获取,可以在便利性这一块十分出众的。

Queue队列就是先进先出。它并没有实现 IList,ICollection。所以它不能按索引访问元素,不能使用Add和Remove。下面是 Queue的一些方法和属性

  Enqueue():在队列的末端添加元素

  Dequeue():在队列的头部读取和删除一个元素,注意,这里读取元素的同时也删除了这个元素。如果队列中不再有任何元素。就抛出异常

我们将建立一个存储String数据的队列,为了实用,队列能够用多线程的方式来插入,后台将有一个长时间运行的线程来不断将数据从队列取出并执行我们编写的内容。因此需要规划多线程之间资源冲突如何避免的措施

使用Queue类的Enqueue函数来添加,这里定义一个Enqueue函数来添加数据到队列,可以看到有一个object类的锁,它的作用就是防止冲突

当读取数据时候,上锁,防止此时写入数据,当线程读取到-1时候,线程退出程序结束。

微软给我们所提供的这些源码:

  • 队列 Queue ;
  • 泛型队列 Queue<T>;
  • 阻塞泛型集合 BlockingCollection<T>
  • 以及微软强大的并行库中的并发泛型队列 ConcurrentQueue<T>

 我们着重看一下泛型队列和并发泛型队列

ConcurrentQueue是一个先进先出的队列,常用的有以下几个方法:

  • Enqueue(T item)  将对象添加到队列的结尾处
  • TryDequeue(out T result)  尝试移除并返回并发队列开头处的对象
  • TryPeek(out T result)  尝试返回开头处的对象但不将其移除

Queue适用范围:单线程的队列,Queue是最简单的队列的一个实现,但是对于多线程的情况它就显得捉襟见肘了,因为它不是线程安全的,下面粗略的写一下用法,

Queue<string> messageQueue = new Queue<string>();//创建队列

    messageQueue.Enqueue("Hello");//向队列里添加元素

    messageQueue.Enqueue("World!")

    Console.WriteLine(messageQueue.Dequeue());//从队列里取出元素

    Console.WriteLine(messageQueue.Dequeue());

ConcurrentQueue很好的解决了Queue的线程不安全的问题,所以它是线程安全,它的应对场景就是多线程了

ConcurrentQueue<string> concurrentQueue = new ConcurrentQueue<string>();//创建实例

    concurrentQueue.Enqueue("Hello");//向队列里添加元素

    concurrentQueue.Enqueue("World!");

    while(concurrentQueue.TryDequeue(out message))//c‘v取出元素

    {

        Console.WriteLine(message);

    }

BlockingCollection属于功能最强大的一个队列,它不仅对于线程是安全的,而且它还可以定义队列中最多可以有多少个元素,应用场景比如秒杀活动等。

BlockingCollection<string> blockingCollection = new BlockingCollection<string>(2);//创建队列实例,其中2就是代表着此队列中元素个数的最大值

    blockingCollection.Add("Hello");//添加元素到队列中

    blockingCollection.Add("World!");

    blockingCollection.Add("Good");

    blockingCollection.Add("Evening!");

blockingCollection.take();//从队列中取出元素

总结:单线程用Queue,多线程用ConcurrentQueue,多线程又要限制元素个数用BlockingCollection。

Lock

说到并发控制,我们首先想到的肯定是 lock关键字。

这里要说一下,lock锁的究竟是什么?是lock下面的代码块吗,不,是locker对象。

我们想象一下,locker对象相当于一把门锁(或者钥匙),后面代码块相当于屋里的资源。

哪个线程先控制这把锁,就有权访问代码块,访问完成后再释放权限,下一个线程再进行访问。

注意:如果代码块中的逻辑执行时间很长,那么其他线程也会一直等下去,直到上一个线程执行完毕,释放锁。

Monitor

Monitor是一个静态类(System.Threading.Monitor),功能与lock关键字基本一样,也是加锁,控制并发。

有两个重要的方法:

Monitor.Enter  //获取一个锁

Monitor.Exit   //释放一个锁

另外几个方法:

public static bool TryEnter(object obj, int millisecondsTimeout)  //相比于 public static void Enter(object obj) 方法,多了超时时间设置,如果等待超过一定时间,就不再等待了,另外,只有TryEnter返回值为true时,才能进入代码块。

public static bool Wait(object obj, int millisecondsTimeout)    //这个方法在已经获得锁权限的代码块中调用时,或暂时释放锁,等待一定时间后,重新获取锁权限,继续执行Wait后面的代码。 (真想不明怎么会有这种相互礼让的操作)

public static void Pulse(object obj)      //这个方法的解释是,通知在等待队列中的线程,锁对象状态改变。 (测试发现,此方法并不会真正改变锁定状态,只是通知的作用)

Semaphore 信号量

System.Threading.Semaphore

lock和Monitor加锁之后,每次只能有一个线程访问临界代码,信号量类似于一个线程池,线程访问之前获取一个信号,访问完成释放信号,只要信号量内有可用信号便可以访问,否则等待。

构造函数:

public Semaphore(int initialCount, int maximumCount)  //创建一个信号量,指定初始信号数量和最大信号数量。

几个重要方法:

public int Release        //代码注释的意思是:退出信号量,并返回之前的(可用信号)数量。 实际上,除了退出,这个方法每调用一次会增加一个可用信号,但数量达到最大数量时会抛异常。

public int Release(int releaseCount) //和上面的方法类似,上面的方法每次只释放一个信号,这个方法可以指定信号数量。

public virtual bool WaitOne    //等待一个可用信号

看下面的示例代码,如果只初始一个信号量, new Semaphore(1, 100),运行结果与lock和Monitor是一样的,两个方法交替执行,如果初始信号量为多个时, new Semaphore(3, 100),执行效率高的方法要占用更多的信号,从而执行更多次。

总结

lock是最常用的并发控制方式,Monitor的功能与lock类似,但使用复杂,非必须不建议使用。

Semaphore,信号量,是一个不错的功能,特定应用场景下非常实用。

ConcurrentQueue 是一个线程安全的队列,在多线程并发环境下使用,可避免由于并发引起的错误。(我们可以使用lock+Queue,实现ConcurrentQueue,自己感兴趣可以试一下)

BlockingCollection 带阻塞功能的 ConcurrentQueue ,没有可用数据的情况下,进入等待状态,防止循环访问,减少CPU资源浪费。

帮助类

 

 public class BlockingQueue
    {
        private readonly BlockingCollection<string> _messageQueue = new BlockingCollection<string>(1024);

        private readonly Thread _writerThread;

        public BlockingQueue()
        {
            _messageQueue = new BlockingCollection<string>(1024);
            _writerThread = new Thread(Process)
            {
                IsBackground = true,
            };
            //Task.Run(()=> { Process(); });
            _writerThread.Start();
        }
        public void Process()
        {
            while (!_messageQueue.IsCompleted)
            {
                var x = _messageQueue.Take();
            }
        }

        public void AddMessage(string message)
        {
            _messageQueue.Add(message);
        }
    }

 

C# 队列Queue,ConcurrentQueue,BlockingCollection 并发控制lock,Monitor,信号量Semaphore

上一篇:使用Spark的newAPIHadoopRDD接口访问kerberos认证过的hbase


下一篇:Windows server 上安装WorkStation VDA代理,实现单用户交付