Thread t = new Thread(PrintNumbersWithDelay);
t.Start();
t.Join();
但我们在主程序中调用了t.Join方法,该方法允许我们等待直到线程t完成。当线程t完成时,主程序会继续运行。借助该技术可以实现在两个线程间同步。
当主程序和单独的数字打印线程运行时,我们等待6秒后对线程调用了t.Abort方法。这给线程注入了ThreadAbortException方法,导致线程被终结。这非常危险,因为该异常可以在任何时刻发生并可能彻底摧毁应用程序。另外,使用该技术也不一定总能终止线程。目标线程可以通过处理该异常并调用Thread.ResetAbort方法来拒绝被终止。因此并不推荐使用Abort方法来关闭线程。可优先使用一些其他方法,比如提供一个CancellationToken方法来取消线程的执行。
进程会等待所有的前台线程完成后再结束工作,但是如果只剩下后台线程,则会直接结束工作。
线程之间传递参数:
var threadThree = new Thread(() => CountNumbers());
threadThree.Name = "ThreadThree";
threadThree.Start();
threadThree.Join(); static void CountNumbers(int iterations)
{
for (int i = ; i <= iterations; i++)
{
Sleep(TimeSpan.FromSeconds());
WriteLine($"{CurrentThread.Name} prints {i}");
}
}
当主程序启动时,定义了两个将会抛出异常的线程。其中一个对异常进行了处理,另一个则没有。可以看到第二个异常没有被包裹启动线程的try/catch代码块捕获到。所以如果直接使用线程,一般来说不要在线程中抛出异常,而是在线程代码中使用try/catch代码块。
第一章 线程基础
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
第二章 线程同步
为了利用好这两种方式,可以使用混合模式(hybrid)。混合模式先尝试使用用户模式等待,如果线程等待了足够长的时间,则会切换到阻塞状态以节省CPU资源。
·执行基本的原子操作
而借助于Interlocked类,我们无需锁定任何对象即可获取到正确的结果。Interlocked提供了Increment、Decrement和Add等基本数学操作的原子方法,从而帮助我们在编写Counter类时无需使用锁
class CounterNoLock : CounterBase
{
private int _count; public int Count => _count; public override void Increment()
{
Interlocked.Increment(ref _count);
} public override void Decrement()
{
Interlocked.Decrement(ref _count);
}
}
·使用Mutex类
static void Main(string[] args)
{
const string MutexName = "CSharpThreadingCookbook"; using (var m = new Mutex(false, MutexName))
{
if (!m.WaitOne(TimeSpan.FromSeconds(), false))
{
WriteLine("Second instance is running!");
}
else
{
WriteLine("Running!");
ReadLine();
m.ReleaseMutex();
}
}
}
当主程序启动时,定义了一个指定名称的互斥量,设置initialOwner标志为false。这意味着如果互斥量已经被创建,则允许程序获取该互斥量。如果没有获取到互斥量,程序则简单地显示Running,等待直到按下了任何键,然后释放该互斥量并退出。
如果再运行同样一个程序,则会在5秒钟内尝试获取互斥量。如果此时在第一个程序中按下了任何键,第二个程序则会开始执行。然而,如果保持等待5秒钟第二个程序将无法获取到该互斥量
注意具名的互斥量是全局的操作系统对象!请务必正确关闭互斥量。最好是使用using代码块来包裹互斥量对象。
该方式可用于在不同的程序中同步线程,可被推广到大量的使用场景中。
·使用SemaphoreSlim类
static SemaphoreSlim _semaphore = new SemaphoreSlim(); static void AccessDatabase(string name, int seconds)
{
WriteLine($"{name} waits to access a database");
_semaphore.Wait();
WriteLine($"{name} was granted an access to a database");
Sleep(TimeSpan.FromSeconds(seconds));
WriteLine($"{name} is completed");
_semaphore.Release();
}
每个线程都尝试获取数据库的访问,但是我们借助于信号系统限制了访问数据库的并发数为4个线程。当有4个线程获取了数据库的访问后,其他两个线程需要等待,直到之前线程中的某一个完成工作并调用_semaphore.Release方法来发出信号。
·使用AutoResetEvent类
private static AutoResetEvent _workerEvent = new AutoResetEvent(false);
private static AutoResetEvent _mainEvent = new AutoResetEvent(false);
AutoResetEvent类采用的是内核时间模式,所以等待时间不能太长。使用ManualResetEventslim类更好,因为它使用的是混合模式。
·使用ManualResetEventSlim类
在前面某小节中,我们使用了一种无法在操作系统层面工作的混合模式。如果我们需要全局事件,则可以使用EventWaitHandle类,其是AutoResetEvent和ManualResetEvent类的基类
·使用CountDownEvent类
本节将描述如何使用CountdownEvent信号类来等待直到一定数量的操作完成。
static void Main(string[] args)
{
WriteLine("Starting two operations");
var t1 = new Thread(() => PerformOperation("Operation 1 is completed", ));
var t2 = new Thread(() => PerformOperation("Operation 2 is completed", ));
t1.Start();
t2.Start();
_countdown.Wait();
WriteLine("Both operations have been completed.");
_countdown.Dispose(); System.Threading.Thread.Sleep(TimeSpan.FromSeconds());
} static CountdownEvent _countdown = new CountdownEvent(); static void PerformOperation(string message, int seconds)
{
Sleep(TimeSpan.FromSeconds(seconds));
WriteLine(message);
_countdown.Signal();
}
当主程序启动时,创建了一个CountdownEvent实例,在其构造函数中指定了当两个操作完成时会发出信号。然后我们启动了两个线程,当它们执行完成后会发出信号。一旦第二个线程完成,主线程会从等待CountdownEvent的状态中返回并继续执行。针对需要等待多个异步操作完成的情形,使用该方式是非常便利的。
然而这有一个重大的缺点。如果调用_countdown.Signal()没达到指定的次数,那么_countdown.Wait()将一直等待。请确保使用CountdownEvent时,所有线程完成后都要调用Signal方法。
·使用Barrier类
Barrier类用于组织多个线程及时在某个时刻碰面。其提供了一个回调函数,每次线程调用了SignalAndWait方法后该回调函数会被执行。
每个线程将向Barrier发送两次信号,所以会有两个阶段。每次这两个线程调用Signal-AndWait方法时,Barrier将执行回调函数。这在多线程迭代运算中非常有用,可以在每个迭代结束前执行一些计算。当最后一个线程调用SignalAndWait方法时可以在迭代结束时进行交互。
static void Main(string[] args)
{
var t1 = new Thread(() => PlayMusic("the guitarist", "play an amazing solo", ));
var t2 = new Thread(() => PlayMusic("the singer", "sing his song", )); t1.Start();
t2.Start();
} static Barrier _barrier = new Barrier(, b => WriteLine($"End of phase {b.CurrentPhaseNumber + 1}")); static void PlayMusic(string name, string message, int seconds)
{
for (int i = ; i < ; i++)
{
WriteLine("----------------------------------------------");
Sleep(TimeSpan.FromSeconds(seconds));
WriteLine($"{name} starts to {message}");
Sleep(TimeSpan.FromSeconds(seconds));
WriteLine($"{name} finishes to {message}");
_barrier.SignalAndWait();
}
}
·使用ReaderWriterLockSlim类
使用ReaderWriterLockSlim来创建一个线程安全的机制,在多线程中对一个集合进行读写操作。ReaderWriterLockSlim代表了一个管理资源访问的锁,允许多个线程同时读取,以及独占写。
当主程序启动时,同时运行了三个线程来从字典中读取数据,还有另外两个线程向该字典中写入数据。我们使用ReaderWriterLockSlim类来实现线程安全,该类专为这样的场景而设计。
这里使用两种锁:读锁允许多线程读取数据,写锁在被释放前会阻塞了其他线程的所有操作。获取读锁时还有一个有意思的场景,即从集合中读取数据时,根据当前数据而决定是否获取一个写锁并修改该集合。一旦得到写锁,会阻止阅读者读
取数据,从而浪费大量的时间,因此获取写锁后集合会处于阻塞状态。为了最小化阻塞浪费的时间,可以使用EnterUpgradeableReadLock和ExitUpgradeableReadLock方法。先获取读锁后读取数据。如果发现必须修改底层集合,只需使用EnterWriteLock方法升级锁,然后快速执行一次写操作,最后使用ExitWriteLock释放写锁。
在本例中,我们先生成一个随机数。然后获取读锁并检查该数是否存在于字典的键集合中。如果不存在,将读锁更新为写锁然后将该新键加入到字典中。始终使用try/finally代码块来确保在捕获锁后一定会释放锁,这是一项好的实践。
static void Main(string[] args)
{
new Thread(Read){ IsBackground = true }.Start();
new Thread(Read){ IsBackground = true }.Start();
new Thread(Read){ IsBackground = true }.Start(); new Thread(() => Write("Thread 1")){ IsBackground = true }.Start();
new Thread(() => Write("Thread 2")){ IsBackground = true }.Start(); Sleep(TimeSpan.FromSeconds());
} static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
static Dictionary<int, int> _items = new Dictionary<int, int>(); static void Read()
{
WriteLine("Reading contents of a dictionary");
while (true)
{
try
{
_rw.EnterReadLock();
foreach (var key in _items.Keys)
{
Sleep(TimeSpan.FromSeconds(0.1));
}
}
finally
{
_rw.ExitReadLock();
}
}
} static void Write(string threadName)
{
while (true)
{
try
{
int newKey = new Random().Next();
_rw.EnterUpgradeableReadLock();
if (!_items.ContainsKey(newKey))
{
try
{
_rw.EnterWriteLock();
_items[newKey] = ;
WriteLine($"New key {newKey} is added to a dictionary by a {threadName}");
}
finally
{
_rw.ExitWriteLock();
}
}
Sleep(TimeSpan.FromSeconds(0.1));
}
finally
{
_rw.ExitUpgradeableReadLock();
}
}
}
·使用SpinWait类
本节将描述如何不使用内核模型的方式来使线程等待。另外,我们介绍了SpinWait,它是一个混合同步构造,被设计为使用用户模式等待一段时间,然后切换到内核模式以节省CPU时间。
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
第三章 使用线程池
在线程池中调用委托
向线程池中放入异步操作
ThreadPool.QueueUserWorkItem( _ =>
{
WriteLine($"Operation state: {x + y}, {lambdaState}");
WriteLine($"Worker thread id: {CurrentThread.ManagedThreadId}");
Sleep(TimeSpan.FromSeconds());
}, "lambda state");
线程池与并行度
实现一个取消按钮
在线程池中使用等待事件处理器及超时
使用计时器
使用backgroundwork组件
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
第四章 使用任务并行库
创建任务
Task.Run(() => TaskMethod("Task 3"));
使用任务执行基本的操作
组合任务
firstTask = new Task<int>(() =>
{
var innerTask = Task.Factory.StartNew(() => TaskMethod("Second Task", ),
TaskCreationOptions.AttachedToParent); innerTask.ContinueWith(t => TaskMethod("Third Task", ),
TaskContinuationOptions.AttachedToParent); return TaskMethod("First Task", );
});
将APM模式转换成任务
Task<string> task = Task<string>.Factory.FromAsync(
d.BeginInvoke("AsyncTaskThread", Callback,
"a delegate asynchronous call"), d.EndInvoke);
将EAP模式转化成任务
var tcs = new TaskCompletionSource<int>(); var worker = new BackgroundWorker();
worker.DoWork += (sender, eventArgs) =>
{
eventArgs.Result = TaskMethod("Background worker", );
}; worker.RunWorkerCompleted += (sender, eventArgs) =>
{
if (eventArgs.Error != null)
{
tcs.SetException(eventArgs.Error);
}
else if (eventArgs.Cancelled)
{
tcs.SetCanceled();
}
else
{
tcs.SetResult((int)eventArgs.Result);
}
}; worker.RunWorkerAsync(); int result = tcs.Task.Result; WriteLine($"Result is: {result}");
实现取消选项
var cts = new CancellationTokenSource();
var longTask = new Task<int>(() => TaskMethod("Task 1", , cts.Token), cts.Token);
处理任务中的异常
task = Task.Run(() => TaskMethod("Task 2", ));
int result = task.GetAwaiter().GetResult();
WriteLine($"Result: {result}");
var t1 = new Task<int>(() => TaskMethod("Task 3", ));
var t2 = new Task<int>(() => TaskMethod("Task 4", ));
var complexTask = Task.WhenAll(t1, t2);
并行运行任务
var whenAllTask = Task.WhenAll(firstTask, secondTask);
var completedTask = Task.WhenAny(tasks).Result;
使用TaskScheduler配置任务的执行
void ButtonAsyncOK_Click(object sender, RoutedEventArgs e)
{
ContentTextBlock.Text = string.Empty;
Mouse.OverrideCursor = Cursors.Wait;
Task<string> task = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext()); task.ContinueWith(t => Mouse.OverrideCursor = null,
CancellationToken.None,
TaskContinuationOptions.None,
TaskScheduler.FromCurrentSynchronizationContext());
}
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
第五章 使用c#6.0
使用await操作符获取异步任务结果
在lambda表达式中使用await操作符
static async Task AsynchronousProcessing()
{
Func<string, Task<string>> asyncLambda = async name => {
await Task.Delay(TimeSpan.FromSeconds());
return
$"Task {name} is running on a thread id {CurrentThread.ManagedThreadId}." +
$" Is thread pool thread: {CurrentThread.IsThreadPoolThread}";
}; string result = await asyncLambda("async lambda"); WriteLine(result);
}
对连续的异步任务使用await操作符
对并行执行的异步任务使用await操作符
static async Task AsynchronousProcessing()
{
Task<string> t1 = GetInfoAsync("Task 1", );
Task<string> t2 = GetInfoAsync("Task 2", ); string[] results = await Task.WhenAll(t1, t2);
foreach (string result in results)
{
WriteLine(result);
}
} static async Task<string> GetInfoAsync(string name, int seconds)
{
await Task.Delay(TimeSpan.FromSeconds(seconds));//使用同一线程
//await Task.Run(() => Thread.Sleep(TimeSpan.FromSeconds(seconds)));//使用不同线程
return
$"Task {name} is running on a thread id {CurrentThread.ManagedThreadId}." +
处理异步操作中的异常
避免使用捕获的同步上下文
static async Task<TimeSpan> TestNoContext()
{
const int iterationsNumber = ;
var sw = new Stopwatch();
sw.Start();
for (int i = ; i < iterationsNumber; i++)
{
var t = Task.Run(() => { });
await t.ConfigureAwait(
continueOnCapturedContext: false);
}
sw.Stop();
return sw.Elapsed;
}
使用 async void方法
设计一个自定义的awaitable类型
对动态类型使用await
IAwaiter<string> proxy = Impromptu.ActLike(awaiter); result.GetAwaiter = (Func<dynamic>) ( () => proxy );
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
第六章 使用并发集合
使用concurrentDictionary
使用concurrentQueue实现一步处理
改变concurrentStack异步处理顺序
使用concurrentBag创建一个可扩展的爬虫
使用concurrentCollection进行异步处理
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
第七章 使用PLINQ
使用Parallel类
Parallel.Invoke(
() => EmulateProcessing("Task1"),
() => EmulateProcessing("Task2"),
() => EmulateProcessing("Task3")
);
并行化Linq查询
parallelQuery = from t in GetTypes().AsParallel()
select EmulateProcessing(t); parallelQuery.ForAll(PrintInfo);
调整plinq查询的参数
try
{
parallelQuery
.WithDegreeOfParallelism(Environment.ProcessorCount)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithMergeOptions(ParallelMergeOptions.Default)
.WithCancellation(cts.Token)
.ForAll(WriteLine);
}
处理plinq查询中的异常
try
{
parallelQuery.ForAll(WriteLine);
}
catch (DivideByZeroException)
{
WriteLine("Divided by zero - usual exception handler!");
}
catch (AggregateException e)
{
e.Flatten().Handle(ex =>
{
if (ex is DivideByZeroException)
{
WriteLine("Divided by zero - aggregate exception handler!");
return true;
} return false;
});
}
管理plinq查询中的数据分区
public class StringPartitioner : Partitioner<string>
{
private readonly IEnumerable<string> _data; public StringPartitioner(IEnumerable<string> data)
{
_data = data;
} public override bool SupportsDynamicPartitions => false; public override IList<IEnumerator<string>> GetPartitions(int partitionCount)
{
var result = new List<IEnumerator<string>>(partitionCount); for (int i = ; i <= partitionCount; i++)
{
result.Add(CreateEnumerator(i, partitionCount));
} return result;
} IEnumerator<string> CreateEnumerator(int partitionNumber, int partitionCount)
{
int evenPartitions = partitionCount / ;
bool isEven = partitionNumber % == ;
int step = isEven ? evenPartitions : partitionCount - evenPartitions;
int startIndex = partitionNumber / + partitionNumber % ; var q = _data
.Where(v => !(v.Length % == ^ isEven) || partitionCount == )
.Skip(startIndex - ); return q
.Where((x, i) => i % step == )
.GetEnumerator(); }
}
为plinq查询创建一个自定义的聚合器
var parallelAggregator = parallelQuery.Aggregate(
() => new ConcurrentDictionary<char, int>(),
(taskTotal, item) => AccumulateLettersInformation(taskTotal, item),
(total, taskTotal) => MergeAccumulators(total, taskTotal),
total => total);
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
第八章 RE
将普通集合转换为异步的可观察集合
o = EnumerableEventSequence().ToObservable()
.SubscribeOn(TaskPoolScheduler.Default);
编写自定义的可观察对象
使用Subject
创建可观察的对象
对可观察的集合使用linq查询
使用RX创建异步操作
第九章 使用异步I/O
异步使用文件
using (var stream = File.Create("test3.txt", BUFFER_SIZE, FileOptions.Asynchronous))
using (var sw = new StreamWriter(stream))
{
WriteLine($"3. Uses I/O Threads: {stream.IsAsync}");
await sw.WriteAsync(CreateFileContent());
}
编写一个异步的http服务端和客户端
public async Task Start()
{
_listener.Start(); while (true)
{
var ctx = await _listener.GetContextAsync();
WriteLine("Client connected...");
var response = string.Format(RESPONSE_TEMPLATE, DateTime.Now); using (var sw = new StreamWriter(ctx.Response.OutputStream))
{
await sw.WriteAsync(response);
await sw.FlushAsync();
}
}
}
异步操作数据库
await connection.OpenAsync();
await detachCommand.ExecuteNonQueryAsync();
异步调用wcf服务
第十章 并行编程模式
实现惰性求值的共享状态
class BCLThreadSafeFactory : IHasValue
{
private ValueToAccess _value; public ValueToAccess Value =>
LazyInitializer.EnsureInitialized(ref _value, Compute);
}
class LazyWrapper : IHasValue
{
private readonly Lazy<ValueToAccess> _value; public LazyWrapper(Lazy<ValueToAccess> value )
{
_value = value;
} public ValueToAccess Value => _value.Value;
}
使用blockingcollection实现并行管道
使用tpl数据流实现并行管道
使用plinq实现map/reduce模式
第十一章 更多信息