Skip to content

Commit e352b31

Browse files
committed
Add improved Bulk Write API for Java Reactive Driver
- Created and documented the new Reactive Bulk Write API - Enabled unified and prose tests for reactive Bulk Write API JAVA-5530
1 parent 65e72d0 commit e352b31

File tree

20 files changed

+688
-138
lines changed

20 files changed

+688
-138
lines changed

driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public interface AsyncFunction<T, R> {
3434
* @param value A {@code @}{@link Nullable} argument of the asynchronous function.
3535
* @param callback the callback
3636
*/
37-
void unsafeFinish(T value, SingleResultCallback<R> callback);
37+
void unsafeFinish(@Nullable T value, SingleResultCallback<R> callback);
3838

3939
/**
4040
* Must be invoked at end of async chain or when executing a callback handler supplied by the caller.

driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.mongodb.internal.async;
1818

1919
import com.mongodb.internal.TimeoutContext;
20+
import com.mongodb.internal.async.function.AsyncCallbackLoop;
21+
import com.mongodb.internal.async.function.LoopState;
2022
import com.mongodb.internal.async.function.RetryState;
2123
import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier;
2224

@@ -181,6 +183,27 @@ default AsyncRunnable thenRun(final AsyncRunnable runnable) {
181183
};
182184
}
183185

186+
// /**
187+
// * @param runnable The async runnable to run after this runnable
188+
// * @return the composition of this runnable and the runnable, a runnable
189+
// */
190+
// default AsyncSupplier<T> thenSupplyUntil(
191+
// final AsyncSupplier supplier,
192+
// final Predicate<Boolean> condition,
193+
// final Consumer<T> runnable) {
194+
// return (c) -> {
195+
// this.unsafeFinish((r, e) -> {
196+
// if (e == null) {
197+
// /* If 'runnable' is executed on a different thread from the one that executed the initial 'finish()',
198+
// then invoking 'finish()' within 'runnable' will catch and propagate any exceptions to 'c' (the callback). */
199+
// supplier.finish(c);
200+
// } else {
201+
// c.completeExceptionally(e);
202+
// }
203+
// });
204+
// };
205+
// }
206+
184207
/**
185208
* The error check checks if the exception is an instance of the provided class.
186209
* @see #thenRunTryCatchAsyncBlocks(AsyncRunnable, java.util.function.Predicate, AsyncFunction)
@@ -217,6 +240,18 @@ default AsyncRunnable thenRunTryCatchAsyncBlocks(
217240
});
218241
}
219242

243+
default <R> AsyncSupplier<R> thenSupplyTryCatchAsyncBlocks(
244+
final AsyncSupplier<R> supplier,
245+
final Predicate<Throwable> errorCheck,
246+
final AsyncFunction<Throwable, R> errorFunction) {
247+
return this.thenSupply(c -> {
248+
beginAsync()
249+
.thenSupply(supplier)
250+
.onErrorIf(errorCheck, errorFunction)
251+
.finish(c);
252+
});
253+
}
254+
220255
/**
221256
* @param condition the condition to check
222257
* @param runnable The async runnable to run after this runnable,
@@ -282,4 +317,51 @@ default AsyncRunnable thenRunRetryingWhile(
282317
).get(callback);
283318
});
284319
}
320+
321+
/**
322+
* In order to break the loop and complete the ongoing iteration, use
323+
* {@link LoopState#breakAndCompleteIf(Supplier, SingleResultCallback)} in the loopBodyRunnable.
324+
*
325+
* <p>
326+
* This is equivalent to while(true) with break.
327+
*
328+
* @param loopBodyRunnable the loopBodyRunnable to loop
329+
* @return the composition of this, and the looping branch
330+
* @see AsyncCallbackLoop
331+
*/
332+
default AsyncRunnable thenRunWhileLoop(final AsyncRunnable loopBodyRunnable, final LoopState loopState) {
333+
return thenRun(callback -> {
334+
new AsyncCallbackLoop(loopState, loopBodyRunnable::finish).run(callback);
335+
});
336+
}
337+
338+
/**
339+
* This method is equivalent to a do-while loop, where the loop body is executed first and
340+
* then the condition is checked to determine whether the loop should continue.
341+
*
342+
* @param loopBodyRunnable the asynchronous task to be executed in each iteration of the loop
343+
* @param whileCheck a condition to check after each iteration; the loop continues as long as this condition returns true
344+
* @return the composition of this and the looping branch
345+
* @see AsyncCallbackLoop
346+
*/
347+
348+
default AsyncRunnable thenRunDoWhileLoop(final AsyncRunnable loopBodyRunnable, final Supplier<Boolean> whileCheck) {
349+
return thenRun(finalCallback -> {
350+
LoopState loopState = new LoopState();
351+
new AsyncCallbackLoop(loopState, iterationCallback -> {
352+
353+
loopBodyRunnable.finish((result, t) -> {
354+
if (t != null) {
355+
iterationCallback.completeExceptionally(t);
356+
return;
357+
}
358+
if (loopState.breakAndCompleteIf(() -> !whileCheck.get(), iterationCallback)) {
359+
return;
360+
}
361+
iterationCallback.complete(iterationCallback);
362+
});
363+
364+
}).run(finalCallback);
365+
});
366+
}
285367
}

driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,26 @@ default void finish(final SingleResultCallback<T> callback) {
8181
}
8282
}
8383

84+
/**
85+
* The runnable will always be executed, including on the exceptional path.
86+
* @param runnable the runnable
87+
* @param callback the callback
88+
*/
89+
default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback<T> callback) {
90+
this.finish((r, e) -> {
91+
try {
92+
runnable.run();
93+
} catch (Throwable t) {
94+
if (e != null) {
95+
t.addSuppressed(e);
96+
}
97+
callback.completeExceptionally(t);
98+
return;
99+
}
100+
callback.onResult(r, e);
101+
});
102+
}
103+
84104
/**
85105
* @param function The async function to run after this supplier
86106
* @return the composition of this supplier and the function, a supplier
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.internal.async;
17+
18+
import com.mongodb.annotations.NotThreadSafe;
19+
import com.mongodb.assertions.Assertions;
20+
import com.mongodb.lang.Nullable;
21+
22+
@NotThreadSafe
23+
public class MutableValue<T> {
24+
private T value;
25+
26+
public MutableValue(@Nullable final T value) {
27+
this.value = value;
28+
}
29+
30+
public MutableValue() {
31+
this(null);
32+
}
33+
34+
public T get() {
35+
return Assertions.assertNotNull(value);
36+
}
37+
38+
@Nullable
39+
public T getNullable() {
40+
return value;
41+
}
42+
43+
public void set(@Nullable final T value) {
44+
this.value = value;
45+
}
46+
}

driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackSupplier.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package com.mongodb.internal.async.function;
1717

18-
import com.mongodb.annotations.NotThreadSafe;
18+
import com.mongodb.internal.async.MutableValue;
1919
import com.mongodb.internal.async.SingleResultCallback;
2020

2121
import java.util.function.Supplier;
@@ -68,16 +68,12 @@ public interface AsyncCallbackSupplier<R> {
6868
* This is a price we have to pay to provide a guarantee similar to that of the {@code finally} block.
6969
*/
7070
default AsyncCallbackSupplier<R> whenComplete(final Runnable after) {
71-
@NotThreadSafe
72-
final class MutableBoolean {
73-
private boolean value;
74-
}
75-
MutableBoolean afterExecuted = new MutableBoolean();
71+
MutableValue<Boolean> afterExecuted = new MutableValue<>(false);
7672
Runnable trackableAfter = () -> {
7773
try {
7874
after.run();
7975
} finally {
80-
afterExecuted.value = true;
76+
afterExecuted.set(true);
8177
}
8278
};
8379
return callback -> {
@@ -103,7 +99,7 @@ final class MutableBoolean {
10399
primaryUnexpectedException = unexpectedException;
104100
throw unexpectedException;
105101
} finally {
106-
if (primaryUnexpectedException != null && !afterExecuted.value) {
102+
if (primaryUnexpectedException != null && !afterExecuted.get()) {
107103
try {
108104
trackableAfter.run();
109105
} catch (Throwable afterException) {

driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
import com.mongodb.client.model.SearchIndexModel;
4545
import com.mongodb.client.model.UpdateOptions;
4646
import com.mongodb.client.model.WriteModel;
47+
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
48+
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
49+
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
4750
import com.mongodb.client.model.changestream.FullDocument;
4851
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
4952
import com.mongodb.internal.TimeoutSettings;
@@ -293,6 +296,12 @@ public AsyncWriteOperation<BulkWriteResult> bulkWrite(final List<? extends Write
293296
return operations.bulkWrite(requests, options);
294297
}
295298

299+
public AsyncWriteOperation<ClientBulkWriteResult> clientBulkWriteOperation(
300+
final List<? extends ClientNamespacedWriteModel> clientWriteModels,
301+
@Nullable final ClientBulkWriteOptions options) {
302+
return operations.clientBulkWriteOperation(clientWriteModels, options);
303+
}
304+
296305
public <TResult> AsyncReadOperation<TResult> commandRead(final Bson command, final Class<TResult> resultClass) {
297306
return operations.commandRead(command, resultClass);
298307
}

0 commit comments

Comments
 (0)