Skip to content

Commit 7e51a68

Browse files
committed
Limit segment size to 3 GB
Fixes #132
1 parent 140afd5 commit 7e51a68

File tree

7 files changed

+109
-9
lines changed

7 files changed

+109
-9
lines changed

src/main/java/com/rabbitmq/stream/ByteCapacity.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -16,12 +16,13 @@
1616
import java.util.Collections;
1717
import java.util.HashMap;
1818
import java.util.Map;
19+
import java.util.Objects;
1920
import java.util.function.BiFunction;
2021
import java.util.regex.Matcher;
2122
import java.util.regex.Pattern;
2223

2324
/** API to easily configure byte capacities. */
24-
public class ByteCapacity {
25+
public class ByteCapacity implements Comparable<ByteCapacity> {
2526

2627
private static final String UNIT_MB = "mb";
2728
private static final int KILOBYTES_MULTIPLIER = 1000;
@@ -123,8 +124,30 @@ public static ByteCapacity from(String value) {
123124
}
124125
}
125126

127+
@Override
128+
public boolean equals(Object o) {
129+
if (this == o) {
130+
return true;
131+
}
132+
if (o == null || getClass() != o.getClass()) {
133+
return false;
134+
}
135+
ByteCapacity that = (ByteCapacity) o;
136+
return bytes == that.bytes;
137+
}
138+
139+
@Override
140+
public int hashCode() {
141+
return Objects.hash(bytes);
142+
}
143+
126144
@Override
127145
public String toString() {
128146
return this.input;
129147
}
148+
149+
@Override
150+
public int compareTo(ByteCapacity other) {
151+
return Long.compare(this.bytes, other.bytes);
152+
}
130153
}

