asyncio:高效能非同步模組使用介紹

NO IMAGE

本文是作者「無名小妖」的第3篇原創投稿,文章重點探討 Python3 中的 asyncio 庫,這是官方推薦實現高併發的模組。如果你喜歡技術寫作並且願意分享,可投稿給我(公眾號選單有聯絡方式)。文章採納後會有 50~100元 稿費。

本文Asyncio 從 3.4 開始成為 Python 生態系統的一部分,從那時起因其令人印象深刻的速度和易用性,它成為了大量 Python 庫和框架的基礎。

640

Asyncio 允許您輕鬆地編寫利用協程的單執行緒併發程式,這些協程就像一個個被剝離的執行緒,並不會出現像使用執行緒時會遇到問題(要記住保留鎖保護程式中的重要部分,要防止多步操作在執行的過程中中斷,要防止資料處於無效狀態)。

Asyncio還做了很好的工作,將我們從I/O多路複用訪問的複雜性抽象出來,它還是程式執行緒安全的,因為在任意時刻只有一個協程執行。

入門

為了使用Asyncio,我們需要了解一下事件迴圈和協程。所有基於非同步的系統都需要一個事件迴圈,事件迴圈排程我們的非同步子程式並分配不同任務的執行。協程本質上是傳統的執行緒的輕量級版本,它與執行緒非常相似,通過使用協程,我們基本上可以自己編寫非同步程式。

對於不太瞭解同步非同步的同學在這裡略作解釋(已經知道的可以直接看示例):

同步阻塞,就好比火車站過安檢,需要你耗費幾分鐘時間,都檢查完了再進站,每個人都要耽誤幾分鐘。

同步非阻塞,我們假設火車站提供了一種服務名叫“反饋”,你交10塊錢就可以加一個微訊號,然後你把車票、身份證、行李一次性放到一個地方,同時人家還儲存了一下你的美照(這一系列操作後面統稱“打包”),這樣你可以直接進站買點東西呀上個廁所呀(後面統稱“閒逛”),再通過微信不斷詢問 我的票檢查好了嗎? 查好了嗎? 直到那頭回復你說“好了”,你到指定地點去把你剛才打的包取回(後面統稱“取包”),結束。

非同步阻塞,你交20塊錢買了“反饋2.0”—檢查完畢人家會主動發微信告訴你,不需要你在不斷詢問了,而你“打包”完畢,還在檢票口傻等,直到人家說“好了”,你在“取包”。這其實沒有任何意義,因為你還是在等待,真正有意義的是非同步非阻塞。

非同步非阻塞,你交20塊錢買了“反饋2.0”,“打包”完畢,“閒逛”,直到人家說“好了”,然後你“取包”。這才是真正把你解放了,既不用等著,也不用不斷詢問。而本文的asyncio用的就是非同步非阻塞的協程。

我們可以定義一個事件迴圈,用於執行一個簡單的協程。

示例 1

import asyncio

async def MyCoroutine():  # 一個簡單的協程
   print("Hello, world!")

def main():
   # 事件迴圈
   loop = asyncio.get_event_loop()
   # 執行事件迴圈,直到分配的所有任務已經完成
   loop.run_until_complete(MyCoroutine())
   loop.close()

if __name__ == '__main__':
   main()

示例1 能夠成功執行,可是它並沒有帶來什麼好處,因為本例的目的在於讓大家明白事件迴圈和協程的使用方式。在更復雜的場景中,我們才真正看到它在效能上的好處。

示例2

import asyncio

async def MyCoroutine(future):
   # 使用asyncio.sleep模擬一些耗時的操作(一般是一些IO操作,例如網路請求,檔案讀取(就是“過安檢”這個動作))
   await asyncio.sleep(1)
   # 設定future物件的返回結果
   future.set_result("myfuture 已執行")

