Spring Cloud Stream簡介
1.概述
Spring Cloud Stream是基於Spring Boot和Spring Integration構建的框架,可幫助創建事件驅動或消息驅動的微服務。
在本文中,我們將通過一些簡單的示例介紹Spring Cloud Stream的概念和構造。
2. Maven依賴
首先,我們需要將帶有代理RabbitMQ Maven依賴關係的Spring Cloud Starter Stream作為messaging-middleware添加到我們的pom.xml
:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>
而且,我們將從Maven Central添加模塊依賴性,以啟用JUnit支持:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<version>1.3.0.RELEASE</version>
<scope>test</scope>
</dependency>
3.主要概念
微服務架構遵循“智能端點和啞管道”原則。端點之間的通信由RabbitMQ或Apache Kafka之類的消息傳遞中間件各方驅動。服務通過這些端點或通道發布域事件進行通信。
讓我們遍歷組成Spring Cloud Stream框架的概念,以及構建消息驅動的服務必須了解的基本範例。
3.1。創建Stream服務
讓我們看一下Spring Cloud Stream中的一個簡單服務,該服務偵聽input
綁定並發送對output
綁定的響應:
@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
public static void main(String[] args) {
SpringApplication.run(MyLoggerServiceApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public LogMessage enrichLogMessage(LogMessage log) {
return new LogMessage(String.format("[1]: %s", log.getMessage()));
}
}
註釋@EnableBinding
將應用程序配置為綁定接口Processor
定義的通道INPUT
和OUTPUT
。這兩個通道都是綁定,可以配置為使用具體的消息傳遞中間件或綁定程序。
讓我們看一下所有這些概念的定義:
-
Bindings
-聲明式標識輸入和輸出通道的接口的集合 -
Binder
—消息傳遞中間件的實現,例如Kafka或RabbitMQ -
Channel
—表示消息傳遞中間件和應用程序之間的通信管道 -
StreamListeners
—StreamListeners
消息處理方法,在MessageConverter
在特定於中間件的事件與域對像類型/ POJO之間進行序列化/反序列化之後,將從通道上的消息中自動調用該方法 -
Message Schemas
-用於序列化和消息的反序列化,這些架構可以靜態地從一位置讀取或動態加載,支持域對象類型的進化
3.2。溝通方式
指定給目標的消息是通過“ Publish-Subscribe
消息傳遞模式傳遞的。發布者將消息分類為主題,每個主題均由名稱標識。訂戶表達對一個或多個主題的興趣。中間件過濾消息,將有趣主題的消息傳遞給訂戶。
現在,可以將訂戶分組。 consumer group
是一組由group id
標識的訂戶或使用者,來自主題或主題分區的消息以負載平衡的方式傳遞。
4.編程模型
本節描述構建Spring Cloud Stream應用程序的基礎知識。
4.1。功能測試
測試支持是一個活頁夾實現,允許與渠道進行交互並檢查消息。
讓我們發送一條消息到上面的enrichLogMessage
服務,並檢查響應是否在消息的開頭包含文本“[1]: “
:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationTests {
@Autowired
private Processor pipe;
@Autowired
private MessageCollector messageCollector;
@Test
public void whenSendMessage_thenResponseShouldUpdateText() {
pipe.input()
.send(MessageBuilder.withPayload(new LogMessage("This is my message"))
.build());
Object payload = messageCollector.forChannel(pipe.output())
.poll()
.getPayload();
assertEquals("[1]: This is my message", payload.toString());
}
}
4.2。自定義渠道
在上面的示例中,我們使用了Spring Cloud提供的Processor
接口,該接口只有一個輸入和一個輸出通道。
如果我們需要不同的東西,例如一個輸入和兩個輸出通道,則可以創建一個自定義處理器:
public interface MyProcessor {
String INPUT = "myInput";
@Input
SubscribableChannel myInput();
@Output("myOutput")
MessageChannel anOutput();
@Output
MessageChannel anotherOutput();
}
Spring將為我們提供此接口的正確實現。可以使用@Output(“myOutput”)
註釋來設置通道名稱。
否則,Spring將使用方法名稱作為通道名稱。因此,我們有三個通道,分別稱為myInput
, myOutput
和anotherOutput
。
現在,讓我們想像一下,如果值小於10,我們希望將消息路由到一個輸出,而值大於或等於10,則將消息路由到另一個輸出:
@Autowired
private MyProcessor processor;
@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
if (val < 10) {
processor.anOutput().send(message(val));
} else {
processor.anotherOutput().send(message(val));
}
}
private static final <T> Message<T> message(T val) {
return MessageBuilder.withPayload(val).build();
}
4.3。有條件的調度
使用@StreamListener
批註,我們還可以使用我們使用SpEL表達式定義的任何條件來過濾消費者中期望的消息。
例如,我們可以使用條件分派作為將消息路由到不同輸出的另一種方法:
@Autowired
private MyProcessor processor;
@StreamListener(
target = MyProcessor.INPUT,
condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
processor.anOutput().send(message(val));
}
@StreamListener(
target = MyProcessor.INPUT,
condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
processor.anotherOutput().send(message(val));
}
這種方法的唯一局限性是這些方法不得返回值。
5.設置應用程序
讓我們設置將處理來自RabbitMQ代理的消息的應用程序。
5.1。活頁夾配置
我們可以通過META-INF/spring.binders
將應用程序配置為使用默認的活頁夾實現:
rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration
或者,我們可以通過包含[this dependency](https://search.maven.org/classic/#search%7Cgav%7C1%7Cg%3A%22org.springframework.cloud%22%20AND%20a%3A%22spring-cloud-stream-binder-rabbit%22)
項將RabbitMQ的綁定程序庫添加到類路徑中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>
如果沒有提供活頁夾實現,Spring將在通道之間使用直接消息通信。
5.2。 RabbitMQ配置
要配置第3.1節中的示例以使用RabbitMQ綁定器,我們需要更新位於src/main/resources
的application.yml
:
spring:
cloud:
stream:
bindings:
input:
destination: queue.log.messages
binder: local_rabbit
output:
destination: queue.pretty.log.messages
binder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: <host>
port: 5672
username: <username>
password: <password>
virtual-host: /
input
綁定將使用名為queue.log.messages
的交換,而output
綁定將使用交換queue.pretty.log.messages
。這兩個綁定都將使用名為local_rabbit
綁定local_rabbit
。
請注意,我們不需要提前創建RabbitMQ交換或隊列。運行該應用程序時,將自動創建兩個交換。
為了測試該應用程序,我們可以使用RabbitMQ管理站點來發布消息。在交換queue.log.messages
的“ Publish Message
面板中,我們需要以JSON格式輸入請求。
5.3。自定義消息轉換
Spring Cloud Stream允許我們將消息轉換應用於特定的內容類型。在上面的示例中,我們要提供純文本,而不是使用JSON格式。
為此,我們將使用MessageConverter
將自定義轉換應用於LogMessage
:
@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
//...
@Bean
public MessageConverter providesTextPlainMessageConverter() {
return new TextPlainMessageConverter();
}
//...
}
public class TextPlainMessageConverter extends AbstractMessageConverter {
public TextPlainMessageConverter() {
super(new MimeType("text", "plain"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (LogMessage.class == clazz);
}
@Override
protected Object convertFromInternal(Message<?> message,
Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
String text = payload instanceof String
? (String) payload
: new String((byte[]) payload);
return new LogMessage(text);
}
}
應用這些更改後,返回到“ Publish Message
面板,如果將標頭“ contentTypes
”設置為“ text/plain
”,而有效載荷設置為“ Hello World
”,則它應該像以前一樣工作。
5.4。消費群體
當運行我們的應用程序的多個實例時,每當輸入通道中有一條新消息時,都會通知所有訂閱者。
大多數時候,我們只需要處理一次消息。 Spring Cloud Stream通過使用者組實現此行為。
要啟用此行為,每個使用者綁定都可以使用spring.cloud.stream.bindings.<CHANNEL>.group
屬性指定組名:
spring:
cloud:
stream:
bindings:
input:
destination: queue.log.messages
binder: local_rabbit
group: logMessageConsumers
...
6.消息驅動的微服務
在本部分中,我們介紹了在微服務環境中運行Spring Cloud Stream應用程序所需的所有必需功能。
6.1。擴大
當運行多個應用程序時,重要的是要確保將數據正確分配給各個使用者。為此,Spring Cloud Stream提供了兩個屬性:
- **
spring.cloud.stream.instanceCount
**正在運行的應用程序數 -
spring.cloud.stream.instanceIndex
—當前應用程序的索引
例如,如果我們已經部署了上述MyLoggerServiceApplication
應用程序的兩個實例,則兩個應用程序的spring.cloud.stream.instanceCount
屬性應為2, spring.cloud.stream.instanceIndex
屬性的屬性應分別為0和1。
如果我們按照本文所述使用Spring Data Flow部署Spring Cloud Stream應用程序,則會自動設置這些屬性。
6.2。分區
域事件可以是Partitioned
消息。這在我們擴大存儲規模並提高應用程序性能時會有所幫助。
域事件通常具有分區鍵,因此它與相關消息一起位於同一分區中。
假設我們希望將日誌消息按消息中的第一個字母(即分區鍵)進行分區,並分為兩個分區。
對於以AM
開頭的日誌消息,將有一個分區,對於NZ.
,將有另一個分區NZ.
可以使用兩個屬性進行配置:
-
spring.cloud.stream.bindings.output.producer.partitionKeyExpression
—對有效負載進行分區的表達式 -
spring.cloud.stream.bindings.output.producer.partitionCount
—組數
有時,要分區的表達式過於復雜,無法僅在一行中編寫。對於這些情況,我們可以使用屬性spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass
編寫我們的自定義分區策略。
6.3。健康指標
在微服務環境中,我們還需要檢測服務何時關閉或開始出現故障。 Spring Cloud Stream提供了屬性management.health.binders.enabled
來啟用活頁夾的運行狀況指示器。
運行應用程序時,我們可以在http://<host>:<port>/health
查詢運行http://<host>:<port>/health
。
7.結論
在本教程中,我們介紹了Spring Cloud Stream的主要概念,並通過RabbitMQ上的一些簡單示例展示瞭如何使用它。關於Spring Cloud Stream的更多信息可以在這裡找到。
可以在GitHub上找到本文的源代碼。