使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?

使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?

众所周知垂直扩展是提升单机的性能的方式,比如提升双路、四路的CPU运算能力,加大内存,更换速度更快的SSD,或者从代码根本上进行优化和性能提升。水平扩展是提供多台多种服务器分离单机性能的方式,比如集群,主从,队列,负载平衡等等。

 

 白话的垂直扩展

现在服务器都是云服务器,单纯从单机的硬件性能提升整体性能,可能已经不太适用,而从代码上,其实还有些功课可以做,即使不多:

  1. 优化多线程协调模式,优化多线程下资源共享问题,以免出现奇怪的运行时错误。
  2. 改同步为异步:此方法提升的是吞吐率,性能并不能提升,不过对于客户端响应也算是件好事吧。
  3. 使用磁盘预读模式,极小幅度提升IO性能。
  4. 使用单机任务队列,强制任务有序进行:此方法在单机上不会提升性能,甚至会减少原本的单机吞吐率,但是却能保证任务在同一时刻的完整性。

我们先从垂直扩展中压榨单机的性能,同时还要保证稳定性,甚至稳定性比单机极限性能更加重要,为何?因为多线程(web服务器都是多线程模型)资源互斥问题,会让你查找问题的时候抓狂(当然,如果你要访问的资源只是单个,就另当别论了)。因此,很多时候我们通常会加锁来避免这类事情发生(锁的问题和功能我们这里不讨论),虽然牺牲了性能,但却换来了每次高请求所带来的稳定性。

其次,我们知道,web服务器都属于多线程模型,这样设计的目的是为了提高该服务器的整体吞吐量(不同服务器语言采用不同的线程开辟模式,例如java使用的是系统级的线程),当一个线程正在接近满负荷的处理当前的任务,紧接着马上又来一个请求(系统不会因为当前正在运行任务而终止新的请求),那么将是雪上加霜的,多个任务同时长时间在抢占同一个CPU资源,无疑是对整体影响甚大的。

言归正传,我们在单机上面针对这类问题,既要尽可能的减少处理时间,又要绝对保证整体运行期间的稳定性(后期会介绍如何使用熔断机制提升多台服务器系统的整体稳定性)。

上一节,我们已经创建了一个同步的接口,下面我们将这个接口稍作改动,使其成为包含异步任务方法的接口,整体代码就不贴上来了,以免影响篇幅

  1. 如果你喜欢手动创建与管理任务,那么你可以new一个Task<TResult>实例。
  2. 如果你喜欢让系统为你管理该任务状态,那么你可以Task.Factary.StartNew来新建一个实例。

 

 白话的水平扩展

当一条街道上的小区越来越多,用水越来越大,而住户反应水压却越来越小,你是考虑增加主管道通水量大小、还是考虑增加每个小区的增压泵的功率、还是考虑增加主管道的数量(目前笔者小区就遇到水压不够的情况)。

在软件工程项目中,其实服务器TPS跟水压是同一个概念。

  1. 加大主水管道(如同提升CPU、内存)始终会有一个极限;
  2. 增加每个小区增压泵的功率(如同客户端使用大量的轮询,可主管道出口就只有那么点点量)始终要求比得到的多得多;

因此换句话说:

  1. 增加服务器数量(毕竟服务器比自来水厂容易建设:-)),提升管道入口的处理能力;
  2. 增加不同服务器类型,例如队列服务器,负载服务器,缓存服务器等等中间服务器,分摊和分离不同功能分到而行,如同主管道的分流阀,节流阀,增压泵等等;
  3. 增加带宽(这个是肯定的,提升TPS带宽肯定也是主要的);

 

一:点对点——原装

当然,如果接口已经成为了异步模式(本质其实是提前返回请求,但并没有返回请求所处理的结果),那么还需要一个接口来告诉客户端处理的结果,客户端通过该接口的轮询获得实时的结果。

在笔者介绍的这个服务中,流程架构如下:

  非常简单的点对点模式,用户请求一次,等待服务器处理响应完成后释放,所有内容均采用同步方式进行,得到结果是:使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?

 非常简单的点对点模式,用户请求一次,等待服务器处理响应完成后释放,所有内容均采用同步方式进行,得到结果是:

用户等待时间 = 服务器处理时间

