詳解CyclicBarrier

NO IMAGE

CyclicBarrier是什麼?

CyclicBarrier也叫同步屏障,在JDK1.5被引入,可以讓一組線程達到一個屏障時被阻塞,直到最後一個線程達到屏障時,屏障才會開門,所有被阻塞的線程才會繼續執行。
他的主要用途是控制多個線程互相等待,只有當多個線程都到達時,這些線程才會繼續執行。
CyclicBarrier好比一扇門,默認情況下關閉狀態,堵住了線程執行的道路,直到所有線程都就位,門才打開,讓所有線程一起通過。

CyclicBarrier如何使用和工作?

API

CyclicBarrier有兩個構造函數

public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
第一個參數,其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier已經到達屏障位置,線程被阻塞。
第二個參數,表示線程都處於barrier時,一起執行之前,其中barrierAction任務會在所有線程到達屏障後執行。
讓線程處於barrier狀態的方法await()

public int await()
public int await(long timeout, TimeUnit unit)
第一個默認方法,表示要等到所有的線程都處於barrier狀態,才一起執行
第二個方法,指定了等待的時間,當所有線程沒有都處於barrier狀態,又到了指定的時間,所在的線程就繼續執行了。

其它的一些方法

獲取當前有多少個線程阻塞等待在臨界點上
int getNumberWaiting()
用於查詢阻塞等待的線程是否被中斷
boolean isBroken()
CyclicBarrier是通過維護計數器來實現的。線程執行 await() 方法之後計數器會減 1,並進行等待,直到計數器為 0,所有調用 await() 方法而在等待的線程才能繼續執行。

CyclicBarrier的底層原理

CyclicBarrier類是concurrent併發包下的一工具類。CyclicBarrier實現主要基於ReentrantLock。
線程間同步阻塞是使用的是ReentrantLock,可重入鎖
線程間通信使用的是Condition,Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的對象,以便通過將這些對象與任意 Lock 實現組合使用。

public class CyclicBarrier {
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
...省略後面代碼
}
其中Generation用來控制屏障的循環使用,如果generation.broken為true的話,說明這個屏障已經損壞,當某個線程await的時候,直接拋出異常
await實現

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) {  // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
  1. 每當線程執行await,內部變量count減1,如果count!= 0,說明有線程還未到屏障處,則在鎖條件變量trip上等待。
  2. 當count == 0時,說明所有線程都已經到屏障處,執行條件變量的signalAll方法喚醒等待的線程。
其中 nextGeneration方法可以實現屏障的循環使用:
  • 重新生成Generation對象
  • 恢復count值

CountDownLatch與CyclicBarrier的比較

CountDownLatch與CyclicBarrier都是用於控制併發的工具類,都可以理解成維護的就是一個計數器,但是這兩者還是各有不同側重點的:
  1. CountDownLatch一般用於某個線程A等待若干個其他線程執行完任務之後,它才執行;而CyclicBarrier一般用於一組線程互相等待至某個狀態,然後這一組線程再同時執行;CountDownLatch強調一個線程等多個線程完成某件事情。CyclicBarrier是多個線程互等,等大家都完成,再攜手共進。
  2. 調用CountDownLatch的countDown方法後,當前線程並不會阻塞,會繼續往下執行;而調用CyclicBarrier的await方法,會阻塞當前線程,直到CyclicBarrier指定的線程全部都到達了指定點的時候,才能繼續往下執行;
  3. CountDownLatch方法比較少,操作比較簡單,而CyclicBarrier提供的方法更多,比如能夠通過getNumberWaiting(),isBroken()這些方法獲取當前多個線程的狀態,並且CyclicBarrier的構造方法可以傳入barrierAction,指定當所有線程都到達時執行的業務功能;
  4. CountDownLatch是不能複用的,而CyclicLatch是可以複用的。
和 CountdownLatch 相似,都是通過維護計數器來實現的。線程執行 await() 方法之後計數器會減 1,並進行等待,直到計數器為 0,所有調用 await() 方法而在等待的線程才能繼續執行。
CyclicBarrier 和 CountdownLatch 的一個區別是,CyclicBarrier 的計數器通過調用 reset() 方法可以循環使用,所以它才叫做循環屏障。

CyclicBarrier的demo

public class CyclicBarrierExample {
public static void main(String[] args) {
final int totalThread = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.println("到達屏障..");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("屏障之後開始執行..");
});
}
executorService.shutdown();
}
}
到達屏障..
到達屏障..
到達屏障..
屏障之後開始執行..
屏障之後開始執行..
屏障之後開始執行..

參考自:

www.jianshu.com/p/424374d71…

www.jianshu.com/p/4ef4bbf01…

相關文章

MySQL索引詳解

聊聊MVCC和NextkeyLocks

事務ACID特性與隔離級別

聊聊J.U.CAQS