在 Spring Boot 中將 CompletableFuture 與 Feign 用戶端結合使用
一、簡介
使用分散式系統時,呼叫外部 Web 相依性同時保持低延遲是一項關鍵任務。
在本教程中,我們將使用 OpenFeign 和CompletableFuture
並行化多個 HTTP 請求、處理錯誤並設定網路和執行緒逾時。
2. 設定演示應用程式
為了說明平行請求的用法,我們將建立一個允許客戶在網站上購買商品的功能。首先,該服務提出一個請求,根據客戶居住的國家/地區取得可用的付款方式。其次,它請求向客戶產生有關購買的報告。購買報告不包含付款方式的資訊。
因此,我們首先新增依賴項以使用[spring-cloud-starter-openfeign](https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-openfeign)
:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
3. 建立外部依賴客戶端
現在,讓我們使用@FeignClient
註解建立兩個指向localhost:8083
客戶端:
@FeignClient(name = "paymentMethodClient", url = "http://localhost:8083")
public interface PaymentMethodClient {
@RequestMapping(method = RequestMethod.GET, value = "/payment_methods")
String getAvailablePaymentMethods(@RequestParam(name = "site_id") String siteId);
}
我們的第一個客戶端名稱是paymentMethodClient
。它呼叫 GET /payment_methods
使用代表客戶國家/地區的site_id
請求參數來取得可用的付款方式。
讓我們看看我們的第二個客戶:
@FeignClient(name = "reportClient", url = "http://localhost:8083")
public interface ReportClient {
@RequestMapping(method = RequestMethod.POST, value = "/reports")
void sendReport(@RequestBody String reportRequest);
}
我們將其命名為reportClient
,它調用 POST /reports
來產生購買報告。
4. 建立並行請求執行器
按順序呼叫兩個客戶端就足以滿足演示應用程式的要求。在這種情況下,該 API 的總回應時間將至少是兩個請求回應時間的總和。
值得注意的是,該報告不包含有關付款方式的信息,因此這兩個請求是獨立的。因此,我們可以並行化工作,將 API 的總回應時間減少到與最慢請求的回應時間大致相同。
在接下來的部分中,我們將了解如何建立 HTTP 呼叫的平行執行器並處理外部錯誤。
4.1.建立並行執行器
因此,讓我們使用CompletableFuture
來建立並行化兩個請求的服務:
@Service
public class PurchaseService {
private final PaymentMethodClient paymentMethodClient;
private final ReportClient reportClient;
// all-arg constructor
public String executePurchase(String siteId) throws ExecutionException, InterruptedException {
CompletableFuture<String> paymentMethodsFuture = CompletableFuture.supplyAsync(() ->
paymentMethodClient.getAvailablePaymentMethods(siteId));
CompletableFuture.runAsync(() -> reportClient.sendReport("Purchase Order Report"));
return String.format("Purchase executed with payment method %s", paymentMethodsFuture.get());
}
}
executePurchase()
方法首先發布一個平行任務,以使用supplyAsync()
來取得可用的付款方式。然後,我們提交另一個並行任務以使用runAsync()
產生報表。最後,我們使用get()
檢索支付方式結果並傳回完整結果。
為兩個任務選擇supplyAsync()
和runAsync()
是由於這兩個方法的不同性質。 supplyAsync()
方法傳回 GET 呼叫的結果。另一方面, runAsync()
不傳回任何內容,因此它更適合產生報表。
另一個差異是,一旦我們呼叫程式碼, runAsync()
就會立即啟動一個新線程,而無需線程池進行任何任務調度。相反, supplyAsync()
任務可能會被調度或延遲,這取決於線程池是否調度了其他任務。
為了驗證我們的程式碼,讓我們使用 WireMock 進行整合測試:
@BeforeEach
public void startWireMockServer() {
wireMockServer = new WireMockServer(8083);
configureFor("localhost", 8083);
wireMockServer.start();
stubFor(post(urlEqualTo("/reports"))
.willReturn(aResponse().withStatus(HttpStatus.OK.value())));
}
@AfterEach
public void stopWireMockServer() {
wireMockServer.stop();
}
@Test
void givenRestCalls_whenBothReturnsOk_thenReturnCorrectResult() throws ExecutionException, InterruptedException {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withStatus(HttpStatus.OK.value()).withBody("credit_card")));
String result = purchaseService.executePurchase("BR");
assertNotNull(result);
assertEquals("Purchase executed with payment method credit_card", result);
}
在上面的測試中,我們首先使用@BeforeEach
和@AfterEach
註釋將WireMockServer
配置為在localhost:8083
啟動並在完成後關閉。
然後,在測試場景方法中,我們使用了兩個存根,當我們呼叫兩個 feign 用戶端時,它們會回應200
HTTP 狀態。最後,我們使用assertEquals()
來斷言並行執行器的正確結果。
4.2.使用exceptionally()
處理外部 API 錯誤
如果 GET /payment_methods
請求失敗並出現404
HTTP 狀態(表示該國家/地區沒有可用的付款方式)怎麼辦?在此類場景中執行某些操作非常有用,例如傳回預設值。
為了處理CompletableFuture
中的錯誤,讓我們將以下exceptionally()
區塊加入到paymentMethodsFuture
中:
CompletableFuture <String> paymentMethodsFuture = CompletableFuture.supplyAsync(() -> paymentMethodClient.getAvailablePaymentMethods(siteId))
.exceptionally(ex -> {
if (ex.getCause() instanceof FeignException &&
((FeignException) ex.getCause()).status() == 404) {
return "cash";
});
現在,如果我們收到404
,我們將回傳名為cash:
@Test
void givenRestCalls_whenPurchaseReturns404_thenReturnDefault() throws ExecutionException, InterruptedException {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withStatus(HttpStatus.NOT_FOUND.value())));
String result = purchaseService.executePurchase("BR");
assertNotNull(result);
assertEquals("Purchase executed with payment method cash", result);
}
5. 為平行任務和網路請求新增逾時
當呼叫外部依賴項時,我們無法確定請求需要運行多長時間。因此,如果一個請求花費的時間太長,在某些時候我們應該放棄該請求。考慮到這一點,我們可以添加兩種類型: FeignClient
和CompletableFuture
逾時。
5.1.為 Feign 用戶端新增網路逾時
這種類型的逾時適用於透過網路傳送的單一請求。因此,它在網路層級切斷了與一個請求的外部依賴的連接。
我們可以使用 Spring Boot 自動配置為FeignClient
配置逾時:
feign.client.config.paymentMethodClient.readTimeout: 200
feign.client.config.paymentMethodClient.connectTimeout: 100
在上面的application.properties
檔案中,我們為PaymentMethodClient
設定讀取和連接逾時.
數值以毫秒為單位測量。
連線逾時告訴 feign 用戶端在閾值之後停止 TCP 握手連線嘗試。同樣,當連接正確時,讀取逾時會中斷請求,但協定無法從套接字讀取資料。
然後,我們可以在並行執行器的exceptionally()
區塊內處理該類型的錯誤:
if (ex.getCause() instanceof RetryableException) {
// handle TCP timeout
throw new RuntimeException("TCP call network timeout!");
}
為了驗證正確的行為,我們可以新增另一個測試場景:
@Test
void givenRestCalls_whenPurchaseRequestWebTimeout_thenReturnDefault() {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withFixedDelay(250)));
Throwable error = assertThrows(ExecutionException.class, () -> purchaseService.executePurchase("BR"));
assertEquals("java.lang.RuntimeException: REST call network timeout!", error.getMessage());
}
在這裡,我們使用了250
毫秒的withFixedDelay()
方法來模擬 TCP 逾時。
5.2.新增線程超時
另一方面,執行緒逾時會停止整個CompletableFuture
內容,而不僅僅是單一請求嘗試。例如,對於假客戶端重試,在評估逾時閾值時,原始請求和重試嘗試的次數也會計算在內。
要設定執行緒逾時,我們可以稍微修改我們的付款方法CompletableFuture
:
CompletableFuture<String> paymentMethodsFuture = CompletableFuture.supplyAsync(() -> paymentMethodClient.getAvailablePaymentMethods(siteId))
.orTimeout(400, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
// exception handlers
});
然後,我們可以在exceptionally()
區塊內處理威脅逾時錯誤:
if (ex instanceof TimeoutException) {
// handle thread timeout
throw new RuntimeException("Thread timeout!", ex);
}
因此,我們可以驗證它是否正常工作:
@Test
void givenRestCalls_whenPurchaseCompletableFutureTimeout_thenThrowNewException() {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withFixedDelay(450)));
Throwable error = assertThrows(ExecutionException.class, () -> purchaseService.executePurchase("BR"));
assertEquals("java.lang.RuntimeException: Thread timeout!", error.getMessage());
}
我們為 / payments_method
添加了更長的延遲,因此它通過了網路逾時閾值,但在執行緒逾時時失敗。
六、結論
在本文中,我們學習如何使用CompletableFuture
和FeignClient.
我們也了解如何新增網路和執行緒逾時以在時間閾值後中斷程式執行。
最後,我們使用CompletableFuture.exceptionally()
優雅地處理了404
API 和逾時錯誤。
與往常一樣,原始碼可以在 GitHub 上取得。