python GISAID 网站爬虫实践:fasta文件获取

python GISAID 网站爬虫实践:fasta文件获取

 

临近毕业,老师交给我的爬虫任务已经结束。反正留在电脑里也没用,不如发出来造福大家。

 

简介:

GISAID(Global Initiative on Sharing All Influenza Data,全球共享流感数据倡议组织)是一个病毒基因序列数据共享平台,网址“www.gisaid.org”,服务器位于美国洛杉矶、旧金山等地,需要用教育邮箱注册账号才能访问其数据。它的网站比较老旧,其中的病例数据、基因序列(fasta)文件有单次获取量限制,并且网站管理员从不回复任何消息,最关键的是,网站代码是动态的、并且有验证码,所以爬虫初学者往往很难在几周之内完成这个爬虫。为了获取这些宝贵的信息,我们学院的老师和学生也是焦头烂额。

无论如何,最终我写好了fasta文件和病例信息自动化获取的程序,发布在博客园这里。(估计也没人会转载吧,想拿就随便拿去吧)

 

使用步骤:

1.安装好python, 将python安装路径加入系统路径Path中

2.同时按住win徽标键 + R键,输入cmd打开DOS窗口,

  依次输入命令:

pip install selenium

 安装selenium软件包(ctypes是python标准库自带的模块,不需要、也无法通过pip安装)

3.百度webdriver Chrome ,找到和电脑上安装的Chrome版本相同的webdriver(chromedriver.exe),下载到任意文件夹内

4.依次复制下面两段代码,分别粘贴到chromedriver.exe所在文件夹的两个空白txt文档中。第一段(用户界面)重命名为xxx.py(xxx代表随便取名);第二段(爬虫主程序)必须重命名为download_fasta_new.py,因为用户界面代码是通过“import download_fasta_new”来调用它的。双击第一段代码所在文件,在弹出界面输入必要信息,即可开始爬虫。

 

用户界面代码(点击这段代码所在.py文件开始):

python GISAID 网站爬虫实践:fasta文件获取
__author__ = 'cquxiaoy'
from tkinter import *
import ctypes
import download_fasta_new as dt
from selenium import webdriver
import time
from threading import Thread


def check():
    """Watchdog loop: request a restart when the crawler stops reporting progress.

    Every 3000 seconds the crawler's ``dt.signal`` heartbeat is inspected.
    A raised heartbeat is lowered again, so the crawler has to raise it anew
    before the next inspection; a heartbeat that is still lowered sets
    ``dt.if_terminate``. This function never returns.
    """
    while True:
        heartbeat_seen = dt.signal
        if not heartbeat_seen:
            dt.if_terminate = True
            print('长时间未响应,即将重启')
        else:
            dt.signal = False
        time.sleep(3000)


def _start():
    """Run the crawler, restarting it (at most 100 times) until it reports completion.

    Reads the account, password and submission-date range from the launcher
    entries and copies them onto the crawler module ``dt`` before each run.
    Exceptions raised by ``dt.start()`` are printed and followed by another
    attempt.
    """
    start_date_str, end_date_str = e1.get(), e2.get()
    # The crawler module only assigns its credentials under its own __main__
    # guard, so when it is imported they have to be supplied from the form;
    # otherwise Login() fails with NameError on every attempt.
    dt.login_name, dt.login_password = en.get(), ep.get()
    dt.adpath = '进度'+ start_date_str + end_date_str +'.txt'
    dt.sms_start_date = start_date_str
    dt.sms_end_date = end_date_str

    # One watchdog for the whole session. Starting one per attempt piles up
    # threads that never exit and that clear each other's heartbeat flag.
    Thread(target=check, daemon=True).start()

    for i in range(100):
        print('第%d次启动'%(i+1))
        if dt.if_done:
            break
        elif dt.if_terminate:
            # quit() ends the whole browser session (all tabs plus the driver
            # process); close() would only close the currently focused tab.
            try:
                dt.d.quit()
            except Exception as e:
                print('出错:', e)
            dt.d = webdriver.Chrome()
        dt.if_terminate = False
        try:
            dt.start()
        except Exception as e:
            # Best effort: report the failure and let the loop try again.
            print('出错:', e)


def start():
    """Button callback: launch the crawl on a daemon thread, then hide the form."""
    worker = Thread(target=_start, daemon=True)
    worker.start()
    r.withdraw()


