diff --git a/src/main/java/com/rabbitmq/stream/StreamCreator.java b/src/main/java/com/rabbitmq/stream/StreamCreator.java index b16de24b0a..8693180a5b 100644 --- a/src/main/java/com/rabbitmq/stream/StreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/StreamCreator.java @@ -102,6 +102,15 @@ public interface StreamCreator { */ StreamCreator initialMemberCount(int initialMemberCount); + /** + * Set an argument for the stream creation. + * + * @param key argument key + * @param value argument value + * @return this creator instance + */ + StreamCreator argument(String key, String value); + /** * Configure the super stream to create. * diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index aa27f430fa..a86a6881a1 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -2753,7 +2753,11 @@ public StreamParametersBuilder initialMemberCount(int initialMemberCount) { } public StreamParametersBuilder put(String key, String value) { - parameters.put(key, value); + if (value == null) { + parameters.remove(key); + } else { + parameters.put(key, value); + } return this; } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java index e639240cde..1a71831418 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java @@ -92,6 +92,12 @@ public StreamCreator initialMemberCount(int initialMemberCount) { return this; } + @Override + public StreamCreator argument(String key, String value) { + streamParametersBuilder.put(key, value); + return this; + } + @Override public SuperStreamConfiguration superStream() { if (this.superStreamConfiguration == null) {