Skip to content

Commit 47cfae9

Browse files
committed
Close volatile resource in some Kafka tests
1 parent 96d3b00 commit 47cfae9

File tree

3 files changed

+15
-1
lines changed

3 files changed

+15
-1
lines changed

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
189189
assertThat(gateway.isPaused()).isFalse();
190190

191191
gateway.stop();
192+
consumer.close();
193+
pf.reset();
192194
}
193195

194196
@Test
@@ -268,6 +270,8 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
268270
assertThat(record).has(value("ERROR"));
269271

270272
gateway.stop();
273+
consumer.close();
274+
pf.reset();
271275
}
272276

273277
@Test
@@ -352,6 +356,8 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
352356
assertThat(record).has(value("ERROR"));
353357

354358
gateway.stop();
359+
consumer.close();
360+
pf.reset();
355361
}
356362

357363
@Test
@@ -409,6 +415,7 @@ public <T, E extends Throwable> void onError(RetryContext context, RetryCallback
409415

410416
gateway.stop();
411417
consumer.close();
418+
pf.reset();
412419
}
413420

414421
}

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
215215
assertThat(((ConversionException) error.getPayload()).getRecord()).isNotNull();
216216

217217
adapter.stop();
218+
pf.reset();
218219
}
219220

220221
@Test
@@ -277,6 +278,7 @@ protected boolean doSend(Message<?> message, long timeout) {
277278
assertThat(receivedMessageHistory.get().toString()).isEqualTo("myNullChannel");
278279

279280
adapter.stop();
281+
pf.reset();
280282
}
281283

282284

@@ -387,6 +389,7 @@ protected boolean doSend(Message<?> message, long timeout) {
387389
assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(originalMessage).get()).isEqualTo(1);
388390

389391
adapter.stop();
392+
pf.reset();
390393
}
391394

392395
@Test
@@ -474,7 +477,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
474477
assertThat(((ConversionException) error.getPayload()).getMessage())
475478
.contains("Failed to convert to message");
476479
assertThat(((ConversionException) error.getPayload()).getRecords()).hasSize(2);
480+
477481
adapter.stop();
482+
pf.reset();
478483
}
479484

480485
@Test
@@ -517,6 +522,7 @@ void testInboundJson() {
517522
assertThat(received.getPayload()).isInstanceOf(Map.class);
518523

519524
adapter.stop();
525+
pf.reset();
520526
}
521527

522528
@Test
@@ -564,6 +570,7 @@ void testInboundJsonWithPayload() {
564570
assertThat(received.getPayload()).isEqualTo(new Foo("baz"));
565571

566572
adapter.stop();
573+
pf.reset();
567574
}
568575

569576
@SuppressWarnings({ "unchecked", "rawtypes" })

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
119119
assertThat(received).isNull();
120120
assertThat(KafkaTestUtils.getPropertyValue(source, "consumer.fetcher.minBytes")).isEqualTo(2);
121121
source.destroy();
122-
producerFactory.destroy();
122+
template.destroy();
123123
}
124124

125125
}

0 commit comments

Comments
 (0)