本文目錄一覽:
如何用python簡單的設計開發異步任務調度隊列
首先,客戶端可以直接扔任務到一個web services的接口上 –》 web api接收到任務後,會根據客戶端的ip和時間戳做task_id,返回給客戶,緊接着在redis裏面標記這任務的狀態。 格式為 func,args,kwargs,timeout=xx,queue_level=xx,interval_time=xx
主服務端:
一個線程,會不停的掃描那個redis hash表,取出任務的interval_time後,進行取模,如果匹配成功,就會塞到 redis sorted set有續集和裏面。
主線程,會不停的看看sorted set裏面,有沒有比自己實現小的任務,有的話,執行並刪除。 這裡的執行是用多進程,為毛用多進程,因為線程很多時候是不好控制強制幹掉的。 每個任務都會用multiprocessing的方式去執行,去調用的時候,會多傳進一個task_id,用來把相關的進度推送到redis裏面。 另外,fork進程後,我會得到一個pid,我會把pid和timeout的信息,存放到kill_hash裏面。 然後會不間斷的查看,在指定的timeout內,這pid還在不在,如果還是存在,沒有退出的話,說明他的任務不太正常,我們就可以在main(),裏面幹掉這些任務。
所謂的優先級就是個 High + middle +Low 的三合一鏈條而已,我每次都會堅持從高到低取任務,如果你的High級別的任務不斷的話,那麼我會一直幹不了低級別的任務了。 代碼的體現是在redis sorted set這邊,設立三個有序集合,我的worker隊列會從high開始做……
那麼如果想幹掉一個任務是如何操作的,首先我需要在 kill_hash 裏面標記任務應該趕緊幹掉,在就是在task_hash裏面把那個task_id幹掉,好讓他不會被持續的加入待執行的隊列裏面。
python多任務之進程隊列queen
python的多進程之間無法用全局變量,需要只用隊列queen進行通訊。
1. 創建。q=multiprocessing.Queen(num),num最大存放多少數據
2.進程使用隊列,需要在創建進程時做為參數傳進去。p=multiprocessing.Process(target=fun_name,args=(q,))
3.隊列使用。隊列是先進先出的,p.put(任何數據類型),放進數據,當隊列滿時會進程會堵塞等待。p.get()取出數據,當隊列中無數據是,進程會堵塞等待。p.full()是否已滿,p.empty()是否空了。
Python實現簡單多線程任務隊列
Python實現簡單多線程任務隊列
最近我在用梯度下降算法繪製神經網絡的數據時,遇到了一些算法性能的問題。梯度下降算法的代碼如下(偽代碼):
defgradient_descent(): # the gradient descent code plotly.write(X, Y)
一般來說,當網絡請求 plot.ly 繪圖時會阻塞等待返回,於是也會影響到其他的梯度下降函數的執行速度。
一種解決辦法是每調用一次 plotly.write 函數就開啟一個新的線程,但是這種方法感覺不是很好。 我不想用一個像 cerely(一種分佈式任務隊列)一樣大而全的任務隊列框架,因為框架對於我的這點需求來說太重了,並且我的繪圖也並不需要 redis 來持久化數據。
那用什麼辦法解決呢?我在 python 中寫了一個很小的任務隊列,它可以在一個單獨的線程中調用 plotly.write函數。下面是程序代碼。
fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue):
首先我們繼承 Queue.Queue 類。從 Queue.Queue 類可以繼承 get 和 put 方法,以及隊列的行為。
def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()
初始化的時候,我們可以不用考慮工作線程的數量。
defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))
我們把 task, args, kwargs 以元組的形式存儲在隊列中。*args 可以傳遞數量不等的參數,**kwargs 可以傳遞命名參數。
defstart_workers(self): foriinrange(self.num_workers): t=Thread(target=self.worker) t.daemon=True t.start()
我們為每個 worker 創建一個線程,然後在後台刪除。
下面是 worker 函數的代碼:
defworker(self): whileTrue: tupl=self.get() item, args, kwargs=self.get() item(*args,**kwargs) self.task_done()
worker 函數獲取隊列頂端的任務,並根據輸入參數運行,除此之外,沒有其他的功能。下面是隊列的代碼:
我們可以通過下面的代碼測試:
defblokkah(*args,**kwargs): time.sleep(5) print「Blokkah mofo!」 q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print「Alldone!」
Blokkah 是我們要做的任務名稱。隊列已經緩存在內存中,並且沒有執行很多任務。下面的步驟是把主隊列當做單獨的進程來運行,這樣主程序退出以及執行數據庫持久化時,隊列任務不會停止運行。但是這個例子很好地展示了如何從一個很簡單的小任務寫成像工作隊列這樣複雜的程序。
defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)
修改之後,我的梯度下降算法工作效率似乎更高了。如果你很感興趣的話,可以參考下面的代碼。fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print”Blokkah mofo!” q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint”All done!” if__name__==”__main__”:tests()
Python 異步任務隊列Celery 使用
在 Python 中定義 Celery 的時候,我們要引入 Broker,中文翻譯過來就是「中間人」的意思。在工頭(生產者)提出任務的時候,把所有的任務放到 Broker 裏面,在 Broker 的另外一頭,一群碼農(消費者)等着取出一個個任務準備着手做。這種模式註定了整個系統會是個開環系統,工頭對於碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 也是存儲任務的信息用的,只不過這裡存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。
其實現架構如下圖所示:
可以看到,Celery 主要包含以下幾個模塊:
celery可以通過pip自動安裝。
broker 可選擇使用RabbitMQ/redis,backend可選擇使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安裝請參考對應的官方文檔。
——————————rabbitmq相關———————————————————-
官網安裝方法:
啟動管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 啟動rabbitmq:sbin/rabbitmq-server -detached
rabbitmq已經啟動,可以打開頁面來看看 地址:
用戶名密碼都是guest 。進入可以看到具體頁面。 關於rabbitmq的配置,網上很多 自己去搜以下就ok了。
——————————rabbitmq相關——————————————————–
項目結構如下:
使用前,需要三個方面:celery配置,celery實例,需執行的任務函數,如下:
Celery 的配置比較多,可以在 官方配置文檔: 查詢每個配置項的含義。
當然,要保證上述異步任務and下述定時任務都能正常執行,就需要先啟動celery worker,啟動命令行如下:
需 啟動beat ,執行定時任務時, Celery會通過celery beat進程來完成。Celery beat會保持運行, 一旦到了某一定時任務需要執行時, Celery beat便將其加入到queue中. 不像worker進程, Celery beat只需要一個即可。而且為了避免有重複的任務被發送出去,所以Celery beat僅能有一個。
命令行啟動:
如果你想將celery worker/beat要放到後台運行,推薦可以扔給supervisor。
supervisor.conf如下:
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/308773.html