Python通過future處理併發問題

Python通過future處理併發問題
1 Star2 Stars3 Stars4 Stars5 Stars 給文章打分!
Loading...

future初識

通過下面指令碼來對future進行一個初步瞭解:

例子1:普通通過迴圈的方式


import os
import time
import sys
import requests
POP20_CC = (
"CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR"
).split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'
def save_flag(img,filename):
path = os.path.join(DEST_DIR,filename)
with open(path,'wb') as fp:
fp.write(img)
def get_flag(cc):
url = "{}/{cc}/{cc}.gif".format(BASE_URL,cc=cc.lower())
resp = requests.get(url)
return resp.content
def show(text):
print(text,end=" ")
sys.stdout.flush()
def download_many(cc_list):
for cc in sorted(cc_list):
image = get_flag(cc)
show(cc)
save_flag(image,cc.lower() ".gif")
return len(cc_list)
def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time()-t0
msg = "\n{} flags downloaded in {:.2f}s"
print(msg.format(count,elapsed))
if __name__ == '__main__':
main(download_many)

    例子2:通過future方式實現,這裡對上面的部分程式碼進行了複用


from concurrent import futures
from flags import save_flag, get_flag, show, main
MAX_WORKERS = 20
def download_one(cc):
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower() ".gif")
return cc
def download_many(cc_list):
workers = min(MAX_WORKERS,len(cc_list))
with futures.ThreadPoolExecutor(workers) as executor:
res = executor.map(download_one, sorted(cc_list))
return len(list(res))
if __name__ == '__main__':
main(download_many)

    分別執行三次,兩者的平均速度:13.67和1.59s,可以看到差別還是非常大的。

future

future是concurrent.futures模組和asyncio模組的重要元件

從python3.4開始標準庫中有兩個名為Future的類:concurrent.futures.Future和asyncio.Future
這兩個類的作用相同:兩個Future類的例項都表示可能完成或者尚未完成的延遲計算。與Twisted中的Deferred類、Tornado框架中的Future類的功能類似

注意:通常情況下自己不應該建立future,而是由併發框架(concurrent.futures或asyncio)例項化

原因:future表示終將發生的事情,而確定某件事情會發生的唯一方式是執行的時間已經安排好,因此只有把某件事情交給concurrent.futures.Executor子類處理時,才會建立concurrent.futures.Future例項。
如:Executor.submit()方法的引數是一個可呼叫的物件,呼叫這個方法後會為傳入的可呼叫物件排定時間,並返回一個

future

客戶端程式碼不能應該改變future的狀態,併發框架在future表示的延遲計算結束後會改變期物的狀態,我們無法控制計算何時結束。

這兩種future都有.done()方法,這個方法不阻塞,返回值是布林值,指明future連結的可呼叫物件是否已經執行。客戶端程式碼通常不會詢問future是否執行結束,而是會等待通知。因此兩個Future類都有.add_done_callback()方法,這個方法只有一個引數,型別是可呼叫的物件,future執行結束後會呼叫指定的可呼叫物件。

.result()方法是在兩個Future類中的作用相同:返回可呼叫物件的結果,或者重新丟擲執行可呼叫的物件時丟擲的異常。但是如果future沒有執行結束,result方法在兩個Futrue類中的行為差別非常大。

對concurrent.futures.Future例項來說,呼叫.result()方法會阻塞呼叫方所在的執行緒,直到有結果可返回,此時,result方法可以接收可選的timeout引數,如果在指定的時間內future沒有執行完畢,會丟擲TimeoutError異常。

而asyncio.Future.result方法不支援設定超時時間,在獲取future結果最好使用yield from結構,但是concurrent.futures.Future不能這樣做

不管是asyncio還是concurrent.futures.Future都會有幾個函式是返回future,其他函式則是使用future,在最開始的例子中我們使用的Executor.map就是在使用future,返回值是一個迭代器,迭代器的__next__方法呼叫各個future的result方法,因此我們得到的是各個futrue的結果,而不是future本身

關於future.as_completed函式的使用,這裡我們用了兩個迴圈,一個用於建立並排定future,另外一個用於獲取future的結果


from concurrent import futures
from flags import save_flag, get_flag, show, main
MAX_WORKERS = 20
def download_one(cc):
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower() ".gif")
return cc
def download_many(cc_list):
cc_list = cc_list[:5]
with futures.ThreadPoolExecutor(max_workers=3) as executor:
to_do = []
for cc in sorted(cc_list):
future = executor.submit(download_one,cc)
to_do.append(future)
msg = "Secheduled for {}:{}"
print(msg.format(cc,future))
results = []
for future in futures.as_completed(to_do):
res = future.result()
msg = "{}result:{!r}"
print(msg.format(future,res))
results.append(res)
return len(results)
if __name__ == '__main__':
main(download_many)

    結果如下:

注意:Python程式碼是無法控制GIL,標準庫中所有執行阻塞型IO操作的函式,在等待作業系統返回結果時都會釋放GIL.執行其他執行緒執行,也正是因為這樣,Python執行緒可以在IO密集型應用中發揮作用

以上都是concurrent.futures啟動執行緒,下面通過它啟動程序

concurrent.futures啟動程序

concurrent.futures中的ProcessPoolExecutor類把工作分配給多個Python程序處理,因此,如果需要做CPU密集型處理,使用這個模組能繞開GIL,利用所有的CPU核心。

其原理是一個ProcessPoolExecutor建立了N個獨立的Python直譯器,N是系統上面可用的CPU核數。

使用方法和ThreadPoolExecutor方法一樣

總結

您可能感興趣的文章:

Python使用asyncio包處理併發詳解python併發2之使用asyncio處理併發Python基礎教程之利用期物處理併發

相關文章

程式語言 最新文章