Skip to content

Commit 893536a

Browse files
authored
Merge pull request #757 from rabbitmq/dynamic-batch-pumping
Make dynamic batch pump more aggressively
2 parents 05df2a5 + 8a198ee commit 893536a

File tree

2 files changed

+53
-9
lines changed

2 files changed

+53
-9
lines changed

src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,22 +69,30 @@ private void loop() {
6969
if (state.items.size() >= state.batchSize) {
7070
this.maybeCompleteBatch(state, true);
7171
} else {
72-
item = this.requests.poll();
73-
if (item == null) {
74-
this.maybeCompleteBatch(state, false);
75-
} else {
76-
state.items.add(item);
77-
if (state.items.size() >= state.batchSize) {
78-
this.maybeCompleteBatch(state, true);
79-
}
80-
}
72+
pump(state, 2);
8173
}
8274
} else {
8375
this.maybeCompleteBatch(state, false);
8476
}
8577
}
8678
}
8779

80+
private void pump(State<T> state, int pumpCount) {
81+
if (pumpCount <= 0) {
82+
return;
83+
}
84+
T item = this.requests.poll();
85+
if (item == null) {
86+
this.maybeCompleteBatch(state, false);
87+
} else {
88+
state.items.add(item);
89+
if (state.items.size() >= state.batchSize) {
90+
this.maybeCompleteBatch(state, true);
91+
}
92+
this.pump(state, pumpCount - 1);
93+
}
94+
}
95+
8896
private static final class State<T> {
8997

9098
int batchSize;

src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import com.rabbitmq.stream.impl.TestUtils.Sync;
2424
import java.util.Locale;
2525
import java.util.Random;
26+
import java.util.concurrent.Semaphore;
27+
import java.util.concurrent.TimeUnit;
2628
import java.util.concurrent.atomic.AtomicBoolean;
2729
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.concurrent.atomic.AtomicLong;
2831
import java.util.stream.IntStream;
2932
import org.junit.jupiter.api.Test;
3033

@@ -118,4 +121,37 @@ void failedProcessingIsReplayed() throws Exception {
118121
waitAtMost(() -> collected.get() == itemCount);
119122
}
120123
}
124+
125+
@Test
126+
void lowThrottlingValueShouldStillHighPublishingRate() throws Exception {
127+
int batchSize = 10;
128+
Semaphore semaphore = new Semaphore(batchSize);
129+
DynamicBatch.BatchConsumer<Long> action =
130+
items -> {
131+
semaphore.release(items.size());
132+
return true;
133+
};
134+
135+
try (DynamicBatch<Long> batch = new DynamicBatch<>(action, batchSize)) {
136+
MetricRegistry metrics = new MetricRegistry();
137+
Meter rate = metrics.meter("publishing-rate");
138+
AtomicBoolean keepGoing = new AtomicBoolean(true);
139+
AtomicLong sequence = new AtomicLong();
140+
new Thread(
141+
() -> {
142+
while (keepGoing.get() && !Thread.interrupted()) {
143+
long id = sequence.getAndIncrement();
144+
if (semaphore.tryAcquire()) {
145+
batch.add(id);
146+
rate.mark();
147+
}
148+
}
149+
})
150+
.start();
151+
long start = System.nanoTime();
152+
waitAtMost(
153+
() ->
154+
System.nanoTime() - start > TimeUnit.SECONDS.toNanos(1) && rate.getMeanRate() > 1000);
155+
}
156+
}
121157
}

0 commit comments

Comments
 (0)