Golang分布式爬虫:抓取煎蛋文章|Redis/Mysql|56,961 篇文章

---
layout: post
title: "Golang分布式爬虫:抓取煎蛋文章"
date: 2017-04-15
author: hunterhug
categories: [代码]
desc: "Golang分布式爬虫:抓取煎蛋文章"
tags: ["爬虫","Golang"]
permalink: "/spider/jiandan.html"
---

版权所有,转载请注明:www.lenggirl.com/spider/jiandan.html

项目地址:https://github.com/hunterhug/GoSpiderExample

  1. 克隆https://github.com/hunterhug/GoSpider仓库到GOPATH路径下,地址:https://github.com/hunterhug/GoSpider
  2. 示例仅供学习,爬虫有风险,如果太暴力,会给别人带来损失,在此申明不承担相应责任,查看Example

一、介绍

多浏览器持久化cookie分布式爬虫爬取数据,使用到redis,mysql,将网页数据保存在磁盘中,详情页解析后存入数据库。中级示例!

Golang分布式爬虫:抓取煎蛋文章|Redis/Mysql|56,961 篇文章
Golang分布式爬虫:抓取煎蛋文章|Redis/Mysql|56,961 篇文章
Golang分布式爬虫:抓取煎蛋文章|Redis/Mysql|56,961 篇文章

二、架构

使用Redis进行分布式,多只爬虫并发抓取,保存cookie,且利用保存在本地的文件。

我们先看如何运行,再看代码

代码结构

clear.go  清除Redis中抓取失败的URL,从doing队列移到todo队列
cont.go  配置
detail.go 详情页抓取
index.go 首页页面抓取
parse.go 解析功能
store.go 存储功能
--main
    main.go 程序入口

三、使用方法

  1. cont.go编辑配置,RootDir = "E:\\jiandan"为数据目录
  2. 进main文件夹运行
  3. 数据保存在RootDir下文件夹,和Mysql数据库中
  4. 重抓要删除Redis数据库和相应文件夹,否则已抓将不再抓。

四、代码解释

各种解释均写在代码中

1.main.go入口

package main

import (
    //"fmt"
    "github.com/hunterhug/GoSpiderExample/jiandan"
    "os"
    "os/signal"
)

var Clear = false

func main() {
    if Clear {
        // Reids中Doing的迁移到Todo,需手动,var Clear = true
        go jiandan.Clear()
    } else {
        // 首页爬虫爬取
        go jiandan.IndexSpiderRun()

        // 详情页抓取
        go jiandan.DetailSpidersRun()
    }

    c := make(chan os.Signal)
    //监听指定信号
    signal.Notify(c, os.Interrupt)

    //阻塞直至有信号传入
    <-c
}

Reids中Doing的迁移到Todo,需手动设置var Clear = true,然后两个协程一起抓取,信号量监听才退出!

2.cont.go配置

package jiandan

import (
    "fmt"
    "github.com/hunterhug/GoSpider/spider"
    "github.com/hunterhug/GoSpider/store/myredis"
    "github.com/hunterhug/GoSpider/store/mysql"
    "github.com/hunterhug/GoSpider/util"
    "path/filepath"
)

// 可抽离到配置文件中
const (
    // 网站
    Url  = "http://jandan.net"
    Host = "jandan.net"

    // 详情页爬虫数量
    DetailSpiderNum        = 30
    DetailSpiderNamePrefix = "detail"
    // 首页爬虫数量
    IndexSpiderNum        = 3
    IndexSpiderNamePrefix = "index"

    // 爬虫暂停时间
    StopTime = 1
    // 日志级别
    LogLevel = "info"
)

var (
    // 首页页数
    IndexPage int

    // 根目录
    //RootDir = util.CurDir()
    RootDir = "E:\\jiandan"

    // Redis配置
    RedisConfig = myredis.RedisConfig{
        DB:       0,
        Host:     "127.0.0.1:6379",
        Password: "smart2016",
    }

    RedisClient myredis.MyRedis

    RedisListTodo  = "jiandantodo"
    RedisListDoing = "jiandandoing"
    RedisListDone  = "jiandandone"

    // mysql config
    mysqlconfig = mysql.MysqlConfig{
        Username: "root",
        Password: "smart2016",
        Ip:       "127.0.0.1",
        Port:     "3306",
        Dbname:   "jiandan",
    }

    MysqlClient mysql.Mysql
)

