本节课程大纲:
1.模块介绍 2.time &datetime模块 3.random 4.os 5.sys 6.shutil 7.json & picle 8.shelve 9.xml处理 10.yaml处理 11.configparser 12.hashlib 13.subprocess 14.logging模块 15.re正则表达式
一、模块介绍
1.定义:
用来从逻辑上组织Python代码(变量,函数,类,逻辑:实现一个功能),本质就是.py结尾的Python文件(例如test.py 对应的模块名就是test)
包的定义:用来从逻辑上组织模块的,本质就是一个目录(必须带有一个__init__.py文件)。
2.导入方法:
import module_name
import module1_name,import modulel2_name
from module_name import *
from module_name import m1,m2.m3
3.import本质(路径搜索和搜索路径)
导入模块的本质就是把python文件解释一遍,如:
import module_name --->module_name.py----->module_name.py的路径---->sys.path
导入包的本质就是执行该包下的__init__.py文件。
4.导入优化
from module_test import test
5.模块分类
A.标准库
B.开源模块
C.自定义模块
二、time 与 datetime模块
Python中,通常有这几种方式来表示时间:
1)时间戳
2)格式化的时间字符串
3)元组(struct_time)共九个元素。
由于Python的time模块实现主要调用C库,所以各个平台可能有所不同。
UTC(Coordinated Universal Time,世界协调时)亦即格林威治天文时间,世界标准时间。
在中国为UTC+8。
DST(Daylight Saving Time)即夏令时。
时间戳(timestamp)的方式:
通常来说,时间戳表示的是从1970年1月1日00:00:00开始按秒计算的偏移量。
我们运行“type(time.time())”,返回的是float类型。返回时间戳方式的函数主要有time(),clock()等。
元组(struct_time)方式:struct_time元组共有9个元素,返回struct_time的函数主要有gmtime(),localtime(),strptime()。
下面列出这种方式元组中的几个元素:
1.time:
>>> time.time() #返回当前时间戳
1471938474.8799927
>>> time.localtime() #返回本地时间 的struct time对象格式
time.struct_time(tm_year=2016, tm_mon=8, tm_mday=23, tm_hour=15, tm_min=18, tm_sec=10, tm_wday=1, tm_yday=236, tm_isdst=0)
>>> time.gmtime() #当前时间戳转化为UTC
time.struct_time(tm_year=2016, tm_mon=8, tm_mday=23, tm_hour=7, tm_min=52, tm_sec=15, tm_wday=1, tm_yday=236, tm_isdst=0)
>>> time.localtime()#当前时间utc+8
time.struct_time(tm_year=2016, tm_mon=8, tm_mday=23, tm_hour=15, tm_min=53, tm_sec=59, tm_wday=1, tm_yday=236, tm_isdst=0)
>>> time.localtime()
time.struct_time(tm_year=2016, tm_mon=8, tm_mday=23, tm_hour=16, tm_min=6, tm_sec=54, tm_wday=1, tm_yday=236, tm_isdst=0)
>>> time.mktime(x) #元组转时间戳
1471939614.0
>>> time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()) #元组转化格式化字符串
'2016-08-23 16:06:54'
>>> time.strptime('2016-08-23 16:06:54','%Y-%m-%d %H:%M:%S') #格式化字符串转化原组
time.struct_time(tm_year=2016, tm_mon=8, tm_mday=23, tm_hour=16, tm_min=6, tm_sec=54, tm_wday=1, tm_yday=236, tm_isdst=-1)
strftime("格式",struct_time)--->"格式化字符串"
strptime("格式化字符串",“格式”)--->struct_time
>>> time.asctime()
'Tue Aug 23 16:44:04 2016'
>>> time.ctime()
'Tue Aug 23 16:44:20 2016'
>>> time.ctime(1471941808.3526566) #时间戳转特殊格式
'Tue Aug 23 16:43:28 2016'
2.datetime
>>> import datetime
>>> datetime.datetime.now()
datetime.datetime(2016, 8, 23, 17, 2, 0, 51242)
>>> print(datetime.datetime.now())
2016-08-23 17:02:12.747968
>>> print(datetime.datetime.now()+datetime.timedelta(+3)) #三天后的时间
2016-08-26 17:04:02.340236
>>> print(datetime.datetime.now()+datetime.timedelta(-3)) #三天前的时间
2016-08-20 17:04:08.062563
>>> print(datetime.datetime.now()+datetime.timedelta(hours=3)) #三小时后
2016-08-23 20:05:09.227062
>>> print(datetime.datetime.now()+datetime.timedelta(hours=-3))#三小时前
二、random模块
1.随机浮点数
>>> import random
>>> random.random()
0.38741916300777435
>>> random.random()
0.2726009482506605
>>> random.random()
0.8928518510787847
>>> random.random()
0.12703455294635024
>>> random.random()
0.054001403811667514
2.整数随机数
>>> random.randint(1,9) #随机1-9不包括9
3
>>> random.uniform(1,9) #指定区间
6.532363738442411
>>>random.randrange(0,5)#指基数递增集合中取随机数
3.随机获取元素
>>> random.choice('YoungCheung')
'C'
4.从序列中随机取指定长度的片
>>> random.sample('Young',5)
['g', 'o', 'Y', 'n', 'u']
实践:生成随机数
#!/usr/bin/python
# -*- conding:utf-8 -*-
__Author__ = "YoungCheung" import random checkcode=''
for i in range(5):
current=random.randrange(0,9)
if current == i:
tmp=chr(random.randint(65,90))
else:
tmp=random.randint(0,9)
checkcode+=str(tmp)
print(checkcode)
三、OS模块
os.getcwd() 获取当前工作目录,即当前python脚本工作的目录路径
os.chdir("dirname") 改变当前脚本工作目录;相当于shell下cd
os.curdir 返回当前目录: ('.')
os.pardir 获取当前目录的父目录字符串名:('..')
os.makedirs('dirname1/dirname2') 可生成多层递归目录
os.removedirs('dirname1') 若目录为空,则删除,并递归到上一级目录,如若也为空,则删除,依此类推
os.mkdir('dirname') 生成单级目录;相当于shell中mkdir dirname
os.rmdir('dirname') 删除单级空目录,若目录不为空则无法删除,报错;相当于shell中rmdir dirname
os.listdir('dirname') 列出指定目录下的所有文件和子目录,包括隐藏文件,并以列表方式打印
os.remove() 删除一个文件
os.rename("oldname","newname") 重命名文件/目录
os.stat('path/filename') 获取文件/目录信息
os.sep 输出操作系统特定的路径分隔符,win下为"\\",Linux下为"/"
os.linesep 输出当前平台使用的行终止符,win下为"\t\n",Linux下为"\n"
os.pathsep 输出用于分割文件路径的字符串
os.name 输出字符串指示当前使用平台。win->'nt'; Linux->'posix'
os.system("bash command") 运行shell命令,直接显示
os.environ 获取系统环境变量
os.path.abspath(path) 返回path规范化的绝对路径
os.path.split(path) 将path分割成目录和文件名二元组返回
os.path.dirname(path) 返回path的目录。其实就是os.path.split(path)的第一个元素
os.path.basename(path) 返回path最后的文件名。如何path以/或\结尾,那么就会返回空值。即os.path.split(path)的第二个元素
os.path.exists(path) 如果path存在,返回True;如果path不存在,返回False
os.path.isabs(path) 如果path是绝对路径,返回True
os.path.isfile(path) 如果path是一个存在的文件,返回True。否则返回False
os.path.isdir(path) 如果path是一个存在的目录,则返回True。否则返回False
os.path.join(path1[, path2[, ...]]) 将多个路径组合后返回,第一个绝对路径之前的参数将被忽略
os.path.getatime(path) 返回path所指向的文件或者目录的最后存取时间
os.path.getmtime(path) 返回path所指向的文件或者目录的最后修改时间
四、sys
sys.argv 命令行参数List,第一个元素是程序本身路径
sys.exit(n) 退出程序,正常退出时exit()
sys.version 获取Python解释程序的版本信息
sys.maxint 最大的Int值
sys.path 返回模块的搜索路径,初始化时使用PYTHONPATH环境变量的值
sys.platform 返回操作系统平台名称
sys.stdout.write('please:')
val = sys.stdin.readline()[:-]
五、shutil
高级的 文件、文件夹、压缩包 处理模块
shutil.copyfileobj(fsrc, fdst[, length])
将文件内容拷贝到另一个文件中,可以部分内容
def copyfileobj(fsrc, fdst, length=16*1024):
"""copy data from file-like object fsrc to file-like object fdst"""
while 1:
buf = fsrc.read(length)
if not buf:
break
fdst.write(buf)
shutil.copyfile(src, dst)
拷贝文件
def copyfile(src, dst):
"""Copy data from src to dst"""
if _samefile(src, dst):
raise Error("`%s` and `%s` are the same file" % (src, dst)) for fn in [src, dst]:
try:
st = os.stat(fn)
except OSError:
# File most likely does not exist
pass
else:
# XXX What about other special files? (sockets, devices...)
if stat.S_ISFIFO(st.st_mode):
raise SpecialFileError("`%s` is a named pipe" % fn) with open(src, 'rb') as fsrc:
with open(dst, 'wb') as fdst:
copyfileobj(fsrc, fdst)
shutil.copymode(src, dst)
仅拷贝权限。内容、组、用户均不变
def copymode(src, dst):
"""Copy mode bits from src to dst"""
if hasattr(os, 'chmod'):
st = os.stat(src)
mode = stat.S_IMODE(st.st_mode)
os.chmod(dst, mode)
shutil.copystat(src, dst)
拷贝状态的信息,包括:mode bits, atime, mtime, flags
def copystat(src, dst):
"""Copy all stat info (mode bits, atime, mtime, flags) from src to dst"""
st = os.stat(src)
mode = stat.S_IMODE(st.st_mode)
if hasattr(os, 'utime'):
os.utime(dst, (st.st_atime, st.st_mtime))
if hasattr(os, 'chmod'):
os.chmod(dst, mode)
if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
try:
os.chflags(dst, st.st_flags)
except OSError, why:
for err in 'EOPNOTSUPP', 'ENOTSUP':
if hasattr(errno, err) and why.errno == getattr(errno, err):
break
else:
raise
shutil.copy(src, dst)
拷贝文件和权限
def copy(src, dst):
"""Copy data and mode bits ("cp src dst"). The destination may be a directory. """
if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))
copyfile(src, dst)
copymode(src, dst)
shutil.copy2(src, dst)
拷贝文件和状态信息
def copy2(src, dst):
"""Copy data and all stat info ("cp -p src dst"). The destination may be a directory. """
if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))
copyfile(src, dst)
copystat(src, dst)
shutil.ignore_patterns(*patterns)
shutil.copytree(src, dst, symlinks=False, ignore=None)
递归的去拷贝文件
例如:copytree(source, destination, ignore=ignore_patterns('*.pyc', 'tmp*'))
def ignore_patterns(*patterns):
"""Function that can be used as copytree() ignore parameter. Patterns is a sequence of glob-style patterns
that are used to exclude files"""
def _ignore_patterns(path, names):
ignored_names = []
for pattern in patterns:
ignored_names.extend(fnmatch.filter(names, pattern))
return set(ignored_names)
return _ignore_patterns def copytree(src, dst, symlinks=False, ignore=None):
"""Recursively copy a directory tree using copy2(). The destination directory must not already exist.
If exception(s) occur, an Error is raised with a list of reasons. If the optional symlinks flag is true, symbolic links in the
source tree result in symbolic links in the destination tree; if
it is false, the contents of the files pointed to by symbolic
links are copied. The optional ignore argument is a callable. If given, it
is called with the `src` parameter, which is the directory
being visited by copytree(), and `names` which is the list of
`src` contents, as returned by os.listdir(): callable(src, names) -> ignored_names Since copytree() is called recursively, the callable will be
called once for each directory that is copied. It returns a
list of names relative to the `src` directory that should
not be copied. XXX Consider this example code rather than the ultimate tool. """
names = os.listdir(src)
if ignore is not None:
ignored_names = ignore(src, names)
else:
ignored_names = set() os.makedirs(dst)
errors = []
for name in names:
if name in ignored_names:
continue
srcname = os.path.join(src, name)
dstname = os.path.join(dst, name)
try:
if symlinks and os.path.islink(srcname):
linkto = os.readlink(srcname)
os.symlink(linkto, dstname)
elif os.path.isdir(srcname):
copytree(srcname, dstname, symlinks, ignore)
else:
# Will raise a SpecialFileError for unsupported file types
copy2(srcname, dstname)
# catch the Error from the recursive copytree so that we can
# continue with other files
except Error, err:
errors.extend(err.args[0])
except EnvironmentError, why:
errors.append((srcname, dstname, str(why)))
try:
copystat(src, dst)
except OSError, why:
if WindowsError is not None and isinstance(why, WindowsError):
# Copying file access times may fail on Windows
pass
else:
errors.append((src, dst, str(why)))
if errors:
raise Error, errors
shutil.rmtree(path[, ignore_errors[, onerror]])
递归的去删除文件
def rmtree(path, ignore_errors=False, onerror=None):
"""Recursively delete a directory tree. If ignore_errors is set, errors are ignored; otherwise, if onerror
is set, it is called to handle the error with arguments (func,
path, exc_info) where func is os.listdir, os.remove, or os.rmdir;
path is the argument to that function that caused it to fail; and
exc_info is a tuple returned by sys.exc_info(). If ignore_errors
is false and onerror is None, an exception is raised. """
if ignore_errors:
def onerror(*args):
pass
elif onerror is None:
def onerror(*args):
raise
try:
if os.path.islink(path):
# symlinks to directories are forbidden, see bug #1669
raise OSError("Cannot call rmtree on a symbolic link")
except OSError:
onerror(os.path.islink, path, sys.exc_info())
# can't continue even if onerror hook returns
return
names = []
try:
names = os.listdir(path)
except os.error, err:
onerror(os.listdir, path, sys.exc_info())
for name in names:
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except os.error:
mode = 0
if stat.S_ISDIR(mode):
rmtree(fullname, ignore_errors, onerror)
else:
try:
os.remove(fullname)
except os.error, err:
onerror(os.remove, fullname, sys.exc_info())
try:
os.rmdir(path)
except os.error:
onerror(os.rmdir, path, sys.exc_info())
shutil.move(src, dst)
递归的去移动文件
def move(src, dst):
"""Recursively move a file or directory to another location. This is
similar to the Unix "mv" command. If the destination is a directory or a symlink to a directory, the source
is moved inside the directory. The destination path must not already
exist. If the destination already exists but is not a directory, it may be
overwritten depending on os.rename() semantics. If the destination is on our current filesystem, then rename() is used.
Otherwise, src is copied to the destination and then removed.
A lot more could be done here... A look at a mv.c shows a lot of
the issues this implementation glosses over. """
real_dst = dst
if os.path.isdir(dst):
if _samefile(src, dst):
# We might be on a case insensitive filesystem,
# perform the rename anyway.
os.rename(src, dst)
return real_dst = os.path.join(dst, _basename(src))
if os.path.exists(real_dst):
raise Error, "Destination path '%s' already exists" % real_dst
try:
os.rename(src, real_dst)
except OSError:
if os.path.isdir(src):
if _destinsrc(src, dst):
raise Error, "Cannot move a directory '%s' into itself '%s'." % (src, dst)
copytree(src, real_dst, symlinks=True)
rmtree(src)
else:
copy2(src, real_dst)
os.unlink(src)
shutil.make_archive(base_name, format,...)
创建压缩包并返回文件路径,例如:zip、tar
- base_name: 压缩包的文件名,也可以是压缩包的路径。只是文件名时,则保存至当前目录,否则保存至指定路径,
如:www =>保存至当前路径
如:/Users/wupeiqi/www =>保存至/Users/wupeiqi/ - format: 压缩包种类,“zip”, “tar”, “bztar”,“gztar”
- root_dir: 要压缩的文件夹路径(默认当前目录)
- owner: 用户,默认当前用户
- group: 组,默认当前组
- logger: 用于记录日志,通常是logging.Logger对象
#将 /Users/wupeiqi/Downloads/test 下的文件打包放置当前程序目录 import shutil
ret = shutil.make_archive("wwwwwwwwww", 'gztar', root_dir='/Users/wupeiqi/Downloads/test') #将 /Users/wupeiqi/Downloads/test 下的文件打包放置 /Users/wupeiqi/目录
import shutil
ret = shutil.make_archive("/Users/wupeiqi/wwwwwwwwww", 'gztar', root_dir='/Users/wupeiqi/Downloads/test')
def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
dry_run=0, owner=None, group=None, logger=None):
"""Create an archive file (eg. zip or tar). 'base_name' is the name of the file to create, minus any format-specific
extension; 'format' is the archive format: one of "zip", "tar", "bztar"
or "gztar". 'root_dir' is a directory that will be the root directory of the
archive; ie. we typically chdir into 'root_dir' before creating the
archive. 'base_dir' is the directory where we start archiving from;
ie. 'base_dir' will be the common prefix of all files and
directories in the archive. 'root_dir' and 'base_dir' both default
to the current directory. Returns the name of the archive file. 'owner' and 'group' are used when creating a tar archive. By default,
uses the current owner and group.
"""
save_cwd = os.getcwd()
if root_dir is not None:
if logger is not None:
logger.debug("changing into '%s'", root_dir)
base_name = os.path.abspath(base_name)
if not dry_run:
os.chdir(root_dir) if base_dir is None:
base_dir = os.curdir kwargs = {'dry_run': dry_run, 'logger': logger} try:
format_info = _ARCHIVE_FORMATS[format]
except KeyError:
raise ValueError, "unknown archive format '%s'" % format func = format_info[0]
for arg, val in format_info[1]:
kwargs[arg] = val if format != 'zip':
kwargs['owner'] = owner
kwargs['group'] = group try:
filename = func(base_name, base_dir, **kwargs)
finally:
if root_dir is not None:
if logger is not None:
logger.debug("changing back to '%s'", save_cwd)
os.chdir(save_cwd) return filename
shutil 对压缩包的处理是调用 ZipFile 和 TarFile 两个模块来进行的,详细:
import zipfile # 压缩
z = zipfile.ZipFile('laxi.zip', 'w')
z.write('a.log')
z.write('data.data')
z.close() # 解压
z = zipfile.ZipFile('laxi.zip', 'r')
z.extractall()
z.close()
zipfile压缩解压
import tarfile # 压缩
tar = tarfile.open('your.tar','w')
tar.add('/Users/wupeiqi/PycharmProjects/bbs2.zip', arcname='bbs2.zip')
tar.add('/Users/wupeiqi/PycharmProjects/cmdb.zip', arcname='cmdb.zip')
tar.close() # 解压
tar = tarfile.open('your.tar','r')
tar.extractall() # 可设置解压地址
tar.close()
tar 压缩解压
class ZipFile(object):
""" Class with methods to open, read, write, close, list zip files. z = ZipFile(file, mode="r", compression=ZIP_STORED, allowZip64=False) file: Either the path to the file, or a file-like object.
If it is a path, the file will be opened and closed by ZipFile.
mode: The mode can be either read "r", write "w" or append "a".
compression: ZIP_STORED (no compression) or ZIP_DEFLATED (requires zlib).
allowZip64: if True ZipFile will create files with ZIP64 extensions when
needed, otherwise it will raise an exception when this would
be necessary. """ fp = None # Set here since __del__ checks it def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=False):
"""Open the ZIP file with mode read "r", write "w" or append "a"."""
if mode not in ("r", "w", "a"):
raise RuntimeError('ZipFile() requires mode "r", "w", or "a"') if compression == ZIP_STORED:
pass
elif compression == ZIP_DEFLATED:
if not zlib:
raise RuntimeError,\
"Compression requires the (missing) zlib module"
else:
raise RuntimeError, "That compression method is not supported" self._allowZip64 = allowZip64
self._didModify = False
self.debug = 0 # Level of printing: 0 through 3
self.NameToInfo = {} # Find file info given name
self.filelist = [] # List of ZipInfo instances for archive
self.compression = compression # Method of compression
self.mode = key = mode.replace('b', '')[0]
self.pwd = None
self._comment = '' # Check if we were passed a file-like object
if isinstance(file, basestring):
self._filePassed = 0
self.filename = file
modeDict = {'r' : 'rb', 'w': 'wb', 'a' : 'r+b'}
try:
self.fp = open(file, modeDict[mode])
except IOError:
if mode == 'a':
mode = key = 'w'
self.fp = open(file, modeDict[mode])
else:
raise
else:
self._filePassed = 1
self.fp = file
self.filename = getattr(file, 'name', None) try:
if key == 'r':
self._RealGetContents()
elif key == 'w':
# set the modified flag so central directory gets written
# even if no files are added to the archive
self._didModify = True
elif key == 'a':
try:
# See if file is a zip file
self._RealGetContents()
# seek to start of directory and overwrite
self.fp.seek(self.start_dir, 0)
except BadZipfile:
# file is not a zip file, just append
self.fp.seek(0, 2) # set the modified flag so central directory gets written
# even if no files are added to the archive
self._didModify = True
else:
raise RuntimeError('Mode must be "r", "w" or "a"')
except:
fp = self.fp
self.fp = None
if not self._filePassed:
fp.close()
raise def __enter__(self):
return self def __exit__(self, type, value, traceback):
self.close() def _RealGetContents(self):
"""Read in the table of contents for the ZIP file."""
fp = self.fp
try:
endrec = _EndRecData(fp)
except IOError:
raise BadZipfile("File is not a zip file")
if not endrec:
raise BadZipfile, "File is not a zip file"
if self.debug > 1:
print endrec
size_cd = endrec[_ECD_SIZE] # bytes in central directory
offset_cd = endrec[_ECD_OFFSET] # offset of central directory
self._comment = endrec[_ECD_COMMENT] # archive comment # "concat" is zero, unless zip was concatenated to another file
concat = endrec[_ECD_LOCATION] - size_cd - offset_cd
if endrec[_ECD_SIGNATURE] == stringEndArchive64:
# If Zip64 extension structures are present, account for them
concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator) if self.debug > 2:
inferred = concat + offset_cd
print "given, inferred, offset", offset_cd, inferred, concat
# self.start_dir: Position of start of central directory
self.start_dir = offset_cd + concat
fp.seek(self.start_dir, 0)
data = fp.read(size_cd)
fp = cStringIO.StringIO(data)
total = 0
while total < size_cd:
centdir = fp.read(sizeCentralDir)
if len(centdir) != sizeCentralDir:
raise BadZipfile("Truncated central directory")
centdir = struct.unpack(structCentralDir, centdir)
if centdir[_CD_SIGNATURE] != stringCentralDir:
raise BadZipfile("Bad magic number for central directory")
if self.debug > 2:
print centdir
filename = fp.read(centdir[_CD_FILENAME_LENGTH])
# Create ZipInfo instance to store file information
x = ZipInfo(filename)
x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH])
x.comment = fp.read(centdir[_CD_COMMENT_LENGTH])
x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET]
(x.create_version, x.create_system, x.extract_version, x.reserved,
x.flag_bits, x.compress_type, t, d,
x.CRC, x.compress_size, x.file_size) = centdir[1:12]
x.volume, x.internal_attr, x.external_attr = centdir[15:18]
# Convert date/time code to (year, month, day, hour, min, sec)
x._raw_time = t
x.date_time = ( (d>>9)+1980, (d>>5)&0xF, d&0x1F,
t>>11, (t>>5)&0x3F, (t&0x1F) * 2 ) x._decodeExtra()
x.header_offset = x.header_offset + concat
x.filename = x._decodeFilename()
self.filelist.append(x)
self.NameToInfo[x.filename] = x # update total bytes read from central directory
total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH]
+ centdir[_CD_EXTRA_FIELD_LENGTH]
+ centdir[_CD_COMMENT_LENGTH]) if self.debug > 2:
print "total", total def namelist(self):
"""Return a list of file names in the archive."""
l = []
for data in self.filelist:
l.append(data.filename)
return l def infolist(self):
"""Return a list of class ZipInfo instances for files in the
archive."""
return self.filelist def printdir(self):
"""Print a table of contents for the zip file."""
print "%-46s %19s %12s" % ("File Name", "Modified ", "Size")
for zinfo in self.filelist:
date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time[:6]
print "%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size) def testzip(self):
"""Read all the files and check the CRC."""
chunk_size = 2 ** 20
for zinfo in self.filelist:
try:
# Read by chunks, to avoid an OverflowError or a
# MemoryError with very large embedded files.
with self.open(zinfo.filename, "r") as f:
while f.read(chunk_size): # Check CRC-32
pass
except BadZipfile:
return zinfo.filename def getinfo(self, name):
"""Return the instance of ZipInfo given 'name'."""
info = self.NameToInfo.get(name)
if info is None:
raise KeyError(
'There is no item named %r in the archive' % name) return info def setpassword(self, pwd):
"""Set default password for encrypted files."""
self.pwd = pwd @property
def comment(self):
"""The comment text associated with the ZIP file."""
return self._comment @comment.setter
def comment(self, comment):
# check for valid comment length
if len(comment) > ZIP_MAX_COMMENT:
import warnings
warnings.warn('Archive comment is too long; truncating to %d bytes'
% ZIP_MAX_COMMENT, stacklevel=2)
comment = comment[:ZIP_MAX_COMMENT]
self._comment = comment
self._didModify = True def read(self, name, pwd=None):
"""Return file bytes (as a string) for name."""
return self.open(name, "r", pwd).read() def open(self, name, mode="r", pwd=None):
"""Return file-like object for 'name'."""
if mode not in ("r", "U", "rU"):
raise RuntimeError, 'open() requires mode "r", "U", or "rU"'
if not self.fp:
raise RuntimeError, \
"Attempt to read ZIP archive that was already closed" # Only open a new file for instances where we were not
# given a file object in the constructor
if self._filePassed:
zef_file = self.fp
should_close = False
else:
zef_file = open(self.filename, 'rb')
should_close = True try:
# Make sure we have an info object
if isinstance(name, ZipInfo):
# 'name' is already an info object
zinfo = name
else:
# Get info object for name
zinfo = self.getinfo(name) zef_file.seek(zinfo.header_offset, 0) # Skip the file header:
fheader = zef_file.read(sizeFileHeader)
if len(fheader) != sizeFileHeader:
raise BadZipfile("Truncated file header")
fheader = struct.unpack(structFileHeader, fheader)
if fheader[_FH_SIGNATURE] != stringFileHeader:
raise BadZipfile("Bad magic number for file header") fname = zef_file.read(fheader[_FH_FILENAME_LENGTH])
if fheader[_FH_EXTRA_FIELD_LENGTH]:
zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) if fname != zinfo.orig_filename:
raise BadZipfile, \
'File name in directory "%s" and header "%s" differ.' % (
zinfo.orig_filename, fname) # check for encrypted flag & handle password
is_encrypted = zinfo.flag_bits & 0x1
zd = None
if is_encrypted:
if not pwd:
pwd = self.pwd
if not pwd:
raise RuntimeError, "File %s is encrypted, " \
"password required for extraction" % name zd = _ZipDecrypter(pwd)
# The first 12 bytes in the cypher stream is an encryption header
# used to strengthen the algorithm. The first 11 bytes are
# completely random, while the 12th contains the MSB of the CRC,
# or the MSB of the file time depending on the header type
# and is used to check the correctness of the password.
bytes = zef_file.read(12)
h = map(zd, bytes[0:12])
if zinfo.flag_bits & 0x8:
# compare against the file type from extended local headers
check_byte = (zinfo._raw_time >> 8) & 0xff
else:
# compare against the CRC otherwise
check_byte = (zinfo.CRC >> 24) & 0xff
if ord(h[11]) != check_byte:
raise RuntimeError("Bad password for file", name) return ZipExtFile(zef_file, mode, zinfo, zd,
close_fileobj=should_close)
except:
if should_close:
zef_file.close()
raise def extract(self, member, path=None, pwd=None):
"""Extract a member from the archive to the current working directory,
using its full name. Its file information is extracted as accurately
as possible. `member' may be a filename or a ZipInfo object. You can
specify a different directory using `path'.
"""
if not isinstance(member, ZipInfo):
member = self.getinfo(member) if path is None:
path = os.getcwd() return self._extract_member(member, path, pwd) def extractall(self, path=None, members=None, pwd=None):
"""Extract all members from the archive to the current working
directory. `path' specifies a different directory to extract to.
`members' is optional and must be a subset of the list returned
by namelist().
"""
if members is None:
members = self.namelist() for zipinfo in members:
self.extract(zipinfo, path, pwd) def _extract_member(self, member, targetpath, pwd):
"""Extract the ZipInfo object 'member' to a physical
file on the path targetpath.
"""
# build the destination pathname, replacing
# forward slashes to platform specific separators.
arcname = member.filename.replace('/', os.path.sep) if os.path.altsep:
arcname = arcname.replace(os.path.altsep, os.path.sep)
# interpret absolute pathname as relative, remove drive letter or
# UNC path, redundant separators, "." and ".." components.
arcname = os.path.splitdrive(arcname)[1]
arcname = os.path.sep.join(x for x in arcname.split(os.path.sep)
if x not in ('', os.path.curdir, os.path.pardir))
if os.path.sep == '\\':
# filter illegal characters on Windows
illegal = ':<>|"?*'
if isinstance(arcname, unicode):
table = {ord(c): ord('_') for c in illegal}
else:
table = string.maketrans(illegal, '_' * len(illegal))
arcname = arcname.translate(table)
# remove trailing dots
arcname = (x.rstrip('.') for x in arcname.split(os.path.sep))
arcname = os.path.sep.join(x for x in arcname if x) targetpath = os.path.join(targetpath, arcname)
targetpath = os.path.normpath(targetpath) # Create all upper directories if necessary.
upperdirs = os.path.dirname(targetpath)
if upperdirs and not os.path.exists(upperdirs):
os.makedirs(upperdirs) if member.filename[-1] == '/':
if not os.path.isdir(targetpath):
os.mkdir(targetpath)
return targetpath with self.open(member, pwd=pwd) as source, \
file(targetpath, "wb") as target:
shutil.copyfileobj(source, target) return targetpath def _writecheck(self, zinfo):
"""Check for errors before writing a file to the archive."""
if zinfo.filename in self.NameToInfo:
import warnings
warnings.warn('Duplicate name: %r' % zinfo.filename, stacklevel=3)
if self.mode not in ("w", "a"):
raise RuntimeError, 'write() requires mode "w" or "a"'
if not self.fp:
raise RuntimeError, \
"Attempt to write ZIP archive that was already closed"
if zinfo.compress_type == ZIP_DEFLATED and not zlib:
raise RuntimeError, \
"Compression requires the (missing) zlib module"
if zinfo.compress_type not in (ZIP_STORED, ZIP_DEFLATED):
raise RuntimeError, \
"That compression method is not supported"
if not self._allowZip64:
requires_zip64 = None
if len(self.filelist) >= ZIP_FILECOUNT_LIMIT:
requires_zip64 = "Files count"
elif zinfo.file_size > ZIP64_LIMIT:
requires_zip64 = "Filesize"
elif zinfo.header_offset > ZIP64_LIMIT:
requires_zip64 = "Zipfile size"
if requires_zip64:
raise LargeZipFile(requires_zip64 +
" would require ZIP64 extensions") def write(self, filename, arcname=None, compress_type=None):
"""Put the bytes from filename into the archive under the name
arcname."""
if not self.fp:
raise RuntimeError(
"Attempt to write to ZIP archive that was already closed") st = os.stat(filename)
isdir = stat.S_ISDIR(st.st_mode)
mtime = time.localtime(st.st_mtime)
date_time = mtime[0:6]
# Create ZipInfo instance to store file information
if arcname is None:
arcname = filename
arcname = os.path.normpath(os.path.splitdrive(arcname)[1])
while arcname[0] in (os.sep, os.altsep):
arcname = arcname[1:]
if isdir:
arcname += '/'
zinfo = ZipInfo(arcname, date_time)
zinfo.external_attr = (st[0] & 0xFFFF) << 16L # Unix attributes
if compress_type is None:
zinfo.compress_type = self.compression
else:
zinfo.compress_type = compress_type zinfo.file_size = st.st_size
zinfo.flag_bits = 0x00
zinfo.header_offset = self.fp.tell() # Start of header bytes self._writecheck(zinfo)
self._didModify = True if isdir:
zinfo.file_size = 0
zinfo.compress_size = 0
zinfo.CRC = 0
zinfo.external_attr |= 0x10 # MS-DOS directory flag
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
self.fp.write(zinfo.FileHeader(False))
return with open(filename, "rb") as fp:
# Must overwrite CRC and sizes with correct data later
zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0
# Compressed size can be larger than uncompressed size
zip64 = self._allowZip64 and \
zinfo.file_size * 1.05 > ZIP64_LIMIT
self.fp.write(zinfo.FileHeader(zip64))
if zinfo.compress_type == ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED, -15)
else:
cmpr = None
file_size = 0
while 1:
buf = fp.read(1024 * 8)
if not buf:
break
file_size = file_size + len(buf)
CRC = crc32(buf, CRC) & 0xffffffff
if cmpr:
buf = cmpr.compress(buf)
compress_size = compress_size + len(buf)
self.fp.write(buf)
if cmpr:
buf = cmpr.flush()
compress_size = compress_size + len(buf)
self.fp.write(buf)
zinfo.compress_size = compress_size
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
if not zip64 and self._allowZip64:
if file_size > ZIP64_LIMIT:
raise RuntimeError('File size has increased during compressing')
if compress_size > ZIP64_LIMIT:
raise RuntimeError('Compressed size larger than uncompressed size')
# Seek backwards and write file header (which will now include
# correct CRC and file sizes)
position = self.fp.tell() # Preserve current position in file
self.fp.seek(zinfo.header_offset, 0)
self.fp.write(zinfo.FileHeader(zip64))
self.fp.seek(position, 0)
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo def writestr(self, zinfo_or_arcname, bytes, compress_type=None):
"""Write a file into the archive. The contents is the string
'bytes'. 'zinfo_or_arcname' is either a ZipInfo instance or
the name of the file in the archive."""
if not isinstance(zinfo_or_arcname, ZipInfo):
zinfo = ZipInfo(filename=zinfo_or_arcname,
date_time=time.localtime(time.time())[:6]) zinfo.compress_type = self.compression
if zinfo.filename[-1] == '/':
zinfo.external_attr = 0o40775 << 16 # drwxrwxr-x
zinfo.external_attr |= 0x10 # MS-DOS directory flag
else:
zinfo.external_attr = 0o600 << 16 # ?rw-------
else:
zinfo = zinfo_or_arcname if not self.fp:
raise RuntimeError(
"Attempt to write to ZIP archive that was already closed") if compress_type is not None:
zinfo.compress_type = compress_type zinfo.file_size = len(bytes) # Uncompressed size
zinfo.header_offset = self.fp.tell() # Start of header bytes
self._writecheck(zinfo)
self._didModify = True
zinfo.CRC = crc32(bytes) & 0xffffffff # CRC-32 checksum
if zinfo.compress_type == ZIP_DEFLATED:
co = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED, -15)
bytes = co.compress(bytes) + co.flush()
zinfo.compress_size = len(bytes) # Compressed size
else:
zinfo.compress_size = zinfo.file_size
zip64 = zinfo.file_size > ZIP64_LIMIT or \
zinfo.compress_size > ZIP64_LIMIT
if zip64 and not self._allowZip64:
raise LargeZipFile("Filesize would require ZIP64 extensions")
self.fp.write(zinfo.FileHeader(zip64))
self.fp.write(bytes)
if zinfo.flag_bits & 0x08:
# Write CRC and file sizes after the file data
fmt = '<LQQ' if zip64 else '<LLL'
self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size,
zinfo.file_size))
self.fp.flush()
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo def __del__(self):
"""Call the "close()" method in case the user forgot."""
self.close() def close(self):
"""Close the file, and for mode "w" and "a" write the ending
records."""
if self.fp is None:
return try:
if self.mode in ("w", "a") and self._didModify: # write ending records
pos1 = self.fp.tell()
for zinfo in self.filelist: # write central directory
dt = zinfo.date_time
dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
extra = []
if zinfo.file_size > ZIP64_LIMIT \
or zinfo.compress_size > ZIP64_LIMIT:
extra.append(zinfo.file_size)
extra.append(zinfo.compress_size)
file_size = 0xffffffff
compress_size = 0xffffffff
else:
file_size = zinfo.file_size
compress_size = zinfo.compress_size if zinfo.header_offset > ZIP64_LIMIT:
extra.append(zinfo.header_offset)
header_offset = 0xffffffffL
else:
header_offset = zinfo.header_offset extra_data = zinfo.extra
if extra:
# Append a ZIP64 field to the extra's
extra_data = struct.pack(
'<HH' + 'Q'*len(extra),
1, 8*len(extra), *extra) + extra_data extract_version = max(45, zinfo.extract_version)
create_version = max(45, zinfo.create_version)
else:
extract_version = zinfo.extract_version
create_version = zinfo.create_version try:
filename, flag_bits = zinfo._encodeFilenameFlags()
centdir = struct.pack(structCentralDir,
stringCentralDir, create_version,
zinfo.create_system, extract_version, zinfo.reserved,
flag_bits, zinfo.compress_type, dostime, dosdate,
zinfo.CRC, compress_size, file_size,
len(filename), len(extra_data), len(zinfo.comment),
0, zinfo.internal_attr, zinfo.external_attr,
header_offset)
except DeprecationWarning:
print >>sys.stderr, (structCentralDir,
stringCentralDir, create_version,
zinfo.create_system, extract_version, zinfo.reserved,
zinfo.flag_bits, zinfo.compress_type, dostime, dosdate,
zinfo.CRC, compress_size, file_size,
len(zinfo.filename), len(extra_data), len(zinfo.comment),
0, zinfo.internal_attr, zinfo.external_attr,
header_offset)
raise
self.fp.write(centdir)
self.fp.write(filename)
self.fp.write(extra_data)
self.fp.write(zinfo.comment) pos2 = self.fp.tell()
# Write end-of-zip-archive record
centDirCount = len(self.filelist)
centDirSize = pos2 - pos1
centDirOffset = pos1
requires_zip64 = None
if centDirCount > ZIP_FILECOUNT_LIMIT:
requires_zip64 = "Files count"
elif centDirOffset > ZIP64_LIMIT:
requires_zip64 = "Central directory offset"
elif centDirSize > ZIP64_LIMIT:
requires_zip64 = "Central directory size"
if requires_zip64:
# Need to write the ZIP64 end-of-archive records
if not self._allowZip64:
raise LargeZipFile(requires_zip64 +
" would require ZIP64 extensions")
zip64endrec = struct.pack(
structEndArchive64, stringEndArchive64,
44, 45, 45, 0, 0, centDirCount, centDirCount,
centDirSize, centDirOffset)
self.fp.write(zip64endrec) zip64locrec = struct.pack(
structEndArchive64Locator,
stringEndArchive64Locator, 0, pos2, 1)
self.fp.write(zip64locrec)
centDirCount = min(centDirCount, 0xFFFF)
centDirSize = min(centDirSize, 0xFFFFFFFF)
centDirOffset = min(centDirOffset, 0xFFFFFFFF) endrec = struct.pack(structEndArchive, stringEndArchive,
0, 0, centDirCount, centDirCount,
centDirSize, centDirOffset, len(self._comment))
self.fp.write(endrec)
self.fp.write(self._comment)
self.fp.flush()
finally:
fp = self.fp
self.fp = None
if not self._filePassed:
fp.close()
zipfile
class ZipFile(object):
""" Class with methods to open, read, write, close, list zip files. z = ZipFile(file, mode="r", compression=ZIP_STORED, allowZip64=False) file: Either the path to the file, or a file-like object.
If it is a path, the file will be opened and closed by ZipFile.
mode: The mode can be either read "r", write "w" or append "a".
compression: ZIP_STORED (no compression) or ZIP_DEFLATED (requires zlib).
allowZip64: if True ZipFile will create files with ZIP64 extensions when
needed, otherwise it will raise an exception when this would
be necessary. """ fp = None # Set here since __del__ checks it def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=False):
"""Open the ZIP file with mode read "r", write "w" or append "a"."""
if mode not in ("r", "w", "a"):
raise RuntimeError('ZipFile() requires mode "r", "w", or "a"') if compression == ZIP_STORED:
pass
elif compression == ZIP_DEFLATED:
if not zlib:
raise RuntimeError,\
"Compression requires the (missing) zlib module"
else:
raise RuntimeError, "That compression method is not supported" self._allowZip64 = allowZip64
self._didModify = False
self.debug = 0 # Level of printing: 0 through 3
self.NameToInfo = {} # Find file info given name
self.filelist = [] # List of ZipInfo instances for archive
self.compression = compression # Method of compression
self.mode = key = mode.replace('b', '')[0]
self.pwd = None
self._comment = '' # Check if we were passed a file-like object
if isinstance(file, basestring):
self._filePassed = 0
self.filename = file
modeDict = {'r' : 'rb', 'w': 'wb', 'a' : 'r+b'}
try:
self.fp = open(file, modeDict[mode])
except IOError:
if mode == 'a':
mode = key = 'w'
self.fp = open(file, modeDict[mode])
else:
raise
else:
self._filePassed = 1
self.fp = file
self.filename = getattr(file, 'name', None) try:
if key == 'r':
self._RealGetContents()
elif key == 'w':
# set the modified flag so central directory gets written
# even if no files are added to the archive
self._didModify = True
elif key == 'a':
try:
# See if file is a zip file
self._RealGetContents()
# seek to start of directory and overwrite
self.fp.seek(self.start_dir, 0)
except BadZipfile:
# file is not a zip file, just append
self.fp.seek(0, 2) # set the modified flag so central directory gets written
# even if no files are added to the archive
self._didModify = True
else:
raise RuntimeError('Mode must be "r", "w" or "a"')
except:
fp = self.fp
self.fp = None
if not self._filePassed:
fp.close()
raise def __enter__(self):
return self def __exit__(self, type, value, traceback):
self.close() def _RealGetContents(self):
"""Read in the table of contents for the ZIP file."""
fp = self.fp
try:
endrec = _EndRecData(fp)
except IOError:
raise BadZipfile("File is not a zip file")
if not endrec:
raise BadZipfile, "File is not a zip file"
if self.debug > 1:
print endrec
size_cd = endrec[_ECD_SIZE] # bytes in central directory
offset_cd = endrec[_ECD_OFFSET] # offset of central directory
self._comment = endrec[_ECD_COMMENT] # archive comment # "concat" is zero, unless zip was concatenated to another file
concat = endrec[_ECD_LOCATION] - size_cd - offset_cd
if endrec[_ECD_SIGNATURE] == stringEndArchive64:
# If Zip64 extension structures are present, account for them
concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator) if self.debug > 2:
inferred = concat + offset_cd
print "given, inferred, offset", offset_cd, inferred, concat
# self.start_dir: Position of start of central directory
self.start_dir = offset_cd + concat
fp.seek(self.start_dir, 0)
data = fp.read(size_cd)
fp = cStringIO.StringIO(data)
total = 0
while total < size_cd:
centdir = fp.read(sizeCentralDir)
if len(centdir) != sizeCentralDir:
raise BadZipfile("Truncated central directory")
centdir = struct.unpack(structCentralDir, centdir)
if centdir[_CD_SIGNATURE] != stringCentralDir:
raise BadZipfile("Bad magic number for central directory")
if self.debug > 2:
print centdir
filename = fp.read(centdir[_CD_FILENAME_LENGTH])
# Create ZipInfo instance to store file information
x = ZipInfo(filename)
x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH])
x.comment = fp.read(centdir[_CD_COMMENT_LENGTH])
x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET]
(x.create_version, x.create_system, x.extract_version, x.reserved,
x.flag_bits, x.compress_type, t, d,
x.CRC, x.compress_size, x.file_size) = centdir[1:12]
x.volume, x.internal_attr, x.external_attr = centdir[15:18]
# Convert date/time code to (year, month, day, hour, min, sec)
x._raw_time = t
x.date_time = ( (d>>9)+1980, (d>>5)&0xF, d&0x1F,
t>>11, (t>>5)&0x3F, (t&0x1F) * 2 ) x._decodeExtra()
x.header_offset = x.header_offset + concat
x.filename = x._decodeFilename()
self.filelist.append(x)
self.NameToInfo[x.filename] = x # update total bytes read from central directory
total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH]
+ centdir[_CD_EXTRA_FIELD_LENGTH]
+ centdir[_CD_COMMENT_LENGTH]) if self.debug > 2:
print "total", total def namelist(self):
"""Return a list of file names in the archive."""
l = []
for data in self.filelist:
l.append(data.filename)
return l def infolist(self):
"""Return a list of class ZipInfo instances for files in the
archive."""
return self.filelist def printdir(self):
"""Print a table of contents for the zip file."""
print "%-46s %19s %12s" % ("File Name", "Modified ", "Size")
for zinfo in self.filelist:
date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time[:6]
print "%-46s %s %12d" % (zinfo.filename, date, zinfo.file_size) def testzip(self):
"""Read all the files and check the CRC."""
chunk_size = 2 ** 20
for zinfo in self.filelist:
try:
# Read by chunks, to avoid an OverflowError or a
# MemoryError with very large embedded files.
with self.open(zinfo.filename, "r") as f:
while f.read(chunk_size): # Check CRC-32
pass
except BadZipfile:
return zinfo.filename def getinfo(self, name):
"""Return the instance of ZipInfo given 'name'."""
info = self.NameToInfo.get(name)
if info is None:
raise KeyError(
'There is no item named %r in the archive' % name) return info def setpassword(self, pwd):
"""Set default password for encrypted files."""
self.pwd = pwd @property
def comment(self):
"""The comment text associated with the ZIP file."""
return self._comment @comment.setter
def comment(self, comment):
# check for valid comment length
if len(comment) > ZIP_MAX_COMMENT:
import warnings
warnings.warn('Archive comment is too long; truncating to %d bytes'
% ZIP_MAX_COMMENT, stacklevel=2)
comment = comment[:ZIP_MAX_COMMENT]
self._comment = comment
self._didModify = True def read(self, name, pwd=None):
"""Return file bytes (as a string) for name."""
return self.open(name, "r", pwd).read() def open(self, name, mode="r", pwd=None):
"""Return file-like object for 'name'."""
if mode not in ("r", "U", "rU"):
raise RuntimeError, 'open() requires mode "r", "U", or "rU"'
if not self.fp:
raise RuntimeError, \
"Attempt to read ZIP archive that was already closed" # Only open a new file for instances where we were not
# given a file object in the constructor
if self._filePassed:
zef_file = self.fp
should_close = False
else:
zef_file = open(self.filename, 'rb')
should_close = True try:
# Make sure we have an info object
if isinstance(name, ZipInfo):
# 'name' is already an info object
zinfo = name
else:
# Get info object for name
zinfo = self.getinfo(name) zef_file.seek(zinfo.header_offset, 0) # Skip the file header:
fheader = zef_file.read(sizeFileHeader)
if len(fheader) != sizeFileHeader:
raise BadZipfile("Truncated file header")
fheader = struct.unpack(structFileHeader, fheader)
if fheader[_FH_SIGNATURE] != stringFileHeader:
raise BadZipfile("Bad magic number for file header") fname = zef_file.read(fheader[_FH_FILENAME_LENGTH])
if fheader[_FH_EXTRA_FIELD_LENGTH]:
zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) if fname != zinfo.orig_filename:
raise BadZipfile, \
'File name in directory "%s" and header "%s" differ.' % (
zinfo.orig_filename, fname) # check for encrypted flag & handle password
is_encrypted = zinfo.flag_bits & 0x1
zd = None
if is_encrypted:
if not pwd:
pwd = self.pwd
if not pwd:
raise RuntimeError, "File %s is encrypted, " \
"password required for extraction" % name zd = _ZipDecrypter(pwd)
# The first 12 bytes in the cypher stream is an encryption header
# used to strengthen the algorithm. The first 11 bytes are
# completely random, while the 12th contains the MSB of the CRC,
# or the MSB of the file time depending on the header type
# and is used to check the correctness of the password.
bytes = zef_file.read(12)
h = map(zd, bytes[0:12])
if zinfo.flag_bits & 0x8:
# compare against the file type from extended local headers
check_byte = (zinfo._raw_time >> 8) & 0xff
else:
# compare against the CRC otherwise
check_byte = (zinfo.CRC >> 24) & 0xff
if ord(h[11]) != check_byte:
raise RuntimeError("Bad password for file", name) return ZipExtFile(zef_file, mode, zinfo, zd,
close_fileobj=should_close)
except:
if should_close:
zef_file.close()
raise def extract(self, member, path=None, pwd=None):
"""Extract a member from the archive to the current working directory,
using its full name. Its file information is extracted as accurately
as possible. `member' may be a filename or a ZipInfo object. You can
specify a different directory using `path'.
"""
if not isinstance(member, ZipInfo):
member = self.getinfo(member) if path is None:
path = os.getcwd() return self._extract_member(member, path, pwd) def extractall(self, path=None, members=None, pwd=None):
"""Extract all members from the archive to the current working
directory. `path' specifies a different directory to extract to.
`members' is optional and must be a subset of the list returned
by namelist().
"""
if members is None:
members = self.namelist() for zipinfo in members:
self.extract(zipinfo, path, pwd) def _extract_member(self, member, targetpath, pwd):
"""Extract the ZipInfo object 'member' to a physical
file on the path targetpath.
"""
# build the destination pathname, replacing
# forward slashes to platform specific separators.
arcname = member.filename.replace('/', os.path.sep) if os.path.altsep:
arcname = arcname.replace(os.path.altsep, os.path.sep)
# interpret absolute pathname as relative, remove drive letter or
# UNC path, redundant separators, "." and ".." components.
arcname = os.path.splitdrive(arcname)[1]
arcname = os.path.sep.join(x for x in arcname.split(os.path.sep)
if x not in ('', os.path.curdir, os.path.pardir))
if os.path.sep == '\\':
# filter illegal characters on Windows
illegal = ':<>|"?*'
if isinstance(arcname, unicode):
table = {ord(c): ord('_') for c in illegal}
else:
table = string.maketrans(illegal, '_' * len(illegal))
arcname = arcname.translate(table)
# remove trailing dots
arcname = (x.rstrip('.') for x in arcname.split(os.path.sep))
arcname = os.path.sep.join(x for x in arcname if x) targetpath = os.path.join(targetpath, arcname)
targetpath = os.path.normpath(targetpath) # Create all upper directories if necessary.
upperdirs = os.path.dirname(targetpath)
if upperdirs and not os.path.exists(upperdirs):
os.makedirs(upperdirs) if member.filename[-1] == '/':
if not os.path.isdir(targetpath):
os.mkdir(targetpath)
return targetpath with self.open(member, pwd=pwd) as source, \
file(targetpath, "wb") as target:
shutil.copyfileobj(source, target) return targetpath def _writecheck(self, zinfo):
"""Check for errors before writing a file to the archive."""
if zinfo.filename in self.NameToInfo:
import warnings
warnings.warn('Duplicate name: %r' % zinfo.filename, stacklevel=3)
if self.mode not in ("w", "a"):
raise RuntimeError, 'write() requires mode "w" or "a"'
if not self.fp:
raise RuntimeError, \
"Attempt to write ZIP archive that was already closed"
if zinfo.compress_type == ZIP_DEFLATED and not zlib:
raise RuntimeError, \
"Compression requires the (missing) zlib module"
if zinfo.compress_type not in (ZIP_STORED, ZIP_DEFLATED):
raise RuntimeError, \
"That compression method is not supported"
if not self._allowZip64:
requires_zip64 = None
if len(self.filelist) >= ZIP_FILECOUNT_LIMIT:
requires_zip64 = "Files count"
elif zinfo.file_size > ZIP64_LIMIT:
requires_zip64 = "Filesize"
elif zinfo.header_offset > ZIP64_LIMIT:
requires_zip64 = "Zipfile size"
if requires_zip64:
raise LargeZipFile(requires_zip64 +
" would require ZIP64 extensions") def write(self, filename, arcname=None, compress_type=None):
"""Put the bytes from filename into the archive under the name
arcname."""
if not self.fp:
raise RuntimeError(
"Attempt to write to ZIP archive that was already closed") st = os.stat(filename)
isdir = stat.S_ISDIR(st.st_mode)
mtime = time.localtime(st.st_mtime)
date_time = mtime[0:6]
# Create ZipInfo instance to store file information
if arcname is None:
arcname = filename
arcname = os.path.normpath(os.path.splitdrive(arcname)[1])
while arcname[0] in (os.sep, os.altsep):
arcname = arcname[1:]
if isdir:
arcname += '/'
zinfo = ZipInfo(arcname, date_time)
zinfo.external_attr = (st[0] & 0xFFFF) << 16L # Unix attributes
if compress_type is None:
zinfo.compress_type = self.compression
else:
zinfo.compress_type = compress_type zinfo.file_size = st.st_size
zinfo.flag_bits = 0x00
zinfo.header_offset = self.fp.tell() # Start of header bytes self._writecheck(zinfo)
self._didModify = True if isdir:
zinfo.file_size = 0
zinfo.compress_size = 0
zinfo.CRC = 0
zinfo.external_attr |= 0x10 # MS-DOS directory flag
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
self.fp.write(zinfo.FileHeader(False))
return with open(filename, "rb") as fp:
# Must overwrite CRC and sizes with correct data later
zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0
# Compressed size can be larger than uncompressed size
zip64 = self._allowZip64 and \
zinfo.file_size * 1.05 > ZIP64_LIMIT
self.fp.write(zinfo.FileHeader(zip64))
if zinfo.compress_type == ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED, -15)
else:
cmpr = None
file_size = 0
while 1:
buf = fp.read(1024 * 8)
if not buf:
break
file_size = file_size + len(buf)
CRC = crc32(buf, CRC) & 0xffffffff
if cmpr:
buf = cmpr.compress(buf)
compress_size = compress_size + len(buf)
self.fp.write(buf)
if cmpr:
buf = cmpr.flush()
compress_size = compress_size + len(buf)
self.fp.write(buf)
zinfo.compress_size = compress_size
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
if not zip64 and self._allowZip64:
if file_size > ZIP64_LIMIT:
raise RuntimeError('File size has increased during compressing')
if compress_size > ZIP64_LIMIT:
raise RuntimeError('Compressed size larger than uncompressed size')
# Seek backwards and write file header (which will now include
# correct CRC and file sizes)
position = self.fp.tell() # Preserve current position in file
self.fp.seek(zinfo.header_offset, 0)
self.fp.write(zinfo.FileHeader(zip64))
self.fp.seek(position, 0)
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo def writestr(self, zinfo_or_arcname, bytes, compress_type=None):
"""Write a file into the archive. The contents is the string
'bytes'. 'zinfo_or_arcname' is either a ZipInfo instance or
the name of the file in the archive."""
if not isinstance(zinfo_or_arcname, ZipInfo):
zinfo = ZipInfo(filename=zinfo_or_arcname,
date_time=time.localtime(time.time())[:6]) zinfo.compress_type = self.compression
if zinfo.filename[-1] == '/':
zinfo.external_attr = 0o40775 << 16 # drwxrwxr-x
zinfo.external_attr |= 0x10 # MS-DOS directory flag
else:
zinfo.external_attr = 0o600 << 16 # ?rw-------
else:
zinfo = zinfo_or_arcname if not self.fp:
raise RuntimeError(
"Attempt to write to ZIP archive that was already closed") if compress_type is not None:
zinfo.compress_type = compress_type zinfo.file_size = len(bytes) # Uncompressed size
zinfo.header_offset = self.fp.tell() # Start of header bytes
self._writecheck(zinfo)
self._didModify = True
zinfo.CRC = crc32(bytes) & 0xffffffff # CRC-32 checksum
if zinfo.compress_type == ZIP_DEFLATED:
co = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED, -15)
bytes = co.compress(bytes) + co.flush()
zinfo.compress_size = len(bytes) # Compressed size
else:
zinfo.compress_size = zinfo.file_size
zip64 = zinfo.file_size > ZIP64_LIMIT or \
zinfo.compress_size > ZIP64_LIMIT
if zip64 and not self._allowZip64:
raise LargeZipFile("Filesize would require ZIP64 extensions")
self.fp.write(zinfo.FileHeader(zip64))
self.fp.write(bytes)
if zinfo.flag_bits & 0x08:
# Write CRC and file sizes after the file data
fmt = '<LQQ' if zip64 else '<LLL'
self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size,
zinfo.file_size))
self.fp.flush()
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo def __del__(self):
"""Call the "close()" method in case the user forgot."""
self.close() def close(self):
"""Close the file, and for mode "w" and "a" write the ending
records."""
if self.fp is None:
return try:
if self.mode in ("w", "a") and self._didModify: # write ending records
pos1 = self.fp.tell()
for zinfo in self.filelist: # write central directory
dt = zinfo.date_time
dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
extra = []
if zinfo.file_size > ZIP64_LIMIT \
or zinfo.compress_size > ZIP64_LIMIT:
extra.append(zinfo.file_size)
extra.append(zinfo.compress_size)
file_size = 0xffffffff
compress_size = 0xffffffff
else:
file_size = zinfo.file_size
compress_size = zinfo.compress_size if zinfo.header_offset > ZIP64_LIMIT:
extra.append(zinfo.header_offset)
header_offset = 0xffffffffL
else:
header_offset = zinfo.header_offset extra_data = zinfo.extra
if extra:
# Append a ZIP64 field to the extra's
extra_data = struct.pack(
'<HH' + 'Q'*len(extra),
1, 8*len(extra), *extra) + extra_data extract_version = max(45, zinfo.extract_version)
create_version = max(45, zinfo.create_version)
else:
extract_version = zinfo.extract_version
create_version = zinfo.create_version try:
filename, flag_bits = zinfo._encodeFilenameFlags()
centdir = struct.pack(structCentralDir,
stringCentralDir, create_version,
zinfo.create_system, extract_version, zinfo.reserved,
flag_bits, zinfo.compress_type, dostime, dosdate,
zinfo.CRC, compress_size, file_size,
len(filename), len(extra_data), len(zinfo.comment),
0, zinfo.internal_attr, zinfo.external_attr,
header_offset)
except DeprecationWarning:
print >>sys.stderr, (structCentralDir,
stringCentralDir, create_version,
zinfo.create_system, extract_version, zinfo.reserved,
zinfo.flag_bits, zinfo.compress_type, dostime, dosdate,
zinfo.CRC, compress_size, file_size,
len(zinfo.filename), len(extra_data), len(zinfo.comment),
0, zinfo.internal_attr, zinfo.external_attr,
header_offset)
raise
self.fp.write(centdir)
self.fp.write(filename)
self.fp.write(extra_data)
self.fp.write(zinfo.comment) pos2 = self.fp.tell()
# Write end-of-zip-archive record
centDirCount = len(self.filelist)
centDirSize = pos2 - pos1
centDirOffset = pos1
requires_zip64 = None
if centDirCount > ZIP_FILECOUNT_LIMIT:
requires_zip64 = "Files count"
elif centDirOffset > ZIP64_LIMIT:
requires_zip64 = "Central directory offset"
elif centDirSize > ZIP64_LIMIT:
requires_zip64 = "Central directory size"
if requires_zip64:
# Need to write the ZIP64 end-of-archive records
if not self._allowZip64:
raise LargeZipFile(requires_zip64 +
" would require ZIP64 extensions")
zip64endrec = struct.pack(
structEndArchive64, stringEndArchive64,
44, 45, 45, 0, 0, centDirCount, centDirCount,
centDirSize, centDirOffset)
self.fp.write(zip64endrec) zip64locrec = struct.pack(
structEndArchive64Locator,
stringEndArchive64Locator, 0, pos2, 1)
self.fp.write(zip64locrec)
centDirCount = min(centDirCount, 0xFFFF)
centDirSize = min(centDirSize, 0xFFFFFFFF)
centDirOffset = min(centDirOffset, 0xFFFFFFFF) endrec = struct.pack(structEndArchive, stringEndArchive,
0, 0, centDirCount, centDirCount,
centDirSize, centDirOffset, len(self._comment))
self.fp.write(endrec)
self.fp.write(self._comment)
self.fp.flush()
finally:
fp = self.fp
self.fp = None
if not self._filePassed:
fp.close() ZipFile
tarfile
六、json & pickle
用于序列化的两个模块
- json,用于字符串 和 python数据类型间进行转换
- pickle,用于python特有的类型 和 python的数据类型间进行转换
Json模块提供了四个功能:dumps、dump、loads、load
pickle模块提供了四个功能:dumps、dump、loads、load
七、shelve
shelve模块是一个简单的k,v将内存数据通过文件持久化的模块,可以持久化任何pickle可支持的python数据格式
import shelve d = shelve.open('shelve_test') #打开一个文件 class Test(object):
def __init__(self,n):
self.n = n t = Test(123)
t2 = Test(123334) name = ["alex","rain","test"]
d["test"] = name #持久化列表
d["t1"] = t #持久化类
d["t2"] = t2 d.close()
八、xml
xml是实现不同语言或程序之间进行数据交换的协议,跟json差不多,但json使用起来更简单,不过,古时候,在json还没诞生的黑暗年代,大家只能选择用xml呀,至今很多传统公司如金融行业的很多系统的接口还主要是xml。
xml的格式如下,就是通过<>节点来区别数据结构的:
<?xml version="1.0"?>
<data>
<country name="Liechtenstein">
<rank updated="yes">2</rank>
<year>2008</year>
<gdppc>141100</gdppc>
<neighbor name="Austria" direction="E"/>
<neighbor name="Switzerland" direction="W"/>
</country>
<country name="Singapore">
<rank updated="yes">5</rank>
<year>2011</year>
<gdppc>59900</gdppc>
<neighbor name="Malaysia" direction="N"/>
</country>
<country name="Panama">
<rank updated="yes">69</rank>
<year>2011</year>
<gdppc>13600</gdppc>
<neighbor name="Costa Rica" direction="W"/>
<neighbor name="Colombia" direction="E"/>
</country>
</data>
xml协议在各个语言里的都 是支持的,在python中可以用以下模块操作xml
import xml.etree.ElementTree as ET tree = ET.parse("xmltest.xml")
root = tree.getroot()
print(root.tag) #遍历xml文档
for child in root:
print(child.tag, child.attrib)
for i in child:
print(i.tag,i.text) #只遍历year 节点
for node in root.iter('year'):(node.tag,node.text)
修改和删除xml文档内容
import xml.etree.ElementTree as ET tree = ET.parse("xmltest.xml")
root = tree.getroot() #修改
for node in root.iter('year'):
new_year = int(node.text) + 1
node.text = str(new_year)
node.set("updated","yes") tree.write("xmltest.xml") #删除node
for country in root.findall('country'):
rank = int(country.find('rank').text)
if rank > 50:
root.remove(country) tree.write('output.xml')
自己创建xml文档
import xml.etree.ElementTree as ET new_xml = ET.Element("namelist")
name = ET.SubElement(new_xml,"name",attrib={"enrolled":"yes"})
age = ET.SubElement(name,"age",attrib={"checked":"no"})
sex = ET.SubElement(name,"sex")
sex.text = ''
name2 = ET.SubElement(new_xml,"name",attrib={"enrolled":"no"})
age = ET.SubElement(name2,"age")
age.text = '' et = ET.ElementTree(new_xml) #生成文档对象
et.write("test.xml", encoding="utf-8",xml_declaration=True) ET.dump(new_xml) #打印生成的格式
九、PyYAML模块
Python也可以很容易的处理ymal文档格式,只不过需要安装一个模块,
十、configparser模块
用于生成和修改常见配置文档,当前模块的名称在 python 3.x 版本中变更为 configparser。
来看一个好多软件的常见文档格式如下
[DEFAULT]
ServerAliveInterval = 45
Compression = yes
CompressionLevel = 9
ForwardX11 = yes [bitbucket.org]
User = hg [topsecret.server.com]
Port = 50022
ForwardX11 = no
如果想用python生成一个这样的文档怎么做呢?
import configparser config = configparser.ConfigParser()
config["DEFAULT"] = {'ServerAliveInterval': '',
'Compression': 'yes',
'CompressionLevel': ''} config['bitbucket.org'] = {}
config['bitbucket.org']['User'] = 'hg'
config['topsecret.server.com'] = {}
topsecret = config['topsecret.server.com']
topsecret['Host Port'] = '' # mutates the parser
topsecret['ForwardX11'] = 'no' # same here
config['DEFAULT']['ForwardX11'] = 'yes'
with open('example.ini', 'w') as configfile:
config.write(configfile)
写完了还可以再读出来哈。
>>> import configparser
>>> config = configparser.ConfigParser()
>>> config.sections()
[]
>>> config.read('example.ini')
['example.ini']
>>> config.sections()
['bitbucket.org', 'topsecret.server.com']
>>> 'bitbucket.org' in config
True
>>> 'bytebong.com' in config
False
>>> config['bitbucket.org']['User']
'hg'
>>> config['DEFAULT']['Compression']
'yes'
>>> topsecret = config['topsecret.server.com']
>>> topsecret['ForwardX11']
'no'
>>> topsecret['Port']
''
>>> for key in config['bitbucket.org']: print(key)
...
user
compressionlevel
serveraliveinterval
compression
forwardx11
>>> config['bitbucket.org']['ForwardX11']
'yes'
configparser增删改查语法
[section1]
k1 = v1
k2:v2 [section2]
k1 = v1 import ConfigParser config = ConfigParser.ConfigParser()
config.read('i.cfg') # ########## 读 ##########
#secs = config.sections()
#print secs
#options = config.options('group2')
#print options #item_list = config.items('group2')
#print item_list #val = config.get('group1','key')
#val = config.getint('group1','key') # ########## 改写 ##########
#sec = config.remove_section('group1')
#config.write(open('i.cfg', "w")) #sec = config.has_section('wupeiqi')
#sec = config.add_section('wupeiqi')
#config.write(open('i.cfg', "w")) #config.set('group2','k1',11111)
#config.write(open('i.cfg', "w")) #config.remove_option('group2','age')
#config.write(open('i.cfg', "w"))
十一、hashlib模块
用于加密相关的操作,3.x里代替了md5模块和sha模块,主要提供 SHA1, SHA224, SHA256, SHA384, SHA512 ,MD5 算法
import hashlib m = hashlib.md5()
m.update(b"Hello")
m.update(b"It's me")
print(m.hexdigest())
m.update(b"It's been a long time since last time we ...") print(m.hexdigest()) #2进制格式hash
print(len(m.hexdigest())) #16进制格式hash
'''
def digest(self, *args, **kwargs): # real signature unknown
""" Return the digest value as a string of binary data. """
pass def hexdigest(self, *args, **kwargs): # real signature unknown
""" Return the digest value as a string of hexadecimal digits. """
pass '''
import hashlib # ######## md5 ######## hash = hashlib.md5()
hash.update('admin')
print(hash.hexdigest()) # ######## sha1 ######## hash = hashlib.sha1()
hash.update('admin')
print(hash.hexdigest()) # ######## sha256 ######## hash = hashlib.sha256()
hash.update('admin')
print(hash.hexdigest()) # ######## sha384 ######## hash = hashlib.sha384()
hash.update('admin')
print(hash.hexdigest()) # ######## sha512 ######## hash = hashlib.sha512()
hash.update('admin')
print(hash.hexdigest())
还不够吊?python 还有一个 hmac 模块,它内部对我们创建 key 和 内容 再进行处理然后再加密
import hmac
h = hmac.new('wueiqi')
h.update('hellowo')
print h.hexdigest()
更多关于md5,sha1,sha256等介绍的文章看这里https://www.tbs-certificates.co.uk/FAQ/en/sha256.html
十二、re
1.基本正则表达式元字符和语法
最常用的匹配语法
re.match 从头开始匹配
re.search 匹配包含
re.findall 把所有匹配到的字符放到以列表中的元素返回
re.splitall 以匹配到的字符当做列表分隔符
re.sub 匹配字符并替换
1、match(pattern, string, flags=0)
从起始位置开始根据模型去字符串中匹配指定内容,匹配单个
- 正则表达式
- 要匹配的字符串
- 标志位,用于控制正则表达式的匹配方式
import re obj = re.match('\d+', '123uuasf')
if obj:
print obj.group()
import re obj = re.findall('\d+', 'fa123uu888asf')
print obj
# flags
I = IGNORECASE = sre_compile.SRE_FLAG_IGNORECASE # ignore case
L = LOCALE = sre_compile.SRE_FLAG_LOCALE # assume current 8-bit locale
U = UNICODE = sre_compile.SRE_FLAG_UNICODE # assume unicode locale
M = MULTILINE = sre_compile.SRE_FLAG_MULTILINE # make anchors look for newline
S = DOTALL = sre_compile.SRE_FLAG_DOTALL # make dot match newline
X = VERBOSE = sre_compile.SRE_FLAG_VERBOSE # ignore whitespace and comments flags
2、search(pattern, string, flags=0)
根据模型去字符串中匹配指定内容,匹配单个
import re obj = re.search('\d+', 'u123uu888asf')
if obj:
print obj.group()
3、group和groups
a = "123abc456"
print re.search("([0-9]*)([a-z]*)([0-9]*)", a).group() print re.search("([0-9]*)([a-z]*)([0-9]*)", a).group(0)
print re.search("([0-9]*)([a-z]*)([0-9]*)", a).group(1)
print re.search("([0-9]*)([a-z]*)([0-9]*)", a).group(2) print re.search("([0-9]*)([a-z]*)([0-9]*)", a).groups()
4、findall(pattern, string, flags=0)
上述两中方式均用于匹配单值,即:只能匹配字符串中的一个,如果想要匹配到字符串中所有符合条件的元素,则需要使用 findall。
import re obj = re.findall('\d+', 'fa123uu888asf')
print obj
5、sub(pattern, repl, string, count=0, flags=0)
用于替换匹配的字符串
content = "123abc456"
new_content = re.sub('\d+', 'sb', content)
# new_content = re.sub('\d+', 'sb', content, 1)
print new_content
相比于str.replace功能更加强大
6、split(pattern, string, maxsplit=0, flags=0)
根据指定匹配进行分组
content = "'1 - 2 * ((60-30+1*(9-2*5/3+7/3*99/4*2998+10*568/14))-(-4*3)/(16-3*2) )'"
new_content = re.split('\*', content)
# new_content = re.split('\*', content, 1)
print new_content
content = "'1 - 2 * ((60-30+1*(9-2*5/3+7/3*99/4*2998+10*568/14))-(-4*3)/(16-3*2) )'"
new_content = re.split('[\+\-\*\/]+', content)
# new_content = re.split('\*', content, 1)
print new_content
inpp = '1-2*((60-30 +(-40-5)*(9-2*5/3 + 7 /3*99/4*2998 +10 * 568/14 )) - (-4*3)/ (16-3*2))'
inpp = re.sub('\s*','',inpp)
new_content = re.split('\(([\+\-\*\/]?\d+[\+\-\*\/]?\d+){1}\)', inpp, 1)
print new_content
相比于str.split更加强大
实例:计算器源码