爬取Freelancer上的“私活儿”信息

本文仅用于学习交流,请勿用于商业用途,或恶意破坏,未经允许请勿转载

前戏

首先你需要 注册一个Freelancer 的账户
接下来准备好Chrome浏览器和IDE,咱们开搞!

快速开始

  1. 在你的电脑上安装谷歌浏览器
  2. 并下载一个对应版本的chromedriver,将其解压到一个你喜欢的地方(你可以在代码中找到DRIVER_PATH变量设置chromedriver路径)
  3. 然后保存代码到一个py文件(代码见本文末)
  4. 在代码中设置用户名(USERNAME)、密码(PASSWORD)、chromedriver路径(DRIVER_PATH),并保存
  5. 打开命令行使用命令python filename.py运行,默认爬取第一页的项目信息
  6. 使用命令python filename.py --all 1运行,可爬取所有的项目信息(自动换页),参数--all后跟任意字符均可

运行结果

爬取Freelancer上的“私活儿”信息

项目源码

#!/usr/bin/env python
# coding: utf-8
import time
import random
import argparse

import pandas as pd
from lxml import etree
from urllib.parse import urljoin
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as ec

# Base URL of the site; relative project links are joined against it with urljoin().
MAIN_URL = "https://www.freelancer.com"
# XPath selectors for login and result scraping.
# NOTE: the keys are Chinese labels referenced at the call sites — do not rename them.
XPATH = {
    "用户名输入框": '//input[@placeholder="Email or Username"]',  # username input box
    "密码输入框": '//input[@placeholder="Password"]',  # password input box
    "登录按钮": '//button[contains(string(), "Log In")]',  # login button
    "登录成功验证": '//app-user-card',  # element whose presence proves login succeeded
    "搜索结果": '//ul[@class="search-result-list"]',  # search result list container
    "搜索结果项": '//ul[@class="search-result-list"]//a[@class="search-result-link"]',  # one result item link
    "下一页按钮已禁用": '//li[contains(string(), "Next") and @class="disabled"]',  # "Next" button in disabled state
}
# JavaScript snippets (run through driver.execute_script) that locate filter controls.
JS = {
    "ProjectTypes": """return document.querySelectorAll('label[ng-repeat="type in ProjectsFilter.filterOptions.projectTypes"] > input')""",
    "ClearSkills": """return document.querySelector('button[i18n-msg="Clear Skills"]')""",
    "ClearLocation": """return document.querySelector('button[i18n-msg="Clear Location"]')""",
    "ClearLanguages": """return document.querySelector('button[i18n-msg="Clear Languages"]')""",
    "ListingTypes": """return document.querySelectorAll('label[ng-repeat="type in ProjectsFilter.filterOptions.projectUpgrades"] > input')""",
}

USERNAME = ""  # Freelancer account username
PASSWORD = ""  # Freelancer account password

DRIVER_PATH = r"chromedriver"  # path to the chromedriver executable

TIMEOUT = 200  # page-load / explicit-wait timeout, in seconds


