使用Spring 5的反應式WebSocket
1.概述
在本文中,我們將使用新的Spring 5 WebSockets API和Spring WebFlux提供的響應功能來創建一個快速示例。
WebSocket是一種眾所周知的協議,支持客戶端和服務器之間的全雙工通信,通常用於Web應用程序中,其中客戶端和服務器需要以高頻率和低延遲交換事件。
Spring Framework 5在框架中具有現代化的WebSockets支持,從而為該通信通道添加了響應功能。
我們可以在Spring WebFlux上找到更多信息。
2. Maven依賴
我們將使用spring-boot-starters依賴項進行spring-boot-integration和spring-boot-starter-webflux的使用,當前版本可在Spring Milestone Repository中獲得。
在此示例中,我們使用的是最新可用版本2.0.0.M7,但應該始終在Maven存儲庫中獲得可用的最新版本:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
3. Spring中的WebSocket配置
我們的配置非常簡單:我們將注入WebSocketHandler
來處理Spring WebSocket應用程序中的套接字會話。
@Autowired
private WebSocketHandler webSocketHandler;
此外,讓我們創建一個HandlerMapping
bean註釋的方法,該方法將負責請求和處理程序對象之間的映射:
@Bean
public HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/event-emitter", webSocketHandler);
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(1);
handlerMapping.setUrlMap(map);
return handlerMapping;
}
我們可以連接的URL為: ws://localhost:<port>/event-emitter.
4. Spring中的WebSocket消息處理
我們的ReactiveWebSocketHandler
類將負責管理服務器端的WebSocket會話。
它實現了WebSocketHandler
接口,因此我們可以覆蓋handle
方法,該方法將用於將消息發送到WebSocket客戶端:
@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
// private fields ...
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
return webSocketSession.send(intervalFlux
.map(webSocketSession::textMessage))
.and(webSocketSession.receive()
.map(WebSocketMessage::getPayloadAsText)
.log());
}
}
5.創建一個簡單的反應式WebSocket客戶端
現在,讓我們創建一個Spring Reactive WebSocket客戶端,它將能夠與我們的WebSocket服務器連接並交換信息。
5.1。 Maven依賴
首先,Maven依賴項。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
在這裡,我們使用與之前用於設置反應式WebSocket服務器應用程序相同的spring-boot-starter-webflux。
5.2。 WebSocket客戶端
現在,讓我們創建ReactiveClientWebSocket
類,該類負責開始與服務器的通信:
public class ReactiveJavaClientWebSocket {
public static void main(String[] args) throws InterruptedException {
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(
URI.create("ws://localhost:8080/event-emitter"),
session -> session.send(
Mono.just(session.textMessage("event-spring-reactive-client-websocket")))
.thenMany(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.log())
.then())
.block(Duration.ofSeconds(10L));
}
}
在上面的代碼中,我們可以看到我們正在使用ReactorNettyWebSocketClient
,這是用於Reactor Netty的WebSocketClient
實現。
此外,客戶端通過URL ws://localhost:8080/event-emitter,
連接到WebSocket服務器,並在連接到服務器後立即建立會話。
我們還可以看到,我們正在將消息以及連接請求發送到服務器(“ event-spring-reactive-client-websocket
”)。
此外,調用send
方法,將類型為Publisher<T>,
的變量作為參數Publisher<T>,
在我們的示例中,我們的Publisher<T>
是Mono<T>
, T
是簡單的String“ event-me-from-reactive-java-client-websocket
”。
而且,調用了期望類型為String
的Flux
的thenMany(…)
方法。 receive()
方法獲取傳入消息的流量,這些消息隨後將轉換為字符串。
最後, block()
方法會在給定時間(在我們的示例中為10秒)後強制客戶端與服務器斷開連接。
5.3。啟動客戶端
要運行它,請確保Reactive WebSocket服務器已啟動並正在運行。然後,啟動ReactiveJavaClientWebSocket
類,我們可以在sysout
日誌上看到正在發出的事件:
[reactor-http-nio-4] INFO reactor.Flux.Map.1 -
onNext({"eventId":"6042b94f-fd02-47a1-911d-dacf97f12ba6",
"eventDt":"2018-01-11T23:29:26.900"})
我們還可以從Reactive WebSocket服務器的日誌中看到客戶端在嘗試連接期間發送的消息:
[reactor-http-nio-2] reactor.Flux.Map.1:
onNext(event-me-from-reactive-java-client)
此外,我們可以在客戶端完成請求後(在我們的情況下,是10秒鐘後)看到終止連接的消息:
[reactor-http-nio-2] reactor.Flux.Map.1: onComplete()
6.創建瀏覽器WebSocket客戶端
讓我們創建一個簡單的HTML / Javascript客戶端WebSocket來使用我們的響應式WebSocket服務器應用程序。
<div class="events"></div>
<script>
var clientWebSocket = new WebSocket("ws://localhost:8080/event-emitter");
clientWebSocket.onopen = function() {
console.log("clientWebSocket.onopen", clientWebSocket);
console.log("clientWebSocket.readyState", "websocketstatus");
clientWebSocket.send("event-me-from-browser");
}
clientWebSocket.onclose = function(error) {
console.log("clientWebSocket.onclose", clientWebSocket, error);
events("Closing connection");
}
clientWebSocket.onerror = function(error) {
console.log("clientWebSocket.onerror", clientWebSocket, error);
events("An error occured");
}
clientWebSocket.onmessage = function(error) {
console.log("clientWebSocket.onmessage", clientWebSocket, error);
events(error.data);
}
function events(responseEvent) {
document.querySelector(".events").innerHTML += responseEvent + "<br>";
}
</script>
在運行WebSocket服務器的情況下,在瀏覽器(例如Chrome,Internet Explorer,Mozilla Firefox等)中打開此HTML文件,我們應該看到事件被打印在屏幕上,每個事件的延遲為1秒,如我們的WebSocket服務器。
{"eventId":"c25975de-6775-4b0b-b974-b396847878e6","eventDt":"2018-01-11T23:56:09.780"}
{"eventId":"ac74170b-1f71-49d3-8737-b3f9a8a352f9","eventDt":"2018-01-11T23:56:09.781"}
{"eventId":"40d8f305-f252-4c14-86d7-ed134d3e10c6","eventDt":"2018-01-11T23:56:09.782"}
7.結論
在這裡,我們提供了一個示例,說明如何通過使用Spring 5 Framework實現服務器和客戶端之間的WebSocket通信,並實現Spring Webflux提供的新的響應功能。
與往常一樣,可以在我們的GitHub存儲庫中找到完整的示例。