java多線程系列:ThreadPoolExecutor源碼分析

NO IMAGE

前言

這篇主要講述ThreadPoolExecutor的源碼分析,貫穿類的創建、任務的添加到線程池的關閉整個流程,讓你知其然所以然。希望你可以通過本篇博文知道ThreadPoolExecutor是怎麼添加任務、執行任務的,以及延伸的知識點。那麼先來看看ThreadPoolExecutor的繼承關係吧。

繼承關係

java多線程系列:ThreadPoolExecutor源碼分析

Executor接口

public interface Executor {
void execute(Runnable command);
}

Executor接口只有一個方法execute,傳入線程任務參數

ExecutorService接口

public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService接口繼承Executor接口,並增加了submit、shutdown、invokeAll等等一系列方法。

AbstractExecutorService抽象類

public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {...}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {... }
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {...}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {...}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {...}
}

AbstractExecutorService抽象類實現ExecutorService接口,並且提供了一些方法的默認實現,例如submit方法、invokeAny方法、invokeAll方法。

像execute方法、線程池的關閉方法(shutdown、shutdownNow等等)就沒有提供默認的實現。

ThreadPoolExecutor

先介紹下ThreadPoolExecutor線程池的狀態吧

線程池狀態

int 是4個字節,也就是32位(注:一個字節等於8位

//記錄線程池狀態和線程數量(總共32位,前三位表示線程池狀態,後29位表示線程數量)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//線程數量統計位數29  Integer.SIZE=32 
private static final int COUNT_BITS = Integer.SIZE - 3;
//容量 000 11111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//運行中 111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
//關閉 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//停止 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
//整理 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
//終止 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;
//獲取運行狀態(獲取前3位)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//獲取線程個數(獲取後29位)
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • RUNNING:接受新任務並且處理阻塞隊列裡的任務
  • SHUTDOWN:拒絕新任務但是處理阻塞隊列裡的任務
  • STOP:拒絕新任務並且拋棄阻塞隊列裡的任務同時會中斷正在處理的任務
  • TIDYING:所有任務都執行完(包含阻塞隊列裡面任務)當前線程池活動線程為0,將要調用terminated方法
  • TERMINATED:終止狀態。terminated方法調用完成以後的狀態

線程池狀態轉換

RUNNING -> SHUTDOWN
顯式調用shutdown()方法, 或者隱式調用了finalize()方法
(RUNNING or SHUTDOWN) -> STOP
顯式調用shutdownNow()方法
SHUTDOWN -> TIDYING
當線程池和任務隊列都為空的時候
STOP -> TIDYING
當線程池為空的時候
TIDYING -> TERMINATED
當 terminated() hook 方法執行完成時候

構造函數

有四個構造函數,其他三個都是調用下面代碼中的這個構造函數

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}

參數介紹

參數類型含義
corePoolSizeint核心線程數
maximumPoolSizeint最大線程數
keepAliveTimelong存活時間
unitTimeUnit時間單位
workQueueBlockingQueue存放線程的隊列
threadFactoryThreadFactory創建線程的工廠
handlerRejectedExecutionHandler多餘的的線程處理器(拒絕策略)

提交任務

submit

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

流程步驟如下

  1. 調用submit方法,傳入Runnable或者Callable對象
  2. 判斷傳入的對象是否為null,為null則拋出異常,不為null繼續流程
  3. 將傳入的對象轉換為RunnableFuture對象
  4. 執行execute方法,傳入RunnableFuture對象
  5. 返回RunnableFuture對象

流程圖如下

java多線程系列:ThreadPoolExecutor源碼分析

execute