class CrawlFreelancer:
    """Crawl project listings from freelancer.com with Selenium and export them to Excel."""

    def __init__(self):
        self.result = []  # collected project dicts, one per search-result item
        self.driver = None  # Selenium WebDriver, created in set_global_driver()
        self.take_all = self.init_parse()  # whether to crawl every result page

    @staticmethod
    def init_parse():
        """Parse CLI arguments; return a truthy value when --all was passed."""
        parser = argparse.ArgumentParser(description="Whether to get all projects")
        parser.add_argument('--all', type=str, default=None)
        args = parser.parse_args()
        if args.all:
            print("Will get all projects")
        else:
            print("Will get projects only first page")
        return args.all

    def find_element(self, path, wait=0, is_join=False):
        """Look up elements by XPath in the current page source without raising.

        :param path: XPath expression evaluated against driver.page_source
        :param wait: seconds to sleep before grabbing the page source
        :param is_join: when True, join all matched text nodes into one string
        :return: list of matches, a joined string, or '' when nothing matched
        """
        time.sleep(wait)
        html_source = etree.HTML(self.driver.page_source)
        element = html_source.xpath(path)  # xpath() always returns a list
        if element:
            return ''.join(element) if is_join else element
        # Callers iterate or truth-test the result, so return an empty iterable.
        return ''

    def set_global_driver(self):
        """Create and configure the shared Chrome WebDriver instance."""
        options = webdriver.ChromeOptions()
        # Hide the "controlled by automated software" automation flag.
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        # Ignore Chrome certificate-error warnings.
        options.add_argument('--ignore-certificate-errors')
        # Drop the automation extension (removes the developer warning).
        options.add_experimental_option('useAutomationExtension', False)

        # NOTE(review): executable_path is the Selenium 3 API; Selenium 4 uses Service().
        self.driver = webdriver.Chrome(executable_path=DRIVER_PATH, options=options)
        self.driver.execute_cdp_cmd("Network.enable", {})

        self.driver.maximize_window()  # maximize so error screenshots capture more
        self.driver.set_page_load_timeout(TIMEOUT)  # global page-load timeout (TIMEOUT seconds)
        print('全局初始化浏览器配置完成')

    def login(self):
        """Log in with USERNAME/PASSWORD, then open the project search page."""
        self.driver.get("https://www.freelancer.com/login")
        # NOTE(review): find_element_by_* is the Selenium <4.3 API, kept for compatibility.
        self.driver.find_element_by_xpath(XPATH['用户名输入框']).send_keys(USERNAME)
        self.driver.find_element_by_xpath(XPATH['密码输入框']).send_keys(PASSWORD)
        self.driver.find_element_by_xpath(XPATH['登录按钮']).click()
        # Wait until the user card appears, i.e. login succeeded.
        WebDriverWait(self.driver, TIMEOUT, 0.3).until(ec.presence_of_element_located((By.XPATH, XPATH['登录成功验证'])))
        # Open the project search page and wait for the result list.
        self.driver.get("https://www.freelancer.com/search/projects/?ngsw-bypass=&w=f")
        WebDriverWait(self.driver, TIMEOUT, 0.3).until(ec.presence_of_element_located((By.XPATH, XPATH['搜索结果'])))

    def is_checked(self, element):
        """Return the DOM `checked` property of a checkbox element."""
        return self.driver.execute_script("""return arguments[0].checked""", element)

    def clear_filters(self):
        """Reset all search filters: every project type on, every upgrade off,
        skills/location/languages cleared, then reload the page."""
        # Turn every project-type checkbox on.
        for el in self.driver.execute_script(JS['ProjectTypes']):
            if not self.is_checked(el):
                el.click()

        clear_skills = self.driver.execute_script(JS['ClearSkills'])
        if clear_skills.is_enabled():
            clear_skills.click()

        # Turn every listing-upgrade checkbox off.
        for ele in self.driver.execute_script(JS['ListingTypes']):
            if self.is_checked(ele):
                ele.click()

        clear_location = self.driver.execute_script(JS['ClearLocation'])
        if clear_location.is_enabled():
            clear_location.click()

        clear_languages = self.driver.execute_script(JS['ClearLanguages'])
        if clear_languages.is_enabled():
            clear_languages.click()

        self.driver.refresh()

    @staticmethod
    def clear_string(text_list):
        """Join a list of strings and strip surrounding whitespace."""
        return ''.join(text_list).strip()

    def parse_result(self):
        """Scrape every project on the current page; follow "Next" while --all was given."""
        page_num = 1
        # Loop over result pages.
        while True:
            WebDriverWait(self.driver, TIMEOUT, 0.3).until(ec.presence_of_element_located((By.XPATH, XPATH['搜索结果项'])))
            li_list = self.find_element(XPATH['搜索结果项'])
            for li in li_list:
                data_dict = {
                    "link": urljoin(MAIN_URL, self.clear_string(li.xpath('./@href'))),
                    "title": self.clear_string(li.xpath('.//*[@class="info-card-title"]/text()')),
                    "content": self.clear_string(li.xpath('.//*[@class="info-card-description"]/text()')),
                    "salary": self.clear_string(li.xpath('.//*[@class="info-card-action"]//text()')).replace(' ', '').replace('\n', ''),
                    "lastEdit": self.clear_string(li.xpath('.//time[@class="info-card-details-item"]/text()')),
                    "tags": [self.clear_string(tag) for tag in li.xpath('.//div[@class="info-card-skills"]/span/text()')]
                }
                self.result.append(data_dict)
            # (The original used for/else here; with no break in the loop the
            # else branch always ran, so a plain statement is equivalent.)
            print(f"第【{page_num}】页数据获取成功")
            # Stop when the Next button is disabled, or only the first page was requested.
            if self.find_element(XPATH["下一页按钮已禁用"]) or not self.take_all:
                print("获取完毕")
                break
            page_num += 1
            time.sleep(random.uniform(2, 4))  # random pause to look less bot-like
            self.driver.find_element_by_link_text("Next").click()

    def output_excel(self):
        """Write the collected projects to freelancer.xlsx."""
        if not self.result:
            # Guard: selecting columns on an empty DataFrame would raise KeyError.
            print("No data collected; skipping Excel export")
            return
        pf = pd.DataFrame(self.result)
        fields = ['link', 'title', 'content', 'salary', 'lastEdit', 'tags']
        pf = pf[fields]  # fix the column order
        pf = pf.fillna(' ')  # replace empty cells
        # The context manager saves and closes the workbook; ExcelWriter.save()
        # and the to_excel(encoding=...) kwarg were removed from modern pandas.
        with pd.ExcelWriter('freelancer.xlsx') as writer:
            pf.to_excel(writer, index=False, sheet_name="sheet1")

    def main(self):
        """Entry point: run the whole crawl, always exporting whatever was collected."""
        try:
            # Set up the global WebDriver.
            self.set_global_driver()
            # Log in.
            self.login()
            # Clear all search filters.
            self.clear_filters()
            # Scrape the results.
            self.parse_result()
        except Exception as e:
            print("获取数据失败!!")
            print(e)
        else:
            print("获取数据成功")
        finally:
            # quit() (not close()) ends the whole session and the chromedriver
            # process; guard for the case where set_global_driver() itself failed
            # and self.driver is still None.
            if self.driver is not None:
                self.driver.quit()
            self.output_excel()


if __name__ == '__main__':
    # Build the crawler and run the full pipeline.
    crawler = CrawlFreelancer()
    crawler.main()

上一篇:11.25学习日志


下一篇:怎样清除浮动