@KafkaListener | Spring Boot Example
@KafkaListener | Interview Questions
@KafkaListener | How does Kafka consumer work?
@KafkaListener | KafkaMessageListenerContainer vs ConcurrentMessageListenerContainer?
KafkaConsumer.java
@Component public class KafkaConsumer { @KafkaListener(id = "foo", topics = "myTopic") public void listen(String data) { System.out.println(data); } }
@KafkaListener allows a method to consume messages from Kafka topic(s).
@KafkaListener designates a method as a listener in a KafkaMessageListenerContainer.
A KafkaMessageListenerContainer is how Spring Boot connects and polls records from Kafka under the hood.
Remember that the @Component annotation tells Spring Boot to register our KafkaConsumer class as a managed Spring Bean.
KafkaConsumerConfig.java
@Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092"); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
This class configures the Kafka consumer.
The @Configuration annotation tells Spring Boot to generate bean definitions via @Bean annotation.
Any class annotated with @Bean will be registered by the Spring container and used at runtime.
The ConsumerFactory class is a Spring Kafka class for configuring the underlying ConcurrentMessageListenerContainer.
The ConcurrentMessageListenerContainer is a collection of KafkaMessageListenerContainers.
Remember that KafkaMessageListenerContainer implements the MessageListener.
Remember that a MessageListener is created by @KafkaListener annotation.
KafkaDemoApplication.java
@SpringBootApplication public class KafkaDemoApplication { public static void main(String[] args) { SpringApplication.run(KafkaDemoApplication.class, args); } }
@SpringBootApplication performs component scans and registers all of the configuration beans defined above.
This is how you run the Spring Boot application.
This is how you start the Kafka listener.
build.gradle
dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.kafka:spring-kafka' }
These are the two dependencies required to run the application.
How does @KafkaListener work?
The @KafkaListener is an annotation that marks a method or class as the target of incoming messages.
By using @KafkaListener, you abstract away the need to configure the underlying KafkaMessageListenerContainer.
The KafkaListenerEndpointRegistry
The Spring Kafka library uses a BeanPostProcessor to register the target methods as KafkaEndpoints in a KafkaListenerEndpointRegistry.
The KafkaListenerEndpointRegistry class creates the underlying MessageListenerContainer instances.
While the KafkaListenerEndpointRegistry is registered in the Spring container, its listener containers are not. Instead, these listeners are registered with the KafkaListenerEndpointRegistry itself.
The KafkaListenerEndpointRegistry manages the lifecycle of the containers.
What are the @KafkaListener Annotation Properties?
- id
- containerFactory
- topics
- topicPattern
- topicPartitions
- containerGroup
- errorHandler
- groupId
- idIsGroup
- clientIdPrefix
- beanRef
- concurrency
- autoStartup
- properties
- splitIterables
- contentTypeConverter
- batch
You can set these properties like this...
@Component public class Listener { @KafkaListener( id = "foo", topics = "myTopic", properties = {"bootstrap.server=localhost:9092"} ) public void listen(String data) { System.out.println(data); } }
What is DefaultKafkaConsumerFactory?
The DefaultKafkaConsumerFactory provides the default configuration for creating consumer instances.
The DefaultKafkaConsumerFactory class implements the ConsumerFactory interface.
Remember that the @KafkaListener annotation ultimately generates listener containers. These listener containers ultimately create Kafka consumer instances.
These consumers need to be configured with appropriate topics, consumer groups, bootstrap servers, etc. The DefaultKafkaConsumerFactory provides such details when creating these consumers via Spring Boot.
Still confused? Check out What is DefaultKafkaConsumerFactory?
What is KafkaListenerContainerFactory?
KafkaListenerContainerFactory is an interface.
The ConcurrentKafkaListenerContainerFactory class is an implementation of this interface. This is the default container factory used with Spring Kafka.
The ConcurrentKafkaListenerContainerFactory class creates the ConcurrentMessageListenerContainer(s) which creates a number of KafkaMessageListenerContainers (based on it's concurrency property).
These KafkaMessageListenerContainer instances are the same instances generated by @KafkaListener.
Still confused? Check out What is KafkaListenerContainerFactory?