python任務隊列框架,python隊列的基本操作

本文目錄一覽:

Python實現簡單多線程任務隊列

Python實現簡單多線程任務隊列

最近我在用梯度下降演算法繪製神經網路的數據時,遇到了一些演算法性能的問題。梯度下降演算法的代碼如下(偽代碼):

defgradient_descent(): # the gradient descent code plotly.write(X, Y)

一般來說,當網路請求 plot.ly 繪圖時會阻塞等待返回,於是也會影響到其他的梯度下降函數的執行速度。

一種解決辦法是每調用一次 plotly.write 函數就開啟一個新的線程,但是這種方法感覺不是很好。 我不想用一個像 cerely(一種分散式任務隊列)一樣大而全的任務隊列框架,因為框架對於我的這點需求來說太重了,並且我的繪圖也並不需要 redis 來持久化數據。

那用什麼辦法解決呢?我在 python 中寫了一個很小的任務隊列,它可以在一個單獨的線程中調用 plotly.write函數。下面是程序代碼。

fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue):

首先我們繼承 Queue.Queue 類。從 Queue.Queue 類可以繼承 get 和 put 方法,以及隊列的行為。

def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()

初始化的時候,我們可以不用考慮工作線程的數量。

defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))

我們把 task, args, kwargs 以元組的形式存儲在隊列中。*args 可以傳遞數量不等的參數,**kwargs 可以傳遞命名參數。

defstart_workers(self): foriinrange(self.num_workers): t=Thread(target=self.worker) t.daemon=True t.start()

我們為每個 worker 創建一個線程,然後在後台刪除。

下面是 worker 函數的代碼:

defworker(self): whileTrue: tupl=self.get() item, args, kwargs=self.get() item(*args,**kwargs) self.task_done()

worker 函數獲取隊列頂端的任務,並根據輸入參數運行,除此之外,沒有其他的功能。下面是隊列的代碼:

我們可以通過下面的代碼測試:

defblokkah(*args,**kwargs): time.sleep(5) print「Blokkah mofo!」 q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print「Alldone!」

Blokkah 是我們要做的任務名稱。隊列已經緩存在內存中,並且沒有執行很多任務。下面的步驟是把主隊列當做單獨的進程來運行,這樣主程序退出以及執行資料庫持久化時,隊列任務不會停止運行。但是這個例子很好地展示了如何從一個很簡單的小任務寫成像工作隊列這樣複雜的程序。

defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)

修改之後,我的梯度下降演算法工作效率似乎更高了。如果你很感興趣的話,可以參考下面的代碼。fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print”Blokkah mofo!” q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint”All done!” if__name__==”__main__”:tests()

Python數據結構-隊列與廣度優先搜索(Queue)

隊列(Queue) :簡稱為隊,一種線性表數據結構,是一種只允許在表的一端進行插入操作,而在表的另一端進行刪除操作的線性表。

我們把隊列中允許插入的一端稱為 「隊尾(rear)」 ;把允許刪除的另一端稱為 「隊頭(front)」 。當表中沒有任何數據元素時,稱之為 「空隊」 。

廣度優先搜索演算法(Breadth First Search) :簡稱為 BFS,又譯作寬度優先搜索 / 橫向優先搜索。是一種用於遍歷或搜索樹或圖的演算法。該演算法從根節點開始,沿著樹的寬度遍歷樹或圖的節點。如果所有節點均被訪問,則演算法中止。

廣度優先遍歷 類似於樹的層次遍歷過程 。呈現出一層一層向外擴張的特點。先看到的節點先訪問,後看到的節點後訪問。遍歷到的節點順序符合「先進先出」的特點,所以廣度優先搜索可以通過「隊列」來實現。

力扣933

遊戲時,隊首始終是持有土豆的人

模擬遊戲開始,隊首的人出隊,之後再到隊尾(類似於循環隊列)

傳遞了num次之後,將隊首的人移除

如此反覆,直到隊列中剩餘一人

多人共用一台印表機,採取「先到先服務」的隊列策略來執行列印任務

需要解決的問題:1 列印系統的容量是多少?2 在能夠接受的等待時間內,系統可容納多少用戶以多高的頻率提交列印任務?

輸入:abba

輸出:False

思路:1 先將需要判定的詞從隊尾加入 deque; 2從兩端同時移除字元並判斷是否相同,直到deque中剩餘0個(偶數)或1個字元(奇數)

內容參考:

Python 非同步任務隊列Celery 使用

在 Python 中定義 Celery 的時候,我們要引入 Broker,中文翻譯過來就是「中間人」的意思。在工頭(生產者)提出任務的時候,把所有的任務放到 Broker 裡面,在 Broker 的另外一頭,一群碼農(消費者)等著取出一個個任務準備著手做。這種模式註定了整個系統會是個開環系統,工頭對於碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 也是存儲任務的信息用的,只不過這裡存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。