src/main/java/com/rabbitmq/stream/StreamCreator.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -18,6 +18,9 @@
1818
/** API to configure and create a stream. */
1919
public interface StreamCreator {
2020

21+
/** Segment size is limited to 3 GB. */
22+
ByteCapacity MAX_SEGMENT_SIZE = ByteCapacity.from("3GB");
23+
2124
/**
2225
* The name of the stream
2326
*
@@ -37,6 +40,8 @@ public interface StreamCreator {
3740
/**
3841
* The maximum size of each stream segments.
3942
*
43+
* <p>Maximum size is {@link StreamCreator#MAX_SEGMENT_SIZE} (3 GB).
44+
*
4045
* @param byteCapacity
4146
* @return this creator instance
4247
*/

src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -46,6 +46,10 @@ public StreamCreator maxLengthBytes(ByteCapacity byteCapacity) {
4646

4747
@Override
4848
public StreamCreator maxSegmentSizeBytes(ByteCapacity byteCapacity) {
49+
if (byteCapacity != null && byteCapacity.compareTo(MAX_SEGMENT_SIZE) > 0) {
50+
throw new IllegalArgumentException(
51+
"The maximum segment size cannot be more than " + MAX_SEGMENT_SIZE);
52+
}
4953
streamParametersBuilder.maxSegmentSizeBytes(byteCapacity);
5054
return this;
5155
}

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@
8787
import org.slf4j.LoggerFactory;
8888
import picocli.CommandLine;
8989
import picocli.CommandLine.Model.CommandSpec;
90+
import picocli.CommandLine.ParameterException;
91+
import picocli.CommandLine.Spec;
9092

9193
@CommandLine.Command(
9294
name = "stream-perf-test",
@@ -105,6 +107,8 @@ public class StreamPerfTest implements Callable<Integer> {
105107
};
106108
private final String[] arguments;
107109

110+
@Spec CommandSpec spec; // injected by picocli
111+
108112
@CommandLine.Mixin
109113
private final CommandLine.HelpCommand helpCommand = new CommandLine.HelpCommand();
110114

@@ -220,12 +224,21 @@ public class StreamPerfTest implements Callable<Integer> {
220224
converter = Utils.ByteCapacityTypeConverter.class)
221225
private ByteCapacity maxLengthBytes;
222226

227+
private ByteCapacity maxSegmentSize;
228+
223229
@CommandLine.Option(
224230
names = {"--stream-max-segment-size-bytes", "-smssb"},
225231
description = "max size of segments",
226232
defaultValue = "500mb",
227233
converter = Utils.ByteCapacityTypeConverter.class)
228-
private ByteCapacity maxSegmentSize;
234+
public void setMaxSegmentSize(ByteCapacity in) {
235+
if (in != null && in.compareTo(StreamCreator.MAX_SEGMENT_SIZE) > 0) {
236+
throw new ParameterException(
237+
spec.commandLine(),
238+
"The maximum segment size cannot be more than " + StreamCreator.MAX_SEGMENT_SIZE);
239+
}
240+
this.maxSegmentSize = in;
241+
}
229242

230243
@CommandLine.Option(
231244
names = {"--max-age", "-ma"},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.impl;
15+
16+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
17+
18+
import com.rabbitmq.stream.ByteCapacity;
19+
import org.junit.jupiter.api.Test;
20+
21+
public class StreamCreatorTest {
22+
23+
@Test
24+
void maxSegmentSizeBytesOK() {
25+
new StreamStreamCreator(null)
26+
.maxSegmentSizeBytes(ByteCapacity.MB(500))
27+
.maxSegmentSizeBytes(ByteCapacity.MB(2999))
28+
.maxSegmentSizeBytes(ByteCapacity.GB(3));
29+
}
30+
31+
@Test
32+
void maxSegmentSizeBytesKO() {
33+
assertThatThrownBy(
34+
() -> {
35+
new StreamStreamCreator(null).maxSegmentSizeBytes(ByteCapacity.MB(3001));
36+
})
37+
.isInstanceOf(IllegalArgumentException.class);
38+
assertThatThrownBy(
39+
() -> {
40+
new StreamStreamCreator(null).maxSegmentSizeBytes(ByteCapacity.GB(4));
41+
})
42+
.isInstanceOf(IllegalArgumentException.class);
43+
}
44+
}

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -110,7 +110,7 @@ public static Duration waitAtMost(int timeoutInSeconds, CallableBooleanSupplier
110110
return waitAtMost(timeoutInSeconds, condition, null);
111111
}
112112

113-
static Duration waitAtMost(
113+
public static Duration waitAtMost(
114114
int timeoutInSeconds, CallableBooleanSupplier condition, Supplier<String> message)
115115
throws Exception {
116116
if (condition.getAsBoolean()) {

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,10 @@ void tearDownTest() {
101101
}
102102

103103
private void waitRunEnds(int expectedExitCode) throws Exception {
104-
waitAtMost(20, () -> exitCode.get() == expectedExitCode);
104+
waitAtMost(
105+
20,
106+
() -> exitCode.get() == expectedExitCode,
107+
() -> "Expected " + expectedExitCode + " exit code, got " + exitCode.get());
105108
}
106109

107110
private void waitRunEnds() throws Exception {
@@ -266,7 +269,7 @@ void streamCreationIsIdempotentWhateverTheDifferencesInStreamProperties() throws
266269
Future<?> run =
267270
run(
268271
builder()
269-
.maxLengthBytes(ByteCapacity.GB(42)) // different than already existing stream
272+
.maxLengthBytes(ByteCapacity.GB(42)) // different from already existing stream
270273
.streamMaxSegmentSizeBytes(ByteCapacity.MB(500))
271274
.leaderLocator(LeaderLocator.LEAST_LEADERS));
272275
waitOneSecond();
@@ -275,6 +278,14 @@ void streamCreationIsIdempotentWhateverTheDifferencesInStreamProperties() throws
275278
assertThat(consoleOutput()).contains("Warning: stream '" + s + "'");
276279
}
277280

281+
@Test
282+
void exceedingMaxSegmentSizeLimitShouldGenerateError() throws Exception {
283+
run(builder().streamMaxSegmentSizeBytes(ByteCapacity.TB(1)));
284+
waitRunEnds(2);
285+
assertThat(consoleErrorOutput()).contains("The maximum segment size cannot be more than 3GB");
286+
checkErrIsEmpty = false;
287+
}
288+
278289
@Test
279290
void byteRatesShouldBeIncludedWhenOptionIsEnabled() throws Exception {
280291
Future<?> run = run(builder().byteRates());

0 commit comments

Comments
 (0)