使用 Java JMS 讀取和寫入 IBM MQ 佇列
一、簡介
在本教程中,我們將學習如何使用 Java JMS(Java 訊息服務)從 IBM MQ 佇列讀取和寫入訊息。
2. 搭建環境
為了避免手動安裝和設定的複雜性,我們可以在 Docker 容器內執行 IBM MQ。我們可以使用以下命令來運行具有基本配置的容器:
docker run -d --name my-mq -e LICENSE=accept -e MQ_QMGR_NAME=QM1 MQ_QUEUE_NAME=QUEUE1 -p 1414:1414 -p 9443:9443 ibmcom/mq
接下來,我們需要在pom.xml
檔案中加入IBM MQ 客戶端:
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.4.0.0</version>
</dependency>
3. 設定JMS連接
首先,我們需要使用QueueConnectionFactory
設定 JMS 連接,該連接用於建立與佇列管理器的連接:
public class JMSSetup {
public QueueConnectionFactory createConnectionFactory() throws JMSException {
MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
factory.setHostName("localhost");
factory.setPort(1414);
factory.setQueueManager("QM1");
factory.setChannel("SYSTEM.DEF.SVRCONN");
return factory;
}
}
我們首先建立MQQueueConnectionFactory
的實例,它用於配置和建立與 IBM MQ 伺服器的連線。我們將hostname
設定為localhost
因為 MQ 伺服器在 Docker 容器內本地運行。連接埠1414
從 Docker 容器映射到主機。
然後我們使用預設通道SYSTEM.DEF.SVRCONN
。這是客戶端連接到 IBM MQ 的公共通道。
4. 將訊息寫入 IBM MQ 佇列
在本節中,我們將完成向 IBM MQ 佇列傳送訊息的程序。
4.1.建立 JMS 連接
首先,我們先創建MessageSender
類別。此類別負責建立與 IBM MQ 伺服器的連線、管理會話以及處理訊息傳送操作。我們為QueueConnectionFactory
、 QueueConnection
、 QueueSession
和QueueSender
聲明實例變量,它們將用於與 IBM MQ 伺服器互動。
以下是 IBM MQ 連線設定、會話建立和訊息傳送的範例實作:
public class MessageSender {
private QueueConnectionFactory factory;
private QueueConnection connection;
private QueueSession session;
private QueueSender sender;
public MessageSender() throws JMSException {
factory = new JMSSetup().createConnectionFactory();
connection = factory.createQueueConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("QUEUE1");
sender = session.createSender(queue);
connection.start();
}
// ...
}
接下來,在MessageSender
的建構子中,我們使用JMSSetup
類別初始化QueueConnectionFactory
。然後使用該factory
建立QueueConnection
。此連接允許我們與 IBM MQ 伺服器互動。
建立連線後,我們使用createQueueSession()
建立一個QueueSession
。該會話允許我們與隊列進行通訊。這裡我們傳遞false
來指示會話是非事務性的,並Session.AUTO_ACKNOWLEDGE
來在收到訊息時自動確認訊息。
之後,我們定義特定的隊列「 QUEUE1
」並建立一個QueueSender
來處理發送訊息。最後,我們啟動連線以確保會話處於活動狀態並準備好傳輸訊息。
4.2.寫簡訊
現在我們已經建立了連接,創建了會話,定義了隊列,並創建了訊息生產者,我們準備向隊列發送文字訊息:
public void sendMessage(String messageText) {
try {
TextMessage message = session.createTextMessage();
message.setText(messageText);
sender.send(message);
} catch (JMSException e) {
// handle exception
} finally {
// close resources
}
}
首先,我們建立一個帶有messageText
參數的sendMessage()
方法。 sendMessage()
方法負責傳送文字訊息給佇列。它會建立一個TextMessage
物件並使用setText()
方法設定訊息內容。
接下來,我們使用QueueSender
物件的send()
方法將訊息傳送到定義的佇列。這種設計可以實現高效率的訊息傳輸,因為只要MessageSender
物件存在,連線和會話就保持開啟狀態。
4.3.訊息類型
除了TextMessage
之外,IBM MQ 還支援各種其他滿足不同用例的訊息類型。例如,我們可以發送以下內容:
-
BytesMessage
:訊息以位元組形式保存原始二進位資料。 -
ObjectMessage
:訊息攜帶序列化的 Java 物件。 -
MapMessage
:包含鍵值對的訊息。 -
StreamMessage
:訊息包含原始資料類型的流。
5. 從 IBM MQ 佇列讀取訊息
現在我們已經向佇列發送了一條訊息,讓我們探討如何從佇列中讀取訊息。
5.1.建立 JMS 連線並建立會話
首先,我們需要建立連線並建立會話,類似於發送訊息時所做的操作。我們先建立一個MessageReceiver
類別。此類處理與 IBM MQ 伺服器的連接並設定訊息消費所需的元件:
public class MessageReceiver {
private QueueConnectionFactory factory;
private QueueConnection connection;
private QueueSession session;
private QueueReceiver receiver;
public MessageReceiver() throws JMSException {
factory = new JMSSetup().createConnectionFactory();
connection = factory.createQueueConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("QUEUE1");
receiver = session.createReceiver(queue);
connection.start();
}
// ...
}
在此類中,我們首先建立一個QueueConnectionFactory
來設定與 IBM MQ 伺服器的連線。然後,我們使用此連接來建立一個QueueSession
,它允許我們與佇列互動。
最後,我們定義特定的隊列「 QUEUE1
」並建立一個QueueReceiver
來處理來自隊列的傳入訊息。
5.2.閱讀簡訊
一旦建立了連線、會話和接收器,我們就可以開始從佇列接收訊息。我們使用QueueReceiver
的receive()
方法從指定佇列中提取訊息:
public void receiveMessage() {
try {
Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
} else {
// ...
}
} catch (JMSException e) {
// handle exception
} finally {
// close resources
}
}
在receiveMessage()
方法中,我們使用receive()
函數等待佇列中的消息,逾時時間為1000
毫秒。收到訊息後,我們檢查它是否屬於TextMessage
類型。
如果是,我們可以使用getText()
方法檢索實際的訊息內容,該方法以字串形式傳回文字內容。
6. 訊息屬性和標頭
在本節中,我們將討論發送或接收訊息時可以使用的一些常用訊息屬性和標頭。
6.1.訊息屬性
訊息屬性可用於儲存和檢索訊息正文以外的附加資訊。這對於過濾訊息或向訊息添加上下文資料特別有用。以下是我們在發送訊息時設定自訂屬性的方法:
TextMessage message = session.createTextMessage();
message.setText(messageText);
message.setStringProperty("OrderID", "12345");
接下來,我們可以在收到訊息時檢索該屬性:
Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String orderID = message.getStringProperty("OrderID");
}
6.2.訊息頭
訊息標頭提供預定義字段,其中包括有關訊息的元資料。一些常用的訊息標頭包括:
-
JMSMessageID
:JMS 提供者指派給每個訊息的唯一識別碼。我們可以使用此 ID 來追蹤和記錄訊息。 -
JMSExpiration
:定義訊息過期時間(毫秒)。如果在此時間內未發送訊息,則該訊息將被丟棄。 -
JMSTimestamp
:訊息發送的時間。 -
JMSPriority
:訊息的優先順序。
讓我們看看如何在接收訊息時檢索訊息標頭:
Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String messageId = message.getJMSMessageID();
long timestamp = message.getJMSTimestamp();
long expiration = message.getJMSExpiration();
int priority = message.getJMSPriority();
}
7. 使用 Mockito 進行模擬測試
在本節中,我們將使用 Mockito 來模擬依賴項並驗證MessageSender
和MessageReceiver
類別的交互作用。我們首先使用@Mock
註解建立依賴項的模擬實例.
接下來,我們驗證sendMessage()
方法是否與模擬的QueueSender
正確交互作用。我們模擬QueueSender
的send()
方法,並驗證TextMessage
是否已正確建立:
String messageText = "Hello Baeldung! Nice to meet you!";
doNothing().when(sender).send(any(TextMessage.class));
messageSender.sendMessage(messageText);
verify(sender).send(any(TextMessage.class));
verify(textMessage).setText(messageText);
最後,我們驗證 receiveMessage() 方法是否與模擬的QueueReceiver
正確交互作用。我們模擬receive()
方法以返回預先定義的TextMessage
,並按我們的預期檢索訊息文字:
when(receiver.receive(anyLong())).thenReturn(textMessage);
when(textMessage.getText()).thenReturn("Hello Baeldung! Nice to meet you!");
messageReceiver.receiveMessage();
verify(textMessage).getText();
八、結論
在本文中,我們探討了設定 JMS 連線、會話和訊息產生器/接收器以與 IBM MQ 佇列互動的過程。我們也介紹了 IBM MQ 支援的幾種訊息類型。此外,我們還重點介紹了自訂屬性和標頭如何增強訊息處理。
像往常一樣,這裡討論的程式碼可以在 GitHub 上找到。