Skip to content

Commit cf954d7

Browse files
GH-1828: Use consumer record's partition for @RetryableTopic (#1829)
* GH-1828: Add tests for RetryableTopic with autoCreate=false Signed-off-by: Deepesh <[email protected]> * GH-1828: Update DeadLetterPublishingRecovererFactory#resolveDestination - Extract out a protected method to `resolveTopicPartition` for destination DLT - Set the partition to the same partition as the original record * GH-1828: Remove un-necessary mockito stubbing from tests * GH-1828: Add javadoc for DeadLetterPublishingRecovererFactory.resolveTopicPartition
1 parent 4285424 commit cf954d7

File tree

3 files changed

+374
-6
lines changed

3 files changed

+374
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,24 @@ private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e)
115115

116116
return nextDestination.isNoOpsTopic()
117117
? null
118-
: new TopicPartition(nextDestination.getDestinationName(),
119-
cr.partition() % nextDestination.getDestinationPartitions());
118+
: resolveTopicPartition(cr, nextDestination);
119+
}
120+
121+
/**
122+
* Creates and returns the {@link TopicPartition}, where the original record should be forwarded.
123+
* By default, it will use the partition same as original record's partition, in the next destination topic.
124+
*
125+
* <p>{@link DeadLetterPublishingRecoverer#checkPartition} has logic to check whether that partition exists,
126+
* and if it doesn't it sets -1, to allow the Producer itself to assign a partition to the record.</p>
127+
*
128+
* <p>Subclasses can inherit from this method to override the implementation, if necessary.</p>
129+
*
130+
* @param cr The original {@link ConsumerRecord}, which is to be forwarded to DLT
131+
* @param nextDestination The next {@link DestinationTopic}, where the consumerRecord is to be forwarded
132+
* @return An instance of {@link TopicPartition}, specifying the topic and partition, where the cr is to be sent
133+
*/
134+
protected TopicPartition resolveTopicPartition(final ConsumerRecord<?, ?> cr, final DestinationTopic nextDestination) {
135+
return new TopicPartition(nextDestination.getDestinationName(), cr.partition());
120136
}
121137

122138
private int getAttempts(ConsumerRecord<?, ?> consumerRecord) {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ void shouldSendMessage() {
102102
given(destinationTopicResolver.resolveDestinationTopic(testTopic, 1, e, failureTimestamp)).willReturn(destinationTopic);
103103
given(destinationTopic.isNoOpsTopic()).willReturn(false);
104104
given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
105-
given(destinationTopic.getDestinationPartitions()).willReturn(3);
106105
given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
107106
given(destinationTopic.getDestinationDelay()).willReturn(1000L);
108107
willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations();
@@ -145,7 +144,6 @@ void shouldIncreaseAttempts() {
145144
.willReturn(destinationTopic);
146145
given(destinationTopic.isNoOpsTopic()).willReturn(false);
147146
given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
148-
given(destinationTopic.getDestinationPartitions()).willReturn(1);
149147
given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
150148
willReturn(kafkaOperations).given(destinationTopic).getKafkaOperations();
151149
given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);
@@ -175,7 +173,6 @@ void shouldAddOriginalTimestampHeader() {
175173
given(destinationTopicResolver.resolveDestinationTopic(testTopic, 1, e, originalTimestamp)).willReturn(destinationTopic);
176174
given(destinationTopic.isNoOpsTopic()).willReturn(false);
177175
given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
178-
given(destinationTopic.getDestinationPartitions()).willReturn(1);
179176
given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
180177
willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations();
181178
given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);
@@ -206,7 +203,6 @@ void shouldNotReplaceOriginalTimestampHeader() {
206203
given(destinationTopicResolver.resolveDestinationTopic(testTopic, 1, e, timestamp)).willReturn(destinationTopic);
207204
given(destinationTopic.isNoOpsTopic()).willReturn(false);
208205
given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
209-
given(destinationTopic.getDestinationPartitions()).willReturn(1);
210206
given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
211207
willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations();
212208
given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);

0 commit comments

Comments
 (0)