比較 Java Stream 和 Flux.fromIterable()
1. 簡介
在本教程中,我們將比較 Java Stream
和Flux.fromIterable()
。我們將首先探討它們的相似之處,然後深入探討它們的主要差異。
2. Java Stream
概述
Stream
表示支援函數式操作的元素序列。 Stream
實例是同步的、基於拉的,並且僅在呼叫終端操作時才處理元素。
讓我們看看它的主要特點:
- 支援順序和並行執行
- 使用函數式運算(
map()
、filter()
等) - 惰性執行(應用終端操作時執行)
- 同步和基於拉動
3. Flux.fromIterable()
概述
Flux.fromIterable()
是 Project Reactor 中的工廠方法,它從現有的Iterable
(例如List
、 Set
或Collection
建立Flux
。當我們擁有一組資料並想要對其進行反應式處理時,它很有用。
以下是其主要特點:
- 支援異步和非阻塞處理
- 提供反應式程式設計操作(
map()
、filter()
、flatMap()
等) - 基於推送——數據一可用就會發出
4. Stream
和Flux.fromIterable()
之間的相似之處
儘管Stream
和Flux.fromIterable()
是基於不同的範式建構的,但它們都有一些共同點。讓我們仔細看看這些共同點,並了解它們在資料處理中是如何協調的。
4.1.功能風格
這兩者都支援函數式程式設計風格。我們可以使用filter()
、 map()
和reduce()
等操作,也可以將多個操作連結在一起,以建立一個用於處理資料的聲明性、可讀性的管道。
讓我們看一個例子,我們過濾一串數字以僅保留偶數,然後將其值加倍:
@Test
void givenList_whenProcessedWithStream_thenReturnDoubledEvenNumbers() {
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
List<Integer> doubledEvenNumbers = numbers.stream()
.filter(n -> n % 2 == 0)
.map(n -> n * n)
.toList();
assertEquals(List.of(4, 16), doubledEvenNumbers);
}
現在,讓我們使用Flux.fromIterable()
執行相同的範例:
@Test
void givenList_whenProcessedWithFlux_thenReturnDoubledEvenNumbers() {
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
Flux<Integer> fluxPipeline = Flux.fromIterable(numbers)
.filter(n -> n % 2 == 0)
.map(n -> n * 2);
StepVerifier.create(fluxPipeline)
.expectNext(4, 16);
}
4.2.惰性求值
Stream
和Flux
實例都是惰性的,這意味著只有在真正需要結果時才執行操作。
對於Stream
實例,需要一個終端操作符來執行管道:
@Test
void givenList_whenNoTerminalOperator_thenNoResponse() {
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
Function<Integer, Integer> mockMapper = mock(Function.class);
Stream<Integer> streamPipeline = numbers.stream()
.map(mockMapper);
verifyNoInteractions(mockMapper);
List<Integer> mappedList = streamPipeline.toList();
verify(mockMapper, times(5));
}
在這個例子中,我們可以看到,在呼叫終端操作符toList()
之前,沒有與mockMapper
函數的互動。
類似地, Flux
只有在有訂閱者時才開始處理:
@Test
void givenList_whenFluxNotSubscribed_thenNoResponse() {
Function<Integer, Integer> mockMapper = mock(Function.class);
when(mockMapper.apply(anyInt())).thenAnswer(i -> i.getArgument(0));
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
Flux<Integer> flux = Flux.fromIterable(numbers)
.map(mockMapper);
verifyNoInteractions(mockMapper);
StepVerifier.create(flux)
.expectNextCount(5)
.verifyComplete();
verify(mockMapper, times(5)).apply(anyInt());
}
在這個例子中,我們可以驗證Flux
最初不與mockMapper
互動。然而,一旦我們使用StepVerifier
訂閱它,我們就可以觀察到Flux
開始與其方法互動。
5. Stream
和Flux.fromIterable()
之間的主要差異
儘管Stream
和Flux.fromIterable()
之間存在一些相似之處,但它們的目的、性質以及處理元素的方式有所不同。讓我們探討一下這兩者之間的一些主要差異。
5.1.同步與非同步
Stream
和Flux.fromIterable()
之間的一個主要區別是它們如何處理執行。
Stream
同步運行,這意味著計算在呼叫終端操作的同一執行緒上按順序進行。如果我們使用Stream
處理大型資料集或執行耗時任務,它可以阻塞執行緒直到完成。如果沒有明確使用parallelStream()
或手動管理線程,則沒有內建方法可以非同步執行Stream
實例。即使在並行模式下,流也以阻塞方式運作。
另一方面, Flux.fromIterable()
專注於同步與非同步。預設情況下,它的行為是同步的-在呼叫執行緒上發射元素,類似於順序流。但是, Flux
提供了對非同步和非阻塞執行的內建支持,可以使用subscribeOn()
、 publishOn(),
和delayElements()
等操作符來啟用。這使得Flux
可以並發處理元素而不會阻塞主線程,從而使其更適合反應式應用程式。
讓我們檢查一下Flux.fromIterable()
的異步行為:
@Test
void givenList_whenProcessingTakesLongerThanEmission_thenEmittedBeforeProcessing() {
VirtualTimeScheduler.set(VirtualTimeScheduler.create());
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
Flux<Integer> sourceFlux = Flux.fromIterable(numbers)
.delayElements(Duration.ofMillis(500));
Flux<Integer> processedFlux = sourceFlux.flatMap(n ->
Flux.just(n * n)
.delayElements(Duration.ofSeconds(1))
);
StepVerifier.withVirtualTime(() -> Flux.merge(sourceFlux, processedFlux))
.expectSubscription()
.expectNoEvent(Duration.ofMillis(500))
.thenAwait(Duration.ofMillis(500 * 5))
.expectNextCount(7)
.thenAwait(Duration.ofMillis(5000))
.expectNextCount(3)
.verifyComplete();
}
在上面的例子中,我們每50
0ms 發出一次數字,並將處理延遲1
。使用merge()
合併兩個Flux
實例的結果。在 2500 毫秒結束時,所有元素均已發出,且其中兩個已被處理,因此我們預計總共有七個項目。五秒鐘後,所有元素都處理完畢,因此我們期待剩下的三個項目。
這裡,發射器不會等待處理器完成其操作,而是繼續獨立發射值。
5.2.例外處理
在Stream
中,異常導致管道立即終止。如果某個元素在處理過程中拋出異常,則整個流將停止,並且不會處理其他元素。
這是因為Stream
實例將異常視為終端事件。我們必須依賴map()
、 filter()
中的try-catch
區塊,或在每個階段使用自訂異常處理邏輯來防止故障傳播。
讓我們來看看Stream
管道在遇到除以零異常時的行為:
@Test
void givenList_whenDividedByZeroInStream_thenThrowException() {
List<Integer> numbers = List.of(1, 2, 0, 4, 5);
assertThrows(ArithmeticException.class, () -> numbers.stream()
.map(n -> 10 / n)
.toList());
}
這裡,異常立即終止管道,並且4
和5
永遠不會被處理。
相較之下, Flux
將錯誤視為數據,這意味著錯誤透過單獨的錯誤通道傳播,而不是直接終止管道。這樣,我們就可以透過使用Flux
中提供的內建方法(如onErrorResume()
、 onErrorContinue()
和onErrorReturn()
來優雅地處理異常,確保即使個別元素失敗,處理仍可繼續。
現在,讓我們看看如何使用Flux.fromIterable()
進行相同的處理:
@Test
void givenList_whenDividedByZeroInFlux_thenReturnFallbackValue() {
List<Integer> numbers = List.of(1, 2, 0, 4, 5);
Flux<Integer> flux = Flux.fromIterable(numbers)
.map(n -> 10 / n)
.onErrorResume(e -> Flux.just(-1));
StepVerifier.create(flux)
.expectNext(10, 5, -1)
.verifyComplete();
}
這裡捕獲了異常,並且Flux
沒有失敗,而是發出了一個後備值 (-1)。這使得Flux
更具彈性,使其能夠處理網路故障、資料庫錯誤或意外輸入等現實場景。
5.3.單一管道與多個訂閱者
Stream
的一個基本限制是它代表一次性管道。一旦呼叫終端操作(如forEach()
或collect()
, Stream
就會被消耗並且無法重複使用。每次我們需要處理相同的資料集時,必須建立一個新的Stream
:
@Test
void givenStream_whenReused_thenThrowException() {
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
Stream<Integer> doubleStream = numbers.stream()
.map(n -> n * 2);
assertEquals(List.of(2, 4, 6, 8, 10), doubleStream.toList());
assertThrows(IllegalStateException.class, doubleStream::toList);
}
在上面的範例中,當我們第一次在Stream
上呼叫終止操作toList()
,它會傳回預期的結果。但是,當我們嘗試再次呼叫它時,它會拋出一個IllegalStateException
,因為Stream
實例一旦被使用就無法重複使用。
另一方面, Flux
支援多個訂閱者,允許不同的消費者獨立地對相同的數據做出反應。這使得事件驅動架構成為可能,其中系統中的多個元件可以回應相同的資料流,而無需重新載入來源。
現在,讓我們使用Flux
執行相同的操作並看看它的行為:
@Test
void givenFlux_whenMultipleSubscribers_thenEachReceivesData() {
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
Flux<Integer> flux = Flux.fromIterable(numbers).map(n -> n * 2);
StepVerifier.create(flux)
.expectNext(2, 4, 6, 8, 10)
.verifyComplete();
StepVerifier.create(flux)
.expectNext(2, 4, 6, 8, 10)
.verifyComplete();
}
這裡我們可以看到每個訂閱者都收到了相同的數據,並且沒有拋出異常。這使得Flux
比Stream
更通用,特別是在多個元件需要監聽和處理相同資料而又不需要重複來源的場景中。
5.4.表現
Stream
實例針對高效能記憶體處理進行了最佳化。它們操作非常敏捷,這意味著所有轉換都在資料的一次傳遞中發生。
此外, Stream
支援使用parallelStream()
進行平行處理,透過ForkJoinPool
利用多個 CPU 核心進一步提高大資料集的執行速度。
另一方面, Flux.fromIterable()
是為反應式資料處理而設計的,這使得它更加靈活,但在記憶體計算方面本質上速度較慢。
由於Flux
本質上是異步的,每個資料元素都包裹在反應訊號(onNext,
onError
、 onComplete
) 中,從而增加了處理開銷。
Flux
遵循非阻塞執行模型,這對於 I/O 密集型操作有益,但對於純計算任務效率較低。
6. 功能比較:Java Stream
與Flux.fromIterable()
特徵 | Java Stream |
Flux.fromIterable() |
---|---|---|
執行模型 | 基於拉取– 消費者在需要時請求數據 | 基於推播-生產者非同步向消費者推送數據 |
加工風格 | 功能性、管線基礎 | 功能性、反應性、基於事件 |
同步或非同步 | 同步– 可以使用parallelStream() 進行並行化 |
非同步– 可以在多個執行緒上執行而不會阻塞 |
錯誤處理 | 沒有內建支援。我們需要使用a try-catch 區塊 |
錯誤被視為數據並通過單獨的錯誤通道傳播 |
多個訂閱者 | 不支援 – 一旦使用,串流就無法重複使用 | 多個訂閱者可以監聽同一個Flux |
用例 | 適用於快速、CPU 密集型、記憶體轉換 | 當我們需要非同步和非阻塞等反應性功能時,反應性地處理錯誤和批次 |
7. 結論
在本文中,我們探討了 Java 的Stream
和 Reactor 的Flux.fromIterable()
之間的相似之處和主要差異。了解這些差異使我們能夠根據應用程式的需求選擇正確的方法。
與往常一樣,本文使用的所有程式碼片段均可在 GitHub 上找到。