DelayQueue指南

1.概述

在本文中,我們將研究java.util.concurrent包中的*DelayQueue*構造。這是可以在生產者-消費者程序中使用的阻塞隊列。

它具有一個非常有用的特性–當消費者想要從隊列中取出一個元素時,只有在該特定元素的延遲到期後,他們才能使用它。

2.為DelayQueue中的元素實現Delayed

我們要放入DelayQueue中的每個元素都需要實現*Delayed接口。假設我們要創建一個DelayObject類。該類的實例將放入DelayQueue中。*

我們將String數據和delayInMilliseconds作為參數傳遞給其構造函數:

public class DelayObject implements Delayed {

 private String data;

 private long startTime;



 public DelayObject(String data, long delayInMilliseconds) {

 this.data = data;

 this.startTime = System.currentTimeMillis() + delayInMilliseconds;

 }

我們正在定義一個startTime –這是應該從隊列中使用元素的時間。接下來,我們需要實現getDelay()方法-它應該以給定的時間單位返回與此對象關聯的剩餘延遲。

因此,我們需要使用*TimeUnit.convert()方法返回適當的TimeUnit中的剩餘延遲

@Override

 public long getDelay(TimeUnit unit) {

 long diff = startTime - System.currentTimeMillis();

 return unit.convert(diff, TimeUnit.MILLISECONDS);

 }

當使用者嘗試從隊列中取出一個元素時, DelayQueue將執行getDelay()以確定是否允許從隊列中返回該元素。如果getDelay()方法將返回零或負數,則意味著可以從隊列中檢索它。

我們還需要實現compareTo()方法,因為DelayQueue中的元素將根據到期時間進行排序。首先到期的項目將保留在隊列的開頭,而到期時間最長的元素將保留在隊列的末尾:

@Override

 public int compareTo(Delayed o) {

 return Ints.saturatedCast(

 this.startTime - ((DelayObject) o).startTime);

 }

3. DelayQueue客戶和生產者

為了能夠測試我們的DelayQueue,我們需要實現生產者和消費者邏輯。生產者類將隊列,要產生的元素數以及每條消息的延遲(以毫秒為單位)作為參數。

然後,當調用run()方法時,它將元素放入隊列,並在每次放置後休眠500毫秒:

public class DelayQueueProducer implements Runnable {



 private BlockingQueue<DelayObject> queue;

 private Integer numberOfElementsToProduce;

 private Integer delayOfEachProducedMessageMilliseconds;



 // standard constructor



 @Override

 public void run() {

 for (int i = 0; i < numberOfElementsToProduce; i++) {

 DelayObject object

 = new DelayObject(

 UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);

 System.out.println("Put object: " + object);

 try {

 queue.put(object);

 Thread.sleep(500);

 } catch (InterruptedException ie) {

 ie.printStackTrace();

 }

 }

 }

 }

使用者實現非常相似,但是它也跟踪消耗的消息數量:

public class DelayQueueConsumer implements Runnable {

 private BlockingQueue<DelayObject> queue;

 private Integer numberOfElementsToTake;

 public AtomicInteger numberOfConsumedElements = new AtomicInteger();



 // standard constructors



 @Override

 public void run() {

 for (int i = 0; i < numberOfElementsToTake; i++) {

 try {

 DelayObject object = queue.take();

 numberOfConsumedElements.incrementAndGet();

 System.out.println("Consumer take: " + object);

 } catch (InterruptedException e) {

 e.printStackTrace();

 }

 }

 }

 }

*4. 測試DelayQueue*

為了測試DelayQueue的行為我們將創建一個生產者線程和一個消費者線程。

生產者將兩個對象放置()到隊列中,延遲為500毫秒。該測試斷言使用者使用了兩條消息:

@Test

 public void givenDelayQueue_whenProduceElement

 _thenShouldConsumeAfterGivenDelay() throws InterruptedException {

 // given

 ExecutorService executor = Executors.newFixedThreadPool(2);



 BlockingQueue<DelayObject> queue = new DelayQueue<>();

 int numberOfElementsToProduce = 2;

 int delayOfEachProducedMessageMilliseconds = 500;

 DelayQueueConsumer consumer = new DelayQueueConsumer(

 queue, numberOfElementsToProduce);

 DelayQueueProducer producer = new DelayQueueProducer(

 queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);



 // when

 executor.submit(producer);

 executor.submit(consumer);



 // then

 executor.awaitTermination(5, TimeUnit.SECONDS);

 executor.shutdown();



 assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);

 }

我們可以觀察到運行該程序將產生以下輸出:

Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}

 Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}

 Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}

 Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}

生產者放置該對象,一段時間後,將消耗延遲過期的第一個對象。

第二個元素發生了相同的情況。

5.消費者在指定時間內無法消費

假設我們有一個生產者正在生產將在10秒後失效的元素:

int numberOfElementsToProduce = 1;

 int delayOfEachProducedMessageMilliseconds = 10_000;

 DelayQueueConsumer consumer = new DelayQueueConsumer(

 queue, numberOfElementsToProduce);

 DelayQueueProducer producer = new DelayQueueProducer(

 queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

我們將開始測試,但它將在5秒鐘後終止。由於DelayQueue的特性,使用將無法使用隊列中的消息,因為該元素尚未過期:

executor.submit(producer);

 executor.submit(consumer);



 executor.awaitTermination(5, TimeUnit.SECONDS);

 executor.shutdown();

 assertEquals(consumer.numberOfConsumedElements.get(), 0);

請注意,使用者的numberOfConsumedElements的值為零。

6.產生立即到期的元素

Delayed message getDelay()方法的實現返回負數時,意味著給定的元素已經過期。在這種情況下,生產者將立即消耗該元素。

我們可以測試產生負延遲的元素的情況:

int numberOfElementsToProduce = 1;

 int delayOfEachProducedMessageMilliseconds = -10_000;

 DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);

 DelayQueueProducer producer = new DelayQueueProducer(

 queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

當我們開始測試用例時,使用者將立即使用該元素,因為它已經過期:

executor.submit(producer);

 executor.submit(consumer);



 executor.awaitTermination(1, TimeUnit.SECONDS);

 executor.shutdown();

 assertEquals(consumer.numberOfConsumedElements.get(), 1);

7.結論

在本文中,我們正在研究java.util.concurrent包中的DelayQueue構造。

我們實現了一個延遲元素,該元素是從隊列中產生和消耗的。

我們利用DelayQueue的實現來消耗已過期的元素。

所有這些示例和代碼段的實現都可以在GitHub項目(這是一個Maven項目)中找到,因此應該很容易直接導入和運行。