Skip to content

Commit b7c5bfa

Browse files
committed
Move exhaustCursor method to cursor interface.
JAVA-5530
1 parent f43fc49 commit b7c5bfa

File tree

7 files changed

+183
-32
lines changed

7 files changed

+183
-32
lines changed

driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
import com.mongodb.internal.operation.BatchCursor;
2020

2121
import java.io.Closeable;
22+
import java.util.ArrayList;
2223
import java.util.List;
2324

25+
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
26+
2427
/**
2528
* MongoDB returns query results as batches, and this interface provides an asynchronous iterator over those batches. The first call to
2629
* the {@code next} method will return the first batch, and subsequent calls will trigger an asynchronous request to get the next batch
@@ -72,4 +75,22 @@ public interface AsyncBatchCursor<T> extends Closeable {
7275
*/
7376
@Override
7477
void close();
78+
79+
default void exhaustCursor(final SingleResultCallback<List<List<T>>> finalCallback) {
80+
List<List<T>> results = new ArrayList<>();
81+
82+
beginAsync().thenRunDoWhileLoop(iterationCallback -> {
83+
beginAsync().
84+
thenSupply(this::next)
85+
.thenConsume((batch, callback) -> {
86+
if (batch != null && !batch.isEmpty()) {
87+
results.add(batch);
88+
}
89+
callback.complete(callback);
90+
}).finish(iterationCallback);
91+
}, () -> !this.isClosed())
92+
.<List<List<T>>>thenSupply(callback -> {
93+
callback.complete(results);
94+
}).finish(finalCallback);
95+
}
7596
}

driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@
2525
import java.util.Iterator;
2626
import java.util.List;
2727

