diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java index c43cfa5423..6c00ecd497 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java @@ -90,6 +90,7 @@ * @author Elliot Kennedy * @author Nakul Mishra * @author Pawel Lozinski + * @author Adrian Chlebosz * * @since 2.2 */ @@ -113,7 +114,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean { */ public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property"; - private static final Duration DEFAULT_ADMIN_TIMEOUT = Duration.ofSeconds(10); + public static final int DEFAULT_ADMIN_TIMEOUT = 10; public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000; @@ -157,7 +158,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean { private int[] kafkaPorts; - private Duration adminTimeout = DEFAULT_ADMIN_TIMEOUT; + private Duration adminTimeout = Duration.ofSeconds(DEFAULT_ADMIN_TIMEOUT); private int zkConnectionTimeout = DEFAULT_ZK_CONNECTION_TIMEOUT; @@ -238,33 +239,24 @@ public EmbeddedKafkaBroker kafkaPorts(int... ports) { } /** - * Set an explicit port for the embedded Zookeeper. - * @param port the port. - * @return the {@link EmbeddedKafkaBroker}. + * Set the system property with this name to the list of broker addresses. + * @param brokerListProperty the brokerListProperty to set + * @return this broker. * @since 2.3 */ - public EmbeddedKafkaBroker zkPort(int port) { - this.zkPort = port; + public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) { + this.brokerListProperty = brokerListProperty; return this; } - /** - * Set the timeout in seconds for admin operations (e.g. topic creation, close). - * Default 30 seconds. - * @param adminTimeout the timeout. - * @since 2.2 - */ - public void setAdminTimeout(int adminTimeout) { - this.adminTimeout = Duration.ofSeconds(adminTimeout); - } /** - * Set the system property with this name to the list of broker addresses. - * @param brokerListProperty the brokerListProperty to set - * @return this broker. + * Set an explicit port for the embedded Zookeeper. + * @param port the port. + * @return the {@link EmbeddedKafkaBroker}. * @since 2.3 */ - public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) { - this.brokerListProperty = brokerListProperty; + public EmbeddedKafkaBroker zkPort(int port) { + this.zkPort = port; return this; } @@ -286,6 +278,27 @@ public void setZkPort(int zkPort) { this.zkPort = zkPort; } + /** + * Set the timeout in seconds for admin operations (e.g. topic creation, close). + * @param adminTimeout the timeout. + * @return the {@link EmbeddedKafkaBroker} + * @since 2.8.5 + */ + public EmbeddedKafkaBroker adminTimeout(int adminTimeout) { + this.adminTimeout = Duration.ofSeconds(adminTimeout); + return this; + } + + /** + * Set the timeout in seconds for admin operations (e.g. topic creation, close). + * Default 10 seconds. + * @param adminTimeout the timeout. + * @since 2.2 + */ + public void setAdminTimeout(int adminTimeout) { + this.adminTimeout = Duration.ofSeconds(adminTimeout); + } + /** * Set connection timeout for the client to the embedded Zookeeper. * @param zkConnectionTimeout the connection timeout, diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java index a2a80cb305..a9cc666633 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,6 +51,7 @@ * @author Gary Russell * @author Artem Bilan * @author Pawel Lozinski + * @author Adrian Chlebosz * * @since 2.3 * @@ -127,7 +128,8 @@ private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) { .zkPort(embedded.zookeeperPort()) .kafkaPorts(ports) .zkConnectionTimeout(embedded.zkConnectionTimeout()) - .zkSessionTimeout(embedded.zkSessionTimeout()); + .zkSessionTimeout(embedded.zkSessionTimeout()) + .adminTimeout(embedded.adminTimeout()); Properties properties = new Properties(); for (String pair : embedded.brokerProperties()) { diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java index 12e0591bb5..3b57204b09 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -60,6 +60,7 @@ * @author Gary Russell * @author Sergio Lourenco * @author Pawel Lozinski + * @author Adrian Chlebosz * * @since 1.3 * @@ -172,5 +173,12 @@ */ int zkSessionTimeout() default EmbeddedKafkaBroker.DEFAULT_ZK_SESSION_TIMEOUT; + /** + * Timeout in seconds for admin operations (e.g. topic creation, close). + * @return default {@link EmbeddedKafkaBroker#DEFAULT_ADMIN_TIMEOUT} + * @since 2.8.5 + */ + int adminTimeout() default EmbeddedKafkaBroker.DEFAULT_ADMIN_TIMEOUT; + }