具有Spring Cloud數據流的ETL

1.概述

Spring Cloud Data Flow是一個用於構建實時數據管道和批處理流程的雲原生工具包。 Spring Cloud Data Flow已準備好用於一系列數據處理用例,例如簡單的導入/導出,ETL處理,事件流和預測分析。

在本教程中,我們將學習一個使用流管道的實時提取轉換和加載(ETL)示例,該流管道從JDBC數據庫中提取數據,將其轉換為簡單的POJO,然後將其加載到MongoDB中。

2. ETL和事件流處理

ETL(提取,轉換和加載)通常被稱為將多個數據庫和系統中的數據批量加載到公共數據倉庫中的過程。在此數據倉庫中,可以進行繁重的數據分析處理而不會影響系統的整體性能。

但是,新趨勢正在改變這種方式。 ETL在將數據傳輸到數據倉庫和數據湖中仍然發揮著作用。

如今,借助Spring Cloud Data Flow可以使用事件流體系結構中的流完成此操作。

3. Spring Cloud數據流

使用Spring Cloud Data Flow(SCDF),開發人員可以創建兩種形式的數據管道:

  • 使用Spring Cloud Stream的長期實時流應用程序
  • 使用Spring Cloud Task的短期批處理任務應用程序

在本文中,我們將介紹第一個基於Spring Cloud Stream的長期流式應用程序。

3.1。 Spring Cloud Stream應用程序

SCDF Stream管道由步驟組成,其中**每個步驟都是使用Spring Cloud Stream微框架以Spring Boot樣式構建的應用程序。**這些應用程序由諸如Apache Kafka或RabbitMQ的消息傳遞中間件集成。

這些應用程序分為源,處理器和接收器。與ETL流程相比,我們可以說源是“提取”部分,處理器是“變壓器”,宿是“負載”部分。

在某些情況下,我們可以在管道的一個或多個步驟中使用應用程序啟動器。這意味著我們不需要一步就實現一個新的應用程序,而是配置一個已經實現的現有應用程序啟動器。

可以在此處找到應用程序啟動器的列表。

3.2。 Spring Cloud Data Flow服務器

該架構的最後一部分是Spring Cloud Data Flow Server 。 SCDF Server使用Spring Cloud Deployer規范進行應用程序和管道流的部署。該規範通過部署到一系列現代運行時(例如Kubernetes,Apache Mesos,Yarn和Cloud Foundry)來支持SCDF雲原生風格。

另外,我們可以將流作為本地部署運行。

有關SCDF體系結構的更多信息,請參見此處。

4.環境設置

在開始之前,我們需要選擇此復雜部署的各個部分。要定義的第一部分是SCDF服務器。

為了進行測試,我們將使用SCDF Server Local進行本地開發。對於生產部署,我們稍後可以選擇雲原生運行時,例如SCDF Server Kubernetes 。我們可以在此處找到服務器運行時列表。

現在,讓我們檢查運行此服務器的系統要求。

4.1。系統要求

要運行SCDF Server,我們必須定義並設置兩個依賴項:

  • 消息傳遞中間件,以及
  • RDBMS。

對於消息傳遞中間件,我們將使用RabbitMQ,並且選擇PostgreSQL作為RDBMS來存儲管道流定義。

要運行RabbitMQ,請在此處下載最新版本並使用默認配置啟動RabbitMQ實例,或運行以下Docker命令:

docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

作為最後的設置步驟,在默認端口5432上安裝並運行PostgreSQL RDBMS。此後,創建數據庫,SCDF可以使用以下腳本在其中存儲其流定義:

CREATE DATABASE dataflow;

4.2。本地Spring Cloud Data Flow Server

為了在本地運行SCDF Server,我們可以選擇使用docker-compose啟動服務器也可以將其作為Java應用程序啟動。

在這裡,我們將SCDF Server Local作為Java應用程序運行。為了配置應用程序,我們必須將配置定義為Java應用程序參數。我們將在系統路徑中需要Java 8。

要託管jar和依賴項,我們需要為我們的SCDF Server創建一個主文件夾,並將SCDF Server Local發行版下載到該文件夾中。您可以在此處下載SCDF Server Local的最新發行版。

另外,我們需要創建一個lib文件夾,並在其中放置一個JDBC驅動程序。此處提供最新版本的PostgreSQL驅動程序。

最後,讓我們運行SCDF本地服務器:

$java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \

 --spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \

 --spring.datasource.username=postgres_username \

 --spring.datasource.password=postgres_password \

 --spring.datasource.driver-class-name=org.postgresql.Driver \

 --spring.rabbitmq.host=127.0.0.1 \

 --spring.rabbitmq.port=5672 \

 --spring.rabbitmq.username=guest \

 --spring.rabbitmq.password=guest

我們可以通過查看以下URL來檢查它是否正在運行:

http:// localhost:9393 / dashboard

4.3。 Spring Cloud Data Flow Shell

