詳解CountDownLatch

NO IMAGE

CountDownLatch是什麼? 

jdk1.5開始concurrent包裡提供的,併發編程工具類。 

CountDownLatch這個類能夠使一個線程等待其他線程完成各自的工作後再執行。CountDownLatch允許一個或多個線程等待其他線程完成操作。 

例如,應用程序的主線程希望在負責啟動框架服務的線程已經啟動所有的框架服務之後再執行。

CountDownLatch非常適合於對任務進行拆分,使其並行執行,比如某個任務執行2s,其對數據的請求可以分為五個部分,那麼就可以將這個任務拆分為5個子任務,分別交由五個線程執行,執行完成之後再由主線程進行彙總,此時,總的執行時間將決定於執行最慢的任務,平均來看,還是大大減少了總的執行時間。

CountDownLatch是不能複用的,不可能重新初始化或者修改CountDownLatch對象的內部計數器的值。

CountDownLatch如何工作?

CountDownLatch是通過維護一個計數器 cnt 來實現的,計數器的初始值為線程的數量。每當一個線程完成了自己的任務後,調用 countDown() 方法會讓計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,那些因為調用 await() 方法而在等待的線程就會被喚醒。
API
  • countDownLatch.countDown(); //使CountDownLatch初始值N減1;
  • countDownLatch.await(); //調用該方法的線程等到構造方法傳入的N減到0的時候,才能繼續往下執行;
  • await(long timeout, TimeUnit unit); //與上面的await方法功能一致,只不過這裡有了時間限制,調用該方法的線程等到指定的timeout時間後,不管N是否減至為0,都會繼續往下執行;
  • long getCount(); //獲取當前CountDownLatch維護的值;

CountDownLatch底層原理

CountDownLatch通過AQS(AbstractQueuedSynchronizer)裡面的共享鎖來實現的。
ReentrantLock也是使用AQS
CountDownLatch是基於AbstractQueuedSynchronizer實現的,在AbstractQueuedSynchronizer中維護了一個volatile類型的整數state,volatile可以保證多線程環境下該變量的修改對每個線程都可見,並且由於該屬性為整型,因而對該變量的修改也是原子的。創建一個CountDownLatch對象時,所傳入的整數n就會賦值給state屬性,當countDown()方法調用時,該線程就會嘗試對state減一,而調用await()方法時,當前線程就會判斷state屬性是否為0,如果為0,則繼續往下執行,如果不為0,則使當前線程進入等待狀態,直到某個線程將state屬性置為0,其就會喚醒在await()方法中等待的線程。如下是countDown()方法的源代碼:

public void countDown() {
sync.releaseShared(1);
}
這裡sync也即一個繼承了AbstractQueuedSynchronizer的類實例,該類是CountDownLatch的一個內部類,其聲明如下:

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();   // 獲取當前state屬性的值
if (c == 0)   // 如果state為0,則說明當前計數器已經計數完成,直接返回
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc)) // 使用CAS算法對state進行設置
return nextc == 0;  // 設置成功後返回當前是否為最後一個設置state的線程
}
}
}
      這裡tryReleaseShared(int)方法即對state屬性進行減一操作的代碼。可以看到,CAS也即compare and set的縮寫,jvm會保證該方法的原子性,其會比較state是否為c,如果是則將其設置為nextc(自減1),如果state不為c,則說明有另外的線程在getState()方法和compareAndSetState()方法調用之間對state進行了設置,當前線程也就沒有成功設置state屬性的值,其會進入下一次循環中,如此往復,直至其成功設置state屬性的值,即countDown()方法調用成功。
在countDown()方法中調用的sync.releaseShared(1)調用時實際還是調用的tryReleaseShared(int)方法,如下是releaseShared(int)方法的實現:

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
        可以看到,在執行sync.releaseShared(1)方法時,其在調用tryReleaseShared(int)方法時會在無限for循環中設置state屬性的值,設置成功之後其會根據設置的返回值(此時state已經自減了一),即當前線程是否為將state屬性設置為0的線程,來判斷是否執行if塊中的代碼。doReleaseShared()方法主要作用是喚醒調用了await()方法的線程。需要注意的是,如果有多個線程調用了await()方法,這些線程都是以共享的方式等待在await()方法處的,試想,如果以獨佔的方式等待,那麼當計數器減少至零時,就只有一個線程會被喚醒執行await()之後的代碼,這顯然不符合邏輯。如下是doReleaseShared()方法的實現代碼:

private void doReleaseShared() {
for (;;) {
Node h = head;  // 記錄等待隊列中的頭結點的線程
if (h != null && h != tail) {   // 頭結點不為空,且頭結點不等於尾節點
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {  // SIGNAL狀態表示當前節點正在等待被喚醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))    // 清除當前節點的等待狀態
continue;
unparkSuccessor(h); // 喚醒當前節點的下一個節點
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)  // 如果h還是指向頭結點,說明前面這段代碼執行過程中沒有其他線程對頭結點進行過處理
break;
}
}
在doReleaseShared()方法中(始終注意當前方法是最後一個執行countDown()方法的線程執行的),首先判斷頭結點不為空,且不為尾節點,說明等待隊列中有等待喚醒的線程,這裡需要說明的是,在等待隊列中,頭節點中並沒有保存正在等待的線程,其只是一個空的Node對象,真正等待的線程是從頭節點的下一個節點開始存放的,因而會有對頭結點是否等於尾節點的判斷。在判斷等待隊列中有正在等待的線程之後,其會清除頭結點的狀態信息,並且調用unparkSuccessor(Node)方法喚醒頭結點的下一個節點,使其繼續往下執行。如下是unparkSuccessor(Node)方法的具體實現:

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);   // 清除當前節點的等待狀態
Node s = node.next;
if (s == null || s.waitStatus > 0) {  // s的等待狀態大於0說明該節點中的線程已經被外部取消等待了
s = null;
// 從隊列尾部往前遍歷,找到最後一個處於等待狀態的節點,用s記錄下來
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);   // 喚醒離傳入節點最近的處於等待狀態的節點線程
}
可以看到,unparkSuccessor(Node)方法的作用是喚醒離傳入節點最近的一個處於等待狀態的線程,使其繼續往下執行。前面我們講到過,等待隊列中的線程可能有多個,而調用countDown()方法的線程只喚醒了一個處於等待狀態的線程,這裡剩下的等待線程是如何被喚醒的呢?其實這些線程是被當前喚醒的線程喚醒的。具體的我們可以看看await()方法的具體執行過程。如下是await()方法的代碼:

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
     await()方法實際還是調用了Sync對象的方法acquireSharedInterruptibly(int)方法,如下是該方法的具體實現:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
     可以看到acquireSharedInterruptibly(int)方法判斷當前線程是否需要以共享狀態獲取執行權限,這裡tryAcquireShared(int)方法是AbstractQueuedSynchronizer中的一個模板方法,其具體實現在前面的Sync類中,可以看到,其主要是判斷state是否為零,如果為零則返回1,表示當前線程不需要進行權限獲取,可直接執行後續代碼,返回-1則表示當前線程需要進行共享權限。具體的獲取執行權限的代碼在doAcquireSharedInterruptibly(int)方法中,如下是該方法的具體實現:

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 使用當前線程創建一個共享模式的節點
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();    // 獲取當前節點的前一個節點
if (p == head) {  // 判斷前一個節點是否為頭結點
int r = tryAcquireShared(arg);  // 查看當前線程是否獲取到了執行權限
if (r >= 0) {   // 大於0表示獲取了執行權限
setHeadAndPropagate(node, r); // 將當前節點設置為頭結點,並且喚醒後面處於等待狀態的節點
p.next = null; // help GC
failed = false;
return;
}
}
// 走到這一步說明沒有獲取到執行權限,就使當前線程進入“擱置”狀態
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
      在doAcquireSharedInterruptibly(int)方法中,首先使用當前線程創建一個共享模式的節點。然後在一個for循環中判斷當前線程是否獲取到執行權限,如果有(r >= 0判斷)則將當前節點設置為頭節點,並且喚醒後續處於共享模式的節點;如果沒有,則對調用shouldParkAfterFailedAcquire(Node, Node)和parkAndCheckInterrupt()方法使當前線程處於“擱置”狀態,該“擱置”狀態是由操作系統進行的,這樣可以避免該線程無限循環而獲取不到執行權限,造成資源浪費,這裡也就是線程處於等待狀態的位置,也就是說當線程被阻塞的時候就是阻塞在這個位置。當有多個線程調用await()方法而進入等待狀態時,這幾個線程都將等待在此處。這裡回過頭來看前面將的countDown()方法,其會喚醒處於等待隊列中離頭節點最近的一個處於等待狀態的線程,也就是說該線程被喚醒之後會繼續從這個位置開始往下執行,此時執行到tryAcquireShared(int)方法時,發現r大於0(因為state已經被置為0了),該線程就會調用setHeadAndPropagate(Node, int)方法,並且退出當前循環,也就開始執行awat()方法之後的代碼。這裡我們看看setHeadAndPropagate(Node, int)方法的具體實現:

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);    // 將當前節點設置為頭節點
// 檢查喚醒過程是否需要往下傳遞,並且檢查頭結點的等待狀態
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())  // 如果下一個節點是嘗試以共享狀態獲取獲取執行權限的節點,則將其喚醒
doReleaseShared();
}
}
     setHeadAndPropagate(Node, int)方法主要作用是設置當前節點為頭結點,並且將喚醒工作往下傳遞,在傳遞的過程中,其會判斷被傳遞的節點是否是以共享模式嘗試獲取執行權限的,如果不是,則傳遞到該節點處為止(一般情況下,等待隊列中都只會都是處於共享模式或者處於獨佔模式的節點)。也就是說,頭結點會依次喚醒後續處於共享狀態的節點,這也就是共享鎖與獨佔鎖的實現方式。這裡doReleaseShared()方法也就是我們前面講到的會將離頭結點最近的一個處於等待狀態的節點喚醒的方法。

CountDownLatch的demo

public class CountdownLatchExample {
public static void main(String[] args) throws InterruptedException {
final int totalThread = 10;
CountDownLatch countDownLatch = new CountDownLatch(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("run..");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("end");
executorService.shutdown();
}
}
run..run..run..run..run..run..run..run..run..run..end

總結

CountDownLatch的作用就是允許一個或多個線程等待其他線程完成操作,看起來有點類似join() 方法,但其提供了比 join() 更加靈活的API。CountDownLatch可以手動控制在n個線程裡調用n次countDown方法使計數器進行減一操作,也可以在一個線程裡調用n次執行減一操作。而 join() 的實現原理是不停檢查join線程是否存活,如果 join 線程存活則讓當前線程永遠等待。所以兩者之間相對來說還是CountDownLatch使用起來較為靈活。

參考自:《Java併發編程的藝術》和www.jianshu.com/p/128476015…

相關文章

聊聊MVCC和NextkeyLocks

事務ACID特性與隔離級別

聊聊J.U.CAQS

詳解CyclicBarrier