public void execute(Runnable command) {
//傳進來的線程為null,則拋出空指針異常
if (command == null)
throw new NullPointerException();
//獲取當前線程池的狀態+線程個數變量
int c = ctl.get();
/**
* 3個步驟
*/
//1.判斷當前線程池線程個數是否小於corePoolSize,小於則調用addWorker方法創建新線程運行,且傳進來的Runnable當做第一個任務執行。
//如果調用addWorker方法返回false,則直接返回
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.如果線程池處於RUNNING狀態,則添加任務到阻塞隊列
if (isRunning(c) && workQueue.offer(command)) {
//二次檢查
int recheck = ctl.get();
//如果當前線程池狀態不是RUNNING則從隊列刪除任務,並執行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
//否者如果當前線程池線程空,則添加一個線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3.新增線程,新增失敗則執行拒絕策略
else if (!addWorker(command, false))
reject(command);
}

其實從上面代碼註釋中可以看出就三個判斷,

  1. 核心線程數是否已滿
  2. 隊列是否已滿
  3. 線程池是否已滿

然後根據這三個條件進行不同的操作,下圖是Java併發編程的藝術書中的線程池的主要處理流程,或許會比較容易理解些

java多線程系列:ThreadPoolExecutor源碼分析

下面是整個流程的詳細步驟

  1. 調用execute方法,傳入Runable對象
  2. 判斷傳入的對象是否為null,為null則拋出異常,不為null繼續流程
  3. 獲取當前線程池的狀態和線程個數變量
  4. 判斷當前線程數是否小於核心線程數,是走流程5,否則走流程6
  5. 添加線程數,添加成功則結束,失敗則重新獲取當前線程池的狀態和線程個數變量,
  6. 判斷線程池是否處於RUNNING狀態,是則添加任務到阻塞隊列,否則走流程10,添加任務成功則繼續流程7
  7. 重新獲取當前線程池的狀態和線程個數變量
  8. 重新檢查線程池狀態,不是運行狀態則移除之前添加的任務,有一個false走流程9,都為true則走流程11
  9. 檢查線程池線程數量是否為0,否則結束流程,是調用addWorker(null, false),然後結束
  10. 調用!addWorker(command, false),為true走流程11,false則結束
  11. 調用拒絕策略reject(command),結束

可能看上面會有點繞,不清楚的可以看下面的流程圖

java多線程系列:ThreadPoolExecutor源碼分析

##### addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 檢查當前線程池狀態是否是SHUTDOWN、STOP、TIDYING或者TERMINATED
// 且!(當前狀態為SHUTDOWN、且傳入的任務為null,且隊列不為null)
// 條件都成立則返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//循環
for (;;) {
int wc = workerCountOf(c);
//如果當前的線程數量超過最大容量或者大於(根據傳入的core決定是核心線程數還是最大線程數)核心線程數 || 最大線程數,則返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加c,成功則跳出retry
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS失敗執行下面方法,查看當前線程數是否變化,變化則繼續retry循環,沒變化則繼續內部循環
c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//CAS成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建一個線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//重新檢查線程池狀態
//避免ThreadFactory退出故障或者在鎖獲取前線程池被關閉
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 先檢查線程是否是可啟動的
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//判斷worker是否添加成功,成功則啟動線程,然後將workerStarted設置為true
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//判斷線程有沒有啟動成功,沒有則調用addWorkerFailed方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

這裡可以將addWorker分為兩部分,第一部分增加線程池個數,第二部分是將任務添加到workder裡面並執行。

第一部分主要是兩個循環,外層循環主要是判斷線程池狀態,下面描述來自Java中線程池ThreadPoolExecutor原理探究

rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty())

展開!運算後等價於

s >= SHUTDOWN &&
(rs != SHUTDOWN ||
firstTask != null ||
workQueue.isEmpty())

也就是說下面幾種情況下會返回false:

  • 當前線程池狀態為STOP,TIDYING,TERMINATED
  • 當前線程池狀態為SHUTDOWN並且已經有了第一個任務
  • 當前線程池狀態為SHUTDOWN並且任務隊列為空

內層循環作用是使用cas增加線程個數,如果線程個數超限則返回false,否者進行cas,cas成功則退出雙循環,否者cas失敗了,要看當前線程池的狀態是否變化了,如果變了,則重新進入外層循環重新獲取線程池狀態,否者進入內層循環繼續進行cas嘗試。

到了第二部分說明CAS成功了,也就是說線程個數加一了,但是現在任務還沒開始執行,這裡使用全局的獨佔鎖來控制workers裡面添加任務,其實也可以使用併發安全的set,但是性能沒有獨佔鎖好(這個從註釋中知道的)。這裡需要注意的是要在獲取鎖後重新檢查線程池的狀態,這是因為其他線程可可能在本方法獲取鎖前改變了線程池的狀態,比如調用了shutdown方法。添加成功則啟動任務執行。

所以這裡也將流程圖分為兩部分來描述

第一部分流程圖

java多線程系列:ThreadPoolExecutor源碼分析

第二部分流程圖

java多線程系列:ThreadPoolExecutor源碼分析

Worker對象

Worker是定義在ThreadPoolExecutor中的finnal類,其中繼承了AbstractQueuedSynchronizer類和實現Runnable接口,其中的run方法如下

public void run() {
runWorker(this);
}

線程啟動時調用了runWorker方法,關於類的其他方面這裡就不在敘述。

runWorker

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//循環獲取任務
while (task != null || (task = getTask()) != null) {
w.lock();
// 當線程池是處於STOP狀態或者TIDYING、TERMINATED狀態時,設置當前線程處於中斷狀態
// 如果不是,當前線程就處於RUNNING或者SHUTDOWN狀態,確保當前線程不處於中斷狀態
// 重新檢查當前線程池的狀態是否大於等於STOP狀態
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//提供給繼承類使用做一些統計之類的事情,在線程運行前調用
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//提供給繼承類使用做一些統計之類的事情,在線程運行之後調用
afterExecute(task, thrown);
}
} finally {
task = null;
//統計當前worker完成了多少個任務
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//整個線程結束時調用,線程退出操作。統計整個線程池完成的任務個數之類的工作
processWorkerExit(w, completedAbruptly);
}
}

