java.util.concurrent.BlockingQueue指南

1.概述

在本文中,我們將研究最有用的構造之一java.util.concurrent來解決並發的生產者-消費者問題。我們將研究*BlockingQueue*接口的API,以及該接口中的方法如何使編寫並發程序變得更加容易。

在本文的後面,我們將展示一個簡單程序的示例,該程序具有多個生產者線程和多個消費者線程。

2. BlockingQueue類型

我們可以區分兩種類型的BlockingQueue

  • 無限隊列–幾乎可以無限增長
  • 有界隊列–定義了最大容量

2.1。無限隊列

創建無限隊列很簡單:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();

blockingQueue的容量將設置為Integer.MAX_VALUE。將元素添加到無界隊列的所有操作都將永遠不會阻塞,因此它可能會變得很大。

使用無邊界BlockingQueue設計生產者-消費者程序時,最重要的事情是,消費者應該能夠像生產者將消息添加到隊列中一樣快地消費消息。否則,內存可能會填滿,我們將獲得OutOfMemory異常。

2.2。有界隊列

第二類隊列是有界隊列。我們可以通過將容量作為參數傳遞給構造函數來創建此類隊列:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);

在這裡,我們有一個容量為10的blockingQueue 。這意味著當生產者嘗試將元素添加到已經滿的隊列中時,這取決於用來添加元素的方法( offer()add()put () ),它將阻塞直到可以插入對象的空間可用為止。否則,操作將失敗。

使用有界隊列是設計並發程序的好方法,因為當我們將元素插入到已經滿的隊列中時,該操作需要等到使用者趕上並在隊列中提供一些可用空間。它使我們在節流方面毫不費力。

3. BlockingQueue API

有兩種類型的BlockingQueue接口的方法*-*負責將元素添加到隊列和檢索這些元素的方法的方法。如果隊列已滿/為空,則這兩個組中的每種方法的行為都會有所不同。

3.1 添加元素

  • add()–如果插入成功,則返回true ,否則拋出IllegalStateException
  • put()–將指定的元素插入隊列,並在必要時等待可用插槽
  • offer()–如果插入成功,則返回true ,否則返回false
  • offer(E e,長超時,TimeUnit單位)–嘗試將元素插入隊列,並等待指定超時內的可用插槽

3.2 檢索元素

  • take() –等待隊列的head元素並將其刪除。如果隊列為空,它將阻塞並等待元素變為可用
  • poll(長超時,TimeUnit單位)–檢索並刪除隊列的開頭,如果有必要,直到指定的等待時間,元素才可用。超時後返回null

當構建生產者-消費者程序時,這些方法是BlockingQueue接口中最重要的構建塊。

4. 多線程生產者-消費者示例

讓我們創建一個包含兩部分的程序-生產者和消費者。

生產者將產生一個從0到100的隨機數,並將該數字放入BlockingQueue中。我們將有4個生產者線程,並使用put()方法進行阻塞,直到隊列中有可用空間為止。

要記住的重要一點是,我們需要停止使用者線程等待元素無限期地出現在隊列中。

從生產者向消費者發出不再需要處理的消息的一種好方法是發送一種稱為“毒丸”的特殊消息。我們需要發送盡可能多的有毒藥丸。然後,當消費者從隊列中獲取該特殊的毒丸消息時,它將正常完成執行。

讓我們看一個生產者類:

public class NumbersProducer implements Runnable {

 private BlockingQueue<Integer> numbersQueue;

 private final int poisonPill;

 private final int poisonPillPerProducer;



 public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {

 this.numbersQueue = numbersQueue;

 this.poisonPill = poisonPill;

 this.poisonPillPerProducer = poisonPillPerProducer;

 }

 public void run() {

 try {

 generateNumbers();

 } catch (InterruptedException e) {

 Thread.currentThread().interrupt();

 }

 }



 private void generateNumbers() throws InterruptedException {

 for (int i = 0; i < 100; i++) {

 numbersQueue.put(ThreadLocalRandom.current().nextInt(100));

 }

 for (int j = 0; j < poisonPillPerProducer; j++) {

 numbersQueue.put(poisonPill);

 }

 }

 }

我們的生產者構造函數將BlockingQueue作為參數,該變量用於協調生產者和使用者之間的處理。我們看到方法generateNumbers()會將100個元素放入隊列。它還需要毒藥消息,以知道執行完成時必須將哪種類型的消息放入隊列中。該消息需要放入poisonPillPerProducer時間隊列中。

每個使用者都將使用take()方法從BlockingQueue中獲取一個元素,因此它將阻塞直到隊列中有一個元素。從隊列中獲取整數後,它將檢查消息是否為毒藥,如果是,則線程執行完成。否則,它將在標準輸出上打印出結果以及當前線程的名稱。

這將使我們深入了解消費者的內部運作方式:

public class NumbersConsumer implements Runnable {

 private BlockingQueue<Integer> queue;

 private final int poisonPill;



 public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {

 this.queue = queue;

 this.poisonPill = poisonPill;

 }

 public void run() {

 try {

 while (true) {

 Integer number = queue.take();

 if (number.equals(poisonPill)) {

 return;

 }

 System.out.println(Thread.currentThread().getName() + " result: " + number);

 }

 } catch (InterruptedException e) {

 Thread.currentThread().interrupt();

 }

 }

 }

需要注意的重要事情是隊列的使用。與生產者構造函數相同,將隊列作為參數傳遞。我們之所以可以這樣做,是因為BlockingQueue可以在線程之間共享,而無需任何顯式同步。

現在我們有了生產者和消費者,就可以開始我們的程序了。我們需要定義隊列的容量,並將其設置為100個元素。

我們希望有4個生產者線程,而消費者線程的數量將等於可用處理器的數量:

int BOUND = 10;

 int N_PRODUCERS = 4;

 int N_CONSUMERS = Runtime.getRuntime().availableProcessors();

 int poisonPill = Integer.MAX_VALUE;

 int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;

 int mod = N_CONSUMERS % N_PRODUCERS;



 BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);



 for (int i = 1; i < N_PRODUCERS; i++) {

 new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();

 }



 for (int j = 0; j < N_CONSUMERS; j++) {

 new Thread(new NumbersConsumer(queue, poisonPill)).start();

 }



 new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();

BlockingQueue是使用具有容量的構造創建的。我們正在創建4個生產者和N個消費者。我們將毒藥消息指定為Integer.MAX_VALUE,因為生產者在正常工作條件下永遠不會發送該值。這裡要注意的最重要的事情是BlockingQueue用於協調它們之間的工作。

當我們運行程序時,將有4個生產者線程將隨機Integers放入BlockingQueue中,而消費者將從隊列中獲取這些元素。每個線程都會將線程的名稱和結果打印到標準輸出中。

5.結論

本文介紹了BlockingQueue的實際用法,並說明了用於添加和檢索其中的元素的方法。此外,我們還展示瞭如何使用BlockingQueue構建多線程的生產者-消費者程序,以協調生產者與消費者之間的工作。

所有這些示例和代碼片段的實現都可以在GitHub項目中找到–這是一個基於Maven的項目,因此應該很容易直接導入和運行。