爬虫最快框架 collyx,今天开源了...

admin 2023年2月18日00:58:48评论133 views字数 647阅读2分9秒阅读模式
这是「进击的Coder」的第 798 篇技术分享
作者:TheWeiJun
来源:逆向与爬虫的故事

阅读本文大概需要 13 分钟。

工欲善其事,必先利其器。大家好,我是 TheWeiJun。之前接触 colly时,写过一篇关于 colly 框架的文章。由于当时能力有限加上时间不够充足,一直没能够去研究这个框架。后来经过 3 个多月的不断尝试完善,基于 colly 迭代的 collyx 框架今天准备开源给各位粉丝朋友。希望各位朋友能够多多提出宝贵意见,在阅读的同时记得给我一个 star!
特别声明:本公众号文章只作为学术研究,不用于其它不法用途;如有侵权请联系作者删除。
爬虫最快框架 collyx,今天开源了...

立即加星标

爬虫最快框架 collyx,今天开源了...

每天看好文

目录
一、前言介绍
二、collyx 框架
三、源码分享
四、实战展示
五、心得分享

爬虫最快框架 collyx,今天开源了...

开源展望

不知不觉写公众号已经 1 年了,记得当时和好朋友大壮兄开始写文章时,幻想着以后成为什么样的人。我一直信奉终身成长,今天我决定把自己研究多个月的框架开源分享给各位读者朋友们。希望大家能够一起成长、共同进步。技术总是在不停的迭代更新,我们不能止步不前。在不久的将来,希望能创造更多有价值的文章给各位读者朋友。


一、前言介绍

前言:colly 是 Go 实现的比较有名的一款爬虫框架,而且 Go 在高并发和分布式场景的优势也正是爬虫技术所需要的。它的主要特点是轻量、快速,设计非常优雅,并且分布式的支持也非常简单,易于扩展。

爬虫最快框架 collyx,今天开源了...

github 地址: github.com/gocolly/colly

colly 官网地址:http://go-colly.org/

从上图中,我们可以看出 colly 在 github 社区有着超高的人气。今天我们即将引出 collyx 爬虫框架,下面我将通过源码分享介绍这个框架给各位读者。


二、collyx 框架介绍

框架简介:基于 colly 框架及 net/http 进行封装,实现的一款可配置分布式爬虫架构。使用者只需要配置解析、并发数、入库 topic、请求方式、请求 url 等参数即可,其他代码类似于 scrapy,不需要单独编写。

框架优势:实现了重试机制,各个功能可插拔,自定义解析模块、结构体模块等,抽象了调度模块,大大减少代码冗余,快速提高开发能力;其中对于 feed 流并发的爬虫也能够生效,不止基于深度优先爬虫;也可以用于广度优先。

collyx 架构图预览:

爬虫最快框架 collyx,今天开源了...


三、源码分享

根据上面的架构图,我们可将框架分为 6 个组件,分别为:spiders、engine、items、downloader、pipelines、scheduler。下面,我们将从这几个部分逐一讲解 collyx 的整个源码,同时也将展示一部分 extensions 源码。完整目录如下:

爬虫最快框架 collyx,今天开源了...

1、spiders 模块分享,自定义代码结构,代码如下所示:

// Package spiders ---------------------------// @author    : TheWeiJunpackage main
import (  "collyx-spider/common"  "collyx-spider/items/http"  "collyx-spider/pipelines"  "collyx-spider/spiders/crawler")
func main() {  request := http.FormRequest{    Url:         "https://xxxxx",    Payload:     "xxxxx",    Method:      "POST",    RedisKey:    "ExplainingGoodsChan",    RedisClient: common.LocalRedis,    RedisMethod: "spop",    Process:     pipelines.DemoParse,    Topic:       "test",  }  crawler.Crawl(&request)
}

说明:只需要配置抓取的 url、payload、method、redis、kafka 等参数即可;如果某些参数不想使用,可以去掉。

2、engine 模块源码如下,对 colly 进行初始化参数配置:

