python GISAID 网站爬虫实践:fasta文件获取
临近毕业,老师交给我的爬虫任务已经结束。反正留在电脑里也没用,不如发出来造福大家。
简介:
GISAID是流行病毒共享组织,网址“www.gisaid.org”,服务器位于美国洛杉矶、旧金山等地, 需要教育邮箱注册账号才能访问其数据。它的网站比较老旧,其中的病例数据、DNA序列文件有单次获取量限制,并且网站管理员从不回复任何消息,最关键的是,网站代码是动态的、并且有验证码,所以爬虫初学者往往很难在几周之内完成这个爬虫。为了获取这些宝贵的信息,我们学院的老师和学生也是焦头烂额。
无论如何,最终我写好了fasta文件和病例信息自动化获取的程序,发布在博客园这里。(估计也没人会转载吧,想拿就随便拿去吧)
使用步骤:
1.安装好python, 将python安装路径加入系统路径Path中
2.同时按住win徽标键 + R键,输入cmd打开DOS窗口,
输入命令:
pip install selenium
安装selenium软件包(ctypes和tkinter是Python自带的标准库,不需要、也无法通过pip安装)
3.百度webdriver Chrome ,找到和电脑上安装的Chrome版本相同的webdriver(chromedriver.exe),下载到任意文件夹内
4.依次复制下面两段代码,分别粘贴到chromedriver.exe所在文件夹的两个空白txt文档中。第一段(用户界面)重命名为xxx.py(xxx代表随便取名);第二段(爬虫主程序)必须重命名为download_fasta_new.py,因为用户界面代码是通过 import download_fasta_new 来导入它的。然后双击第一段代码所在文件,在弹出界面输入必要信息,即可开始爬虫。
用户界面代码(点击这段代码所在.py文件开始):
"""Tkinter launcher for the GISAID fasta crawler.

Collects the account, password and submission-date range from the user and
runs ``download_fasta_new.start()`` in a background thread, restarting the
browser whenever the crawler reports that it has to terminate.
"""
__author__ = 'cquxiaoy'

import ctypes
import time
from threading import Thread
from tkinter import Button, Entry, Label, Tk

from selenium import webdriver

import download_fasta_new as dt

MAX_RESTARTS = 100        # upper bound on crawler (re)starts per run
WATCHDOG_INTERVAL = 3000  # seconds between two liveness checks
POLL_INTERVAL_MS = 1000   # how often the GUI thread checks the worker


def check():
    """Watchdog: request a restart when the crawler stops setting ``dt.signal``.

    The crawler sets ``dt.signal = True`` while it makes progress.  Each pass
    clears the flag; if it is still cleared on the next pass, the crawler is
    considered stuck and ``dt.if_terminate`` is raised so that it unwinds.
    """
    while not dt.if_done:
        if dt.signal:
            dt.signal = False
        else:
            dt.if_terminate = True
            print('长时间未响应,即将重启')
        time.sleep(WATCHDOG_INTERVAL)


def _restart_browser():
    """Replace the crawler's browser with a fresh one and clear the stop flag."""
    try:
        # quit() ends the whole browser session; close() would only close the
        # current tab and leave the browser/chromedriver processes behind.
        dt.d.quit()
    except Exception as e:
        # The old browser may already be gone; a new one is created anyway.
        print('出错:', e)
    dt.d = webdriver.Chrome()
    dt.signal = True  # give the new session a full watchdog interval
    dt.if_terminate = False


