如何在Python中編寫併發程式

NO IMAGE
1 Star2 Stars3 Stars4 Stars5 Stars 給文章打分!
Loading...

GIL

在Python中,由於歷史原因(GIL),使得Python中多執行緒的效果非常不理想.GIL使得任何時刻Python只能利用一個CPU核,並且它的排程演算法簡單粗暴:多執行緒中,讓每個執行緒執行一段時間t,然後強行掛起該執行緒,繼而去執行其他執行緒,如此周而復始,直到所有執行緒結束.

這使得無法有效利用計算機系統中的”區域性性”,頻繁的執行緒切換也對快取不是很友好,造成資源的浪費.

據說Python官方曾經實現了一個去除GIL的Python直譯器,但是其效果還不如有GIL的直譯器,遂放棄.後來Python官方推出了”利用多程序替代多執行緒”的方案,在Python3中也有concurrent.futures這樣的包,讓我們的程式編寫可以做到”簡單和效能兼得”.

多程序/多執行緒 Queue

一般來說,在Python中編寫併發程式的經驗是:計算密集型任務使用多程序,IO密集型任務使用多程序或者多執行緒.另外,因為涉及到資源共享,所以需要同步鎖等一系列麻煩的步驟,程式碼編寫不直觀.另外一種好的思路是利用多程序/多執行緒 Queue的方法,可以避免加鎖這樣麻煩低效的方式.

現在在Python2中利用Queue 多程序的方法來處理一個IO密集型任務.

假設現在需要下載多個網頁內容並進行解析,單程序的方式效率很低,所以使用多程序/多執行緒勢在必行.
我們可以先初始化一個tasks佇列,裡面將要儲存的是一系列dest_url,同時開啟4個程序向tasks中取任務然後執行,處理結果儲存在一個results佇列中,最後對results中的結果進行解析.最後關閉兩個佇列.

下面是一些主要的邏輯程式碼.


# -*- coding:utf-8 -*-
#IO密集型任務
#多個程序同時下載多個網頁
#利用Queue 多程序
#由於是IO密集型,所以同樣可以利用threading模組
import multiprocessing
def main():
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
cpu_count = multiprocessing.cpu_count() #程序數目==CPU核數目
create_process(tasks, results, cpu_count)  #主程序馬上建立一系列程序,但是由於阻塞佇列tasks開始為空,副程序全部被阻塞
add_tasks(tasks) #開始往tasks中新增任務
parse(tasks, results) #最後主程序等待其他執行緒處理完成結果
def create_process(tasks, results, cpu_count):
for _ in range(cpu_count):
p = multiprocessing.Process(target=_worker, args=(tasks, results)) #根據_worker建立對應的程序
p.daemon = True #讓所有程序可以隨主程序結束而結束
p.start() #啟動
def _worker(tasks, results):
while True:  #因為前面所有執行緒都設定了daemon=True,故不會無限迴圈
try:
task = tasks.get()  #如果tasks中沒有任務,則阻塞
result = _download(task)
results.put(result)  #some exceptions do not handled
finally:
tasks.task_done()
def add_tasks(tasks):
for url in get_urls(): #get_urls() return a urls_list
tasks.put(url)
def parse(tasks, results):
try: 
tasks.join()
except KeyboardInterrupt as err:
print "Tasks has been stopped!"
print err
while not results.empty():
_parse(results)
if __name__ == '__main__':
main()

利用Python3中的concurrent.futures包

在Python3中可以利用concurrent.futures包,編寫更加簡單易用的多執行緒/多程序程式碼.其使用感覺和Java的concurrent框架很相似(借鑑?)
比如下面的簡單程式碼示例


def handler():
futures = set()
with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:
for task in get_task(tasks):
future = executor.submit(task)
futures.add(future)
def wait_for(futures):
try:
for future in concurrent.futures.as_completed(futures):
err = futures.exception()
if not err:
result = future.result()
else:
raise err
except KeyboardInterrupt as e:
for future in futures:
future.cancel()
print "Task has been canceled!"
print e
return result

總結

要是一些大型Python專案也這般編寫,那麼效率也太低了.在Python中有許多已有的框架使用,使用它們起來更加高效.
但是自己的一些”小打小鬧”的程式這樣來編寫還是不錯的.:)

您可能感興趣的文章:

Python多程序併發(multiprocessing)用法例項詳解python實現多執行緒的方式及多條命令併發執行Python控制多程序與多執行緒併發數總結Python中的併發程式設計例項簡單介紹Python中利用生成器實現的併發程式設計python併發程式設計之多程序、多執行緒、非同步和協程詳解

相關文章

程式語言 最新文章