# ---- Launcher window: account, password, submission-date range, start button ----
font = 'weiruanyahei 20'
try:
    # Purely cosmetic (sharper text on high-DPI screens); the launcher must
    # still open where shcore / windll is not available.
    ctypes.windll.shcore.SetProcessDpiAwareness(True)
except (AttributeError, OSError):
    pass

# The first positional argument of Tk() is screenName, not the window title,
# so the title has to be set explicitly.
r = Tk()
r.title('gisaid fasta数据爬虫')
Label(r, text='账号:', font=font).grid(row=0, column=0)
Label(r, text='密码:', font=font).grid(row=1, column=0)
# en: account name, ep: password (masked while typing)
en, ep = Entry(r, width=11, font=font), Entry(r, width=11, font=font, show='*')
en.grid(row=0, column=1, columnspan=2)
ep.grid(row=1, column=1, columnspan=2)

Label(r, text='Submission Date:', font=font).grid(row=2, columnspan=3)
Label(r, text='to', font=font).grid(row=3, column=1)
Button(r, text='开始', command=start, font=font).grid(row=4, columnspan=3)

# e1 / e2: first and last submission date, pre-filled with a sample in the
# only supported format (YYYY-MM-DD)
e1, e2 = Entry(r, width=11, font=font), Entry(r, width=11, font=font)
e1.grid(row=3, column=0)
e2.grid(row=3, column=2)
e1.insert(0, '2022-02-19')
e2.insert(0, '2022-02-20')
Label(r, text='日期仅支持范例的格式\n日期不变的话,程序能断点续下\n有问题请加QQ1977649208', font='weiruanyahei 16').grid(row=5, columnspan=3)

r.mainloop()
用户界面段代码

爬虫主程序(粘贴在同一目录下、文件名为download_fasta_new.py的文件中即可):

python GISAID 网站爬虫实践:fasta文件获取
import datetime
from datetime import *
from ast import literal_eval
import random
import os
import time
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By

# ---- XPath locators used by the routines below ----
iframe_xpath = '/html/body/iframe'

# The four date inputs sit in the same table row and differ only in the cell
# index: cells 1/3 are used by Collection_date, cells 5/7 by Submission_date.
_date_input = ('/html/body/form/div[5]/div/div[2]/div/div[2]/div[1]/div[4]'
               '/table/tbody/tr/td[2]/table/tbody/tr/td[{}]/div/div[1]/input ')
clt_date_1 = _date_input.format(1)
clt_date_2 = _date_input.format(3)
sms_date_1 = _date_input.format(5)
sms_date_2 = _date_input.format(7)

# Header checkbox of the results table, the download button below it, and the
# confirmation button clicked after switching into the iframe.
select_all = '/html/body/form/div[5]/div/div[2]/div/div[2]/div[2]/div[1]/div[2]/table/thead/tr/th[1]/div/span/input'
download = '/html/body/form/div[5]/div/div[2]/div/div[2]/div[2]/div[2]/div[3]/button[3]'
download2 = '/html/body/form/div[5]/div/div[2]/div/div/div[2]/div/button'

# login_name = 'SunCe'
# login_password = 'GZ4Tzcr9'
# If you found your way here, feel free to use this account; I no longer need it.

# ---- Flags shared with the launcher ----
# signal: heartbeat, set to True by the crawler routines while they make progress
signal = True
# if_terminate: when True, the current run should be abandoned
if_terminate = False
# if_done: set to True by start() once every requested day has been processed
if_done = False


def send(xpath, msg):
    """Clear the input at *xpath* in the shared driver and type *msg* plus Enter.

    An empty *msg* only clears the field. Unlike ``Send`` this does not retry:
    a lookup failure propagates to the caller.
    """
    # find_element(By.XPATH, ...) works on Selenium 3 and 4; the
    # find_element_by_xpath shortcut was removed in Selenium 4.3.
    ipt = d.find_element(By.XPATH, xpath)
    ipt.clear()
    if msg != '':
        ipt.send_keys(msg + '\n')