def _start(start_date_str=None, end_date_str=None, account=None, password=None):
    """Run the crawler until it finishes or ``MAX_RESTARTS`` is reached.

    Args:
        start_date_str: First submission date, ``YYYY-MM-DD``.
        end_date_str: Last submission date, ``YYYY-MM-DD``.
        account: GISAID login name.
        password: GISAID password.

    Any argument left as ``None`` is read from the corresponding entry widget.
    """
    if start_date_str is None:
        start_date_str = e1.get()
    if end_date_str is None:
        end_date_str = e2.get()
    if account is None:
        account = en.get()
    if password is None:
        password = ep.get()

    # One progress file per date range, so an unchanged range resumes.
    dt.adpath = '进度' + start_date_str + end_date_str + '.txt'
    # The crawler's Login() reads these module globals; without them the
    # values typed into the account/password fields would never be used.
    dt.login_name = account
    dt.login_password = password
    dt.sms_start_date = start_date_str
    dt.sms_end_date = end_date_str

    # A single watchdog serves every restart; starting one per attempt would
    # pile up threads that all clear dt.signal.
    Thread(target=check, daemon=True).start()

    for i in range(MAX_RESTARTS):
        if dt.if_done:
            break
        print('第%d次启动' % (i + 1))
        if dt.if_terminate:
            _restart_browser()
        try:
            dt.start()
        except Exception as e:
            print('出错:', e)
            # The browser may be left on an arbitrary tab/page after an
            # error, so the next attempt starts from a fresh browser.
            dt.if_terminate = True

    try:
        dt.d.quit()
    except Exception as e:
        print('出错:', e)


def _close_when_finished(worker):
    """Destroy the hidden root window once the worker thread has ended."""
    if worker.is_alive():
        r.after(POLL_INTERVAL_MS, _close_when_finished, worker)
    else:
        # Ends mainloop(); otherwise the process would stay alive forever
        # behind a withdrawn window.
        r.destroy()


def start():
    """Button callback: launch the crawler thread and hide the window."""
    # Widget values are read here, on the GUI thread, and handed to the worker.
    worker = Thread(
        target=_start,
        args=(e1.get(), e2.get(), en.get(), ep.get()),
        daemon=True,
    )
    worker.start()
    r.withdraw()
    _close_when_finished(worker)


font = 'weiruanyahei 20'
try:
    # Windows-only call (1 is the value the original passed as True).
    ctypes.windll.shcore.SetProcessDpiAwareness(1)
except (AttributeError, OSError):
    # ctypes.windll or shcore is unavailable on this platform; the GUI still
    # works without the DPI setting.
    pass

r = Tk()
# The first positional argument of Tk() is screenName, not the window title.
r.title('gisaid fasta数据爬虫')
Label(r, text='账号:', font=font).grid(row=0, column=0)
Label(r, text='密码:', font=font).grid(row=1, column=0)
en = Entry(r, width=11, font=font)
ep = Entry(r, width=11, font=font, show='*')  # do not echo the password
en.grid(row=0, column=1, columnspan=2)
ep.grid(row=1, column=1, columnspan=2)
Label(r, text='Submission Date:', font=font).grid(row=2, columnspan=3)
Label(r, text='to', font=font).grid(row=3, column=1)
Button(r, text='开始', command=start, font=font).grid(row=4, columnspan=3)
e1, e2 = Entry(r, width=11, font=font), Entry(r, width=11, font=font)
e1.grid(row=3, column=0)
e2.grid(row=3, column=2)
e1.insert(0, '2022-02-19')
e2.insert(0, '2022-02-20')
Label(r, text='日期仅支持范例的格式\n日期不变的话,程序能断点续下\n有问题请加QQ1977649208', font='weiruanyahei 16').grid(row=5, columnspan=3)
r.mainloop()
# End of the user-interface code.
爬虫主程序(粘贴在同一目录下、文件名必须为download_fasta_new.py的文件中,用户界面代码按这个文件名导入它):
"""Selenium crawler that downloads fasta files from the GISAID EpiCoV site.

For every submission date in ``[sms_start_date, sms_end_date]`` the records are
split by collection date into batches of at most ``MAX_BATCH`` records; each
batch is selected, downloaded and renamed to ``<submission date>(<n>).fasta``
inside ``download_dir``.  Progress is stored in ``adpath`` so that an
interrupted run can resume.

Module-level state read/written by a controlling script:
``d``, ``signal``, ``if_terminate``, ``if_done``, ``adpath``,
``sms_start_date``, ``sms_end_date``, ``login_name``, ``login_password``.
"""
import os
import random
import re
import time
from ast import literal_eval
from datetime import datetime, timedelta

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

