很简单的代码,看注释
/// <summary>
/// Minimal producer/consumer demo built on <see cref="BlockingCollection{T}"/>.
/// A producer thread turns console input lines into messages, a consumer thread
/// prints every message it receives, and the main thread waits until the
/// pipeline has shut down.
/// </summary>
class Program
{
    /// <summary>
    /// Set by the consumer thread once the queue has been completed and fully
    /// drained. <see cref="Main"/> blocks on it so the process does not exit
    /// while messages are still pending.
    /// </summary>
    private static readonly ManualResetEvent _manualResetEvent = new ManualResetEvent(false);

    // Bounded alternative: the argument 1 caps the collection size, so Add blocks
    // while the collection already holds one item. Enable the Thread.Sleep(5000)
    // in the consumer to observe the producer being throttled.
    //private static BlockingCollection<string> _messageQueue = new BlockingCollection<string>(1);

    /// <summary>
    /// Unbounded queue that carries messages from the producer to the consumer.
    /// </summary>
    private static readonly BlockingCollection<string> _messageQueue = new BlockingCollection<string>();

    /// <summary>
    /// Starts the producer and the consumer, then blocks until the consumer
    /// reports that every queued message has been handled.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        StartProducer();
        StartConsumer();
        _manualResetEvent.WaitOne();
    }

    /// <summary>
    /// Producer: reads lines from the console on a background thread and adds
    /// each one to the queue. Reading stops when the user types "exit" or when
    /// the input stream ends; the queue is then marked as complete so the
    /// consumer can finish.
    /// </summary>
    private static void StartProducer()
    {
        var producerThread = new Thread(() =>
        {
            while (true)
            {
                var message = Console.ReadLine();

                // ReadLine returns null at end of input (e.g. redirected or closed
                // stdin). Treat that like "exit"; otherwise this loop would spin
                // forever and flood the queue with null messages.
                if (message == null || message == "exit")
                {
                    // No more items will be added: lets the consumer's
                    // GetConsumingEnumerable loop end once the queue is empty.
                    _messageQueue.CompleteAdding();
                    return;
                }

                Console.WriteLine($"生产者发出消息: {message}");
                _messageQueue.Add(message);
            }
        });
        producerThread.IsBackground = true;
        producerThread.Start();
    }

    /// <summary>
    /// Consumer: prints every message taken from the queue on a background
    /// thread, and signals the shutdown event once the queue has been completed
    /// and drained.
    /// </summary>
    private static void StartConsumer()
    {
        var consumerThread = new Thread(() =>
        {
            try
            {
                // Blocks while the queue is empty and handles messages as they
                // arrive; the loop ends after CompleteAdding has been called and
                // the remaining items have been consumed.
                foreach (var message in _messageQueue.GetConsumingEnumerable())
                {
                    Console.WriteLine($"消费者收到消息:{message}");
                    //Thread.Sleep(5000);
                }
            }
            finally
            {
                // Always release Main, even if handling a message failed.
                _manualResetEvent.Set();
            }
        });
        consumerThread.IsBackground = true;
        consumerThread.Start();
    }
}