def Click(d, element_xpath):
    """Click the element at *element_xpath*, retrying up to 20 times.

    Args:
        d: the WebDriver to act on.
        element_xpath: XPath of the element to click.

    Returns:
        The element's text after a successful click, or ``None`` when all
        20 attempts (0.5 s apart) failed.
    """
    for i in range(20):
        try:
            # find_element(By.XPATH, ...) works on Selenium 3 and 4. The
            # find_element_by_xpath shortcut was removed in Selenium 4.3, and
            # the AttributeError it raises would be swallowed by the retry
            # below, turning every click into a silent no-op.
            bt = d.find_element(By.XPATH, element_xpath)
            bt.click()
            return bt.text
        except Exception:
            # Deliberate best effort: the page may still be rendering.
            pass
        time.sleep(0.5)


def Send(d, element_xpath, msg):
    """Replace the content of the input at *element_xpath* with *msg*.

    Retries up to 20 times, 0.5 s apart, and gives up silently afterwards.

    Args:
        d: the WebDriver to act on.
        element_xpath: XPath of the input element.
        msg: text to type after clearing the field (may be empty).
    """
    for i in range(20):
        try:
            # find_element(By.XPATH, ...) works on Selenium 3 and 4. The
            # find_element_by_xpath shortcut was removed in Selenium 4.3, and
            # the AttributeError it raises would be swallowed by the retry
            # below, so nothing would ever be typed.
            bt = d.find_element(By.XPATH, element_xpath)
            bt.clear()
            bt.send_keys(msg)
            break
        except Exception:
            # Deliberate best effort: the page may still be rendering.
            pass
        time.sleep(0.5)


def Keep(exe, times=20):
    """Call *exe* repeatedly until it completes without raising.

    Args:
        exe: zero-argument callable to run.
        times: attempt budget. A value <= 0 means "retry forever". For a
            positive value the loop gives up once the attempt counter reaches
            *times*, so *exe* is invoked at most ``times - 1`` times (19 with
            the default).

    The module-level ``signal`` heartbeat is raised on every pass, and failed
    attempts are spaced 0.5 s apart. Giving up is silent.
    """
    global signal
    attempt = 0
    while True:
        signal = True
        attempt += 1
        try:
            budget_left = (attempt < times) or (times <= 0)
            if not budget_left:
                return
            exe()
            return
        except Exception:
            # swallow and retry after the pause below
            pass

        time.sleep(0.5)


def SwitchToIframe(d):
    """Switch *d* into the page's top-level iframe, retrying through ``Keep``."""
    def s1():
        # until() returns the located element, so no second lookup is needed
        # (the find_element_by_xpath shortcut used before was removed in
        # Selenium 4.3).
        iframe = WebDriverWait(d, 3, 0.5).until(
            EC.presence_of_element_located((By.XPATH, iframe_xpath)))
        d.switch_to.frame(iframe)

    Keep(s1)


def SwitchToDefault(d):
    """Switch *d* back to the top-level document, retrying through ``Keep``."""
    Keep(lambda: d.switch_to.default_content())


def Select_all():
    """Click the select-all checkbox in the results table header.

    Does not retry; callers wrap it in ``Keep``.
    """
    # find_element(By.XPATH, ...) works on Selenium 3 and 4; the
    # find_element_by_xpath shortcut was removed in Selenium 4.3.
    sela = d.find_element(By.XPATH, select_all)
    sela.click()


def FindPatientNums(d):
    """Return the total number of records shown in the results counter.

    Args:
        d: the WebDriver to read from.

    Returns:
        The record count as an ``int``.

    Raises:
        ValueError: when the counter text does not contain a number at the
            expected position (for example while the page is still loading).
    """
    total_num_xpath = '/html/body/form/div[5]/div/div[2]/div/div[2]/div[2]/div[2]/div[1]/span/span'
    # find_element(By.XPATH, ...) works on Selenium 3 and 4; the
    # find_element_by_xpath shortcut was removed in Selenium 4.3.
    total_num_str = d.find_element(By.XPATH, total_num_xpath).text
    # Assumes the label reads like 'Total: 12,345 viruses' (7-character
    # prefix, 8-character suffix) -- TODO confirm against the live page.
    total_num = total_num_str[7:-8].replace(',', '')
    return int(total_num)


def GetWeb(d):
    """Open the EpiCoV front end in *d*, then pause for one second."""
    epicov_url = 'https://www.epicov.org/epi3/frontend'
    d.get(epicov_url)
    time.sleep(1)


