Java SynchronousQueue指南
1.概述
在本文中,我們將從java.util.concurrent包中查看*SynchronousQueue* 。
簡而言之,此實現使我們能夠以線程安全的方式在線程之間交換信息。
2. API概述
SynchronousQueue僅具有兩個受支持的操作: take()和put(),它們都處於阻塞狀態。
例如,當我們想向隊列添加元素時,我們需要調用put()方法。該方法將阻塞,直到其他線程調用take()方法,表明已準備好接受元素。
儘管SynchronousQueue具有隊列的接口,但我們應該將其視為兩個線程之間單個元素的交換點,其中一個線程傳遞一個元素,另一個線程傳遞該元素。
3.使用共享變量實現切換
為了了解為什麼SynchronousQueue如此有用,我們將使用兩個線程之間的共享變量來實現一個邏輯,接下來,我們將使用SynchronousQueue重寫該邏輯,使我們的代碼更簡單易讀。
假設我們有兩個線程(生產者和消費者),並且當生產者設置共享變量的值時,我們希望將該事實告知消費者線程。接下來,使用者線程將從共享變量中獲取值。
我們將使用CountDownLatch來協調這兩個線程,以防止使用者訪問尚未設置的共享變量的值的情況。
我們將定義一個sharedState變量和一個CountDownLatch ,它們將用於協調處理:
ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);
生產者將一個隨機整數保存到sharedState變量,並在countDownLatch上執行countDown()方法,向消費者表明它可以從sharedState中獲取值:
Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
sharedState.set(producedElement);
countDownLatch.countDown();
};
使用者將使用await()方法等待**countDownLatch 。當生產者發出設置了變量的信號時,消費者將從sharedState獲取它:
Runnable consumer = () -> {
try {
countDownLatch.await();
Integer consumedElement = sharedState.get();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
最後但並非最不重要的一點,讓我們開始我們的程序:
executor.execute(producer);
executor.execute(consumer);
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(countDownLatch.getCount(), 0);
它將產生以下輸出:
Saving an element: -1507375353 to the exchange point
consumed an element: -1507375353 from the exchange point
我們可以看到,有很多代碼可以實現簡單的功能,例如在兩個線程之間交換元素。在下一節中,我們將嘗試使其更好。
4. 使用SynchronousQueue實現切換
現在,讓我們實現與上一節相同的功能,但要使用SynchronousQueue。它具有雙重作用,因為我們可以使用它來在線程之間交換狀態並協調該動作,因此我們不需要使用SynchronousQueue之外的任何東西。
首先,我們將定義一個隊列:
ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
生產者將調用put()方法,該方法將阻塞,直到其他線程從隊列中取出一個元素為止:
Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
try {
queue.put(producedElement);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
使用者將使用take()方法簡單地檢索該元素:
Runnable consumer = () -> {
try {
Integer consumedElement = queue.take();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};
接下來,我們將開始我們的程序:
executor.execute(producer);
executor.execute(consumer);
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(queue.size(), 0);
它將產生以下輸出:
Saving an element: 339626897 to the exchange point
consumed an element: 339626897 from the exchange point
我們可以看到,將SynchronousQueue用作線程之間的交換點,這比上一個示例使用共享狀態和CountDownLatch更好,也更容易理解。
5.結論
在本快速教程中,我們研究了SynchronousQueue構造。我們創建了一個程序,該程序使用共享狀態在兩個線程之間交換數據,然後重寫該程序以利用SynchronousQueue構造。這用作協調生產者和使用者線程的交換點。
所有這些示例和代碼段的實現都可以在GitHub項目中找到–這是一個Maven項目,因此應該很容易直接導入和運行。