如果用户上传10秒,而服务器处理需要4秒,那么这个等待对于用户来说,是极为煎熬的。

也许聪明的你会说,在客户端给个友好的提示,比如让一个“风火轮”不停的转动,当处理完成后隐藏掉。的确,这从另一个角度上看确实也行得通(比如成本因素),但我们不讨论用户的视觉和等待等感觉上的东西,只讨论从技术上如何让这个响应时间更快,能快到几乎让用户察觉不到。

 

二:使用并行任务——小幅提升

单机并行模式大家应该都明白,毕竟现在CPU都是多核的了,干嘛要让其他CPU闲着呢,不管是JAVA还是C#,目前主流语言都可以完美的执行并行任务(python开多进程其实也算),各种语法请自行Google,既然文章标题是Net,那么笔者就少量的复制一下C#的代码。

Parallel.Invoke(() => { },() => { });

 

哇撒,真的很少,就是C#中的一个并行执行的语句而已,自己需要并行执行的代码放入花括号中就行,换成流程结构图如下:

使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?

画的很搓,欢迎拍砖。

笔者采用的CPU是I7-2700K,并行任务状态确实使用了起来,但减少时间却只有1秒,很不可思议,或许是笔者的代码优化不够好吧(并行原理和理想结果为何有出入请自行Google),所以就不毛遂自荐的贴上来了:-),但是这个3秒时间我会跟他死磕到底——用户不能等。

 

三:分离用户请求和耗时处理——异步

 当朋友们看到这里的时候,或许心里早就想到用异步的方式来实现C/S的接口请求了,对,但我们还是需要走一下流程,梳理一下思路。请继续接着看。

异步其实就是多线程,只是目前由于高级语言的发展,已经将线程的难点给隐藏掉了,在一个请求主线程中,新建一个异步线程(或任务),分离主线程的长时间处理耗时,将这块难啃的骨头交给子线程去做,自己只管轻松的执行到return,是的确很舒服哦(笔者也梦想拥有这样的码砖方式,o(∩_∩)o 哈哈),new一个线程我们不做介绍,毕竟他的管理模式是纯手动的、并且是复杂的,我们只介绍new一个任务来分离主线程之间的关联。

正如之前提到,微软巴巴已经将这种模型给封装好了,只需要在接口处理函数内、将处理模块塞进Task中即可,不用再去new一个线程、管理这个线程的状态、什么时候调度、什么时候阻塞等等一些较底层的操作。万事有好必有坏,多线程模型创建是很简单了,相应的实现细节对于很多入门的朋友就看不懂了。

不过,当一个对外接口(或者内部函数)采用异步模式,那么调用端也需要进行轮询(异步同步无所谓,看调用如何实现)处理结果,这个模式相比原来常规同步复杂许多,需要建立任务、执行任务、存储任务状态和结果等等,不废话,上图:

使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?

通过将“服务器处理耗时”进行分离,请求主线程只需要将相应的参数传递给子线程(或任务),主线程就直接返回到客户端,如果忽略子任务之前的逻辑时间复杂度,完全可以达到瞬间返回到客户端,具体时长根据不同的平台和架构不同而不同,正如之前国外有人对NET CORE和GO进行过空业务响应对比(具体链接得找找),在请求数高于100W(包括并发)和没有任何逻辑代码的前提下,直接请求某个接口,NET CORE只比GO慢了近40ms(相同单机)。这样的性能还是非常看好的。

另外,如果处理时间过长,而且子任务不能及时返回,那将产生越来越多的任务阻塞,毕竟一个CPU是有极限的,并且伴随着或多或少的运行时错误,而这种错误是最让我们程序员头疼的,因此,这时我们需要加入单机队列,来限制和防止处理和请求达到瞬时波峰,文章结尾提供一份单机队列的代码供大家参考:

实际证明,这样对于用户来说,是瞬间的,不用等待的,极大的提升了用户体验。不过呢,如果请求数越多,那么越后进来的请求,等待的时间将越长,对于客户端轮询的时间也将变得更长。

轮询时间 = 请求数(单机队列数) * 单个处理耗时时间

好像比原来的点对点更糟糕了,实际我们根据这个架构进行扩展,将得到更好的体验,请继续接着看。

 

四:让多台机器一起工作吧——集群