def Login(d):
    """Fill in the login form with the module-level credentials and submit it.

    Reads ``login_name`` and ``login_password`` from module scope; both must
    have been assigned before this is called.
    """
    # The three controls are sibling <input> elements of one container.
    form = '/html/body/form/div[5]/div/div[2]/div/div/div/div[1]/div/div/div[2]'
    name_box = form + '/input[1]'
    password_box = form + '/input[2]'
    submit_button = form + '/input[3]'

    Send(d, name_box, login_name)
    Send(d, password_box, login_password)
    Click(d, submit_button)
    print('Login')


def ClickSearch(d):
    """Wait up to 3 s for the search button to be present, then click it."""
    search_bt = '/html/body/form/div[5]/div/div[2]/div/div[1]/div/div/div[3]'
    button_present = EC.presence_of_element_located((By.XPATH, search_bt))
    waiter = WebDriverWait(d, 3, 0.5)
    waiter.until(button_present)
    Click(d, search_bt)
    print('点击搜索按钮')


def Submission_date(_date):
    """Set both submission-date inputs to *_date*, i.e. filter on a single day.

    A random pause of up to 10 s separates the two inputs, and a
    ``wait_one_timer`` pause follows the second one.
    """
    Send(d, sms_date_1, _date)
    pause = 10 * random.random()
    time.sleep(pause)
    Send(d, sms_date_2, _date)
    wait_one_timer()


def Collection_date(date1, date2):
    """Set the collection-date range inputs.

    Args:
        date1: lower bound, or '' to clear the field.
        date2: upper bound, or '' to clear the field.

    Each non-empty value is followed by a ``wait_one_timer`` pause; clearing
    a field is not.
    """
    for field_xpath, value in ((clt_date_1, date1), (clt_date_2, date2)):
        if value != '':
            Send(d, field_xpath, value)
            wait_one_timer()
            continue
        Send(d, field_xpath, '')


def wait_one_timer(max_time=13):
    """Sleep for *max_time* plus 0-5 random seconds, one second at a time.

    The ``signal`` heartbeat is raised before every one-second sleep. If
    ``if_terminate`` is found set, the wait is cut short and the closing
    message is not printed.
    """
    global signal
    seconds = max_time + int(6 * random.random())
    print('等待{}秒'.format(str(seconds)), end='  ')
    for _ in range(seconds):
        if if_terminate:
            return
        signal = True
        time.sleep(1)
    print('结束等待')


def getDownLoadedFileName(driver, waitTime, download_dir=None):
    """Wait for Chrome's newest download to finish and return its file name.

    The function watches the first entry of ``chrome://downloads`` (opened in
    a second tab if only one window exists) and polls every 5 seconds.

    Args:
        driver: the Chrome WebDriver.
        waitTime: maximum number of seconds to wait.
        download_dir: folder Chrome saves into. Defaults to the current
            user's ``Downloads`` folder; pass it explicitly when Chrome is
            configured differently.

    Returns:
        The file name once the download is complete and the file exists in
        *download_dir*, or ``''`` on timeout. In both cases the driver is
        switched back to the first window before returning.
    """
    global signal
    if download_dir is None:
        # Previously hard-coded to one specific user's profile folder.
        download_dir = os.path.join(os.path.expanduser('~'), 'Downloads')

    newest_item = ("document.querySelector('downloads-manager').shadowRoot"
                   ".querySelector('#downloadsList downloads-item').shadowRoot")
    progress_js = "return " + newest_item + ".querySelector('#progress').value"
    name_js = "return " + newest_item + ".querySelector('div#content  #file-link').text"

    if len(driver.window_handles) == 1:
        driver.execute_script("window.open()")
        # switch to the new tab and open Chrome's download list in it
        driver.switch_to.window(driver.window_handles[-1])
        driver.get('chrome://downloads')
    else:
        # the downloads tab already exists
        driver.switch_to.window(driver.window_handles[-1])

    endTime = time.time() + waitTime
    fname = ''
    while True:
        try:
            percentage = driver.execute_script(progress_js)
            signal = True
            maybe_finished = percentage == 100
        except Exception:
            # The progress query failed; fall back to the file-on-disk check
            # below to decide whether the download has finished.
            maybe_finished = True

        candidate = None
        if maybe_finished:
            try:
                candidate = driver.execute_script(name_js)
            except Exception:
                print('.', end='')

        # Requiring the file on disk also filters out an older entry whose
        # file has already been renamed away.
        if candidate and os.path.exists(os.path.join(download_dir, candidate)):
            fname = candidate
            break

        time.sleep(5)
        if time.time() > endTime:
            break

    # Always hand the driver back focused on the main window, also on timeout.
    driver.switch_to.window(driver.window_handles[0])
    return fname


