[译]async/await中使用阻塞式代码导致死锁 百万数据排序:优化的选择排序(堆排序)

[译]async/await中使用阻塞式代码导致死锁

这篇博文主要是讲解在async/await中使用阻塞式代码导致死锁的问题,以及如何避免出现这种死锁。内容主要是从作者Stephen Cleary的两篇博文中翻译过来.

原文1:Don'tBlock on Async Code

原文2:why the AspNetSynchronizationContext was removed

示例代码:async_await中使用阻塞式代码导致死锁.rar

一、async/await 异步代码运行流程

async/await是在.NET4.5版本引入的关键字,让开发者可以更加轻松的创建异步方法。

我们从下图来认识async/await的运行流程:

二、在异步代码中阻塞,导致死锁的示例

UI 示例

单击一个按钮,将发起一个REST远程请求并且将结果显示到textbox控件上。(这是一个Windows Forms程序,同样也适用于其他任何UI应用程序)

// My "library" method.

public static async Task<JObject> GetJsonAsync(Uri uri)

{

using (var client = new HttpClient())

{

var jsonString = await client.GetStringAsync(uri);

return JObject.Parse(jsonString);

}

}

// My "top-level" method.

public void Button1_Click(...)

{

var jsonTask = GetJsonAsync(...);

textBox1.Text = jsonTask.Result;

}

类库方法GetJsonAsync发起REST远程请求并且将结果解析为JSON返回。Button1_Click方法调用Task .Result阻塞等待GetJsonAsync处理完毕并显示结果

这段代码会死锁。

ASP.NET 示例

在类库方法GetJsonAsync中发起一个REST远程请求,这次这个GetJsonAsync在ASP.NET context中被调用。(示例是Web API项目,同样适用于任何一个ASP.NET应用程序 - 注:非ASP.NET Core应用)

// My "library" method.

public static async Task<JObject> GetJsonAsync(Uri uri)

{

using (var client = new HttpClient())

{

var jsonString = await client.GetStringAsync(uri);

return JObject.Parse(jsonString);

}

}

// My "top-level" method.

public class MyController : ApiController

{

public string Get()

{

var jsonTask = GetJsonAsync(...);

return jsonTask.Result.ToString();

}

}

这段代码也会死锁。与UI示例是同一个原因。

三、是什么原因导致的死锁呢?

await一个Task后,在恢复继续执行时,会试图进入await之前的context。

第一个示例中,这个context是UI context(任何UI应用,除了控制台应用)。在第二个示例中,这个context是ASP.NET request context。

另一个需要注意的点:ASP.NET request context 没有绑定到特定的线程上(像UI context一样),但是request context同一时刻只允许被绑定到一个线程上。我曾经在MSDN上有发表过《关于SynchronzationContext》的文章.

死锁是怎么发生的呢?我们从top-level方法开始(UI的Button1_Click方法或ASP.NET的MyContoller.Get方法)

  1. top-level方法调用GetJsonAsync(在UI/ASP.NET context中)。
  2. GetJsonAsync通过HttpClient.GetStringAsync发起REST远程请求(在UI/ASP.NET context中)。
  3. GetStringAsync返回一个未完成Task,标识REST远程请求还未处理完
  4. GetJsonAsync方法中await GetStringAsync返回的未完成Task。等Task执行完毕,

会重新捕获等待之前的context并使用它继续执行GetJsonAsync。

  1. GetJsonAsync中await后,携带context的线程会跳出GetJsonAsync方法,继续执行后面的代码。并在jsonTask.Result发生阻塞。此时携带context的线程被阻塞了。
  2. 最终,REST请求处理完,GetStringAsync返回的Task处理完成。
  3. GetJsonAsync方法准备继续运行,并且等待context可用,以便在context中执行。
  4. 发生context死锁。在top-level方法中已经阻塞了携带context的线程,等待GetJsonAsync返回的Task完成。而此时,GetJsonAsync方法正在等待context被释放,以便在context中继续执行。

四、防止死锁

这里有三个最佳实践来避免这种死锁(更详细的传送门)。

  1. 在你的”library”异步方法中,返回未完成Task时都调用ConfigureAwait(false)。
  2. 始终使用 Async,不要混合阻塞式代码和异步代码。
  3. ASP.NET 升级为ASP.NET Core。在ASP.NET Core框架中,已经移除SynchronizationContext

按照第一条最佳实践,”library”中的异步方法修改如下:

public static async Task<JObject> GetJsonAsync(Uri uri)

{

using (var client = new HttpClient())

{

var jsonString = await client.GetStringAsync(uri).ConfigureAwait(false);

return JObject.Parse(jsonString);

}

}

