本文目錄一覽:
python非同步有哪些方式
yield相當於return,他將相應的值返回給調用next()或者send()的調用者,從而交出了CPU使用權,而當調用者再次調用next()或者send()的時候,又會返回到yield中斷的地方,如果send有參數,還會將參數返回給yield賦值的變數,如果沒有就和next()一樣賦值為None。但是這裡會遇到一個問題,就是嵌套使用generator時外層的generator需要寫大量代碼,看如下示例:
注意以下代碼均在Python3.6上運行調試
#!/usr/bin/env python# encoding:utf-8def inner_generator():
i = 0
while True:
i = yield i if i 10: raise StopIterationdef outer_generator():
print(“do something before yield”)
from_inner = 0
from_outer = 1
g = inner_generator()
g.send(None) while 1: try:
from_inner = g.send(from_outer)
from_outer = yield from_inner except StopIteration: breakdef main():
g = outer_generator()
g.send(None)
i = 0
while 1: try:
i = g.send(i + 1)
print(i) except StopIteration: breakif __name__ == ‘__main__’:
main()1234567891011121314151617181920212223242526272829303132333435363738394041
為了簡化,在Python3.3中引入了yield from
yield from
使用yield from有兩個好處,
1、可以將main中send的參數一直返回給最裡層的generator,
2、同時我們也不需要再使用while循環和send (), next()來進行迭代。
我們可以將上邊的代碼修改如下:
def inner_generator():
i = 0
while True:
i = yield i if i 10: raise StopIterationdef outer_generator():
print(“do something before coroutine start”) yield from inner_generator()def main():
g = outer_generator()
g.send(None)
i = 0
while 1: try:
i = g.send(i + 1)
print(i) except StopIteration: breakif __name__ == ‘__main__’:
main()1234567891011121314151617181920212223242526
執行結果如下:
do something before coroutine start123456789101234567891011
這裡inner_generator()中執行的代碼片段我們實際就可以認為是協程,所以總的來說邏輯圖如下:
接下來我們就看下究竟協程是啥樣子
協程coroutine
協程的概念應該是從進程和線程演變而來的,他們都是獨立的執行一段代碼,但是不同是線程比進程要輕量級,協程比線程還要輕量級。多線程在同一個進程中執行,而協程通常也是在一個線程當中執行。它們的關係圖如下:
我們都知道Python由於GIL(Global Interpreter Lock)原因,其線程效率並不高,並且在*nix系統中,創建線程的開銷並不比進程小,因此在並發操作時,多線程的效率還是受到了很大制約的。所以後來人們發現通過yield來中斷代碼片段的執行,同時交出了cpu的使用權,於是協程的概念產生了。在Python3.4正式引入了協程的概念,代碼示例如下:
import asyncio# Borrowed from countdown(number, n):
while n 0:
print(‘T-minus’, n, ‘({})’.format(number)) yield from asyncio.sleep(1)
n -= 1loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(countdown(“A”, 2)),
asyncio.ensure_future(countdown(“B”, 3))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()12345678910111213141516
示例顯示了在Python3.4引入兩個重要概念協程和事件循環,
通過修飾符@asyncio.coroutine定義了一個協程,而通過event loop來執行tasks中所有的協程任務。之後在Python3.5引入了新的async await語法,從而有了原生協程的概念。
async await
在Python3.5中,引入了ayncawait 語法結構,通過」aync def」可以定義一個協程代碼片段,作用類似於Python3.4中的@asyncio.coroutine修飾符,而await則相當於」yield from」。
先來看一段代碼,這個是我剛開始使用asyncawait語法時,寫的一段小程序。
#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time
async def wait_download(url):
response = await requets.get(url)
print(“get {} response complete.”.format(url))
async def main():
start = time.time()
await asyncio.wait([
wait_download(“”),
wait_download(“”),
wait_download(“”)])
end = time.time()
print(“Complete in {} seconds”.format(end – start))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())12345678910111213141516171819202122232425
這裡會收到這樣的報錯:
Task exception was never retrieved
future: Task finished coro=wait_download() done, defined at asynctest.py:9 exception=TypeError(“object Response can’t be used in ‘await’ expression”,)
Traceback (most recent call last):
File “asynctest.py”, line 10, in wait_download
data = await requests.get(url)
TypeError: object Response can’t be used in ‘await’ expression123456
這是由於requests.get()函數返回的Response對象不能用於await表達式,可是如果不能用於await,還怎麼樣來實現非同步呢?
原來Python的await表達式是類似於」yield from」的東西,但是await會去做參數檢查,它要求await表達式中的對象必須是awaitable的,那啥是awaitable呢? awaitable對象必須滿足如下條件中其中之一:
1、A native coroutine object returned from a native coroutine function .
原生協程對象
2、A generator-based coroutine object returned from a function decorated with types.coroutine() .
types.coroutine()修飾的基於生成器的協程對象,注意不是Python3.4中asyncio.coroutine
3、An object with an await method returning an iterator.
實現了await method,並在其中返回了iterator的對象
根據這些條件定義,我們可以修改代碼如下:
#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time
async def download(url): # 通過async def定義的函數是原生的協程對象
response = requests.get(url)
print(response.text)
async def wait_download(url):
await download(url) # 這裡download(url)就是一個原生的協程對象
print(“get {} data complete.”.format(url))
async def main():
start = time.time()
await asyncio.wait([
wait_download(“”),
wait_download(“”),
wait_download(“”)])
end = time.time()
print(“Complete in {} seconds”.format(end – start))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())123456789101112131415161718192021222324252627282930
好了現在一個真正的實現了非同步編程的小程序終於誕生了。
而目前更牛逼的非同步是使用uvloop或者pyuv,這兩個最新的Python庫都是libuv實現的,可以提供更加高效的event loop。
uvloop和pyuv
pyuv實現了Python2.x和3.x,但是該項目在github上已經許久沒有更新了,不知道是否還有人在維護。
uvloop只實現了3.x, 但是該項目在github上始終活躍。
它們的使用也非常簡單,以uvloop為例,只需要添加以下代碼就可以了
import asyncioimport uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())123
python里怎麼實現非同步調用
本文實例講述了python使用multiprocessing模塊實現帶回調函數的非同步調用方法。分享給大家供大家參考。具體分析如下:
multipressing模塊是python 2.6版本加入的,通過這個模塊可以輕鬆實現非同步調用
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == ‘__main__’:
pool = Pool(processes=1)
# Start a worker processes.
result = pool.apply_async(f, [10], callback)
# Evaluate “f(10)” asynchronously calling callback when finished.
希望本文所述對大家的Python程序設計有所幫助。
python 非同步是什麼意思
非同步是計算機多線程的非同步處理。與同步處理相對,非同步處理不用阻塞當前線程來等待處理完成,而是允許後續操作,直至其它線程將處理完成,並回調通知此線程。
Python 非同步任務隊列Celery 使用
在 Python 中定義 Celery 的時候,我們要引入 Broker,中文翻譯過來就是「中間人」的意思。在工頭(生產者)提出任務的時候,把所有的任務放到 Broker 裡面,在 Broker 的另外一頭,一群碼農(消費者)等著取出一個個任務準備著手做。這種模式註定了整個系統會是個開環系統,工頭對於碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 也是存儲任務的信息用的,只不過這裡存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。
其實現架構如下圖所示:
可以看到,Celery 主要包含以下幾個模塊:
celery可以通過pip自動安裝。
broker 可選擇使用RabbitMQ/redis,backend可選擇使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安裝請參考對應的官方文檔。
——————————rabbitmq相關———————————————————-
官網安裝方法:
啟動管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 啟動rabbitmq:sbin/rabbitmq-server -detached
rabbitmq已經啟動,可以打開頁面來看看 地址:
用戶名密碼都是guest 。進入可以看到具體頁面。 關於rabbitmq的配置,網上很多 自己去搜以下就ok了。
——————————rabbitmq相關——————————————————–
項目結構如下:
使用前,需要三個方面:celery配置,celery實例,需執行的任務函數,如下:
Celery 的配置比較多,可以在 官方配置文檔: 查詢每個配置項的含義。
當然,要保證上述非同步任務and下述定時任務都能正常執行,就需要先啟動celery worker,啟動命令行如下:
需 啟動beat ,執行定時任務時, Celery會通過celery beat進程來完成。Celery beat會保持運行, 一旦到了某一定時任務需要執行時, Celery beat便將其加入到queue中. 不像worker進程, Celery beat只需要一個即可。而且為了避免有重複的任務被發送出去,所以Celery beat僅能有一個。
命令行啟動:
如果你想將celery worker/beat要放到後台運行,推薦可以扔給supervisor。
supervisor.conf如下:
Python非同步編程全攻略
如果你厭倦了多線程,不妨試試python的非同步編程,再引入async, await關鍵字之後語法變得更加簡潔和直觀,又經過幾年的生態發展,現在是一個很不錯的並發模型。
下面介紹一下python非同步編程的方方面面。
因為GIL的存在,所以Python的多線程在CPU密集的任務下顯得無力,但是對於IO密集的任務,多線程還是足以發揮多線程的優勢的,而非同步也是為了應對IO密集的任務,所以兩者是一個可以相互替代的方案,因為設計的不同,理論上非同步要比多線程快,因為非同步的花銷更少, 因為不需要額外系統申請額外的內存,而線程的創建跟系統有關,需要分配一定量的內存,一般是幾兆,比如linux默認是8MB。
雖然非同步很好,比如可以使用更少的內存,比如更好地控制並發(也許你並不這麼認為:))。但是由於async/await 語法的存在導致與之前的語法有些割裂,所以需要適配,需要付出額外的努力,再者就是生態遠遠沒有同步編程強大,比如很多庫還不支持非同步,所以你需要一些額外的適配。
為了不給其他網站帶來困擾,這裡首先在自己電腦啟動web服務用於測試,代碼很簡單。
本文所有依賴如下:
所有依賴可通過代碼倉庫的requirements.txt一次性安裝。
首先看一個錯誤的例子
輸出如下:
發現花費了3秒,不符合預期呀。。。。這是因為雖然用了協程,但是每個協程是串列的運行,也就是說後一個等前一個完成之後才開始,那麼這樣的非同步代碼並沒有並發,所以我們需要讓這些協程並行起來
為了讓代碼變動的不是太多,所以這裡用了一個笨辦法來等待所有任務完成, 之所以在main函數中等待是為了不讓ClientSession關閉, 如果你移除了main函數中的等待代碼會發現報告異常 RuntimeError: Session is closed ,而代碼里的解決方案非常的不優雅,需要手動的等待,為了解決這個問題,我們再次改進代碼。
這裡解決的方式是通過 asyncio.wait 方法等待一個協程列表,默認是等待所有協程結束後返回,會返回一個完成(done)列表,以及一個待辦(pending)列表。
如果我們不想要協程對象而是結果,那麼我們可以使用 asyncio.gather
結果輸出如下:
通過 asyncio.ensure_future 我們就能創建一個協程,跟調用一個函數差別不大,為了等待所有任務完成之後退出,我們需要使用 asyncio.wait 等方法來等待,如果只想要協程輸出的結果,我們可以使用 asyncio.gather 來獲取結果。
雖然前面能夠隨心所欲的創建協程,但是就像多線程一樣,我們也需要處理協程之間的同步問題,為了保持語法及使用情況的一致,多線程中用到的同步功能,asyncio中基本也能找到, 並且用法基本一致,不一致的地方主要是需要用非同步的關鍵字,比如 async with/ await 等
通過鎖讓並發慢下來,讓協程一個一個的運行。
輸出如下:
通過觀察很容易發現,並發的速度因為鎖而慢下來了,因為每次只有一個協程能獲得鎖,所以並發變成了串列。
通過事件來通知特定的協程開始工作,假設有一個任務是根據http響應結果選擇是否激活。
輸出如下:
可以看到事件(Event)等待者都是在得到響應內容之後輸出,並且事件(Event)可以是多個協程同時等待。
上面的事件雖然很棒,能夠在不同的協程之間同步狀態,並且也能夠一次性同步所有的等待協程,但是還不夠精細化,比如想通知指定數量的等待協程,這個時候Event就無能為力了,所以同步原語中出現了Condition。
輸出如下:
可以看到,前面兩個等待的協程是在同一時刻完成,而不是全部等待完成。
通過創建協程的數量來控制並發並不是非常優雅的方式,所以可以通過信號量的方式來控制並發。
輸出如下:
可以發現,雖然同時創建了三個協程,但是同一時刻只有兩個協程工作,而另外一個協程需要等待一個協程讓出信號量才能運行。
無論是協程還是線程,任務之間的狀態同步還是很重要的,所以有了應對各種同步機制的同步原語,因為要保證一個資源同一個時刻只能一個任務訪問,所以引入了鎖,又因為需要一個任務等待另一個任務,或者多個任務等待某個任務,因此引入了事件(Event),但是為了更精細的控制通知的程度,所以又引入了條件(Condition), 通過條件可以控制一次通知多少的任務。
有時候的並發需求是通過一個變數控制並發任務的並發數而不是通過創建協程的數量來控制並發,所以引入了信號量(Semaphore),這樣就可以在創建的協程數遠遠大於並發數的情況下讓協程在指定的並發量情況下並發。
不得不承認非同步編程相比起同步編程的生態要小的很多,所以不可能完全非同步編程,因此需要一種方式兼容。
多線程是為了兼容同步得代碼。
多進程是為了利用CPU多核的能力。
輸出如下:
可以看到總耗時1秒,說明所有的線程跟進程是同時運行的。
下面是本人使用過的一些非同步庫,僅供參考
web框架
http客戶端
資料庫
ORM
雖然非同步庫發展得還算不錯,但是中肯的說並沒有覆蓋方方面面。
雖然我鼓勵大家嘗試非同步編程,但是本文的最後卻是讓大家謹慎的選擇開發環境,如果你覺得本文的並發,同步,兼容多線程,多進程不值得一提,那麼我十分推薦你嘗試以非同步編程的方式開始一個新的項目,如果你對其中一些還有疑問或者你確定了要使用的依賴庫並且大多數是沒有非同步庫替代的,那麼我還是建議你直接按照自己擅長的同步編程開始。
非同步編程雖然很不錯,不過,也許你並不需要。
原創文章,作者:TGQAE,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/329107.html