Java 8並行流中的自定義線程池

1.概述

Java 8引入了S treams的概念, treams是對數據執行批量操作的有效方法。並且可以在支持並發的環境中獲得併行Streams

這些流可以帶來更高的性能-以多線程開銷為代價。

在本快速教程中,我們將研究**Stream API的最大局限之一,**並了解如何使並行流與自定義ThreadPool實例一起使用-或者有一個庫來處理這個問題

2.並行Stream

讓我們從一個簡單的示例開始-在任何Collection類型上調用parallelStream方法-這將返回一個可能的並行Stream

@Test

 public void givenList_whenCallingParallelStream_shouldBeParallelStream(){

 List<Long> aList = new ArrayList<>();

 Stream<Long> parallelStream = aList.parallelStream();



 assertTrue(parallelStream.isParallel());

 }

在此類Stream中發生的默認處理使用ForkJoinPool.commonPool(),這是整個應用程序共享的Thread Pool

3.自定義Thread Pool

實際上,在處理stream時,我們可以傳遞自定義ThreadPool

下面的示例讓並行Stream使用自定義Thread Pool來計算1到1,000,000(含)之間的長值之和:

@Test

 public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()

 throws InterruptedException, ExecutionException {



 long firstNum = 1;

 long lastNum = 1_000_000;



 List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()

 .collect(Collectors.toList());



 ForkJoinPool customThreadPool = new ForkJoinPool(4);

 long actualTotal = customThreadPool.submit(

 () -> aList.parallelStream().reduce(0L, Long::sum)).get();



 assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);

 }

我們使用了並行度為4的ForkJoinPool構造函數。需要進行一些實驗才能確定不同環境的最佳值,但是一個很好的經驗法則是根據CPU的核心數量來選擇數字。

接下來,我們處理了並行Stream的內容,並在reduce調用中將其匯總。

這個簡單的示例可能無法展示使用自定義Thread Pool的全部用處,但是在我們不希望將通用Thread Pool與長期運行的任務捆綁在一起的情況下(例如處理來自網絡源的數據),其好處顯而易見。 ,或者應用程序中的其他組件正在使用公共Thread Pool

4。結論

我們已經簡要介紹瞭如何使用自定義Thread Pool運行並行Stream 。在正確的環境中,並通過適當使用並行度,在某些情況下可以提高性能。

可以在Github上找到本文引用的完整代碼示例。