ConfigureAwait(continueOnCapturedContext: false):continueOnCapturedContext参数表示是否尝试将延续任务封送回原始上下文。

ConfigureAwait(false)改变了GetJsonAsync的延续行为,使它不用在原来的context中恢复。GetJsonAsync将直接在线程池线程中恢复,这使得GetJsonAsync能完成任务,并且无需重新进入原来的context

按照第二条最佳实践。修改”top-level”方法如下:

public async void Button1_Click(...)

{

var json = await GetJsonAsync(...);

textBox1.Text = json;

}

public class MyController : ApiController

{

public async Task<string> Get()

{

var json = await GetJsonAsync(...);

return json.ToString();

}

}

这样修改,改变了top-level方法的阻塞行为。所有的”等待”都是”异步等待”,这样context就不会被阻塞。

其他”异步等待”指导原则:

执行以下操作…

不要使用以下方式…

使用以下方式

创建Task

Task constructor

Task.Run or TaskFactory.StartNew

检索后台任务的结果

Task.Wait 或 Task.Result

await

等待任何任务完成

Task.WaitAny

await Task.WhenAny

检索多个任务的结果

Task.WaitAll

await Task.WhenAll

等待一段时间

Thread.Sleep

await Task.Delay

五、在ASP.NET Core框架中,已经移除SynchronizationContext

为什么AspNetSynchronizationContext在ASP.NET Core中被移除。尽管我不知道ASP.NET团队内部是什么观点,但我认为的观点是两个:性能和简单。

性能方面:

在没有SynchronizationContext的ASP.NET Core中,当一个async/await异步处理恢复执行时,会从线程池中获取一个线程并且执行继续操作。

  1. 避免了把操作排队到request context队列(request context同一时刻只允许被绑定到一个线程上)
  2. 避免了因为携带 request context 的线程被阻塞而发生“死锁”
  3. 不需要重新进入request context(重新进入request context涉及到很多内部作业任务,例如:设置HttpContext.Current和当前线程的身份标识(identity)和语言(culture))

简单化:

旧版本ASP.NET中SynchronizationContext工作的很好,但是也存在棘手的问题,特别是在身份管理方面(参考:Thread.CurrentPrincipal VS HttpContext.Current.User)

六、async/await避免阻塞式死锁最佳实践

  1. 在你的“library”异步方法中,返回未完成Task时都调用ConfigureAwait(false),标识不需要将延续任务封送回原始上下文。

尽管在ASP.NET Core中,不用再调用ConfigureAwait(false)来防止死锁。但由于你提供的“类库”可能被用于UI 应用(eg:winform、wpf等)、旧版本ASP.NET应用、其他还存在context的应用。

  1. 更好的解决方案是“始终使用 async,不要混合阻塞式代码和异步代码”。

因为当你在异步代码中阻塞程序,将失去异步代码带来的所有好处。并且异步代码释放当前线程带来的伸缩性也会失效。

  1. 将ASP.NET项目升级为ASP.NET Core项目

推荐阅读:

Async/Await FAQ(原文##译文

Stephen Cleary 的开源组件:AsyncEx

Async/Await 异步编程中的最佳做法(原文##译文

(xishuai)ASP.NET混合阻塞式代码和异步代码,线程切换变化

百万数据排序:优化的选择排序(堆排序)

前一篇给大家介绍了《必知必会的冒泡排序和快速排序(面试必知)》,现在继续介绍排序算法

本博文介绍首先介绍直接选择排序,然后针对直接选择排序的缺点改进的“堆排序”,堆排序非常适合:数组规模非常大(数百万或更多) +严格要求辅助空间的场景。

直接选择排序

(一)概念及实现

直接选择排序的原理:将整个数组视为虚拟的有序区和无序区,重复的遍历数组,每次遍历从无序区中选出一个最小(或最大)的元素,放在有序区的最后,每一次遍历排序过程都是有序区元素个数增加,无序区元素个数减少的过程,直到无序区元素个数位0。

具体如下(实现为升序)

设数组为a[0…n-1]。

  1. 将原序列分成有序区和无序区。a[0…i-1]为有序区,a[i…n-1]为无序区。初始化有序区为0个元素。
  2. 遍历无序区元素,选出最小元素,放在有序区序列最后(即与无序区的第一个元素交换)
  3. 重复步骤2,直到无序区元素个数为0。

实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void Sort<T>(IList<T> arr) where T : IComparable<T>
{
    if (arr == null)
        throw new ArgumentNullException("arr");
 
    int length = arr.Count();
    if (length > 1)
    {
        int minValueIndex = 0;
        T minValue = default(T);
 
        // 循环length - 2次,最后一个元素无需再比较
        for (int i = 0; i < length - 1; i++)
        {
            minValueIndex = i;
            minValue = arr[i];
 
            // 内部循环,查找本次循环的最小值
            for (int j = i + 1; j < length; j++)
            {
                if (minValue.CompareTo(arr[j]) > 0)
                {
                    minValueIndex = j;
                    minValue = arr[j];
                }
            }
 
            if (minValueIndex == i)
                continue;
 
            // 交换:将本次循环选出的最小值,顺序放在有序区序列的最后(即与无序区的第一个元素交换)
            arr[minValueIndex] = arr[i];
            arr[i] = minValue;
        }
 
    }
}

示例:

89,-7,999,-89,7,0,-888,7,-7

排序的过程:

[-888]  [-7  999  -89  7  0  89  7  -7]

[-888  -89]  [999  -7  7  0  89  7  -7]

[-888  -89  -7]  [999  7  0  89  7  -7]

[-888  -89  -7  -7]  [7  0  89  7  999]

……

……

[-888  -89  -7  -7  0  7  7  89  999] []

(二)算法复杂度

  1. 时间复杂度:O(n^2)

直接选择排序耗时的操作有:比较 + 交换赋值。时间复杂度如下:

1)        最好情况:序列是升序排列,在这种情况下,需要进行的比较操作需n(n-1)/2次。交换赋值操作为0次。即O(n^2)

2)        最坏情况:序列是降序排列,那么此时需要进行的比较共有n(n-1)/2次。交换赋值n-1 次(交换次数比冒泡排序少多了),直接选择排序的效率比较稳定,最好情况和最坏情况差不多。即O(n^2)

3)        渐进时间复杂度(平均时间复杂度):O(n^2)

  1. 空间复杂度:O(1)

从实现原理可知,直接选择插入排序是在原输入数组上进行交换赋值操作的(称“就地排序”),所需开辟的辅助空间跟输入数组规模无关,所以空间复杂度为:O(1)

(三)稳定性

直接选择排序是不稳定的。

因为每次遍历比较完后会使用本次遍历选择的最小元素和无序区的第一个元素交换位置,所以如果无序区第一个元素后面有相同元素的,则可能会改变相同元素的相对顺序。

(四)优化改进

  1. 相同元素:如果数组元素重复率高,可以考虑使用辅助空间在每一次循环的时候,将本次选择的数及相同元素的索引记录下来,一起处理。
  2. 堆排序:直接选择排序中,为了从a[0..n-1]中选出关键字最小的记录,必须进行n-1次比较,然后在a[1..n-1]中选出关键字最小的记录,又需要做n-2次比较。事实上,后面的n-2次比较中,有许多比较可能在前面的n-1次比较中已经做过,但由于前一趟排序时未保留这些比较结果,所以后一趟排序时又重复执行了这些比较操作。堆排序可通过树形结构保存部分比较结果,可减少比较次数。(这种效果在数组规模越大越能体现效果)

堆排序

(一)概念及实现

堆排序(Heapsort)的原理:是指利用“二叉堆”这种数据结构所设计的一种排序算法,可以利用数组的特点快速定位指定索引的元素。

  1. 二叉堆

是完全二叉树或者是近似完全二叉树,它有两种形式:最大堆(大顶堆、大根堆)和最小堆(小顶堆、小根堆)。

  1. 二叉堆满足二个特性

1)        父结点的键值总是大于或等于(小于或等于)任何一个子节点的键值。

2)        每个结点的左子树和右子树都是一个二叉堆(最大堆或最小堆)。

  1. 二叉堆一般用数组来表示

如果根节点在数组中的位置是0,第n个位置的子节点分别在2n+1和 2n+2,其父节点的下标是 (n-1)/2 。

  1. 示例

原数组:

初始化为最大堆:

具体如下(实现为升序)

设数组为a[0…n-1]。

  1. 将原序列分成有序区和无序区。a[0…i-1]为无序区,a[i…n-1]为有序区。初始化有序区为0个元素。
  2. (从下往上)从数组最后一个根节点开始 (maxIndex - 1)/2 ,将原数组初始化为最大堆。(如上图)
  3. (从上往下)将堆顶元素与无序区的最后一个元素交换(即插入有序区的第一个位置),将剩余的无序区元素重建最大堆。
  4. 重复步骤3,每一次重复都是有序区元素个数增加,无序区元素个数减少的过程,直到无序区元素个数位0

