使用Spring Cloud App Starter

1.簡介

在本文中,我們將演示如何使用Spring Cloud App啟動程序-這些啟動程序提供了自舉和隨時可用的應用程序-可以用作將來開發的起點。

簡而言之,Task App Starters專用於數據庫遷移和分佈式測試等用例,Stream App Starters提供與外部系統的集成。

總體而言,有超過55個啟動器;在此處此處查看官方文檔,以獲取有關這兩者的更多信息。

接下來,我們將構建一個小型的分佈式Twitter應用程序,它將Twitter帖子流式傳輸到Hadoop分佈式文件系統中。

2.進行設置

我們將使用consumer-keyaccess-token創建一個簡單的Twitter應用程序。

然後,我們將設置Hadoop,以便我們可以保留Twitter流以用於將來的大數據用途。

最後,我們可以選擇使用提供的Spring GitHub存儲庫來編譯和組裝sources獨立組件–使用Maven的processors-sinks體系結構模式,或通過其Spring Stream綁定接口組合sourcesprocessorssinks

我們將探討實現此目的的兩種方法。

值得注意的是,以前,所有Stream App Starters都整理到github.com/spring-cloud/spring-cloud-stream-app-starters的一個大型倉庫中。每個啟動器均已簡化和隔離。

3. Twitter憑證

首先,讓我們設置我們的Twitter Developer憑據。要獲取Twitter開發人員憑據,請按照以下步驟設置應用程序,並從Twitter開發人員的官方文檔中創建訪問令牌。

具體來說,我們需要:

  1. 消費者密鑰
  2. 消費者關鍵秘密
  3. 訪問令牌機密
  4. 訪問令牌

請確保打開該窗口或將其記下,因為我們將在下面使用它們!

4.安裝Hadoop

接下來,讓我們安裝Hadoop!我們可以遵循官方文檔,也可以簡單地利用Docker:

$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1

5.編譯我們的應用入門

要使用獨立的完全獨立的組件,我們可以從其GitHub存儲庫中單獨下載並編譯所需的Spring Cloud Stream App Starters。

5.1。 Twitter Spring Cloud Stream應用程序入門

讓我們將Twitter Spring Cloud Stream App Starter( org.springframework.cloud.stream.app.twitterstream.source )添加到我們的項目中:

git clone https://github.com/spring-cloud-stream-app-starters/twitter.git

然後,我們運行Maven:

./mvnw clean install -PgenerateApps

生成的已編譯Starter App將在本地項目根目錄的“ / target”中可用。

然後,我們可以運行已編譯的.jar並傳遞相關的應用程序屬性,如下所示:

java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \

 --accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>

我們還可以使用熟悉的Spring application.properties:傳遞憑據application.properties:

twitter.credentials.access-token=...

 twitter.credentials.access-token-secret=...

 twitter.credentials.consumer-key=...

 twitter.credentials.consumer-secret=...

5.2。 HDFS Spring Cloud Stream應用程序入門

現在(已經設置了Hadoop),讓我們將HDFS Spring Cloud Stream App Starter( org.springframework.cloud.stream.app.hdfs.sink )依賴項添加到我們的項目中。

首先,克隆相關的倉庫:

git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git

然後,運行Maven作業:

./mvnw clean install -PgenerateApps

生成的已編譯Starter App將在本地項目根目錄的“ / target”中可用。然後,我們可以運行已編譯的.jar並傳遞相關的應用程序屬性:

java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/

hdfs://127.0.0.1:50010/ ”是Hadoop的默認設置,但是默認的HDFS端口可能會根據您配置實例的方式而有所不同。

鑑於我們先前傳入的配置,我們可以在“ http://0.0.0.0:50070 ”上看到數據節點(及其當前端口)的列表。

我們還可以在編譯之前使用熟悉的Spring application.properties傳遞我們的憑據-因此我們不必總是通過CLI傳遞這些憑據。

讓我們將application.properties配置為使用默認的Hadoop端口:

hdfs.fs-uri=hdfs://127.0.0.1:50010/

6.使用AggregateApplicationBuilder