DATE_FORMAT = '%Y-%m-%d'
MAX_BATCH = 10000  # largest record count this script downloads in one go

iframe_xpath = '/html/body/iframe'
# The four date inputs share one table row; only the cell index differs.
# (The trailing space is kept from the original locators.)
_DATE_INPUT = ('/html/body/form/div[5]/div/div[2]/div/div[2]/div[1]/div[4]/table/tbody/tr/td[2]/table/tbody/tr/td['
               '{}]/div/div[1]/input ')
clt_date_1 = _DATE_INPUT.format(1)  # collection date, from
clt_date_2 = _DATE_INPUT.format(3)  # collection date, to
sms_date_1 = _DATE_INPUT.format(5)  # submission date, from
sms_date_2 = _DATE_INPUT.format(7)  # submission date, to
select_all = '/html/body/form/div[5]/div/div[2]/div/div[2]/div[2]/div[1]/div[2]/table/thead/tr/th[1]/div/span/input'
download = '/html/body/form/div[5]/div/div[2]/div/div[2]/div[2]/div[2]/div[3]/button[3]'
download2 = '/html/body/form/div[5]/div/div[2]/div/div/div[2]/div/button'

# Credentials used by Login().  A controlling script (or the __main__ block at
# the bottom) sets them before start() runs.  Author's note, translated: "if
# you found your way here, feel free to use the account in the __main__ block;
# I no longer need it."
login_name = ''
login_password = ''

# Folder the browser saves downloads to.  Defaults to the current user's
# "Downloads" folder; override it if the browser is configured differently.
download_dir = os.path.join(os.path.expanduser('~'), 'Downloads')

signal = True         # set to True whenever the crawler makes progress
if_terminate = False  # True: unwind as fast as possible so the run can restart
if_done = False       # True once every submission date has been processed

_COUNT_RE = re.compile(r'\d[\d,]*')
_JS_PROGRESS = ("return document.querySelector('downloads-manager').shadowRoot.querySelector('#downloadsList "
                "downloads-item').shadowRoot.querySelector('#progress').value")
_JS_FILE_NAME = ("return document.querySelector('downloads-manager').shadowRoot.querySelector('#downloadsList "
                 "downloads-item').shadowRoot.querySelector('div#content #file-link').text")


def send(xpath, msg):
    """Clear the input at *xpath* and, if *msg* is non-empty, type it + Enter."""
    ipt = d.find_element(By.XPATH, xpath)
    ipt.clear()
    if msg != '':
        ipt.send_keys(msg + '\n')


def Click(d, element_xpath):
    """Click the element at *element_xpath*, retrying up to 20 times.

    Returns:
        The element's text after a successful click, or ``None`` if every
        attempt failed.
    """
    for _ in range(20):
        try:
            bt = d.find_element(By.XPATH, element_xpath)
            bt.click()
            return bt.text
        except Exception:
            # Best effort: the page may still be loading; try again shortly.
            time.sleep(0.5)
    print('点击失败', element_xpath)
    return None


def Send(d, element_xpath, msg):
    """Replace the content of the input at *element_xpath* with *msg*.

    Retries up to 20 times; gives up silently apart from a console message.
    """
    for _ in range(20):
        try:
            bt = d.find_element(By.XPATH, element_xpath)
            bt.clear()
            bt.send_keys(msg)
            return
        except Exception:
            time.sleep(0.5)
    print('输入失败', element_xpath)


def Keep(exe, times=20):
    """Call *exe* until it succeeds or *times* attempts have been made.

    Args:
        exe: Zero-argument callable.
        times: Maximum number of attempts; ``<= 0`` means retry forever.

    Returns:
        ``True`` if a call succeeded, ``False`` if all attempts raised.
    """
    global signal
    attempt = 0
    while times <= 0 or attempt < times:
        signal = True
        attempt += 1
        try:
            exe()
            return True
        except Exception:
            time.sleep(0.5)
    print('Keep--', str(exe.__name__), ':失败-', attempt)
    return False


