os模块获取当前路径
import os
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
print(BASE_DIR)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
print(BASE_DIR)
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
print(BASE_DIR)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
print(BASE_DIR)
我使用这个模块读取pycharm中的csv文件
pycharm中生成requirements.txt文件
pip install pipreqs
pipreqs .
报错的话,指定编码格式pipreqs . --encoding=utf8 --force
csv的读取和写入
读取: csv_file = csv.reader(open(file_name, "r"))
写入:f = open('failed.csv', 'w', newline='', encoding="utf-8")
writer = csv.writer(f)
for failed in failed_list:
writer.writerow(failed)
f.close()
newline是去掉两行之间的空行
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import csv
import re
import os
import time
import requests
from getcookie import get_cookie
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__)))
def read_csv():
'''
读取文件夹下的csv文件,并输出去重后的td列表
:return:
'''
td_list = []
file_list = []
ls_dir = os.listdir(BASE_DIR)
for file_name in ls_dir:
try:
if file_name.split(".")[1] == "csv":
file_list.append(file_name)
except:
pass
# 再次循环
for file_name in file_list:
csv_file = csv.reader(open(file_name, "r"))
for item in csv_file:
try:
td_list.append(re.findall("\d{13}",item[1])[0])
except:
pass
return list(set(td_list))
def deal_(td_lists):
'''
用于处理td,找到非确认关闭的td
:param td_lists:
:return:
'''
failed_list = []
for td_id in td_lists:
ts = str(int(time.time()*1000))
url = "http://td.sangfor.com/api/v1/defect/es/es_search_by_fields?_t=" + ts
data = {
"product_id": 10010,
"page_no": 1,
"page_size": 25,
"sort_by": {
"key": "asc"
},
"query_conditions": {
"and": {
"fields": {}
},
"or": {
"fields": {
"key": td_id
}
},
"not": {
"fields": {}
},
"cycle_analysis": {},
"handle_by_self": False
}
}
headers = {
'Connection': 'keep-alive',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
'Accept': 'application/json, text/plain, */*',
'PRODUCT-ID': '10010',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36',
'Content-Type': 'application/json;charset=UTF-8',
'Origin': 'http://td.sangfor.com',
'Referer': 'http://td.sangfor.com/',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cookie': 'ep_jwt_token=' + rule_cookies
}
try:
ret = requests.post(url, headers=headers, json=data).json()
except:
print("=============没获取到数据,判断cookie是否已失效===========")
for fields in ret['data']['rows']:
state_tag = fields['fields']['status_tag']['id']
state_name = fields['fields']['status_tag']['name']
version_name = fields['fields']['version']['name']
# 当状态为解决关闭、非问题关闭和重复问题关闭时结束
if state_tag in [10009,10010,10011]:
continue
else:
print("=============当前td为{},状态为{}===========".format(td_id, state_name))
failed_id = []
failed_id.append(td_id)
failed_id.append(state_name)
failed_id.append(version_name)
failed_list.append(failed_id)
print("=============总共找到{}条非正常关闭的===========".format(len(failed_list)))
return failed_list
def write_csv(failed_list):
'''
写入csv
:param failed_list:
:return:
'''
f = open('failed.csv', 'w', newline='', encoding="utf-8")
writer = csv.writer(f)
for failed in failed_list:
writer.writerow(failed)
f.close()
if __name__ == '__main__':
rule_cookies = get_cookie()
if rule_cookies:
td_list = read_csv()
failed_list = deal_(td_list)
write_csv(failed_list)