另外,我們可以通過org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder將我們的Spring Stream SourceSink合併到一個簡單的Spring Boot應用程序中!

首先,我們將兩個Stream App Starters添加到我們的pom.xml

<dependencies>

 <dependency>

 <groupId>org.springframework.cloud.stream.app</groupId>

 <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>

 <version>2.1.2.RELEASE</version>

 </dependency>

 <dependency>

 <groupId>org.springframework.cloud.stream.app</groupId>

 <artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>

 <version>2.1.2.RELEASE</version>

 </dependency>

 </dependencies>

然後,通過將兩個Stream App Starter依賴項包裝到各自的子應用程序中,開始組合它們。

6.1。構建我們的應用程序組件

我們的SourceApp指定要轉換或使用的Source

@SpringBootApplication

 @EnableBinding(Source.class)

 @Import(TwitterstreamSourceConfiguration.class)

 public class SourceApp {

 @InboundChannelAdapter(Source.OUTPUT)

 public String timerMessageSource() {

 return new SimpleDateFormat().format(new Date());

 }

 }

請注意,我們將SourceApp綁定到org.springframework.cloud.stream.messaging.Source並註入適當的配置類以從環境屬性中選擇所需的設置。

接下來,我們設置一個簡單的org.springframework.cloud.stream.messaging.Processor綁定:

@SpringBootApplication

 @EnableBinding(Processor.class)

 public class ProcessorApp {

 @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)

 public String processMessage(String payload) {

 log.info("Payload received!");

 return payload;

 }

 }

然後,我們創建我們的消費者( Sink ):

@SpringBootApplication

 @EnableBinding(Sink.class)

 @Import(HdfsSinkConfiguration.class)

 public class SinkApp {

 @ServiceActivator(inputChannel= Sink.INPUT)

 public void loggerSink(Object payload) {

 log.info("Received: " + payload);

 }

 }

在這裡,我們將SinkApp綁定到org.springframework.cloud.stream.messaging.Sink然後再次注入正確的配置類以使用我們指定的Hadoop設置。

最後,我們在AggregateApp主要方法中使用AggregateApplicationBuilder組合SourceAppProcessorAppSinkApp

@SpringBootApplication

 public class AggregateApp {

 public static void main(String[] args) {

 new AggregateApplicationBuilder()

 .from(SourceApp.class).args("--fixedDelay=5000")

 .via(ProcessorApp.class)

 .to(SinkApp.class).args("--debug=true")

 .run(args);

 }

 }

與任何Spring Boot應用程序一樣,我們可以通過application.properties or編程方式將指定的設置作為環境屬性注入。

由於我們使用的是Spring Stream框架,因此我們也可以將參數傳遞給AggregateApplicationBuilder構造函數。

6.2。運行完整的應用程序

然後,我們可以使用以下命令行指令來編譯並運行我們的應用程序:

 $ mvn install

 $ java -jar twitterhdfs.jar

請記住,將每個@SpringBootApplication類保留在單獨的包中(否則,將引發多個不同的綁定異常)!有關如何使用AggregateApplicationBuilder更多信息,請參閱官方文檔

編譯並運行應用程序之後,我們應該在控制台中看到類似以下內容的內容(自然,內容會因Tweet而異):

2018-01-15 04:38:32.255 INFO 28778 --- [itterSource-1-1]

 cbtwitterhdfs.processor.ProcessorApp : Payload received!

 2018-01-15 04:38:32.255 INFO 28778 --- [itterSource-1-1]

 com.baeldung.twitterhdfs.sink.SinkApp : Received: {"created_at":

 "Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":

 "952761898239385601","text":"RT @mighty_jimin: 180114 ...

這些證明了我們的正確操作ProcessorSink從接收數據的Source !在此示例中,我們沒有將HDFS接收器配置為執行太多操作-它只會打印消息“已收到有效負載!”

7.結論

在本教程中,我們學習瞭如何將兩個很棒的Spring Stream App Starters組合成一個可愛的Spring Boot示例!

這是有關Spring Boot Starters的其他一些很棒的官方文章,以及如何創建自定義的Starter!

與往常一樣,本文中使用的代碼可以在GitHub上找到