实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/// <summary>
/// 堆排序
/// </summary>
public class Heap
{
    public static void Sort<T>(IList<T> arr) where T : IComparable<T>
    {
        if (arr == null)
            throw new ArgumentNullException("arr");
 
        int length = arr.Count();
        if (length > 1)
        {
            // 1、初始化最大堆
            InitMaxHeap<T>(arr, length - 1);
 
            // 2、堆排序
            // 将堆顶数据与末尾数据交换,再将i=N-1长的堆调整为最大堆;不断缩小待排序范围直到,无序区元素为0。
            for (int i = length - 1; i > 0; i--)
            {
                // 2.1 将堆顶数据与末尾数据交换
                Swap<T>(arr, 0, i);
                // 2.2 缩小数组待排序范围 i - 1 ,重新调整为最大堆
                AdjustMaxHeap<T>(arr, 0, i - 1);
            }
        }
    }
 
    /// <summary>
    /// 构建最大堆  (还未进行排序)
    /// </summary>
    /// <param name="arr">待排序数组</param>
    /// <param name="maxIndex">待排序数组最大索引</param>
    private static void InitMaxHeap<T>(IList<T> arr, int maxIndex) where T : IComparable<T>
    {
        // 从完全二叉树最后一个非叶节点 :
        // 如果根节点在数组中的位置是0,第n个位置的子节点分别在2n+1和 2n+2,其父节点的下标是 (n-1)/2 。
        for (int i = (maxIndex - 1) / 2; i >= 0; i--)
        {
            AdjustMaxHeap<T>(arr, i, maxIndex);
        }
    }
 
    /// <summary>
    /// 调整指定父节点的二叉树为最大堆
    /// </summary>
    /// <param name="arr">待排序数组</param>
    /// <param name="parentNodeIndex">指定父节点</param>
    /// <param name="maxIndex">待排序数组最大索引</param>
    private static void AdjustMaxHeap<T>(IList<T> arr, int parentNodeIndex, int maxIndex)
        where T : IComparable<T>
    {
        if (maxIndex > 0)   // 只有堆顶一个元素,就不用调整了
        {
            int resultIndex = -1;
            // 下标为i的节点的子节点是2i + 1与2i + 2
            int leftIndex = 2 * parentNodeIndex + 1;
            int rightIndex = 2 * parentNodeIndex + 2;
            if (leftIndex > maxIndex)
            {
                // 该父节点没有左右子节点
                return;
            }
            else if (rightIndex > maxIndex)
                resultIndex = leftIndex;
            else
                // 比较左右节点。
                resultIndex = Max<T>(arr, leftIndex, rightIndex);
 
            // 父节点与较大的子节点进行比较
            resultIndex = Max<T>(arr, parentNodeIndex, resultIndex);
 
            if (resultIndex != parentNodeIndex)
            {
                // 如果最大的不是父节点,则交换。
                Swap<T>(arr, parentNodeIndex, resultIndex);
                // 交换后子树可能不是最大堆,所以需要重新调整交换元素的子树
                AdjustMaxHeap<T>(arr, resultIndex, maxIndex);
            }
        }
    }
 
    /// <summary>
    /// 获取较大数的数组索引
    /// </summary>
    /// <param name="arr">待排序数组</param>
    /// <param name="leftIndex">左节点索引</param>
    /// <param name="rightIndex">右节点索引</param>
    /// <returns>返回较大数的数组索引</returns>
    private static int Max<T>(IList<T> arr, int leftIndex, int rightIndex) where T : IComparable<T>
    {
        // 相等,以左节点为大
        return arr[leftIndex].CompareTo(arr[rightIndex]) >= 0 ? leftIndex : rightIndex;
    }
 
    /// <summary>
    /// 数组元素交换
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="arr">数组</param>
    /// <param name="i">交换元素1</param>
    /// <param name="j">交换元素2</param>
    private static void Swap<T>(IList<T> arr, int i, int j)
    {
        T temp = arr[i];
        arr[i] = arr[j];
        arr[j] = temp;
    }
}

示例:

89,-7,999,-89,7,0,-888,7,-7

排序的过程:

初始化最大堆

将堆顶元素999移到有序区过程:(红色为需要调节的元素,黄色为有序区元素)

同理,(再将堆顶元素89移到有序区,即与-89交换。)我们不断缩小无序区的范围,扩大有序区的元素,最后结果如下:

(二)算法复杂度

  1. 时间复杂度:O(nlog2n)

堆排序耗时的操作有:初始堆 + 反复调整堆。时间复杂度如下:

1)        初始堆(从下往上):每个父节点会和左右子节点进行最多2次比较和1次交换,所以复杂度跟父节点个数有关。根据2^x<=n(x为n个元素可以折半的次数,也就是父节点个数),得出x = log2n。即O(log2n)

2)        反复调整堆(从上往下):由于初始化堆过程中,会记录数组比较结果,所以堆排序对原序列的数组顺序并不敏感,最好情况和最坏情况差不多。需要抽取 n-1 次堆顶元素,每次取堆顶元素都需要重建堆(O(重建堆) < O(初始堆))。所以小于 O(n-1) * O(log2n)

3)        渐进时间复杂度(平均时间复杂度):O(nlog2n)

4)        使用建议:由于初始化堆需要比较的次数较多,因此,堆排序比较适合于数据量非常大的场合(百万数据或更多)。并且在由于高效的快速排序是基于递归实现的,所以在数据量非常大时会发生堆栈溢出错误。

  1. 空间复杂度:O(1)

从实现原理可知,堆排序是在原输入数组上进行交换赋值操作的(称“就地排序”),所需开辟的辅助空间跟输入数组规模无关,所以空间复杂度为:O(1)

(三)稳定性

堆排序是不稳定的。

因为在初始化堆时,相同元素可能被分配到不同的父节点下,所以在反复调整堆过程中,可能会改变相同元素的相对顺序。

性能测试

测试步骤:

  1. 随机生成10个测试数组。
  2. 每个数组中包含5000个元素。
  3. 对这个数组集合进行本博文中介绍的两种排序。(另外加入快速排序测试结果《快速排序源码在这》
  4. 重复执行1~3步骤。执行20次。
  5. 部分顺序测试用例:顺序率5%。

共测试 10*20 次,长度为5000的数组排序

参数说明:

(Time Elapsed:所耗时间。CPU Cycles:CPU时钟周期。Gen0+Gen1+Gen2:垃圾回收器的3个代各自的回收次数)

从这个比较结果看:快速排序的性能幅度较大。而堆排序对原数组的序列不敏感,所以效率稳定性很高。

更加详细的测试报告以及整个源代码,会在写完基础排序算法后,写一篇总结性博文分享。

评论中讨论的问题

  1. 关于log2n、logn、lgn的讨论

感谢讨论参与园友:@assiwe@梦烬

讨论结果我直接引用“《算法复杂度分析》”中的结论:

1)        注1:快速的数学回忆,logab = y 其实就是 a^y = b。所以,log24 = 2,因为 22 = 4。同样 log28 = 3,因为 23 = 8。我们说,log2n 的增长速度要慢于 n,因为当 n = 8 时,log2n = 3。

2)        注2:通常将以 10 为底的对数叫做常用对数。为了简便,N 的常用对数 log10 N 简写做 lg N,例如 log10 5 记做 lg 5。

3)        注3:通常将以无理数 e 为底的对数叫做自然对数。为了方便,N 的自然对数 loge N 简写做 ln N,例如 log3 记做 ln 3。

4)        注4:在算法导论中,采用记号 lg n = logn ,也就是以 2 为底的对数。改变一个对数的底只是把对数的值改变了一个常数倍,所以当不在意这些常数因子时,我们将经常采用 "lg n"记号,就像使用 O 记号一样。计算机工作者常常认为对数的底取 2 最自然,因为很多算法和数据结构都涉及到对问题进行二分。

喜欢这个系列的小伙伴,还请多多推荐啊……

求助……非常不明白,这个问题我弄了两个晚上,也不清楚问题出在哪。这个问题不解决,后续的大数据性能测试没办法做……

  1. 第一组非部分排序的数据可以正常跑
  2. “部分排序”的排序率为5%,所以有非常多数据是排序好的。(测试中排序率小就不会出现这个问题,比如改为0.05%)
  3. 从递归次数来看,第一组跑完的递归次数比“部分排序”抛出异常时的递归次数多很多,但是第一组没有报错
  4. 自己检查代码没有发现死循环
  5. 直接快速排序和平衡排序会报错。。随机排序不会报错。所以问题是基准值选取的问题,但是看了理论上基准值代码是对的。是和数组特征冲突导致的堆栈溢出,但是我没查出问题。。。。
  6. 解决方案在这下载:快速排序,堆栈溢出,问题项目.rar

然后,我再将生成数组集合个数和循环执行次数都设置为1,将单个数组元素个数设置为1000000(一百万)。

上一篇:Docker 安装部署RabbitMQ


下一篇:GlassFish 任意文件读取漏洞