python併發程式設計之執行緒例項解析

python併發程式設計之執行緒例項解析

常用用法

t.is_alive()

Python中執行緒會在一個單獨的系統級別執行緒中執行(比如一個POSIX執行緒或者一個Windows執行緒)

這些執行緒將由作業系統來全權管理。執行緒一旦啟動,將獨立執行直到目標函式返回。可以通過查詢

一個執行緒物件的狀態,看它是否還在執行t.is_alive()

t.join()

可以把一個執行緒加入到當前執行緒,並等待它終止

Python直譯器在所有執行緒都終止後才繼續執行程式碼剩餘的部分

daemon

對於需要長時間執行的執行緒或者需要一直執行的後臺任務,可以用後臺執行緒(也稱為守護執行緒)

例:

t=Thread(target=func,args(1,),daemon=True)

t.start()

後臺執行緒無法等待,這些執行緒會在主執行緒終止時自動銷燬

小結:

後臺執行緒無法等待,不過,這些執行緒會在主執行緒終止時自動銷燬。你無法結束一個執行緒,無法給它傳送信

號,無法調整它的排程,也無法執行其他高階操作。如果需要這些特性,你需要自己新增。比如說,

如果你需要終止執行緒,那麼這個執行緒必須通過程式設計在某個特定點輪詢來退出

如果執行緒執行一些像I/O這樣的阻塞操作,那麼通過輪詢來終止執行緒將使得執行緒之間的協調變得非常棘手。

比如,如果一個執行緒一直阻塞在一個I/O操作上,它就永遠無法返回,也就無法檢查自己是否已經被結束了。

要正確處理這些問題,需要利用超時迴圈來小心操作執行緒。

執行緒間通訊

queue

一個執行緒向另外一個執行緒傳送資料最安全的方式應該就是queue庫中的佇列

先看一下使用例子,這裡是一個簡單的生產者和消費者模型:


from queue import Queue
from threading import Thread
import random
import time
_sentinel = object()
def producer(out_q):
n = 10
while n:
time.sleep(1)
data = random.randint(0, 10)
out_q.put(data)
print("生產者生產了資料{0}".format(data))
n -= 1
out_q.put(_sentinel)
def consumer(in_q):
while True:
data = in_q.get()
print("消費者消費了{0}".format(data))
if data is _sentinel:
in_q.put(_sentinel)
break
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

上述程式碼中設定了一個特殊值_sentinel用於當獲取到這個值的時候終止執行

關於queue的功能有個需要注意的地方:

Queue物件雖然已經包含了必要的鎖,主要有q.put和q.get

而q.size(),q.full(),q.empty()等方法不是執行緒安全的

使用佇列進行執行緒通訊是一個單向、不確定的過程。通常情況下,是沒有辦法知道接收資料的執行緒是什麼時候接收到的資料並開始工作的。但是佇列提供了一些基本的特性:q.task_done()和q.join()

如果一個執行緒需要在另外一個執行緒處理完特定的資料任務後立即得到通知,可以把要傳送的資料和一個Event放到一起使用

關於執行緒中的Event

執行緒有一個非常關鍵的特性:每個執行緒都是獨立執行的,且狀態不可預測

如果程式中的其他執行緒需要通過判斷每個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會比較麻煩。

解決方法:

使用threading庫中的Event

Event物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。

在初始化狀態下,event物件中的訊號標誌被設定為假。

如果有執行緒等待一個event物件,而這個event的標誌為假,這個執行緒將一直被阻塞知道該標誌為真。

一個執行緒如果把event物件的標誌設定為真,就會喚醒所有等待這個event物件的執行緒。

通過一個程式碼例子理解:


from threading import Thread, Event
import time
def countdown(n, started_evt):
print("countdown starting")
# set將event的標識設定為True
started_evt.set()
while n > 0:
print("T-mins", n)
n -= 1
time.sleep(2)
# 初始化的started_evt為False
started_evt = Event()
print("Launching countdown")
t = Thread(target=countdown, args=(10, started_evt,))
t.start()
# 會一直等待直到event的標誌為True的時候
started_evt.wait()
print("countdown is running")

而結果,我們也可以看出當執行緒執行了set之後,才列印running

實際用event物件最好是單次使用,建立一個event物件,讓某個執行緒等待這個物件,一旦物件被設定為Tru,就應該丟棄它,我們雖然可以通過clear()方法重置event物件,但是這個沒法確保安全的清理event物件並對它進行重新的賦值。會發生錯過事件,死鎖等各種問題。

event物件的一個重要特點是它被設定為True時會喚醒所有等待它的執行緒,如果喚醒單個執行緒的最好用Condition或訊號量Semaphore

和event功能類似的執行緒中還有一個Condition

關於執行緒中的Condition

關於Condition官網的一段話:

Aconditionvariableisalwaysassociatedwithsomekindoflock;thiscanbepassedinoronewillbecreatedbydefault.Passingoneinisusefulwhenseveralconditionvariablesmustsharethesamelock.Thelockispartoftheconditionobject:youdon’thavetotrackitseparately.

Othermethodsmustbecalledwiththeassociatedlockheld.Thewait()methodreleasesthelock,andthenblocksuntilanotherthreadawakensitbycallingnotify()ornotify_all().Onceawakened,wait()re-acquiresthelockandreturns.Itisalsopossibletospecifyatimeout.

但是需要注意的是:

notify()andnotify_all()這兩個方法,不會釋放鎖,這意味著執行緒或者被喚醒的執行緒不會立刻執行wait()

我們可以通過Conditon物件實現一個週期定時器的功能,每當定時器超時的時候,其他執行緒都可以檢測到,程式碼例子如下:


import threading
import time
class PeriodicTimer:
"""
這裡做了一個定時器
""" def __init__(self, interval):
self._interval = interval
self._flag = 0
self._cv = threading.Condition()
def start(self):
t = threading.Thread(target=self.run)
t.daemon = True
t.start()
def run(self):
while True:
time.sleep(self._interval)
with self._cv:
# 這個點還是非常有意思的^=
self._flag ^= 1
self._cv.notify_all()
def wait_for_tick(self):
with self._cv:
last_flag = self._flag
while last_flag == self._flag:
self._cv.wait()
# 下面兩個分別為兩個需要定時執行的任務
def countdown(nticks):
while nticks > 0:
ptimer.wait_for_tick()
print('T-minus', nticks)
nticks -= 1
def countup(last):
n = 0
while n < last:
ptimer.wait_for_tick()
print('Counting', n)
n  = 1
ptimer = PeriodicTimer(5)
ptimer.start()
threading.Thread(target=countdown, args=(10,)).start()
threading.Thread(target=countup, args=(5,)).start()

關於執行緒中鎖的使用

要在多執行緒中安全使用可變物件,需要使用threading庫中的Lock物件

先看一個關於鎖的基本使用:


import threading
class SharedCounter:
def __init__(self, initial_value=0):
self._value = initial_value
self._value_lock = threading.Lock()
def incr(self,delta = 1):
with self._value_lock:
self._value  = delta
def decr(self, delta=1):
with self._value_lock:
self._value -= delta

Lock物件和with語句塊一起使用可以保證互斥執行,這樣每次就只有一個執行緒可以執行with語句包含的程式碼塊。with語句會在這個程式碼快執行前自動獲取鎖,在執行結束後自動釋放所。

執行緒的排程本質上是不確定的,因此,在多執行緒程式中錯誤的使用鎖機制可能會導致隨機資料

損壞或者其他異常錯誤,我們稱之為競爭條件

你可能看到有些“老python程式設計師”

還是通過_value_lock.acquire()和_value_lock.release(),明顯看來

還是with更加方便,不容易出錯,畢竟你無法保證那次就忘記釋放鎖了

為了避免死鎖,使用鎖機制的程式應該設定每個執行緒一次只能獲取一個鎖

threading庫中還提供了其他的同步原語:RLock,Semaphore物件。但是這兩個使用場景相對來說比較特殊

RLock(可重入鎖)可以被同一個執行緒多次獲取,主要用來實現基於檢測物件模式的鎖定和同步。在使用這種鎖的時候,當鎖被持有時,只有一個執行緒可以使用完整的函式或者類中的方法,例子如下:


import threading
class SharedCounter:
_lock = threading.RLock()
def __init__(self,initial_value=0):
self._value = initial_value
def incr(self,delta=1):
with SharedCounter._lock:
self._value  = delta
def decr(self,delta=1):
with SharedCounter._lock:
self.incr(-delta)

這個例子中的鎖是一個類變數,也就是所有例項共享的類級鎖,這樣就保證了一次只有一個執行緒可以呼叫這個類的方法。與標準鎖不同的是已經持有這個鎖的方法再呼叫同樣適用這個鎖的方法時,無需再次獲取鎖,例如上面例子中的decr方法。

這種方法的特點是:無論這個類有多少例項都使用一個鎖。因此在需要使用大量使用計數器的情況下記憶體效率更高。

缺點:在程式中使用大量執行緒並頻繁更新計數器時會有競爭用鎖的問題。

訊號量物件是一個建立在共享計數器基礎上的同步原語,如果計數器不為0,with語句講計數器減1,

執行緒被允許執行。with語句執行結束後,計數器加1。如果計數器為0,執行緒將被阻塞,直到其他執行緒結束並將計數器加1。但是訊號量不推薦使用,增加了複雜性,影響程式效能。

所以訊號量更適用於哪些需要線上程之間引入訊號或者限制的程式。例如限制一段程式碼的併發量


from threading import Semaphore
import requests
_fetch_url_sema = Semaphore(5)
def fetch_url(url):
with _fetch_url_sema:
return requests.get(url)

關於防止死鎖的加鎖機制

在多執行緒程式中,死鎖問題很大一部分是由於多執行緒同時獲取多個鎖造成的。

舉個例子:一個執行緒獲取一個第一個鎖,在獲取第二個鎖的時候發生阻塞,那麼這個執行緒就可能阻塞其他執行緒執行,從而導致整個程式假死。

一種解決方法:為程式中每一個鎖分配一個唯一的id,然後只允許按照升序規則來使用多個鎖。


import threading
from contextlib import contextmanager
# 儲存已經請求鎖的資訊
_local = threading.local()
@contextmanager
def acquire(*locks):
# 把鎖通過id進行排序
locks = sorted(locks, key=lambda x: id(x))
acquired = getattr(_local, 'acquired', [])
if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
raise RuntimeError("Lock order Violation")
acquired.extend(locks)
_local.acquired = acquired
try:
for lock in locks:
lock.acquire()
yield
finally:
for lock in reversed(locks):
lock.release()
del acquired[-len(locks):]
x_lock = threading.Lock()
y_lock = threading.Lock()
def thread_1():
while True:
with acquire(x_lock,y_lock):
print("Thread-1")
def thread_2():
while True:
with acquire(y_lock,x_lock):
print("Thread-2")
t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()
t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()

通過排序,不管以什麼樣的順序來請求鎖,這些鎖都會按照固定的順序被獲取。

這裡也用了thread.local()來儲存請求鎖的資訊

同樣的這個東西也可以用來儲存執行緒的資訊,而這個執行緒對其他的執行緒是不可見的

總結