共享鎖和排它鎖(ReentrantReadWriteLock)
1、什麼是共享鎖和排它鎖

     共享鎖就是允許多個執行緒同時獲取一個鎖,一個鎖可以同時被多個執行緒擁有。
     排它鎖,也稱作獨佔鎖,一個鎖在某一時刻只能被一個執行緒佔有,其它執行緒必須等待鎖被釋放之後才可能獲取到鎖。
2、排它鎖和共享鎖例項
     ReentrantLock就是一種排它鎖。CountDownLatch是一種共享鎖。這兩類都是單純的一類,即,要麼是排它鎖,要麼是共享鎖。
   ReentrantReadWriteLock是同時包含排它鎖和共享鎖特性的一種鎖,這裡主要以ReentrantReadWriteLock為例來進行分析學習。我們使用ReentrantReadWriteLock的寫鎖時,使用的便是排它鎖的特性;使用ReentrantReadWriteLock的讀鎖時,使用的便是共享鎖的特性。
3、鎖的等待佇列組成
 ReentrantReadWriteLock有一個讀鎖(ReadLock)和一個寫鎖(WriteLock)屬性,分別代表可重入讀寫鎖的讀鎖和寫鎖。有一個Sync屬性來表示這個鎖上的等待佇列。ReadLock和WriteLock各自也分別有一個Sync屬性表示在這個鎖上的佇列
通過建構函式來看,
    public ReentrantReadWriteLock(boolean fair)
{
        sync = (fair)? new FairSync()
new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
在建立讀鎖和寫鎖物件的時候,會把這個可重入的讀寫鎖上的Sync屬性傳遞過去。
protected ReadLock(ReentrantReadWriteLock
lock) {
            sync = lock.sync;
        }
protected WriteLock(ReentrantReadWriteLock
lock) {
            sync = lock.sync;
        }
所以,最終的效果是讀鎖和寫鎖使用的是同一個執行緒等待佇列。這個佇列就是通過我們在前面介紹過的AbstractQueuedSynchronizer實現的。
4、鎖的狀態
    
既然讀鎖和寫鎖使用的是同一個等待佇列,那麼這裡要如何區分一個鎖的讀狀態(有多少個執行緒正在讀這個鎖)和寫狀態(是否被加了寫鎖,哪個執行緒正在寫這個鎖)。
首先每個鎖都有一個exclusiveOwnerThread屬性,這是繼承自AbstractQueuedSynchronizer,來表示當前擁有這個鎖的執行緒。那麼,剩下的主要問題就是確定,有多少個執行緒正在讀這個鎖,以及是否加了寫鎖。
這裡可以通過執行緒獲取鎖時執行的邏輯來看,下面是執行緒獲取讀鎖時會執行的一部分程式碼。
  final boolean tryReadLock()
{
            Thread current = Thread.currentThread();
            for (;;)
{
                int c
= getState();
                if (exclusiveCount(c)
!= 0 &&
                    getExclusiveOwnerThread() != current)
                    return false ;
                if (sharedCount(c)
== MAX_COUNT)
                    throw new Error(“Maximum
lock count exceeded” );
                if (compareAndSetState(c,
c SHARED_UNIT)) {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh
== null || rh.tid != current.getId())
                        cachedHoldCounter = rh = readHolds.get();
                    rh.count ;
                    return true ;
                }
            }
        }
注意這個函式的呼叫exclusiveCount(c) ,用來計算這個鎖當前的寫加鎖次數(同一個程序多次進入會累加)。程式碼如下
/** Returns the number of shared holds represented in count  */
        static int sharedCount( int c)   
return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds
represented in count  */
         static int exclusiveCount (int c)
