使用Spring Cloud App Starter
1.簡介
在本文中,我們將演示如何使用Spring Cloud App啟動程序-這些啟動程序提供了自舉和隨時可用的應用程序-可以用作將來開發的起點。
簡而言之,Task App Starters專用於數據庫遷移和分佈式測試等用例,Stream App Starters提供與外部系統的集成。
總體而言,有超過55個啟動器;在此處和此處查看官方文檔,以獲取有關這兩者的更多信息。
接下來,我們將構建一個小型的分佈式Twitter應用程序,它將Twitter帖子流式傳輸到Hadoop分佈式文件系統中。
2.進行設置
我們將使用consumer-key
和access-token
創建一個簡單的Twitter應用程序。
然後,我們將設置Hadoop,以便我們可以保留Twitter流以用於將來的大數據用途。
最後,我們可以選擇使用提供的Spring GitHub存儲庫來編譯和組裝sources
獨立組件–使用Maven的processors-sinks
體系結構模式,或通過其Spring Stream綁定接口組合sources
, processors
和sinks
。
我們將探討實現此目的的兩種方法。
值得注意的是,以前,所有Stream App Starters都整理到github.com/spring-cloud/spring-cloud-stream-app-starters的一個大型倉庫中。每個啟動器均已簡化和隔離。
3. Twitter憑證
首先,讓我們設置我們的Twitter Developer憑據。要獲取Twitter開發人員憑據,請按照以下步驟設置應用程序,並從Twitter開發人員的官方文檔中創建訪問令牌。
具體來說,我們需要:
- 消費者密鑰
- 消費者關鍵秘密
- 訪問令牌機密
- 訪問令牌
請確保打開該窗口或將其記下,因為我們將在下面使用它們!
4.安裝Hadoop
接下來,讓我們安裝Hadoop!我們可以遵循官方文檔,也可以簡單地利用Docker:
$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1
5.編譯我們的應用入門
要使用獨立的完全獨立的組件,我們可以從其GitHub存儲庫中單獨下載並編譯所需的Spring Cloud Stream App Starters。
5.1。 Twitter Spring Cloud Stream應用程序入門
讓我們將Twitter Spring Cloud Stream App Starter( org.springframework.cloud.stream.app.twitterstream.source
)添加到我們的項目中:
git clone https://github.com/spring-cloud-stream-app-starters/twitter.git
然後,我們運行Maven:
./mvnw clean install -PgenerateApps
生成的已編譯Starter App將在本地項目根目錄的“ / target”中可用。
然後,我們可以運行已編譯的.jar並傳遞相關的應用程序屬性,如下所示:
java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \
--accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>
我們還可以使用熟悉的Spring application.properties:
傳遞憑據application.properties:
twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...
5.2。 HDFS Spring Cloud Stream應用程序入門
現在(已經設置了Hadoop),讓我們將HDFS Spring Cloud Stream App Starter( org.springframework.cloud.stream.app.hdfs.sink
)依賴項添加到我們的項目中。
首先,克隆相關的倉庫:
git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git
然後,運行Maven作業:
./mvnw clean install -PgenerateApps
生成的已編譯Starter App將在本地項目根目錄的“ / target”中可用。然後,我們可以運行已編譯的.jar並傳遞相關的應用程序屬性:
java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/
“ hdfs://127.0.0.1:50010/
”是Hadoop的默認設置,但是默認的HDFS端口可能會根據您配置實例的方式而有所不同。
鑑於我們先前傳入的配置,我們可以在“ http://0.0.0.0:50070
”上看到數據節點(及其當前端口)的列表。
我們還可以在編譯之前使用熟悉的Spring application.properties
傳遞我們的憑據-因此我們不必總是通過CLI傳遞這些憑據。
讓我們將application.properties
配置為使用默認的Hadoop端口:
hdfs.fs-uri=hdfs://127.0.0.1:50010/
6.使用AggregateApplicationBuilder
另外,我們可以通過org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder
將我們的Spring Stream Source
和Sink
合併到一個簡單的Spring Boot應用程序中!
首先,我們將兩個Stream App Starters添加到我們的pom.xml
:
<dependencies>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
</dependencies>
然後,通過將兩個Stream App Starter依賴項包裝到各自的子應用程序中,開始組合它們。
6.1。構建我們的應用程序組件
我們的SourceApp
指定要轉換或使用的Source
:
@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
@InboundChannelAdapter(Source.OUTPUT)
public String timerMessageSource() {
return new SimpleDateFormat().format(new Date());
}
}
請注意,我們將SourceApp
綁定到org.springframework.cloud.stream.messaging.Source
並註入適當的配置類以從環境屬性中選擇所需的設置。
接下來,我們設置一個簡單的org.springframework.cloud.stream.messaging.Processor
綁定:
@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public String processMessage(String payload) {
log.info("Payload received!");
return payload;
}
}
然後,我們創建我們的消費者( Sink
):
@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
@ServiceActivator(inputChannel= Sink.INPUT)
public void loggerSink(Object payload) {
log.info("Received: " + payload);
}
}
在這裡,我們將SinkApp
綁定到org.springframework.cloud.stream.messaging.Sink
然後再次注入正確的配置類以使用我們指定的Hadoop設置。
最後,我們在AggregateApp
主要方法中使用AggregateApplicationBuilder
組合SourceApp
, ProcessorApp
和SinkApp
:
@SpringBootApplication
public class AggregateApp {
public static void main(String[] args) {
new AggregateApplicationBuilder()
.from(SourceApp.class).args("--fixedDelay=5000")
.via(ProcessorApp.class)
.to(SinkApp.class).args("--debug=true")
.run(args);
}
}
與任何Spring Boot應用程序一樣,我們可以通過application.properties or
編程方式將指定的設置作為環境屬性注入。
由於我們使用的是Spring Stream框架,因此我們也可以將參數傳遞給AggregateApplicationBuilder
構造函數。
6.2。運行完整的應用程序
然後,我們可以使用以下命令行指令來編譯並運行我們的應用程序:
$ mvn install
$ java -jar twitterhdfs.jar
請記住,將每個@SpringBootApplication
類保留在單獨的包中(否則,將引發多個不同的綁定異常)!有關如何使用AggregateApplicationBuilder
更多信息,請參閱官方文檔。
編譯並運行應用程序之後,我們應該在控制台中看到類似以下內容的內容(自然,內容會因Tweet而異):
2018-01-15 04:38:32.255 INFO 28778 --- [itterSource-1-1]
cbtwitterhdfs.processor.ProcessorApp : Payload received!
2018-01-15 04:38:32.255 INFO 28778 --- [itterSource-1-1]
com.baeldung.twitterhdfs.sink.SinkApp : Received: {"created_at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":
"952761898239385601","text":"RT @mighty_jimin: 180114 ...
這些證明了我們的正確操作Processor
和Sink
從接收數據的Source
!在此示例中,我們沒有將HDFS接收器配置為執行太多操作-它只會打印消息“已收到有效負載!”
7.結論
在本教程中,我們學習瞭如何將兩個很棒的Spring Stream App Starters組合成一個可愛的Spring Boot示例!
這是有關Spring Boot Starters的其他一些很棒的官方文章,以及如何創建自定義的Starter!
與往常一樣,本文中使用的代碼可以在GitHub上找到。