// 设置全局
func init() {
    e := util.MakeDir(filepath.Join(RootDir, "data", "detail"))
    if e != nil {
        spider.Log().Panic(e.Error())
    }
    spider.SetGlobalTimeout(StopTime)
    spider.SetLogLevel(LogLevel)
    indexstopchan = make(chan bool, 1)

    // 初始化爬虫们,一种多爬虫方式,设置到全局MAP中
    for i := 0; i <= IndexSpiderNum; i++ {
        s, e := spider.New(nil)
        if e != nil {
            spider.Log().Panicf("index spider %d new error: %s", i, e.Error())
        }
        // 设置随机UA
        s.SetUa(spider.RandomUa())
        spider.Pool.Set(fmt.Sprintf("%s-%d", IndexSpiderNamePrefix, i), s)
    }
    for i := 0; i <= DetailSpiderNum; i++ {
        s, e := spider.New(nil)
        if e != nil {
            spider.Log().Panicf("detail spider %d new error: %s", i, e.Error())
        }
        s.SetUa(spider.RandomUa())
        spider.Pool.Set(fmt.Sprintf("%s-%d", DetailSpiderNamePrefix, i), s)
    }
}

首先进行各种常量定义,然后REDIS和MYSQL配置,新建数据保存文件夹,初始化多只爬虫

3.store.go存储功能

package jiandan

import (
    "github.com/hunterhug/GoSpider/spider"
    "github.com/hunterhug/GoSpider/store/myredis"
    "github.com/hunterhug/GoSpider/store/mysql"
    "github.com/hunterhug/GoSpider/util"
)

func init() {
    // 新建Redis池,方便爬虫们插和抽!!
    client, err := myredis.NewRedisPool(RedisConfig, DetailSpiderNum+IndexSpiderNum+2)
    if err != nil {
        spider.Log().Error(err.Error())
    }
    RedisClient = client

    // 新建数据库
    e := mysqlconfig.CreateDb()
    if e != nil {
        spider.Log().Error(e.Error())
    }
    // a new db connection
    MysqlClient = mysql.New(mysqlconfig)

    // open connection
    MysqlClient.Open(500, 300)

    // create sql
    sql := `
  CREATE TABLE IF NOT EXISTS pages (
  id varchar(255) NOT NULL,
  url varchar(255) NOT NULL,
  title varchar(255) NOT NULL,
  shortcontent varchar(255) NOT NULL DEFAULT '',
  tags varchar(255) NOT NULL DEFAULT '',
  content longtext NOT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='煎蛋文章';`

    // create
    _, err = MysqlClient.Create(sql)
    if err != nil {
        spider.Log().Error(err.Error())
    }

}

func SentRedis(urls []string) {
    var interfaceSlice []interface{} = make([]interface{}, len(urls))
    for i, d := range urls {
        interfaceSlice[i] = d
    }
    _, e := RedisClient.Lpush(RedisListTodo, interfaceSlice...)
    if e != nil {
        spider.Log().Errorf("sent redis error:%s", e.Error())
    }
}

func SaveToMysql(url string, m map[string]string) {
    if m["title"] == "" {
        return
    }
    _, e := MysqlClient.Insert("INSERT INTO `jiandan`.`pages`(`id`,`url`,`title`,`shortcontent`,`tags`,`content`)VALUES(?,?,?,?,?,?)", util.Md5(url), url, m["title"], m["shortcontent"], m["tags"], m["content"])
    if e != nil {
        spider.Log().Error("save mysql error:" + e.Error())
    }
}

首先初始化Redis池和新建MYSQL数据库和表,数据库采用编码:utf8mb4,因为字节数会造成错误

