-> filter过滤:
list(filter(lambda x: x[0].find('tmp') == -1, table_temp_r))
-> 自定义map:
def map_format(x):
if(x.find('d_f_') == -1):
return 'd_f_artemis{}_{}'.format(data_f,x)
else:
return x
return list(map(lambda x: map_format(x), tables_p))
-> 字符串EL表达式:
"use {}".format(prod_base)
-> 列表、字典不为空的判断:
if l == []: if m == {}:
-> 字典中查找是否存在key:
test = {}
if 'key' in test.keys():
if 'key' in test:
-> 异常处理:
try:
out_engine.execute("drop table if exists field_diff_database")
out_engine.execute("drop table if exists table_diff_database")
except Exception as e:
raise e
-> 获取时间:
time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) #time.time() 是时间戳; time.localtime() 获取时间结构体; time.strftime() 格式化时间
-> IO处理:
pro_file = open(self.filename, 'Ur')
for line in pro_file.readlines():
line = line.strip().replace('\n', '')
字符串切分: 直接用[]
if line.find("#")!=-1:
line=line[0:line.find('#')]
获取文件地址:
path = os.getcwd()
file_path = os.path.join(path,'prod.cfg')
pandas:
data = {'a':[1,2,3],
'c':[4,5,6],
'b':[7,8,9]
}
frame = pd.DataFrame(data,index=['6','7','8'])
frame.iloc[0] # 获取第一行数据, 根据数据的list索引获取行数据;
frame.loc['6'] # 根据数据的pd索引获取行数据;
frame.a # 获取列
frame['a'] # 获取列
frame['a'].iloc[0] = 2 # 改变第a列, 第1行的数据;
result_df = pd.read_sql(result_sql, test_engine)
result_df.sort_values(by='branch_company',axis=0,ascending=True,inplace=True) # axis = 0 按列排序
result_df.to_sql('result_test',test_engine,index=False,if_exists='replace')
部署生产:
if __name__ == '__main__':
main_()
vim cron.txt
*/1 * * * * /data/anaconda/bin/python /home/hadoop/python_task/python_prod/prod.py > /home/hadoop/python_task/python_prod/result.log 2>&1
crontab -r / -l/ cron.txt
#crontab 不能使用os.getcwd() 不准确
path = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(path,'data/prod.cfg')
打包成二进制文件:
# 打包部署
import compileall
compileall.compile_dir(r'D:\python_prod')
# .pyc文件生成在__pycache__目录下
# 进入生产环境, 在相同的python版本下, python prod.pyc 直接运行即可。
需要安装的包:
conda install pymysql
pip install pyspark==2.3.2