如何將虛擬線程與 ScheduledExecutorService 一起使用
一、簡介
虛擬線程是 JDK 21 中正式引入的一項實用功能,作為提高高吞吐量應用程式效能的解決方案。
然而,JDK並沒有內建使用虛擬執行緒的任務調度工具。因此,我們必須編寫使用虛擬執行緒運行的任務排程器。
在本文中,我們將使用Thread.sleep()
方法和ScheduledExecutorService
類別為虛擬執行緒建立自訂排程器。
2.什麼是虛擬線程?
虛擬執行緒作為Thread
類別的輕量級版本在JEP-444中引入,最終提高了高吞吐量應用程式中的並發性。
虛擬線程使用的空間比通常的作業系統線程(或平台線程)少得多。因此,我們可以在應用程式中同時產生比平台執行緒更多的虛擬執行緒。毫無疑問,這增加了最大並發單元數,這也增加了我們應用程式的吞吐量。
關鍵的一點是虛擬線程並不比平台線程快。在我們的應用程式中,它們僅以比平台執行緒更多的數量出現,以便它們可以執行更多並行工作。
虛擬執行緒很便宜,因此我們不需要使用資源池等技術將任務調度到有限數量的執行緒。相反,我們可以在現代電腦中幾乎無限地生成它們,而不會出現記憶體問題。
最後,虛擬線程是動態的,而平台線程的大小是固定的。因此,虛擬執行緒比平台執行緒更適合小型任務,例如簡單的 HTTP 或資料庫呼叫。
3. 虛擬執行緒調度
我們已經看到虛擬線程的一大優點是它們小且便宜。我們可以在一台簡單的機器中有效地產生數十萬個虛擬線程,而不會陷入記憶體不足錯誤。因此,像我們使用更昂貴的資源(例如平台線程和網路或資料庫連接)那樣池化虛擬線程沒有多大意義。
透過保留執行緒池,我們為池中的可用執行緒創建了另一個池任務開銷,這更加複雜且可能更慢。此外,Java中的大多數執行緒池都受到平台執行緒數量的限制,該數量總是小於程式中可能的虛擬執行緒數量。
因此,我們必須避免將虛擬執行緒與執行緒池 API(例如ForkJoinPool
或ThreadPoolExecutor
一起使用。相反,我們應該始終為每個任務建立一個新的虛擬執行緒。
目前,Java 不提供標準 API 來調度虛擬線程,就像我們使用其他並發 API(例如ScheduledExecutorService's schedule()
方法)一樣。因此,為了有效地使我們的虛擬執行緒運行計劃任務,我們需要編寫自己的調度程序。
3.1.使用Thread.sleep()
調度虛擬線程
我們將看到創建自訂調度程序的最直接方法是使用Thread.sleep()
方法使程式等待當前執行緒執行:
static Future<?> schedule(Runnable task, int delay, TemporalUnit unit, ExecutorService executorService) {
return executorService.submit(() -> {
try {
Thread.sleep(Duration.of(delay, unit));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
task.run();
});
}
schedule()
方法接收要調度的task
、 delay
和ExecutorService
。然後,我們使用ExecutorService
的submit()
啟動任務。在try
區塊中,我們透過呼叫Thread.sleep().
因此,執行緒在等待時可能會中斷,因此我們透過中斷目前執行緒執行來處理InterruptedException
。
最後,等待之後,我們呼叫run()
來接收task
。
要使用自訂的schedule()
方法來調度虛擬線程,我們需要將虛擬線程的執行器服務傳遞給它:
ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
try (virtualThreadExecutor) {
var taskResult = schedule(() ->
System.out.println("Running on a scheduled virtual thread!"), 5, ChronoUnit.SECONDS,
virtualThreadExecutor);
try {
Thread.sleep(10 * 1000); // Sleep for 10 seconds to wait task results
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
}
System.out.println(taskResult.get());
}
首先,我們實例化一個ExecutorService
,它為我們提交的每個任務產生一個新的虛擬線程。然後,我們將virtualThreadExecutor
變數包裝在try-with-resources
語句中,使執行程式服務保持開啟狀態,直到我們使用完它為止。或者,在使用執行器服務後,我們可以使用[shutdown()](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/ExecutorService.html#shutdown()) .
我們在5秒後呼叫schedule()
來執行任務,然後等待10秒再嘗試取得任務執行結果。
3.2.使用SingleThreadExecutor
調度虛擬線程
我們了解如何使用sleep()
將任務調度到虛擬執行緒。或者,我們可以在虛擬執行緒執行器中為每個提交的任務實例化一個新的單執行緒調度程序:
static Future<?> schedule(Runnable task, int delay, TimeUnit unit, ExecutorService executorService) {
return executorService.submit(() -> {
ScheduledExecutorService singleThreadScheduler = Executors.newSingleThreadScheduledExecutor();
try (singleThreadScheduler) {
singleThreadScheduler.schedule(task, delay, unit);
}
});
}
程式碼也使用作為參數傳遞的虛擬線程ExecutorService
來提交任務。但現在,對於每個任務,我們使用newSingleThreadScheduledExecutor()
方法實例化單一執行緒的單一ScheduledExecutorService
。
然後,在try-with-resources
區塊內,我們使用單一執行緒執行器schedule()
方法來調度任務。此方法接受task
和delay
量作為參數,並且不會像 sleep() 那樣拋出已檢查的InterruptedException
sleep().
最後,我們可以使用schedule()
將任務調度到虛擬執行緒執行器:
ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
try (virtualThreadExecutor) {
var taskResult = schedule(() ->
System.out.println("Running on a scheduled virtual thread!"), 5, TimeUnit.SECONDS,
virtualThreadExecutor);
try {
Thread.sleep(10 * 1000); // Sleep for 10 seconds to wait task results
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
}
System.out.println(taskResult.get());
}
這類似於第 3.1 節中的schedule()
方法的用法,但在這裡,我們傳遞一個TimeUnit
而不是ChronoUnit
。
3.3.使用sleep()
與調度單執行緒執行器調度任務
在sleep()
調度方法中,我們只是在有效運行任務之前呼叫一個等待方法。因此,很容易理解程式碼在做什麼,並且更容易調試它。另一方面,每個任務使用計劃執行程序服務取決於庫的調度程序代碼,因此可能更難調試或排除故障。
此外,如果我們選擇使用sleep()
,我們只能安排任務在固定延遲後執行。相反,使用ScheduledExecutorService
,我們可以存取三種調度方法: schedule()
、 scheduleAtFixedRate(),
和scheduleWithFixedDelay().
ScheduledExecutorService's
schedule()
方法會加入延遲,就像sleep()
一樣。 scheduleAtFixedRate()
和scheduleWithFixedDelay() methods
為調度添加了週期性,因此我們可以在固定大小的周期內重複任務執行。因此,我們使用ScheduledExecutorService
內建Java庫來調度任務時具有更大的靈活性。
4. 結論
在本文中,我們介紹了使用虛擬線程相對於傳統平台線程的一些優點。然後,我們研究了使用Thread.sleep()
和ScheduledExecutorService
來安排任務在虛擬執行緒中執行。
與往常一樣,原始碼可以在 GitHub 上取得。