Skip to content

Commit 40d58c8

Browse files
committed
Stop thread in DynamicBatch
1 parent 6d51653 commit 40d58c8

File tree

5 files changed

+22
-3
lines changed

5 files changed

+22
-3
lines changed

src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29-
class DynamicBatch<T> {
29+
final class DynamicBatch<T> implements AutoCloseable {
3030

3131
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class);
3232
private static final int MIN_BATCH_SIZE = 32;
@@ -35,11 +35,13 @@ class DynamicBatch<T> {
3535
private final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
3636
private final Predicate<List<T>> consumer;
3737
private final int configuredBatchSize;
38+
private final Thread thread;
3839

3940
DynamicBatch(Predicate<List<T>> consumer, int batchSize) {
4041
this.consumer = consumer;
4142
this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
42-
new Thread(this::loop).start();
43+
this.thread = new Thread(this::loop);
44+
this.thread.start();
4345
}
4446

4547
void add(T item) {
@@ -103,4 +105,9 @@ private boolean completeBatch(List<T> items) {
103105
return false;
104106
}
105107
}
108+
109+
@Override
110+
public void close() {
111+
this.thread.interrupt();
112+
}
106113
}

src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java

+5
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,9 @@ private boolean publish(List<Object> entities) {
151151
return false;
152152
}
153153
}
154+
155+
@Override
156+
public void close() {
157+
this.dynamicBatch.close();
158+
}
154159
}

src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717
import com.rabbitmq.stream.ConfirmationHandler;
1818
import com.rabbitmq.stream.Message;
1919

20-
interface MessageAccumulator {
20+
interface MessageAccumulator extends AutoCloseable {
2121

2222
void add(Message message, ConfirmationHandler confirmationHandler);
2323

2424
int size();
2525

2626
void flush(boolean force);
27+
28+
@Override
29+
void close();
2730
}

src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java

+3
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,7 @@ private void publishBatch(boolean stateCheck) {
112112
producer.publishInternal(entities);
113113
}
114114
}
115+
116+
@Override
117+
public void close() {}
115118
}

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

+1
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ public void close() {
441441
}
442442

443443
void closeFromEnvironment() {
444+
this.accumulator.close();
444445
this.closingCallback.run();
445446
cancelConfirmTimeoutTask();
446447
this.closed.set(true);

0 commit comments

Comments
 (0)