28+
import static java.util.Spliterator.IMMUTABLE;
29+
import static java.util.Spliterator.ORDERED;
30+
import static java.util.Spliterators.spliteratorUnknownSize;
31+
import static java.util.stream.Collectors.toList;
32+
import static java.util.stream.StreamSupport.stream;
33+
2834
/**
2935
* MongoDB returns query results as batches, and this interface provideds an iterator over those batches. The first call to
3036
* the {@code next} method will return the first batch, and subsequent calls will trigger a request to get the next batch
@@ -98,4 +104,9 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
98104
ServerCursor getServerCursor();
99105

100106
ServerAddress getServerAddress();
107+
108+
default List<List<T>> exhaustCursor() {
109+
return stream(spliteratorUnknownSize(this, ORDERED | IMMUTABLE), false)
110+
.collect(toList());
111+
}
101112
}

driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@
132132
import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState;
133133
import static com.mongodb.internal.operation.CommandOperationHelper.shouldAttemptToRetryWriteAndAddRetryableLabel;
134134
import static com.mongodb.internal.operation.CommandOperationHelper.transformWriteException;
135-
import static com.mongodb.internal.operation.CursorHelper.exhaustCursorAsync;
136135
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
137136
import static com.mongodb.internal.operation.SyncOperationHelper.cursorDocumentToBatchCursor;
138137
import static com.mongodb.internal.operation.SyncOperationHelper.decorateWriteWithRetries;
@@ -141,12 +140,8 @@
141140
import static java.util.Collections.emptyMap;
142141
import static java.util.Collections.singletonList;
143142
import static java.util.Optional.ofNullable;
144-
import static java.util.Spliterator.IMMUTABLE;
145-
import static java.util.Spliterator.ORDERED;
146-
import static java.util.Spliterators.spliteratorUnknownSize;
147143
import static java.util.stream.Collectors.toList;
148144
import static java.util.stream.Collectors.toSet;
149-
import static java.util.stream.StreamSupport.stream;
150145

151146
/**
152147
* This class is not part of the public API and may be removed or changed at any time.
@@ -544,7 +539,7 @@ private List<List<BsonDocument>> exhaustBulkWriteCommandOkResponseCursor(
544539
options.getComment().orElse(null),
545540
connectionSource,
546541
connection)) {
547-
return stream(spliteratorUnknownSize(cursor, ORDERED | IMMUTABLE), false).collect(toList());
542+
return cursor.exhaustCursor();
548543
}
549544
}
550545

@@ -562,7 +557,7 @@ private void exhaustBulkWriteCommandOkResponseCursorAsync(final AsyncConnectionS
562557
connection);
563558

564559
beginAsync().<List<List<BsonDocument>>>thenSupply(callback -> {
565-
exhaustCursorAsync(cursor, callback);
560+
cursor.exhaustCursor(callback);
566561
}).thenAlwaysRunAndFinish(() -> {
567562
if (!cursor.isClosed()) {
568563
cursor.close();

driver-core/src/main/com/mongodb/internal/operation/CursorHelper.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,41 +16,16 @@
1616

1717
package com.mongodb.internal.operation;
1818

19-
import com.mongodb.internal.async.AsyncBatchCursor;
20-
import com.mongodb.internal.async.SingleResultCallback;
2119
import com.mongodb.lang.Nullable;
2220
import org.bson.BsonDocument;
2321
import org.bson.BsonInt32;
2422

25-
import java.util.ArrayList;
26-
import java.util.List;
27-
28-
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
29-
3023
final class CursorHelper {
3124

3225
static BsonDocument getCursorDocumentFromBatchSize(@Nullable final Integer batchSize) {
3326
return batchSize == null ? new BsonDocument() : new BsonDocument("batchSize", new BsonInt32(batchSize));
3427
}
3528

36-
public static <T> void exhaustCursorAsync(final AsyncBatchCursor<T> cursor, final SingleResultCallback<List<List<T>>> finalCallback) {
37-
List<List<T>> results = new ArrayList<>();
38-
39-
beginAsync().thenRunDoWhileLoop(iterationCallback -> {
40-
beginAsync().
41-
thenSupply(cursor::next)
42-
.thenConsume((batch, callback) -> {
43-
if (batch != null && !batch.isEmpty()) {
44-
results.add(batch);
45-
}
46-
callback.complete(callback);
47-
}).finish(iterationCallback);
48-
}, () -> !cursor.isClosed())
49-
.<List<List<T>>>thenSupply(callback -> {
50-
callback.complete(results);
51-
}).finish(finalCallback);
52-
}
53-
5429
private CursorHelper() {
5530
}
5631
}

driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,13 @@ public void deleteOne(final Bson filter) {
363363
.execute(getBinding());
364364
}
365365

366+
public void deleteMany(final Bson filter) {
367+
new MixedBulkWriteOperation(namespace,
368+
singletonList(new DeleteRequest(filter.toBsonDocument(Document.class, registry)).multi(true)),
369+
true, WriteConcern.ACKNOWLEDGED, false)
370+
.execute(getBinding());
371+
}
372+
366373
public List<T> find(final Bson filter) {
367374
return find(filter, null);
368375
}

driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.mongodb.ServerCursor;
2424
import com.mongodb.client.cursor.TimeoutMode;
2525
import com.mongodb.client.model.CreateCollectionOptions;
26+
import com.mongodb.client.model.Filters;
2627
import com.mongodb.client.model.OperationTest;
2728
import com.mongodb.internal.binding.AsyncConnectionSource;
2829
import com.mongodb.internal.connection.AsyncConnection;
@@ -46,7 +47,9 @@
4647

4748
import java.util.ArrayList;
4849
import java.util.List;
50+
import java.util.concurrent.CompletableFuture;
4951
import java.util.concurrent.CountDownLatch;
52+
import java.util.concurrent.ExecutionException;
5053
import java.util.concurrent.TimeUnit;
5154
import java.util.concurrent.atomic.AtomicInteger;
5255
import java.util.stream.Collectors;
@@ -103,6 +106,95 @@ void cleanup() {
103106
});
104107
}
105108

109+
@Test
110+
@DisplayName("should exhaust cursor with multiple batches")
111+
void shouldExhaustCursorAsyncWithMultipleBatches() {
112+
// given
113+
BsonDocument commandResult = executeFindCommand(0, 3); // Fetch in batches of size 3
114+
cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
115+
null, connectionSource, connection);
116+
117+
CompletableFuture<List<List<Document>>> futureResult = new CompletableFuture<>();
118+
119+
// when
120+
cursor.exhaustCursor((result, throwable) -> {
121+
if (throwable != null) {
122+
futureResult.completeExceptionally(throwable);
123+
} else {
124+
futureResult.complete(result);
125+
}
126+
});
127+
128+
// then
129+
assertDoesNotThrow(() -> {
130+
List<List<Document>> resultBatches = futureResult.get(5, TimeUnit.SECONDS);
131+
132+
assertEquals(4, resultBatches.size(), "Expected 4 batches for 10 documents with batch size of 3.");
133+
134+
int totalDocuments = resultBatches.stream().mapToInt(List::size).sum();
135+
assertEquals(10, totalDocuments, "Expected a total of 10 documents.");
136+
});
137+
}
138+
139+
@Test
140+
@DisplayName("should exhaust cursor with closed cursor")
141+
void shouldExhaustCursorAsyncWithClosedCursor() {
142+
// given
143+
BsonDocument commandResult = executeFindCommand(0, 3);
144+
cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
145+
null, connectionSource, connection);
146+
147+
cursor.close();
148+
149+
CompletableFuture<List<List<Document>>> futureResult = new CompletableFuture<>();
150+
151+
// when
152+
cursor.exhaustCursor((result, throwable) -> {
153+
if (throwable != null) {
154+
futureResult.completeExceptionally(throwable);
155+
} else {
156+
futureResult.complete(result);
157+
}
158+
});
159+
160+
//then
161+
ExecutionException executionException = assertThrows(ExecutionException.class, () -> {
162+
futureResult.get(5, TimeUnit.SECONDS);
163+
}, "Expected an exception when operating on a closed cursor.");
164+
165+
IllegalStateException illegalStateException = (IllegalStateException) executionException.getCause();
166+
assertEquals("Cursor has been closed", illegalStateException.getMessage());
167+
}
168+
169+
@Test
170+
@DisplayName("should exhaust cursor with empty cursor")
171+
void shouldExhaustCursorAsyncWithEmptyCursor() {
172+
// given
173+
getCollectionHelper().deleteMany(Filters.empty());
174+
175+
BsonDocument commandResult = executeFindCommand(0, 3); // No documents to fetch
176+
cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
177+
null, connectionSource, connection);
178+
179+
CompletableFuture<List<List<Document>>> futureResult = new CompletableFuture<>();
180+
181+
// when
182+
cursor.exhaustCursor((result, throwable) -> {
183+
if (throwable != null) {
184+
futureResult.completeExceptionally(throwable);
185+
} else {
186+
futureResult.complete(result);
187+
}
188+
});
189+
190+
// then
191+
assertDoesNotThrow(() -> {
192+
List<List<Document>> resultBatches = futureResult.get(5, TimeUnit.SECONDS);
193+
194+
assertTrue(resultBatches.isEmpty(), "Expected no batches for an empty cursor.");
195+
});
196+
}
197+
106198
@Test
107199
@DisplayName("server cursor should not be null")
108200
void theServerCursorShouldNotBeNull() {

driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.mongodb.ServerCursor;
2323
import com.mongodb.client.cursor.TimeoutMode;
2424
import com.mongodb.client.model.CreateCollectionOptions;
25+
import com.mongodb.client.model.Filters;
2526
import com.mongodb.client.model.OperationTest;
2627
import com.mongodb.internal.binding.ConnectionSource;
2728
import com.mongodb.internal.connection.Connection;
@@ -101,6 +102,55 @@ void cleanup() {
101102
});
102103
}
103104

105+
@Test
106+
@DisplayName("should exhaust cursor with multiple batches")
107+
void shouldExhaustCursorAsyncWithMultipleBatches() {
108+
// given
109+
BsonDocument commandResult = executeFindCommand(0, 3); // Fetch in batches of size 3
110+
cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
111+
null, connectionSource, connection);
112+
113+
// when
114+
List<List<Document>> result = cursor.exhaustCursor();
115+
116+
// then
117+
assertEquals(4, result.size(), "Expected 4 batches for 10 documents with batch size of 3.");
118+
119+
int totalDocuments = result.stream().mapToInt(List::size).sum();
120+
assertEquals(10, totalDocuments, "Expected a total of 10 documents.");
121+
}
122+
123+
@Test
124+
@DisplayName("should exhaust cursor with closed cursor")
125+
void shouldExhaustCursorAsyncWithClosedCursor() {
126+
// given
127+
BsonDocument commandResult = executeFindCommand(0, 3);
128+
cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
129+
null, connectionSource, connection);
130+
cursor.close();
131+
132+
// when & then
133+
IllegalStateException illegalStateException = assertThrows(IllegalStateException.class, cursor::exhaustCursor);
134+
assertEquals("Cursor has been closed", illegalStateException.getMessage());
135+
}
136+
137+
@Test
138+
@DisplayName("should exhaust cursor async with empty cursor")
139+
void shouldExhaustCursorAsyncWithEmptyCursor() {
140+
// given
141+
getCollectionHelper().deleteMany(Filters.empty());
142+
143+
BsonDocument commandResult = executeFindCommand(0, 3); // No documents to fetch
144+
cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
145+
null, connectionSource, connection);
146+
147+
// when
148+
List<List<Document>> result = cursor.exhaustCursor();
149+
150+
// then
151+
assertTrue(result.isEmpty(), "Expected no batches for an empty cursor.");
152+
}
153+
104154
@Test
105155
@DisplayName("server cursor should not be null")
106156
void theServerCursorShouldNotBeNull() {

0 commit comments

Comments
 (0)