先看张图:

使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?

哇塞,一下子变得这么复杂,好捉急啊。其实并不难理解,我们来看一看做了哪些变化:

  1. 单机的队列扩展为了使用服务器做队列集群;
  2. 增加调度任务;
  3. 将多个处理服务分配到多台机器上运行;
  4. 单机缓存增加到缓存集群;

其他也就没什么花头了。当请求任务过高,放入队列中,分离前级请求和后级处理,后级处理服务器的数量将直接影响整个平台的异步处理时间。如果非要对比单机模式,性能是随处理服务器的数量增加而提高的。下一节我们将详细讨论这套架构方案。

 

感谢阅读!

 

(附上单机队列的实现,仅供参考)

使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?使用.NET Core搭建分布式音频效果处理服务(四)选择垂直扩展还是水平扩展?
  1 /// <summary>
  2 /// 异步任务队列
  3 /// </summary>
  4 public class AsyncTaskQueue : IDisposable
  5 {
  6     private bool _isDisposed;
  7     private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>();
  8     private Thread _thread;
  9     private AutoResetEvent _autoResetEvent;
 10 
 11     /// <summary>
 12     /// 异步任务队列
 13     /// </summary>
 14     public AsyncTaskQueue()
 15     {
 16         _autoResetEvent = new AutoResetEvent(false);
 17         _thread = new Thread(InternalRuning) {IsBackground = true};
 18         _thread.Start();
 19     }
 20 
 21     private bool TryGetNextTask(out AwaitableTask task)
 22     {
 23         task = null;
 24         while (_queue.Count > 0)
 25         {
 26             if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) return true;
 27             task.Cancel();
 28         }
 29 
 30         return false;
 31     }
 32 
 33     private AwaitableTask PenddingTask(AwaitableTask task, int maxQueueCount = 1000)
 34     {
 35         lock (_queue)
 36         {
 37             if (_queue.Count >= maxQueueCount)
 38             {
 39                 throw new Exception($"超出最大队列数量,maxQueueCount={maxQueueCount}");
 40             }
 41 
 42             Debug.Assert(task != null);
 43             _queue.Enqueue(task);
 44             _autoResetEvent.Set();
 45         }
 46 
 47         return task;
 48     }
 49 
 50     private void InternalRuning()
 51     {
 52         while (!_isDisposed)
 53         {
 54             if (_queue.Count == 0)
 55             {
 56                 _autoResetEvent.WaitOne();
 57             }
 58 
 59             while (TryGetNextTask(out var task))
 60             {
 61                 if (task.IsCancel) continue;
 62 
 63                 if (UseSingleThread)
 64                 {
 65                     task.RunSynchronously();
 66                 }
 67                 else
 68                 {
 69                     task.Start();
 70                 }
 71             }
 72         }
 73     }
 74 
 75     /// <summary>
 76     /// 是否使用单线程完成任务.
 77     /// </summary>
 78     public bool UseSingleThread { get; set; } = true;
 79 
 80     /// <summary>
 81     /// 自动取消以前的任务。
 82     /// </summary>
 83     public bool AutoCancelPreviousTask { get; set; } = false;
 84 
 85     /// <summary>
 86     /// 执行任务
 87     /// </summary>
 88     /// <param name="action"></param>
 89     /// <param name="maxQueueCount"></param>
 90     /// <returns></returns>
 91     public AwaitableTask Run(Action action, int maxQueueCount = 1000)
 92         => PenddingTask(new AwaitableTask(new Task(action, new CancellationToken(false))), maxQueueCount);
 93 
 94     /// <summary>
 95     /// 执行任务
 96     /// </summary>
 97     /// <typeparam name="TResult"></typeparam>
 98     /// <param name="function"></param>
 99     /// <param name="maxQueueCount"></param>
