Java併發fork-join框架

fork-join框架允許在幾個工作進程中斷某個任務,然後等待結果組合它們。 它在很大程度上利用了多處理器機器的生產能力。 以下是fork-join框架中使用的核心概念和對象。

Fork

Fork是一個進程,其中任務將其分成可以併發執行的較小且獨立的子任務。

語法

Sum left  = new Sum(array, low, mid);
left.fork();

這裏SumRecursiveTask的子類,left.fork()方法將任務分解爲子任務。

Join

連接(Join)是子任務完成執行後任務加入子任務的所有結果的過程,否則它會持續等待。

語法

left.join();

這裏剩下的是Sum類的一個對象。

ForkJoinPool

它是一個特殊的線程池,旨在使用fork-and-join任務拆分。

語法

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

這裏有一個新的ForkJoinPool,並行級別爲4個CPU。

RecursiveAction

RecursiveAction表示不返回任何值的任務。

語法

class Writer extends RecursiveAction {
   @Override
   protected void compute() { }
}

遞歸任務

RecursiveTask表示返回值的任務。

語法

class Sum extends RecursiveTask {
   @Override
   protected Long compute() { return null; }
}

實例

以下TestThread程序顯示了基於線程的環境中Fork-Join框架的使用。

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException, ExecutionException {
      int nThreads = Runtime.getRuntime().availableProcessors();
      System.out.println(nThreads);

      int[] numbers = new int[1000]; 

      for(int i=0; i< numbers.length; i++){
         numbers[i] = i;
      }

      ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
      Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
      System.out.println(result);
   }  

   static class Sum extends RecursiveTask<Long> {

      int low;
      int high;
      int[] array;

      Sum(int[] array, int low, int high) {
         this.array = array;
         this.low   = low;
         this.high  = high;
      }

      protected Long compute() {
         if(high - low <= 10) {
            long sum = 0;
            for(int i=low; i < high; ++i) 
               sum += array[i];
               return sum;
         } else {            
            int mid = low + (high - low) / 2;
            Sum left  = new Sum(array, low, mid);
            Sum right = new Sum(array, mid, high);
            left.fork();
            long rightResult = right.compute();
            long leftResult  = left.join();
            return leftResult + rightResult;
         }
      }
   }
}

這將產生以下結果 -

4
499500