在接下来的章节中,我会重点介绍一下我自己写的基于之前做python数据分析的打包接口文件common_lib,可以认为是专用于python的第三方支持库。common_lib目前包括文件操作、时间操作、excel接口操作、数据库接口、邮件接口。这些打包接口的作用就像是堆积木一样,把积木的主要模块都搭好了,仅需要大家按照自己的想法把它们拼接在一起堆出你的理想城堡!
上面是common_lib的的文件目录结构,还处于不断的修改和完善的过程中,相信后续会有更多实用的接口文件会补充进来。命名方式参考之前写的《python的统一编码规范》,根据名字可以基本知道各文件对应的功能接口。
在本篇文章中我们将会接受文件和时间相关的操作接口,对应的是和io_datetime。
io_file
这个文件引用了一个标准模块,就是os。标准模块的意思就是不需要再下载安装的,python自带的接口文件。
python os模块包括了操作系统的基础功能,更为重要的是,如果希望编写的python程序跨平台运行的话,os模块尤其适合你。对这个模块感兴趣的同学可以敲以下两行代码进一步学习,不会让你失望的:
import os
print help(os)
好,我们进入主题。
在io_file我主要添加了在数据分析中常用到的对文件的处理,下面是函数介绍。
def has_file(filename): #判断是否存在给定的filename的文件 def is_file(filename): #判断给定filename是否是一个文件 def get_file_name(filename):#获得给定的filename的文件名 def get_file_type(filename): #获得给定的filename的类型 def get_curr_dir(): #获得当前的程序运行的路径 def get_files(path): #获得给定路径的所有文件名 def get_curr_files(dir=None):#获得当前路径中的所有文件名 def get_file_size(filename): #获得某个文件的大小,不常用 def delete_file(filename): #删除指定filename def readlines_file(file_name, flag=None): #按照逐行的方式读取文件的所有内容 def writelines_file(file_name, data, flag=None):#按照逐行的方式写入所有内容。 def cmd_open(filename):#打开文件或者路径。
上面的filename的定义是获取到的文件的绝对路径,例如“D:\temp.txt”这样。
文件读写
下面重点介绍读写文件的两个函数。
关于python的文本文件读写(仅针对文本)分别有三种:
读文件:
read():以字节的形式读取整个文件,并输出字符串,包括换行符。
readline():读取一行内容,输出字符串
readlines():读取每一行内容,并输出以行数组。
写文件:write()、writerline()、writelines()三个函数和读文件的函数类似。
在我的文件读写接口中采用的是readlines()和writelines(),原因是它们以数组返回结果(这算什么原因??/(ㄒoㄒ)/~~)
以下是读文件的源码:
def readlines_file(file_name, flag=None):
if not flag:
flag="r"
try:
with open(file_name, flag) as f:
data = f.readlines()
return data
except Exception, e:
print "readlines_file:", str(e)
# traceback.print_exc()
return False
输入的参数有两个:file_name 选择读入的文件名,flag是读写标记默认为空。如果成功返回文件数据,否则返回False。
with open() as ..这种用法是Pythonic风格的代码。这种用法会除了让代码更简洁以外,还有其他好处。更多关于Pythonic的讲解可以参阅书籍《编写高质量代码-改善Python程序的91个建议》。
关于读写标记的说明,读我默认是只读”r”,写是以追加的标记写”a+”,关于读写的标记,我从网上摘录了一段,忘了出处了:
关于文件的读写模式的说明:
r 打开只读文件,该文件必须存在。
r+ 打开可读写的文件,该文件必须存在。
w 打开只写文件,若文件存在则文件长度清为0,即该文件内容会消失。若文件不存在则建立该文件。
w+ 打开可读写文件,若文件存在则文件长度清为零,即该文件内容会消失。若文件不存在则建立该文件。
a 以附加的方式打开只写文件。若文件不存在,则会建立该文件,如果文件存在,写入的数据会被加到文件尾,即文件原先的内容会被保留。
a+ 以附加方式打开可读写的文件。若文件不存在,则会建立该文件,如果文件存在,写入的数据会被加到文件尾后,即文件原先的内容会被保留。
上述的形态字符串都可以再加一个b字符,如rb、w+b或ab+等组合,加入b 字符用来告诉函数库打开的文件为二进制文件,而非纯文字文件。不过在POSIX系统,包含Linux都会忽略该字符。
+同时读写,即可读又可写,边写边读、边读边写,不用flush,用seek 和 tell可测得。
写的代码与读类似,就不再赘述:
def writelines_file(file_name, data, flag=None):
if not flag:
flag="a+"
try:
with open(file_name, flag) as f:
f.writelines(data)
return True
except Exception, e:
print "writelines_file:", str(e)
# traceback.print_exc()
return False
io_datetime
在日期时间(就这样翻译datetime是不是太丑了??)接口文件中,主要通过解决下面两个问题来满足数据分析的需要
1.解决日期类型之前转换的问题,主要是string类型、datetime类型、time类型三种类型之间变量类型的转换。
2.解决两个日期变量的间隔时长,包括工作日差等。
第一个问题目前来看主要是由于excel数据源的问题,excel单元格中存储时间数据的格式可能是文本格式的,也有可能是时间或者日期格式的。第二个问题主要是因为在分析工作中需要计算服务SLA的情况,因此需要计算日期差和工作日差。
在io_datetime中需要导入的标准库有time、datetime和re以及第三方库dateutil,都是用来处理日期时间格式。
def get_now(pattern=None): #获得当前日期时间,pattern参数是设置格式默认是'%Y-%m-%d %H:%M:%S',输出字符串 def str2date(date, pattern):#将字符串转换成datetime格式,参数是字符串的格式 def str2date_parse(date_str):#利用dateutil库来将字符串转换成datetime,好处是不需要输入格式 def time2str(time1, pattern):#把time格式的转换成字符串 def get_date_pattern(date1):#利用正则表达式把常见的日期时间格式提取出来 def date2str(date1, pattern=None):#将datetime格式转换成字符串 def str2time(date1, pattern=None):#将字符串格式转换成time格式
上述的函数是用来处理第一个问题的。对于获取日期时间格式的函数,我着重讲一下,发现目前还没人做这方面的工作:
def get_date_pattern(date1):
date1 = str(date1).strip()
if re.match(r"\d\d\d\d-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}", date1):
return "%Y-%m-%d %H:%M:%S"
elif re.match(r"\d\d\d\d-\d{1,2}-\d{1,2}", date1):
return "%Y-%m-%d"
elif re.match(r"\d\d\d\d/\d{1,2}/\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}", date1):
return "%Y/%m/%d %H:%M:%S"
elif re.match(r"\d\d\d\d/\d{1,2}/\d{1,2}", date1):
return "%Y/%m/%d"
elif re.match(r"\d\d/\d\d/\d\d \d{1,2}:\d{1,2}:\d{1,2}", date1):
return "%m/%d/%y %H:%M:%S"
elif re.match(r"\d\d/\d\d/\d\d", date1):
return "%m/%d/%y"
else:
return None
通过get_date_pattern就可以把excel非中文的日期时间格式覆盖了。中文日期时间格式的如果有需要也可以补充进来。
下面是解决第2个问题的处理函数:
def subtract_date(date1, date2, days=None, seconds=None): #获取任意两个date之间的相差的天数或者秒数(由参数决定) def subtract_work_day(date1, date2): #获取任意两个日期之间相差的工作日(9:00~17:00)天数,相差的小时数同样计入差额.
对于工作日做差的计算的计算方法,我之前想在网上搜索一下已有的算法,没有找到。我就根据现有的计算逻辑实现了接口算法。逻辑如下:
1.工作日的范围是9:00~17:00。节假日和工作日定义按照国家的做法定义(国务院放假办12月会发布下一年的放假安排)。除了已经被定义为工作日的天数以外,其余皆算为非工作日。因此两个日期的天数之差应该排除中间的非工作日天数。
2.小时数差的话,先将两个日期进行归一化和转义,得出两个日期的时间肯定在[9:00,17:00]这个区间。然后在区间内进行计算。
具体的实现代码如下,有点复杂:
def subtract_work_day(date1, date2):
"""
do the subtract operation between two working dates, ignore the weekends,get day increment
:param date1:input date1
:param date2:input date2
:return:the time diff (value as a day) between date1 and date2
""" try:
d1 = str2date_parse(date1)
d2 = str2date_parse(date2)
maxDate = max([d1, d2])
#把跨天的日期拉回来
if maxDate.hour < 9:
maxDate = str2date(date2str(maxDate + datetime.timedelta(days=-1), "%Y-%m-%d")
+ " 17:00:00", '%Y-%m-%d %H:%M:%S')
minDate = min([d1, d2])
#将时间归一到[9:00~17:00]这个区间
maxDate = transfer(maxDate)
minDate = transfer(minDate)
#间隔的天数
total = (maxDate - minDate).days
wk = 0
if total >= 1:
while (minDate < maxDate):
if is_weekend(minDate):
wk += 1
minDate = minDate + datetime.timedelta(days=1)
result = total - wk
if result < 0:
result = 0
minDate = transfer(min([d1, d2]))
max_hour = maxDate.hour
min_hour = minDate.hour
max_minute = maxDate.minute
min_minute = minDate.minute
#确保不是同一天
if date2str(maxDate, "%Y-%m-%d") != date2str(minDate, "%Y-%m-%d"):
if is_weekend(maxDate) and is_weekend(minDate):
temp=0
elif is_weekend(minDate):
temp=1.0*(max_hour-9)/8
elif is_weekend(maxDate):
temp=1.0*(17-min_hour)/8
else:
if min_hour > max_hour or (max_hour == min_hour and min_minute > max_minute):
temp = min_hour - max_hour
if max_hour == min_hour:
temp += (min_minute - max_minute) // 60
temp = 1 - 1.0 * temp / 8
else:
temp = max_hour - min_hour
if max_hour == min_hour:
temp += (max_minute - min_minute) // 60
temp = 1.0 * temp / 8
else:
#如果是同一天,max_date的小时分钟数肯定大于min_date的小时分钟数
if is_weekend(maxDate):
temp=0
else:
temp = max_hour - min_hour
if max_hour == min_hour:
temp += (max_minute - min_minute) // 60
temp = 1.0 * temp / 8
result += temp
return result
except Exception, e:
print "ERROR IN subtract_date_working: " + str(e)
traceback.print_exc()
return None def is_weekend(date1):
#they are not weekends as defined, although they are not week days
wrong_wk = ['2016/02/06', '2016/02/14', '2016/06/12', '2016/09/18', '2016/10/08',
'2016/10/09']
#they are weekends as defined, although they are week days
right_wk = ['2015/10/01', '2015/10/02', '2015/10/05', '2015/10/06', '2015/10/07',
'2016/01/01', '2016/02/08', '2016/02/09', '2016/02/10', '2016/02/11',
'2016/02/12', '2016/04/04', '2016/05/02', '2016/06/09', '2016/06/10',
'2016/09/15', '2016/09/16', '2016/10/03', '2016/10/04', '2016/10/05',
'2016/10/06', '2016/10/07']
if date2str(date1, "%Y/%m/%d") in wrong_wk:
return False
elif date2str(date1, "%Y/%m/%d") in right_wk:
return True
else:
wd = date1.weekday()
if wd == 5 or wd == 6:
return True
return False def transfer(date1):
if date1.hour >= 17:
return str2date(date2str(date1, "%Y-%m-%d") + " 17:00:00", '%Y-%m-%d %H:%M:%S')
elif date1.hour < 9:
return str2date(date2str(date1, "%Y-%m-%d") + " 09:00:00", '%Y-%m-%d %H:%M:%S')
else:
return date1
需要源代码的同学请加我微博@cec_Fudan 私信留言吧。
本文原创,转载请征得作者同意,保留所有权利。