os和csv相关的问题随手记

os模块获取路径(以当前文件 __file__ 为基准,获取上一级、上两级目录)

import os

# Parent of the directory that contains this file: join ".." onto the file's
# directory and let abspath normalise the result.
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
print(BASE_DIR)

# Alternative spelling that normally yields the same parent directory:
# make the file path absolute first, then strip two path components.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
print(BASE_DIR)

# Two levels above the directory that contains this file ("../..").
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
print(BASE_DIR)

# Alternative spelling of the two-levels-up lookup: absolute file path with
# three path components stripped.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
print(BASE_DIR)

我使用这个模块读取pycharm中的csv文件

pycharm中生成requirements.txt文件

pip install pipreqs
pipreqs .
报错的话,指定编码格式
pipreqs . --encoding=utf8 --force

csv的读取和写入

读取:
csv_file = csv.reader(open(file_name, "r"))
写入:
f = open('failed.csv', 'w', newline='', encoding="utf-8")
writer = csv.writer(f)
for failed in failed_list:
    writer.writerow(failed)
f.close()
open() 中指定 newline='' 是为了避免写入的csv文件每两行之间多出一个空行

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import csv
import re
import os
import time
import requests
from getcookie import get_cookie

# Absolute path of the directory that contains this script.
BASE_DIR = os.path.abspath(os.path.dirname(__file__))

def read_csv():
    """
    Collect the unique 13-digit TD ids found in the CSV files under BASE_DIR.

    Every regular file directly inside BASE_DIR whose extension is ``.csv``
    is read. For each row, the first run of 13 consecutive digits in the
    second column is taken as a TD id; rows with fewer than two columns or
    without such a run are skipped.

    :return: list of unique TD id strings (order is not guaranteed)
    """
    td_pattern = re.compile(r"\d{13}")
    td_ids = set()

    for file_name in os.listdir(BASE_DIR):
        # Compare the real extension so names such as "a.b.csv" are picked up
        # and names such as "a.csv.bak" are not.
        if os.path.splitext(file_name)[1] != ".csv":
            continue
        # Resolve against BASE_DIR: the listing came from there, and the
        # current working directory may be somewhere else.
        csv_path = os.path.join(BASE_DIR, file_name)
        if not os.path.isfile(csv_path):
            continue
        # newline="" lets the csv module handle embedded line breaks itself.
        with open(csv_path, "r", newline="") as csv_handle:
            for row in csv.reader(csv_handle):
                if len(row) < 2:
                    continue
                match = td_pattern.search(row[1])
                if match:
                    td_ids.add(match.group())

    return list(td_ids)

def _fetch_defect_rows(td_id):
    """
    Query the defect search API for one TD id.

    Reads the module-level ``rule_cookies`` value to build the Cookie header.

    :param td_id: TD id used as the ``key`` search condition
    :return: the ``data.rows`` value of the JSON response, or None when the
        request fails, the body is not JSON, or the response has no rows
    """
    ts = str(int(time.time() * 1000))
    url = "http://td.sangfor.com/api/v1/defect/es/es_search_by_fields?_t=" + ts

    data = {
        "product_id": 10010,
        "page_no": 1,
        "page_size": 25,
        "sort_by": {
            "key": "asc"
        },
        "query_conditions": {
            "and": {
                "fields": {}
            },
            "or": {
                "fields": {
                    "key": td_id
                }
            },
            "not": {
                "fields": {}
            },
            "cycle_analysis": {},
            "handle_by_self": False
        }
    }
    headers = {
        'Connection': 'keep-alive',
        'Pragma': 'no-cache',
        'Cache-Control': 'no-cache',
        'Accept': 'application/json, text/plain, */*',
        'PRODUCT-ID': '10010',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36',
        'Content-Type': 'application/json;charset=UTF-8',
        'Origin': 'http://td.sangfor.com',
        'Referer': 'http://td.sangfor.com/',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Cookie': 'ep_jwt_token=' + rule_cookies
    }

    try:
        # The timeout keeps one unresponsive request from hanging the run.
        ret = requests.post(url, headers=headers, json=data, timeout=30).json()
    except (requests.RequestException, ValueError):
        # Network failure, or a body that could not be decoded as JSON.
        return None

    try:
        return ret['data']['rows']
    except (KeyError, TypeError):
        # The response was JSON but did not have the expected data/rows shape.
        return None


def deal_(td_lists):
    """
    Look up every TD id and collect the ones that are not in a closed state.

    An id whose lookup fails is reported on stdout and skipped, so a failed
    request can never be evaluated against a previous id's response.

    :param td_lists: iterable of TD ids to query
    :return: list of ``[td_id, status name, version name]`` entries, one for
        each returned row whose status is not a closed status
    """
    # Status ids treated as closed: resolved-closed, not-a-problem-closed
    # and duplicate-closed.
    closed_status_ids = (10009, 10010, 10011)
    failed_list = []

    for td_id in td_lists:
        rows = _fetch_defect_rows(td_id)
        if rows is None:
            print("=============没获取到数据,判断cookie是否已失效===========")
            continue

        for row in rows:
            status_tag = row['fields']['status_tag']
            version_name = row['fields']['version']['name']
            if status_tag['id'] in closed_status_ids:
                continue
            print("=============当前td为{},状态为{}===========".format(td_id, status_tag['name']))
            failed_list.append([td_id, status_tag['name'], version_name])

    print("=============总共找到{}条非正常关闭的===========".format(len(failed_list)))
    return failed_list

def write_csv(failed_list):
    """
    Write the collected entries to ``failed.csv`` in the working directory.

    The file is overwritten on every call and encoded as UTF-8.

    :param failed_list: iterable of rows; each row is written as one CSV line
    :return: None
    """
    # The with-block closes the file even if writing a row raises.
    # newline='' stops the csv module's own line endings from being
    # translated again, which would leave a blank line between rows.
    with open('failed.csv', 'w', newline='', encoding="utf-8") as csv_handle:
        csv.writer(csv_handle).writerows(failed_list)

if __name__ == '__main__':
    # rule_cookies must stay a module-level name: deal_ reads it when it
    # builds the Cookie header.
    rule_cookies = get_cookie()
    # Without a cookie value nothing is read, queried or written.
    if rule_cookies:
        unique_td_ids = read_csv()
        not_closed_rows = deal_(unique_td_ids)
        write_csv(not_closed_rows)
上一篇:CSS Table(表格)


下一篇:Python采集天天基金数据,帮你掌握基金最新动向