Python使用asyncio包處理併發詳解

NO IMAGE

阻塞型I/O和GIL

CPython 直譯器本身就不是執行緒安全的,因此有全域性直譯器鎖(GIL),一次只允許使用一個執行緒執行 Python 位元組碼。因此,一個 Python 程序通常不能同時使用多個 CPU 核心。

然而,標準庫中所有執行阻塞型 I/O 操作的函式,在等待作業系統返回結果時都會釋放GIL。這意味著在 Python 語言這個層次上可以使用多執行緒,而 I/O 密集型 Python 程式能從中受益:一個 Python 執行緒等待網路響應時,阻塞型 I/O 函式會釋放 GIL,再執行一個執行緒。

asyncio

這個包使用事件迴圈驅動的協程實現併發。 asyncio 大量使用 yield from 表示式,因此與Python 舊版不相容。

asyncio 包使用的“協程”是較嚴格的定義。適合asyncio API 的協程在定義體中必須使用 yield from,而不能使用 yield。此外,適合 asyncio 的協程要由呼叫方驅動,並由呼叫方通過 yield from 呼叫;

示例1


import threading
import asyncio
@asyncio.coroutine
def hello():
print('Start Hello', threading.currentThread())
yield from asyncio.sleep(5)
print('End Hello', threading.currentThread())
@asyncio.coroutine
def world():
print('Start World', threading.currentThread())
yield from asyncio.sleep(3)
print('End World', threading.currentThread())
# 獲取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), world()]
# 執行coroutine
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

@asyncio.coroutine把生成器函式標記為協程型別。
asyncio.sleep(3) 建立一個3秒後完成的協程。
loop.run_until_complete(future),執行直到future完成;如果引數是 coroutine object,則需要使用 ensure_future()函式包裝。
loop.close() 關閉事件迴圈

示例2


import asyncio
@asyncio.coroutine
def worker(text):
"""
協程執行的函式
:param text:
:return:
"""
i = 0
while True:
print(text, i)
try:
yield from asyncio.sleep(.1)
except asyncio.CancelledError:
break
i  = 1
@asyncio.coroutine
def client(text, io_used):
worker_fu = asyncio.ensure_future(worker(text))
# 假裝等待I/O一段時間
yield from asyncio.sleep(io_used)
# 結束執行協程
worker_fu.cancel()
return 'done'
loop = asyncio.get_event_loop()
tasks = [client('xiaozhe', 3), client('zzzz', 5)]
result = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('Answer:', result)

解釋:

1. asyncio.ensure_future(coro_or_future, *, loop=None):計劃安排一個 coroutine object的執行,返回一個 asyncio.Task object。
2. worker_fu.cancel(): 取消一個協程的執行,丟擲CancelledError異常。
3. asyncio.wait():協程的引數是一個由期物或協程構成的可迭代物件; wait 會分別把各個協程包裝進一個 Task 物件。

asyncio.Task 物件與threading.Thread物件的比較

asyncio.Task 物件差不多與 threading.Thread 物件等效。
Task 物件用於驅動協程, Thread 物件用於呼叫可呼叫的物件。
Task 物件不由自己動手例項化,而是通過把協程傳給 asyncio.ensure_future(…) 函式或loop.create_task(…) 方法獲取。
獲取的 Task 物件已經排定了執行時間;Thread 例項則必須呼叫 start 方法,明確告知讓它執行。
如果想終止任務,可以使用 Task.cancel() 例項方法,在協程內部丟擲CancelledError 異常。

執行緒與協程的安全比較

如果使用執行緒做過重要的程式設計,因為排程程式任何時候都能中斷執行緒。必須記住保留鎖,去保護程式中的重要部分,防止多步操作在執行的過程中中斷,防止資料處於無效狀態。

協程預設會做好全方位保護,以防止中斷。我們必須顯式產出才能讓程式的餘下部分執行。對協程來說,無需保留鎖,在多個執行緒之間同步操作,協程自身就會同步,因為在任意時刻只有一個協程執行。想交出控制權時,可以使用 yield 或 yield from 把控制權交還排程程式。這就是能夠安全地取消協程的原因:按照定義,協程只能在暫停的 yield處取消,因此可以處理 CancelledError 異常,執行清理操作。

Future(期物)

通常情況下自己不應該建立期物,而只能由併發框架(concurrent.futures 或 asyncio)例項化。原因很簡單:期物表示終將發生的事情,而確定某件事會發生的唯一方式是執行的時間已經排定。

asyncio.Future

在 asyncio 包中, BaseEventLoop.create_task(…) 方法接收一個協程,排定它的執行時間,然後返回一個 asyncio.Task 例項——也是 asyncio.Future 類的例項,因為 Task 是Future 的子類,用於包裝協程。

asyncio.ensure_future(coro_or_future, *, loop=None)

