Java併發編程—併發流程控制與AQS原理及相關源碼解析

NO IMAGE

Java併發編程

代碼GitHub地址 github.com/imyiren/con…

  1. 刨根問底搞懂創建線程到底有幾種方法?

  2. 如何正確得啟動和停止一個線程 最佳實踐與源碼分析
  3. 多案例理解Object的wait,notify,notifyAll與Thread的sleep,yield,join等方法
  4. 瞭解線程屬性,如何處理子線程異常
  5. 多線程安全和性能問題
  6. JMM(Java內存模型)在併發中的原理與應用
  7. 深入理解死鎖問題及其解決方案
  8. 剖析線程池的使用與組成
  9. 帶你一文搞懂ThreadLocal的用法以及內部原理
  10. J.U.C下Lock的分類及特點詳解(結合案例和源碼)
  11. J.U.C下各種Atomic類使用及CAS相關源碼分析
  12. 結合源碼分析ConcurrenthashMap與CopyOnWriteArrayList的原理
  13. 併發流程控制與AQS原理及相關源碼解析

0. 主要內容

  • 文章分為兩部分:
  1. 第一個部分主要講併發流程控制的各大類的使用及案例
  2. 第二部分主要是先將AQS的組成及原理,然後結合CountDownLatch、Semaphore等分析源碼邏輯

ps: 文章內容比較多

1. 併發流程控制

1.1 什麼是併發流程控制

  • 併發流程控制,就是讓線程之間相互配合完成任務,來滿足業務邏輯
  • 如:讓線程A等待線程B完成後再執行等策略

1.2 併發流程控制的工具

作用說明
Semaphore信號量:可以通過控制“許可”的數量,來保證線程間配合線程只有拿到了許可才可以繼續運行
CyclicBarrier循環柵欄:線程會等待,直到足夠多線程達到了規定數量,再執行下一步任務適用於線程間相互等待處理結果就緒的場景
Phaser和CyclicBarrier類似,但是計數可變java7加入的新類
CountDownLatch也是一個計數等待相關,數量地見到0時,觸發動作不可重複使用
Exchanger讓兩個線程在合適時交換對象適用於兩個線程工作在同一個類的不同實例上時,用於交換數據
Condition可以控制線程的等待和喚醒是Object.wati()的升級版

2. CountDownLatch計數門閂

2.1 作用

  • 併發流程控制的工具,用於等待數量(我們設定的)足夠後再執行某些任務

2.2 主要方法

  • CountDownLatch(int count):只有一個構造方法,參數count為需要倒數的值
  • await():調用此方法的線程會被掛起,它會等到count值為零的時候才繼續執行
  • countdown():講count減1,直到0,等待的線程會被喚醒

2.3 用法一:等待線程執行完畢

/**
* @author yiren
*/
public class CountDownLatchExample01 {
public static void main(String[] args) throws InterruptedException {
AtomicInteger integer = new AtomicInteger(1);
CountDownLatch latch = new CountDownLatch(5);
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName()+ " produce ....");
TimeUnit.SECONDS.sleep(1);
integer.incrementAndGet();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
latch.countDown();
}
});
}
System.out.println(Thread.currentThread().getName() + " waiting....");
latch.await();
System.out.println(Thread.currentThread().getName() + " finished!");
System.out.println(Thread.currentThread().getName() + " num: " +  integer.get());
executorService.shutdown();
}
}
pool-1-thread-1 produce ....
pool-1-thread-2 produce ....
pool-1-thread-3 produce ....
main waiting....
pool-1-thread-4 produce ....
pool-1-thread-5 produce ....
main finished!
main num: 6
Process finished with exit code 0

2.4 用法二:多等一