def SwitchToIframe(d):
    """Wait for the page's iframe and switch the driver into it."""
    def s1():
        WebDriverWait(d, 3, 0.5).until(EC.presence_of_element_located((By.XPATH, iframe_xpath)))
        iframe = d.find_element(By.XPATH, iframe_xpath)
        d.switch_to.frame(iframe)

    Keep(s1)


def SwitchToDefault(d):
    """Switch the driver back to the top-level document."""
    def s2():
        d.switch_to.default_content()

    Keep(s2)


def Select_all():
    """Toggle the "select all" checkbox of the result table."""
    sela = d.find_element(By.XPATH, select_all)
    sela.click()


def _parse_total(text):
    """Return the first number in *text*, ignoring thousands separators.

    Raises:
        ValueError: If *text* contains no number.
    """
    match = _COUNT_RE.search(text)
    if match is None:
        raise ValueError('no record count found in {!r}'.format(text))
    return int(match.group().replace(',', ''))


def FindPatientNums(d):
    """Return the total number of records shown under the result table."""
    total_num_xpath = '/html/body/form/div[5]/div/div[2]/div/div[2]/div[2]/div[2]/div[1]/span/span'
    total_num_str = d.find_element(By.XPATH, total_num_xpath).text
    return _parse_total(total_num_str)


def _settled_count(limit, tries=60):
    """Poll the record count until it is ``<= limit``.

    The count is read up to *tries* times, one second apart; the last reading
    is returned even if it is still above *limit*.
    """
    global signal
    count = None
    for _ in range(tries):
        signal = True
        count = FindPatientNums(d)
        if count <= limit:
            break
        time.sleep(1)
    return count


def GetWeb(d):
    """Open the EpiCoV front end."""
    d.get('https://www.epicov.org/epi3/frontend')
    time.sleep(1)


def Login(d):
    """Fill in ``login_name`` / ``login_password`` and submit the login form."""
    login_xpath = '/html/body/form/div[5]/div/div[2]/div/div/div/div[1]/div/div/div[2]/input[1]'
    password_xpath = '/html/body/form/div[5]/div/div[2]/div/div/div/div[1]/div/div/div[2]/input[2]'
    login_bt_xpath = '/html/body/form/div[5]/div/div[2]/div/div/div/div[1]/div/div/div[2]/input[3]'
    Send(d, login_xpath, login_name)
    Send(d, password_xpath, login_password)
    Click(d, login_bt_xpath)
    print('Login')


def ClickSearch(d):
    """Wait for the search button and click it."""
    search_bt = '/html/body/form/div[5]/div/div[2]/div/div[1]/div/div/div[3]'
    WebDriverWait(d, 3, 0.5).until(EC.presence_of_element_located((By.XPATH, search_bt)))
    Click(d, search_bt)
    print('点击搜索按钮')


def Submission_date(_date):
    """Restrict the search to records submitted on *_date* (``YYYY-MM-DD``)."""
    Send(d, sms_date_1, _date)
    time.sleep(10 * random.random())
    Send(d, sms_date_2, _date)
    wait_one_timer()


def Collection_date(date1, date2):
    """Set the collection-date filter; an empty string clears that bound."""
    if date1 != '':
        Send(d, clt_date_1, date1)
        wait_one_timer()
    else:
        Send(d, clt_date_1, '')
    if date2 != '':
        Send(d, clt_date_2, date2)
        wait_one_timer()
    else:
        Send(d, clt_date_2, '')


def wait_one_timer(max_time=13):
    """Sleep *max_time* plus 0-5 random seconds, one second at a time.

    Returns early as soon as ``if_terminate`` is set and keeps ``signal``
    raised while waiting.
    """
    max_time = max_time + int(6 * random.random())
    print('等待{}秒'.format(str(max_time)), end=' ')
    global signal
    for _ in range(max_time):
        if if_terminate:
            return
        signal = True
        time.sleep(1)
    print('结束等待')


