ConcurrentHashMap原理淺析

NO IMAGE

1. 前言

為什麼要使用 ConcurrentHashMap

主要基於兩個原因:

  1. 在併發編程中使用 HashMap 可能造成死循環(jdk1.7,jdk1.8 中會造成數據丟失)
  2. HashTable 效率非常低下

2. ConcurrentHashMap 結構

jdk 1.7 和 jdk 1.8 中,ConcurrentHashMap 的結構有著很大的變化,後面會講解。

2.1 jdk 1.7 中結構

ConcurrentHashMap原理淺析

在 jdk 1.7 中,ConcurrentHashMap 是由 Segment 數據結構和 HashEntry 數組結構構成。採取分段鎖來保證安全性。

Segment 是 ReentrantLock 重入鎖,在 ConcurrentHashMap 中扮演鎖的角色;HashEntry 則用於存儲鍵值對數據。

一個 ConcurrentHashMap 裡包含一個 Segment 數組,一個 Segment 裡包含一個 HashEntry 數組,Segment 的結構和 HashMap 類似,是一個數組和鏈表結構。

2.2 jdk 1.8 中結構

ConcurrentHashMap原理淺析

JDK1.8 的實現已經摒棄了 Segment 的概念,而是直接用 Node 數組+鏈表+紅黑樹的數據結構來實現,併發控制使用 Synchronized 和 CAS 來操作,整個看起來就像是優化過且線程安全的 HashMap,雖然在 JDK1.8 中還能看到 Segment 的數據結構,但是已經簡化了屬性,只是為了兼容舊版本。

3. 實現

3.1 JDK 1.7 中的實現

3.1.1 初始化

ConcurrentHashMap 的初始化是通過位運算來初始化 Segment 的大小的(ssize 表示),通過concurrentLevel 計算得出。

int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}

ssize 用位於運算來計算(ssize <<=1),所以 Segment 的大小取值都是以2的N次方,Segment 的大小 ssize 默認為16.

每一個 Segment 元素下的 HashEntry 的初始化也是按照位於運算來計算,用 cap 來表示

int cap = 1;
while (cap < c)
cap <<= 1;

HashEntry 大小的計算也是2的N次方(cap <<=1), cap 的初始值為1,所以 HashEntry 最小的容量為2.

3.1.2 get 操作

Segment 的 get 操作實現非常簡單和高效,先經過一次再散列,然後使用這個散列值通過散列運算定位到 Segment,再通過散列算法定位到元素。

public V get(Object key){
int hash = hash(key.hashCode());
return segmentFor(hash).get(key,hash);
}

get 操作的高效之處在於整個 get 過程都不需要加鎖,除非讀到空的值才會加鎖重讀。原因就是將使用的共享變量定義成 volatile 類型。

transient volatile int count;
volatile V value;

3.1.3 put 操作

對於 ConcurrentHashMap 的數據插入,這裡要進行兩次 Hash 去定位數據的存儲位置

static class Segment<K,V> extends ReentrantLock implements Serializable {
//省略
}

當執行put操作時,會經歷兩個步驟:

  1. 判斷是否需要擴容
  2. 定位到添加元素的位置,將其放入 HashEntry 數組中

插入過程會進行第一次 key 的 hash 來定位 Segment 的位置,如果該 Segment 還沒有初始化,即通過 CAS 操作進行賦值,然後進行第二次 hash 操作,找到相應的 HashEntry 的位置,這裡會利用繼承過來的鎖的特性,在將數據插入指定的 HashEntry 位置時(尾插法),會通過繼承 ReentrantLock 的 tryLock() 方法嘗試去獲取鎖,如果獲取成功就直接插入相應的位置,如果已經有線程獲取該Segment的鎖,那當前線程會以自旋的方式去繼續的調用 tryLock() 方法去獲取鎖,超過指定次數就掛起,等待喚醒。

3.1.4 size 操作

計算 ConcurrentHashMap 的元素大小是併發操作的,就是在你計算 size 的時候,他還在併發的插入數據,這就可能會導致你計算出來的 size 和你實際的 size 有相差。

ConcurrentHashMap 採取的解決方法是先嚐試 2 次通過不鎖住 Segment 的方式來統計各個 Segment 大小,統計過程中如果 count 發生變化,則再採用加鎖的方式來統計所有 Segment 的大小。

try {
for (; ; ) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation  
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K, V> seg = segmentAt(segments, j);
if (seg != null) {
/* 在put、remove、clean方法裡操作
* 元素都會將變量modCount進行加一,
* 統計也是依靠這個變量的前後變化來進行的 */
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0) overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}

3.2 JDK 1.8 中的實現

3.2.1 基本屬性及概念

看一下基本屬性