/**
* @author yiren
*/
public class CountDownLatchExample02 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ready!");
try {
latch.await();
System.out.println(Thread.currentThread().getName()+ " produce ....");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
latch.countDown();
}
});
}
Thread.sleep(10);
System.out.println(Thread.currentThread().getName() + " ready!");
latch.countDown();
System.out.println(Thread.currentThread().getName() + " go!");
executorService.shutdown();
}
}
pool-1-thread-1 ready!
pool-1-thread-4 ready!
pool-1-thread-3 ready!
pool-1-thread-2 ready!
pool-1-thread-5 ready!
main ready!
main go!
pool-1-thread-1 produce ....
pool-1-thread-2 produce ....
pool-1-thread-5 produce ....
pool-1-thread-3 produce ....
pool-1-thread-4 produce ....
Process finished with exit code 0

2.4 注意

  • CountDownLatch不僅可以無限等待,還可以給參數,在指定的事件內如果等到就喚醒線程繼續執行

  • boolean await(long timeout, TimeUnit unit)
    
  • CountDownLatch不能重用,如果涉及重新計數,可以使用CyclicBarrier或者新創建CountDownLatch

3. Semaphore信號量

3.1 信號量作用

  • Semaphore可以用來限制或管理數量有限的資源使用情況

  • 信號量的租用是維護一個許可計數,線程可以獲取許可,然後信號量減一;線程也可以釋放許可,信號量就加一;如果信號量的許可頒發完了,其他線程想要獲取,就需要等待,直到有另外的線程釋放了許可。

3.2 信號量使用

  1. 初始化Semaphore指定許可數量
  2. 在需要獲取許可的代碼前面加上acquire()或者acquireUniterruptibly()方法
  3. 任務執行完成有調用release()釋放許可

3.3 主要方法

  • Semaphore(int permits, boolean fair)這裡設置許可數量,以及是否使用公平策略。
    • 如果傳入true那麼久吧等待線程放入到FIFO的隊列裡面。
  • aquire()請求許可,可以響應中斷
  • aquireUnniterruptibly()請求許可不可中斷
  • tryAcquire()看看現在有沒有空閒的許可,如果有那就返回true;這個方法還可以設置等待時間給一個timeout,讓線程等待一段時間。
  • release()釋放許可

3.4 案例演示

/**
* @author yiren
*/
public class SemaphoreExample01 {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3, true);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 8; i++) {
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName()+" start to get permit");
semaphore.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now() +" finished!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
});
}
executorService.shutdown();
}
}
pool-1-thread-1 start to get permit
pool-1-thread-4 start to get permit
pool-1-thread-3 start to get permit
pool-1-thread-2 start to get permit
pool-1-thread-5 start to get permit
pool-1-thread-6 start to get permit
pool-1-thread-7 start to get permit
pool-1-thread-8 start to get permit
pool-1-thread-3 2020-02-21T19:54:47.392 finished!
pool-1-thread-1 2020-02-21T19:54:47.392 finished!
pool-1-thread-4 2020-02-21T19:54:47.392 finished!
pool-1-thread-6 2020-02-21T19:54:49.396 finished!
pool-1-thread-2 2020-02-21T19:54:49.396 finished!
pool-1-thread-5 2020-02-21T19:54:49.396 finished!
pool-1-thread-8 2020-02-21T19:54:51.401 finished!
pool-1-thread-7 2020-02-21T19:54:51.401 finished!
Process finished with exit code 0

3.5 注意點

  • 獲取和釋放的許可證必須一致,acquire和release都是可以傳入數值的來確定獲取和釋放的數量。如果我們獲取和釋放不一致,就會容易導致程序bug。當然也不是絕對,除非有特殊業務需求,否則都獲取釋放設置為一樣的
  • 注意在初始化Semaphore的時候設置公平性,一般設置為true會比較合理。如果插隊情況比較嚴重的話,某些線程可能一直阻塞
  • 獲取和釋放許可對線程並不要求,線程A獲取了可以線程B釋放。

4. Condition接口

4.1 作用

  • 當線程A需要等待某個任務或者某個資源,就可以執行condition.await()方法,然後就會陷入阻塞狀態。

  • 此時另一個線程B,去獲取資源或者執行任務完成後,調用condition.signal()或者signalAll()方法,通知線程A,繼續執行

  • 這個類似於object.wait()notify()notifyAll()

  • signal()方法如果遇到多個線程都在等待的時候,會去喚醒等待時間最長的那個

  • 在我們ReentrantLock中就可以直接新建Condition。看下面案例

