Spring Reactor簡介
1.概述
在這篇快速的文章中,我們將通過為反應性,事件驅動的應用程序設置真實場景來介紹反應堆總線。
2. Project Reactor的基礎
2.1 為什麼要使用Reactor?
現代應用程序需要處理大量並發請求並處理大量數據。標準的阻塞代碼已不足以滿足這些要求。
反應性設計模式是一種基於事件的體系結構方法,用於異步處理來自單個或多個服務處理程序的大量並發服務請求。
Project Reactor基於此模式,並且有一個明確而雄心勃勃的目標,那就是在JVM上構建非阻塞,反應式應用程序.
2.2 示例方案
在開始之前,這裡有一些有趣的場景,在這些場景中,可以利用反應式架構風格,只是為了了解我們可以在哪裡應用它:
- 適用於大型在線購物平台(如亞馬遜)的通知服務
- 為銀行業提供大量交易處理服務
- 股票價格同時變化的股票交易業務
3. Maven依賴
讓我們通過在pom.xml:
添加以下依賴關係來開始使用Project Reactor Bus pom.xml:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>
<version>2.0.8.RELEASE</version>
</dependency>
我們可以在Maven Central中查看最新版本的reactor-bus
。
4.構建演示應用程序
為了更好地理解基於反應器的方法的好處,讓我們看一個實際的例子。
我們將構建一個簡單的應用程序,負責向在線購物平台的用戶發送通知。例如,如果用戶下了新訂單,則該應用會通過電子郵件或SMS發送訂單確認。
典型的同步實現自然會受到電子郵件或SMS服務的吞吐量的限制。因此,交通高峰,例如假期通常是有問題的。
通過反應性方法,我們可以將系統設計為更靈活,並更好地適應外部系統(例如網關服務器)中可能發生的故障或超時。
讓我們看一下該應用程序-從更傳統的方面開始,再轉到更具reactive的結構。
4.1。簡單的POJO
首先,我們創建一個POJO類來表示通知數據:
eval(ez_write_tag([[468,60],'baeldung_com-medrectangle-3','ezslot_0',110,'0','0']));
public class NotificationData {
private long id;
private String name;
private String email;
private String mobile;
// getter and setter methods
}
4.2。服務層
現在讓我們定義一個簡單的服務層:
public interface NotificationService {
void initiateNotification(NotificationData notificationData)
throws InterruptedException;
}
和實現,模擬長時間運行的操作:
@Service
public class NotificationServiceimpl implements NotificationService {
@Override
public void initiateNotification(NotificationData notificationData)
throws InterruptedException {
System.out.println("Notification service started for "
+ "Notification ID: " + notificationData.getId());
Thread.sleep(5000);
System.out.println("Notification service ended for "
+ "Notification ID: " + notificationData.getId());
}
}
請注意,說明通過短信或電子郵件網關發送消息的真實生活場景,我們特意推出了五秒鐘的延遲initiateNotification
與方法Thread.sleep(5000).
因此,當線程訪問服務時,它將被阻塞五秒鐘。
4.3。消費者
現在,讓我們進入應用程序的更多響應方面並實現一個使用者-然後將其映射到反應堆事件總線:
@Service
public class NotificationConsumer implements
Consumer<Event<NotificationData>> {
@Autowired
private NotificationService notificationService;
@Override
public void accept(Event<NotificationData> notificationDataEvent) {
NotificationData notificationData = notificationDataEvent.getData();
try {
notificationService.initiateNotification(notificationData);
} catch (InterruptedException e) {
// ignore
}
}
}
如我們所見,我們創建的Consumer<T>
實現了Consumer<T>
接口。主要邏輯駐留在accept
方法中。
這是我們在典型的Spring偵聽器實現中可以達到的類似方法。
4.4。控制器
最後,既然我們可以使用事件了,那麼我們也可以生成它們。
我們將在一個簡單的控制器中執行此操作:
@Controller
public class NotificationController {
@Autowired
private EventBus eventBus;
@GetMapping("/startNotification/{param}")
public void startNotification(@PathVariable Integer param) {
for (int i = 0; i < param; i++) {
NotificationData data = new NotificationData();
data.setId(i);
eventBus.notify("notificationConsumer", Event.wrap(data));
System.out.println(
"Notification " + i + ": notification task submitted successfully");
}
}
}
這是不言自明的-我們在這里通過EventBus
發出事件。
例如,如果客戶端點擊參數值為10的URL,則將通過事件總線發送十個事件。
4.5。 Java配置
現在讓我們將所有內容放在一起,並創建一個簡單的Spring Boot應用程序。
首先,我們需要配置EventBus
和Environment
Bean:
@Configuration
public class Config {
@Bean
public Environment env() {
return Environment.initializeIfEmpty().assignErrorJournal();
}
@Bean
public EventBus createEventBus(Environment env) {
return EventBus.create(env, Environment.THREAD_POOL);
}
}
在我們的例子中,我們使用環境中可用的默認線程池實例化EventBus
。
另外,我們可以使用自定義的Dispatcher
實例:
EventBus evBus = EventBus.create(
env,
Environment.newDispatcher(
REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT,
DispatcherType.THREAD_POOL_EXECUTOR));
現在,我們準備創建一個主要的應用程序代碼:
import static reactor.bus.selector.Selectors.$;
@SpringBootApplication
public class NotificationApplication implements CommandLineRunner {
@Autowired
private EventBus eventBus;
@Autowired
private NotificationConsumer notificationConsumer;
@Override
public void run(String... args) throws Exception {
eventBus.on($("notificationConsumer"), notificationConsumer);
}
public static void main(String[] args) {
SpringApplication.run(NotificationApplication.class, args);
}
}
在我們的run
方法,我們註冊notificationConsumer
在通知指定選擇匹配被觸發。
注意我們如何使用$
屬性的靜態導入來創建Selector
對象。
5.測試應用程序
現在讓我們創建一個測試來查看我們的NotificationApplication
情況:
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class NotificationApplicationIntegrationTest {
@LocalServerPort
private int port;
@Test
public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
RestTemplate restTemplate = new RestTemplate();
restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
}
}
如我們所見,一旦執行了請求,所有十個任務都會立即提交,而不會產生任何阻塞。提交後,通知事件將並行處理。
Notification 0: notification task submitted successfully
Notification 1: notification task submitted successfully
Notification 2: notification task submitted successfully
Notification 3: notification task submitted successfully
Notification 4: notification task submitted successfully
Notification 5: notification task submitted successfully
Notification 6: notification task submitted successfully
Notification 7: notification task submitted successfully
Notification 8: notification task submitted successfully
Notification 9: notification task submitted successfully
Notification service started for Notification ID: 1
Notification service started for Notification ID: 2
Notification service started for Notification ID: 3
Notification service started for Notification ID: 0
Notification service ended for Notification ID: 1
Notification service ended for Notification ID: 0
Notification service started for Notification ID: 4
Notification service ended for Notification ID: 3
Notification service ended for Notification ID: 2
Notification service started for Notification ID: 6
Notification service started for Notification ID: 5
Notification service started for Notification ID: 7
Notification service ended for Notification ID: 4
Notification service started for Notification ID: 8
Notification service ended for Notification ID: 6
Notification service ended for Notification ID: 5
Notification service started for Notification ID: 9
Notification service ended for Notification ID: 7
Notification service ended for Notification ID: 8
Notification service ended for Notification ID: 9
重要的是要記住,在我們的場景中,無需按任何特定順序處理這些事件。
六,結論
在本快速教程中,我們創建了一個簡單的事件驅動應用程序。我們還看到瞭如何開始編寫更具反應性和非阻塞性的代碼。
但是,這種情況只是刮擦對象的表面,只是開始嘗試反應式範式的良好基礎。
與往常一樣,源代碼可以在GitHub上找到。