async def main():
   # 定義一個future物件,asyncio.Future的例項表示將來要完成的任務
   future = asyncio.Future()
   # ensure_future方法 接收協程或者future作為引數,作用是排定他們的執行時間。
   await asyncio.ensure_future(MyCoroutine(future))
   # future.result()返回可呼叫物件的結果,或者重新丟擲執行可呼叫的物件時丟擲的異常。
   print(future.result())

# 將main加入事件迴圈
loop = asyncio.get_event_loop()
try:
   loop.run_until_complete(main())
finally:
   loop.close()

示例2 對一些方法進行了介紹,為我們使用asyncio提升效能做一些鋪墊。這裡涉及到了一個future的概念:future物件表示將來發生的事。可以對比期權、期房來理解。

現在讓我們嘗試用asyncio同時執行多個協程。這將讓你體會asyncio的強大,以及如何使用它來有效地建立一個在單個執行緒上執行的效能難以置信的Python程式。

示例3

import asyncio
import random

async def MyCoroutine(id):
   process_time = random.randint(1, 5)
   await asyncio.sleep(process_time)
   print("協程: {}, 執行完畢。用時: {} 秒".format(id, process_time))

async def main():
   tasks = [asyncio.ensure_future(MyCoroutine(i)) for i in range(10)]
   await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
try:
   loop.run_until_complete(main())
finally:
   loop.close()

為了便於閱讀,示例3沒有進行註釋。首先建立一個協程,它以ID為引數,生成一個名為process_time的1-5的隨機整數,並等待該時間長度,最後它會列印出它的ID以及它等待了多長時間。然後我們生成了10個不同的任務,最後在事件迴圈裡執行這些任務。
輸出:

協程: 2, 執行完畢。用時: 1 秒
協程: 5, 執行完畢。用時: 1 秒
協程: 3, 執行完畢。用時: 1 秒
協程: 9, 執行完畢。用時: 2 秒
協程: 1, 執行完畢。用時: 3 秒
協程: 8, 執行完畢。用時: 3 秒
協程: 7, 執行完畢。用時: 3 秒
協程: 0, 執行完畢。用時: 4 秒
協程: 6, 執行完畢。用時: 4 秒
協程: 4, 執行完畢。用時: 5 秒

從輸出結果可以看出兩點:1.協程並沒有按照順序返回結果;2.批量執行的任務所用的時間和所有任務中用時最長的相同。這就好比做飯的時候,先蒸米飯用時15分鐘,在蒸米飯期間又炒了兩個菜(當然不是一個人在炒),一個用了8分鐘一個用了12分鐘,所以先上的是用了8分鐘的菜,然後是12分鐘的,最後才是米飯。並且最後總用時是15分鐘,而不是35分鐘。這就是非同步帶來的效率提升!如果你覺得提升並不明顯,不妨把任務數提升到100或者1000……

另外,示例中我們使用的是ensure_future和gather,相對應的還有create_task和wait,也能起到類似的作用。

ensure_future接收的引數是協程或者future物件,create_task接收的引數只能是協程。

asyncio.gather 接收的引數是協程或者future物件,返回所有傳入協程或者future的結果集。asyncio.wait 只接收future物件,返回兩組future物件(已完成和等待),它有個timeout引數可用於控制返回之前等待的最大秒數,還有個return_when引數可以控制在什麼情況下讓函式返回。

詳細的可以看官網連結:

  • ensure_future:https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future

  • create_task:https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task

  • gather:https://docs.python.org/3/library/asyncio-task.html#asyncio.gather

  • wait:https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

應用

現在的asyncio,已經有了很多的模組在支援:aiohttp,aiopg,aioredis等等, 可以在這裡檢視: https://github.com/aio-libs 。下面我們來了解其中一個模組aiofiles的使用。

示例4

import asyncio
import aiofiles

async def myopen():
   async with aiofiles.open('333.log', encoding='utf8') as file:
       contents = await file.read()
   print('my read done, file size is {}'.format(len(contents)))

async def test_read():
   print('begin readfile')
   await myopen()
   print('end readfile')

