Skip to content

Commit d8f91fb

Browse files
authored
Refactor MixedBulkWriteOperation a bit and change CommandMessage.isResponseExpected such that it accounts for ordered/unordered bulk writes (#1481)
1 parent 81402ae commit d8f91fb

File tree

7 files changed

+83
-58
lines changed

7 files changed

+83
-58
lines changed

driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import static com.mongodb.ReadPreference.primary;
4646
import static com.mongodb.ReadPreference.primaryPreferred;
4747
import static com.mongodb.assertions.Assertions.assertFalse;
48+
import static com.mongodb.assertions.Assertions.assertTrue;
4849
import static com.mongodb.assertions.Assertions.notNull;
4950
import static com.mongodb.connection.ClusterConnectionMode.LOAD_BALANCED;
5051
import static com.mongodb.connection.ClusterConnectionMode.SINGLE;
@@ -112,6 +113,7 @@ public final class CommandMessage extends RequestMessage {
112113
this.payloadFieldNameValidator = payloadFieldNameValidator;
113114
this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode);
114115
this.serverApi = serverApi;
116+
assertTrue(useOpMsg() || responseExpected);
115117
}
116118

117119
/**
@@ -187,7 +189,11 @@ private String getSequenceIdentifier(final ByteBuf byteBuf) {
187189
}
188190

189191
boolean isResponseExpected() {
190-
return !useOpMsg() || requireOpMsgResponse();
192+
if (responseExpected) {
193+
return true;
194+
} else {
195+
return payload != null && payload.isOrdered() && payload.hasAnotherSplit();
196+
}
191197
}
192198

193199
MongoNamespace getNamespace() {
@@ -240,7 +246,7 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOu
240246

241247
private int getOpMsgFlagBits() {
242248
int flagBits = 0;
243-
if (!requireOpMsgResponse()) {
249+
if (!isResponseExpected()) {
244250
flagBits = 1 << 1;
245251
}
246252
if (exhaustAllowed) {
@@ -249,14 +255,6 @@ private int getOpMsgFlagBits() {
249255
return flagBits;
250256
}
251257

252-
private boolean requireOpMsgResponse() {
253-
if (responseExpected) {
254-
return true;
255-
} else {
256-
return payload != null && payload.hasAnotherSplit();
257-
}
258-
}
259-
260258
private boolean isDirectConnectionToReplicaSetMember() {
261259
return clusterConnectionMode == SINGLE
262260
&& getSettings().getServerType() != SHARD_ROUTER

driver-core/src/main/com/mongodb/internal/connection/SplittablePayload.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.stream.Collectors;
3838

3939
import static com.mongodb.assertions.Assertions.assertNotNull;
40+
import static com.mongodb.assertions.Assertions.assertTrue;
4041
import static com.mongodb.assertions.Assertions.isTrue;
4142
import static com.mongodb.assertions.Assertions.notNull;
4243
import static com.mongodb.internal.connection.SplittablePayload.Type.INSERT;
@@ -57,6 +58,7 @@ public final class SplittablePayload {
5758
private final WriteRequestEncoder writeRequestEncoder = new WriteRequestEncoder();
5859
private final Type payloadType;
5960
private final List<WriteRequestWithIndex> writeRequestWithIndexes;
61+
private final boolean ordered;
6062
private final Map<Integer, BsonValue> insertedIds = new HashMap<>();
6163
private int position = 0;
6264

@@ -91,9 +93,10 @@ public enum Type {
9193
* @param payloadType the payload type
9294
* @param writeRequestWithIndexes the writeRequests
9395
*/
94-
public SplittablePayload(final Type payloadType, final List<WriteRequestWithIndex> writeRequestWithIndexes) {
96+
public SplittablePayload(final Type payloadType, final List<WriteRequestWithIndex> writeRequestWithIndexes, final boolean ordered) {
9597
this.payloadType = notNull("batchType", payloadType);
9698
this.writeRequestWithIndexes = notNull("writeRequests", writeRequestWithIndexes);
99+
this.ordered = ordered;
97100
}
98101

99102
/**
@@ -117,7 +120,7 @@ public String getPayloadName() {
117120
}
118121

119122
boolean hasPayload() {
120-
return writeRequestWithIndexes.size() > 0;
123+
return !writeRequestWithIndexes.isEmpty();
121124
}
122125

123126
public int size() {
@@ -137,10 +140,6 @@ public List<BsonDocument> getPayload() {
137140
.collect(Collectors.toList());
138141
}
139142

140-
public List<WriteRequestWithIndex> getWriteRequestWithIndexes() {
141-
return writeRequestWithIndexes;
142-
}
143-
144143
/**
145144
* @return the current position in the payload
146145
*/
@@ -160,16 +159,22 @@ public void setPosition(final int position) {
160159
* @return true if there are more values after the current position
161160
*/
162161
public boolean hasAnotherSplit() {
162+
// this method must be not called before this payload having been encoded
163+
assertTrue(position > 0);
163164
return writeRequestWithIndexes.size() > position;
164165
}
165166

167+
boolean isOrdered() {
168+
return ordered;
169+
}
170+
166171
/**
167172
* @return a new SplittablePayload containing only the values after the current position.
168173
*/
169174
public SplittablePayload getNextSplit() {
170175
isTrue("hasAnotherSplit", hasAnotherSplit());
171176
List<WriteRequestWithIndex> nextPayLoad = writeRequestWithIndexes.subList(position, writeRequestWithIndexes.size());
172-
return new SplittablePayload(payloadType, nextPayLoad);
177+
return new SplittablePayload(payloadType, nextPayLoad, ordered);
173178
}
174179

175180
/**

driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.MongoBulkWriteException;
20-
import com.mongodb.MongoClientException;
2120
import com.mongodb.MongoInternalException;
2221
import com.mongodb.MongoNamespace;
2322
import com.mongodb.WriteConcern;
@@ -65,6 +64,7 @@
6564
import static com.mongodb.internal.bulk.WriteRequest.Type.REPLACE;
6665
import static com.mongodb.internal.bulk.WriteRequest.Type.UPDATE;
6766
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
67+
import static com.mongodb.internal.operation.MixedBulkWriteOperation.commandWriteConcern;
6868
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
6969
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
7070
import static com.mongodb.internal.operation.WriteConcernHelper.createWriteConcernError;
@@ -101,12 +101,7 @@ static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
101101
final List<? extends WriteRequest> writeRequests,
102102
final OperationContext operationContext,
103103
@Nullable final BsonValue comment, @Nullable final BsonDocument variables) {
104-
SessionContext sessionContext = operationContext.getSessionContext();
105-
if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !sessionContext.hasActiveTransaction()
106-
&& !writeConcern.isAcknowledged()) {
107-
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
108-
}
109-
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext);
104+
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, operationContext.getSessionContext());
110105
List<WriteRequestWithIndex> writeRequestsWithIndex = new ArrayList<>();
111106
boolean writeRequestsAreRetryable = true;
112107
for (int i = 0; i < writeRequests.size(); i++) {
@@ -159,7 +154,7 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti
159154

160155
this.indexMap = indexMap;
161156
this.unprocessed = unprocessedItems;
162-
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems);
157+
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems, ordered);
163158
this.operationContext = operationContext;
164159
this.comment = comment;
165160
this.variables = variables;
@@ -169,9 +164,8 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti
169164
if (!payloadItems.isEmpty()) {
170165
command.put(getCommandName(batchType), new BsonString(namespace.getCollectionName()));
171166
command.put("ordered", new BsonBoolean(ordered));
172-
if (!writeConcern.isServerDefault() && !sessionContext.hasActiveTransaction()) {
173-
command.put("writeConcern", writeConcern.asDocument());
174-
}
167+
commandWriteConcern(writeConcern, sessionContext).ifPresent(value ->
168+
command.put("writeConcern", value.asDocument()));
175169
if (bypassDocumentValidation != null) {
176170
command.put("bypassDocumentValidation", new BsonBoolean(bypassDocumentValidation));
177171
}

driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.internal.operation;
1818

19+
import com.mongodb.MongoClientException;
1920
import com.mongodb.MongoException;
2021
import com.mongodb.MongoNamespace;
2122
import com.mongodb.WriteConcern;
@@ -191,8 +192,8 @@ public BulkWriteResult execute(final WriteBinding binding) {
191192
// attach `maxWireVersion` ASAP because it is used to check whether we can retry
192193
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
193194
SessionContext sessionContext = binding.getOperationContext().getSessionContext();
194-
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
195-
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)) {
195+
WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext);
196+
if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext)) {
196197
handleMongoWriteConcernWithResponseException(retryState, true, timeoutContext);
197198
}
198199
validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, writeConcern);
@@ -201,7 +202,7 @@ public BulkWriteResult execute(final WriteBinding binding) {
201202
connectionDescription, ordered, writeConcern,
202203
bypassDocumentValidation, retryWrites, writeRequests, binding.getOperationContext(), comment, variables), timeoutContext);
203204
}
204-
return executeBulkWriteBatch(retryState, binding, connection);
205+
return executeBulkWriteBatch(retryState, writeConcern, binding, connection);
205206
})
206207
);
207208
try {
@@ -226,8 +227,8 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
226227
// attach `maxWireVersion` ASAP because it is used to check whether we can retry
227228
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
228229
SessionContext sessionContext = binding.getOperationContext().getSessionContext();
229-
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
230-
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)
230+
WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext);
231+
if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext)
231232
&& handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallback, timeoutContext)) {
232233
return;
233234
}
@@ -245,13 +246,17 @@ && handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallba
245246
releasingCallback.onResult(null, t);
246247
return;
247248
}
248-
executeBulkWriteBatchAsync(retryState, binding, connection, releasingCallback);
249+
executeBulkWriteBatchAsync(retryState, writeConcern, binding, connection, releasingCallback);
249250
})
250251
).whenComplete(binding::release);
251252
retryingBulkWrite.get(exceptionTransformingCallback(errorHandlingCallback(callback, LOGGER)));
252253
}
253254

254-
private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final WriteBinding binding, final Connection connection) {
255+
private BulkWriteResult executeBulkWriteBatch(
256+
final RetryState retryState,
257+
final WriteConcern effectiveWriteConcern,
258+
final WriteBinding binding,
259+
final Connection connection) {
255260
BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker())
256261
.orElseThrow(Assertions::fail);
257262
BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail);
@@ -261,7 +266,7 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final
261266

262267
while (currentBatch.shouldProcessBatch()) {
263268
try {
264-
BsonDocument result = executeCommand(operationContext, connection, currentBatch);
269+
BsonDocument result = executeCommand(effectiveWriteConcern, operationContext, connection, currentBatch);
265270
if (currentBatch.getRetryWrites() && !operationContext.getSessionContext().hasActiveTransaction()) {
266271
MongoException writeConcernBasedError = ProtocolHelper.createSpecialException(result,
267272
connection.getDescription().getServerAddress(), "errMsg", timeoutContext);
@@ -295,7 +300,11 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final
295300
}
296301
}
297302

298-
private void executeBulkWriteBatchAsync(final RetryState retryState, final AsyncWriteBinding binding, final AsyncConnection connection,
303+
private void executeBulkWriteBatchAsync(
304+
final RetryState retryState,
305+
final WriteConcern effectiveWriteConcern,
306+
final AsyncWriteBinding binding,
307+
final AsyncConnection connection,
299308
final SingleResultCallback<BulkWriteResult> callback) {
300309
LoopState loopState = new LoopState();
301310
AsyncCallbackRunnable loop = new AsyncCallbackLoop(loopState, iterationCallback -> {
@@ -309,7 +318,7 @@ private void executeBulkWriteBatchAsync(final RetryState retryState, final Async
309318
}
310319
OperationContext operationContext = binding.getOperationContext();
311320
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
312-
executeCommandAsync(operationContext, connection, currentBatch, (result, t) -> {
321+
executeCommandAsync(effectiveWriteConcern, operationContext, connection, currentBatch, (result, t) -> {
313322
if (t == null) {
314323
if (currentBatch.getRetryWrites() && !operationContext.getSessionContext().hasActiveTransaction()) {
315324
MongoException writeConcernBasedError = ProtocolHelper.createSpecialException(result,
@@ -405,31 +414,47 @@ private boolean handleMongoWriteConcernWithResponseExceptionAsync(final RetrySta
405414
}
406415

407416
@Nullable
408-
private BsonDocument executeCommand(final OperationContext operationContext, final Connection connection, final BulkWriteBatch batch) {
417+
private BsonDocument executeCommand(
418+
final WriteConcern effectiveWriteConcern,
419+
final OperationContext operationContext,
420+
final Connection connection,
421+
final BulkWriteBatch batch) {
409422
return connection.command(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(),
410-
operationContext, shouldAcknowledge(batch, operationContext.getSessionContext()),
423+
operationContext, shouldExpectResponse(batch, effectiveWriteConcern),
411424
batch.getPayload(), batch.getFieldNameValidator());
412425
}
413426

414-
private void executeCommandAsync(final OperationContext operationContext, final AsyncConnection connection, final BulkWriteBatch batch,
427+
private void executeCommandAsync(
428+
final WriteConcern effectiveWriteConcern,
429+
final OperationContext operationContext,
430+
final AsyncConnection connection,
431+
final BulkWriteBatch batch,
415432
final SingleResultCallback<BsonDocument> callback) {
416433
connection.commandAsync(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(),
417-
operationContext, shouldAcknowledge(batch, operationContext.getSessionContext()),
434+
operationContext, shouldExpectResponse(batch, effectiveWriteConcern),
418435
batch.getPayload(), batch.getFieldNameValidator(), callback);
419436
}
420437

421-
private WriteConcern getAppliedWriteConcern(final SessionContext sessionContext) {
422-
if (sessionContext.hasActiveTransaction()) {
423-
return WriteConcern.ACKNOWLEDGED;
424-
} else {
425-
return writeConcern;
438+
private static WriteConcern validateAndGetEffectiveWriteConcern(final WriteConcern writeConcernSetting, final SessionContext sessionContext)
439+
throws MongoClientException {
440+
boolean activeTransaction = sessionContext.hasActiveTransaction();
441+
WriteConcern effectiveWriteConcern = activeTransaction
442+
? WriteConcern.ACKNOWLEDGED
443+
: writeConcernSetting;
444+
if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !activeTransaction && !effectiveWriteConcern.isAcknowledged()) {
445+
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
426446
}
447+
return effectiveWriteConcern;
427448
}
428449

429-
private boolean shouldAcknowledge(final BulkWriteBatch batch, final SessionContext sessionContext) {
430-
return ordered
431-
? batch.hasAnotherBatch() || getAppliedWriteConcern(sessionContext).isAcknowledged()
432-
: getAppliedWriteConcern(sessionContext).isAcknowledged();
450+
static Optional<WriteConcern> commandWriteConcern(final WriteConcern effectiveWriteConcern, final SessionContext sessionContext) {
451+
return effectiveWriteConcern.isServerDefault() || sessionContext.hasActiveTransaction()
452+
? Optional.empty()
453+
: Optional.of(effectiveWriteConcern);
454+
}
455+
456+
private boolean shouldExpectResponse(final BulkWriteBatch batch, final WriteConcern effectiveWriteConcern) {
457+
return effectiveWriteConcern.isAcknowledged() || (ordered && batch.hasAnotherBatch());
433458
}
434459

435460
private void addErrorLabelsToWriteConcern(final BsonDocument result, final Set<String> errorLabels) {

0 commit comments

Comments
 (0)