KafkaListener doesn't respect containerFactory after upgrading from Spring Kafka 2.7.9 to 2.8.0 #2050

Closed
WtfJoke opened this issue Dec 17, 2021 · 3 comments · Fixed by #2051

WtfJoke commented Dec 17, 2021

Affects Version(s): 2.8.0

After upgrading from Spring Boot 2.5.6 to 2.6.1, we noticed that @KafkaListener no longer respects the containerFactory setting.

In our case, we could no longer consume topics that do not use the schema registry: the container factory with the overridden deserializer was not used, and the default deserializer (the Avro deserializer) was used instead.

I wasn't able to pinpoint the issue exactly, but I put together a small demo project that reproduces it.

  1. Clone https://github.com/WtfJoke/springkafka28issue
  2. Start app
  3. curl -X POST localhost:8080/show
  4. An exception is thrown ("Unknown magic byte" from the Avro deserializer), although a JsonDeserializer is specified in KafkaConsumerConfig#showPublishedConsumerFactory, which is used by the @KafkaListener annotation.

If you change the Spring Boot version to 2.5.7 (or switch to the working branch), everything works as expected: in step 4, the show is logged to the console instead of an exception being thrown.
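For context on the error in step 4, here is a minimal sketch of why an Avro deserializer that expects schema-registry framing rejects a plain JSON payload. It assumes the Confluent wire format, where a record starts with a zero magic byte followed by a 4-byte schema ID. The class name `MagicByteSketch`, the method `readSchemaId`, and the sample payload are hypothetical and only illustrate the check; this is not the Confluent implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MagicByteSketch {

    // Assumption: schema-registry framed records begin with a single zero byte.
    private static final byte MAGIC_BYTE = 0x0;

    // Returns the schema ID of a framed payload, or throws if the framing is missing.
    static int readSchemaId(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (payload.length < 5 || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        // The next four bytes hold the schema ID (big-endian by default).
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // A JSON payload starts with '{' (0x7B), not with the zero magic byte.
        byte[] json = "{\"title\":\"demo\"}".getBytes(StandardCharsets.UTF_8);
        try {
            readSchemaId(json);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "Unknown magic byte!"
        }

        // A framed payload: magic byte followed by schema ID 42.
        byte[] framed = ByteBuffer.allocate(5).put((byte) 0).putInt(42).array();
        System.out.println(readSchemaId(framed)); // prints 42
    }
}
```

So the exception itself is a symptom: the JSON topic is being read by the wrong deserializer.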

I hope my description is clear enough to reproduce the issue.

@artembilan
Member

I think the problem comes from this fix: 852c447.

Since you have this in your config:

spring.deserializer.key.delegate.class: 'org.apache.kafka.common.serialization.StringDeserializer'
spring.deserializer.value.delegate.class: 'io.confluent.kafka.serializers.KafkaAvroDeserializer'

The ErrorHandlingDeserializer takes them into account in its configure():

	public void configure(Map<String, ?> configs, boolean isKey) {
		setupDelegate(configs, isKey ? KEY_DESERIALIZER_CLASS : VALUE_DESERIALIZER_CLASS);
		Assert.state(this.delegate != null, "No delegate deserializer configured");
		this.delegate.configure(configs, isKey);
		this.isForKey = isKey;
		setupFunction(configs, isKey ? KEY_FUNCTION : VALUE_FUNCTION);
	}

We probably just need to call setupDelegate() only when this.delegate is still null, so that an explicitly provided delegate is kept, and apply the same check to failedDeserializationFunction.
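The guard described above can be sketched in isolation as follows. This is a simplified model, not the Spring Kafka source: `DelegatingDeserializer` is a hypothetical stand-in for `ErrorHandlingDeserializer`, the delegate is modelled as a plain class name, and the helper `resolve` exists only for the demonstration. Only the property name is taken from the config shown above.

```java
import java.util.Map;

public class GuardedConfigureSketch {

    // Hypothetical stand-in for ErrorHandlingDeserializer (not the real class).
    static class DelegatingDeserializer {

        static final String VALUE_DELEGATE_KEY = "spring.deserializer.value.delegate.class";

        // Null unless a delegate was supplied through the constructor.
        private String delegate;

        DelegatingDeserializer() {
        }

        DelegatingDeserializer(String delegate) {
            this.delegate = delegate;
        }

        void configure(Map<String, ?> configs) {
            // Guard: consult the property only when no delegate was provided in code.
            if (this.delegate == null) {
                Object fromConfig = configs.get(VALUE_DELEGATE_KEY);
                if (fromConfig != null) {
                    this.delegate = fromConfig.toString();
                }
            }
            if (this.delegate == null) {
                throw new IllegalStateException("No delegate deserializer configured");
            }
        }

        String delegate() {
            return this.delegate;
        }
    }

    // Demo helper: configures the deserializer and reports which delegate it ended up with.
    static String resolve(DelegatingDeserializer deserializer, Map<String, ?> configs) {
        deserializer.configure(configs);
        return deserializer.delegate();
    }

    public static void main(String[] args) {
        Map<String, Object> configs =
                Map.of(DelegatingDeserializer.VALUE_DELEGATE_KEY, "KafkaAvroDeserializer");

        // The explicit delegate survives configure().
        System.out.println(resolve(new DelegatingDeserializer("JsonDeserializer"), configs));

        // Without an explicit delegate, the property is used.
        System.out.println(resolve(new DelegatingDeserializer(), configs));
    }
}
```

With the guard in place, a factory that passes its deserializer programmatically and a factory that relies purely on properties can share the same consumer configuration map.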

As a workaround, I suggest extending ErrorHandlingDeserializer and overriding its configure(Map<String, ?> configs, boolean isKey) with an empty body:

    @Bean
    fun showPublishedConsumerFactory(): ConsumerFactory<String, Show> {
        val deserializer =
            object : ErrorHandlingDeserializer<Show>(JsonDeserializer(Show::class.java, false)) {
                override fun configure(configs: Map<String, *>, isKey: Boolean) {
                    // Intentionally empty: keep the delegate passed to the constructor
                }
            }
        return DefaultKafkaConsumerFactory(
            consumerConfigs(), ErrorHandlingDeserializer(StringDeserializer()),
            deserializer
        )
    }

@artembilan artembilan added this to the 2.8.1 milestone Dec 17, 2021
@artembilan artembilan self-assigned this Dec 17, 2021
artembilan added a commit to artembilan/spring-kafka that referenced this issue Dec 17, 2021
Fixes spring-projects#2050

The application can have several consumer factories, where one relies fully
on the configuration properties for its deserializers and another
configures them programmatically.
The consumer factory now calls `configure()` on the `Deserializer`
independently of its origins.
See spring-projects#1879
In this case the `ErrorHandlingDeserializer` consults
`spring.deserializer.key.delegate.class` or `spring.deserializer.value.delegate.class`
for its delegate, overriding the one provided explicitly in code

* Fix `ErrorHandlingDeserializer` to check its `delegate` and `failedDeserializationFunction`
for null before taking their values from the respective configuration properties
* Add `spring.deserializer.value.delegate.class` property to `testJsonSerDeIgnoreTypeHeadersInbound()`
configuration to ensure that it does not override an explicit `JsonDeserializer` delegate
* Fix warning in the `EmbeddedKafkaBroker` about `this.` prefix for a `static logger` property
@artembilan
Member

I proposed the fix here: #2051

@WtfJoke
Author

WtfJoke commented Dec 20, 2021

Thank you @artembilan for your fast response + confirmation + proposed fix.
Just wanted to let you know that the proposed workaround works :)

garyrussell pushed a commit that referenced this issue Dec 20, 2021
Fixes #2050
