KafkaTemplate receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) can have nulls #2240

Closed
gurpiarbassi opened this issue Apr 21, 2022 · 3 comments · Fixed by #2242

Comments

@gurpiarbassi
Contributor

gurpiarbassi commented Apr 21, 2022

In what version(s) of Spring for Apache Kafka are you seeing this issue?

This is on the main branch.

Describe the bug

KafkaTemplate has a method:

	@Override
	@Nullable
	public ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout) {
		Properties props = oneOnly();
		try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
			TopicPartition topicPartition = new TopicPartition(topic, partition);
			return receiveOne(topicPartition, offset, pollTimeout, consumer);
		}
	}

This method correctly declares that the return value can be null via @Nullable, and the javadoc states the same.

However, there is another method:

	@Override
	public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) {
		Properties props = oneOnly();
		Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
		try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
			requested.forEach(tpo -> {
				ConsumerRecord<K, V> one = receiveOne(tpo.getTopicPartition(), tpo.getOffset(), pollTimeout, consumer);
				records.computeIfAbsent(tpo.getTopicPartition(), tp -> new ArrayList<>()).add(one);
			});
			return new ConsumerRecords<>(records);
		}
	}

This method returns a ConsumerRecords object, but receiveOne can return null, and that null ConsumerRecord is added to the list unconditionally. You therefore end up with a ConsumerRecords whose per-partition list contains null.

To Reproduce
I reproduced it by calling receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) on a brand new, empty topic. The javadoc doesn't mention that the elements could be null.

Could we do a null check before adding the ConsumerRecord to the ConsumerRecords object?

Furthermore, tpo.getOffset() can also return null, which causes a NullPointerException when the value is auto-unboxed.
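
The two failure modes described above can be illustrated without a broker. The following is only a minimal sketch with stand-in names (NullPitfalls, receiveOneOrNull, seekTo are hypothetical and not part of spring-kafka); it assumes nothing beyond the JDK and shows that an ArrayList happily stores a null element and that passing a null Long where a primitive long is expected fails at the call site.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in names only; none of these are spring-kafka APIs.
final class NullPitfalls {

    // Stand-in for receiveOne(...): returns null when nothing was polled.
    static String receiveOneOrNull(boolean topicHasData) {
        return topicHasData ? "record" : null;
    }

    // Stand-in for a method that takes a primitive long offset.
    static long seekTo(long offset) {
        return offset;
    }

    // Adds the result unconditionally, like the method quoted in the issue.
    static List<String> collectUnchecked(boolean topicHasData) {
        List<String> records = new ArrayList<>();
        records.add(receiveOneOrNull(topicHasData)); // ArrayList accepts null
        return records;
    }

    // Returns true if passing the boxed offset to seekTo throws on unboxing.
    static boolean unboxingFails(Long offset) {
        try {
            seekTo(offset); // auto-unboxing of a null Long happens here
            return false;
        } catch (NullPointerException ex) {
            return true;
        }
    }
}
```

With an empty topic simulated by collectUnchecked(false), the returned list has one element and that element is null, which is the shape a caller iterating over the result would trip on.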

@artembilan
Member

I think this is a correct observation.

Would you mind contributing the fix: https://github.com/spring-projects/spring-kafka/blob/main/CONTRIBUTING.adoc ?

Thanks

@gurpiarbassi
Contributor Author

gurpiarbassi commented Apr 21, 2022

Sure. I can look into this. What should be the expected behaviour when tpo.getOffset() returns null? Should we default to 0?

The doc on KafkaOperations states:

"Receive a multiple records with the default poll timeout (5 seconds). Only absolute, positive offsets are supported."

So maybe it should throw a KafkaException if tpo.getOffset() is null.

@garyrussell
Contributor

I think a null (or negative) offset should throw an exception. A null return should be skipped rather than added to the list: still return an entry for the TP, but with an empty list (if that's the result), assuming ConsumerRecords allows that; if not, skip the TP entirely.
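
The behaviour proposed in this comment could look roughly like the sketch below. It is not the actual spring-kafka change: Request is a hypothetical stand-in for TopicPartitionOffset, the receiveOne argument stands in for the template's private helper, a plain Map replaces ConsumerRecords, and IllegalArgumentException is used only because the sketch has no Kafka dependency. It takes the "entry with an empty list" branch of the suggestion.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Stand-in types only; not the spring-kafka implementation.
final class ReceiveSketch {

    // Hypothetical stand-in for TopicPartitionOffset; the offset may be null.
    static final class Request {
        final String topicPartition;
        final Long offset;

        Request(String topicPartition, Long offset) {
            this.topicPartition = topicPartition;
            this.offset = offset;
        }
    }

    static <R> Map<String, List<R>> receiveAll(Collection<Request> requested,
            BiFunction<String, Long, R> receiveOne) {

        // Reject null or negative offsets before any record is fetched.
        for (Request request : requested) {
            if (request.offset == null || request.offset < 0) {
                throw new IllegalArgumentException(
                        "Offset must be non-null and non-negative for " + request.topicPartition);
            }
        }
        Map<String, List<R>> records = new LinkedHashMap<>();
        for (Request request : requested) {
            R one = receiveOne.apply(request.topicPartition, request.offset);
            // Always create the entry for the partition, even if nothing is added.
            List<R> forPartition = records.computeIfAbsent(request.topicPartition, tp -> new ArrayList<>());
            if (one != null) {
                forPartition.add(one); // skip null results instead of storing them
            }
        }
        return records;
    }
}
```

Validating all offsets before the loop means a bad request fails fast without a partial result. Whether the real ConsumerRecords accepts an empty list for a partition is the open assumption noted in the comment; if it does not, the entry would be dropped instead.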

gurpiarbassi added a commit to gurpiarbassi/spring-kafka that referenced this issue Apr 21, 2022
- Ensures that we dont get ConsumerRecords with nulls in the list
- Ensures that we do not allow null offsets
gurpiarbassi added a commit to gurpiarbassi/spring-kafka that referenced this issue Apr 21, 2022
Fixes spring-projectsGH-2240 (spring-projects#2240)

- Ensures that we dont get ConsumerRecords with nulls in the list
- Ensures that we do not allow null offsets
gurpiarbassi added a commit to gurpiarbassi/spring-kafka that referenced this issue Apr 26, 2022
Fixes spring-projectsGH-2240 (spring-projects#2240)

- Ensures that we dont get ConsumerRecords with nulls in the list
- Ensures that we do not allow null offsets
garyrussell pushed a commit that referenced this issue Apr 27, 2022
Fixes GH-2240 (#2240)

- Ensures that we dont get ConsumerRecords with nulls in the list
- Ensures that we do not allow null offsets