Java併發fork-join框架
fork-join
框架允許在幾個工作進程中斷某個任務,然後等待結果組合它們。 它在很大程度上利用了多處理器機器的生產能力。 以下是fork-join
框架中使用的核心概念和對象。
Fork
Fork是一個進程,其中任務將其分成可以併發執行的較小且獨立的子任務。
語法
Sum left = new Sum(array, low, mid);
left.fork();
這裏Sum
是RecursiveTask
的子類,left.fork()
方法將任務分解爲子任務。
Join
連接(Join
)是子任務完成執行後任務加入子任務的所有結果的過程,否則它會持續等待。
語法
left.join();
這裏剩下的是Sum
類的一個對象。
ForkJoinPool
它是一個特殊的線程池,旨在使用fork-and-join
任務拆分。
語法
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
這裏有一個新的ForkJoinPool
,並行級別爲4
個CPU。
RecursiveAction
RecursiveAction
表示不返回任何值的任務。
語法
class Writer extends RecursiveAction {
@Override
protected void compute() { }
}
遞歸任務
RecursiveTask
表示返回值的任務。
語法
class Sum extends RecursiveTask {
@Override
protected Long compute() { return null; }
}
實例
以下TestThread
程序顯示了基於線程的環境中Fork-Join
框架的使用。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException, ExecutionException {
int nThreads = Runtime.getRuntime().availableProcessors();
System.out.println(nThreads);
int[] numbers = new int[1000];
for(int i=0; i< numbers.length; i++){
numbers[i] = i;
}
ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
System.out.println(result);
}
static class Sum extends RecursiveTask<Long> {
int low;
int high;
int[] array;
Sum(int[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
protected Long compute() {
if(high - low <= 10) {
long sum = 0;
for(int i=low; i < high; ++i)
sum += array[i];
return sum;
} else {
int mid = low + (high - low) / 2;
Sum left = new Sum(array, low, mid);
Sum right = new Sum(array, mid, high);
left.fork();
long rightResult = right.compute();
long leftResult = left.join();
return leftResult + rightResult;
}
}
}
}
這將產生以下結果 -
4
499500