4.2 案例演示

  • 普通用法
/**
* @author yiren
*/
public class ConditionExample01 {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
task1();
});
Thread thread2 = new Thread(() -> {
task2();
});
thread1.start();
Thread.sleep(100);
thread2.start();
}
private static void task1() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " start await()");
condition.await();
System.out.println(Thread.currentThread().getName() + " await finished!");
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private static void task2() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " start signal()");
Thread.sleep(1000);
condition.signal();
System.out.println(Thread.currentThread().getName() + " signal finished!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
Thread-0 start await()
Thread-1 start signal()
Thread-1 signal finished!
Thread-0 await finished!
Process finished with exit code 0
  • 生產者消費者模式
/**
* @author yiren
*/
public class ConditionExample02 {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ConditionExample02 conditionDemo2 = new ConditionExample02();
Producer producer = conditionDemo2.new Producer();
Consumer consumer = conditionDemo2.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread {
@Override
public void run() {
consume();
}
private void consume() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("隊列空,等待數據");
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
notFull.signalAll();
System.out.println("從隊列裡取走了一個數據,隊列剩餘" + queue.size() + "個元素");
} finally {
lock.unlock();
}
}
}
}
class Producer extends Thread {
@Override
public void run() {
produce();
}
private void produce() {
while (true) {
lock.lock();
try {
while (queue.size() == queueSize) {
System.out.println("隊列滿,等待有空餘");
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1);
notEmpty.signalAll();
System.out.println("向隊列插入了一個元素,隊列剩餘空間" + (queueSize - queue.size()));
} finally {
lock.unlock();
}
}
}
}
}
  • 以上使用兩個Condition作為隊列滿和空的通知傳遞工具在生產者和消費者之間互通

4.3 注意點

  • 我們知道Lock可以看做synchronized的替代方案,而Condition就是用來替代object.wait/notify的,在用法上幾乎一致。

  • 調用await()方法時必須持有Lock鎖,否則會拋出異常,並且await()方法會釋放當前持有的Lock鎖,

  • 一個Lock鎖可以有多個Condition更加靈活

5. CyclicBarrier循環柵欄

5.1 作用

  • CyclicBarrier循環柵欄和CountDownLatch很類似,都能阻塞一組線程
  • 當需要多個線程配合完成任務,並最後需要統一彙總時,我們就可以使用CyclicBarrier,當某個線程完成任務後,它先會等待,等到所有線程都執行好了任務,再一起繼續執行剩下的任務
    • 比如:同時出去聚餐約在了公司,等大家到公司了一起走過去。
  • 但是注意CyclicBarrier是可以重複使用的,這個和CountDownLatch不同

5.2 案例

/**
* @author yiren
*/
public class CyclicBarrierExample {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有人都到場了, 大家統一出發!");
}
});
for (int i = 0; i < 10; i++) {
new Thread(new Task(i, cyclicBarrier)).start();
}
}
static class Task implements Runnable {
private int id;
private CyclicBarrier cyclicBarrier;
public Task(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("線程" + id + "現在前往集合地點");
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("線程" + id + "到了集合地點,開始等待其他人到達");
cyclicBarrier.await();
System.out.println("線程" + id + "出發了");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
線程0現在前往集合地點
線程2現在前往集合地點
線程3現在前往集合地點
線程1現在前往集合地點
線程4現在前往集合地點
線程5現在前往集合地點
線程6現在前往集合地點
線程7現在前往集合地點
線程8現在前往集合地點
線程9現在前往集合地點
線程3到了集合地點,開始等待其他人到達
線程9到了集合地點,開始等待其他人到達
線程8到了集合地點,開始等待其他人到達
線程4到了集合地點,開始等待其他人到達
線程5到了集合地點,開始等待其他人到達
所有人都到場了, 大家統一出發!
線程5出發了
線程3出發了
線程8出發了
線程4出發了
線程9出發了
線程1到了集合地點,開始等待其他人到達
線程6到了集合地點,開始等待其他人到達
線程0到了集合地點,開始等待其他人到達
線程7到了集合地點,開始等待其他人到達
線程2到了集合地點,開始等待其他人到達
所有人都到場了, 大家統一出發!
線程2出發了
線程1出發了
線程7出發了
線程0出發了
線程6出發了
Process finished with exit code 0
  • 每五個人到了過後,就出發一批

5.3 CountDownLatchCyclicBarrier`區別

  • 作用不同:CountDownLatch使用countDown()是用於事件的,而CyclicBarrier使用await()是用於線程的
  • 可重用性不同:CountDownLatch在倒數到0後不能再次重用,除非創建新對象;而CyclicBarrier是可以直接重用的

6. 深入AQS理解J.U.C的根基

6.1 AQS作用及其重要性

  • AQS在CountDownLatch等工具內都有使用,全稱是:AbstractQueuedSynchronizer是一個抽象類

  • 鎖和上面的線程併發控制類(Semaphore等)都有類似的地方。 其實他們底層都是使用了AQS作為基類的拓展

  • 正因為他們很多工作都類似,JDK就把這部分通用邏輯抽離了出來,提供給他們直接使用,使其不必關注很多深層次的細節,從而完成他們的功能。

  • 我們可以大致看一下我們鎖用到的這些併發控制的工具類和鎖的內部實現

    • `Semaphore“
    public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    Sync(int permits) {
    setState(permits);
    }
    ......
    
    • ReentrantLock
    public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;
    ......
    
    • CountDownLatch
    public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    Sync(int count) {
    setState(count);
    }
    ......
    
  • 由上源碼我們可以看到,裡面都有一個內部類,Sync繼承自AbstractQueuedSynchronizer

  • 那麼AQS是用來幹些什麼事情的呢?

    • J.U.C基本都是是基於AQS實現的,AQS是一個用於構建鎖、同步器、線程協作工具類的框架供給子類使用,主要使用模板模式來設計。
    • 它主要工作就是管理線程的阻塞與喚醒,實現同步的管理,以及阻塞線程的隊列管理工作

6.2 AQS的組成及內部原理

  • AbstractQueuedSynchronizer自JDK1.5加入,是基於FIFO等待隊列實現的一個用於同步器的基礎框架。
  • JDK1.8 繼承AQS實現的類:
Java併發編程—併發流程控制與AQS原理及相關源碼解析

  • 我們可以看到,在可重入鎖,讀寫鎖,計數門閂等,信號量裡面都是用了AQS的子類,接下來我們就學習一下AQS的內部原理
  1. AQS的三大部分

    • state:狀態,

    • FIFO隊列:線程競爭鎖的管理隊列

    • 獲取和釋放方法:需要工具類去實現的方法

  2. state:狀態

        /**
    * The synchronization state.
    */
    private volatile int state;
    
    • 它的含義並不具體,根據實現的不同而不同,如:Semaphore內是剩餘許可數量、CountDownLatch內是還需要倒數的數量,可看做一個計數器,只是不同類的作用及意義不用
        protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    
    • 狀態值的更新,是使用Unsafe的CAS完成
    • 在ReentrantLock中:state表示鎖的佔用情況,可重入的計數,每重入一次就加一,當要釋放鎖時,它的值就會變成0,表示不被任何線程佔有。
  3. FIFO隊列:

        /**
    * Head of the wait queue, lazily initialized.  Except for
    * initialization, it is modified only via method setHead.  Note:
    * If head exists, its waitStatus is guaranteed not to be
    * CANCELLED.
    */
    private transient volatile Node head;
    /**
    * Tail of the wait queue, lazily initialized.  Modified only via
    * method enq to add new wait node.
    */
    private transient volatile Node tail;
    
    • 這個隊列是用來存放等待的線程的,AQS會對這個隊列進行管理。當多個線程競爭鎖時,沒有拿到鎖的,就會被翻到隊列中,當前拿到鎖的執行任務的線程結束,AQS就會從隊列中選一個線程來佔有這個鎖。
    • AQS維護一個雙向鏈表的等待隊列,把等待線程都放到這個隊列裡面管理;隊列頭節點是當前拿到鎖的線程;在AQS中保存了這個隊列的頭尾節點。
  4. 獲取和釋放的方法

    • 獲取方法:
      • 獲取操作會依賴state變量,經常會阻塞,如:獲取不到鎖的時候,獲取不到許可的時候等
      • ReentrantLock中,就是獲取鎖。state+1
      • Semaphore中就是acquire獲取許可,state-1,當state==0就會阻塞
      • CountDownLatch中就是await方法,就是等待state==0
    • 釋放方法:
      • 釋放操作不會阻塞
      • ReentrantLock中就是unlock方法調用release(1)對應state-1
      • Semaphore中就是realease,也是state-1
      • CountDownLatch中就是countDown方法,也是state-1
    • 一般情況下,實現類都會實現tryAcquiretryRelease相關方法,以對應各個類的需求

6.3 AQS的用法

  1. 指定協作邏輯,實現獲取和釋放方法
  2. 在內部寫一個Sync類繼承AQS
  3. 根據是否獨佔來決定重寫的方法:獨佔使用tryAcquire/tryRelease、共享使用tryAcquireShared(int acquires)/tryReleaseShared(int releases),在主邏輯裡面的獲取釋放相關方法中調用Sync的方法

7. AQS在CountDownLatch中的源碼剖析

  • 下面我們以CountDownLatch為例分析源碼:

  • 構造函數

    • 我們看到內部實現就是初始化一個Sync然後把計數值傳入
        public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
    }
    
    • 我們可以看下面的CountDownlatchSync的實現,在構造方法創建的Sync傳入的count調用了setState方法傳入了AQSstate
  • CountDownLatch內部有一個繼承AQS的Sync

    
    private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    Sync(int count) {
    setState(count);
    }
    int getCount() {
    return getState();
    }
    protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
    }
    protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
    int c = getState();
    if (c == 0)
    return false;
    int nextc = c-1;
    if (compareAndSetState(c, nextc))
    return nextc == 0;
    }
    }
    }
    
  • CountDownLatchgetCount()方法

    public long getCount() {
    return sync.getCount();
    }
    
    • 我們可以看到getCount實際也是調用SyncgetCount()來獲取state並返回
  • CountDownLatchcountDown()方法

        public void countDown() {
    sync.releaseShared(1);
    }
    
    • 我們看一看到它直接調用了AQSreleaseShared(1)
        public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
    }
    return false;
    }
    
    • releaseShared則是回去調用CountDownLatch中實現的tryReleaseShared
            protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
    int c = getState();
    if (c == 0)
    return false;
    int nextc = c-1;
    if (compareAndSetState(c, nextc))
    return nextc == 0;
    }
    }
    
    • 而在tryReleaseShared中則是主要對state的值做-1操作,如果state大於零可以獲取到就減一併且用CAS併發更新值,如果最新值為0就返回true
    • 返回true過後就doReleaseShared釋放鎖,喚醒隊列裡面的等待線程。也就是調用了await()方法的線程
  • CountDownLatchawait()方法

    public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
    }
    
    • await則會調用AQS中的默認實現sync.acquireSharedInterruptibly(1);
        public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
    }
    
    • 而裡面則是調用tryAcquireShared(arg) < 0看是否小於0,如果小於0就代表沒有獲取到鎖,就調用doAcquireSharedInterruptibly(arg);入隊

    • tryAcquireShared則是在CountDownLatch中的Sync實現的

            protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
    }
    
    • 如果當前state為0了(也就是說計數已經到0了)就返回一個1就不會滿住上面的acquireSharedInterruptibly方法中的條件,就會放行,如果不等於0就會返回-1,此時就會入隊。調用doAcquireSharedInterruptibly方法
        private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return;
    }
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
    
    • 這個方法首先會把當前線程在addWaiter中包裝成一個Node節點並添加到隊列尾部;而這個Node節點就是FIFO隊列的節點。
    • 然後就會進入循環,如果當前節點不是head,那麼就會進入到後面的判斷,其中重要的是parkAndCheckInterrupt,方法如下:
        private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
    }
    
    • 它會調用LockSupportpark並且此park方法就是封裝了Unsafe的native方法park()來把線程掛起進入阻塞狀態
        public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
    }
    
    • 再往下就沒意義了。我們只需要知道doAcquireSharedInterruptibly方法就是把當前線程放到阻塞隊列中,並且把線程阻塞就OK了。
  • AQS在CountDownLatch中使用的一些點:

    • 調用CountDownLatchawait()時,便會嘗試獲取共享鎖,開始時是獲取不到鎖的,於是就被阻塞
    • 可以獲取到的條件就是計數器為0,也就是state==0的時候。
    • 只有每次調用countDown方法才會使得計數器減一,減到0時就回去喚醒阻塞中的線程。

8. AQS在Semaphore中的源碼剖析

  • 由於上面講得很細了,接下來就簡略一些

  • Semaphorestate就是許可證的數量

  • 主要的操作就是acquire和release,也是借用Sync對state的操作來控制線程的阻塞與喚醒

    public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
    public void release() {
sync.releaseShared(1);
}
  • 先看下acquire調用的acquireSharedInterruptibly此方法在上面已經說過。
    public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
  • 而在Semaphore中Sync有兩個實現:NonfairSyncFairSync
  • 在FairSync中tryAcquireShared就會有hasQueuedPredecessors判斷,如果不是頭節點,那就返回-1,在acquireSharedInterruptibly方法中去調用doAcquireSharedInterruptibly入隊並且阻塞線程
        protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
  • 而在NonfairSync中而是直接調用SyncnonfairTryAcquireShared

    protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
    }
    
            final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 ||
    compareAndSetState(available, remaining))
    return remaining;
    }
    }
    
    • 可以看到其中並沒有對是否阻塞隊列的頭節點判斷,直接去獲取值,判斷是會否許可足夠。
  • release中則是調用AQS的releaseShared其也是調用SemaphoreSynctryReleaseShared來判斷是否需要釋放鎖,去喚醒阻塞線程

    public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
    }
    return false;
    }
    
  • tryReleaseShared

        protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
  • 我們可以看到此處就是關於Semaphore的已獲取許可的釋放 把state加回去然後用CAS更新state

9. AQS在ReentrantLock中的應用

  • 源碼就不分析了

  • ReentrantLock中,state主要是重入的次數,加鎖的時候state+1 ,而在釋放鎖的時候,state-1然後判斷當前的state==0

  • ReentrantLock中與AQS相關的有三個類:UnfairSyncFairSyncSync

  • 關於加鎖和解鎖的邏輯也是AQS中的acquire方法的邏輯(獲取鎖失敗就會放入隊列中)和release方法(調用子類的tryRelease來去掉頭部,並且喚醒線程)

  • 而加鎖解鎖中的邏輯,主要是公平鎖和非公平鎖的區別,公平鎖會去判斷是否在隊列頭部,如果在才會去執行,而非公平鎖則會搶鎖。不會管你是不是在隊列頭部。

  • 相信在上面的源碼分析過後,分析ReentrantLock是十分簡單的。大家可以自行分析。


  • 覺得可以就點個贊吧👍 Thanks!

關於我

  • 座標杭州,普通本科高校計算機科學與技術專業。
  • 20年畢業,主做Java技術棧後端開發。
  • GitHub: github.com/imyiren
  • Blog : imyi.ren

相關文章

一文完全吃透JavaScript繼承(面試必備良藥)

2020年你不能不知道的webpack基本配置

風物長宜放眼量,人間正道是滄桑一位北美IT技術人破局

領域驅動設計(DDD)實踐之路(一)