其實現架構如下圖所示:

可以看到,Celery 主要包含以下幾個模塊:

celery可以通過pip自動安裝。

broker 可選擇使用RabbitMQ/redis,backend可選擇使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安裝請參考對應的官方文檔。

——————————rabbitmq相關———————————————————-

官網安裝方法:

啟動管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 啟動rabbitmq:sbin/rabbitmq-server -detached

rabbitmq已經啟動,可以打開頁面來看看 地址:

用戶名密碼都是guest 。進入可以看到具體頁面。 關於rabbitmq的配置,網上很多 自己去搜以下就ok了。

——————————rabbitmq相關——————————————————–

項目結構如下:

使用前,需要三個方面:celery配置,celery實例,需執行的任務函數,如下:

Celery 的配置比較多,可以在 官方配置文檔: 查詢每個配置項的含義。

當然,要保證上述非同步任務and下述定時任務都能正常執行,就需要先啟動celery worker,啟動命令行如下:

需 啟動beat ,執行定時任務時, Celery會通過celery beat進程來完成。Celery beat會保持運行, 一旦到了某一定時任務需要執行時, Celery beat便將其加入到queue中. 不像worker進程, Celery beat只需要一個即可。而且為了避免有重複的任務被發送出去,所以Celery beat僅能有一個。

命令行啟動:

如果你想將celery worker/beat要放到後台運行,推薦可以扔給supervisor。

supervisor.conf如下:

python爬蟲用什麼框架

python爬蟲框架概述

爬蟲框架中比較好用的是 Scrapy 和PySpider。pyspider上手更簡單,操作更加簡便,因為它增加了 WEB 界面,寫爬蟲迅速,集成了phantomjs,可以用來抓取js渲染的頁面。Scrapy自定義程度高,比 PySpider更底層一些,適合學習研究,需要學習的相關知識多,不過自己拿來研究分散式和多線程等等是非常合適的。

PySpider

PySpider是binux做的一個爬蟲架構的開源化實現。主要的功能需求是:

抓取、更新調度多站點的特定的頁面

需要對頁面進行結構化信息提取

靈活可擴展,穩定可監控

pyspider的設計基礎是:以python腳本驅動的抓取環模型爬蟲

通過python腳本進行結構化信息的提取,follow鏈接調度抓取控制,實現最大的靈活性

通過web化的腳本編寫、調試環境。web展現調度狀態

抓取環模型成熟穩定,模塊間相互獨立,通過消息隊列連接,從單進程到多機分散式靈活拓展

pyspider的架構主要分為 scheduler(調度器), fetcher(抓取器), processor(腳本執行):

各個組件間使用消息隊列連接,除了scheduler是單點的,fetcher 和 processor 都是可以多實例分散式部署的。 scheduler 負責整體的調度控制

任務由 scheduler 發起調度,fetcher 抓取網頁內容, processor 執行預先編寫的python腳本,輸出結果或產生新的提鏈任務(發往 scheduler),形成閉環。

每個腳本可以靈活使用各種python庫對頁面進行解析,使用框架API控制下一步抓取動作,通過設置回調控制解析動作。

Scrapy

Scrapy是一個為了爬取網站數據,提取結構性數據而編寫的應用框架。 可以應用在包括數據挖掘,信息處理或存儲歷史數據等一系列的程序中。

其最初是為了頁面抓取 (更確切來說, 網路抓取 )所設計的, 也可以應用在獲取API所返回的數據(例如 Amazon Associates Web Services ) 或者通用的網路爬蟲。Scrapy用途廣泛,可以用於數據挖掘、監測和自動化測試

Scrapy主要包括了以下組件:

引擎(Scrapy): 用來處理整個系統的數據流處理, 觸發事務(框架核心)

調度器(Scheduler): 用來接受引擎發過來的請求, 壓入隊列中, 並在引擎再次請求的時候返回. 可以想像成一個URL(抓取網頁的網址或者說是鏈接)的優先隊列, 由它來決定下一個要抓取的網址是什麼, 同時去除重複的網址

下載器(Downloader): 用於下載網頁內容, 並將網頁內容返回給蜘蛛(Scrapy下載器是建立在twisted這個高效的非同步模型上的)

爬蟲(Spiders): 爬蟲是主要幹活的, 用於從特定的網頁中提取自己需要的信息, 即所謂的實體(Item)。用戶也可以從中提取出鏈接,讓Scrapy繼續抓取下一個頁面

項目管道(Pipeline): 負責處理爬蟲從網頁中抽取的實體,主要的功能是持久化實體、驗證實體的有效性、清除不需要的信息。當頁面被爬蟲解析後,將被發送到項目管道,並經過幾個特定的次序處理數據。

