淺談ForkJoinPool

NO IMAGE

ForkJoinPool是什麼?

談到線程池,很多人會想到Executors提供的一些預設的線程池,比如單線程線程池SingleThreadExecutor,固定大小的線程池FixedThreadPool,但是很少有人會注意到其中還提供了一種特殊的線程池:WorkStealingPool,我們點進這個方法,會看到和其他方法不同的是,這種線程池並不是通過ThreadPoolExecutor來創建的,而是ForkJoinPool來創建的:

    public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

這兩種線程池之間並不是繼承關係,而是平級關係:

淺談ForkJoinPool

ThreadPoolExecutor應該都很瞭解了,就是一個基本的存儲線程的線程池,需要執行任務的時候就從線程池中拿一個線程來執行。而ForkJoinPool則不僅僅是這麼簡單,同樣也不是ThreadPoolExecutor的代替品,這種線程池是為了實現“分治法”這一思想而創建的,通過把大任務拆分成小任務,然後再把小任務的結果彙總起來就是最終的結果,和MapReduce的思想很類似

舉個例子,我們要統計1-100的累加和,如果使用ForkJoinPool來實現的話,就可以將1-100每5位劃分一段,劃分出20段,當作20個任務,每個任務只計算自己區間內的結果,最後將這20個任務的結果彙總起來就是1-100的累加和

ForkJoinPool怎麼使用?

ForkJoinPool的本質就是兩點:

  1. 如果任務很小:直接計算得出結果
  2. 如果任務很大
    • 拆分成N個子任務
    • 調用子任務的fork()進行計算
    • 調用子任務的join()合併結果

接來下我們來做一個1-100的累加例子:

  1. 首先定義我們需要執行的任務:
class Task extends RecursiveTask<Integer> {
private int start;
private int end;
private int mid;
public Task(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if (end - start < 6) {
// 當任務很小時,直接進行計算
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println(Thread.currentThread().getName() + " count sum: " + sum);
} else {
// 否則,將任務進行拆分
mid = (end - start) / 2 + start;
Task left = new Task(start, mid);
Task right = new Task(mid + 1, end);
// 執行上一步拆分的子任務
left.fork();
right.fork();
// 拿到子任務的執行結果
sum += left.join();
sum += right.join();
}
return sum;
}
}

這裡的RecursiveTaskForkJoinTask的子類,ForkJoinTask又是Future的子類,不瞭解Future類的可以認為是一個異步執行,並且可以有返回值的Runnable類

我們首先在Task類中定義了任務需要的一些數據,比如開始位置和結束位置。重點是其中的compute方法,在其中實現了我們剛才說到的步驟,如果任務很小(通過任務數據來判斷),就進行計算,否則將任務拆分,使用fork()執行,並通過join()拿到計算結果

  1. 將任務提交到線程池

剛才我們定義了任務類,接下來就需要把這個任務提交到線程池:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Task countTask = new Task(1, 100);
ForkJoinTask<Integer> result = forkJoinPool.submit(countTask);
System.out.println("result: " + result.get());
forkJoinPool.shutdown();
}

注意,這裡ForkJoinPool初始化可以傳入一個並行參數,如果不傳入該參數的話會默認使用處理器個數來作為並行參數

創建任務對象和線程池之後,使用submit方法來提交任務,該方法會返回一個ForkJoinTask<T>類型的對象,調用其get方法即可拿到執行結果

同時要注意,該線程池也需要調用shutdown方法來關閉

ForkJoinPool的原理

ForkJoinPool中有三個重要角色:

  • ForkJoinWorkerThread:工作線程,在內部對Thread進行的封裝
  • WorkQueue:任務隊列
  • ForkJoinTask:任務,繼承自Future,在含義上分為submission和task兩種

在線程池中,任務隊列使用數組來保存,其中保存了所有提交進來的任務:

  1. 奇數位置保存submission
  2. 偶數位置保存task

submission指的是本地提交的任務,如submit、execute提交的任務;而task則是通過fork方法添加的子任務。這兩種任務僅僅在含義上有所區別,所以一同保存在任務隊列中,通過位置進行區分

ForkJoinPool的核心

想理解ForkJoinPool的原理,就要理解其核心,一共有兩點,其一是分治法,其二就是工作竊取算法。分治法相信就不用多說了,就是通過把大任務拆分成小任務來提高併發度。重點要說的就是工作竊取算法,該算法的原理:

所有線程均嘗試找到並執行已提交的任務,或是通過其他任務創建的子任務

依賴於這種特性,來儘量避免一個線程執行完自己的任務後“無所事事”的情況。同時,竊取順序是FIFO的

相關文章

淺談Spring事務中的7種傳播特性

隨便分享點不那麼常規的面試題(二)

隨便分享點不那麼常規的面試題(一)

淺談零拷貝機制