def download_fasta(filename):
    """Download every record currently listed and rename the resulting file.

    Sequence: select all rows, click download, confirm inside the iframe,
    wait (up to 4200 s) for Chrome to finish, rename the file to
    ``<stem>(<n>).fasta`` using the first free ``n`` (starting at 1), then
    click select-all again.

    Args:
        filename: target name ending in ``.fasta``; the numbered suffix is
            inserted before the extension.

    Sets the module-level ``if_terminate`` when the download times out, and
    returns early whenever ``if_terminate`` is found set.
    """
    global if_terminate
    # Chrome's default download folder for the current user (previously
    # hard-coded to one specific user's profile folder).
    filepath = os.path.join(os.path.expanduser('~'), 'Downloads')
    print('开始下载')
    print('点击全选')
    Keep(Select_all)
    wait_one_timer()
    if if_terminate:
        return
    print('点击下载键')
    Click(d, download)
    wait_one_timer()
    if if_terminate:
        return
    SwitchToIframe(d)
    print('点击下载键二')
    Click(d, download2)
    time.sleep(5)
    SwitchToDefault(d)
    fn = getDownLoadedFileName(d, 4200)
    print('download filename:{}'.format(fn))
    print('下载结束')
    if fn == '':
        if_terminate = True
        print('网络错误,重启,if_terminate=True')
        return

    # Strip the 6-character '.fasta' extension, then pick the first unused
    # '<stem>(n).fasta' so earlier downloads are never overwritten.
    stem = filename[:-6]
    num = 1
    while os.path.exists(os.path.join(filepath, '{}({}).fasta'.format(stem, num))):
        num += 1
    filename = '{}({}).fasta'.format(stem, num)

    source_path = os.path.join(filepath, fn)
    target_path = os.path.join(filepath, filename)
    print('rename:({},{})'.format(source_path, target_path))
    os.rename(source_path, target_path)
    print('再次点击全选键')
    Keep(Select_all)
    wait_one_timer()
    if if_terminate:
        return
    print('结束下载')


def find_left(sms_date_str):
    """Find the earliest collection date worth bisecting from, for one submission day.

    Starting 30 days before *sms_date_str* and moving back 10 days per step,
    the collection-date lower bound is set and the record count re-read until
    either

    * the count equals the unfiltered total (nothing is older), or
    * at most 10000 records are older than the bound; those are downloaded
      in one go and progress is saved.

    Args:
        sms_date_str: submission date, 'YYYY-MM-DD'.

    Returns:
        The lower-bound collection date ('YYYY-MM-DD') at which the search
        stopped.
    """
    global signal
    start = 0
    time.sleep(7)
    # Wait (up to 60 polls) for the counter to drop to a plausible total.
    for i in range(60):
        signal = True
        patient_nums = FindPatientNums(d)
        if patient_nums > 200000:
            time.sleep(1)
        else:
            break
    print('总数:', patient_nums)
    while True:
        left_date = datetime.strptime(sms_date_str, '%Y-%m-%d') - timedelta(days=30 + start)
        left_date_str = left_date.strftime('%Y-%m-%d')
        Collection_date(left_date_str, '')
        time.sleep(10)
        pn = 9999999
        for i in range(60):
            signal = True
            pn = FindPatientNums(d)
            if pn > 50000:
                time.sleep(1)
            else:
                break

        print('patient_nums:{}, pn:{}'.format(patient_nums, pn))
        if pn == patient_nums:
            print('左点极限')
            break
        elif patient_nums - pn <= 10000:
            print('左点左数据量{}右数据量{},小于10000'.format(str(patient_nums - pn), str(pn)))
            left_date2 = left_date - timedelta(days=1)
            left_date_str2 = left_date2.strftime('%Y-%m-%d')
            print('下载{}及之前的日期'.format(left_date_str2))
            Collection_date('', left_date_str2)
            time.sleep(6)
            download_fasta(sms_date_str + '.fasta')
            # Save progress only after a completed download: download_fasta
            # returns early with if_terminate set when it was aborted, and
            # recording the range then would skip it on the next run.
            if not if_terminate:
                set_advance([left_date_str2, sms_date_str])
            break
        else:
            start += 10
            print('未找到,正在寻找下一个')
    print('找到左点:', left_date)
    return left_date_str


