SEDA 與 Spring 集成和 Apache Camel
一、簡介
SEDA,即 Staged Event-Driven Architecture,是 Matt Welsh 在其博士論文中提出的一種建築風格。 論文。它的主要優點是可擴展性、對高並發流量的支持和可維護性。
在本教程中,我們將使用 SEDA 來計算句子中的唯一詞,並使用兩個單獨的實現:Spring Integration 和 Apache Camel。
2. 色達
SEDA 解決了特定於在線服務的幾個非功能性要求:
- 高並發:架構必須支持盡可能多的並發請求。
- 動態內容:軟件系統必須經常支持複雜的業務用例,需要許多步驟來處理用戶請求和生成響應。
- 負載魯棒性:在線服務的用戶流量是不可預測的,架構需要優雅地處理流量的變化。
為了滿足這些要求, SEDA 將復雜的服務分解為事件驅動的階段。這些階段與隊列間接連接,因此可以彼此完全解耦。此外,每個階段都有一個擴展機制來應對其傳入的負載:
上圖來自 Matt Welsh 的論文,描述了使用 SEDA 實現的 Web 服務器的整體結構。每個矩形代表傳入 HTTP 請求的單個處理階段。這些階段可以獨立地從其傳入隊列中消費任務,進行一些處理或 I/O 工作,然後將消息傳遞給下一個隊列。
2.1。成分
為了更好地理解 SEDA 的組件,讓我們看一下 Matt Welsh 論文中的這張圖表如何顯示單個階段的內部工作原理:
正如我們所見,每個 SEDA 階段都有以下組件:
- 事件:**事件是包含階段執行其處理所需的任何數據的數據結構**。例如,對於 HTTP Web 服務器,事件可能包含用戶數據——例如主體、標頭和請求參數——以及基礎設施數據,例如用戶的 IP、請求時間戳等。
- 事件隊列:這包含舞台的傳入事件。
- 事件處理程序:事件處理程序是舞台的程序邏輯。這可能是一個簡單的路由階段,將數據從其事件隊列轉發到其他相關事件隊列,或者是一個更複雜的階段,以某種方式處理數據。事件處理程序可以單獨或批量讀取事件——後者在批處理有性能優勢時很有幫助,例如使用一個查詢更新多個數據庫記錄。
- 傳出事件:根據業務用例和流的整體結構,每個階段都可以將新事件發送到零個或多個事件隊列。創建和發送傳出消息是在事件處理程序方法中完成的。
- 線程池:線程是一種眾所周知的並發機制。在 SEDA 中,線程是針對每個階段進行本地化和定制的。換句話說,每個階段都維護一個線程池。因此,與每個請求一個線程的模型不同,每個用戶請求都由 SEDA 下的多個線程處理。該模型允許我們根據其複雜性獨立調整每個階段。
- 控制器:SEDA 控制器是管理資源消耗的任何機制,例如線程池大小、事件隊列大小、調度等。控制器負責 SEDA 的彈性行為。一個簡單的控制器可能會管理每個線程池中的活動線程數。更複雜的控制器可以實現複雜的性能調整算法,在運行時監控整個應用程序並調整各種參數。此外,控制器將性能調整邏輯與業務邏輯分離。這種關注點的分離使得我們的代碼更容易維護。
通過將所有這些組件組合在一起,SEDA 提供了一個強大的解決方案來處理高波動的流量負載。
3. 樣本問題
在接下來的部分中,我們將創建兩個使用 SEDA 解決相同問題的實現。
我們的示例問題很簡單:計算每個單詞在給定字符串中出現不區分大小寫的次數。
讓我們將單詞定義為不帶空格的字符序列,我們將忽略標點符號等其他復雜情況。我們的輸出將是一個映射,其中包含作為鍵的單詞和作為值的計數。例如,給定輸入“ My name is Hesam
”,輸出將是:
{
"my": 1,
"name": 1,
"is": 1,
"hesam": 1
}
3.1。使問題適應 SEDA
讓我們從 SEDA 階段的角度來看我們的問題。由於可擴展性是 SEDA 的核心目標,通常最好設計專注於特定操作的小階段,尤其是在我們有 I/O 密集型任務的情況下。此外,擁有小階段有助於我們更好地調整每個階段的規模。
為了解決我們的字數問題,我們可以通過以下階段實現解決方案:
現在我們已經有了舞台設計,讓我們在接下來的部分中使用兩種不同的企業集成技術來實現它。在此表中,我們可以預覽 SEDA 在我們的實現中將如何顯示:
SEDA 組件 | 彈簧集成 | 阿帕奇駱駝 |
---|---|---|
事件 |
| org.springframework.messaging.Message
| org.apache.camel.Exchange
|
|
事件隊列
|
org.springframework.integration.channel
| 由 URI 字符串定義的端點 |
|
事件處理程序
| 功能接口實例 | Camel 處理器、Camel 實用程序類和Function
s |
|
線程池
| TaskExecutor
的 Spring 抽象 | SEDA 端點中的開箱即用支持 |
4. 使用 Spring Integration 的解決方案
對於我們的第一個實現,我們將使用 Spring Integration。 Spring Integration 基於 Spring 模型構建以支持流行的企業集成模式。
Spring Integration 具有三個主要組件:
- 消息是包含標頭和正文的數據結構。
- 通道將消息從一個端點傳送到另一個端點。 Spring Integration 中有兩種渠道:
- 點對點:只有一個端點可以消費該通道中的消息。
- 發布訂閱:多個端點可以消費此通道中的消息。
- 端點將消息路由到執行某些業務邏輯的應用程序組件。 Spring Integration 中有多種端點,例如轉換器、路由器、服務激活器和過濾器。
讓我們看一下我們的 Spring Integration 解決方案的概述:
4.1。依賴項
讓我們開始為Spring Integration、 Spring Boot Test和Spring Integration Test添加依賴項:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
4.2.消息網關
消息傳遞網關是一種代理,它隱藏了向集成流發送消息的複雜性。讓我們為我們的 Spring 集成流程設置一個:
@MessagingGateway
public interface IncomingGateway {
@Gateway(requestChannel = "receiveTextChannel", replyChannel = "returnResponseChannel")
public Map<String, Long> countWords(String input);
}
稍後,我們將能夠使用此網關方法來測試我們的整個流程:
incomingGateway.countWords("My name is Hesam");
Spring 將“My name is Hesam”
輸入包裝在org.springframework.messaging.Message
的一個實例中,並將其傳遞給receiveTextChannel
,然後從returnResponseChannel
給我們最終結果。
4.3.消息渠道
在本節中,我們將了解如何設置網關的初始消息通道receiveTextChannel
。
在 SEDA 下,通道需要通過關聯的線程池進行擴展,所以讓我們從創建線程池開始:
@Bean("receiveTextChannelThreadPool")
TaskExecutor receiveTextChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("receive-text-channel-thread-pool");
executor.initialize();
return executor;
}
接下來,我們將使用我們的線程池來創建我們的通道:
@Bean(name = "receiveTextChannel")
MessageChannel getReceiveTextChannel() {
return MessageChannels.executor("receive-text", receiveTextChannelThreadPool)
.get();
}
MessageChannels
是一個 Spring Integration 類,可以幫助我們創建各種類型的通道。在這裡,我們使用executor()
方法創建一個ExecutorChannel
,這是一個由線程池管理的通道。
我們的其他通道和線程池的設置方式與上述相同。
4.4.接收文本階段
設置好我們的頻道後,我們就可以開始實施我們的階段了。讓我們創建我們的初始階段:
@Bean
IntegrationFlow receiveText() {
return IntegrationFlows.from(receiveTextChannel)
.channel(splitWordsChannel)
.get();
}
IntegrationFlows
是一個流暢的 Spring Integration API,用於創建IntegrationFlow
對象,代表我們流程的各個階段。 from()
方法配置我們舞台的傳入通道,而channel()
配置傳出通道。
在此示例中,我們的階段將網關的輸入消息傳遞給splitWordsChannel
。這個階段在生產應用程序中可能更複雜且 I/O 密集,從持久隊列或通過網絡讀取消息。
4.5.分詞階段
我們的下一個階段有一個單一的責任:將我們的輸入String
拆分為句子中單個單詞的String
數組:
@Bean
IntegrationFlow splitWords() {
return IntegrationFlows.from(splitWordsChannel)
.transform(splitWordsFunction)
.channel(toLowerCaseChannel)
.get();
}
除了我們之前使用的from()
和channel()
調用之外,這裡我們還使用了transform()
,它將提供的Function
應用於我們的輸入消息。我們的splitWordsFunction
實現非常簡單:
final Function<String, String[]> splitWordsFunction = sentence -> sentence.split(" ");
4.6.轉換為小寫階段
此階段將String
數組中的每個單詞轉換為小寫:
@Bean
IntegrationFlow toLowerCase() {
return IntegrationFlows.from(toLowerCaseChannel)
.split()
.transform(toLowerCase)
.aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(listSizeReached)
.outputProcessor(buildMessageWithListPayload))
.channel(countWordsChannel)
.get();
}
我們在這裡使用的第一個新的IntegrationFlows
方法是split()
。 split()
方法使用拆分器模式將輸入消息的每個元素作為單獨的消息發送到toLowerCase
。
我們看到的下一個新方法是aggregate()
,它實現了聚合器模式。**聚合器模式有兩個基本參數:**
- 發布策略,它決定了何時將消息組合成一條消息
- 處理器,它決定如何將消息組合成單個消息
我們的發布策略函數使用listSizeReached
,它告訴聚合器在收集到輸入數組的所有元素後開始聚合:
final ReleaseStrategy listSizeReached = r -> r.size() == r.getSequenceSize();
buildMessageWithListPayload
處理器然後將我們的小寫結果打包到List
:
final MessageGroupProcessor buildMessageWithListPayload = messageGroup ->
MessageBuilder.withPayload(messageGroup.streamMessages()
.map(Message::getPayload)
.toList())
.build();
4.7.數詞階段
我們的最後階段將我們的單詞計數打包到一個Map
中,其中鍵是來自原始輸入的單詞,值是每個單詞的出現次數:
@Bean
IntegrationFlow countWords() {
return IntegrationFlows.from(countWordsChannel)
.transform(convertArrayListToCountMap)
.channel(returnResponseChannel)
.get();
}
在這裡,我們使用convertArrayListToCountMap
函數將計數打包為Map
:
final Function<List<String>, Map<String, Long>> convertArrayListToCountMap = list -> list.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
4.8.測試我們的流程
我們可以將初始消息傳遞給我們的網關方法來測試我們的流程:
public class SpringIntegrationSedaIntegrationTest {
@Autowired
TestGateway testGateway;
@Test
void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenCallingCountWordOnGateway_thenWordCountReturnedAsMap() {
Map<String, Long> actual = testGateway.countWords("My name is Hesam");
Map<String, Long> expected = new HashMap<>();
expected.put("my", 1L);
expected.put("name", 1L);
expected.put("is", 1L);
expected.put("hesam", 1L);
assertEquals(expected, actual);
}
}
5. Apache Camel 的解決方案
Apache Camel 是一個流行且功能強大的開源集成框架。它基於四個主要概念:
- Camel 上下文:Camel 運行時將不同的部分粘在一起。
- 路由:路由決定了一條消息應該如何被處理以及它接下來應該去哪裡。
- 處理器:這些是各種企業集成模式的即用型實現。
- 組件:組件是通過 JMS、HTTP、文件 IO 等集成外部系統的擴展點。
Apache Camel 有一個專門用於 SEDA 功能的組件,使構建 SEDA 應用程序變得簡單。
5.1。依賴項
讓我們為Apache Camel和Apache Camel Test添加所需的 Maven 依賴項:
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>3.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-junit5</artifactId>
<version>3.18.0</version>
<scope>test</scope>
</dependency>
</dependencies>
5.2.定義 SEDA 端點
首先,我們需要定義端點。端點是使用 URI 字符串定義的組件。 SEDA 端點必須以“ seda:[endpointName]
”開頭:
static final String receiveTextUri = "seda:receiveText?concurrentConsumers=5";
static final String splitWordsUri = "seda:splitWords?concurrentConsumers=5";
static final String toLowerCaseUri = "seda:toLowerCase?concurrentConsumers=5";
static final String countWordsUri = "seda:countWords?concurrentConsumers=5";
static final String returnResponse = "mock:result";
正如我們所看到的,每個端點都配置為有五個並發消費者。這相當於每個端點最多有 5 個線程。
為了測試, returnResponse
是一個模擬端點。
5.3.擴展RouteBuilder
接下來,讓我們定義一個擴展 Apache Camel 的RouteBuilder
並覆蓋其 configure() 方法的類。此類連接所有 SEDA 端點:
public class WordCountRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
}
}
在接下來的部分中,我們將使用我們從RouteBuilder
繼承的便捷方法向該configure()
方法添加行來定義我們的階段。
5.4.接收文本階段
此階段從 SEDA 端點接收消息並將它們路由到下一階段,無需任何處理:
from(receiveTextUri).to(splitWordsUri);
在這裡,我們使用繼承from()
方法指定傳入端點,並to()
設置傳出端點。
5.5.分詞階段
讓我們實現將輸入文本拆分為單個單詞的階段:
from(splitWordsUri)
.transform(ExpressionBuilder.bodyExpression(s -> s.toString().split(" ")))
.to(toLowerCaseUri);
transform()
方法將我們的Function
應用於我們的輸入消息,將其拆分為一個數組。
5.6.轉換為小寫階段
我們的下一個任務是將輸入中的每個單詞轉換為小寫。因為我們需要將轉換函數應用於消息中的每個String
與數組本身,所以我們將使用split()
方法來拆分輸入消息以進行處理,然後將結果聚合回ArrayList
:
from(toLowerCaseUri)
.split(body(), new ArrayListAggregationStrategy())
.transform(ExpressionBuilder.bodyExpression(body -> body.toString().toLowerCase()))
.end()
.to(countWordsUri);
end()
方法標誌著拆分過程的結束。一旦列表中的每個項目都被轉換,Apache Camel 就會應用我們指定的聚合策略ArrayListAggregationStrategy
。
ArrayListAggregationStrategy
擴展了 Apache Camel 的AbstractListAggregationStrategy
來定義應該聚合消息的哪一部分。在這種情況下,消息正文是新小寫的單詞:
class ArrayListAggregationStrategy extends AbstractListAggregationStrategy<String> {
@Override
public String getValue(Exchange exchange) {
return exchange.getIn()
.getBody(String.class);
}
}
5.7.數詞階段
最後一個階段使用轉換器將數組轉換為單詞到單詞計數的映射:
from(countWordsUri)
.transform(ExpressionBuilder.bodyExpression(List.class, body -> body.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))))
.to(returnResponse);
5.8.測試我們的路線
讓我們測試一下我們的路線:
public class ApacheCamelSedaIntegrationTest extends CamelTestSupport {
@Test
public void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenSendingTextToInputUri_thenWordCountReturnedAsMap()
throws InterruptedException {
Map<String, Long> expected = new HashMap<>();
expected.put("my", 1L);
expected.put("name", 1L);
expected.put("is", 1L);
expected.put("hesam", 1L);
getMockEndpoint(WordCountRoute.returnResponse).expectedBodiesReceived(expected);
template.sendBody(WordCountRoute.receiveTextUri, "My name is Hesam");
assertMockEndpointsSatisfied();
}
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
RoutesBuilder wordCountRoute = new WordCountRoute();
return wordCountRoute;
}
}
CamelTestSupport
超類提供了許多字段和方法來幫助我們測試我們的流程。我們使用getMockEndpoint()
和expectedBodiesReceived()
來設置我們的預期結果,並template.sendBody()
將測試數據提交到我們的模擬端點。最後,我們使用assertMockEndpointsSatisfied()
來測試我們的期望是否與實際結果相符。
六,結論
在本文中,我們了解了 SEDA 及其組件和用例。之後,我們探索瞭如何使用 SEDA 解決相同的問題,首先使用 Spring Integration,然後使用 Apache Camel。
與往常一樣,示例的源代碼可在 GitHub 上獲得。