Spring WebFlux 反應流中的條件語句
1. 概述
在 Spring WebFlux 反應流中使用條件語句可以在處理反應流時進行動態決策。與命令式方法不同,反應式方法中的條件邏輯不限於if-else
語句。相反,我們可以使用各種運算符,例如map(),
filter()
、 swithIfEmpty()
等,在不阻塞流的情況下引入條件流。
在本文中,我們將探索在Spring WebFlux 中使用條件語句的不同方法。除非明確指定,否則每種方法都適用於Mono
和Flux
。
2. 將條件結構與map()
一起使用
我們可以使用map()
運算子來轉換流的各個元素。此外,我們可以在映射器中使用if-else
語句來有條件地修改元素。
讓我們定義一個名為oddEvenFlux
的Flux
,並使用map()
運算子將其每個元素標記為「偶數」或「奇數」:
Flux<String> oddEvenFlux = Flux.just(1, 2, 3, 4, 5, 6)
.map(num -> {
if (num % 2 == 0) {
return "Even";
} else {
return "Odd";
}
});
我們應該注意, map()
是同步的,它在發出項目後立即應用轉換函數。
接下來,讓我們使用StepVerifier
來測試反應流的行為並確認每個項目的條件標籤:
StepVerifier.create(oddEvenFlux)
.expectNext("Odd")
.expectNext("Even")
.expectNext("Odd")
.expectNext("Even")
.expectNext("Odd")
.expectNext("Even")
.verifyComplete();
正如預期的那樣,每個數字都根據其奇偶性進行標記。
3.使用filter()
我們可以使用filter()
運算子透過謂詞過濾掉數據,確保下游運算子只收到相關資料。
讓我們從數位流建立一個名為evenNumbersFlux
的新Flux
:
Flux<Integer> evenNumbersFlux = Flux.just(1, 2, 3, 4, 5, 6)
.filter(num -> num % 2 == 0);
在這裡,我們為filter()
運算子添加了一個謂詞來確定數字是否為偶數。
現在,讓我們驗證evenNumbersFlux
只允許偶數向下游傳遞:
StepVerifier.create(evenNumbersFlux)
.expectNext(2)
.expectNext(4)
.expectNext(6)
.verifyComplete();
偉大的!它按預期工作。
4. 使用switchIfEmpty()
和defaultIfEmpty()
在本節中,我們將了解兩個有用的運算符,它們在底層通量不發出任何項目時啟用條件資料流。
4.1.使用switchIfEmpty()
當底層通量不發布任何項目時,我們可能想要切換到替代流。在這種情況下,我們可以透過switchIfEmpty()
運算子提供替代發布者。
假設我們有一系列與filter()
運算子連結的單字,該運算子僅允許長度為兩個或更多字元的單字:
Flux<String> flux = Flux.just("A", "B", "C", "D", "E")
.filter(word -> word.length() >= 2);
當然,當沒有單字滿足過濾條件時,通量不會發出任何項目。
現在,讓我們透過switchIfEmpty()
運算子提供替代通量:
flux = flux.switchIfEmpty(Flux.defer(() -> Flux.just("AA", "BB", "CC")));
我們使用Flux.defer()
方法來確保僅當上游通量不產生任何項目時才創建替代通量。
最後,讓我們驗證產生的通量是否從備用源產生所有項目:
StepVerifier.create(flux)
.expectNext("AA")
.expectNext("BB")
.expectNext("CC")
.verifyComplete();
結果看起來是正確的。
4.2.使用defaultIfEmpty()
或者,當上游通量不發出任何項目時,我們可以使用defaultIfEmpty()
運算子來提供後備值,而不是替代發布者:
flux = flux.defaultIfEmpty("No words found!");
使用switchIfEmpty()
和defaultIfEmpty()
之間的另一個主要區別是我們僅限於對後者使用單一預設值。
現在,讓我們驗證反應流的條件流:
StepVerifier.create(flux)
.expectNext("No words found!")
.verifyComplete();
我們做對了這一點。
5. 使用flatMap()
我們可以使用flatMap()
運算子在反應流中建立多個條件分支,同時保持非阻塞、非同步流。
讓我們來看看從單字建立並使用兩個flatMap()
運算子進行更改的Flux
:
Flux<String> flux = Flux.just("A", "B", "C")
.flatMap(word -> {
if (word.startsWith("A")) {
return Flux.just(word + "1", word + "2", word + "3");
} else {
return Flux.just(word);
}
})
.flatMap(word -> {
if (word.startsWith("B")) {
return Flux.just(word + "1", word + "2");
} else {
return Flux.just(word);
}
});
我們透過新增兩個階段的條件轉換來建立動態分支,從而為反應流的每個項目提供多個邏輯路徑。
現在,是時候驗證我們的反應流的條件流了:
StepVerifier.create(flux)
.expectNext("A1")
.expectNext("A2")
.expectNext("A3")
.expectNext("B1")
.expectNext("B2")
.expectNext("C")
.verifyComplete();
極好的!看起來我們已經解決了這個問題。此外,我們可以將flatMapMany()
用於Mono
發布者來實作類似的用例。
6. 使用副作用運算符
在本節中,我們將探討如何在處理反應流時執行基於條件的同步操作。
6.1.使用doOnNext()
我們可以使用doOnNext()
運算子為反應流的每個項目同步執行副作用運算。
讓我們先定義evenCounter
變數來追蹤反應流中偶數的計數:
AtomicInteger evenCounter = new AtomicInteger(0);
現在,讓我們建立一個整數流並將其與doOnNext()
運算子連結在一起以增加偶數的計數:
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6)
.doOnNext(num -> {
if (num % 2 == 0) {
evenCounter.incrementAndGet();
}
});
我們在 if 區塊中新增了該操作,從而啟用了計數器的有條件增量。
接下來,我們必須在處理反應流中的每個項目後驗證evenCounter
的邏輯和狀態:
StepVerifier.create(flux)
.expectNextMatches(num -> num == 1 && evenCounter.get() == 0)
.expectNextMatches(num -> num == 2 && evenCounter.get() == 1)
.expectNextMatches(num -> num == 3 && evenCounter.get() == 1)
.expectNextMatches(num -> num == 4 && evenCounter.get() == 2)
.expectNextMatches(num -> num == 5 && evenCounter.get() == 2)
.expectNextMatches(num -> num == 6 && evenCounter.get() == 3)
.verifyComplete();
偉大的!我們已經得到了預期的結果。
6.2.使用doOnComplete()
同樣,我們還可以根據從反應流接收訊號的條件來關聯操作,例如在發布所有項目後發送的完整訊號。
讓我們從初始化done
標誌開始:
AtomicBoolean done = new AtomicBoolean(false);
現在,讓我們定義一個整數通量並新增一個使用doOnComplete()
運算子done
標誌設為true
操作:
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6)
.doOnComplete(() -> done.set(true));
需要注意的是,完整的訊號僅發送一次,因此副作用動作最多會被觸發一次。
此外,讓我們透過在各個步驟驗證done
標誌來驗證副作用的條件執行:
StepVerifier.create(flux)
.expectNextMatches(num -> num == 1 && !done.get())
.expectNextMatches(num -> num == 2 && !done.get())
.expectNextMatches(num -> num == 3 && !done.get())
.expectNextMatches(num -> num == 4 && !done.get())
.expectNextMatches(num -> num == 5 && !done.get())
.expectNextMatches(num -> num == 6 && !done.get())
.then(() -> Assertions.assertTrue(done.get()))
.expectComplete()
.verify();
完美的!我們可以看到,只有在所有項目都成功發出後, done
標誌才會設定為true
。但是,需要注意的是, doOnComplete()
僅適用於Flux
發布者,對於Mono
發布者我們必須使用doOnSuccess()
。
7. 使用firstOnValue()
有時,我們可能有多個來源來收集數據,但每個來源可能有不同的延遲。從效能的角度來看,最好使用延遲最少的來源中的值。對於這種條件資料訪問,我們可以使用firstOnValue()
運算子。
首先,我們定義兩個來源,分別是source1
和source2,
延遲分別為200毫秒和10毫秒:
Mono<String[]> source1 = Mono.defer(() -> Mono.just(new String[] { "val", "source1" })
.delayElement(Duration.ofMillis(200)));
Mono<String[]> source2 = Mono.defer(() -> Mono.just(new String[] { "val", "source2" })
.delayElement(Duration.ofMillis(10)));
接下來,讓我們對兩個來源使用firstWithValue()
運算符,並依賴框架的條件邏輯來處理資料存取:
Mono<String[]> mono = Mono.firstWithValue(source1, source2);
最後,讓我們透過將發出的項目與來自延遲較低的來源的資料進行比較來驗證結果:
StepVerifier.create(mono)
.expectNextMatches(item -> "val".equals(item[0]) && "source2".equals(item[1]))
.verifyComplete();
出色的!我們做對了這一點。此外,值得注意的是, firstWithValue()
僅適用於Mono
發布者。
8. 使用zip()
和zipWhen()
在本節中,我們將學習如何使用zip()
和zipWhen()
運算子來利用條件流。
8.1.使用zip()
我們可以使用zip()
運算子來合併多個來源的排放。進一步,我們可以利用它的組合器函數來加入條件邏輯來進行資料處理。讓我們看看如何使用它來確定快取和資料庫中的值是否不一致
首先,我們定義dataFromDB
和dataFromCache
發布者來模擬具有不同延遲和值的來源:
Mono<String> dataFromDB = Mono.defer(() -> Mono.just("db_val")
.delayElement(Duration.ofMillis(200)));
Mono<String> dataFromCache = Mono.defer(() -> Mono.just("cache_val")
.delayElement(Duration.ofMillis(10)));
現在,我們將它們zip
並使用其組合器函數來新增判斷快取是否一致的條件:
Mono<String[]> mono = Mono.zip(dataFromDB, dataFromCache,
(dbValue, cacheValue) ->
new String[] { dbValue, dbValue.equals(cacheValue) ? "VALID_CACHE" : "INVALID_CACHE" });
最後,讓我們驗證一下這個模擬,並驗證快取是否不一致,因為db_val
與cache_val
不同:
StepVerifier.create(mono)
.expectNextMatches(item -> "db_val".equals(item[0]) && "INVALID_CACHE".equals(item[1]))
.verifyComplete();
結果看起來是正確的。
8.2.使用zipWhen()
當我們想要僅在存在來自第一個來源的發射的情況下收集來自第二個來源的發射時, zipWhen()
運算符更合適。此外,我們可以使用組合器函數來新增條件邏輯來處理我們的反應流。
假設我們要計算使用者的年齡類別:
int userId = 1;
Mono<String> userAgeCategory = Mono.defer(() -> Mono.just(userId))
.zipWhen(id -> Mono.defer(() -> Mono.just(20)), (id, age) -> age >= 18 ? "ADULT" : "KID");
我們模擬了具有有效使用者 ID 的使用者的場景。因此,我們可以保證第二個發布者的排放。隨後,我們將取得使用者的年齡類別。
現在,我們來驗證一下這個場景,並確認20
歲的用戶被歸類為「ADULT」:
StepVerifier.create(userDetail)
.expectNext("ADULT")
.verifyComplete();
接下來,我們使用Mono.empty()
來模擬找不到有效使用者的場景的分類:
Mono<String> noUserDetail = Mono.empty()
.zipWhen(id -> Mono.just(20), (id, age) -> age >= 18 ? "ADULT" : "KID");
最後,我們可以確認本例中沒有排放:
StepVerifier.create(noUserDetail)
.verifyComplete();
完美的!我們得到了這兩種情況的預期結果。此外,我們需要注意**zipWhen()
僅適用於Mono
發布者**。
9. 結論
在本文中,我們學習如何在 Spring WebFlux 中包含條件語句。此外,我們也探討了不同運算子如何促進條件流,例如map(),
flatMap()
、 zip()
、 firstOnValue()
、 switchIfEmpty()
、 defaultIfEmpty()
等。
與往常一樣,本文中的程式碼可以在 GitHub 上取得。