這個函式統一了協程和期物:第一個引數可以是二者中的任何一個。如果是 Future 或 Task 物件,那就原封不動地返回。如果是協程,那麼 async 函式會呼叫loop.create_task(…) 方法建立 Task 物件。 loop= 關鍵字引數是可選的,用於傳入事件迴圈;如果沒有傳入,那麼 async 函式會通過呼叫 asyncio.get_event_loop() 函式獲取迴圈物件。

BaseEventLoop.create_task(coro)

這個方法排定協程的執行時間,返回一個 asyncio.Task 物件。

asyncio 包中有多個函式會自動把引數指定的協程包裝在 asyncio.Task 物件中,例如 BaseEventLoop.run_until_complete(…) 方法。

asyncio.as_completed

為了整合進度條,我們可以使用的是 as_completed 生成器函式;幸好, asyncio 包提供了這個生成器函式的相應版本。

使用asyncio和aiohttp包

從 Python 3.4 起, asyncio 包只直接支援 TCP 和 UDP。如果想使用 HTTP 或其他協議,那麼要藉助第三方包 aiohttp 。


cc_list = ['China', 'USA']
@asyncio.coroutine
def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
resp = yield from aiohttp.request('GET', url)
image = yield from resp.read()
return image
@asyncio.coroutine
def download_one(name): 
image = yield from get_flag(name) 
save_flag(image, name.lower()   '.gif')
return name
loop = asyncio.get_event_loop() 
wait_coro = asyncio.wait([download_one(cc) for cc in sorted(cc_list)]) 
res, _ = loop.run_until_complete(wait_coro) 
loop.close()

使用 asyncio 包時,我們編寫的非同步程式碼中包含由 asyncio 本身驅動的協程(即委派生成器),而生成器最終把職責委託給 asyncio 包或第三方庫(如aiohttp)中的協程。這種處理方式相當於架起了管道,讓 asyncio 事件迴圈(通過我們編寫的協程)驅動執行低層非同步 I/O 操作的庫函式。

避免阻塞型呼叫

有兩種方法能避免阻塞型呼叫中止整個應用程式的程序:
1. 在單獨的執行緒中執行各個阻塞型操作
2. 把每個阻塞型操作轉換成非阻塞的非同步呼叫使用

多個執行緒是可以的,但是各個作業系統執行緒(Python 使用的是這種執行緒)消耗的記憶體達兆位元組(具體的量取決於作業系統種類)。如果要處理幾千個連線,而每個連線都使用一個執行緒的話,我們負擔不起。

把生成器當作協程使用是非同步程式設計的另一種方式。對事件迴圈來說,呼叫回撥與在暫停的協程上呼叫 .send() 方法效果差不多。各個暫停的協程是要消耗記憶體,但是比執行緒消耗的記憶體數量級小。

上面的指令碼為什麼會很快

在上面的指令碼中,呼叫 loop.run_until_complete 方法時,事件迴圈驅動各個download_one 協程,執行到第一個 yield from 表示式處時,那個表示式驅動各個get_flag 協程,然後在get_flag協程裡面執行到第一個 yield from 表示式處時,呼叫 aiohttp.request(…)函式。這些呼叫都不會阻塞,因此在零點幾秒內所有請求全部開始。

asyncio 的基礎設施獲得第一個響應後,事件迴圈把響應發給等待結果的 get_flag 協程。得到響應後, get_flag 向前執行到下一個 yield from 表示式處,呼叫resp.read() 方法,然後把控制權還給主迴圈。其他響應會陸續返回。所有 get_ flag 協程都獲得結果後,委派生成器 download_one 恢復,儲存影象檔案。

async和await

為了簡化並更好地標識非同步IO,從Python 3.5開始引入了新的語法async和await,可以讓coroutine的程式碼更簡潔易讀。

async和await是針對coroutine的新語法,要使用新的語法,只需要做兩步簡單的替換。
1. 把@asyncio.coroutine替換為async
2. 把yield from替換為await

例如:


@asyncio.coroutine
def hello():
print("Hello world!")
r = yield from asyncio.sleep(1)
print("Hello again!")

等同於


async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")

網站請求例項


import asyncio
import aiohttp
urls = [
'http://www.163.com/',
'http://www.sina.com.cn/',
'https://www.hupu.com/',
'http://www.csdn.net/'
]
async def get_url_data(u):
"""
讀取url的資料
:param u:
:return:
"""
print('running ', u)
async with aiohttp.ClientSession() as session:
async with session.get(u) as resp:
print(u, resp.status, type(resp.text()))
# print(await resp.text())
return resp.headers
async def request_url(u):
"""
主排程函式
:param u:
:return:
"""
res = await get_url_data(u)
return res
loop = asyncio.get_event_loop()
task_lists = asyncio.wait([request_url(u) for u in urls])
all_res, _ = loop.run_until_complete(task_lists)
loop.close()
print(all_res)