getTask

getTask方法的主要作用其實從方法名就可以看出來了,就是獲取任務

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//循環
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//線程線程池狀態和隊列是否為空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//線程數量
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//(當前線程數是否大於最大線程數或者)
//且(線程數大於1或者任務隊列為空)
//這裡有個問題(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//獲取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

關閉線程池

shutdown

當調用shutdown方法時,線程池將不會再接收新的任務,然後將先前放在隊列中的任務執行完成。

下面是shutdown方法的源碼

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

shutdownNow

立即停止所有的執行任務,並將隊列中的任務返回

public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

shutdown和shutdownNow區別

shutdown和shutdownNow這兩個方法的作用都是關閉線程池,流程大致相同,只有幾個步驟不同,如下

  1. 加鎖
  2. 檢查關閉權限
  3. CAS改變線程池狀態
  4. 設置中斷標誌(線程池不在接收任務,隊列任務會完成)/中斷當前執行的線程
  5. 調用onShutdown方法(給子類提供的方法)/獲取隊列中的任務
  6. 解鎖
  7. 嘗試將線程池狀態變成終止狀態TERMINATED
  8. 結束/返回隊列中的任務

總結

線程池可以給我們多線程編碼上提供極大便利,就好像數據庫連接池一樣,減少了線程的開銷,提供了線程的複用。而且ThreadPoolExecutor也提供了一些未實現的方法,供我們來使用,像beforeExecute、afterExecute等方法,我們可以通過這些方法來對線程進行進一步的管理和統計。

在使用線程池上好需要注意,提交的線程任務可以分為CPU 密集型任務IO 密集型任務,然後根據任務的不同進行分配不同的線程數量。

  • CPU密集型任務:
    • 應當分配較少的線程,比如 CPU個數相當的大小
  • IO 密集型任務:
    • 由於線程並不是一直在運行,所以可以儘可能的多配置線程,比如 CPU 個數 * 2
  • 混合型任務:
    • 可以將其拆分為 CPU 密集型任務以及 IO 密集型任務,這樣來分別配置。

好了,這篇博文到這裡就結束了,文中可能會有些紕漏,歡迎留言指正。

如果本文對你有所幫助,給個star唄,謝謝。本文GitHub地址:點這裡點這裡

參考資料

  1. 併發編程網-Java中線程池ThreadPoolExecutor原理探究
  2. Java併發編程的藝術

相關文章

redis系列:通過demo學習list命令

redis系列:通過demo學習hash命令

redis系列:通過demo學習string命令

redis系列:redis介紹與安裝