Spring Reactive 中的 Mono.fromCallable 與 Mono.justOrEmpty
1. 概述
在響應式程式設計中,處理和轉換資料流對於建立響應式應用程式至關重要。創建Mono
實例的兩種常用方法是Mono.fromCallable
和Mono.justOrEmpty.
這兩種方法都有其獨特的用途,這取決於我們希望如何處理流中的可空性和惰性求值。
在本教程中,我們將探討這些方法之間的差異,展示Mono.fromCallable
如何透過包裝計算來推遲執行並優雅地處理錯誤,而Mono.justOrEmpty
直接從可選值創建Mono
實例,從而簡化了可能為null 的情況資料。
Mono
簡介
Mono
是 Project Reactor 中的一個[Publisher](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/package-summary.html)
,代表最多發出一個值的流。它可以包含一個值,也可以為空,也可以以錯誤結束。
Mono
支援兩種類型的發布者: 冷發布者和熱發布者。
冷發布者只會在消費者訂閱後發布元素,確保每個消費者從一開始就收到數據,而熱發布者在創建後立即發出數據,無論訂閱如何。
Mono
的fromCallable
是冷發布者的一個例子:它在訂閱時延遲返回Mono
。相反, justOrEmpty
是一個充當熱發布者的Mono
,立即發出其數據,無需等待任何訂閱。
來源:projectreactor.io
3. Mono.fromCallable
fromCallable
接受一個Callable
介面並傳回一個Mono
,該 Mono 會延遲該Callable
的執行,直到有訂閱為止。如果Callable
解析為null
,則產生的Mono
將完成為空。
讓我們考慮一個範例用例,其中我們在每次方法呼叫之間以一致的五秒延遲獲取資料。我們將將此邏輯設為延遲,因此它僅在訂閱時執行:
public String fetchData() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Data Fetched";
}
接下來,我們將定義一個使用fromCallable
來建立Mono
發布者的方法。 timeTakenForCompletion
屬性測量訂閱開始和收到onComplete
訊號之間的持續時間:
public void givenDataAvailable_whenCallingFromCallable_thenLazyEvaluation() {
AtomicLong timeTakenForCompletion = new AtomicLong();
Mono<String> dataFetched = Mono.fromCallable(this::fetchData)
.doOnSubscribe(subscription -> timeTakenForCompletion.set(-1 * System.nanoTime()))
.doFinally(consumer -> timeTakenForCompletion.addAndGet(System.nanoTime()));
StepVerifier.create(dataFetched)
.expectNext("Data Fetched")
.verifyComplete();
}
最後,斷言驗證從訂閱到收到onComplete
訊號的時間與預期的五秒延遲緊密一致,確認fromCallable
延遲執行直到訂閱:
assertThat(TimeUnit.NANOSECONDS.toMillis(timeTakenForCompletion.get()))
.isCloseTo(5000L, Offset.offset(50L));
3.1.內建錯誤處理
fromCallable
也支援內建錯誤處理,如果Callable
拋出異常, fromCallable
會捕捉該異常,從而允許錯誤透過反應式串流傳播。
讓我們考慮相同的範例來查看fromCallable
的錯誤處理:
public void givenExceptionThrown_whenCallingFromCallable_thenFromCallableCapturesError() {
Mono<String> dataFetched = Mono.fromCallable(() -> {
String data = fetchData();
if (data.equals("Data Fetched")) {
throw new RuntimeException("ERROR");
}
return data;
})
.onErrorResume(error -> Mono.just("COMPLETED"));
StepVerifier.create(dataFetched)
.expectNext("COMPLETED")
.verifyComplete();
}
4. Mono.justOrEmpty
justOrEmpty
建立一個Mono
,它要麼包含一個值,要麼在值為null
時完全為空。與fromCallable
不同,它不會推遲執行,而是在Mono
創建後立即進行計算。
justOrEmpty
不會傳播錯誤,因為它旨在處理可為 null 的值。
讓我們像之前一樣回顧一下模擬資料獲取的用例。這次,我們將使用justOrEmpty
方法來建立Mono
類型的發布者。
timeTakenToReceiveOnCompleteSignalAfterSubscription
屬性追蹤從訂閱到接收onComplete
訊號的時間。 timeTakenForMethodCompletion
屬性測量方法完成所需的總時間:
public void givenDataAvailable_whenCallingJustOrEmpty_thenEagerEvaluation() {
AtomicLong timeTakenToReceiveOnCompleteSignalAfterSubscription = new AtomicLong();
AtomicLong timeTakenForMethodCompletion = new AtomicLong(-1 * System.nanoTime());
Mono<String> dataFetched = Mono.justOrEmpty(fetchData())
.doOnSubscribe(subscription -> timeTakenToReceiveOnCompleteSignalAfterSubscription
.set(-1 * System.nanoTime()))
.doFinally(consumer -> timeTakenToReceiveOnCompleteSignalAfterSubscription
.addAndGet(System.nanoTime()));
timeTakenForMethodCompletion.addAndGet(System.nanoTime());
StepVerifier.create(dataFetched)
.expectNext("Data Fetched")
.verifyComplete();
}
讓我們寫一個斷言來證明從訂閱到收到onComplete訊號的時間非常短,確認Mono
已被急切地創建:
assertThat(TimeUnit.NANOSECONDS.toMillis(timeTakenToReceiveOnCompleteSignalAfterSubscription
.get())).isCloseTo(1L, Offset.offset(1L));
接下來,我們確認五秒的延遲包含在方法的完成時間中,並且發生在訂閱Mono
之前:
assertThat(TimeUnit.NANOSECONDS.toMillis(timeTakenForMethodCompletion.get()))
.isCloseTo(5000L, Offset.offset(50L));
5. 何時使用fromCallable
讓我們來看看可以使用Mono.fromCallable()
方法的用例:
- 當我們需要有條件地訂閱發布者以節省資源時
- 當每個訂閱都會導致不同的結果時
- 當操作有可能拋出異常時,我們希望它們透過反應流傳播
5.1.使用範例
讓我們來看看一個範例用例,其中條件延遲執行是有益的:
public Optional<String> fetchLatestStatus() {
List<String> activeStatusList = List.of("ARCHIVED", "ACTIVE");
if (activeStatusList.contains("ARCHIVED")) {
return Optional.empty();
}
return Optional.of(activeStatusList.get(0));
}
public void givenLatestStatusIsEmpty_thenCallingFromCallableForEagerEvaluation() {
Optional<String> latestStatus = fetchLatestStatus();
String updatedStatus = "ACTIVE";
Mono<String> currentStatus = Mono.justOrEmpty(latestStatus)
.switchIfEmpty(Mono.fromCallable(()-> updatedStatus));
StepVerifier.create(currentStatus)
.expectNext(updatedStatus)
.verifyComplete();
}
在此範例中, Mono
發布者是使用fromCallable
在switchIfEmpty
方法中定義的,允許它有條件地執行。因此,只有當Mono
被訂閱時才會回到ACTIVE
狀態,使其成為惰性執行。
六、結論
在本文中,我們討論了充當冷發布者的Mono.fromCallable
方法和充當熱發布者的Mono.justOrEmpty
。我們還探討了何時使用fromCallable
方法與justOrEmpty
,突出顯示它們的差異並討論範例用例。
與往常一樣,完整的源代碼可以在 GitHub 上取得。