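Spring Kafka and Spring Boot Configuration

In this tutorial, we demonstrate how to configure Spring Kafka with Spring Boot. Spring Boot configures Spring Kafka with sensible defaults, and we override some of those defaults using an application.yml properties file.

Project Setup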

  • Spring Kafka: 2.1.4.RELEASE
  • Spring Boot: 2.0.0.RELEASE
  • Apache Kafka: kafka_2.11-1.0.0
  • Maven: 3.5

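An earlier tutorial showed how to create a Kafka consumer and producer by configuring them manually. In this example, we let Spring Boot configure them with sensible defaults.

Download and Install Apache Kafka

To download and install Apache Kafka, please read the official documentation ( https://kafka.apache.org/quickstart ). This tutorial assumes the server is started with the default configuration and that the server ports have not been changed.

Maven Dependencies

This project uses Apache Maven to manage its dependencies. Make sure the following dependencies are on the classpath. The contents of the pom.xml file are shown below -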

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.yiibai.spring.kafka</groupId>
    <artifactId>springboot-config</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>https://www.yiibai.com</url>
    <description>Spring Kafka Spring Boot</description>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <defaultGoal>compile</defaultGoal>
    </build>

</project>

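The overall directory structure of the project is shown below -
[Image: project directory structure]

Sending Spring Kafka Messages with Spring Boot

Spring Boot automatically configures and initializes a KafkaTemplate based on the properties set in the application.yml properties file. Annotating the Sender class with @Service makes it eligible for component scanning, so the Spring container detects and registers it automatically.

The code for Sender.java is shown below -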

package com.yiibai.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message){
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}
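
The @Value("${app.topic.foo}") annotation above reads the topic name from a property named app.topic.foo, which is the flattened form of the nested app, topic, foo keys in application.yml. The following is a minimal, stdlib-only sketch of that lookup idea, not Spring's actual implementation: the PlaceholderSketch class and its resolve method are hypothetical names introduced purely for illustration -

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only. Spring's real placeholder support is richer
// (default values, nested placeholders, several property sources).
class PlaceholderSketch {

    // Matches ${some.key} and captures "some.key".
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces every ${key} in the template with the value stored under that key.
    static String resolve(String template, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = properties.get(key);
            if (value == null) {
                throw new IllegalArgumentException("Could not resolve placeholder '" + key + "'");
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        // The nested YAML keys app -> topic -> foo flatten to "app.topic.foo".
        Map<String, String> properties = new HashMap<>();
        properties.put("app.topic.foo", "foo.t");

        System.out.println(resolve("${app.topic.foo}", properties)); // prints foo.t
    }
}
```

In this sketch a missing key raises an exception, so a typo in the property name surfaces immediately instead of producing a message sent to the wrong topic.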

Receiving Kafka Messages with Spring Boot

The ConcurrentKafkaListenerContainerFactory and KafkaMessageListenerContainer beans are also auto-configured by Spring Boot. You can optionally configure these beans using the application.yml properties file.

When a method is annotated with @KafkaListener, Spring Kafka automatically creates a message listener container for it.

The code for Receiver.java is shown below -

package com.yiibai.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @KafkaListener(topics = "${app.topic.foo}")
    public void receive(@Payload String message,
                        @Headers MessageHeaders headers) {
        LOG.info("received message='{}'", message);
        headers.keySet().forEach(key -> LOG.info("{}: {}", key, headers.get(key)));
    }

}
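
MessageHeaders implements java.util.Map<String, Object>, so the header dump in receive can also be written over entrySet(), which avoids a second lookup per key. The sketch below shows that variant using a plain LinkedHashMap as a stand-in for the real headers object; the HeaderDumpSketch class and formatHeaders method are hypothetical names, and the sample header values are taken from the output shown at the end of this tutorial -

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: a plain Map plays the role of MessageHeaders here.
class HeaderDumpSketch {

    // Builds one "name: value" line per header, in the map's iteration order.
    static List<String> formatHeaders(Map<String, Object> headers) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Object> header : headers.entrySet()) {
            lines.add(header.getKey() + ": " + header.getValue());
        }
        return lines;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("kafka_receivedTopic", "foo.t");
        headers.put("kafka_receivedPartitionId", 0);
        headers.put("kafka_offset", 0L);

        formatHeaders(headers).forEach(System.out::println);
    }
}
```

Inside the real listener the same loop would call LOG.info("{}: {}", header.getKey(), header.getValue()) instead of collecting strings.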

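Configuring the Application with application.yml

Spring Boot attempts to auto-configure the application based on the dependencies specified in the pom.xml file, and sets sensible defaults. No Consumer, Producer, or KafkaTemplate beans have been configured here; Spring Boot auto-configures them using its defaults. These values can be overridden using the application.yml properties file. More information is available in the Spring Boot documentation on Kafka properties.

We also create an application.yml properties file in the src/main/resources folder. Spring Boot injects these properties into its configuration classes.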

spring:
  kafka:
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

app:
  topic:
    foo: foo.t

logging:
  level:
    root: WARN
    org.springframework.web: INFO
    com.yiibai: DEBUG
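
For orientation, the spring.kafka.* entries above correspond to native Kafka client settings. The sketch below lists that correspondence using plain java.util.Properties. The KafkaClientPropsSketch class name is hypothetical, and the localhost:9092 bootstrap address is an assumption based on the default broker setup this tutorial relies on (the YAML above does not set it). Spring Boot builds the actual client configuration itself, so this only shows which keys are involved -

```java
import java.util.Properties;

// Hypothetical illustration of the native Kafka client keys behind the YAML above.
class KafkaClientPropsSketch {

    // Assumption: the broker runs with its default address, as this tutorial expects.
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    // Counterpart of the spring.kafka.consumer.* block.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.setProperty("group.id", "foo");                  // group-id
        props.setProperty("auto.offset.reset", "earliest");    // auto-offset-reset
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Counterpart of the spring.kafka.producer.* block.
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("consumer:");
        consumerProps().forEach((key, value) -> System.out.println("  " + key + " = " + value));
        System.out.println("producer:");
        producerProps().forEach((key, value) -> System.out.println("  " + key + " = " + value));
    }
}
```

Setting auto.offset.reset to earliest means a consumer group with no committed offset starts reading from the beginning of the topic, which is why the receiver in this example still sees a message that was sent before its partition assignment completed.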

Running the Application

Before running the project, ZooKeeper and Kafka need to be running, as shown below -

Start the ZooKeeper service -

D:\software\kafka_2.12-1.0.1\bin\windows> zookeeper-server-start.bat D:\software\kafka_2.12-1.0.1\config\zookeeper.properties

Start the Kafka service -

D:\software\kafka_2.12-1.0.1\bin\windows> kafka-server-start.bat D:\software\kafka_2.12-1.0.1\config\server.properties
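
Once both services are started, it can help to confirm that something is listening on the broker port before launching the application. The following is a minimal sketch using only the JDK; the BrokerPortCheck class and isReachable method are hypothetical names, and localhost:9092 is assumed because the broker uses its default configuration. A successful TCP connection only shows that the port is open, not that the broker is healthy -

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP connection to host:port can be opened.
class BrokerPortCheck {

    // Returns true if a connection is established within the timeout, false otherwise.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the Kafka broker listens on its default port 9092 on this machine.
        boolean reachable = isReachable("localhost", 9092, 2000);
        System.out.println(reachable
                ? "localhost:9092 is accepting connections"
                : "Could not connect to localhost:9092");
    }
}
```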

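Finally, we write a simple Spring Boot application to demonstrate everything. For this demo to work, a Kafka server must be running on localhost on port 9092, which is Kafka's default configuration.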

package com.yiibai.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        sender.send("Spring Kafka and Spring Boot Configuration Example");
    }
}

Example

When you run the application, you should get output like the following.


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)

2018-03-14 11:22:25.177  INFO 2892 --- [           main] com.yiibai.kafka.SpringKafkaApplication  : Starting SpringKafkaApplication on MY-PC with PID 2892 (F:\worksp\spring-kafka\springboot-config\target\classes started by Administrator in F:\worksp\spring-kafka\springboot-config)
2018-03-14 11:22:25.181 DEBUG 2892 --- [           main] com.yiibai.kafka.SpringKafkaApplication  : Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
2018-03-14 11:22:25.182  INFO 2892 --- [           main] com.yiibai.kafka.SpringKafkaApplication  : No active profile set, falling back to default profiles: default
2018-03-14 11:22:26.869  INFO 2892 --- [           main] com.yiibai.kafka.SpringKafkaApplication  : Started SpringKafkaApplication in 2.208 seconds (JVM running for 2.751)
2018-03-14 11:22:26.871  INFO 2892 --- [           main] com.yiibai.kafka.Sender                  : sending message='Spring Kafka and Spring Boot Configuration Example' to topic='foo.t'
... ...
2018-03-14 11:22:36.035  WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 10 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.156  WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 7 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.163  WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 12 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.433  WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 8 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.436  WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 14 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:38.559  WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 16 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:38.559  WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:40.028  WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 10 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:56.203  INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver                : received message='Spring Kafka and Spring Boot Configuration Example'
2018-03-14 11:22:56.205  INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver                : kafka_offset: 0
2018-03-14 11:22:56.206  INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver                : kafka_nativeHeaders: RecordHeaders(headers = [], isReadOnly = false)
2018-03-14 11:22:56.206  INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver                : kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@68cba188
2018-03-14 11:22:56.206  INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver                : kafka_timestampType: CREATE_TIME
2018-03-14 11:22:56.206  INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver                : kafka_receivedMessageKey: null
2018-03-14 11:22:56.207  INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver                : kafka_receivedPartitionId: 0
2018-03-14 11:22:56.207  INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver                : kafka_receivedTopic: foo.t
2018-03-14 11:22:56.207  INFO 2892 --- [ntainer#0-0-C-1] com.yiibai.kafka.Receiver                : kafka_receivedTimestamp: 1520997760772