python併發2之使用asyncio處理併發

python併發2之使用asyncio處理併發
1 Star2 Stars3 Stars4 Stars5 Stars 給文章打分!
Loading...

asyncio

在Python 2的時代,高效能的網路程式設計主要是使用Twisted、Tornado和Gevent這三個庫,但是它們的非同步程式碼相互之間既不相容也不能移植。如上一節說的,Gvanrossum希望在Python 3 實現一個原生的基於生成器的協程庫,其中直接內建了對非同步IO的支援,這就是asyncio,它在Python 3.4被引入到標準庫。

asyncio 這個包使用事件迴圈驅動的協程實現併發。

asyncio 包在引入標準庫之前代號 “Tulip”(鬱金香),所以在網上搜尋資料時,會經常看到這種花的名字。

什麼是事件迴圈?

wiki 上說:事件迴圈是”一種等待程式分配事件或者訊息的程式設計架構“。基本上來說事件迴圈就是:”當A發生時,執行B”。或者用最簡單的例子來解釋這一概念就是每個瀏覽器中都存在的JavaScript事件迴圈。當你點選了某個東西(“當A發生時”),這一點選動作會傳送給JavaScript的事件迴圈,並檢查是否存在註冊過的onclick 回撥來處理這一點選(執行B)。只要有註冊過的回撥函式就會伴隨點選動作的細節資訊被執行。事件迴圈被認為是一種虛幻是因為它不停的手機事件並通過迴圈來發如何應對這些事件。

對 Python 來說,用來提供事件迴圈的 asyncio 被加入標準庫中。asyncio 重點解決網路服務中的問題,事件迴圈在這裡將來自套接字(socket)的 I/O 已經準備好讀和/或寫作為“當A發生時”(通過selectors模組)。除了 GUI 和 I/O,事件迴圈也經常用於在別的執行緒或子程序中執行程式碼,並將事件迴圈作為調節機制(例如,合作式多工)。如果你恰好理解 Python 的 GIL,事件迴圈對於需要釋放 GIL 的地方很有用。

執行緒與協程

我們先看兩斷程式碼,分別用 threading 模組和asyncio 包實現的一段程式碼。


# sinner_thread.py
import threading
import itertools
import time
import sys
class Signal: # 這個類定義一個可變物件,用於從外部控制執行緒
go = True
def spin(msg, signal): # 這個函式會在單獨的執行緒中執行,signal 引數是前邊定義的Signal類的例項
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'): # itertools.cycle 函式從指定的序列中反覆不斷地生成元素
status = char   ' '   msg
write(status)
flush()
write('\x08' * len(status)) # 使用退格符把游標移回行首
time.sleep(.1) # 每 0.1 秒重新整理一次
if not signal.go: # 如果 go屬性不是 True,退出迴圈
break
write(' ' * len(status)   '\x08' * len(status)) # 使用空格清除狀態訊息,把游標移回開頭
def slow_function(): # 模擬耗時操作
# 假裝等待I/O一段時間
time.sleep(3) # 呼叫sleep 會阻塞主執行緒,這麼做事為了釋放GIL,建立從屬執行緒
return 42
def supervisor(): # 這個函式設定從屬執行緒,顯示執行緒物件,執行耗時計算,最後殺死程序
signal = Signal()
spinner = threading.Thread(target=spin,
args=('thinking!', signal))
print('spinner object:', spinner) # 顯示執行緒物件 輸出 spinner object: <Thread(Thread-1, initial)>
spinner.start() # 啟動從屬程序
result = slow_function() # 執行slow_function 行數,阻塞主執行緒。同時叢書執行緒以動畫形式旋轉指標
signal.go = False
spinner.join() # 等待spinner 執行緒結束
return result
def main():
result = supervisor() 
print('Answer', result)
if __name__ == '__main__':
main()

執行一下,結果大致是這個樣子:

這是一個動圖,“thinking” 前的 線是會動的(為了錄屏,我把sleep 的時間調大了)

