文章目录
1.霍夫曼编码介绍
在此我并不想长篇大论的介绍什么是霍夫曼编码,如果有对这个概念还不是特别了解的同学,建议先去看一下百度百科。我们先来回顾一下霍夫曼编码的思想,统计输入字节流中每个字节出现的频率,然后把出现频率比较高的字节编码成比较短比特流。同时,它还有一个非常重要的性质,就是任意一个字节的编码不会是其它字节的编码的前缀。由于这个性质,我们在传输数据时并不需要增加分隔符,接收端依然可以正常区分编码。
霍夫曼编码过程主要就是构建出霍夫曼树(二叉树),然后遍历树得到编码表,最后根据编码表对输入数据进行压缩,下面给出流程图:
最后说一下怎么依据霍夫曼树得到编码表。我们从根节点开始进行深度优先遍历,初始时编码字符串为空,进入左子树,编码字符串加上‘1’,进入右子树,编码字符串加上‘0’,遇到叶子节点,则该叶子节点对应的字符的编码就等于此时的编码字符串。上面的‘0’、‘1’可以互换位置,没有关系。
举一个例子,假设输入数据流只有a、b、c、d四个字符,其出现频率分别为:0.1、0.2、0.3、0.4,那么构建出来的霍夫曼树为:
所以字符a对应000,字符b对应001,字符c对应01,字符d对应1。假设输入字符串为abbcccdddd,那么对应的结果为:000 001 001 01 01 01 1 1 1 1。
2.回归到实际问题
现在让我们考虑一下,如果实际运行霍夫曼算法,还有哪些需要注意的地方。
2.1bit还是byte?
之前的例子中我们把字符映射到了bit流,然而实际输出时bit流还是要转换为byte流,这个比较简单,8个bit8个bit进行转换就可以了。
2.2如何标记数据流结束?
但是bit流转为byte流就引入了一个新的问题,bit流总长度并不一定被8整除,所以转换为byte流后可能会有多余的数据。比如我们在第一节中举的例子:000 001 001 01 01 01 1 1 1 1,在转换为byte流后:00000100 10101011 11100000。那么在解码时就会遇到多余的5个0,从而导致错误……
一般来说有两个解决方案,第一就是在头部先加上数据流的有效长度;第二就是引入一个结束字符。我个人感觉第二种方法是比较好的,考虑到字节的取值范围为0-255,我引入256作为终止字符,在建堆时把这个终止字符也插入到堆中,同时令它的频率为0,这样就可以得到终止字符的编码了。在解码时如果处理到了终止字符,就停止处理数据流。
2.3如何还原霍夫曼树?
现在我们到了最重要的一个问题,如何还原出霍夫曼树?如果是静态霍夫曼树,即通信双方约定好编码表,那么肯定无需还原。但是这种做法不具有一般性,对于动态霍夫曼树来说,发送方需要在数据流头部添加一些额外数据,以供接收方还原出霍夫曼树或编码表。
像我这种菜鸡能想到的方法就是,传输每个字节对应的编码,然而这样做会增加很多无用的数据……在数据流本身就不是很大的情况下简直就是灾难。有没有什么方法可以使用少量的数据就能还原出霍夫曼树?有,使用范式霍夫曼编码。
3.范式霍夫曼编码
范式哈夫曼编码最早由 Schwartz(1964) 提出,Canonical 这个词是规范化,遵守一定标准的意思。范式哈夫曼编码,是哈夫曼编码的一个子集。其基本思想,是对哈夫曼编码施加某些强制约定,让其遵守一些规则,之后根据规则,使用很少的数据便能重构出编码树。
范式霍夫曼编码要求相同长度编码必须是连续的,例如:长度为4的编码0001,其他相同长度的编码必须为0010、0011、0100…等等。为了尽可能降低存储空间,编码长度为
j
j
j 的第一个符号可以从编码长度为
j
−
1
j-1
j−1 的最后一个符号所得知,即
c
j
=
2
∗
(
c
j
−
1
+
1
)
c_j=2*(c_{j-1}+1)
cj=2∗(cj−1+1) ,例如:从长度为3的最后一个编码100,可推知长度为4的第一个编码为1010。最后,最小编码长度的第一个编码必须从0开始。范式霍夫曼编码通过这些原则,便可以从每个编码还原整个霍夫曼编码树。
说到底其实就是三个规则:
- 最小编码长度的第一个编码必须从0开始。
- 相同长度编码必须是连续的。
- 编码长度为 j j j 的第一个符号可以从编码长度为 j − 1 j-1 j−1 的最后一个符号所得知,即 c j = 2 ∗ ( c j − 1 + 1 ) c_j=2*(c_{j-1}+1) cj=2∗(cj−1+1)。
我们还是拿之前的例子来说:字符a对应000,字符b对应001,字符c对应01,字符d对应1。由于最小编码长度必须从0开始,所以d对应0,由公式知字符c对应10,字符a对应110,字符b对应111。从这里可以看出,霍夫曼编码中字符x对应ybit数据,那么在范式霍夫曼编码中,它依然对应ybit数据,所占空间不变,只不过是对应的比特流变化了。而且它依旧保持了霍夫曼编码的特性,即任意字节映射的编码都不是其它字节所映射的编码的前缀(更加细致的例子可以看这篇文章)。
从重新计算编码的过程中可以发现,一个字节的映射编码仅和长度有关。所以接收方重构编码表仅仅需要知道每个字节对应的编码长度,之前的那个例子可以通过下面这些数据还原:
- 1 ‘d’
- 1 ‘c’
- 2 ‘a’ ‘b’
什么意思呢?假设初始编码长度 l l l为1,每一行的第一个数字表示编码长度等于 l l l的字节个数,接下来 l l l个字节表示这些字节,每读一行数据就把编码长度 l l l自增一。这里的行是抽象的概念,实际工作时可以按照这个流程进行:
- 令 l = 1 l=1 l=1
- 读取一个字节的数据将其转换为整数 a a a,读取接下来的 a a a个字节。
- 令 l = l + 1 l=l+1 l=l+1,回到步骤2。
如果某个
l
l
l不对应任何数据,我们依然需要放置一个字符0,表示没有任何字节和这个编码长度对应。那么什么时候停止呢?我们可以预设一个最大编码长度
m
a
x
l
maxl
maxl,当
l
l
l增长到这个数值时就停止。那么最多需要额外的
m
a
x
l
+
x
maxl+x
maxl+x(
x
x
x表示输入字节流中不同的字节的个数)byte数据就可以还原出编码表,而且
m
a
x
l
maxl
maxl一般不会特别大。
接下来看一下怎么通过这些数据还原出编码表:
- 初始编码值 v = 0 v=0 v=0,长度 l = 1 l=1 l=1。
- 读入一个整数 x = 1 x=1 x=1,读入一个字节 d d d,由此知道 d d d对应的编码为0, v = v + 1 = 1 v=v+1=1 v=v+1=1,已经读完 x x x个字节, l = l + 1 = 2 , v = 2 ∗ v = 2 l=l+1=2,v=2*v=2 l=l+1=2,v=2∗v=2。
- 读入一个整数 x = 1 x=1 x=1,读入一个字节 c c c,由此知道 c c c对应的编码为10, v = v + 1 = 3 v=v+1=3 v=v+1=3,已经读完 x x x个字节, l = l + 1 = 3 , v = 2 ∗ v = 6 l=l+1=3,v=2*v=6 l=l+1=3,v=2∗v=6。
- 读入一个整数 x = 2 x=2 x=2,读入一个字节 a a a,由此知道 a a a对应的编码为110, v = v + 1 = 7 v=v+1=7 v=v+1=7,继续读入下一个字节 b b b,由此知道 b b b对应的编码为111,已经读完 x x x个字节, l = l + 1 = 4 , v = 2 ∗ v = 14 l=l+1=4,v=2*v=14 l=l+1=4,v=2∗v=14。
- ……
- l > m a x l l>maxl l>maxl,结束。
4.代码
# coding:utf-8
import struct
def uint_to_bits(val, width, return_tuple=False):
'''
将给定uint(val)转换为对应的二进制形式 返回一个长度为width的01列表/元组
(15,8)-->[0,0,0,0,1,1,1,1]
'''
i = width - 1
flag = 1 << i
bits = [0] * width
while i >= 0:
if val & flag:
bits[width - i - 1] = 1
i -= 1
flag >>= 1
if return_tuple:
return tuple(bits)
return bits
def bits_to_uint(bits, width):
'''
将给定的01列表(bits)转换为uint并返回 width指定位数
([1,1,1,1],8)-->128+64+32+16
'''
flag = 1 << (width - 1)
uint8 = 0
for each in bits:
if each:
uint8 += flag
flag >>= 1
return uint8
# 霍夫曼编码所允许的最多bit位 1个字符最多映射到max_encode_bits位数据
max_encode_bits = 16
class Heap:
'''
小根堆
'''
def __init__(self, lst=None):
'''
初始化接受一个列表 O(n)复杂度建堆
'''
self.element = [0]
self.num = 0
if lst:
self.element.extend(lst)
self.num = len(lst)
idx = self.num >> 1
while idx >= 1:
self.modify(idx)
idx -= 1
def insert(self, val):
'''
插入元素 元素需定义<方法
'''
self.element.append(val)
self.num += 1
idx = self.num
while idx > 1:
parrent_idx = idx >> 1
if self.element[idx] < self.element[parrent_idx]:
self.element[idx], self.element[parrent_idx] = self.element[parrent_idx], self.element[idx]
else:
break
idx = parrent_idx
def modify(self, beg_idx):
'''
自顶向下维护堆的结构
'''
idx = beg_idx
child_idx = idx << 1
while child_idx <= self.num:
if child_idx | 1 <= self.num and self.element[child_idx | 1] < self.element[child_idx]:
child_idx |= 1
if self.element[child_idx] < self.element[idx]:
self.element[child_idx], self.element[idx] = self.element[idx], self.element[child_idx]
else:
break
idx = child_idx
child_idx = idx << 1
def front(self):
'''
返回堆顶
'''
if self.num < 1:
raise Exception('empty heap')
else:
return self.element[1]
def remove(self):
'''
移除并返回堆顶
'''
if self.num < 1:
raise Exception('empty heap')
elif self.num == 1:
self.num = 0
return self.element.pop()
else:
self.element[1], self.element[-1] = self.element[-1], self.element[1]
self.num -= 1
self.modify(1)
return self.element.pop()
def size(self):
'''
返回堆的元素个数
'''
return self.num
class HofmannNode:
'''
霍夫曼节点
'''
def __init__(self, ch, count=0):
self.lc = None
self.rc = None
# 节点对应字符
self.ch = ch
# 字符出现的次数
self.count = count
def __lt__(self, rnode):
return self.count < rnode.count
class HofmannTree:
'''
霍夫曼树 准确说是范式霍夫曼树
'''
# 标记数据流的结尾
end_symbol = 256
def getHofmannNodeHeap(self, byte_list):
'''
从字节流构建出堆
'''
count = {}
for byte in byte_list:
if byte in count:
count[byte] += 1
else:
count[byte] = 1
count[self.end_symbol] = 0
lst = [HofmannNode(ch, ct) for ch, ct in count.items()]
self.heap = Heap(lst)
def buildTreeAndEncodeLenTable(self):
'''
构建霍夫曼树并得到编码长度表(范式霍夫曼编码要用到)
'''
while self.heap.size() > 1:
lc = self.heap.remove()
rc = self.heap.remove()
parrent = HofmannNode(None, lc.count + rc.count)
parrent.lc = lc
parrent.rc = rc
self.heap.insert(parrent)
self.encode_len_table = [[] for _ in range(max_encode_bits + 1)]
self.dfs(self.heap.front(),0)
def dfs(self, root, length):
'''
搜索霍夫曼树得到编码长度表
'''
if root.lc == None and root.rc == None:
if length > max_encode_bits:
raise Exception('Hofmann encode too large--more than {0} bits'.format(max_encode_bits))
else:
self.encode_len_table[length].append(root.ch)
return
if root.lc:
self.dfs(root.lc, length + 1)
if root.rc:
self.dfs(root.rc, length + 1)
def compress(self, byte_list):
'''
压缩
'''
self.getHofmannNodeHeap(byte_list)
self.buildTreeAndEncodeLenTable()
encode = {}
code_val = 0
end_symbol_len = 0
# 依据范式霍夫曼编码规则重新构建编码表
for length in range(1, max_encode_bits + 1):
for char in self.encode_len_table[length]:
if char == self.end_symbol:
end_symbol_len = length
else:
encode[char] = uint_to_bits(code_val, length)
code_val += 1
if end_symbol_len != 0:
encode[self.end_symbol] = uint_to_bits(code_val, end_symbol_len)
break
code_val <<= 1
bits = []
compress_data = ''
# 构建数据流的头部信息以便解压时还原编码表
for length in range(1, max_encode_bits + 1):
if length == end_symbol_len:
compress_data += chr(len(self.encode_len_table[length]) - 1)
else:
compress_data += chr(len(self.encode_len_table[length]))
for char in self.encode_len_table[length]:
if char != self.end_symbol:
compress_data += char
# 处理数据流
for byte in byte_list:
bits += encode[byte]
while len(bits) >= 8:
compress_data += chr(bits_to_uint(bits[:8], 8))
bits = bits[8:]
bits += encode[self.end_symbol]
while bits:
compress_data += chr(bits_to_uint(bits[:8], 8))
bits = bits[8:]
return compress_data
def getDecodeDict(self, byte_list):
'''
从字节流头部的数据还原出编码表 返回第一个有效数据的位置
'''
self.decode = {}
self.min_encode_len = 0
idx = 0
code_val = 0
end_symbol_len = 0
end_symbol_code_val = 0
for length in range(1, max_encode_bits + 1):
num = ord(byte_list[idx])
idx += 1
for _ in range(num):
self.decode[uint_to_bits(code_val, length, return_tuple=True)] = byte_list[idx]
idx += 1
code_val += 1
if num:
end_symbol_len = length
end_symbol_code_val = code_val
if self.min_encode_len == 0:
self.min_encode_len = length
code_val <<= 1
self.decode[uint_to_bits(end_symbol_code_val, end_symbol_len, return_tuple=True)] = self.end_symbol
return idx
def decompress(self, byte_list):
'''
解压
'''
beg_idx = self.getDecodeDict(byte_list)
decompress_data = ''
bits = tuple()
length = len(byte_list)
for idx in range(beg_idx, length):
bits += uint_to_bits(ord(byte_list[idx]), 8, return_tuple=True)
while len(bits) >= max_encode_bits or idx == length - 1:
bits_idx = self.min_encode_len
while bits[:bits_idx] not in self.decode:
bits_idx += 1
data = self.decode[bits[:bits_idx]]
if data == self.end_symbol:
break
else:
decompress_data += data
bits = bits[bits_idx:]
return decompress_data
class Hofmann:
'''
范式霍夫曼编码
'''
@staticmethod
def compress(byte_list):
tree = HofmannTree()
return tree.compress(byte_list)
@staticmethod
def decompress(byte_list):
tree = HofmannTree()
return tree.decompress(byte_list)