原文鏈接:https://chai2010.cn/advanced-go-programming-book/ch6-cloud/ch6-07-crawler.html
互聯(lián)網(wǎng)時代的信息爆炸是很多人倍感頭痛的問題,應接不暇的新聞、信息、視頻,無孔不入地侵占著我們的碎片時間。但另一方面,在我們真正需要數(shù)據(jù)的時候,卻感覺數(shù)據(jù)并不是那么容易獲取的。比如我們想要分析現(xiàn)在人在討論些什么,關(guān)心些什么。甚至有時候,可能我們只是暫時沒有時間去一一閱覽心儀的小說,但又想能用技術(shù)手段把它們存在自己的資料庫里。哪怕是幾個月或一年后再來回顧。再或者我們想要把互聯(lián)網(wǎng)上這些稍縱即逝的有用信息保存起來,例如某個非常小的論壇中聚集的同好們的高質(zhì)量討論,在未來某個時刻,即使這些小眾的聚集區(qū)無以為繼時,依然能讓我們從硬盤中翻出當初珍貴的觀點來。
除去情懷需求,互聯(lián)網(wǎng)上有大量珍貴的開放資料,近年來深度學習如雨后春筍一般火熱起來,但機器學習很多時候并不是苦于我的模型是否建立得合適,我的參數(shù)是否調(diào)整得正確,而是苦于最初的起步階段:沒有數(shù)據(jù)。
作為收集數(shù)據(jù)的前置工作,有能力去寫一個簡單的或者復雜的爬蟲,對于我們來說依然非常重要。
《Go 語言編程》一書給出了簡單的爬蟲示例,經(jīng)過了多年的發(fā)展,現(xiàn)在使用 Go 語言寫一個網(wǎng)站的爬蟲要更加方便,比如用 colly 來實現(xiàn)爬取某網(wǎng)站(虛擬站點,這里用 abcdefg 作為占位符)在 Go 語言標簽下的前十頁內(nèi)容:
package main
import (
"fmt"
"regexp"
"time"
"github.com/gocolly/colly"
)
var visited = map[string]bool{}
func main() {
// Instantiate default collector
c := colly.NewCollector(
colly.AllowedDomains("www.abcdefg.com"),
colly.MaxDepth(1),
)
// 我們認為匹配該模式的是該網(wǎng)站的詳情頁
detailRegex, _ := regexp.Compile(`/go/go\?p=\d+)
// 匹配下面模式的是該網(wǎng)站的列表頁
listRegex, _ := regexp.Compile(`/t/\d+#\w+`)
// 所有 a 標簽,上設置回調(diào)函數(shù)
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Attr("href")
// 已訪問過的詳情頁或列表頁,跳過
if visited[link] && (detailRegex.Match([]byte(link)) || listRegex.Match([]byte(link))) {
return
}
// 既不是列表頁,也不是詳情頁
// 那么不是我們關(guān)心的內(nèi)容,要跳過
if !detailRegex.Match([]byte(link)) && !listRegex.Match([]byte(link)) {
println("not match", link)
return
}
// 因為大多數(shù)網(wǎng)站有反爬蟲策略
// 所以爬蟲邏輯中應該有 sleep 邏輯以避免被封殺
time.Sleep(time.Second)
println("match", link)
visited[link] = true
time.Sleep(time.Millisecond * 2)
c.Visit(e.Request.AbsoluteURL(link))
})
err := c.Visit("https://www.abcdefg.com/go/go")
if err != nil {fmt.Println(err)}
}
想像一下,你們的信息分析系統(tǒng)運行非常之快。獲取信息的速度成為了瓶頸,雖然可以用上 Go 語言所有優(yōu)秀的并發(fā)特性,將單機的 CPU 和網(wǎng)絡帶寬都用滿,但還是希望能夠加快爬蟲的爬取速度。在很多場景下,速度是有意義的:
所以我們需要分布式爬蟲。從本質(zhì)上來講,分布式爬蟲是一套任務分發(fā)和執(zhí)行系統(tǒng)。而常見的任務分發(fā),因為上下游存在速度不匹配問題,必然要借助消息隊列。
圖 6-14 爬蟲工作流程
上游的主要工作是根據(jù)預先配置好的起點來爬取所有的目標 “列表頁”,列表頁的 html 內(nèi)容中會包含有所有詳情頁的鏈接。詳情頁的數(shù)量一般是列表頁的 10 到 100 倍,所以我們將這些詳情頁鏈接作為“任務” 內(nèi)容,通過消息隊列分發(fā)出去。
針對頁面爬取來說,在執(zhí)行時是否偶爾會有重復其實不太重要,因為任務結(jié)果是冪等的(這里我們只爬頁面內(nèi)容,不考慮評論部分)。
本節(jié)我們來簡單實現(xiàn)一個基于消息隊列的爬蟲,本節(jié)我們使用 nats 來做任務分發(fā)。實際開發(fā)中,應該針對自己的業(yè)務對消息本身的可靠性要求和公司的基礎(chǔ)架構(gòu)組件情況進行選型。
nats 是 Go 實現(xiàn)的一個高性能分布式消息隊列,適用于高并發(fā)高吞吐量的消息分發(fā)場景。早期的 nats 以速度為重,沒有支持持久化。從 16 年開始,nats 通過 nats-streaming 支持基于日志的持久化,以及可靠的消息傳輸。為了演示方便,我們本節(jié)中只使用 nats。
nats 的服務端項目是 gnatsd,客戶端與 gnatsd 的通信方式為基于 tcp 的文本協(xié)議,非常簡單:
向 subject 為 task 發(fā)消息:
圖 6-15 nats 協(xié)議中的 pub
以 workers 的 queue 從 tasks subject 訂閱消息:
圖 6-16 nats 協(xié)議中的 sub
其中的 queue 參數(shù)是可選的,如果希望在分布式的消費端進行任務的負載均衡,而不是所有人都收到同樣的消息,那么就要給消費端指定相同的 queue 名字。
生產(chǎn)消息只要指定 subject 即可:
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {return}
// 指定 subject 為 tasks,消息內(nèi)容隨意
err = nc.Publish("tasks", []byte("your task content"))
nc.Flush()
直接使用 nats 的 subscribe API 并不能達到任務分發(fā)的目的,因為 pub sub 本身是廣播性質(zhì)的。所有消費者都會收到完全一樣的所有消息。
除了普通的 subscribe 之外,nats 還提供了 queue subscribe 的功能。只要提供一個 queue group 名字(類似 Kafka 中的 consumer group),即可均衡地將任務分發(fā)給消費者。
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {return}
// queue subscribe 相當于在消費者之間進行任務分發(fā)的分支均衡
// 前提是所有消費者都使用 workers 這個 queue
// nats 中的 queue 概念上類似于 Kafka 中的 consumer group
sub, err := nc.QueueSubscribeSync("tasks", "workers")
if err != nil {return}
var msg *nats.Msg
for {
msg, err = sub.NextMsg(time.Hour * 10000)
if err != nil {break}
// 正確地消費到了消息
// 可用 nats.Msg 對象處理任務
}
我們?yōu)槊恳粋€網(wǎng)站定制一個對應的 collector,并設置相應的規(guī)則,比如 abcdefg,hijklmn(虛構(gòu)的),再用簡單的工廠方法來將該 collector 和其 host 對應起來,每個站點爬到列表頁之后,需要在當前程序中把所有鏈接解析出來,并把落地頁的 URL 發(fā)往消息隊列。
package main
import (
"fmt"
"net/url"
"github.com/gocolly/colly"
)
var domain2Collector = map[string]*colly.Collector{}
var nc *nats.Conn
var maxDepth = 10
var natsURL = "nats://localhost:4222"
func factory(urlStr string) *colly.Collector {
u, _ := url.Parse(urlStr)
return domain2Collector[u.Host]
}
func initABCDECollector() *colly.Collector {
c := colly.NewCollector(
colly.AllowedDomains("www.abcdefg.com"),
colly.MaxDepth(maxDepth),
)
c.OnResponse(func(resp *colly.Response) {
// 做一些爬完之后的善后工作
// 比如頁面已爬完的確認存進 MySQL
})
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
// 基本的反爬蟲策略
link := e.Attr("href")
time.Sleep(time.Second * 2)
// 正則 match 列表頁的話,就 visit
if listRegex.Match([]byte(link)) {
c.Visit(e.Request.AbsoluteURL(link))
}
// 正則 match 落地頁的話,就發(fā)消息隊列
if detailRegex.Match([]byte(link)) {
err = nc.Publish("tasks", []byte(link))
nc.Flush()
}
})
return c
}
func initHIJKLCollector() *colly.Collector {
c := colly.NewCollector(
colly.AllowedDomains("www.hijklmn.com"),
colly.MaxDepth(maxDepth),
)
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
})
return c
}
func init() {
domain2Collector["www.abcdefg.com"] = initABCDECollector()
domain2Collector["www.hijklmn.com"] = initHIJKLCollector()
var err error
nc, err = nats.Connect(natsURL)
if err != nil {os.Exit(1)}
}
func main() {
urls := []string{"https://www.abcdefg.com", "https://www.hijklmn.com"}
for _, url := range urls {
instance := factory(url)
instance.Visit(url)
}
}
消費端就簡單一些了,我們只需要訂閱對應的主題,并直接訪問網(wǎng)站的詳情頁 (落地頁) 即可。
package main
import (
"fmt"
"net/url"
"github.com/gocolly/colly"
)
var domain2Collector = map[string]*colly.Collector{}
var nc *nats.Conn
var maxDepth = 10
var natsURL = "nats://localhost:4222"
func factory(urlStr string) *colly.Collector {
u, _ := url.Parse(urlStr)
return domain2Collector[u.Host]
}
func initV2exCollector() *colly.Collector {
c := colly.NewCollector(
colly.AllowedDomains("www.abcdefg.com"),
colly.MaxDepth(maxDepth),
)
return c
}
func initV2fxCollector() *colly.Collector {
c := colly.NewCollector(
colly.AllowedDomains("www.hijklmn.com"),
colly.MaxDepth(maxDepth),
)
return c
}
func init() {
domain2Collector["www.abcdefg.com"] = initV2exCollector()
domain2Collector["www.hijklmn.com"] = initV2fxCollector()
var err error
nc, err = nats.Connect(natsURL)
if err != nil {os.Exit(1)}
}
func startConsumer() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {return}
sub, err := nc.QueueSubscribeSync("tasks", "workers")
if err != nil {return}
var msg *nats.Msg
for {
msg, err = sub.NextMsg(time.Hour * 10000)
if err != nil {break}
urlStr := string(msg.Data)
ins := factory(urlStr)
// 因為最下游拿到的一定是對應網(wǎng)站的落地頁
// 所以不用進行多余的判斷了,直接爬內(nèi)容即可
ins.Visit(urlStr)
// 防止被封殺
time.Sleep(time.Second)
}
}
func main() {
startConsumer()
}
從代碼層面上來講,這里的生產(chǎn)者和消費者其實本質(zhì)上差不多。如果日后我們要靈活地支持增加、減少各種網(wǎng)站的爬取的話,應該思考如何將這些爬蟲的策略、參數(shù)盡量地配置化。
在本章的分布式配置一節(jié)中已經(jīng)講了一些配置系統(tǒng)的使用,讀者可以自行進行嘗試,這里就不再贅述了。
![]() | ![]() |
更多建議: