Skip to content

Commit 0eb762d

Browse files
committed
Let producer confirm timeout be 0
Fixes #20
1 parent 596beeb commit 0eb762d

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public ProducerBuilder confirmTimeout(Duration timeout) {
109109
if (timeout.isNegative()) {
110110
throw new IllegalArgumentException("the confirm timeout cannot be negative");
111111
}
112-
if (timeout.compareTo(Duration.ofSeconds(1)) < 0) {
112+
if (timeout.compareTo(Duration.ofSeconds(1)) < 0 && !timeout.isZero()) {
113113
throw new IllegalArgumentException("the confirm timeout cannot be less than 1 second");
114114
}
115115
this.confirmTimeout = timeout;

src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java

+15
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static org.assertj.core.api.Assertions.assertThat;
17+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1718
import static org.mockito.ArgumentMatchers.any;
1819
import static org.mockito.ArgumentMatchers.anyByte;
1920
import static org.mockito.ArgumentMatchers.anyInt;
@@ -44,6 +45,7 @@
4445
import java.util.concurrent.atomic.AtomicInteger;
4546
import java.util.function.ToLongFunction;
4647
import java.util.stream.IntStream;
48+
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
4749
import org.junit.jupiter.api.AfterEach;
4850
import org.junit.jupiter.api.BeforeEach;
4951
import org.junit.jupiter.params.ParameterizedTest;
@@ -282,4 +284,17 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize)
282284
assertThat(interruptedLatch.await(5, TimeUnit.SECONDS)).isTrue();
283285
assertThat(confirmCalled).isFalse();
284286
}
287+
288+
@ParameterizedTest
289+
@CsvSource({"-1,false", "0,true", "500,false", "1000,true", "5000,true"})
290+
void confirmTimeoutCanZeroAndLongerThanOneSecond(int timeoutInMs, boolean ok) throws Throwable {
291+
Duration timeout = Duration.ofMillis(timeoutInMs);
292+
StreamProducerBuilder builder = new StreamProducerBuilder(env);
293+
ThrowingCallable call = () -> builder.confirmTimeout(timeout);
294+
if (ok) {
295+
call.call();
296+
} else {
297+
assertThatThrownBy(call).isInstanceOf(IllegalArgumentException.class);
298+
}
299+
}
285300
}

0 commit comments

Comments
 (0)