AutoMQ 簡介:經濟高效的 Kafka 替代方案
1. 概述
Apache Kafka 已成為最受歡迎和最廣泛使用的訊息傳遞和事件流平台之一。然而,設定和管理 Kafka 叢集是一個複雜的過程,通常由大型組織中的專門團隊完成,以確保高可用性、可靠性、負載平衡和擴展。
AutoMQ是 Apache Kafka 的雲端原生替代品,專注於降低成本並提高效率。它採用共享儲存架構,將資料儲存在Amazon Simple Storage Service (S3)中,並透過Amazon Elastic Block Store (EBS)保證持久性。
在本教程中,我們將探討如何將 AutoMQ 整合到 Spring Boot 應用程式中。我們將逐步設定本地 AutoMQ 集群,並實現基本的生產者-消費者模式。
2. 使用測試容器設定 AutoMQ
為了促進本地開發和測試,我們將使用 Testcontainers 來設定 AutoMQ 叢集。透過 Testcontainers 運行 AutoMQ 叢集的先決條件是活動的 Docker 實例和 Docker Compose。
AutoMQ 提供用於本地部署的 Docker Compose 文件,該文件使用LocalStack來模擬 Amazon S3 服務,並使用本機檔案系統來模擬 Amazon EBS 。我們將在我們的設定中使用此 Compose 檔案。
請務必注意,以下設定不適用於生產環境。
2.1.依賴關係
讓我們先將必要的依賴項新增到專案的pom.xml
檔案中:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.0</version>
</dependency>
AutoMQ 與 Apache Kafka 完全相容,這意味著它實作相同的 API,並使用相同的協定和配置屬性。這允許我們使用熟悉的spring-kafka
依賴項將 AutoMQ 整合到我們的應用程式中。
接下來,我們將添加一些測試依賴項:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
spring-boot-testcontainers
相依性為我們提供了必要的類別來啟動 AutoMQ 叢集所需的臨時 Docker 實例。
此外,我們還添加了等待庫,它將幫助我們在本教程後面測試非同步生產者-消費者實作。
2.2.定義測試容器 Bean
接下來,讓我們建立一個@TestConfiguration
類別來定義我們的 Testcontainers beans:
`@TestConfiguration(proxyBeanMethods = false)
class TestcontainersConfiguration {
private static final String COMPOSE_URL = "https://download.automq.com/community_edition/standalone_deployment/docker-compose.yaml";
@Bean
public ComposeContainer composeContainer() {
File dockerCompose = downloadComposeFile();
return new ComposeContainer(dockerCompose)
.withLocalCompose(true);
}
private File downloadComposeFile() {
File dockerCompose = Files.createTempFile("docker-compose", ".yaml").toFile();
FileUtils.copyURLToFile(URI.create(COMPOSE_URL).toURL(), dockerCompose);
return dockerCompose;
}
}`
這裡,我們使用 Testcontainers 的Docker Compose 模組。首先,我們下載 AutoMQ Docker Compose 檔案並從其內容建立一個ComposeContainer
bean 。
我們使用withLocalCompose()
方法並將其設為true
,指示 Testcontainers 使用安裝在我們的開發或 CI 機器上的 Docker Compose 二進位。
但是, Testcontainers目前不支援Docker Compose 的container_name
屬性。讓我們圍繞它實現一個臨時 hack:
private File downloadComposeFile() {
// ... same as above
return removeContainerNames(dockerCompose);
}
private File removeContainerNames(File composeFile) {
List<String> filteredLines = Files.readAllLines(composeFile.toPath())
.stream()
.filter(line -> !line.contains("container_name:"))
.toList();
Files.write(composeFile.toPath(), filteredLines);
return composeFile;
}
私有的removeContainerNames()
方法從下載的Docker Compose檔案中刪除container_name
屬性。此解決方法可確保我們用於實例化ComposeContainer
bean 的 Docker Compose 不包含container_name
屬性。
最後,為了允許我們的應用程式連接到 AutoMQ 集群,我們將配置bootstrap-servers
屬性:
@Bean
public DynamicPropertyRegistrar dynamicPropertyRegistrar() {
return registry -> {
registry.add("spring.kafka.bootstrap-servers", () -> "localhost:9094,localhost:9095");
};
}
我們在定義DynamicPropertyRegistrar
bean 時配置預設的 AutoMQ 引導伺服器localhost:9094,localhost:9095
。
配置正確的連接詳細資訊後,Spring Boot 會自動建立KafkaTemplate
的 bean,我們將在本教學後面使用它。
2.3.在開發過程中使用測試容器
雖然 Testcontainers 主要用於整合測試,但我們也可以在本地開發期間使用它。
為了實現這一點,我們將在src/test/java
目錄中建立一個單獨的主類別:
public class TestApplication {
public static void main(String[] args) {
SpringApplication.from(Application::main)
.with(TestcontainersConfiguration.class)
.run(args);
}
}
我們建立一個TestApplication
類,並在其main()
方法中,使用TestcontainersConfiguration
類別啟動我們的主Application
類別。
此設定可幫助我們在本地設定和管理外部服務。我們可以執行 Spring Boot 應用程式並將其連接到透過 Testcontainers 啟動的外部服務。
3. 實現生產者-消費者模式
現在我們已經設定了本地 AutoMQ 集群,讓我們使用它來實現一個基本的生產者-消費者模式。
3.1.配置 AutoMQ 消費者
首先,我們在application.yml
檔案中定義消費者監聽的主題名稱:
com:
baeldung:
topic:
onboarding-initiated: user-service.onboarding.initiated.v1
接下來,讓我們建立一個類別來使用已配置主題中的消息:
`@Configuration
class UserOnboardingInitiatedListener {
private static final Logger log = LoggerFactory.getLogger(UserOnboardingInitiatedListener.class);
@KafkaListener(topics = "${com.baeldung.topic.onboarding-initiated}", groupId = "user-service")
public void listen(User user) {
log.info("Dispatching user account confirmation email to {}", user.email());
}
}
record User(String email) {
}`
這裡,我們在listen()
方法上使用@KafkaListener
註解來指定主題和消費者群組。每當訊息發佈到user-service.onboarding.initiated.v1
主題時,就會呼叫此方法。
我們定義一個User
記錄來表示我們的訊息負載。
最後,我們將以下配置新增至application.yml
檔案:
`spring:
kafka:
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring.json.value.default.type: com.baeldung.automq.User
allow.auto.create.topics: true`
我們為消費者和生產者配置鍵和值序列化和反序列化屬性。此外,我們將User
記錄指定為預設訊息負載類型。
最後,我們啟用主題的自動創建,因此 AutoMQ 會在主題不存在時自動建立主題。
3.2.測試訊息消耗
現在我們已經配置了消費者,讓我們驗證它是否消費並記錄發佈到配置主題的訊息:
`@SpringBootTest
@ExtendWith(OutputCaptureExtension.class)
@Import(TestcontainersConfiguration.class)
class UserOnboardingInitiatedListenerLiveTest {
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
@Value("${com.baeldung.topic.onboarding-initiated}")
private String onboardingInitiatedTopic;
@Test
void whenMessagePublishedToTopic_thenProcessedByListener(CapturedOutput capturedOutput) {
User user = new User("[email protected]");
kafkaTemplate.send(onboardingInitiatedTopic, user);
String expectedConsumerLog = String.format("Dispatching user account confirmation email to %s", user.email());
Awaitility
.await()
.atMost(1, TimeUnit.SECONDS)
.until(() -> capturedOutput.getAll().contains(expectedConsumerLog));
}
}`
在這裡,我們自動組裝KafkaTemplate
類別的實例,並使用@Value
注入儲存在application.yaml
檔案中的設定主題名稱。
我們首先建立一個User
物件並使用KafkaTemplate
將其傳送到配置的主題。然後,使用等待性和OutputCaptureExtension
提供的CapturedOutput
實例,我們斷言消費者已記錄預期的日誌訊息。
我們的測試案例可能會間歇性失敗,因為消費者需要一些時間來啟動並訂閱該主題。為了解決這個問題,讓我們在測試案例執行之前等待我們的消費者被分配分區:
@BeforeAll
void setUp(CapturedOutput capturedOutput) {
String expectedLog = "partitions assigned";
Awaitility
.await()
.atMost(Durations.ONE_MINUTE)
.pollDelay(Durations.ONE_SECOND)
.until(() -> capturedOutput.getAll().contains(expectedLog));
}
在使用@BeforeAll
註解的setUp()
方法中,我們等待最多一分鐘,每秒輪詢一次,直到CapturedOutput
實例包含確認分區分配的日誌。
我們的測試類別還演示了等待庫測試非同步操作的強大功能。
4. 結論
在本文中,我們探索了將 AutoMQ 整合到 Spring Boot 應用程式中。
使用Testcontainers的Docker Compose模組,我們啟動了AutoMQ集群,創建了本地測試環境。
然後,我們實現了一個基本的生產者-消費者架構並成功地進行了測試。
與往常一樣,本文中使用的所有程式碼範例都可以在 GitHub 上找到。