DelayQueue指南
1.概述
在本文中,我們將研究java.util.concurrent包中的*DelayQueue*構造。這是可以在生產者-消費者程序中使用的阻塞隊列。
它具有一個非常有用的特性–當消費者想要從隊列中取出一個元素時,只有在該特定元素的延遲到期後,他們才能使用它。
2.為DelayQueue中的元素實現Delayed
我們要放入DelayQueue中的每個元素都需要實現*Delayed接口。假設我們要創建一個DelayObject類。該類的實例將放入DelayQueue中。*
我們將String數據和delayInMilliseconds作為參數傳遞給其構造函數:
public class DelayObject implements Delayed {
private String data;
private long startTime;
public DelayObject(String data, long delayInMilliseconds) {
this.data = data;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}
我們正在定義一個startTime –這是應該從隊列中使用元素的時間。接下來,我們需要實現getDelay()方法-它應該以給定的時間單位返回與此對象關聯的剩餘延遲。
因此,我們需要使用*TimeUnit.convert()
方法返回適當的TimeUnit中的剩餘延遲:
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
當使用者嘗試從隊列中取出一個元素時, DelayQueue將執行getDelay()以確定是否允許從隊列中返回該元素。如果getDelay()方法將返回零或負數,則意味著可以從隊列中檢索它。
我們還需要實現compareTo()方法,因為DelayQueue中的元素將根據到期時間進行排序。首先到期的項目將保留在隊列的開頭,而到期時間最長的元素將保留在隊列的末尾:
@Override
public int compareTo(Delayed o) {
return Ints.saturatedCast(
this.startTime - ((DelayObject) o).startTime);
}
3. DelayQueue客戶和生產者
為了能夠測試我們的DelayQueue,我們需要實現生產者和消費者邏輯。生產者類將隊列,要產生的元素數以及每條消息的延遲(以毫秒為單位)作為參數。
然後,當調用run()方法時,它將元素放入隊列,並在每次放置後休眠500毫秒:
public class DelayQueueProducer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToProduce;
private Integer delayOfEachProducedMessageMilliseconds;
// standard constructor
@Override
public void run() {
for (int i = 0; i < numberOfElementsToProduce; i++) {
DelayObject object
= new DelayObject(
UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
System.out.println("Put object: " + object);
try {
queue.put(object);
Thread.sleep(500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
使用者實現非常相似,但是它也跟踪消耗的消息數量:
public class DelayQueueConsumer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToTake;
public AtomicInteger numberOfConsumedElements = new AtomicInteger();
// standard constructors
@Override
public void run() {
for (int i = 0; i < numberOfElementsToTake; i++) {
try {
DelayObject object = queue.take();
numberOfConsumedElements.incrementAndGet();
System.out.println("Consumer take: " + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
*4. 測試DelayQueue*
為了測試DelayQueue的行為,我們將創建一個生產者線程和一個消費者線程。
生產者將兩個對象放置()到隊列中,延遲為500毫秒。該測試斷言使用者使用了兩條消息:
@Test
public void givenDelayQueue_whenProduceElement
_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
// given
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 2;
int delayOfEachProducedMessageMilliseconds = 500;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
// when
executor.submit(producer);
executor.submit(consumer);
// then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}
我們可以觀察到運行該程序將產生以下輸出:
Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
生產者放置該對象,一段時間後,將消耗延遲過期的第一個對象。
第二個元素發生了相同的情況。
5.消費者在指定時間內無法消費
假設我們有一個生產者正在生產將在10秒後失效的元素:
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
我們將開始測試,但它將在5秒鐘後終止。由於DelayQueue的特性,使用方將無法使用隊列中的消息,因為該元素尚未過期:
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);
請注意,使用者的numberOfConsumedElements的值為零。
6.產生立即到期的元素
當Delayed message getDelay()方法的實現返回負數時,意味著給定的元素已經過期。在這種情況下,生產者將立即消耗該元素。
我們可以測試產生負延遲的元素的情況:
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
當我們開始測試用例時,使用者將立即使用該元素,因為它已經過期:
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);
7.結論
在本文中,我們正在研究java.util.concurrent包中的DelayQueue構造。
我們實現了一個延遲元素,該元素是從隊列中產生和消耗的。
我們利用DelayQueue的實現來消耗已過期的元素。
所有這些示例和代碼段的實現都可以在GitHub項目(這是一個Maven項目)中找到,因此應該很容易直接導入和運行。