def find_mid(left_date_str, sms_date_str, right_date_str=''):
    """Download one collection-date window, bisecting it while it is too large.

    Args:
        left_date_str: first collection date of the window, 'YYYY-MM-DD'.
        sms_date_str: submission date being processed, 'YYYY-MM-DD'.
        right_date_str: last collection date of the window. When empty, the
            window ends at the submission date but is capped at 20 days.

    Behaviour by record count of the window:
        * 0           -> skipped;
        * <= 10000    -> downloaded, progress saved;
        * single day  -> cannot be split further, a warning is printed;
        * otherwise   -> split into [left, mid] and [mid + 1 day, right] and
                         both halves are processed recursively.

    Returns:
        The window's right date, or ``None`` when ``if_terminate`` was set
        before counting or when a single day exceeds the limit.
    """
    global signal
    date_format = '%Y-%m-%d'
    left_date = datetime.strptime(left_date_str, date_format)
    if right_date_str != '':
        print('定制右点')
        right_date = datetime.strptime(right_date_str, date_format)
    else:
        right_date = datetime.strptime(sms_date_str, date_format)
        if right_date - left_date > timedelta(days=20):
            right_date = left_date + timedelta(days=20)
        right_date_str = right_date.strftime(date_format)

    Collection_date(left_date_str, right_date_str)
    wait_one_timer()
    if if_terminate:
        return
    time.sleep(7)
    pn = 999999
    # Wait (up to 60 polls) for the counter to refresh to a plausible value.
    for i in range(60):
        signal = True
        pn = FindPatientNums(d)
        if pn > 200000:
            time.sleep(1)
        else:
            break
    if pn == 0:
        print(left_date_str, '至', right_date_str, '无数据,跳过')
    elif pn <= 10000:
        print('找到合适日期,cd', left_date_str, 'to', right_date_str, ',sd', sms_date_str, '共', pn, '个数据')
        download_fasta(sms_date_str + '.fasta')
        # Save progress only after a completed download: download_fasta
        # returns early with if_terminate set when it was aborted, and
        # recording the window then would skip it on the next run.
        if not if_terminate:
            set_advance([right_date_str, sms_date_str])
            print('下载完成:cd', left_date_str, '__', right_date_str, 'sd', sms_date_str)
    elif right_date - left_date <= timedelta(days=0):
        print('警告:一天内有超过10000条,cd', left_date_str, '__', right_date_str, 'sd', sms_date_str)
        return
    else:
        print(left_date_str, '和', right_date_str, '之间有', pn, '个项目,不合适,继续寻找中。')
        # Bisect on whole days. The second half starts the day AFTER the
        # midpoint: with a one-day-wide window the midpoint equals the left
        # date, so starting the second half at the midpoint would recurse on
        # the very same window forever (and would also download the midpoint
        # day twice).
        half_days = (right_date - left_date).days // 2
        mid_date = left_date + timedelta(days=half_days)
        mid_date_str = mid_date.strftime(date_format)
        next_date_str = (mid_date + timedelta(days=1)).strftime(date_format)
        find_mid(left_date_str, sms_date_str, mid_date_str)
        find_mid(next_date_str, sms_date_str, right_date_str)

    return right_date_str


def loop(sms_date_str, left_date_str=''):
    """Download one submission day window by window along the collection date.

    Args:
        sms_date_str: submission date, 'YYYY-MM-DD'.
        left_date_str: collection date to resume from; when empty it is
            determined with ``find_left``.

    Each pass clears the collection-date filter and lets ``find_mid`` handle
    the next window, continuing from the date it returns, until the left
    date reaches the submission date. Returns early (printing 'terminate')
    once ``if_terminate`` is set.
    """
    global signal
    date_format = '%Y-%m-%d'
    if left_date_str != '':
        print('断点继续,sd', sms_date_str, 'cd从', left_date_str, '开始')
    else:
        left_date_str = find_left(sms_date_str)

    while True:
        signal = True
        print('找中点')
        if if_terminate:
            print('terminate')
            return
        window_start = datetime.strptime(left_date_str, date_format)
        submission_day = datetime.strptime(sms_date_str, date_format)
        if not window_start < submission_day:
            break
        Collection_date('', '')
        left_date_str = find_mid(left_date_str, sms_date_str)
    print(sms_date_str, '完成!!!!!')


