如何中止迭代Java Stream forEach
過濾Java中的可選流
Java 8並行流中的自定義線程池
合併Java流
map()和flatMap()之間的區別
Java流的字符串操作
如何使用索引迭代流
Java中將迭代器轉變爲流
如何在Java中獲取流的最後一個元素?
將字符串轉換為字符流
Java中的“流已被操作或關閉”異常
Java 8和無限流
如何向流中添加單個元素
Java 8中的原始類型流
在Java Stream API中有所不同
Java 9 Stream API的改進
Java Spliterator簡介
如何在Java 8流中使用if / else邏輯
Java 8謂詞鏈
具有Lambda表達式的Java流過濾器
用Java流求和
Java 8 Streams peek() API
與Map一起使用流
Java 8並行流中的自定義線程池
1.概述
Java 8引入了S treams
的概念, treams
是對數據執行批量操作的有效方法。並且可以在支持並發的環境中獲得併行Streams
。
這些流可以帶來更高的性能-以多線程開銷為代價。
在本快速教程中,我們將研究**Stream
API的最大局限之一,**並了解如何使並行流與自定義ThreadPool
實例一起使用-或者有一個庫來處理這個問題。
2.並行Stream
讓我們從一個簡單的示例開始-在任何Collection
類型上調用parallelStream
方法-這將返回一個可能的並行Stream
:
@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
List<Long> aList = new ArrayList<>();
Stream<Long> parallelStream = aList.parallelStream();
assertTrue(parallelStream.isParallel());
}
在此類Stream
中發生的默認處理使用ForkJoinPool.commonPool(),
這是整個應用程序共享的Thread Pool
。
3.自定義Thread Pool
實際上,在處理stream
時,我們可以傳遞自定義ThreadPool
。
下面的示例讓並行Stream
使用自定義Thread Pool
來計算1到1,000,000(含)之間的長值之和:
@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {
long firstNum = 1;
long lastNum = 1_000_000;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}
我們使用了並行度為4的ForkJoinPool
構造函數。需要進行一些實驗才能確定不同環境的最佳值,但是一個很好的經驗法則是根據CPU的核心數量來選擇數字。
接下來,我們處理了並行Stream
的內容,並在reduce
調用中將其匯總。
這個簡單的示例可能無法展示使用自定義Thread Pool
的全部用處,但是在我們不希望將通用Thread Pool
與長期運行的任務捆綁在一起的情況下(例如處理來自網絡源的數據),其好處顯而易見。 ,或者應用程序中的其他組件正在使用公共Thread Pool
。
4。結論
我們已經簡要介紹瞭如何使用自定義Thread Pool
運行並行Stream
。在正確的環境中,並通過適當使用並行度,在某些情況下可以提高性能。
可以在Github上找到本文引用的完整代碼示例。