Java PriorityBlockingQueue指南

1.簡介

在本文中,我們將重點介紹*PriorityBlockingQueue*類,並介紹一些實際示例。

從我們已經知道*Queue是什麼的假設開始,我們將首先演示**PriorityBlockingQueue*中的元素如何按priority排序**。

接下來,我們將演示如何使用這種類型的隊列來阻塞線程。

最後,我們將展示在跨多個線程處理數據時如何將這兩個功能一起使用。

2.元素優先排序

與標準隊列不同,您不能僅將任何類型的元素添加到PriorityBlockingQueue。有兩種選擇:

  1. 添加實現*Comparable的*元素
  2. 在您還提供*Comparator的情況下,添加未實現Comparable的*元素

通過使用ComparatorComparable實現比較元素,將始終對PriorityBlockingQueue進行排序。

目的是以始終優先排序最高優先級元素的方式實現比較邏輯。然後,當我們從隊列中刪除一個元素時,該元素將始終是優先級最高的元素。

首先,讓我們在單個線程中使用隊列,而不是在多個線程中使用隊列。通過這樣做,可以輕鬆證明單元測試中元素的排序方式:

PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

 ArrayList<Integer> polledElements = new ArrayList<>();



 queue.add(1);

 queue.add(5);

 queue.add(2);

 queue.add(3);

 queue.add(4);



 queue.drainTo(polledElements);



 assertThat(polledElements).containsExactly(1, 2, 3, 4, 5);

我們可以看到,儘管以隨機順序將元素添加到隊列中,但是當我們開始輪詢它們時,它們將被排序。這是因為*Integer類實現了Comparable,*而後者又將用於確保我們以升序從隊列中取出它們。

還值得注意的是,當兩個元素進行比較並且相同時,不能保證它們將如何排序。

3.使用隊列阻塞

如果要處理標準隊列,則將調用poll()來檢索元素。但是,如果隊列為空,則對poll()的調用將返回null。

PriorityBlockingQueue實現了BlockingQueue接口,該接口為我們提供了一些額外的方法,使我們可以在從空隊列中刪除時進行阻塞。讓我們嘗試使用take()方法,它應該完全做到這一點:

PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();



 new Thread(() -> {

 System.out.println("Polling...");



 try {

 Integer poll = queue.take();

 System.out.println("Polled: " + poll);

 } catch (InterruptedException e) {

 e.printStackTrace();

 }

 }).start();



 Thread.sleep(TimeUnit.SECONDS.toMillis(5));

 System.out.println("Adding to queue");

 queue.add(1);

儘管使用sleep()是演示事情的一種較脆的方法,但是當我們運行此代碼時,我們將看到:

Polling...

 Adding to queue

 Polled: 1

這證明take()在添加項目之前一直處於阻塞狀態:

  1. 該線程將打印“ Polling”以證明它已啟動
  2. 然後測試將暫停約五秒鐘,以證明該線程此時必須已調用take()
  3. 我們添加到隊列中,應該或多或少立即看到“ Polled:1”,以證明take()一旦可用就返回了一個元素

還值得一提的是, BlockingQueue接口還為我們提供了添加到完整隊列時進行阻塞的方式。

但是, PriorityBlockingQueue是不受限制的。這意味著它將永遠不會充滿,因此始終可以添加新元素。

4.一起使用阻塞和優先級

現在,我們已經解釋了PriorityBlockingQueue的兩個關鍵概念讓我們一起使用它們。我們可以簡單地擴展前面的示例,但是這次將更多元素添加到隊列中:

Thread thread = new Thread(() -> {

 System.out.println("Polling...");

 while (true) {

 try {

 Integer poll = queue.take();

 System.out.println("Polled: " + poll);

 }

 catch (InterruptedException e) {

 e.printStackTrace();

 }

 }

 });



 thread.start();



 Thread.sleep(TimeUnit.SECONDS.toMillis(5));

 System.out.println("Adding to queue");



 queue.addAll(newArrayList(1, 5, 6, 1, 2, 6, 7));

 Thread.sleep(TimeUnit.SECONDS.toMillis(1));

再次說明,儘管由於使用sleep()有點脆弱但它仍然向我們展示了一個有效的用例。現在,我們有了一個阻塞的隊列,等待添加元素。然後,我們一次添加許多元素,然後顯示它們將按優先級順序處理。輸出將如下所示:

Polling...

 Adding to queue

 Polled: 1

 Polled: 1

 Polled: 2

 Polled: 5

 Polled: 6

 Polled: 6

 Polled: 7

5.結論

在本指南中,我們演示瞭如何使用PriorityBlockingQueue來阻塞線程,直到向其中添加了一些項目,並且我們還能夠根據其優先級來處理這些項目。

這些示例的實現可以在GitHub上找到。這是一個基於Maven的項目,因此應易於按原樣運行。