package engine
import (  "collyx-spider/common"  downloader2 "collyx-spider/downloader"  extensions2 "collyx-spider/extensions"  "collyx-spider/items/http"  "collyx-spider/scheduler"  "github.com/gocolly/colly"  "time")
var Requests = common.GetDefaultRequests()var TaskQueue = common.GetDefaultTaskQueue()var Proxy = common.GetDefaultProxy()var KeepAlive = common.GetDefaultKeepAlive()var kafkaStatus = common.GetKafkaDefaultProducer()var RequestChan = make(chan bool, Requests)var TaskChan = make(chan interface{}, TaskQueue)
func CollyConnect(request *http.FormRequest) {  var c = colly.NewCollector(    colly.Async(true),    colly.AllowURLRevisit(),  )  c.Limit(&colly.LimitRule{    Parallelism: Requests,    Delay:       time.Second * 3,    RandomDelay: time.Second * 5,  })  if Proxy {    extensions2.SetProxy(c, KeepAlive)  }  //if kafkaStatus {  //  common.InitDefaultKafkaProducer()  //}  extensions2.URLLengthFilter(c, 10000)  downloader2.ResponseOnError(c, RequestChan)  downloader2.DownloadRetry(c, RequestChan)  request.SetConnect(c)  request.SetTasks(TaskChan)  request.SetRequests(RequestChan)}
func StartRequests(request *http.FormRequest) {  /*add headers add parse*/  go scheduler.GetTaskChan(request)  if request.Headers != nil {    request.Headers(request.Connect)  } else {    extensions2.GetHeaders(request.Connect)  }  downloader2.Response(request)
}

说明:该模块主要是做初始化调度器、请求 headers 扩展、初始化下载器、初始化 colly 等操作,是框架运行的重要模块之一。

3、scheduler 模块源码展示,完整代码:

package scheduler
import (  "collyx-spider/items/http"  log "github.com/sirupsen/logrus"  "strings"  "time")
func GetTaskChan(request *http.FormRequest) {  redisKey := request.RedisKey  redisClient := request.RedisClient  redisMethod := request.RedisMethod  limits := int64(cap(request.TasksChan))  TaskChan := request.TasksChan  methodLowerStr := strings.ToLower(redisMethod)  for {    switch methodLowerStr {    case "do":      result, _ := redisClient.Do("qpop", redisKey, 0, limits).Result()      searchList := result.([]interface{})      if len(searchList) == 0 {        log.Debugf("no task")        time.Sleep(time.Second * 3)        continue      }      for _, task := range searchList {        TaskChan <- task      }    case "spop":      searchList, _ := redisClient.SPopN(redisKey, limits).Result()      if len(searchList) == 0 {        log.Debugf("no task")        time.Sleep(time.Second * 3)        continue      }      for _, task := range searchList {        TaskChan <- task      }    default:      log.Info("Methods are not allowed.....")    }    time.Sleep(time.Second)  }}

说明:这里从 spider 里的结构体指针取值,获取任务交给 TaskChan 通道进行任务分发。

4、items 模块源码展示

4.1 request_struct.go 模块代码如下:

package http
import (  "github.com/go-redis/redis"  "github.com/gocolly/colly")
type FormRequest struct {  Url          string  Payload      string  Method       string  RedisKey     string  RedisClient  *redis.Client  RedisMethod  string  GetParamFunc func(*FormRequest)  Connect      *colly.Collector  Process      func([]byte, string, string) string  RequestChan  chan bool  TasksChan    chan interface{}  Topic        string  Headers      func(collector *colly.Collector)  TaskId       string}
func (request *FormRequest) SetRequests(requests chan bool) {  request.RequestChan = requests}
func (request *FormRequest) SetTasks(tasks chan interface{}) {  request.TasksChan = tasks}
func (request *FormRequest) SetConnect(conn *colly.Collector) {  request.Connect = conn}
func (request *FormRequest) SetUrl(url string) {  request.Url = url}

总结:request 结构体负责 spiders 请求自定义,设置初始化请求参数。

4.2 解析结构体,根据解析内容和保存内容自定义结构体,截图如下:

爬虫最快框架 collyx,今天开源了...

5、downloader 模块分享,目录代码结构如下图所示:

爬虫最快框架 collyx,今天开源了...

总结:该模块拥有下载成功、下载错误、下载重试三个功能,接来下分享下源代码。
5.1 download_error.go 代码如下:
package downloader
import (  "github.com/gocolly/colly")
func ResponseOnError(c *colly.Collector, taskLimitChan chan bool) {  c.OnError(func(r *colly.Response, e error) {    defer func() {      <-taskLimitChan    }()  })
  c.OnScraped(func(r *colly.Response) {    defer func() {      <-taskLimitChan    }()
  })}
模块说明:该模块为捕获错误请求,并及时释放并发通道。
5.2 download_ok.go 代码如下:
package downloader
import (  "collyx-spider/common"  "collyx-spider/items/http"  "github.com/gocolly/colly")
func Response(request *http.FormRequest) {  c := request.Connect  c.OnResponse(func(response *colly.Response) {    defer common.CatchError()    task := response.Ctx.Get("task")    isNext := request.Process(response.Body, task, request.Topic)    if isNext != "" {      request.RedisClient.SAdd(request.RedisKey, isNext)    }  })}
模块说明:该模块为处理 200 状态码请求,并将调用 spiders 提前定义的解析函数进行数据抽取。
5.3 download_rety.go 代码如下:
package downloader
import (  "collyx-spider/common"  "github.com/gocolly/colly"  "log")
func RetryFunc(c *colly.Collector, request *colly.Response, RequestChan chan bool) {  url := request.Request.URL.String()  body := request.Request.Body  method := request.Request.Method  ctx := request.Request.Ctx  RequestChan <- true  c.Request(method, url, body, ctx, nil)}
func DownloadRetry(c *colly.Collector, RequestChan chan bool) {  c.OnError(func(request *colly.Response, e error) {    if common.CheckErrorIsBadNetWork(e.Error()) {      taskId := request.Request.Ctx.Get("task")      log.Printf("Start the retry task:%s", taskId)      RetryFunc(c, request, RequestChan)    }  })
}
模块说明:通过自定义的错误函数捕获错误类型,并开启重试机制进行重试,弥补了 colly 请求失败数据缺失问题。
6、pipelines 模块分享,完整代码如下:
package pipelines
import (  "collyx-spider/common"  "collyx-spider/items"  "encoding/json"  log "github.com/sirupsen/logrus")
func DemoParse(bytes []byte, task, topic string) string {  item := items.Demo{}  json.Unmarshal(bytes, &item)  Promotions := item.Promotions  if Promotions != nil {    data := Promotions[0].BaseInfo.Title    proId := Promotions[0].BaseInfo.PromotionId    common.KafkaDefaultProducer.AsyncSendWithKey(task, topic, data+proId)    log.Println(data, Promotions[0].BaseInfo.PromotionId, topic)  } else {    log.Println(Promotions)  }  return ""}
模块说明:pipelines 在框架中职位主要是负责数据解析、数据持久化操作。
7、cralwer 模块分享,代码如下:
package crawler
import (  "collyx-spider/common"  "collyx-spider/engine"  "collyx-spider/items/http"  "fmt"  "github.com/gocolly/colly"  log "github.com/sirupsen/logrus"  "strings"  "time")
func MakeRequestFromFunc(request *http.FormRequest) {  for true {    select {    case TaskId := <-request.TasksChan:      ctx := colly.NewContext()      ctx.Put("task", TaskId)      request.TaskId = TaskId.(string)      if request.Method == "POST" {        request.GetParamFunc(request)        if strings.Contains(TaskId.(string), ":") {          split := strings.Split(TaskId.(string), ":")          TaskId = split[0]          data := fmt.Sprintf(request.Payload, TaskId)          ctx.Put("data", data)          request.Connect.Request(request.Method, request.Url, strings.NewReader(data), ctx, nil)        }        request.RequestChan <- true      } else {        if strings.Contains(TaskId.(string), "http") {          request.Url = TaskId.(string)        } else {          request.GetParamFunc(request)        }        request.Connect.Request(request.Method, request.Url, nil, ctx, nil)        request.RequestChan <- true      }    default:      time.Sleep(time.Second * 3)      log.Info("TaskChan not has taskId")    }  }}
func MakeRequestFromUrl(request *http.FormRequest) {  for true {    select {    case TaskId := <-request.TasksChan:      ctx := colly.NewContext()      ctx.Put("task", TaskId)      if request.Method == "POST" {        payload := strings.NewReader(fmt.Sprintf(request.Payload, TaskId))        request.Connect.Request(request.Method, request.Url, payload, ctx, nil)      } else {        fmt.Println(fmt.Sprintf(request.Url, TaskId))        request.Connect.Request(request.Method, fmt.Sprintf(request.Url, TaskId), nil, ctx, nil)      }      request.RequestChan <- true    default:      time.Sleep(time.Second * 3)      log.Info("TaskChan not has taskId.......")    }  }}
func RequestFromUrl(request *http.FormRequest) {  if request.GetParamFunc != nil {    MakeRequestFromFunc(request)  } else {    MakeRequestFromUrl(request)  }}
func Crawl(request *http.FormRequest) {  /*making requests*/  engine.CollyConnect(request)  engine.StartRequests(request)  go RequestFromUrl(request)  common.DumpRealTimeInfo(len(request.RequestChan))}
总结:cralwer 模块主要负责 engine 初始化及 engine 信号发送,驱动整个爬虫项目运行。
8、extensions 模块,代码目录截图如下:

爬虫最快框架 collyx,今天开源了...

8.1 AddHeaders.go 源码如下:
package extensions
import (  "fmt"  "github.com/gocolly/colly"  "math/rand")
var UaGens = []func() string{  genFirefoxUA,  genChromeUA,}
var ffVersions = []float32{  58.0,  57.0,  56.0,  52.0,  48.0,  40.0,  35.0,}
var chromeVersions = []string{  "65.0.3325.146",  "64.0.3282.0",  "41.0.2228.0",  "40.0.2214.93",  "37.0.2062.124",}
var osStrings = []string{  "Macintosh; Intel Mac OS X 10_10",  "Windows NT 10.0",  "Windows NT 5.1",  "Windows NT 6.1; WOW64",  "Windows NT 6.1; Win64; x64",  "X11; Linux x86_64",}
func genFirefoxUA() string {  version := ffVersions[rand.Intn(len(ffVersions))]  os := osStrings[rand.Intn(len(osStrings))]  return fmt.Sprintf("Mozilla/5.0 (%s; rv:%.1f) Gecko/20100101 Firefox/%.1f", os, version, version)}
func genChromeUA() string {  version := chromeVersions[rand.Intn(len(chromeVersions))]  os := osStrings[rand.Intn(len(osStrings))]  return fmt.Sprintf("Mozilla/5.0 (%s) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/%s Safari/537.36", os, version)}
func GetHeaders(c *colly.Collector) {  c.OnRequest(func(r *colly.Request) {    r.Headers.Set("User-Agent", UaGens[rand.Intn(len(UaGens))]())  })}
模块说明:负责更换随机 ua,防止 spider 被网站 gank。
8.2 AddProxy.go 源码如下:
package extensions
import (  "collyx-spider/common"  "github.com/gocolly/colly"  "github.com/gocolly/colly/proxy")
func SetProxy(c *colly.Collector, KeepAlive bool) {  proxyList := common.RefreshProxies()  if p, err := proxy.RoundRobinProxySwitcher(    proxyList...,  ); err == nil {    c.SetProxyFunc(p)  }}
模块说明:该模块主要是给 request 设置代理,防止出现请求失败等错误。
8.3 URLLengthFilter.go 源码分享:
package extensions
import "github.com/gocolly/colly"
func URLLengthFilter(c *colly.Collector, URLLengthLimit int) {  c.OnRequest(func(r *colly.Request) {    if len(r.URL.String()) > URLLengthLimit {      r.Abort()    }  })}
模块说明:对于 url 过长请求进行丢弃,Abort 在 OnRequest 回调中取消 HTTP 请求。源码分享环节到这里就结束了,接下来我们运行代码,展示一下 collyx 爬虫框架性能吧!

四、框架 demo 展示

1、启动编辑好的案例代码,运行截图如下:

爬虫最快框架 collyx,今天开源了...

总结:爬虫运行 5 分钟后,在代理足够充足情况下统计,抓取该网站每分钟约产生 2000 条数据,可以毫不吹牛的说,这是我迄今为止见过最快的爬虫框架。


五、心得分享

今天分享到这里就结束了,对于 collyx 框架而言还有很长的路要走。我始终觉得只要努力,我们就会朝着目标一步步去实现。最后,感谢大家耐心阅读本文!

今天分享到这里就结束了,欢迎大家关注下期文章,我们不见不散⛽️
爬虫最快框架 collyx,今天开源了...

End

崔庆才的新书《Python3网络爬虫开发实战(第二版)》已经正式上市了!书中详细介绍了零基础用 Python 开发爬虫的各方面知识,同时相比第一版新增了 JavaScript 逆向、Android 逆向、异步爬虫、深度学习、Kubernetes 相关内容,‍同时本书已经获得 Python 之父 Guido 的推荐,目前本书正在七折促销中!

内容介绍:《Python3网络爬虫开发实战(第二版)》内容介绍

爬虫最快框架 collyx,今天开源了...

扫码购买

爬虫最快框架 collyx,今天开源了...

好文和朋友一起看~

原文始发于微信公众号(进击的Coder):爬虫最快框架 collyx,今天开源了...

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2023年2月18日00:58:48
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   爬虫最快框架 collyx,今天开源了...https://cn-sec.com/archives/1556365.html

发表评论

匿名网友 填写信息