From 187d639ffa47145f860e43fc6e13890caad6cb28 Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz Date: Sun, 27 Mar 2022 21:50:33 +0200 Subject: [PATCH 1/3] GH-2059: expose adminTimeout property in EmbeddedKafka annotation --- .../kafka/test/EmbeddedKafkaBroker.java | 55 ++++++++++++------- .../condition/EmbeddedKafkaCondition.java | 4 +- .../kafka/test/context/EmbeddedKafka.java | 9 +++ 3 files changed, 46 insertions(+), 22 deletions(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java index c43cfa5423..fa3951d3b7 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java @@ -90,6 +90,7 @@ * @author Elliot Kennedy * @author Nakul Mishra * @author Pawel Lozinski + * @author Adrian Chlebosz * * @since 2.2 */ @@ -113,7 +114,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean { */ public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property"; - private static final Duration DEFAULT_ADMIN_TIMEOUT = Duration.ofSeconds(10); + public static final int DEFAULT_ADMIN_TIMEOUT = 10; public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000; @@ -157,7 +158,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean { private int[] kafkaPorts; - private Duration adminTimeout = DEFAULT_ADMIN_TIMEOUT; + private Duration adminTimeout = Duration.ofSeconds(DEFAULT_ADMIN_TIMEOUT); private int zkConnectionTimeout = DEFAULT_ZK_CONNECTION_TIMEOUT; @@ -238,33 +239,24 @@ public EmbeddedKafkaBroker kafkaPorts(int... ports) { } /** - * Set an explicit port for the embedded Zookeeper. - * @param port the port. - * @return the {@link EmbeddedKafkaBroker}. + * Set the system property with this name to the list of broker addresses. + * @param brokerListProperty the brokerListProperty to set + * @return this broker. * @since 2.3 */ - public EmbeddedKafkaBroker zkPort(int port) { - this.zkPort = port; + public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) { + this.brokerListProperty = brokerListProperty; return this; } - /** - * Set the timeout in seconds for admin operations (e.g. topic creation, close). - * Default 30 seconds. - * @param adminTimeout the timeout. - * @since 2.2 - */ - public void setAdminTimeout(int adminTimeout) { - this.adminTimeout = Duration.ofSeconds(adminTimeout); - } /** - * Set the system property with this name to the list of broker addresses. - * @param brokerListProperty the brokerListProperty to set - * @return this broker. + * Set an explicit port for the embedded Zookeeper. + * @param port the port. + * @return the {@link EmbeddedKafkaBroker}. * @since 2.3 */ - public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) { - this.brokerListProperty = brokerListProperty; + public EmbeddedKafkaBroker zkPort(int port) { + this.zkPort = port; return this; } @@ -286,6 +278,27 @@ public void setZkPort(int zkPort) { this.zkPort = zkPort; } + /** + * Set the timeout in seconds for admin operations (e.g. topic creation, close). + * @param adminTimeout the timeout. + * @return the {@link EmbeddedKafkaBroker} + * @since 3.0 + */ + public EmbeddedKafkaBroker adminTimeout(int adminTimeout) { + this.adminTimeout = Duration.ofSeconds(adminTimeout); + return this; + } + + /** + * Set the timeout in seconds for admin operations (e.g. topic creation, close). + * Default 10 seconds. + * @param adminTimeout the timeout. + * @since 2.2 + */ + public void setAdminTimeout(int adminTimeout) { + this.adminTimeout = Duration.ofSeconds(adminTimeout); + } + /** * Set connection timeout for the client to the embedded Zookeeper. * @param zkConnectionTimeout the connection timeout, diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java index a2a80cb305..1777204e79 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java @@ -51,6 +51,7 @@ * @author Gary Russell * @author Artem Bilan * @author Pawel Lozinski + * @author Adrian Chlebosz * * @since 2.3 * @@ -127,7 +128,8 @@ private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) { .zkPort(embedded.zookeeperPort()) .kafkaPorts(ports) .zkConnectionTimeout(embedded.zkConnectionTimeout()) - .zkSessionTimeout(embedded.zkSessionTimeout()); + .zkSessionTimeout(embedded.zkSessionTimeout()) + .adminTimeout(embedded.adminTimeout()); Properties properties = new Properties(); for (String pair : embedded.brokerProperties()) { diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java index 12e0591bb5..8955ed4a98 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java @@ -22,6 +22,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.time.Duration; import org.junit.jupiter.api.extension.ExtendWith; @@ -60,6 +61,7 @@ * @author Gary Russell * @author Sergio Lourenco * @author Pawel Lozinski + * @author Adrian Chlebosz * * @since 1.3 * @@ -172,5 +174,12 @@ */ int zkSessionTimeout() default EmbeddedKafkaBroker.DEFAULT_ZK_SESSION_TIMEOUT; + /** + * Timeout in seconds for admin operations (e.g. topic creation, close). + * @return default {@link EmbeddedKafkaBroker#DEFAULT_ADMIN_TIMEOUT} + * @since 3.0 + */ + int adminTimeout() default EmbeddedKafkaBroker.DEFAULT_ADMIN_TIMEOUT; + } From a7084c4d546d5516241f1a27cb46318dd9ec78b7 Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz Date: Tue, 29 Mar 2022 11:43:07 +0200 Subject: [PATCH 2/3] GH-2059: remove unused import --- .../kafka/test/condition/EmbeddedKafkaCondition.java | 2 +- .../org/springframework/kafka/test/context/EmbeddedKafka.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java index 1777204e79..a9cc666633 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java index 8955ed4a98..5a18a9f31c 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import java.time.Duration; import org.junit.jupiter.api.extension.ExtendWith; From b4fda65f4395752b500d2d81de8d90afdd6214f3 Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz Date: Tue, 29 Mar 2022 17:13:01 +0200 Subject: [PATCH 3/3] GH-2059: change since version param in added methods Resolves https://github.com/spring-projects/spring-kafka/issues/2059 --- .../org/springframework/kafka/test/EmbeddedKafkaBroker.java | 2 +- .../org/springframework/kafka/test/context/EmbeddedKafka.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java index fa3951d3b7..6c00ecd497 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java @@ -282,7 +282,7 @@ public void setZkPort(int zkPort) { * Set the timeout in seconds for admin operations (e.g. topic creation, close). * @param adminTimeout the timeout. * @return the {@link EmbeddedKafkaBroker} - * @since 3.0 + * @since 2.8.5 */ public EmbeddedKafkaBroker adminTimeout(int adminTimeout) { this.adminTimeout = Duration.ofSeconds(adminTimeout); diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java index 5a18a9f31c..3b57204b09 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java @@ -176,7 +176,7 @@ /** * Timeout in seconds for admin operations (e.g. topic creation, close). * @return default {@link EmbeddedKafkaBroker#DEFAULT_ADMIN_TIMEOUT} - * @since 3.0 + * @since 2.8.5 */ int adminTimeout() default EmbeddedKafkaBroker.DEFAULT_ADMIN_TIMEOUT;