return c & EXCLUSIVE_MASK; }
相關常量的定義如下
static final int SHARED_SHIFT  
= 16;
static final int EXCLUSIVE_MASK =
(1 << SHARED_SHIFT) – 1; 
如果從二進位制來看EXCLUSIVE_MASK的表示,這個值的低16位全是1,而高16位則全是0,所以exclusiveCount是把state的低16位取出來,表示當前這個鎖的寫鎖加鎖次數。
再來看sharedCount,取出了state的高16位,用來表示這個鎖的讀鎖加鎖次數。所以,這裡是用state的高16位和低16位來分別表示這個鎖上的讀鎖和寫鎖的加鎖次數。
現在再回頭來看tryReadLock實現,首先檢查這個鎖上是否被加了寫鎖,同時檢查加寫鎖的是不是當前執行緒。如果不是被當前執行緒加了寫鎖,那麼試圖加讀鎖就失敗了。如果沒有被加寫鎖,或者是被當前執行緒加了寫鎖,那麼就把讀鎖加鎖次數加1,通過compareAndSetState(c,
c SHARED_UNIT)來實現
SHARED_UNIT的定義如下,剛好實現了高16位的加1操作。
static final int SHARED_UNIT   
= (1 << SHARED_SHIFT);
5、執行緒阻塞和喚醒的時機
執行緒的阻塞和訪問其他鎖的時機相似,線上程檢視獲取鎖,但這個鎖又被其它執行緒佔領無法獲取成功時,執行緒就會進入這個鎖物件的等待佇列中,並且執行緒被阻塞,等待前面執行緒釋放鎖時被喚醒。
但因為加讀鎖和加寫鎖進入等待佇列時存在一定的區別,加讀鎖時,final Node
node = addWaiter(Node.SHARED);節點的nextWaiter指向一個共享節點,表明當前這個執行緒是處於共享狀態進入等待佇列。
加寫鎖時如下,
public final void acquire (int arg)
{
        if (!tryAcquire(arg)
&&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
執行緒是處於排它狀態進入等待佇列的。
線上程的阻塞上,讀鎖和寫鎖的時機相似,但線上程的喚醒上,讀鎖和寫鎖則存在較大的差別。
讀鎖通過AbstractQueuedSynchronizer的doAcquireShared來完成獲取鎖的動作。
  private void doAcquireShared( int arg)
{
        final Node
node = addWaiter(Node.SHARED);
        try {
            boolean interrupted
false;
            for (;;)
{
                final Node
p = node.predecessor();
                if (p
== head) {
                    int r
= tryAcquireShared(arg);
                    if (r
>= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null ; //
help GC
                        if (interrupted)
                            selfInterrupt();
                        return ;
                    }
                }
                if (shouldParkAfterFailedAcquire(p,
node) &&
                    parkAndCheckInterrupt())
                    interrupted = true ;
            }
        } catch (RuntimeException
ex) {
            cancelAcquire(node);
            throw ex;
        }
    }
在tryAcquireShared獲取讀鎖成功後(返回正數表示獲取成功),有一個setHeadAndPropagate的函式呼叫。
寫鎖通過AbstractQueuedSynchronizer的acquire來實現鎖的獲取動作。
  public final void acquire( int arg)
{
        if (!tryAcquire(arg)
&&
            acquireQueued(addWaiter(Node.EXCLUSIVE),
arg))
            selfInterrupt();
    }
如果tryAcquire獲取成功則直接返回,否則把執行緒加入到鎖的等待佇列中。和一般意義上的ReentrantLock的原理一樣。
所以在加鎖上,主要的差別在於這個setHeadAndPropagate方法,其程式碼如下
private void setHeadAndPropagate (Node
node, int propagate) {
        Node h = head; // Record old head for check
below
        setHead(node);
        /*
         * Try to signal next queued node if:
         * Propagation was indicated by caller,
         * or was recorded (as h.waitStatus) by a previous operation
         * (note: this uses sign-check of waitStatus because
         * PROPAGATE status may transition to SIGNAL.)
         * and
         * The next node is waiting in shared mode,
         * or we don’t know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate
> 0 || h == null || h.waitStatus < 0)
{
            Node s = node.next;
            if (s
== null || s.isShared())
                doReleaseShared();
        }
    }
主要操作是把這個節點設為頭節點(成為頭節點,則表示不在等待佇列中,因為獲取鎖成功了),同時釋放鎖(doReleaseShared)。
下面來看doReleaseShared的實現
  private void doReleaseShared()
{
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases. This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;)
{
            Node h = head;
            if (h
!= null && h != tail) {
                int ws
= h.waitStatus;
                if (ws
== Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h,
Node.SIGNAL, 0))
                        continue ; //
loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws
== 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue ; //
loop on failed CAS
            }
            if (h
== head) // loop if head changed
                break ;
        }
    }
 
把頭節點的waitStatus這隻為0或者Node.PROPAGATE,並且喚醒下一個執行緒,然後就結束了。
總結一下,就是一個執行緒在獲取讀鎖後,會喚醒鎖的等待佇列中的第一個執行緒。如果這個被喚醒的執行緒是在獲取讀鎖時被阻塞的,那麼被喚醒後,就會在for迴圈中,又執行到setHeadAndPropagate,這樣就實現了讀鎖獲取時的傳遞喚醒。這種傳遞在遇到一個因為獲取寫鎖被阻塞的執行緒節點時被終止。
下面通過程式碼來理解這種等待和執行緒喚醒順序。
package lynn.lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestThread extends Thread
{
    private ReentrantReadWriteLock lock;
    private String threadName;
    private boolean isWriter ;
    public TestThread(ReentrantReadWriteLock
lock, String name, boolean isWriter)
{
        this.lock =
lock;
        this.threadName =
name;
        this.isWriter =
isWriter;
    }
    @Override
    public void run()
{
        while (true )
{
            try {
                if (isWriter )
{
                    lock.writeLock().lock();
                } else {
                    lock.readLock().lock();
                }
                if (isWriter )
{
                    Thread. sleep(3000);
                    System. out.println(“—————————-” );
                }
                System. out.println(System.currentTimeMillis()
 “:”   threadName );
                if (isWriter )
{
                    Thread. sleep(3000);
                    System. out.println(“—————————–” );
                }
            } catch (Exception
e) {
                e.printStackTrace();
            } finally {
                if (isWriter )
{
                    lock.writeLock().unlock();
                } else {
                    lock.readLock().unlock();
                }
            }
            break;
        }
    }
}
TestThread是一個自定義的執行緒類,在生成執行緒的時候,需要傳遞一個可重入的讀寫鎖物件進去,執行緒在執行時會先加鎖,然後進行內容輸出,然後釋放鎖。如果傳遞的是寫鎖,那執行緒在輸出結果前後會先沉睡3秒,便於區分輸出的結果時間。
package lynn.lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Main
{
    public static void blockByWriteLock()
{
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.writeLock().lock();
        TestThread[] threads = new TestThread[10];
        for (int i
= 0; i < 10; i ) {
            boolean isWriter
= (i 1) % 4 == 0 ? true : false;
            TestThread thread = new TestThread(lock, “thread-” 
(i 1), isWriter);
            threads[i] = thread;
        }
        for (int i
= 0; i < threads.length; i ) {
            threads[i].start();
        }
        System. out.println(System.currentTimeMillis()
 “: block by write lock”);
        try {
            Thread. sleep(3000);
        } catch (Exception
e) {
            e.printStackTrace();
        }
        lock.writeLock().unlock();
    }
    public static void main(String[]
args) {
        blockByWriteLock();
    }
}
 
在Main中構造了10個執行緒,由於這個鎖一開始是被主執行緒擁有,並且是在排它狀態下加鎖的,所以我們構造的10個執行緒,在一開始執行便是按照其編號從小到大在等待佇列中(1到10)。然後主執行緒列印結果,等待3秒後釋放鎖。由於前3個執行緒,編號1到3是處於共享狀態阻塞的,而第4個執行緒是處於排它狀態阻塞,所以,按照上面的喚醒順序,喚醒傳遞到第4個執行緒時就結束。
依次類推,理論上的列印順序是 :主執行緒 [1,2,3]  4  [5,6,7] 8 [9,10]
從下面的執行結果來看,也是符合我們的預期的。

     
6、讀執行緒之間的喚醒
     
     如果一個執行緒在共享模式下獲取了鎖狀態,這個時候,它是否要喚醒其它在共享模式下等待在該鎖上的執行緒?
     由於多個執行緒可以同時獲取共享鎖而不相互影響,所以,當一個執行緒在共享狀態下獲取了鎖之後,理論上是可以喚醒其它在共享狀態下等待該鎖的執行緒。但如果這個時候,在這個等待佇列中,既有共享狀態的執行緒,同時又有排它狀態的執行緒,這個時候又該如何喚醒?
     實際上對於鎖來說,在共享狀態下,一個執行緒無論是獲取還是釋放鎖的時候,都會試著去喚醒下一個等待在這個鎖上的節點(通過上面的doAcquireShared程式碼能看出)。如果下一個執行緒也是處於共享狀態等待在鎖上,那麼這個執行緒就會被喚醒,然後接著試著去喚醒下一個等待在這個鎖上的執行緒,這種喚醒動作會一直持續下去,直到遇到一個在排它狀態下阻塞在這個鎖上的執行緒,或者等待佇列全部被釋放為止。
     因為執行緒是在一個FIFO的等待佇列中,所以,這這樣一個一個往後傳遞,就能保證喚醒被傳遞下去。
參考資料   http://www.liuinsect.com/2014/09/04/jdk1-8-abstractqueuedsynchronizer-part2/