CREATE TABLE IF NOT EXISTS pages (
  id varchar(255) NOT NULL,
  url varchar(255) NOT NULL,
  title varchar(255) NOT NULL,
  shortcontent varchar(255) NOT NULL DEFAULT '',
  tags varchar(255) NOT NULL DEFAULT '',
  content longtext NOT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='煎蛋文章';

函数SentRedisSaveToMysql分别是发送网址到redis,和发送文章到Mysql中。

4.parse.go 解析库

package jiandan

import (
    "errors"
    "github.com/PuerkitoBio/goquery"
    "github.com/hunterhug/GoSpider/query"
    "github.com/hunterhug/GoSpider/util"
    "strings"
)

// 解析页面数量
func ParseIndexNum(data []byte) error {
    doc, e := query.QueryBytes(data)
    if e != nil {
        return e
    }
    s := doc.Find(".pages").Text()
    temp := strings.Split(s, "/")
    if len(temp) != 2 {
        return errors.New("index page not found")
    }
    result := strings.Replace(strings.TrimSpace(temp[1]), ",", "", -1)
    i, e := util.SI(result)
    if e != nil {
        return e
    }
    IndexPage = i
    return nil
}

// 提取信息
func ParseIndex(data []byte) []string {
    list := []string{}
    doc, e := query.QueryBytes(data)
    if e != nil {
        return list
    }
    doc.Find(".post").Each(func(num int, node *goquery.Selection) {
        //title := node.Find("h2").Text()
        //if title == "" {
        //  return
        //}
        url, _ := node.Find("h2").Find("a").Attr("href")
        if url == "" {
            return
        }
        //tag := node.Find(".time_s").Text()
        //if strings.Contains(tag, "·") {
        //  tag = strings.Split(tag, "·")[1]
        //}
        //fmt.Printf("%s,%s,%s\n", title, url, tag)
        list = append(list, url)
    })
    return list
}

func ParseDetail(data []byte) map[string]string {
    returnmap := map[string]string{
        "title": "", "tags": "", "content": "", "shortcontent": "",
    }
    doc, e := query.QueryBytes(data)
    if e != nil {
        return returnmap
    }
    // 标题
    title := doc.Find("title").Text()
    if strings.TrimSpace(title) == "" {
        return returnmap
    }
    shortcontent, _ := doc.Find(`meta[name="description"]`).Attr("content")
    tags, _ := doc.Find(`meta[name="keywords"]`).Attr("content")

    result := ""
    doc.Find("#content").Find(".post p").Each(func(num int, node *goquery.Selection) {
        temp, _ := node.Html()
        result = result + "<p>" + temp + "</p>"
    })

    returnmap["title"] = strings.Replace(title,"\"","'",-1)
    returnmap["tags"] = strings.Replace(tags,"\"","'",-1)
    returnmap["shortcontent"] = strings.Replace(shortcontent,"\"","'",-1)
    returnmap["content"] = strings.Replace(result,"\"","'",-1)
    return returnmap
}

三个函数分别是获取首页中页数,抽取非详情页的URl和抽取详情页的信息。

5.clear.go 失败清除功能

package jiandan

import "github.com/hunterhug/GoSpider/spider"

// 将Doing移到Todo
func Clear() {
    for {
        s, _ := RedisClient.Brpoplpush(RedisListDoing, RedisListTodo, 0)
        spider.Log().Info("movw :" + s)
    }
}

由于爬虫中途会死亡,所以doing的Redis池会反应出来,所以我们每次失败后先将Doing的URL移到Todo

6.index.go 非详情页抓取,保存本地,打Redis

package jiandan

import (
    "fmt"
    "github.com/hunterhug/GoSpider/spider"
    "github.com/hunterhug/GoSpider/util"
    "path/filepath"
)

var (
    // 信号量
    indexstopchan chan bool
)

// 首页启动入口,包括所有非详情页面的抓取
// 抓取网址到redis,因为页数经常变动,所以这个爬虫比较暴力,借助文件夹功能接力,如果页面更新,请将data数据夹删除
func IndexSpiderRun() {
    // 获取首页页数并把首页网址打到redis
    IndexStep()
    // 按顺序抓取页面,打到redis
    PagesStep()
}

// 步骤1:首页随便取只爬虫抓取
func IndexStep() {
    s, ok := spider.Pool.Get(IndexSpiderNamePrefix + "-0")
    if !ok {
        spider.Log().Panic("IndexStep:Get Index Spider error!")
    }
    // 爬取首页
    s.SetUrl(Url).SetMethod("get").SetHost(Host)
    data, e := s.Go()
    if e != nil {
        // 错误直接退出
        spider.Log().Panicf("Get Index Error:%s", e.Error())
    }

    spider.Log().Info("Catch Index!")

    // 实验的
    indexfile := filepath.Join(RootDir, "data", "index.html")
    e = util.SaveToFile(indexfile, data)
    if e != nil {
        spider.Log().Errorf("Save Index Error:%s", e.Error())
    }

    // 获取页数
    e = ParseIndexNum(data)
    if e != nil {
        spider.Log().Panic(e.Error())
    }
    SentRedis(ParseIndex(data))
}

// 步骤2:分配任务
func PagesStep() {
    urllist := []string{}
    for i := 2; i <= IndexPage; i++ {
        urllist = append(urllist, fmt.Sprintf("%s/page/%d", Url, i))
    }
    // 分配任务
    tasks, e := util.DevideStringList(urllist, IndexSpiderNum)
    if e != nil {
        spider.Log().Panic(e.Error())
    }
    // 任务开始
    for i, task := range tasks {
        go PagesTaskGoStep(i, task)
    }
    for i, _ := range tasks {
        // 等待爬虫结束
        <-indexstopchan
        spider.Log().Infof("index spider %s-%d finish", IndexSpiderNamePrefix, i)
    }
}

// 步骤2接力:多只爬虫并发抓页面
func PagesTaskGoStep(name int, task []string) {
    var e error
    var data []byte
    // 获取池中爬虫
    spidername := fmt.Sprintf("%s-%d", IndexSpiderNamePrefix, name)
    s, ok := spider.Pool.Get(spidername)
    if !ok {
        spider.Log().Panicf("Pool Spider %s not get", spidername)
    }
Outloop:
    for _, url := range task {
        // 文件存在,那么不抓
        pagename := fmt.Sprintf("%s.html", util.ValidFileName(url))
        savepath := filepath.Join(RootDir, "data", pagename)
        if util.FileExist(savepath) {
            spider.Log().Infof("page %s Exist", pagename)
            data, e = util.ReadfromFile(savepath)
            if e != nil {
                spider.Log().Errorf("take data from exist file error:%s", e.Error())
            } else {
                SentRedis(ParseIndex(data))
            }
            continue
        }
        s.SetUrl(url)
        s.SetRefer(s.Preurl)
        retrynum := 5
        for {
            if retrynum == 0 {
                goto Outloop
            }
            data, e = s.Go()
            if e != nil {
                spider.Log().Errorf("%s: index page %s fetch error:%s,remain %d times", spidername, url, e.Error(), retrynum)
                retrynum = retrynum - 1
                continue
            }
            SentRedis(ParseIndex(data))
            spider.Log().Infof("%s:index page %s fetch!", spidername, url)
            break
        }

        // 保存文件
        e = util.SaveToFile(savepath, data)
        if e != nil {
            spider.Log().Errorf("Save page %s Fail:%s", pagename, e.Error())
        }
        spider.Log().Infof("Save page %s Done", pagename)
    }

    indexstopchan <- true
}

首先,先设置信号量,方便结束,我们先抓取首页,解析首页页数,存到本地,打Redis,再以此多只爬虫分配任何,抓取各个页面,先判断本地是否存在文件,存在读取解析插到Redis,不存在抓取存入本地,解析,打到Redis。

同时,打到Redis的Url,这时另外的爬虫会分布式抓取这些详情页,见下面:

7.detail.go 抓取详情页,保存本地,和数据库

package jiandan

import (
    "fmt"
    "github.com/hunterhug/GoSpider/spider"
    "github.com/hunterhug/GoSpider/util"
    "path/filepath"
)

// 详情页爬虫
func DetailSpidersRun() {
    for i := 0; i < DetailSpiderNum; i++ {
        go DetailTaskStep(i)
    }
}

func DetailTaskStep(name int) {
    spidername := fmt.Sprintf("%s-%d", DetailSpiderNamePrefix, name)
    detailpath := filepath.Join(RootDir, "data", "detail")
    s, ok := spider.Pool.Get(spidername)
    if !ok {
        spider.Log().Panicf("Pool Spider %s not get", spidername)
    }

    for {
        // 将Todo移到Doing
        url, e := RedisClient.Brpoplpush(RedisListTodo, RedisListDoing, 0)
        if e != nil {
            spider.Log().Errorf("BrpopLpush % error:%s", url, e.Error())
            break
        }
        // Done已经存在
        ok, _ := RedisClient.Hexists(RedisListDone, url)
        if ok {
            // 删除Doing!
            RedisClient.Lrem(RedisListDoing, 0, url)
            continue
        }
        // 文件存在不抓!
        filename := filepath.Join(detailpath, util.ValidFileName(url))
        if util.FileExist(filename) {
            spider.Log().Infof("file:%s exist", filename)
            // 删除Doing!
            RedisClient.Lrem(RedisListDoing, 0, url)
            // 读取后解析存储
            data, e := util.ReadfromFile(filename)
            if e != nil {
                spider.Log().Errorf("take from file %s error: %s", filename, e.Error())
            } else {
                SaveToMysql(url, ParseDetail(data))
            }
            RedisClient.Hset(RedisListDone, url, "")
            continue
        }
        s.SetUrl(url)
        retrynum := 5
        for {
            if retrynum == 0 {
                break
            }
            data, e := s.Go()
            if e != nil {
                spider.Log().Errorf("%s:detail url %s catch error:%s remian %d times", spidername, url, e.Error(), retrynum)
                retrynum = retrynum - 1
                continue
            } else {
                spider.Log().Infof("catch url:%s", url)
                e := util.SaveToFile(filename, data)
                if e != nil {
                    spider.Log().Errorf("file %s save error:%s", filename, e.Error())
                }

                SaveToMysql(url, ParseDetail(data))

                // 删除Doing!
                RedisClient.Lrem(RedisListDoing, 0, url)
                // 送到Done中
                RedisClient.Hset(RedisListDone, url, "")
                break
            }
        }
    }
}

会阻塞拿取todo队列的URL,打到doing后开始抓取,抓取结束删除doing URL,如果检测到本地存在详情页,则直接删除doing,然后读取文件,解析文件,打到MYSQL。如果抓取失败,会重试5次!

五、结果

结果,总共抓取了56,961 篇文章

分布式爬虫,下一篇,抓取妹子图:啥Redis都不用,准备好网速就行!

上一篇:C#抓取和分析网页的类


下一篇:redis未授权