Spring Cloud數據流的流處理入門
1.簡介
Spring Cloud Data Flow
是可組合數據微服務的雲原生編程和操作模型。
借助Spring Cloud Data Flow
,開發人員可以為常見的用例(例如數據提取,實時分析和數據導入/導出)創建和編排數據管道。
該數據管道有兩種形式,即流數據管道和批處理數據管道。
在第一種情況下,通過消息傳遞中間件消耗或產生了無限制的數據量。在第二種情況下,短期任務將處理一組有限的數據,然後終止。
本文將重點介紹流處理。
2. 架構概述
這些類型的體系結構的關鍵組件是Applications
, Data Flow Server
和目標運行時。
除了這些關鍵組件之外,我們在體系結構中通常還具有Data Flow Shell
和message broker
。
讓我們更詳細地查看所有這些組件。
2.1。應用領域
通常,流數據管道包括使用來自外部系統的事件,數據處理和多語言持久性。這些階段在Spring Cloud
術語中通常稱為Source
, Processor
和Sink
:
- 來源:是使用事件的應用程序
- 處理器:從
Source
消費數據,對其進行一些處理,然後將處理後的數據發送到管道中的下一個應用程序 - 接收器:從
Source
或Processor
進行消耗,並將數據寫入所需的持久層
這些應用程序可以通過兩種方式打包:
- 在Maven存儲庫,文件,http或任何其他Spring資源實現中託管的Spring Boot uber-jar(本文將使用此方法)
- 碼頭工人
Spring Cloud Data Flow
團隊已經提供了許多通用用例的源,處理器和接收器應用程序(例如jdbc,hdfs,http,路由器),並可供使用。
2.2。運行
此外,需要運行時才能使這些應用程序執行。支持的運行時為:
- Cloud Foundry
- Apache YARN
- Kubernetes
- Apache Mesos
- 用於開發的本地服務器(本文將使用)
2.3。數據流服務器
負責將應用程序部署到運行時的組件是Data Flow Server
。每個目標運行時都有一個Data Flow Server
可執行jar。
Data Flow Server
負責解釋:
- 流DSL,它描述通過多個應用程序的邏輯數據流。
- 部署清單,描述應用程序到運行時的映射。
2.4。數據流Shell
數據流外殼程序是數據流服務器的客戶端。該外殼程序使我們能夠執行與服務器交互所需的DSL命令。
例如,描述從http源到jdbc接收器的數據流的DSL將寫為“ http | jdbc”。 DSL中的這些名稱已在Data Flow Server
中註冊,並映射到可以託管在Maven或Docker存儲庫中的應用程序工件。
Spring還提供了一個名為Flo
的圖形界面,用於創建和監視流數據管道。但是,其用法不在本文討論範圍之內。
2.5。消息代理
正如在上一節的示例中所看到的,我們已經在數據流的定義中使用了管道符號。管道符號表示兩個應用程序之間通過消息傳遞中間件的通信。
這意味著我們需要在目標環境中啟動並運行消息代理。
支持的兩個消息傳遞中間件代理是:
- Apache Kafka
- RabbitMQ
因此,現在我們已經對架構組件進行了概述–是時候構建我們的第一個流處理管道了。
3.安裝Message Broker
如我們所見,管道中的應用程序需要消息中間件進行通信。就本文而言,我們將使用RabbitMQ
。
有關安裝的完整詳細信息,您可以按照官方網站上的說明進行操作。
4.本地數據流服務器
為了加快生成應用程序的過程,我們將使用Spring Initializr ;有了它的幫助,我們可以在幾分鐘內獲得我們的Spring Boot
應用程序。
進入網站後,只需選擇一個Group
和一個Artifact
名稱。
完成此操作後,單擊按鈕Generate Project
來開始Maven工件的下載。
下載完成後,解壓縮該項目並將其作為Maven項目導入您選擇的IDE中。
讓我們向項目添加一個Maven依賴項。由於我們需要Dataflow Local Server
庫,因此我們添加spring-cloud-starter-dataflow-server-local依賴項:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
</dependency>
現在我們需要使用@EnableDataFlowServer
註釋對Spring Boot
主類進行註釋:
@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowServerApplication.class, args);
}
}
就這樣。我們的Local Data Flow Server
可以執行了:
mvn spring-boot:run
該應用程序將在端口9393上啟動。
5.數據流Shell
同樣,轉到Spring Initializr,然後選擇一個Group
and Artifact
名稱。
下載並導入項目後,讓我們添加spring-cloud-dataflow-shell依賴項:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dataflow-shell</artifactId>
</dependency>
現在我們需要在Spring Boot
主類中添加@EnableDataFlowShell
批註:
@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
public static void main(String[] args) {
SpringApplication.run(SpringDataFlowShellApplication.class, args);
}
}
我們現在可以運行外殼程序:
mvn spring-boot:run
Shell運行之後,我們可以在提示符下鍵入help
命令,以查看我們可以執行的命令的完整列表。
6.源應用程序
同樣,在Initializr上,我們現在將創建一個簡單的應用程序,並添加一個稱為spring-cloud-starter-stream-rabbit的Stream Rabbit
依賴項:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
然後,我們將@EnableBinding(Source.class)
批註添加到Spring Boot
主類中:
@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowTimeSourceApplication.class, args);
}
}
現在我們需要定義必須處理的數據源。該來源可能是任何潛在的無盡工作負載(物聯網傳感器數據,24/7事件處理,在線交易數據提取)。
在我們的示例應用程序中,我們使用Poller
每10秒產生一個事件(為簡單起見,新的時間戳記)。
@InboundChannelAdapter
批註使用返回值作為消息的有效負載將消息發送到源的輸出通道:
@Bean
@InboundChannelAdapter(
value = Source.OUTPUT,
poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}
我們的數據源已準備就緒。
7.處理器應用程序
接下來,我們將創建一個應用程序並添加Stream Rabbit
依賴項。
然後,我們將@EnableBinding(Processor.class)
批註添加到Spring Boot
主類中:
@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowTimeProcessorApplication.class, args);
}
}
接下來,我們需要定義一種方法來處理來自源應用程序的數據。
要定義一個轉換器,我們需要使用@Transformer
註釋來註釋此方法:
@Transformer(inputChannel = Processor.INPUT,
outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
String date = dateFormat.format(timestamp);
return date;
}
它將時間戳從“輸入”通道轉換為格式化日期,該日期將發送到“輸出”通道。
8.接收器應用程序
創建的最後一個應用程序是Sink應用程序。
同樣,轉到Spring Initializr並選擇一個Group
,一個Artifact
名稱。下載項目後,讓我們添加Stream Rabbit
依賴項。
然後將@EnableBinding(Sink.class)
批註添加到Spring Boot
主類中:
@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowLoggingSinkApplication.class, args);
}
}
現在,我們需要一種方法來攔截來自處理器應用程序的消息。
為此,我們需要在方法中添加@StreamListener(Sink.INPUT)
批註:
@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
logger.info("Received: " + date);
}
該方法僅將轉換為格式化日期的時間戳打印到日誌文件中。
9.註冊一個流應用
Spring Cloud Data Flow Shell允許我們使用app register
命令在App Registry中註冊Stream App。
我們必須提供唯一的名稱,應用程序類型和可以解析為應用程序工件的URI。對於類型,指定“ source
”,“ processor
”或“ sink
”。
在為URI提供maven方案時,格式應符合以下要求:
maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
要註冊先前創建的Source
, Processor
和Sink
應用程序,請轉到Spring Cloud Data Flow Shell
並從提示符處發出以下命令:
app register --name time-source --type source
--uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT
app register --name time-processor --type processor
--uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT
app register --name logging-sink --type sink
--uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT
10.創建和部署流
要創建新的流定義,請轉到Spring Cloud Data Flow Shell
並執行以下shell命令:
stream create --name time-to-log
--definition 'time-source | time-processor | logging-sink'
這定義了一個名為流time-to-log
基礎上,DSL表達'time-source | time-processor | logging-sink'
。
然後,要部署流,請執行以下shell命令:
stream deploy --name time-to-log
Data Flow Server
將time-source
, time-processor
和logging-sink
解析為Maven坐標,並使用它們來啟動流的time-source
, time-processor
和logging-sink
應用程序。
如果流被正確部署,你會在看到Data Flow Server
日誌,該模塊已經啟動並連接在一起:
2016-08-24 12:29:10.516 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0
Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink
2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0
Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor
2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0
Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source
11.查看結果
在此示例中,源僅每秒發送一次當前時間戳作為消息,處理器對其進行格式化,日誌接收器使用日誌記錄框架輸出格式化的時間戳。
日誌文件位於Data Flow Server
日誌輸出中顯示的目錄內,如上所示。要查看結果,我們可以拖尾日誌:
tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log
2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21
12.結論
在本文中,我們已經看到瞭如何通過使用Spring Cloud Data Flow
構建用於流處理的數據管道。
此外,我們看到了流中Source
, Processor
和Sink
應用程序的作用,以及如何通過使用Data Flow Shell
在Data Flow Server
插入和綁定此模塊。
示例代碼可以在GitHub項目中找到。