async def test_cumpute(x, y):
   print("begin cumpute")
   await asyncio.sleep(0.2)
   print('end cumpute')
   return x y

loop = asyncio.get_event_loop()
to_do = [asyncio.ensure_future(test_read()), asyncio.ensure_future(test_cumpute(1, 5))]
loop.run_until_complete(asyncio.wait(to_do))
loop.close()

仔細觀察示例4的程式碼我們應該能夠猜出輸出的順序是什麼:由於事件迴圈裡test_read先加入,test_cumpute後加入,所以程式碼一執行必然先輸出begin readfile和begin cumpute,然後看這兩個誰的執行時間更短,如果test_cumpute執行時間更短那麼就先輸出end cumpute,反之則最後輸出end cumpute。由於本例的目的是要模擬在進行一個長時間阻塞任務的同時,做一些其他事,所以檔案要稍微大一些,在讀取檔案期間做一個計算。筆者測試用的檔案‘333.log’有160M,輸出如下:

begin readfile
begin cumpute
end cumpute
my read done, file size is 162538819
end readfile

由此可見,在讀取檔案的過程中計算已經完成。美中不足的是我們並沒有獲取到test_cumpute計算的結果,如何獲取計算結果呢? 請看程式碼:
示例5

import asyncio
import aiofiles

async def myopen():
   async with aiofiles.open('333.log', encoding='utf8') as file:
       contents = await file.read()
   print('my read done, file size is {}'.format(len(contents)))

async def test_read():
   print('begin readfile')
   await myopen()
   print('end readfile')

async def test_cumpute(x, y):
   print("begin cumpute")
   await asyncio.sleep(0.2)
   print('end cumpute')
   return x y

def got_result(future):
   print('The result is ', future.result())

loop = asyncio.get_event_loop()
to_do = [asyncio.ensure_future(test_read()), asyncio.ensure_future(test_cumpute(1, 5))]
to_do[1].add_done_callback(got_result)
loop.run_until_complete(asyncio.wait(to_do))
loop.close()

示例5比示例4只多瞭如下兩句:

def got_result(future):
   print('The result is ', future.result())

to_do[1].add_done_callback(got_result)

示例5引出了add_done_callback,通過add_done_callback方法給test_compute加一個回撥函式get_result,而get_result函式中通過future.result方法獲取協程的返回值。

總結

asyncio使用了與以往python用法完全不同的構造:事件迴圈、協程和futures。這給我們的日常學習和使用增加了難度,但是由於協程提供了相較於執行緒更加安全的使用方式和並不遜色的效能,使得asyncio的應用前景非常廣闊(本文中提到了很多的模組已經在支援asyncio),喜歡python的你,怎麼能不認真學習一下呢?

最後,提供一個用Python和aiohttp建立RESTful API的小例子:

示例6

from aiohttp import web
import json

async def handle(request):
   response_obj = {'status': 'success'}
   return web.Response(text=json.dumps(response_obj))

async def new_user(request):
   try:
       # 獲取url中的name值
       user = request.query['name']
       # 模擬建立了一個使用者
       print("Creating new user with name: ", user)
       # 狀態為200時返回的內容
       response_obj = {'status': 'success'}
       return web.Response(text=json.dumps(response_obj), status=200)
   except Exception as e:
       response_obj = {'status': 'failed', 'reason': str(e)}
       return web.Response(text=json.dumps(response_obj), status=500)

app = web.Application()
# 新增路由
app.router.add_get('/', handle)
app.router.add_get('/user', new_user)

web.run_app(app)

執行程式碼,在瀏覽器中輸入: http://localhost:8080/ 頁面可以看到如下輸出表示成功:

{"status": "success"}

輸入: http://localhost:8080/user?name=wumingxiaoyao 後臺可以看到:

======== Running on http://0.0.0.0:8080 ========
(Press CTRL C to quit)
Creating new user with name:  wumingxiaoyao

640
如果文章對你有幫助,可對作者進行小額讚賞

640?
關注公眾號,訂閱Python技術

相關閱讀: