聊聊J.U.CAQS

NO IMAGE

java.util.concurrent(J.U.C)大大提高了併發性能,AQS 被認為是 J.U.C 的核心。

AQS簡介

java的內置鎖一直都是備受爭議的,在JDK 1.6之前,synchronized這個重量級鎖其性能一直都是較為低下,雖然在1.6後,進行大量的鎖優化策略,但是與Lock相比synchronized還是存在一些缺陷的:雖然synchronized提供了便捷性的隱式獲取鎖釋放鎖機制(基於JVM機制),但是它卻缺少了獲取鎖與釋放鎖的可操作性,可中斷、超時獲取鎖,且它為獨佔式在高併發場景下性能大打折扣。
在介紹Lock之前,我們需要先熟悉一個非常重要的組件,掌握了該組件JUC包下面很多問題都不在是問題了。該組件就是AQS。

AQS是什麼?

AQS:AbstractQueuedSynchronizer,即隊列同步器。它是構建鎖或者其他同步組件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC併發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。它是JUC併發包中的核心基礎組件。
AQS解決了實現同步器時涉及當的大量細節問題,例如獲取同步狀態、FIFO同步隊列。基於AQS來構建同步器可以帶來很多好處。它不僅能夠極大地減少實現工作,而且也不必處理在多個位置上發生的競爭問題。

工作過程

AQS通過內置的FIFO同步隊列來完成資源獲取線程的排隊工作,如果當前線程獲取同步狀態失敗(鎖)時,AQS則會將當前線程以及等待狀態等信息構造成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。

AQS主要提供瞭如下一些方法

  • getState():返回同步狀態的當前值;
  • setState(int newState):設置當前同步狀態;
  • compareAndSetState(int expect, int update):使用CAS設置當前狀態,該方法能夠保證狀態設置的原子性;
  • tryAcquire(int arg):獨佔式獲取同步狀態,獲取同步狀態成功後,其他線程需要等待該線程釋放同步狀態才能獲取同步狀態
  • tryRelease(int arg):獨佔式釋放同步狀態;
  • tryAcquireShared(int arg):共享式獲取同步狀態,返回值大於等於0則表示獲取成功,否則獲取失敗;
  • tryReleaseShared(int arg):共享式釋放同步狀態;
  • isHeldExclusively():當前同步器是否在獨佔式模式下被線程佔用,一般該方法表示是否被當前線程所獨佔;
  • acquire(int arg):獨佔式獲取同步狀態,如果當前線程獲取同步狀態成功,則由該方法返回,否則,將會進入同步隊列等待,該方法將會調用可重寫的tryAcquire(int arg)方法;
  • acquireInterruptibly(int arg):與acquire(int arg)相同,但是該方法響應中斷,當前線程為獲取到同步狀態而進入到同步隊列中,如果當前線程被中斷,則該方法會拋出InterruptedException異常並返回;
  • tryAcquireNanos(int arg,long nanos):超時獲取同步狀態,如果當前線程在nanos時間內沒有獲取到同步狀態,那麼將會返回false,已經獲取則返回true;
  • acquireShared(int arg):共享式獲取同步狀態,如果當前線程未獲取到同步狀態,將會進入同步隊列等待,與獨佔式的主要區別是在同一時刻可以有多個線程獲取到同步狀態;
  • acquireSharedInterruptibly(int arg):共享式獲取同步狀態,響應中斷;
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式獲取同步狀態,增加超時限制;
  • release(int arg):獨佔式釋放同步狀態,該方法會在釋放同步狀態之後,將同步隊列中第一個節點包含的線程喚醒;
  • releaseShared(int arg):共享式釋放同步狀態;

AQS的原理

在基於AQS構建的同步器中,只能在一個時刻發生阻塞,從而降低上下文切換的開銷,提高了吞吐量。同時在設計AQS時充分考慮了可伸縮性,因此J.U.C中所有基於AQS構建的同步器均可以獲得這個優勢。
AQS的主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態。
AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操作,當然AQS可以確保對state的操作是安全的。

定義(源碼分析)

public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer implements java.io.Serializable { 
//等待隊列的頭節點
private transient volatile Node head;
//等待隊列的尾節點
private transient volatile Node tail;
//同步狀態
private volatile int state;
protected final int getState() { return state;}
protected final void setState(int newState) { state = newState;}
...
}
隊列同步器AQS是用來構建鎖或其他同步組件的基礎框架,內部使用一個int成員變量表示同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作,其中內部狀態state,等待隊列的頭節點head和尾節點head,都是通過volatile修飾,保證了多線程之間的可見。
在深入實現原理之前,我們先看看內部的FIFO隊列是如何實現的。

static final class Node {
//該等待同步的節點處於共享模式
static final Node SHARED = new Node();
//該等待同步的節點處於獨佔模式
static final Node EXCLUSIVE = null;
static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//等待狀態,這個和state是不一樣的:有1,0,-1,-2,-3五個值
volatile int waitStatus;
volatile Node prev;//前驅節點
volatile Node next;//後繼節點
volatile Thread thread;//等待鎖的線程
Node nextWaiter;//和節點是否共享有關
...
}
先來一張形象的圖

聊聊J.U.CAQS

黃色節點是默認head節點,其實是一個空節點,我覺得可以理解成代表當前持有鎖的線程,每當有線程競爭失敗,都是插入到隊列的尾節點,tail節點始終指向隊列中的最後一個元素。
每個節點中, 除了存儲了當前線程,前後節點的引用以外,還有一個waitStatus變量,用於描述節點當前的狀態。多線程併發執行時,隊列中會有多個節點存在,這個waitStatus其實代表對應線程的狀態:有的線程可能獲取鎖因為某些原因放棄競爭;有的線程在等待滿足條件,滿足之後才能執行等等。一共有4種狀態:
1. CANCELLED = 1 取消狀態
該節點的線程可能由於超時或被中斷而處於被取消(作廢)狀態,一旦處於這個狀態,節點狀態將一直處於CANCELLED(作廢),因此應該從隊列中移除.
2. SIGNAL = -1 等待觸發狀態
當前節點為SIGNAL時,後繼節點會被掛起,因此在當前節點釋放鎖或被取消之後必須被喚醒(unparking)其後繼結點.
3. CONDITION = -2 等待條件狀態
該節點的線程處於等待條件狀態,不會被當作是同步隊列上的節點,直到被喚醒(signal),設置其值為0,重新進入阻塞狀態.
4. PROPAGATE 狀態需要向後傳播
等待隊列是FIFO先進先出,只有前一個節點的狀態為SIGNAL時,當前節點的線程才能被掛起。

實現原理

子類重寫tryAcquire和tryRelease方法通過CAS指令修改狀態變量state。

public final void acquire(int arg) {   
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))    
selfInterrupt();
}

線程獲取鎖過程

下列步驟中線程A和B進行競爭。
1. 線程A執行CAS執行成功,state值被修改並返回true,線程A繼續執行。
2. 線程A執行CAS指令失敗,說明線程B也在執行CAS指令且成功,這種情況下線程A會執行步驟3。
3. 生成新Node節點node,並通過CAS指令插入到等待隊列的隊尾(同一時刻可能會有多個Node節點插入到等待隊列中),如果tail節點為空,則將head節點指向一個空節點(代表線程B),具體實現如下:

private Node addWaiter(Node mode) {
//把當前線程包裝為node,設為獨佔模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果tail不為空,把node插入末尾
if (pred != null) {
node.prev = pred;
//此時可能有其他線程插入,所以重新判斷tail
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//此時可能有其他線程插入,所以重新判斷tail是否為空
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
4. node插入到隊尾後,該線程不會立馬掛起,會進行自旋操作。因為在node的插入過程,線程B(即之前沒有阻塞的線程)可能已經執行完成,所以要判斷該node的前一個節點pred是否為head節點(代表線程B),如果pred == head,表明當前節點是隊列中第一個“有效的”節點,因此再次嘗試tryAcquire獲取鎖,
  1. 如果成功獲取到鎖,表明線程B已經執行完成,線程A不需要掛起。
  2. 如果獲取失敗,表示線程B還未完成,至少還未修改state值。進行步驟5。

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果它的前繼節點為頭結點,嘗試獲取鎖,獲取成功則返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
5. 前面我們已經說過只有前一個節點pred的線程狀態為SIGNAL時,當前節點的線程才能被掛起。
  1. 如果pred的waitStatus == 0,則通過CAS指令修改waitStatus為Node.SIGNAL。
  2. 如果pred的waitStatus > 0,表明pred的線程狀態CANCELLED,需從隊列中刪除。
  3. 如果pred的waitStatus為Node.SIGNAL,則通過LockSupport.park()方法把線程A掛起,並等待被喚醒,被喚醒後進入步驟6。
具體實現如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE.  Indicate that we
* need a signal, but don't park yet.  Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
6. 線程每次被喚醒時,都要進行中斷檢測,如果發現當前線程被中斷,那麼拋出InterruptedException並退出循環。從無限循環的代碼可以看出,並不是被喚醒的線程一定能獲得鎖,必須調用tryAccquire重新競爭,因為鎖是非公平的,有可能被新加入的線程獲得,從而導致剛被喚醒的線程再次被阻塞,這個細節充分體現了“非公平”的精髓。

線程釋放鎖過程

  1. 如果頭結點head的waitStatus值為-1,則用CAS指令重置為0;
  2. 找到waitStatus值小於0的節點s,通過LockSupport.unpark(s.thread)喚醒線程。

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
如果node的後繼節點不為空且不是作廢狀態,則喚醒這個後繼節點,否則
從末尾開始尋找合適的節點,如果找到,則喚醒
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

總結

對獲取獨佔式鎖過程總結

AQS的模板方法acquire通過調用子類自定義實現的tryAcquire獲取同步狀態失敗後
->將線程構造成Node節點(addWaiter)
->將Node節點添加到同步隊列對尾(addWaiter)
->節點以自旋的方法獲取同步狀態(acquirQueued)。在節點自旋獲取同步狀態時,只有其前驅節點是頭節點的時候才會嘗試獲取同步狀態,如果該節點的前驅不是頭節點或者該節點的前驅節點是頭節點單獲取同步狀態失敗,則判斷當前線程需要阻塞,如果需要阻塞則需要被喚醒過後才返回。

釋放鎖過程總結

首先調用子類的tryRelease()方法釋放鎖,然後喚醒後繼節點,在喚醒的過程中,需要判斷後繼節點是否滿足情況,如果後繼節點不為且不是作廢狀態,則喚醒這個後繼節點,否則從tail節點向前尋找合適的節點,如果找到,則喚醒.

參考自:github.com/CyC2018/CS-…

相關文章

說一下聚簇索引&非聚簇索引

MySQL索引詳解

聊聊MVCC和NextkeyLocks

事務ACID特性與隔離級別