//node數組最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默認初始值,必須是2的冪數
private static final int DEFAULT_CAPACITY = 16;
//數組可能最大值,需要與toArray()相關方法關聯
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//併發級別,遺留下來的,為兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//負載因子
private static final float LOAD_FACTOR = 0.75f;
//鏈表轉紅黑樹閥值,> 8 鏈表轉換為紅黑樹
static final int TREEIFY_THRESHOLD = 8;
//樹轉鏈表閥值,小於等於6(tranfer時,lc、hc=0兩個計數器分別++記錄原bin、新binTreeNode數量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
//2^15-1,help resize的最大線程數
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//32-16=16,sizeCtl中記錄size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//forwarding nodes的hash值
static final int MOVED = -1;
//樹根節點的hash值
static final int TREEBIN = -2;
//ReservationNode的hash值
static final int RESERVED = -3;
//可用處理器數量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的數組
transient volatile Node<K,V>[] table;
/*控制標識符,用來控制table的初始化和擴容的操作,不同的值有不同的含義
*當為負數時:-1代表正在初始化,-N代表有N-1個線程正在 進行擴容
*當為0時:代表當時的table還沒有被初始化
*當為正數時:表示初始化或者下一次進行擴容的大小
*/
private transient volatile int sizeCtl;

重要概念:

  1. table: 默認為null,初始化發生在第一次插入操作,默認大小為16的數組,用來存儲Node節點數據,擴容時大小總是2的冪次方
  2. nextTable: 默認為null,擴容時新生成的數組,其大小為原數組的兩倍
  3. Node :保存 key,value 及 key 的 hash 值的數據結構。
class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
//省略部分代碼
}

其中 value 和 next 都用 volatile 修飾,保證併發的可見性。

  1. ForwardingNode: 一個特殊的 Node 節點,hash 值為 -1,其中存儲 nextTable 的引用。
final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}

只有table發生擴容的時候,ForwardingNode 才會發揮作用,作為一個佔位符放在table中表示當前節點為 null 或則已經被移動。

  1. TreeNode類和TreeBin類:  TreeNode類表示的是紅黑樹上的每個節點。當一個鏈表上的節點數量超過了指定的值,會將這個鏈表變為紅黑樹,當然每個節點就轉換為TreeNode。不像HashMap,ConcurrentHashMap在桶裡面直接存儲的不是TreeNode,而是一個TreeBin,在TreeBin內部維護一個紅黑樹,也就是說TreeNode在TreeBin內部使用的。

3.2.2 初始化

實例化 ConcurrentHashMap 時帶參數時,會根據參數調整 table 的大小,假設參數為 100,最終會調整成 256,確保 table 的大小總是2的冪次方.

table 初始化

private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//如果一個線程發現sizeCtl<0,意味著另外的線程執行CAS操作成功,當前線程只需要讓出cpu時間片
if ((sc = sizeCtl) < 0) 
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

3.2.3 put 操作

假設 table 已經初始化完成,put 操作採用 CAS + synchronized 實現併發插入或更新操作。

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;                   // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
...省略部分代碼
}
addCount(1L, binCount);
return null;
}

hash算法

static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

table 中定位索引位置,n 是 table 的大小

int index = (n - 1) & hash

獲取 table 中對應索引的元素f

Unsafe.getObjectVolatile 可以直接獲取指定內存的數據,保證了每次拿到數據都是最新的。

如果 f 為 null,說明 table 中這個位置第一次插入元素,利用Unsafe.compareAndSwapObject 方法插入 Node 節點。

如果 CAS 成功,說明 Node 節點已經插入,隨後 addCount(1L, binCount) 方法會檢查當前容量是否需要進行擴容。

如果 CAS 失敗,說明有其它線程提前插入了節點,自旋重新嘗試在這個位置插入節點。

如果f的 hash 值為 -1,說明當前 f 是 ForwardingNode 節點,意味有其它線程正在擴容,則一起進行擴容操作。

其餘情況把新的 Node 節點按鏈表或紅黑樹的方式插入到合適的位置,這個過程採用同步內置鎖實現併發,代碼如下:

synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}

在節點 f 上進行同步,節點插入之前,再次利用tabAt(tab, i) == f判斷,防止被其它線程修改。

如果 f.hash >= 0,說明 f 是鏈表結構的頭結點,遍歷鏈表,如果找到對應的 node 節點,則修改 value,否則在鏈表尾部加入節點。
如果 f 是 TreeBin 類型節點,說明 f 是紅黑樹根節點,則在樹結構上遍歷元素,更新或增加節點。
如果鏈表中節點數 binCount >= TREEIFY_THRESHOLD(默認是8),則把鏈表轉化為紅黑樹結構。

table擴容

當 table 容量不足的時候,即 table 的元素數量達到容量閾值 sizeCtl,需要對 table 進行擴容。

整個擴容分為兩部分:

構建一個 nextTable,大小為 table 的兩倍。
把 table 的數據複製到 nextTable 中。

