在上一篇文章《神器!五分鐘完成大型爬蟲項目!》,我們介紹了一個類似於 Scrapy 的開源爬蟲框架——feapder,並著重介紹了該框架的一種應用——AirSpider,它是一個輕量級的爬蟲。
接下來我們再來介紹另一種爬蟲應用——Spider,它是是一款基於 redis 的分散式爬蟲,適用于海量數據採集,支持斷點續爬、爬蟲報警、數據自動入庫等功能。
安裝
和 AirSpider 一樣,我們也是通過命令行安裝。
由於 Spider 是分散式爬蟲,可能涉及到多個爬蟲,所以最好以項目的方式來創建。
創建項目
我們首先來創建項目:
feapder create -p spider-project
創建的項目目錄是這樣的:

創建好項目後,開發時我們需要將項目設置為工作區間,否則引入非同級目錄下的文件時,編譯器會報錯。
設置工作區間方式(以pycharm為例):項目->右鍵->Mark Directory as -> Sources Root。
創建爬蟲
創建爬蟲的命令行語句為:
feapder create -s <spider_name> <spider_type>
- AirSpider 對應的 spider_type 值為 1
- Spider 對應的 spider_type 值為 2
- BatchSpider 對應的 spider_type 值為 3
默認 spider_type 值為 1。
所以創建 Spider 的語句為:
feapder create -s spider_test 2
運行語句後,我們可以看到在 spiders 目錄下生成了 spider_test.py 文件。

對應的文件內容為:
import feapder
class SpiderTest(feapder.Spider):
# 自定義資料庫,若項目中有setting.py文件,此自定義可刪除
__custom_setting__ = dict(
REDISDB_IP_PORTS="localhost:6379", REDISDB_USER_PASS="", REDISDB_DB=0
)
def start_requests(self):
yield feapder.Request("https://www.baidu.com")
def parse(self, request, response):
print(response)
if __name__ == "__main__":
SpiderTest(redis_key="xxx:xxx").start()
因Spider是基於redis做的分散式,因此模板代碼默認給了redis的配置方式。關於 Redis 的配置信息:
- REDISDB_IP_PORTS:連接地址,若為集群或哨兵模式,多個連接地址用逗號分開,若為哨兵模式,需要加個REDISDB_SERVICE_NAME參數。
- REDISDB_USER_PASS:連接密碼。
- REDISDB_DB:資料庫。
在 main 函數中,我們可以看到有個 redis_key 的參數,這個參數是redis中存儲任務等信息的 key 前綴,如 redis_key=”feapder:spider_test”, 則redis中會生成如下:

特性
我們在 AirSpider 裡面講的方法,在 Spider 這裡都支持,下面我們來看看 Spider 相對於 AirSpider 的不同之處。
數據自動入庫
寫過爬蟲的人都知道,如果要將數據持久化到 MySQL 資料庫,如果碰到欄位特別多的情況,就會很煩人,需要解析之後手寫好多欄位,拼湊 SQL 語句。
這個問題,Spider 幫我們想到了,我們可以利用框架幫我們自動入庫。
建表
第一步,我們需要在資料庫中創建一張數據表,這個大家都會,這裡就不說了。
配置 setting
將 setting.py 裡面的資料庫配置改為自己的配置:
# # MYSQL
MYSQL_IP = ""
MYSQL_PORT =
MYSQL_DB = ""
MYSQL_USER_NAME = ""
MYSQL_USER_PASS = ""
也就是這幾個配置。
生成實體類 Item
接著,我們將命令行切換到我們項目的 items 目錄,運行命令:
feapder create -i <item_name>
我這裡資料庫里使用的是 report 表,所以命令為:
feapder create -i report
然後,我們就可以在 items 目錄下看到生成的 report_item.py 實體類了。我這裡生成的實體類內容是:
from feapder import Item
class ReportItem(Item):
"""
This class was generated by feapder.
command: feapder create -i report.
"""
__table_name__ = "report"
def __init__(self, *args, **kwargs):
self.count = None
self.emRatingName = None # 評級名稱
self.emRatingValue = None # 評級代碼
self.encodeUrl = None # 鏈接
# self.id = None
self.indvInduCode = None # 行業代碼
self.indvInduName = None # 行業名稱
self.lastEmRatingName = None # 上次評級名稱
self.lastEmRatingValue = None # 上次評級代碼
self.orgCode = None # 機構代碼
self.orgName = None # 機構名稱
self.orgSName = None # 機構簡稱
self.predictNextTwoYearEps = None
self.predictNextTwoYearPe = None
self.predictNextYearEps = None
self.predictNextYearPe = None
self.predictThisYearEps = None
self.predictThisYearPe = None
self.publishDate = None # 發表時間
self.ratingChange = None # 評級變動
self.researcher = None # 研究員
self.stockCode = None # 股票代碼
self.stockName = None # 股票簡稱
self.title = None # 報告名稱
若欄位有默認值或者自增,則默認注釋掉,可按需打開。大家可以看到我這張表的 id 欄位在這裡被注釋了。
若item欄位過多,不想逐一賦值,可通過如下方式創建:
feapder create -i report 1
這時候生成的實體類是這樣的:
class ReportItem(Item):
"""
This class was generated by feapder.
command: feapder create -i report 1.
"""
__table_name__ = "report 1"
def __init__(self, *args, **kwargs):
self.count = kwargs.get('count')
self.emRatingName = kwargs.get('emRatingName') # 評級名稱
self.emRatingValue = kwargs.get('emRatingValue') # 評級代碼
self.encodeUrl = kwargs.get('encodeUrl') # 鏈接
# self.id = kwargs.get('id')
self.indvInduCode = kwargs.get('indvInduCode') # 行業代碼
self.indvInduName = kwargs.get('indvInduName') # 行業名稱
self.lastEmRatingName = kwargs.get('lastEmRatingName') # 上次評級名稱
self.lastEmRatingValue = kwargs.get('lastEmRatingValue') # 上次評級代碼
self.orgCode = kwargs.get('orgCode') # 機構代碼
self.orgName = kwargs.get('orgName') # 機構名稱
self.orgSName = kwargs.get('orgSName') # 機構簡稱
self.predictNextTwoYearEps = kwargs.get('predictNextTwoYearEps')
self.predictNextTwoYearPe = kwargs.get('predictNextTwoYearPe')
self.predictNextYearEps = kwargs.get('predictNextYearEps')
self.predictNextYearPe = kwargs.get('predictNextYearPe')
self.predictThisYearEps = kwargs.get('predictThisYearEps')
self.predictThisYearPe = kwargs.get('predictThisYearPe')
self.publishDate = kwargs.get('publishDate') # 發表時間
self.ratingChange = kwargs.get('ratingChange') # 評級變動
self.researcher = kwargs.get('researcher') # 研究員
self.stockCode = kwargs.get('stockCode') # 股票代碼
self.stockName = kwargs.get('stockName') # 股票簡稱
self.title = kwargs.get('title') # 報告名稱
這樣當我們請求回來的json數據時,可直接賦值,如:
response_data = {"title":" 測試"} # 模擬請求回來的數據
item = SpiderDataItem(**response_data)
想要數據自動入庫也比較簡單,在解析完數據之後,將數據賦值給 Item,然後 yield 就行了:
def parse(self, request, response):
html = response.content.decode("utf-8")
if len(html):
content = html.replace('datatable1351846(', '')[:-1]
content_json = json.loads(content)
print(content_json)
for obj in content_json['data']:
result = ReportItem()
result['orgName'] = obj['orgName'] #機構名稱
result['orgSName'] = obj['orgSName'] #機構簡稱
result['publishDate'] = obj['publishDate'] #發布日期
result['predictNextTwoYearEps'] = obj['predictNextTwoYearEps'] #後年每股盈利
result['title'] = obj['title'] #報告名稱
result['stockName'] = obj['stockName'] #股票名稱
result['stockCode'] = obj['stockCode'] #股票code
result['orgCode'] = obj['stockCode'] #機構code
result['predictNextTwoYearPe'] = obj['predictNextTwoYearPe'] #後年市盈率
result['predictNextYearEps'] = obj['predictNextYearEps'] # 明年每股盈利
result['predictNextYearPe'] = obj['predictNextYearPe'] # 明年市盈率
result['predictThisYearEps'] = obj['predictThisYearEps'] #今年每股盈利
result['predictThisYearPe'] = obj['predictThisYearPe'] #今年市盈率
result['indvInduCode'] = obj['indvInduCode'] # 行業代碼
result['indvInduName'] = obj['indvInduName'] # 行業名稱
result['lastEmRatingName'] = obj['lastEmRatingName'] # 上次評級名稱
result['lastEmRatingValue'] = obj['lastEmRatingValue'] # 上次評級代碼
result['emRatingValue'] = obj['emRatingValue'] # 評級代碼
result['emRatingName'] = obj['emRatingName'] # 評級名稱
result['ratingChange'] = obj['ratingChange'] # 評級變動
result['researcher'] = obj['researcher'] # 研究員
result['encodeUrl'] = obj['encodeUrl'] # 鏈接
result['count'] = int(obj['count']) # 近一月個股研報數
yield result
返回item後,item 會流進到框架的 ItemBuffer, ItemBuffer 每.05秒或當item數量積攢到5000個,便會批量將這些 item 批量入庫。表名為類名去掉 Item 的小寫,如 ReportItem 數據會落入到 report 表。
調試
開發過程中,我們可能需要針對某個請求進行調試,常規的做法是修改下發任務的代碼。但這樣並不好,改來改去可能把之前寫好的邏輯搞亂了,或者忘記改回來直接發布了,又或者調試的數據入庫了,污染了庫里已有的數據,造成了很多本來不應該發生的問題。
本框架支持Debug爬蟲,可針對某條任務進行調試,寫法如下:
if __name__ == "__main__":
spider = SpiderTest.to_DebugSpider(
redis_key="feapder:spider_test", request=feapder.Request("http://www.baidu.com")
)
spider.start()
對比之前的啟動方式:
spider = SpiderTest(redis_key="feapder:spider_test")
spider.start()
可以看到,代碼中 to_DebugSpider 方法可以將原爬蟲直接轉為 debug 爬蟲,然後通過傳遞 request 參數抓取指定的任務。
通常結合斷點來進行調試,debug 模式下,運行產生的數據默認不入庫。
除了指定 request 參數外,還可以指定 request_dict 參數,request_dict 接收字典類型,如 request_dict={“url”:”http://www.baidu.com”}, 其作用於傳遞 request 一致。request 與 request_dict 二者選一傳遞即可。
運行多個爬蟲
通常,一個項目下可能存在多個爬蟲,為了規範,建議啟動入口統一放到項目下的 main.py 中,然後以命令行的方式運行指定的文件。
例如如下項目:

項目中包含了兩個spider,main.py寫法如下:
from spiders import *
from feapder import Request
from feapder import ArgumentParser
def test_spider():
spider = test_spider.TestSpider(redis_key="spider:report")
spider.start()
def test_spider2():
spider = test_spider.TestSpider2(redis_key="spider:report")
spider.start()
if __name__ == "__main__":
parser = ArgumentParser(description="Spider測試")
parser.add_argument(
"--test_spider", action="store_true", help="測試Spider", function=test_spider
)
parser.add_argument(
"--test_spider2", action="store_true", help="測試Spider2", function=test_spider2
)
parser.start()
這裡使用了 ArgumentParser 模塊,使其支持命令行參數,如運行 test_spider:
python3 main.py –test_spider
分散式
分散式說白了就是啟動多個進程,處理同一批任務。Spider 支持啟動多份,且不會重複發下任務,我們可以在多個伺服器上部署啟動,也可以在同一個機器上啟動多次。
總結
到這裡, Spider 分散式爬蟲咱就講完了,還有一些細節的東西,大家在用的時候還需要琢磨一下。總體來說,這個框架還是比較好用的,上手簡單,應對一些不是很複雜的場景綽綽有餘,大家可以嘗試著將自己的爬蟲重構一下,試試這款框架
原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/222127.html
微信掃一掃
支付寶掃一掃