def getDownLoadedFileName(driver, waitTime):
    """Wait for the most recent Chrome download and return its file name.

    Uses a second tab showing ``chrome://downloads``.  The download counts as
    finished once the file named by the first download item exists in
    ``download_dir``.

    Args:
        driver: The selenium Chrome driver.
        waitTime: Maximum time to wait, in seconds.

    Returns:
        The downloaded file's name, or ``''`` if *waitTime* elapsed first.
        In both cases the driver is switched back to the first tab.
    """
    global signal
    if len(driver.window_handles) == 1:
        driver.execute_script("window.open()")
        driver.switch_to.window(driver.window_handles[-1])
        driver.get('chrome://downloads')
    else:
        # The downloads tab already exists from an earlier call.
        driver.switch_to.window(driver.window_handles[-1])

    endTime = time.time() + waitTime
    fname = ''
    while True:
        try:
            percentage = driver.execute_script(_JS_PROGRESS)
            signal = True
            may_be_finished = percentage == 100
        except Exception:
            # The progress query failed (e.g. the progress element is not
            # there); fall back to checking whether the file exists.
            may_be_finished = True

        if may_be_finished:
            try:
                candidate = driver.execute_script(_JS_FILE_NAME)
                if candidate and os.path.exists(os.path.join(download_dir, candidate)):
                    fname = candidate
                    break
            except Exception:
                print('.', end='')

        time.sleep(5)
        if time.time() > endTime:
            break

    driver.switch_to.window(driver.window_handles[0])
    return fname


def _next_free_name(directory, filename):
    """Return ``<stem>(<n>).fasta`` with the smallest n >= 1 unused in *directory*."""
    stem = filename[:-len('.fasta')]
    num = 1
    while os.path.exists(os.path.join(directory, '{}({}).fasta'.format(stem, num))):
        num += 1
    return '{}({}).fasta'.format(stem, num)


def download_fasta(filename):
    """Download the currently listed records and store them as *filename*.

    The file is saved as ``<stem>(<n>).fasta`` in ``download_dir`` so earlier
    batches for the same submission date are not overwritten.  Sets
    ``if_terminate`` when the download times out.

    Args:
        filename: Target name ending in ``.fasta``.

    Returns:
        ``True`` if a file was downloaded and renamed, ``False`` otherwise.
    """
    global if_terminate
    print('开始下载')
    print('点击全选')
    Keep(Select_all)
    wait_one_timer()
    if if_terminate:
        return False
    print('点击下载键')
    Click(d, download)
    wait_one_timer()
    if if_terminate:
        return False
    SwitchToIframe(d)
    print('点击下载键二')
    Click(d, download2)
    time.sleep(5)
    SwitchToDefault(d)
    fn = getDownLoadedFileName(d, 4200)
    print('download filename:{}'.format(fn))
    print('下载结束')
    if fn == '':
        if_terminate = True
        print('网络错误,重启,if_terminate=True')
        return False

    source = os.path.join(download_dir, fn)
    target = os.path.join(download_dir, _next_free_name(download_dir, filename))
    print('rename:({},{})'.format(source, target))
    os.rename(source, target)

    # Clicking "select all" again clears the selection for the next batch.
    print('再次点击全选键')
    Keep(Select_all)
    wait_one_timer()
    if not if_terminate:
        print('结束下载')
    return True  # the file is on disk even if a restart was requested meanwhile


