Java TransferQueue指南
1.概述
在本文中,我們將研究標準java.util.concurrent包中的*TransferQueue*構造。
簡而言之,該隊列使我們能夠根據生產者-消費者模式創建程序,並協調從生產者到消費者的消息傳遞。
該實現實際上類似於BlockingQueue-但為我們提供了實現某種形式的背壓的新功能。這意味著,當生產者使用transfer()方法將消息發送給消費者時,生產者將保持阻塞狀態,直到消息被消耗為止。
2.一個生產者–零消費者
讓我們從TransferQueue中測試一個transfer()方法-預期的行為是生產者將被阻塞,直到使用者使用take()方法從隊列接收消息為止。
為此,我們將創建一個程序,其中有一個生產者,但零消費者。由於我們沒有任何使用者可從隊列中獲取該元素,因此對生產者線程的transfer()的首次調用將無限期阻塞。
讓我們看看Producer類的樣子:
class Producer implements Runnable {
private TransferQueue<String> transferQueue;
private String name;
private Integer numberOfMessagesToProduce;
public AtomicInteger numberOfProducedMessages
= new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToProduce; i++) {
try {
boolean added
= transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
if(added){
numberOfProducedMessages.incrementAndGet();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// standard constructors
}
我們正在將TransferQueue的一個實例與要提供給生產者的名稱以及應該轉移到隊列中的元素數一起傳遞給構造函數。
請注意,我們使用帶有給定超時時間的tryTransfer()方法。我們正在等待四秒鐘,如果生產者無法在給定的超時時間內傳輸消息,則它返回false並繼續處理下一條消息。生產者有一個numberOfProducedMessages變量來跟踪產生了多少消息。
接下來,讓我們看一下Consumer類:
class Consumer implements Runnable {
private TransferQueue<String> transferQueue;
private String name;
private int numberOfMessagesToConsume;
public AtomicInteger numberOfConsumedMessages
= new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToConsume; i++) {
try {
String element = transferQueue.take();
longProcessing(element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void longProcessing(String element)
throws InterruptedException {
numberOfConsumedMessages.incrementAndGet();
Thread.sleep(500);
}
// standard constructors
}
它類似於生產者,但是我們使用take()方法從隊列中接收元素。我們還使用longProcessing()方法來模擬一些長時間運行的操作,在該方法中,我們將遞增numberOfConsumedMessages變量,該變量是接收到的消息的計數器。
現在,讓我們從一個生產者開始我們的程序:
@Test
public void whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout()
throws InterruptedException {
// given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer(transferQueue, "1", 3);
// when
exService.execute(producer);
// then
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
exService.shutdown();
assertEquals(producer.numberOfProducedMessages.intValue(), 0);
}
我們希望將三個元素發送到隊列,但是生產者在第一個元素上被阻塞,並且沒有使用者從隊列中獲取該元素。我們正在使用tryTransfer()方法 它將阻塞,直到消息被消耗或超時為止。超時後,它將返回false以指示傳輸已失敗,並且它將嘗試傳輸下一個傳輸。這是上一個示例的輸出:
Producer: 1 is waiting to transfer...
can not add an element due to the timeout
Producer: 1 is waiting to transfer...
3.一位生產者–一位消費者
讓我們測試一下有一個生產者和一個消費者的情況:
@Test
public void whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages()
throws InterruptedException {
// given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer(transferQueue, "1", 3);
Consumer consumer = new Consumer(transferQueue, "1", 3);
// when
exService.execute(producer);
exService.execute(consumer);
// then
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
exService.shutdown();
assertEquals(producer.numberOfProducedMessages.intValue(), 3);
assertEquals(consumer.numberOfConsumedMessages.intValue(), 3);
}
TransferQueue用作交換點,並且在使用者使用隊列中的一個元素之前,生產者無法繼續向其添加另一個元素。讓我們看一下程序輸出:
Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 1 received element: A2
我們看到,由於TransferQueue的規範,從隊列中生成和使用元素是順序的。
4. 多個生產者–多個消費者
在最後一個示例中,我們將考慮擁有多個消費者和多個生產者:
@Test
public void whenMultipleConsumersAndProducers_thenProcessAllMessages()
throws InterruptedException {
// given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(3);
Producer producer1 = new Producer(transferQueue, "1", 3);
Producer producer2 = new Producer(transferQueue, "2", 3);
Consumer consumer1 = new Consumer(transferQueue, "1", 3);
Consumer consumer2 = new Consumer(transferQueue, "2", 3);
// when
exService.execute(producer1);
exService.execute(producer2);
exService.execute(consumer1);
exService.execute(consumer2);
// then
exService.awaitTermination(10_000, TimeUnit.MILLISECONDS);
exService.shutdown();
assertEquals(producer1.numberOfProducedMessages.intValue(), 3);
assertEquals(producer2.numberOfProducedMessages.intValue(), 3);
}
在此示例中,我們有兩個消費者和兩個生產者。程序啟動時,我們看到兩個生產者都可以生產一個元素,然後,它們將阻塞,直到其中一個消費者從隊列中獲取該元素:
Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 2 is waiting to transfer...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 2 transferred element: A0
Producer: 2 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A1
Producer: 2 is waiting to transfer...
Consumer: 2 received element: A1
Consumer: 2 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 2 received element: A2
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A2
Consumer: 2 received element: A2
5.結論
在本文中,我們正在研究java.util.concurrent包中的TransferQueue構造。
我們看到瞭如何使用該構造實現生產者-消費者程序。我們使用了transfer()方法來創建某種形式的背壓,其中生產者在消費者從隊列中檢索元素之前,無法發布其他元素。
當我們不希望生產過多的生產者將消息氾濫到隊列,從而導致OutOfMemory錯誤時, TransferQueue可能會非常有用。在這種設計中,消費者將決定生產者產生消息的速度。
所有這些示例和代碼片段都可以在GitHub上找到-這是一個Maven項目,因此應易於導入和運行。