SCDF Shell是一個命令行工具,可輕鬆組成和部署我們的應用程序和管道。這些Shell命令在Spring Cloud Data Flow Server REST API上運行

將jar的最新版本下載到您的SCDF主文件夾中,可從此處獲得。完成後,運行以下命令(根據需要更新版本):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar

 ____ ____ _ __

 / ___| _ __ _ __(_)_ __ __ _ / ___| | ___ _ _ __| |

 \___ \| '_ \| '__| | '_ \ / _` | | | | |/ _ \| | | |/ _` |

 ___) | |_) | | | | | | | (_| | | |___| | (_) | |_| | (_| |

 |____/| .__/|_| |_|_| |_|\__, | \____|_|\___/ \__,_|\__,_|

 ____ |_| _ __|___/ __________

 | _ \ __ _| |_ __ _ | ___| | _____ __ \ \ \ \ \ \

 | | | |/ _` | __/ _` | | |_ | |/ _ \ \ /\ / / \ \ \ \ \ \

 | |_| | (_| | || (_| | | _| | | (_) \ VV / / / / / / /

 |____/ \__,_|\__\__,_| |_| |_|\___/ \_/\_/ /_/_/_/_/_/





 Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".

 dataflow:>

如果在最後一行中顯示的是“ server-unknown:>” ,而不是“ dataflow:>” ,則說明您不在本地主機上運行SCDF Server。在這種情況下,運行以下命令以連接到另一台主機:

server-unknown:>dataflow config server http://{host}

現在,Shell已連接到SCDF服務器,我們可以運行命令了。

我們在Shell中需要做的第一件事是導入應用程序啟動器。找到最新版本在這裡進行的RabbitMQ + Maven的春季引導2.0.x版本,並運行以下命令(再次,更新版本,這裡的“ Darwin-SR1 ”,需要的話):

$ dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven

要檢查已安裝的應用程序,請運行以下Shell命令:

$ dataflow:> app list

結果,我們應該看到一個包含所有已安裝應用程序的表。

另外,SCDF提供了一個名為Flo的圖形界面,我們可以通過以下地址訪問該界面: http://localhost:9393/dashboard 。但是,它的使用不在本文的範圍之內。

5.組成ETL管道

現在讓我們創建我們的流管道。為此,我們將使用JDBC Source應用程序啟動器從關係數據庫中提取信息。

另外,我們將創建一個用於轉換信息結構的自定義處理器和一個自定義接收器,以將我們的數據加載到MongoDB中。

5.1。提取–準備要提取的關係數據庫

讓我們創建一個名為crm的數據庫和一個名為customer的表:

CREATE DATABASE crm;
CREATE TABLE customer (

 id bigint NOT NULL,

 imported boolean DEFAULT false,

 customer_name character varying(50),

 PRIMARY KEY(id)

 )

請注意,我們使用的標誌是imported ,它將存儲已導入的記錄。如果需要,我們還可以將該信息存儲在另一個表中。

現在,讓我們插入一些數據:

INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);

5.2。轉換–將JDBC字段映射到MongoDB字段結構

對於轉換步驟,我們將把源表中的customer_name字段簡單地轉換為新的字段name 。可以在這裡完成其他轉換,但讓我們簡化示例。

為此,我們將創建一個名為customer-transform的新項目。最簡單的方法是使用Spring Initializr站點創建項目。進入網站後,選擇一個組和一個工件名稱。我們將分別使用com.customercustomer-transform,

完成後,單擊“生成項目”按鈕以下載項目。然後,解壓縮該項目並將其導入到您最喜歡的IDE中,並將以下依賴項添加到pom.xml

<dependency>

 <groupId>org.springframework.cloud</groupId>

 <artifactId>spring-cloud-stream-binder-rabbit</artifactId>

 </dependency>

現在,我們開始編碼字段名稱轉換。為此,我們將創建Customer類以充當適配器。此類將通過setName()方法接收customer_name ,並將通過getName方法輸出其值.

@JsonProperty註釋將在從JSON反序列化為Java的同時進行轉換:

public class Customer {



 private Long id;



 private String name;



 @JsonProperty("customer_name")

 public void setName(String name) {

 this.name = name;

 }



 @JsonProperty("name")

 public String getName() {

 return name;

 }



 // Getters and Setters

 }

處理器需要從輸入接收數據,進行轉換並將結果綁定到輸出通道。讓我們創建一個類來做到這一點:

import org.springframework.cloud.stream.annotation.EnableBinding;

 import org.springframework.cloud.stream.messaging.Processor;

 import org.springframework.integration.annotation.Transformer;



 @EnableBinding(Processor.class)

 public class CustomerProcessorConfiguration {



 @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)

 public Customer convertToPojo(Customer payload) {



 return payload;

 }

 }

在上面的代碼中,我們可以觀察到轉換是自動發生的。輸入將數據接收為JSON,然後Jackson使用set方法將其反序列化為Customer對象。

輸出相反,使用get方法將數據序列化為JSON。

5.3。負載– MongoDB中的接收器

與轉換步驟類似,我們將創建另一個maven項目,現在將其命名為customer- mongodb -sink 。再次,訪問Spring Initializr ,為Group選擇com.customer ,為Artifact選擇customer-mongodb-sink 。然後,在依賴項搜索框中鍵入 MongoDB 並下載項目。

接下來,解壓縮並將其導入您喜歡的IDE。

然後,添加與customer-transform項目相同的額外依賴項。

現在,我們將創建另一個Customer類,以在此步驟中接收輸入:

import org.springframework.data.mongodb.core.mapping.Document;



 @Document(collection="customer")

 public class Customer {



 private Long id;

 private String name;



 // Getters and Setters

 }

為了接收Customer ,我們將創建一個偵聽器類,該類將使用CustomerRepository保存客戶實體:

@EnableBinding(Sink.class)

 public class CustomerListener {



 @Autowired

 private CustomerRepository repository;



 @StreamListener(Sink.INPUT)

 public void save(Customer customer) {

 repository.save(customer);

 }

 }

CustomerRepository在這種情況下是Spring Data的MongoRepository

import org.springframework.data.mongodb.repository.MongoRepository;

 import org.springframework.stereotype.Repository;



 @Repository

 public interface CustomerRepository extends MongoRepository<Customer, Long> {



 }

5.4。流定義

現在,兩個自定義應用程序都準備好在SCDF Server上註冊。為此,請使用Maven命令mvn install編譯兩個項目。

然後,我們使用Spring Cloud Data Flow Shell註冊它們:

app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT

最後,讓我們檢查應用程序是否存儲在SCDF中,在shell中運行application list命令:

app list

結果,我們應該在結果表中看到這兩個應用程序。

5.4.1。流管道領域特定語言– DSL

DSL定義了應用程序之間的配置和數據流。 SCDF DSL很簡單。首先,我們定義應用程序的名稱,然後定義配置。

而且,該語法是受Unix啟發的Pipeline語法,它使用豎線(也稱為“管道”)連接多個應用程序:

http --port=8181 | log

這將創建在端口8181中服務的HTTP應用程序,該應用程序會將接收到的主體有效負載發送到日誌。

現在,讓我們看看如何創建JDBC Source的DSL流定義。

5.4.2。 JDBC源流定義

JDBC Source的關鍵配置是queryupdatequery將選擇未讀的記錄,而update將更改一個標誌,以防止重新讀取當前記錄。

另外,我們將定義JDBC Source以30秒的固定延遲進行輪詢,並且最多輪詢1000行。最後,我們將定義連接的配置,例如驅動程序,用戶名,密碼和連接URL:

jdbc

 --query='SELECT id, customer_name FROM public.customer WHERE imported = false'

 --update='UPDATE public.customer SET imported = true WHERE id in (:id)'

 --max-rows-per-poll=1000

 --fixed-delay=30 --time-unit=SECONDS

 --driver-class-name=org.postgresql.Driver

 --url=jdbc:postgresql://localhost:5432/crm

 --username=postgres

 --password=postgres

此處可以找到更多的JDBC Source配置屬性。

5.4.3。客戶MongoDB接收器流定義

由於我們沒有在customer-mongodb-sink application.properties中定義連接配置,因此將通過DSL參數進行配置。

我們的應用程序完全基於MongoDataAutoConfiguration.您可以在此處查看其他可能的配置基本上,我們將定義spring.data.mongodb.uri

customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main

5.4.4。創建和部署流

首先,要創建最終的流定義,請返回命令行管理程序並執行以下命令(不添加換行符,僅為了可讀性,已插入了這些換行符):

stream create --name jdbc-to-mongodb

 --definition "jdbc

 --query='SELECT id, customer_name FROM public.customer WHERE imported=false'

 --fixed-delay=30

 --max-rows-per-poll=1000

 --update='UPDATE customer SET imported=true WHERE id in (:id)'

 --time-unit=SECONDS

 --password=postgres

 --driver-class-name=org.postgresql.Driver

 --username=postgres

 --url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink

 --spring.data.mongodb.uri=mongodb://localhost/main"

該流DSL定義了一個名為jdbc -to- mongodb的流。接下來,我們將按其名稱部署流

stream deploy --name jdbc-to-mongodb

最後,我們應該在日誌輸出中看到所有可用日誌的位置:

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink



 Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform



 Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

六,結論

在本文中,我們已經看到了使用Spring Cloud Data Flow的ETL數據管道的完整示例。

最值得注意的是,我們看到了應用程序啟動器的配置,使用Spring Cloud Data Flow Shell創建了ETL流管道,並實現了用於讀取,轉換和寫入數據的自定義應用程序。

與往常一樣,示例代碼可以在GitHub項目中找到