为什么我的基数排序python实现比快速排序慢?

我使用SciPy的数组重写了Wikipedia中用于Python的原始基数排序算法,以提高性能并减少代码长度,这是我设法实现的.然后,我采用了Literate Programming中的经典(基于内存,基于透视图的)快速排序算法,并比较了它们的性能.

我曾期望基数排序将比快速排序超出某个阈值,但事实并非如此.此外,我发现Erik Gorset’s Blog’s问一个问题:“对于整数数组,基数排序是否快于快速排序?”.答案是

.. the benchmark shows the MSB in-place radix sort to be consistently over 3 times faster than quicksort for large arrays.

不幸的是,我无法重现结果.区别在于(a)Erik选择Java而不是Python,以及(b)他使用MSB就地基数排序,而我只是在Python字典中填充存储桶.

根据理论,基数排序应比快速排序更快(线性).但是显然,这在很大程度上取决于实现.那我的错误在哪里?

这是比较两种算法的代码:

from sys   import argv
from time  import clock

from pylab import array, vectorize
from pylab import absolute, log10, randint
from pylab import semilogy, grid, legend, title, show

###############################################################################
# radix sort
###############################################################################

def splitmerge0 (ls, digit): ## python (pure!)

    seq = map (lambda n: ((n // 10 ** digit) % 10, n), ls)
    buf = {0:[], 1:[], 2:[], 3:[], 4:[], 5:[], 6:[], 7:[], 8:[], 9:[]}

    return reduce (lambda acc, key: acc.extend(buf[key]) or acc,
        reduce (lambda _, (d,n): buf[d].append (n) or buf, seq, buf), [])

def splitmergeX (ls, digit): ## python & numpy

    seq = array (vectorize (lambda n: ((n // 10 ** digit) % 10, n)) (ls)).T
    buf = {0:[], 1:[], 2:[], 3:[], 4:[], 5:[], 6:[], 7:[], 8:[], 9:[]}

    return array (reduce (lambda acc, key: acc.extend(buf[key]) or acc,
        reduce (lambda _, (d,n): buf[d].append (n) or buf, seq, buf), []))

def radixsort (ls, fn = splitmergeX):

    return reduce (fn, xrange (int (log10 (absolute (ls).max ()) + 1)), ls)

###############################################################################
# quick sort
###############################################################################

def partition (ls, start, end, pivot_index):

    lower = start
    upper = end - 1

    pivot = ls[pivot_index]
    ls[pivot_index] = ls[end]

    while True:

        while lower <= upper and ls[lower] <  pivot: lower += 1
        while lower <= upper and ls[upper] >= pivot: upper -= 1
        if lower > upper: break

        ls[lower], ls[upper] = ls[upper], ls[lower]

    ls[end] = ls[lower]
    ls[lower] = pivot

    return lower

def qsort_range (ls, start, end):

    if end - start + 1 < 32:
        insertion_sort(ls, start, end)
    else:
        pivot_index = partition (ls, start, end, randint (start, end))
        qsort_range (ls, start, pivot_index - 1)
        qsort_range (ls, pivot_index + 1, end)

    return ls

def insertion_sort (ls, start, end):

    for idx in xrange (start, end + 1):
        el = ls[idx]
        for jdx in reversed (xrange(0, idx)):
            if ls[jdx] <= el:
                ls[jdx + 1] = el
                break
            ls[jdx + 1] = ls[jdx]
        else:
            ls[0] = el

    return ls

def quicksort (ls):

    return qsort_range (ls, 0, len (ls) - 1)

###############################################################################
if __name__ == "__main__":
###############################################################################

    lower = int (argv [1]) ## requires: >= 2
    upper = int (argv [2]) ## requires: >= 2
    color = dict (enumerate (3*['r','g','b','c','m','k']))

    rslbl = "radix sort"
    qslbl = "quick sort"

    for value in xrange (lower, upper):

        #######################################################################

        ls = randint (1, value, size=value)

        t0 = clock ()
        rs = radixsort (ls)
        dt = clock () - t0

        print "%06d -- t0:%0.6e, dt:%0.6e" % (value, t0, dt)
        semilogy (value, dt, '%s.' % color[int (log10 (value))], label=rslbl)

        #######################################################################

        ls = randint (1, value, size=value)

        t0 = clock ()
        rs = quicksort (ls)
        dt = clock () - t0

        print "%06d -- t0:%0.6e, dt:%0.6e" % (value, t0, dt)
        semilogy (value, dt, '%sx' % color[int (log10 (value))], label=qslbl)

    grid ()
    legend ((rslbl,qslbl), numpoints=3, shadow=True, prop={'size':'small'})
    title ('radix & quick sort: #(integer) vs duration [s]')
    show ()

###############################################################################
###############################################################################

这是比较大小为2到1250(水平轴)的整数数组的以秒为单位的排序持续时间(对数垂直轴)的结果;下部曲线属于快速排序:

> Radix vs Quick Sort Comparison

快速排序在功率变化时很平滑(例如10、100或1000),但基数排序只是跳了一点,但在定性上遵循了与快速排序相同的路径,只是速度慢得多!

解决方法:

您在这里遇到几个问题.

首先,正如注释中指出的那样,您的数据集对于理论上的复杂性而言太小了,无法克服代码中的开销.

接下来,使用所有那些不必要的函数调用和周围的列表复制实现将非常低效.以简单的过程方式编写代码几乎总是比功能解决方案要快(对于Python,此处其他语言会有所不同).您具有快速排序的程序实现,因此,如果以相同的样式编写基数排序,即使对于较小的列表,其结果也可能更快.

最后,可能是当您尝试使用大型列表时,内存管理的开销开始占主导地位.这意味着您在小列表(其中实现的效率是主要因素)与大列表(其中内存管理是主要因素)之间的窗口有限.

这是一些使用您的快速排序的代码,但是使用了一个简单的基数排序过程,但是试图避免大量复制数据.您会看到,即使对于短列表,它也比快速排序更胜一筹,但随着数据大小的增加,更有趣的是,快速排序和基数排序之间的比率也会随之下降,然后随着内存管理开始占主导地位,它又开始下降(诸如释放之类的简单操作)一百万个项目的清单需要花费大量时间):

from random import randint
from math import log10
from time import clock
from itertools import chain

def splitmerge0 (ls, digit): ## python (pure!)

    seq = map (lambda n: ((n // 10 ** digit) % 10, n), ls)
    buf = {0:[], 1:[], 2:[], 3:[], 4:[], 5:[], 6:[], 7:[], 8:[], 9:[]}

    return reduce (lambda acc, key: acc.extend(buf[key]) or acc,
        reduce (lambda _, (d,n): buf[d].append (n) or buf, seq, buf), [])

def splitmerge1 (ls, digit): ## python (readable!)
    buf = [[] for i in range(10)]
    divisor = 10 ** digit
    for n in ls:
        buf[(n//divisor)%10].append(n)
    return chain(*buf)

def radixsort (ls, fn = splitmerge1):
    return list(reduce (fn, xrange (int (log10 (max(abs(val) for val in ls)) + 1)), ls))

###############################################################################
# quick sort
###############################################################################

def partition (ls, start, end, pivot_index):

    lower = start
    upper = end - 1

    pivot = ls[pivot_index]
    ls[pivot_index] = ls[end]

    while True:

        while lower <= upper and ls[lower] <  pivot: lower += 1
        while lower <= upper and ls[upper] >= pivot: upper -= 1
        if lower > upper: break

        ls[lower], ls[upper] = ls[upper], ls[lower]

    ls[end] = ls[lower]
    ls[lower] = pivot

    return lower

def qsort_range (ls, start, end):

    if end - start + 1 < 32:
        insertion_sort(ls, start, end)
    else:
        pivot_index = partition (ls, start, end, randint (start, end))
        qsort_range (ls, start, pivot_index - 1)
        qsort_range (ls, pivot_index + 1, end)

    return ls

def insertion_sort (ls, start, end):

    for idx in xrange (start, end + 1):
        el = ls[idx]
        for jdx in reversed (xrange(0, idx)):
            if ls[jdx] <= el:
                ls[jdx + 1] = el
                break
            ls[jdx + 1] = ls[jdx]
        else:
            ls[0] = el

    return ls

def quicksort (ls):

    return qsort_range (ls, 0, len (ls) - 1)

if __name__=='__main__':
    for value in 1000, 10000, 100000, 1000000, 10000000:
        ls = [randint (1, value) for _ in range(value)]
        ls2 = list(ls)
        last = -1
        start = clock()
        ls = radixsort(ls)
        end = clock()
        for i in ls:
            assert last <= i
            last = i
        print("rs %d: %0.2fs" % (value, end-start))
        tdiff = end-start
        start = clock()
        ls2 = quicksort(ls2)
        end = clock()
        last = -1
        for i in ls2:
            assert last <= i
            last = i
        print("qs %d: %0.2fs %0.2f%%" % (value, end-start, ((end-start)/tdiff*100)))

运行此命令时的输出是:

C:\temp>c:\python27\python radixsort.py
rs 1000: 0.00s
qs 1000: 0.00s 212.98%
rs 10000: 0.02s
qs 10000: 0.05s 291.28%
rs 100000: 0.19s
qs 100000: 0.58s 311.98%
rs 1000000: 2.47s
qs 1000000: 7.07s 286.33%
rs 10000000: 31.74s
qs 10000000: 86.04s 271.08%

编辑:
只是为了澄清.这里的quicksort实现对内存非常友好,它可以就地排序,因此无论列表多大,它都只是在乱序地复制数据而不复制它.原始的radixsort对每个数字有效地复制一次列表:一次复制到较小的列表中,然后在连接列表时再次复制.使用itertools.chain可以避免第二个副本,但是仍然存在大量内存分配/取消分配. (“两次”也是近似值,因为列表附加确实涉及额外的复制,即使将其摊销为O(1)也是如此,所以我应该说“成倍地成比例”.)

上一篇:为什么我的quicksort在python中这么慢?


下一篇:使用网云穿「内网穿透」实现外网访问内网瑞友天翼等软件