Skip to content

GH-2059: expose adminTimeout property in EmbeddedKafka annotation #2194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
* @author Elliot Kennedy
* @author Nakul Mishra
* @author Pawel Lozinski
* @author Adrian Chlebosz
*
* @since 2.2
*/
Expand All @@ -113,7 +114,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
*/
public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";

private static final Duration DEFAULT_ADMIN_TIMEOUT = Duration.ofSeconds(10);
public static final int DEFAULT_ADMIN_TIMEOUT = 10;

public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000;

Expand Down Expand Up @@ -157,7 +158,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {

private int[] kafkaPorts;

private Duration adminTimeout = DEFAULT_ADMIN_TIMEOUT;
private Duration adminTimeout = Duration.ofSeconds(DEFAULT_ADMIN_TIMEOUT);

private int zkConnectionTimeout = DEFAULT_ZK_CONNECTION_TIMEOUT;

Expand Down Expand Up @@ -238,33 +239,24 @@ public EmbeddedKafkaBroker kafkaPorts(int... ports) {
}

/**
* Set an explicit port for the embedded Zookeeper.
* @param port the port.
* @return the {@link EmbeddedKafkaBroker}.
* Set the system property with this name to the list of broker addresses.
* @param brokerListProperty the brokerListProperty to set
* @return this broker.
* @since 2.3
*/
public EmbeddedKafkaBroker zkPort(int port) {
this.zkPort = port;
public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) {
this.brokerListProperty = brokerListProperty;
return this;
}
/**
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
* Default 30 seconds.
* @param adminTimeout the timeout.
* @since 2.2
*/
public void setAdminTimeout(int adminTimeout) {
this.adminTimeout = Duration.ofSeconds(adminTimeout);
}

/**
* Set the system property with this name to the list of broker addresses.
* @param brokerListProperty the brokerListProperty to set
* @return this broker.
* Set an explicit port for the embedded Zookeeper.
* @param port the port.
* @return the {@link EmbeddedKafkaBroker}.
* @since 2.3
*/
public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) {
this.brokerListProperty = brokerListProperty;
public EmbeddedKafkaBroker zkPort(int port) {
this.zkPort = port;
return this;
}

Expand All @@ -286,6 +278,27 @@ public void setZkPort(int zkPort) {
this.zkPort = zkPort;
}

/**
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
* @param adminTimeout the timeout.
* @return the {@link EmbeddedKafkaBroker}
* @since 2.8.5
*/
public EmbeddedKafkaBroker adminTimeout(int adminTimeout) {
this.adminTimeout = Duration.ofSeconds(adminTimeout);
return this;
}

/**
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
* Default 10 seconds.
* @param adminTimeout the timeout.
* @since 2.2
*/
public void setAdminTimeout(int adminTimeout) {
this.adminTimeout = Duration.ofSeconds(adminTimeout);
}

/**
* Set connection timeout for the client to the embedded Zookeeper.
* @param zkConnectionTimeout the connection timeout,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,6 +51,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Pawel Lozinski
* @author Adrian Chlebosz
*
* @since 2.3
*
Expand Down Expand Up @@ -127,7 +128,8 @@ private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
.zkPort(embedded.zookeeperPort())
.kafkaPorts(ports)
.zkConnectionTimeout(embedded.zkConnectionTimeout())
.zkSessionTimeout(embedded.zkSessionTimeout());
.zkSessionTimeout(embedded.zkSessionTimeout())
.adminTimeout(embedded.adminTimeout());
Properties properties = new Properties();

for (String pair : embedded.brokerProperties()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,6 +60,7 @@
* @author Gary Russell
* @author Sergio Lourenco
* @author Pawel Lozinski
* @author Adrian Chlebosz
*
* @since 1.3
*
Expand Down Expand Up @@ -172,5 +173,12 @@
*/
int zkSessionTimeout() default EmbeddedKafkaBroker.DEFAULT_ZK_SESSION_TIMEOUT;

/**
* Timeout in seconds for admin operations (e.g. topic creation, close).
* @return default {@link EmbeddedKafkaBroker#DEFAULT_ADMIN_TIMEOUT}
* @since 2.8.5
*/
int adminTimeout() default EmbeddedKafkaBroker.DEFAULT_ADMIN_TIMEOUT;

}