下載器中間件(Downloader Middlewares): 位於Scrapy引擎和下載器之間的框架,主要是處理Scrapy引擎與下載器之間的請求及響應。

爬蟲中間件(Spider Middlewares): 介於Scrapy引擎和爬蟲之間的框架,主要工作是處理蜘蛛的響應輸入和請求輸出。

調度中間件(Scheduler Middewares): 介於Scrapy引擎和調度之間的中間件,從Scrapy引擎發送到調度的請求和響應。

Scrapy運行流程大概如下:

首先,引擎從調度器中取出一個鏈接(URL)用於接下來的抓取

引擎把URL封裝成一個請求(Request)傳給下載器,下載器把資源下載下來,並封裝成應答包(Response)

然後,爬蟲解析Response

若是解析出實體(Item),則交給實體管道進行進一步的處理。

若是解析出的是鏈接(URL),則把URL交給Scheduler等待抓取

python多任務之進程隊列queen

python的多進程之間無法用全局變數,需要只用隊列queen進行通訊。

1. 創建。q=multiprocessing.Queen(num),num最大存放多少數據

2.進程使用隊列,需要在創建進程時做為參數傳進去。p=multiprocessing.Process(target=fun_name,args=(q,))

3.隊列使用。隊列是先進先出的,p.put(任何數據類型),放進數據,當隊列滿時會進程會堵塞等待。p.get()取出數據,當隊列中無數據是,進程會堵塞等待。p.full()是否已滿,p.empty()是否空了。

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/206939.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-12-08 14:18
下一篇 2024-12-08 14:18

相關推薦

  • Python棧操作用法介紹

    如果你是一位Python開發工程師,那麼你必須掌握Python中的棧操作。在Python中,棧是一個容器,提供後進先出(LIFO)的原則。這篇文章將通過多個方面詳細地闡述Pytho…

    編程 2025-04-29
  • Ojlat:一款快速開發Web應用程序的框架

    Ojlat是一款用於快速開發Web應用程序的框架。它的主要特點是高效、易用、可擴展且功能齊全。通過Ojlat,開發人員可以輕鬆地構建出高質量的Web應用程序。本文將從多個方面對Oj…

    編程 2025-04-29
  • Zlios——一個多功能的開發框架

    你是否在開發過程中常常遇到同樣的問題,需要不斷去尋找解決方案?你是否想要一個多功能、易於使用的開發框架來解決這些問題?那麼,Zlios就是你需要的框架。 一、簡介 Zlios是一個…

    編程 2025-04-29
  • Java任務下發回滾系統的設計與實現

    本文將介紹一個Java任務下發回滾系統的設計與實現。該系統可以用於執行複雜的任務,包括可回滾的任務,及時恢復任務失敗前的狀態。系統使用Java語言進行開發,可以支持多種類型的任務。…

    編程 2025-04-29
  • agavi開發框架

    Agavi是一個基於MVC模式的Web應用程序開發框架,以REST和面向資源的設計為核心思想。本文章將從Agavi的概念、優點、使用方法和實例等方面進行詳細介紹。 一、概念 Aga…

    編程 2025-04-29
  • Python中的隊列定義

    本篇文章旨在深入闡述Python中隊列的定義及其應用,包括隊列的定義、隊列的類型、隊列的操作以及隊列的應用。同時,我們也會為您提供Python代碼示例。 一、隊列的定義 隊列是一種…

    編程 2025-04-29
  • Python操作數組

    本文將從多個方面詳細介紹如何使用Python操作5個數組成的列表。 一、數組的定義 數組是一種用於存儲相同類型數據的數據結構。Python中的數組是通過列表來實現的,列表中可以存放…

    編程 2025-04-29
  • Python unittest框架用法介紹

    Python unittest框架是Python自帶的一種測試框架,可以用來編寫並運行測試用例。在本文中,我們將從以下幾個方面詳細介紹Python unittest框架的使用方法和…

    編程 2025-04-29
  • com.alipay.sofa.bolt框架

    com.alipay.sofa.bolt框架是一款高性能、輕量級、可擴展的RPC框架。其廣泛被應用於阿里集團內部服務以及阿里雲上的服務。該框架通過NIO支持高並發,同時還內置了多種…

    編程 2025-04-29
  • RabbitMQ和Yii2的消息隊列應用

    本文將探討RabbitMQ和Yii2之間的消息隊列應用。從概念、安裝和配置、使用實例等多個方面詳細講解,幫助讀者了解和掌握RabbitMQ和Yii2的消息隊列應用。 一、Rabbi…

    編程 2025-04-29

發表回復

登錄後才能評論