def find_left(sms_date_str):
    """Find the earliest collection date that needs batch-wise downloading.

    Moves a left bound back from the submission date (30 days, then 10 more
    per step) until at most ``MAX_BATCH`` records lie before it, downloads
    those older records in one go and stores the progress.

    Returns:
        The left bound as ``YYYY-MM-DD``, or ``''`` if a restart was requested.
    """
    start = 0
    time.sleep(7)
    patient_nums = _settled_count(200000)
    print('总数:', patient_nums)
    sms_date = datetime.strptime(sms_date_str, DATE_FORMAT)
    while True:
        if if_terminate:
            return ''
        left_date = sms_date - timedelta(days=30 + start)
        left_date_str = left_date.strftime(DATE_FORMAT)
        Collection_date(left_date_str, '')
        time.sleep(10)
        pn = _settled_count(50000)
        print('patient_nums:{}, pn:{}'.format(patient_nums, pn))
        if pn == patient_nums:
            # Nothing was collected before left_date.
            print('左点极限')
            break
        if patient_nums - pn <= MAX_BATCH:
            print('左点左数据量{}右数据量{},小于10000'.format(str(patient_nums - pn), str(pn)))
            left_date_str2 = (left_date - timedelta(days=1)).strftime(DATE_FORMAT)
            print('下载{}及之前的日期'.format(left_date_str2))
            Collection_date('', left_date_str2)
            time.sleep(6)
            # Only record progress for data that really reached the disk.
            if download_fasta(sms_date_str + '.fasta'):
                set_advance([left_date_str2, sms_date_str])
            break
        start += 10
        print('未找到,正在寻找下一个')
    print('找到左点:', left_date)
    return left_date_str


def find_mid(left_date_str, sms_date_str, right_date_str=''):
    """Download the records collected in ``[left, right]``, bisecting if needed.

    Args:
        left_date_str: First collection date, ``YYYY-MM-DD``.
        sms_date_str: Submission date being processed.
        right_date_str: Last collection date; if empty, the submission date
            is used, capped at 20 days after the left date.

    Returns:
        The right bound that was used, or ``None`` if a restart was requested
        before this range was fully handled.
    """
    left_date = datetime.strptime(left_date_str, DATE_FORMAT)
    if right_date_str != '':
        print('定制右点')
        right_date = datetime.strptime(right_date_str, DATE_FORMAT)
    else:
        right_date = datetime.strptime(sms_date_str, DATE_FORMAT)
    if right_date - left_date > timedelta(days=20):
        right_date = left_date + timedelta(days=20)
    right_date_str = right_date.strftime(DATE_FORMAT)

    Collection_date(left_date_str, right_date_str)
    wait_one_timer()
    if if_terminate:
        return None
    time.sleep(7)
    pn = _settled_count(200000)

    if pn == 0:
        print(left_date_str, '至', right_date_str, '无数据,跳过')
    elif pn <= MAX_BATCH:
        print('找到合适日期,cd', left_date_str, 'to', right_date_str, ',sd', sms_date_str, '共', pn, '个数据')
        if not download_fasta(sms_date_str + '.fasta'):
            # Do not record progress for a batch that was not saved.
            return None
        set_advance([right_date_str, sms_date_str])
        print('下载完成:cd', left_date_str, '__', right_date_str, 'sd', sms_date_str)
    elif right_date == left_date:
        print('警告:一天内有超过10000条,cd', left_date_str, '__', right_date_str, 'sd', sms_date_str)
    else:
        print(left_date_str, '和', right_date_str, '之间有', pn, '个项目,不合适,继续寻找中。')
        # Bisect on whole days into [left, mid] and [mid + 1, right].  Both
        # halves are strictly shorter than the current range, so the recursion
        # always ends (halving the timedelta itself truncated back to the left
        # date for a two-day range and recursed forever).
        span_days = (right_date - left_date).days
        mid_date = left_date + timedelta(days=span_days // 2)
        mid_date_str = mid_date.strftime(DATE_FORMAT)
        next_date_str = (mid_date + timedelta(days=1)).strftime(DATE_FORMAT)
        find_mid(left_date_str, sms_date_str, mid_date_str)
        if if_terminate:
            return None
        find_mid(next_date_str, sms_date_str, right_date_str)
    return right_date_str


def loop(sms_date_str, left_date_str=''):
    """Download one submission date in collection-date batches.

    Args:
        sms_date_str: Submission date, ``YYYY-MM-DD``.
        left_date_str: Collection date to resume from; if empty, the starting
            point is determined with ``find_left``.
    """
    global signal
    if left_date_str == '':
        left_date_str = find_left(sms_date_str)
    else:
        print('断点继续,sd', sms_date_str, 'cd从', left_date_str, '开始')
    sms_date = datetime.strptime(sms_date_str, DATE_FORMAT)
    while True:
        signal = True
        print('找中点')
        if if_terminate:
            print('terminate')
            return
        left_date = datetime.strptime(left_date_str, DATE_FORMAT)
        if left_date >= sms_date:
            break
        Collection_date('', '')
        left_date_str = find_mid(left_date_str, sms_date_str)
    print(sms_date_str, '完成!!!!!')


# The browser is created at import time; a controlling script may replace it.
d = webdriver.Chrome()
sms_start_date = ''
sms_end_date = ''
adpath = '进度.txt'  # progress file holding [right_date, submission_date]


def get_advance():
    """Return the stored progress ``[right_date, submission_date]``.

    Creates the progress file with ``['', '']`` if it does not exist yet.
    """
    if not os.path.exists(adpath):
        set_advance(['', ''])
    with open(adpath, 'r', encoding='utf-8') as f:
        return literal_eval(f.read())


def set_advance(ad):
    """Store the progress list *ad* in the progress file."""
    print('进度已储存')
    with open(adpath, 'w', encoding='utf-8') as f:
        f.write(str(ad))


def start():
    """Log in and download every submission date in the configured range.

    Reads ``sms_start_date`` / ``sms_end_date`` and resumes from the progress
    file when it holds earlier progress.  Sets ``if_done`` once all dates are
    processed; returns early whenever ``if_terminate`` is set.
    """
    global if_done
    GetWeb(d)
    Login(d)
    ClickSearch(d)
    wait_one_timer()
    if if_terminate:
        return
    ssd = datetime.strptime(sms_start_date, DATE_FORMAT)
    sed = datetime.strptime(sms_end_date, DATE_FORMAT)

    adv = get_advance()
    left_date_str = ''
    if adv != ['', '']:
        right_date_str = adv[0]
        right_date = datetime.strptime(right_date_str, DATE_FORMAT)
        ssd = datetime.strptime(adv[1], DATE_FORMAT)
        if right_date >= ssd:
            # The stored submission date was completed; continue with the next.
            ssd += timedelta(days=1)
        else:
            # The stored submission date is unfinished; resume inside it.
            left_date_str = right_date_str

    days = (sed - ssd).days + 1
    for i in range(days):
        submission_date = (ssd + timedelta(days=i)).strftime(DATE_FORMAT)
        print('开始sd:', submission_date)
        Submission_date(submission_date)
        Collection_date('', '')
        time.sleep(10)
        pn = FindPatientNums(d)
        if pn == 0:
            # Nothing to download; a download attempt would only time out.
            print(submission_date, '无数据,跳过')
            set_advance([submission_date, submission_date])
        elif pn <= MAX_BATCH:
            print('一天数据量{},低于10000,直接下载'.format(str(pn)))
            if download_fasta(submission_date + '.fasta'):
                # Mark the whole day as done so a restart does not repeat it.
                set_advance([submission_date, submission_date])
        else:
            time.sleep(pn / 2000)  # give the result list time to refresh fully
            loop(submission_date, left_date_str)
        left_date_str = ''
        if if_terminate:
            return
    if_done = True
    print('if_done = True')


if __name__ == '__main__':
    # NOTE(review): credentials published by the original author; prefer
    # supplying your own through the GISAID_USER / GISAID_PASSWORD variables.
    login_name = os.environ.get('GISAID_USER', 'SunCe')
    login_password = os.environ.get('GISAID_PASSWORD', 'GZ4Tzcr9')
    sms_start_date = '2021-09-01'
    sms_end_date = '2021-09-05'
    start()
# This script is long; simply copy and paste it as a whole.