Issue
What is need
I'm writing an application (Spring + Kotlin) that takes information with Kafka. If I set autoStartup = "true" when declaring a @KafkaListener then the app works fine but only if broker is available. When the broker is unavailable application crashes on start. It's undesirable behavior. The application must work and perform other functions.
What I tried to do
For the escape of crashing application on start somebody on this site in another topic advised setting autoStartup = "false" when declaring a @KafkaListener. And it really helped to prevent crash on start. But now I cannot successfully start KafkaListener manually. In other examples I saw auto wiring of KafkaListenerEndpointRegistry, but when I trying to do it:
@Service
class KafkaConsumer @Autowired constructor(
private val kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry
) {
IntelliJ Idea warns:
Could not autowire. No beans of 'KafkaListenerEndpointRegistry' type found.
When I try to use KafkaListenerEndpointRegistry without autowiring and perform this code:
@Service
class KafkaConsumer {
private val logger = LoggerFactory.getLogger(this::class.java)
private val kafkaListenerEndpointRegistry = KafkaListenerEndpointRegistry()
@Scheduled(fixedDelay = 10000)
fun startCpguListener(){
val container = kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
if (!container.isRunning)
try {
logger.info("Kafka Consumer is not running. Trying to start...")
container.start()
} catch (e: Exception){
logger.error(e.message)
}
}
@KafkaListener(
id = "consumer1",
topics = ["cpgdb.public.user"],
autoStartup = "false"
)
private fun listen(it: ConsumerRecord<JsonNode, JsonNode>, qwe: Consumer<Any, Any>){
val pay = it.value().get("payload")
val after = pay.get("after")
val id = after["id"].asInt()
val receivedUser = CpguUser(
id = id,
name = after["name"].asText()
)
logger.info("received user with id = $id")
}
}
}
kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
always return null. I guess it's because I didn't auto wire kafkaListenerEndpointRegistry. How can I do it? Or if exist another solution of my answer I'll be appreciative any help! Thanks!
There is Kafka config:
@Configuration
@EnableConfigurationProperties(KafkaProperties::class)
class KafkaConfiguration(private val props: KafkaProperties) {
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
factory.consumerFactory = consumerFactory()
factory.setConcurrency(1)
factory.setMessageConverter(MessagingMessageConverter())
factory.setStatefulRetry(true)
val retryTemplate = RetryTemplate()
retryTemplate.setRetryPolicy(AlwaysRetryPolicy())
retryTemplate.setBackOffPolicy(ExponentialBackOffPolicy())
factory.setRetryTemplate(retryTemplate)
val handler = SeekToCurrentErrorHandler()
handler.isAckAfterHandle = false
factory.setErrorHandler(handler)
factory.containerProperties.isMissingTopicsFatal = false
return factory
}
@Bean
fun consumerFactory(): ConsumerFactory<Any, Any> {
return DefaultKafkaConsumerFactory(consumerConfigs())
}
@Bean
fun consumerConfigs(): Map<String, Any> {
return mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to props.bootstrap.address,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to listOf(MonitoringConsumerInterceptor::class.java),
ConsumerConfig.CLIENT_ID_CONFIG to props.receiver.clientId,
ConsumerConfig.GROUP_ID_CONFIG to props.receiver.groupId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true
)
}
}
- spring boot version: 2.3.0
- spring-kafka version: 2.5.3
- kafka-clients version: 2.5.0
Solution
Just ignore IntelliJ's warning about the auto wiring; the bean does exist; it's just that IntelliJ can't detect it.
Answered By - Gary Russell
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.