Spring support for Apache Kafka

1. Introduction

Apache Kafka is a distributed, fault-tolerant stream processing system. In this article, we will introduce Spring's support for Apache Kafka and the level of abstraction it provides over the native Kafka Java client APIs.

Spring Kafka brings the simple and typical Spring template programming model with a KafkaTemplate and message-driven POJOs via the @KafkaListener annotation.

2. Installation and setup

To download and install Kafka, please refer to the official guide. Then we also need to add the spring-kafka dependency to our pom.xml file:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.7.RELEASE</version>
</dependency>

Create a new Spring Boot sample application and start it with the default configuration.
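
A minimal sketch of such an application class (the class name here is arbitrary):

@SpringBootApplication
public class KafkaApplication {
 
    public static void main(String[] args) {
        // boots the application with Spring Boot's default auto-configuration
        SpringApplication.run(KafkaApplication.class, args);
    }
}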

3. Configure Topics

Previously, we used command-line tools to create topics in Kafka, for example:

$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic mytopic

However, with the introduction of AdminClient in Kafka, we can now create topics programmatically.

We add a KafkaAdmin bean to Spring with the following code; it will automatically create topics for all beans of type NewTopic:

@Configuration
public class KafkaTopicConfig {
    
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
 
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
         return new NewTopic("developlee", 1, (short) 1);
    }
}
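
The ${kafka.bootstrapAddress} placeholder above is resolved from the application configuration. Assuming a single broker running locally on Kafka's default port, the corresponding application.properties entry would be:

kafka.bootstrapAddress=localhost:9092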

4. Producing messages

To create messages, we first need to configure a ProducerFactory, which sets the strategy for creating Kafka Producer instances. Then we use a KafkaTemplate, which wraps a Producer instance and provides convenient methods for sending messages to Kafka topics.

Producer instances are thread-safe, so using a single instance throughout the application context will give higher performance. Consequently, KafkaTemplate instances are also thread-safe, and using one instance is recommended.

4.1. Producer configuration

@Configuration
public class KafkaProducerConfig {
 
    // injected from application properties, as in KafkaTopicConfig above
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
 
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
 
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

4.2. Publishing messages

We use KafkaTemplate to publish messages:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
 
public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

The send API returns a ListenableFuture object. If we want to block the sending thread and get the result of the sent message, we can call the get API of the ListenableFuture object. The thread will wait for the result, but doing so slows down the producer.
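
For example, a blocking send could look like this (a minimal sketch; sendMessageBlocking is a hypothetical name, and kafkaTemplate and topicName are assumed to be in scope as above):

public void sendMessageBlocking(String msg) 
  throws InterruptedException, ExecutionException {
    // get() blocks until the broker acknowledges the record or the send fails
    SendResult<String, String> result = kafkaTemplate.send(topicName, msg).get();
    System.out.println("Sent message=[" + msg + 
      "] with offset=[" + result.getRecordMetadata().offset() + "]");
}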

Kafka is a fast stream processing platform, so it is better to handle results asynchronously so that subsequent messages do not have to wait for the result of the previous message. We can do this through a callback:

public void sendMessage(String message) {
            
    ListenableFuture<SendResult<String, String>> future = 
      kafkaTemplate.send(topicName, message);
    
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
 
        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("Sent message=[" + message + 
              "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=[" 
              + message + "] due to : " + ex.getMessage());
        }
    });
}
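
Note that from spring-kafka 3.0 onwards, send() returns a CompletableFuture instead of a ListenableFuture; the callback style shown here matches the 2.3.x line used in this article.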

5. Message consumption

5.1. Consumer configuration

To consume messages, we need to configure a ConsumerFactory and a KafkaListenerContainerFactory.

Once these beans are available in the Spring bean factory, POJO-based consumers can be configured using the @KafkaListener annotation.

The @EnableKafka annotation needs to be added to the configuration class to enable detection of the @KafkaListener annotation on Spring-managed beans:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
 
    // injected from application properties; the kafka.groupId key is an assumed name
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
 
    @Value(value = "${kafka.groupId}")
    private String groupId;
 
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG, 
          groupId);
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
      kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
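
Note that section 5.3 below references a container factory named partitionsKafkaListenerContainerFactory, which this article never defines. A minimal sketch of such a bean, configured like the default factory above, might look like this:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  partitionsKafkaListenerContainerFactory() {
 
    // assumed to be configured just like kafkaListenerContainerFactory();
    // in a real setup it could differ, e.g. in concurrency settings
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}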

5.2. Consuming messages

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

We can implement multiple listeners for a topic, each with a different group ID. Furthermore, a consumer can listen to messages from multiple topics:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring also supports retrieving one or more message headers using the @Header annotation in the listener:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message
        + " from partition: " + partition);
}

5.3. Consuming messages from specific partitions

Notice that we created the topic "developlee" with only one partition. For a topic with multiple partitions, however, a @KafkaListener can explicitly subscribe to particular partitions of the topic with an initial offset:

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}),
  containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message
        + " from partition: " + partition);
}

Since the initialOffset has been set to 0 in this listener, all previously consumed messages from partitions 0 and 3 will be re-consumed every time the listener is initialized. If setting the offset is not necessary, we can use the partitions property of the @TopicPartition annotation to set only the partitions without the offset:

@KafkaListener(topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. Adding a message filter for listeners

We can configure listeners to consume specific types of messages by adding a custom filter. This is done by setting a RecordFilterStrategy on the KafkaListenerContainerFactory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  filterKafkaListenerContainerFactory() {
 
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
      record -> record.value().contains("World"));
    return factory;
}

We can then configure a listener to use this container factory:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

In this listener, all messages matching the filter will be discarded.
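
For example, with the filter above, only messages whose value does not contain "World" reach the listener (a sketch reusing the KafkaTemplate from section 4):

// discarded by the listener container: the value contains "World"
kafkaTemplate.send("topicName", "Hello World");
 
// passes the filter and is delivered to listenWithFilter()
kafkaTemplate.send("topicName", "Hello Kafka");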

6. Custom message converter

So far, we have only covered sending and receiving strings as messages. However, we can also send and receive custom Java objects. This requires configuring the appropriate serializer in the ProducerFactory and the appropriate deserializer in the ConsumerFactory.

Let's look at a simple bean and send it as a message:

public class Greeting {
 
    private String msg;
    private String name;
 
    // standard getters, setters and constructor
}

6.1. Producing custom messages

In this example, we will use the JsonSerializer. Let's look at the code for the ProducerFactory and KafkaTemplate:

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}
 
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

The new KafkaTemplate can be used to send Greeting messages:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. Consuming custom messages

Similarly, we modify the ConsumerFactory and KafkaListenerContainerFactory to properly deserialize the Greeting message:

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(Greeting.class));
}
 
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> 
  greetingKafkaListenerContainerFactory() {
 
    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

The Spring Kafka JSON serializer and deserializer use the Jackson library, which is an optional Maven dependency of the spring-kafka project. So let's also add it to our pom.xml file:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.7</version>
</dependency>

Note that instead of using the latest version of Jackson, it is recommended to use the version that spring-kafka declares in its own pom.xml.

Finally, we need to write a listener to consume Greeting messages:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // process greeting message
}

7. Conclusion

In this article, we covered the basics of Apache Kafka and Spring integration, and took a brief look at the classes used to send and receive messages.

The complete source code for this article can be found on GitHub. Before executing the code, make sure that the Kafka server is running.