python 並沒有提供終止執行緒的API,所以若想關閉執行緒,必須給執行緒傳送訊息。這裡我們使用signal.go 屬性:在主執行緒中把它設定為False後,spinner 執行緒會接收到,然後退出

現在我們再看下使用 asyncio 包的版本:


# spinner_asyncio.py
# 通過協程以動畫的形式顯示文字式旋轉指標
import asyncio
import itertools
import sys
@asyncio.coroutine # 打算交給asyncio 處理的協程要使用 @asyncio.coroutine 裝飾
def spin(msg):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'): # itertools.cycle 函式從指定的序列中反覆不斷地生成元素
status = char   ' '   msg
write(status)
flush()
write('\x08' * len(status)) # 使用退格符把游標移回行首
try:
yield from asyncio.sleep(0.1) # 使用 yield from asyncio.sleep(0.1) 代替 time.sleep(.1), 這樣的休眠不會阻塞事件迴圈
except asyncio.CancelledError: # 如果 spin 函式甦醒後丟擲 asyncio.CancelledError 異常,其原因是發出了取消請求
break
write(' ' * len(status)   '\x08' * len(status)) # 使用空格清除狀態訊息,把游標移回開頭
@asyncio.coroutine
def slow_function(): # 5 現在此函式是協程,使用休眠假裝進行I/O 操作時,使用 yield from 繼續執行事件迴圈
# 假裝等待I/O一段時間
yield from asyncio.sleep(3) # 此表示式把控制權交給主迴圈,在休眠結束後回覆這個協程
return 42
@asyncio.coroutine
def supervisor(): #這個函式也是協程,因此可以使用 yield from 驅動 slow_function
spinner = asyncio.async(spin('thinking!')) # asyncio.async() 函式排定協程的執行時間,使用一個 Task 物件包裝spin 協程,並立即返回
print('spinner object:', spinner) # Task 物件,輸出類似 spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>>
# 驅動slow_function() 函式,結束後,獲取返回值。同事事件迴圈繼續執行,
# 因為slow_function 函式最後使用yield from asyncio.sleep(3) 表示式把控制權交給主迴圈
result = yield from slow_function()
# Task 物件可以取消;取消後會在協程當前暫停的yield處丟擲 asyncio.CancelledError 異常
# 協程可以捕獲這個異常,也可以延遲取消,甚至拒絕取消
spinner.cancel()
return result
def main():
loop = asyncio.get_event_loop() # 獲取事件迴圈引用
# 驅動supervisor 協程,讓它執行完畢;這個協程的返回值是這次呼叫的返回值
result = loop.run_until_complete(supervisor())
loop.close()
print('Answer', result)
if __name__ == '__main__':
main()

除非想阻塞主執行緒,從而凍結事件迴圈或整個應用,否則不要再 asyncio 協程中使用 time.sleep().

如果協程需要在一段時間內什麼都不做,應該使用 yield from asyncio.sleep(DELAY)

使用 @asyncio.coroutine 裝飾器不是強制要求,但建議這麼做因為這樣能在程式碼中突顯協程,如果還沒從中產出值,協程就把垃圾回收了(意味著操作未完成,可能有缺陷),可以發出警告。這個裝飾器不會預激協程。

這兩段程式碼的執行結果基本相同,現在我們看一下兩段程式碼的核心程式碼 supervisor 主要區別:

asyncio.Task 物件差不多與 threading.Thread 物件等效(Task 物件像是實現寫作時多工的庫中的綠色執行緒
Task 物件用於驅動協程,Thread 物件用於呼叫可呼叫的物件
Task 物件不由自己動手例項化,而是通過把協程傳給 asyncio.async(…) 函式或 loop.create_task(…) 方法獲取
獲取的Task 物件已經排定了執行時間;Thread 例項必須呼叫start方法,明確告知它執行
線上程版supervisor函式中,slow_function 是普通的函式,由執行緒直接呼叫,而非同步版的slow_function 函式是協程,由yield from 驅動。
沒有API能從外部終止執行緒,因為執行緒隨時可能被中斷。而如果想終止任務,可以使用Task.cancel() 例項方法,在協程內部丟擲CancelledError 異常。協程可以在暫停的yield 處捕獲這個異常,處理終止請求
supervisor 協程必須在main 函式中由loop.run_until_complete 方法執行。

協程和執行緒相比關鍵的一個優點是,執行緒必須記住保留鎖,去保護程式中的重要部分,防止多步操作再執行的過程中中斷,防止山水處於於曉狀態協程預設會做好保護,我們必須顯式產出(使用yield 或 yield from 交出控制權)才能讓程式的餘下部分執行。

asyncio.Future:故意不阻塞

asynci.Future 類與 concurrent.futures.Future 類的介面基本一致,不過實現方式不同,不可互換。

上一篇[python併發 1:使用 futures 處理併發]()我們介紹過 concurrent.futures.Future 的 future,在 concurrent.futures.Future 中,future只是排程執行某物的結果。在 asyncio 包中,BaseEventLoop.create_task(…) 方法接收一個協程,排定它的執行時間,然後返回一個asyncio.Task 例項(也是asyncio.Future 類的例項,因為 Task 是 Future 的子類,用於包裝協程。(在 concurrent.futures.Future 中,類似的操作是Executor.submit(…))。

與concurrent.futures.Future 類似,asyncio.Future 類也提供了

.done() 返回布林值,表示Future 是否已經執行
.add_done_callback() 這個方法只有一個引數,型別是可呼叫物件,Future執行結束後會回撥這個物件。
.result() 這個方法沒有引數,因此不能指定超時時間。 如果呼叫 .result() 方法時期還沒有執行完畢,會丟擲asyncio.InvalidStateError 異常。

對應的 concurrent.futures.Future 類中的 Future 執行結束後呼叫result(), 會返回可呼叫物件的結果或者丟擲執行可呼叫物件時丟擲的異常,如果是 Future 沒有執行結束時呼叫 f.result()方法,這時會阻塞呼叫方所在的執行緒,直到有結果返回。此時result 方法還可以接收 timeout 引數,如果在指定的時間內 Future 沒有執行完畢,會丟擲 TimeoutError 異常。

我們使用asyncio.Future 時, 通常使用yield from,從中獲取結果,而不是使用 result()方法 yield from 表示式在暫停的協程中生成返回值,回覆執行過程。

asyncio.Future 類的目的是與 yield from 一起使用,所以通常不需要使用以下方法:

不需呼叫 my_future.add_down_callback(…), 因為可以直接把想在 future 執行結束後的操作放在協程中 yield from my_future 表示式的後邊。(因為協程可以暫停和恢複函式)
無需呼叫 my_future.result(), 因為 yield from 產生的結果就是(result = yield from my_future)

在 asyncio 包中,可以使用yield from 從asyncio.Future 物件中產出結果。這也就意味著我們可以這麼寫:


res = yield from foo() # foo 可以是協程函式,也可以是返回 Future 或 task 例項的普通函式

asyncio.async(…)* 函式


asyncio.async(coro_or_future, *, loop=None)

這個函式統一了協程和Future: 第一個引數可以是二者中的任意一個。如果是Future 或者 Task 物件,就直接返回,如果是協程,那麼async 函式會自動呼叫 loop.create_task(…) 方法建立 Task 物件。 loop 引數是可選的,用於傳入事件迴圈; 如果沒有傳入,那麼async函式會通過呼叫asyncio.get_event_loop() 函式獲取迴圈物件。

BaseEventLoop.create_task(coro)

這個方法排定協程的執行時間,返回一個 asyncio.Task 物件。如果在自定義的BaseEventLoop 子類上呼叫,返回的物件可能是外部庫中與Task類相容的某個類的例項。

BaseEventLoop.create_task() 方法只在Python3.4.2 及以上版本可用。 Python3.3 只能使用 asyncio.async(…)函式。
如果想在Python控制檯或者小型測試指令碼中實驗future和協程,可以使用下面的片段:


import asyncio
def run_sync(coro_or_future):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro_or_future)
a = run_sync(some_coroutine())

使用asyncio 和 aiohttp 包下載

現在,我們瞭解了asyncio 的基礎知識,是時候使用asyncio 來重寫我們 上一篇 [python併發 1:使用 futures 處理併發]() 下載國旗的指令碼了。

先看一下程式碼:


import asyncio
import aiohttp # 需要pip install aiohttp
from flags import save_flag, show, main, BASE_URL
@asyncio.coroutine # 我們知道,協程應該使用 asyncio.coroutine 裝飾
def get_flag(cc):
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
# 阻塞的操作通過協程實現,客戶程式碼通過yield from 把指責委託給協程,以便非同步操作
resp = yield from aiohttp.request('GET', url) 
# 讀取也是非同步操作
image = yield from resp.read()
return image
@asyncio.coroutine
def download_one(cc): # 這個函式也必須是協程,因為用到了yield from
image = yield from get_flag(cc) 
show(cc)
save_flag(image, cc.lower()   '.gif')
return cc
def download_many(cc_list):
loop = asyncio.get_event_loop() # 獲取事件序號底層實現的引用
to_do = [download_one(cc) for cc in sorted(cc_list)] # 呼叫download_one 獲取各個國旗,構建一個生成器物件列表
# 雖然函式名稱是wait 但它不是阻塞型函式,wait 是一個協程,等傳給他的所有協程執行完畢後結束
wait_coro = asyncio.wait(to_do)
res, _ = loop.run_until_complete(wait_coro) # 執行事件迴圈,知道wait_coro 執行結束;事件迴圈執行的過程中,這個指令碼會在這裡阻塞。
loop.close() # 關閉事件迴圈
return len(res)
if __name__ == '__main__':
main(download_many)

這段程式碼的執行簡述如下:

在download_many 函式獲取一個事件迴圈,處理呼叫download_one 函式生成的幾個協程物件
asyncio 事件迴圈一次啟用各個協程
客戶程式碼中的協程(get_flag)使用 yield from 把指責委託給庫裡的協程(aiohttp.request)時,控制權交還給事件迴圈,執行之前排定的協程
事件迴圈通過基於回撥的底層API,在阻塞的操作執行完畢後獲得通知。
獲得通知後,主迴圈把結果發給暫停的協程
協程向前執行到下一個yield from 表示式,例如 get_flag 函式的yield from resp.read()。事件迴圈再次得到控制權,重複第4~6步,直到迴圈終止。

download_many 函式中,我們使用了 asyncio.wait(…) 函式,這個函式是一個協程,協程的引數是一個由future或者協程構成的可迭代物件;wait 會分別把各個協程包裝進一個Task物件。最終的結果是,wait 處理的所有物件都通過某種方式變成Future 類的例項。

wait 是協程函式,因此,返回的是一個協程或者生成器物件;waite_coro 變數中儲存的就是這種物件

loop.run_until_complete 方法的引數是一個future 或協程。如果是協程,run_until_complete 方法與 wait 函式一樣,把協程包裝進一個Task 物件中。這裡 run_until_complete 方法把 wait_coro 包裝進一個Task 物件中,由yield from 驅動。wait_coro 執行結束後返回兩個引數,第一個引數是結束的future 第二個引數是未結束的future。

<section class=”caption”>wait</section>有兩個命名引數,timeout 和 return_when 如果設定了可能會返回未結束的future。

有一點你可能也注意到了,我們重寫了get_flags 函式,是因為之前用到的 requests 庫執行的是阻塞型I/O操作。為了使用 asyncio 包,我們必須把函式改成非同步版。

小技巧

如果你覺得 使用了協程後程式碼難以理解,可以採用 Python之父(Guido van Rossum)的建議,假裝沒有yield from。

已上邊這段程式碼為例:


@asyncio.coroutine
def get_flag(cc):
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
resp = yield from aiohttp.request('GET', url) 
image = yield from resp.read()
return image
# 把yield form 去掉
def get_flag(cc):
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
resp = aiohttp.request('GET', url) 
image = resp.read()
return image
# 現在是不是清晰多了

知識點

在asyncio 包的API中使用 yield from 時,有個細節要注意:

使用asyncio包時,我們編寫的非同步程式碼中包含由asyncio本身驅動的協程(委派生成器),而生成器最終把指責委託給asyncio包或者第三方庫中的協程。這種處理方式相當於架起了管道,讓asyncio事件迴圈驅動執行底層非同步I/O的庫函式。

避免阻塞型呼叫

我們先看一個圖,這個圖顯示了電腦從不同儲存介質中讀取資料的延遲情況:

通過這個圖,我們可以看到,阻塞型呼叫對於CPU來說是巨大的浪費。有什麼辦法可以避免阻塞型呼叫中止整個應用程式麼?

有兩種方法:

在單獨的執行緒中執行各個阻塞型操作
把每個阻塞型操作轉化成非阻塞的非同步呼叫使用

當然我們推薦第二種方案,因為第一種方案中如果每個連線都使用一個執行緒,成本太高。

第二種我們可以使用把生成器當做協程使用的方式實現非同步程式設計。對事件迴圈來說,呼叫回撥與在暫停的協程上呼叫 .send() 方法效果差不多。各個暫停的協程消耗的記憶體比執行緒小的多。

現在,你應該能理解為什麼 flags_asyncio.py 指令碼比 flags.py 快的多了吧。

因為flags.py 是依次同步下載,每次下載都要用幾十億個CPU週期等待結果。而在flags_asyncio.py中,在download_many 函式中呼叫loop.run_until_complete 方法時,事件迴圈驅動各個download_one 協程,執行到yield from 表示式出,那個表示式又驅動各個 get_flag 協程,執行到第一個yield from 表示式處,呼叫 aiohttp.request()函式。這些呼叫不會阻塞,因此在零點幾秒內所有請求都可以全部開始。

改進 asyncio 下載指令碼

現在我們改進一下上邊的 flags_asyncio.py,在其中新增上異常處理,計數器


import asyncio
import collections
from collections import namedtuple
from enum import Enum
import aiohttp
from aiohttp import web
from flags import save_flag, show, main, BASE_URL
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')
# 自定義異常用於包裝其他HTTP貨網路異常,並獲取country_code,以便報告錯誤
class FetchError(Exception):
def __init__(self, country_code):
self.country_code = country_code
@asyncio.coroutine
def get_flag(cc):
# 此協程有三種返回結果:
# 1. 返回下載到的圖片
# 2. HTTP 響應為404 時,丟擲web.HTTPNotFound 異常
# 3. 返回其他HTTP狀態碼時, 丟擲aiohttp.HttpProcessingError
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
resp = yield from aiohttp.request('GET', url)
if resp.status == 200:
image = yield from resp.read()
return image
elif resp.status == 404:
raise web.HttpNotFound()
else:
raise aiohttp.HttpProcessionError(
code=resp.status, message=resp.reason,
headers=resp.headers
)
@asyncio.coroutine
def download_one(cc, semaphore):
# semaphore 引數是 asyncio.Semaphore 類的例項
# Semaphore 類是同步裝置,用於限制併發請求
try:
with (yield from semaphore):
# 在yield from 表示式中把semaphore 當成上下文管理器使用,防止阻塞整個系統
# 如果semaphore 計數器的值是所允許的最大值,只有這個協程會阻塞
image = yield from get_flag(cc)
# 退出with語句後 semaphore 計數器的值會遞減,
# 解除阻塞可能在等待同一個semaphore物件的其他協程例項
except web.HTTPNotFound:
status = HTTPStatus.not_found
msg = 'not found'
except Exception as exc:
raise FetchError(cc) from exc
else:
save_flag(image, cc.lower()   '.gif')
status = HTTPStatus.ok
msg = 'ok'
return Result(status, cc)
@asyncio.coroutine
def downloader_coro(cc_list):
counter = collections.Counter()
# 建立一個 asyncio.Semaphore 例項,最多允許啟用MAX_CONCUR_REQ個使用這個計數器的協程
semaphore = asyncio.Semaphore(MAX_CONCUR_REQ)
# 多次呼叫 download_one 協程,建立一個協程物件列表
to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
# 獲取一個迭代器,這個迭代器會在future執行結束後返回future
to_do_iter = asyncio.as_completed(to_do)
for future in to_do_iter:
# 迭代允許結束的 future 
try:
res = yield from future # 獲取asyncio.Future 物件的結果(也可以呼叫future.result)
except FetchError as exc:
# 丟擲的異常都包裝在FetchError 物件裡
country_code = exc.country_code
try:
# 嘗試從原來的異常 (__cause__)中獲取錯誤訊息
error_msg = exc.__cause__.args[0]
except IndexError:
# 如果在原來的異常中找不到錯誤訊息,使用所連線異常的類名作為錯誤訊息
error_msg = exc.__cause__.__class__.__name__
if error_msg:
msg = '*** Error for {}: {}'
print(msg.format(country_code, error_msg))
status = HTTPStatus.error
else:
status = res.status
counter[status]  = 1
return counter
def download_many(cc_list):
loop = asyncio.get_event_loop()
coro = downloader_coro(cc_list)
counts = loop.run_until_complete(coro)
loop.close()
return counts
if __name__ == '__main__':
main(download_many)

由於協程發起的請求速度較快,為了防止向伺服器發起太多的併發請求,使伺服器過載,我們在download_coro 函式中建立一個asyncio.Semaphore 例項,然後把它傳給download_one 函式。

<secion class=”caption”>Semaphore</section> 物件維護著一個內部計數器,若在物件上呼叫 .acquire() 協程方法,計數器則遞減;若在物件上呼叫 .release() 協程方法,計數器則遞增。計數器的值是在初始化的時候設定。

如果計數器大於0,那麼呼叫 .acquire() 方法不會阻塞,如果計數器為0, .acquire() 方法會阻塞呼叫這個方法的協程,直到其他協程在同一個 Semaphore 物件上呼叫 .release() 方法,讓計數器遞增。

在上邊的程式碼中,我們並沒有手動呼叫 .acquire() 或 .release() 方法,而是在 download_one 函式中 把 semaphore 當做上下文管理器使用:


with (yield from semaphore):
image = yield from get_flag(cc)

這段程式碼保證,任何時候都不會有超過 MAX_CONCUR_REQ 個 get_flag 協程啟動。

使用 asyncio.as_completed 函式

因為要使用 yield from 獲取 asyncio.as_completed 函式產出的future的結果,所以 as_completed 函式秩序在協程中呼叫。由於 download_many 要作為引數傳給非協程的main 函式,我已我們新增了一個新的 downloader_coro 協程,讓download_many 函式只用於設定事件迴圈。

使用Executor 物件,防止阻塞事件迴圈

現在我們回去看下上邊關於電腦從不同儲存介質讀取資料的延遲情況圖,有一個實時需要注意,那就是訪問本地檔案系統也會阻塞。

上邊的程式碼中,save_flag 函式阻塞了客戶程式碼與 asyncio 事件迴圈公用的唯一執行緒,因此儲存檔案時,整個應用程式都會暫停。為了避免這個問題,可以使用事件迴圈物件的 run_in_executor 方法。

asyncio 的事件迴圈在後臺維護著一個ThreadPoolExecutor 物件,我們可以呼叫 run_in_executor 方法,把可呼叫的物件發給它執行。

下邊是我們改動後的程式碼:


@asyncio.coroutine
def download_one(cc, semaphore):
try:
with (yield from semaphore):
image = yield from get_flag(cc)
except web.HTTPNotFound:
status = HTTPStatus.not_found
msg = 'not found'
except Exception as exc:
raise FetchError(cc) from exc
else:
# 這裡是改動部分
loop = asyncio.get_event_loop() # 獲取事件迴圈的引用
loop.run_in_executor(None, save_flag, image, cc.lower()   '.gif')
status = HTTPStatus.ok
msg = 'ok'
return Result(status, cc)

run_in_executor 方法的第一個引數是Executor 例項;如果設為None,使用事件迴圈的預設 ThreadPoolExecutor 例項。

從回撥到future到協程

在接觸協程之前,我們可能對回撥有一定的認識,那麼和回撥相比,協程有什麼改進呢?

python中的回撥程式碼樣式:


def stage1(response1):
request2 = step1(response1)
api_call2(request2, stage2)
def stage2(response2):
request3 = step3(response3)
api_call3(request3, stage3) 
def stage3(response3):
step3(response3) 
api_call1(request1, stage1)

上邊的程式碼的缺陷:

容易出現回撥地獄
程式碼難以閱讀

在這個問題上,協程能發揮很大的作用。如果換成協程和yield from 結果做的非同步程式碼,程式碼示例如下:


@asyncio.coroutine
def three_stages(request1):
response1 = yield from api_call1(request1)
request2 = step1(response1)
response2 = yield from api_call2(requests)
request3 = step2(response2)
response3 = yield from api_call3(requests)
step3(response3) 
loop.create_task(three_stages(request1)

和之前的程式碼相比,這個程式碼就容易理解多了。如果非同步呼叫 api_call1,api_call2,api_call3 會丟擲異常,那麼可以把相應的 yield from 表示式放在 try/except 塊中處理異常。

使用協程必須習慣 yield from 表示式,並且協程不能直接呼叫,必須顯式的排定協程的執行時間,或在其他排定了執行時間的協程中使用yield from 表示式吧它啟用。如果不使用 loop.create_task(three_stages(request1)),那麼什麼都不會發生。

下面我們用一個實際的例子來演示一下:

每次下載發起多次請求

我們修改一下上邊下載國旗的程式碼,使在下載國旗的同時還可以獲取國家名稱在儲存圖片的時候使用。
我們使用協程和yield from 解決這個問題:


@asyncio.coroutine
def http_get(url):
resp = yield from aiohttp.request('GET', url)
if resp.status == 200:
ctype = resp.headers.get('Content-type', '').lower()
if 'json' in ctype or url.endswith('json'):
data = yield from resp.json()
else:
data = yield from resp.read()
return data
elif resp.status == 404:
raise web.HttpNotFound()
else:
raise aiohttp.HttpProcessionError(
code=resp.status, message=resp.reason,
headers=resp.headers)
@asyncio.coroutine
def get_country(cc):
url = "{}/{cc}/metadata.json".format(BASE_URL, cc=cc.lower())
metadata = yield from http_get(url)
return metadata['country']
@asyncio.coroutine
def get_flag(cc):
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
return (yield from http_get(url))
@asyncio.coroutine
def download_one(cc, semaphore):
try:
with (yield from semaphore):
image = yield from get_flag(cc)
with (yield from semaphore):
country = yield from get_country(cc)
except web.HTTPNotFound:
status = HTTPStatus.not_found
msg = 'not found'
except Exception as exc:
raise FetchError(cc) from exc
else:
country = country.replace(' ', '_')
filename = '{}--{}.gif'.format(country, cc)
print(filename)
loop = asyncio.get_event_loop()
loop.run_in_executor(None, save_flag, image, filename)
status = HTTPStatus.ok
msg = 'ok'
return Result(status, cc)

在這段程式碼中,我們在download_one 函式中分別在 semaphore 控制的兩個with 塊中呼叫get_flag 和 get_country,是為了節約時間。

get_flag 的return 語句在外層加上括號,是因為() 的運算子優先順序高,會先執行括號內的yield from 語句 返回的結果。如果不加 會報句法錯誤

加() ,相當於


image = yield from http_get(url)
return image

如果不加(),那麼程式會在 yield from 處中斷,交出控制權,這時使用return 會報句法錯誤。

總結

這一篇我們討論了:

對比了一個多執行緒程式和asyncio版,說明了多執行緒和非同步任務之間的關係
比較了 asyncio.Future 類 和 concurrent.futures.Future 類的區別
如何使用非同步程式設計管理網路應用中的高併發
在非同步程式設計中,與回撥相比,協程顯著提升效能的方式

相關文章

程式語言 最新文章