# Browser session shared by every routine in this module.
# NOTE: created at import time, so merely importing this module opens Chrome.
d = webdriver.Chrome()

# First and last submission date to crawl ('YYYY-MM-DD', both inclusive).
# Empty here; they must be assigned before start() is called.
sms_start_date = ''
sms_end_date = ''

# Path of the progress file read by get_advance() and written by set_advance().
adpath = '进度.txt'


# Progress file content: [right_date, submission_date]


def get_advance():
    """Return the saved progress ``[right_date, submission_date]``.

    When the progress file at ``adpath`` does not exist yet, it is first
    created with the empty progress ``['', '']``.
    """
    progress_file_missing = not os.path.exists(adpath)
    if progress_file_missing:
        set_advance(['', ''])
    with open(adpath, 'r') as f:
        stored = f.read()
        return literal_eval(stored)


def set_advance(ad):
    """Overwrite the progress file at ``adpath`` with ``str(ad)``.

    Args:
        ad: progress to store, ``[right_date, submission_date]``.
    """
    print('进度已储存')
    with open(adpath, 'w+') as f:
        text = str(ad)
        f.write(text)


def start():
    """Log in and download every submission day from sms_start_date to sms_end_date.

    Saved progress (see ``get_advance``) overrides the start date: a finished
    day is skipped, an unfinished one is resumed from its saved collection
    date. Days with at most 10000 records are downloaded in one go; larger
    days are split by collection date through ``loop``.

    Returns early whenever ``if_terminate`` is set. Sets the module-level
    ``if_done`` to True once the last day has been processed.
    """
    global if_done
    date_format = '%Y-%m-%d'

    GetWeb(d)
    Login(d)
    ClickSearch(d)
    wait_one_timer()
    if if_terminate:
        return

    ssd = datetime.strptime(sms_start_date, date_format)
    sed = datetime.strptime(sms_end_date, date_format)
    # Load saved progress: [right_date, submission_date]
    adv = get_advance()
    left_date_str = ''
    if adv != ['', '']:
        right_date_str = adv[0]
        right_date = datetime.strptime(right_date_str, date_format)
        ssd = datetime.strptime(adv[1], date_format)
        if right_date >= ssd:
            # the saved day was completed: continue with the next one
            ssd += timedelta(days=1)
        else:
            # the saved day is unfinished: resume from its saved right date
            left_date_str = right_date_str

    days = (sed - ssd).days + 1
    for i in range(days):
        submission_date = (ssd + timedelta(days=i)).strftime(date_format)
        print('开始sd:', submission_date)
        Submission_date(submission_date)
        Collection_date('', '')
        # Check whether the whole day fits into a single download.
        time.sleep(10)
        pn = FindPatientNums(d)
        if pn <= 10000:
            print('一天数据量{},低于10000,直接下载'.format(str(pn)))
            download_fasta(submission_date + '.fasta')
            # Record the day as finished (right_date == submission_date), so
            # a restart does not download it again. Skipped when the download
            # was aborted, so that the day is retried.
            if not if_terminate:
                set_advance([submission_date, submission_date])
        else:
            time.sleep(pn / 2000)  # give the result list time to refresh fully
            loop(submission_date, left_date_str)
        # the resume point only applies to the first day of this run
        left_date_str = ''
        if if_terminate:
            return
    if_done = True
    print('if_done = True')


if __name__ == '__main__':
    # Stand-alone run without the launcher window. The credentials can be
    # overridden through the GISAID_USER / GISAID_PASSWORD environment
    # variables instead of editing this file; the defaults are the values
    # that were hard-coded here before.
    login_name = os.environ.get('GISAID_USER', 'SunCe')
    login_password = os.environ.get('GISAID_PASSWORD', 'GZ4Tzcr9')

    # Submission-date range to crawl ('YYYY-MM-DD', both ends inclusive).
    sms_start_date = '2021-09-01'
    sms_end_date = '2021-09-05'

    start()
这段代码很长,直接复制粘贴就好

 

上一篇:【笔记】Python | 04 | 操作列表 | 4.5 元组


下一篇:【笔记】Python | 04 | 操作列表 | 4.4 使用列表的一部分