100     /// <returns></returns>
101     public AwaitableTask<TResult> Run<TResult>(Func<TResult> function, int maxQueueCount = 1000)
102         => (AwaitableTask<TResult>) PenddingTask(new AwaitableTask<TResult>(new Task<TResult>(function)),
103             maxQueueCount);
104 
105 
106     /// <inheritdoc />
107     public void Dispose()
108     {
109         Dispose(true);
110         GC.SuppressFinalize(this);
111     }
112 
113     /// <summary>
114     /// 析构任务队列
115     /// </summary>
116     ~AsyncTaskQueue() => Dispose(false);
117 
118     private void Dispose(bool disposing)
119     {
120         if (_isDisposed) return;
121         if (disposing)
122         {
123             _autoResetEvent.Dispose();
124         }
125 
126         _thread = null;
127         _autoResetEvent = null;
128         _isDisposed = true;
129     }
130 
131     /// <summary>
132     /// 可等待的任务
133     /// </summary>
134     public class AwaitableTask
135     {
136         private readonly Task _task;
137 
138         /// <summary>
139         /// 初始化可等待的任务。
140         /// </summary>
141         /// <param name="task"></param>
142         public AwaitableTask(Task task) => _task = task;
143 
144         /// <summary>
145         /// 任务的Id
146         /// </summary>
147         public int TaskId => _task.Id;
148 
149         /// <summary>
150         /// 任务是否取消
151         /// </summary>
152         public bool IsCancel { get; private set; }
153 
154         /// <summary>
155         /// 开始任务
156         /// </summary>
157         public void Start() => _task.Start();
158 
159         /// <summary>
160         /// 同步执行开始任务
161         /// </summary>
162         public void RunSynchronously() => _task.RunSynchronously();
163 
164         /// <summary>
165         /// 取消任务
166         /// </summary>
167         public void Cancel() => IsCancel = true;
168 
169         /// <summary>
170         /// 获取任务等待器
171         /// </summary>
172         /// <returns></returns>
173         public TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
174 
175         /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary>
176         [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
177         public struct TaskAwaiter : INotifyCompletion
178         {
179             private readonly AwaitableTask _task;
180 
181             /// <summary>
182             /// 任务等待器
183             /// </summary>
184             /// <param name="awaitableTask"></param>
185             public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask;
186 
187             /// <summary>
188             /// 任务是否完成.
189             /// </summary>
190             public bool IsCompleted => _task._task.IsCompleted;
191 
192             /// <inheritdoc />
193             public void OnCompleted(Action continuation)
194             {
195                 var This = this;
196                 _task._task.ContinueWith(t =>
197                 {
198                     if (!This._task.IsCancel) continuation?.Invoke();
199                 });
200             }
201 
202             /// <summary>
203             /// 获取任务结果
204             /// </summary>
205             public void GetResult() => _task._task.Wait();
206         }
207     }
208 
209     /// <summary>
210     /// 可等待的任务
211     /// </summary>
212     /// <typeparam name="TResult"></typeparam>
213     public class AwaitableTask<TResult> : AwaitableTask
214     {
215         /// <summary>
216         /// 初始化可等待的任务
217         /// </summary>
218         /// <param name="task">需要执行的任务</param>
219         public AwaitableTask(Task<TResult> task) : base(task) => _task = task;
220 
221 
222         private readonly Task<TResult> _task;
223 
224         /// <summary>
225         /// 获取任务等待器
226         /// </summary>
227         /// <returns></returns>
228         public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
229 
230         /// <summary>
231         /// 任务等待器
232         /// </summary>
233         [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
234         public new struct TaskAwaiter : INotifyCompletion
235         {
236             private readonly AwaitableTask<TResult> _task;
237 
238             /// <summary>
239             /// 初始化任务等待器
240             /// </summary>
241             /// <param name="awaitableTask"></param>
242             public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;
243 
244             /// <summary>
245             /// 任务是否已完成
246             /// </summary>
247             public bool IsCompleted => _task._task.IsCompleted;
248 
249             /// <inheritdoc />
250             public void OnCompleted(Action continuation)
251             {
252                 var This = this;
253                 _task._task.ContinueWith(t =>
254                 {
255                     if (!This._task.IsCancel) continuation?.Invoke();
256                 });
257             }
258 
259             /// <summary>
260             /// 获取任务结果
261             /// </summary>
262             /// <returns></returns>
263             public TResult GetResult() => _task._task.Result;
264         }
265     }
266 }
View Code

 

上一篇:使用.NET Core搭建分布式音频效果处理服务(六)让Middleware自动Invoke


下一篇:使用.NET Core搭建分布式音频效果处理服务(一)需求、问题和解决方案的几个坑