如何解決“java.lang.IllegalStateException:block()/blockFirst()/blockLast() 正在阻塞”
一、簡介
在本文中,我們將看到開發人員在使用 Spring Webflux 時常犯的一個錯誤。 Spring Webflux 是一個從頭開始建立的非阻塞 Web 框架,旨在利用多核心、下一代處理器並處理大量並發連接。
由於它是一個非阻塞框架,因此線程不應被阻塞。讓我們更詳細地探討這一點。
2. Spring Webflux線程模型
為了更好地理解這個問題,我們需要了解 Spring Webflux 的線程模型。
在 Spring Webflux 中,一小部分工作執行緒處理傳入請求。這與 Servlet 模型形成鮮明對比,在 Servlet 模型中,每個請求都有一個專用線程。因此,框架可以保護這些請求接受執行緒上發生的情況。
考慮到這一點,讓我們深入探討本文的主要焦點。
3. 理解IllegalStateException
與執行緒阻塞
讓我們透過一個範例來了解 Spring Webflux 中何時以及為何會出現錯誤「 java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread
」。
讓我們舉一個文件搜尋 API 的例子。該 API 從檔案系統讀取檔案並在檔案中搜尋使用者提供的文字。
3.1.文件服務
我們先定義一個FileService
類,它將檔案的內容當作字串讀取:
@Service
public class FileService {
@Value("${files.base.dir:/tmp/bael-7724}")
private String filesBaseDir;
public Mono<String> getFileContentAsString(String fileName) {
return DataBufferUtils.read(Paths.get(filesBaseDir + "/" + fileName), DefaultDataBufferFactory.sharedInstance, DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY)
.map(dataBuffer -> dataBuffer.toString(StandardCharsets.UTF_8))
.reduceWith(StringBuilder::new, StringBuilder::append)
.map(StringBuilder::toString);
}
}
值得注意的是, FileService
反應式(非同步)地從檔案系統讀取檔案。
3.2.文件內容搜尋服務
我們準備利用此FileService
來編寫文件搜尋服務:
@Service
public class FileContentSearchService {
@Autowired
private FileService fileService;
public Mono<Boolean> blockingSearch(String fileName, String searchTerm) {
String fileContent = fileService
.getFileContentAsString(fileName)
.doOnNext(content -> ThreadLogger.log("1. BlockingSearch"))
.block();
boolean isSearchTermPresent = fileContent.contains(searchTerm);
return Mono.just(isSearchTermPresent);
}
}
文件搜尋服務會根據是否在文件中找到搜尋字詞傳回boolean
。為此,我們呼叫FileService
的getFileContentAsString()
方法。由於我們非同步取得結果,即作為Mono<String>
,我們呼叫block()
來取得String
值。之後,我們檢查fileContent
是否包含searchTerm.
最後,我們將結果包裝並返回Mono
。
3.3.文件控制器
最後,我們得到了FileController
,它使用了FileContentSearchService
的blockingSearch()
方法:
@RestController
@RequestMapping("bael7724/v1/files")
public class FileController {
...
@GetMapping(value = "/{name}/blocking-search")
Mono<Boolean> blockingSearch(@PathVariable("name") String fileName, @RequestParam String term) {
return fileContentSearchService.blockingSearch(fileName, term);
}
}
3.4.重現異常
我們可以觀察到Controller
呼叫了FileContentSearchService
的方法,而 FileContentSearchService 又呼叫了block()
方法。由於這是在請求接受線程上,因此如果我們在當前安排中呼叫 API,我們將遇到我們所追求的臭名昭著的異常:
12:28:51.610 [reactor-http-epoll-2] ERROR osbawreAbstractErrorWebExceptionHandler - [ea98e542-1] 500 Server Error for HTTP GET "/bael7724/v1/files/a/blocking-search?term=a"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ com.baeldung.filters.TraceWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ com.baeldung.filters.ExceptionalTraceFilter [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP GET "/bael7724/v1/files/a/blocking-search?term=a" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
at reactor.core.publisher.Mono.block(Mono.java:1712)
at com.baeldung.bael7724.service.FileContentSearchService.blockingSearch(FileContentSearchService.java:20)
at com.baeldung.bael7724.controller.FileController.blockingSearch(FileController.java:35)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
3.5.根本原因
該異常的根本原因是在請求接受執行緒上呼叫了block()
。在上面的範例程式碼中,在接受請求的執行緒池中的執行緒之一上呼叫block()
方法。具體來說,在標記為「僅非阻塞操作」的線程上,即實作 Reactor 的NonBlocking
標記介面的線程,例如由Schedulers.parallel()
啟動的線程。
4、解決方案
現在讓我們看看可以採取哪些措施來解決此異常。
4.1.擁抱反應式操作
慣用的方法是使用反應式操作而不是呼叫block().
讓我們更新程式碼以使用*map()*操作將String
轉換為Boolean
:
public Mono<Boolean> nonBlockingSearch(String fileName, String searchTerm) {
return fileService.getFileContentAsString(fileName)
.doOnNext(content -> ThreadLogger.log("1. NonBlockingSearch"))
.map(content -> content.contains(searchTerm))
.doOnNext(content -> ThreadLogger.log("2. NonBlockingSearch"));
}
因此,我們完全不需要呼叫block()
。當我們運行上述方法時,我們注意到以下執行緒上下文:
[1. NonBlockingSearch] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506215299Z
[2. NonBlockingSearch] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506361786Z
[1. In Controller] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506465805Z
[2. In Controller] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506543145Z
上面的日誌語句表示我們已經在接受請求的相同執行緒池上執行了操作。
值得注意的是,即使我們沒有遇到異常,最好還是執行 I/O 操作,例如從不同執行緒池上的檔案讀取。
4.2.有界彈性線程池上的阻塞
假設由於某些原因我們無法避免使用block()
。那我們該怎麼辦呢?我們得出的結論是,當我們在接受請求的執行緒池上呼叫block()
時,發生了異常。因此,要呼叫block()
我們需要切換執行緒池。讓我們看看如何做到這一點:
public Mono<Boolean> workableBlockingSearch(String fileName, String searchTerm) {
return Mono.just("")
.doOnNext(s -> ThreadLogger.log("1. WorkableBlockingSearch"))
.publishOn(Schedulers.boundedElastic())
.doOnNext(s -> ThreadLogger.log("2. WorkableBlockingSearch"))
.map(s -> fileService.getFileContentAsString(fileName)
.block()
.contains(searchTerm))
.doOnNext(s -> ThreadLogger.log("3. WorkableBlockingSearch"));
}
為了切換線程池,Spring Webflux提供了兩個操作publishOn()
和subscribeOn().
我們使用了publishOn()
,它更改了publishOn()
之後的操作的線程,而不影響訂閱或上游操作。由於線程池現在切換到有界彈性,我們可以呼叫block().
現在,如果我們執行workableBlockingSearch()
方法,我們將獲得以下執行緒上下文:
[1. WorkableBlockingSearch] ThreadName: parallel-2, Time: 2024-06-17T07:40:59.440562518Z
[2. WorkableBlockingSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.442161018Z
[3. WorkableBlockingSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.442891230Z
[1. In Controller] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.443058091Z
[2. In Controller] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.443181770Z
我們可以看到,從2
號開始,操作確實發生在有界彈性線程池上,因此,我們沒有得到IllegalStateException
。
4.3.注意事項
讓我們看看這個區塊解決方案的一些注意事項。
呼叫block()
有很多方法可能會出錯。讓我們舉一個例子,即使我們使用Scheduler
來切換線程上下文,它的行為也不會按照我們期望的方式進行:
public Mono<Boolean> incorrectUseOfSchedulersSearch(String fileName, String searchTerm) {
String fileContent = fileService.getFileContentAsString(fileName)
.doOnNext(content -> ThreadLogger.log("1. IncorrectUseOfSchedulersSearch"))
.publishOn(Schedulers.boundedElastic())
.doOnNext(content -> ThreadLogger.log("2. IncorrectUseOfSchedulersSearch"))
.block();
boolean isSearchTermPresent = fileContent.contains(searchTerm);
return Mono.just(isSearchTermPresent);
}
在上面的程式碼範例中,我們按照解決方案中的建議使用了publishOn() ,但block()
方法仍然導致異常。當我們運行上面的程式碼時,我們將得到以下日誌:
[1. IncorrectUseOfSchedulersSearch] ThreadName: Thread-4, Time: 2024-06-17T08:57:02.490298417Z
[2. IncorrectUseOfSchedulersSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T08:57:02.491870410Z
14:27:02.495 [parallel-1] ERROR osbawreAbstractErrorWebExceptionHandler - [53e4bce1] 500 Server Error for HTTP GET "/bael7724/v1/files/robots.txt/incorrect-use-of-schedulers-search?term=r-"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ com.baeldung.filters.TraceWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ com.baeldung.filters.ExceptionalTraceFilter [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP GET "/bael7724/v1/files/robots.txt/incorrect-use-of-schedulers-search?term=r-" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
at reactor.core.publisher.Mono.block(Mono.java:1712)
at com.baeldung.bael7724.service.FileContentSearchService.incorrectUseOfSchedulersSearch(FileContentSearchService.java:64)
at com.baeldung.bael7724.controller.FileController.incorrectUseOfSchedulersSearch(FileController.java:48)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
這表示第二個日誌語句確實在有界彈性執行緒池上運行。然而,我們仍然遇到了異常。原因是block()
仍在同一請求接受執行緒池上執行。
讓我們看看另一個警告。即使我們正在切換線程池,我們也不能使用並行線程池,即Schedulers.parallel()
。如前所述,某些執行緒池不允許在其執行緒上呼叫block()
- 並行執行緒池就是其中之一。
最後,我們在範例中僅使用了Schedulers.boundedElastic()
。相反,我們也可以透過Schedulers.fromExecutorService()
使用任何自訂線程池。
5. 結論
總之,為了有效解決 Spring Webflux 中使用block()
等阻塞操作時出現IllegalStateException
的問題,我們應該採用非阻塞、反應式方法。透過利用諸如map()
之類的反應式運算符,我們可以在同一個反應式執行緒池上執行操作,從而消除了顯式block()
的需要。如果無法避免block()
,則使用publishOn()
將執行上下文切換到boundedElastic
調度程序或自訂線程池可以將這些操作與響應式請求接受線程池隔離,從而防止異常。
必須了解不支援阻塞呼叫的線程池,並確保應用正確的上下文切換以維持應用程式的穩定性和效能。
與往常一樣,本文中使用的源代碼可以在 GitHub 上取得。