這兩個過程在單線程下實現很簡單,但是 ConcurrentHashMap 是支持併發插入的,擴容操作自然也會有併發的出現,這種情況下,第二步可以支持節點的併發複製,這樣性能自然提升不少,但實現的複雜度也上升了一個臺階。

先看第一步,構建nextTable,毫無疑問,這個過程只能只有單個線程進行 nextTable 的初始化,具體實現如下:

private final void addCount(long x, int check) {
... 省略部分代碼
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

通過 Unsafe.compareAndSwapInt 修改 sizeCtl 值,保證只有一個線程能夠初始化 nextTable,擴容後的數組長度為原來的兩倍,但是容量是原來的 1.5

節點從 table 移動到 nextTable,大體思想是遍歷、複製的過程。

首先根據運算得到需要遍歷的次數i,然後利用 tabAt 方法獲得 i 位置的元素 f,初始化一個 forwardNode 實例 fwd。

如果 f == null,則在 table 中的 i 位置放入 fwd,這個過程是採用 Unsafe.compareAndSwapObjectf 方法實現的,很巧妙的實現了節點的併發移動。

如果 f 是鏈表的頭節點,就構造一個反序鏈表,把他們分別放在 nextTable 的 i 和 i+n 的位置上,移動完成,採用 Unsafe.putObjectVolatile 方法給 table 原位置賦值 fwd。
如果 f 是 TreeBin 節點,也做一個反序處理,並判斷是否需要 untreeify,把處理的結果分別放在 nextTable 的 i 和 i+n 的位置上,移動完成,同樣採用 Unsafe.putObjectVolatile 方法給 table 原位置賦值 fwd。
遍歷過所有的節點以後就完成了複製工作,把 table 指向 nextTable,並更新 sizeCtl 為新數組大小的 0.75 倍 ,擴容完成。

紅黑樹構造

注意:如果鏈表結構中元素超過 TREEIFY_THRESHOLD 閾值,默認為 8 個,則把鏈表轉化為紅黑樹,提高遍歷查詢效率。

if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}

接下來我們看看如何構造樹結構,代碼如下:

private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

可以看出,生成樹節點的代碼塊是同步的,進入同步代碼塊之後,再次驗證 table 中 index 位置元素是否被修改過。

  1. 根據 table 中 index 位置 Node 鏈表,重新生成一個 hd 為頭結點的 TreeNode 鏈表。
  2. 根據 hd 頭結點,生成 TreeBin 樹結構,並把樹結構的root節點寫到 table 的 index 位置的內存中,具體實現如下:
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}

主要根據 Node 節點的 hash 值大小構建二叉樹。

3.2.4 get 操作

get操作和put操作相比,顯得簡單了許多。

public V get(Object key) {
Node<K,V>[] tab; 
Node<K,V> e, p;
int n, eh; 
K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
  1. 判斷table是否為空,如果為空,直接返回null。
  2. 計算key的hash值,並獲取指定table中指定位置的Node節點,通過遍歷鏈表或則樹結構找到對應的節點,返回value值。

3.2.4 size 操作

JDK1.8 size 是通過對 baseCount 和 counterCell 進行 CAS 計算,最終通過 baseCount 和 遍歷 CounterCell 數組得出 size。
具體參考:ConcurrentHashMap 的size方法原理分析

4. JDK 1.8 中為什麼要摒棄分段鎖

很多人不明白為什麼Doug Lea在JDK1.8為什麼要做這麼大變動,使用重級鎖synchronized,性能反而更高,原因如下:

  1. jdk1.8中鎖的粒度更細了。jdk1.7中ConcurrentHashMap 的concurrentLevel(併發數)基本上是固定的。jdk1.8中的concurrentLevel是和數組大小保持一致的,每次擴容,併發度擴大一倍.
  2. 紅黑樹的引入,對鏈表的優化使得 hash 衝突時的 put 和 get 效率更高
  3. 獲得JVM的支持 ,ReentrantLock 畢竟是 API 這個級別的,後續的性能優化空間很小。 synchronized 則是 JVM 直接支持的, JVM 能夠在運行時作出相應的優化措施:鎖粗化、鎖消除、鎖自旋等等。這就使得 synchronized 能夠隨著 JDK 版本的升級而不改動代碼的前提下獲得性能上的提升。

5. 小結&參考資料

小結

可以看出 JDK1.8 版本的 ConcurrentHashMap 的數據結構已經接近 HashMap,相對而言,ConcurrentHashMap 只是增加了同步的操作來控制併發,從 JDK1.7 版本的 ReentrantLock+Segment+HashEntry,到 JDK1.8 版本中synchronized+CAS+HashEntry+紅黑樹,優化確實很大。

參考資料

相關文章

「從模板消息改版訂閱消息」小程序推送

【Oracle學習06】DML與併發性,UNDO,死鎖

瀏覽器進程架構的演化

c站主站重構總結,fre強力驅動