Skip to content

Commit 0e16427

Browse files
committed
Merge branch 'master' into JAVA-5527
2 parents e049234 + bc49800 commit 0e16427

File tree

10 files changed

+216
-71
lines changed

10 files changed

+216
-71
lines changed

driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import static com.mongodb.ReadPreference.primary;
4646
import static com.mongodb.ReadPreference.primaryPreferred;
4747
import static com.mongodb.assertions.Assertions.assertFalse;
48+
import static com.mongodb.assertions.Assertions.assertTrue;
4849
import static com.mongodb.assertions.Assertions.notNull;
4950
import static com.mongodb.connection.ClusterConnectionMode.LOAD_BALANCED;
5051
import static com.mongodb.connection.ClusterConnectionMode.SINGLE;
@@ -112,6 +113,7 @@ public final class CommandMessage extends RequestMessage {
112113
this.payloadFieldNameValidator = payloadFieldNameValidator;
113114
this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode);
114115
this.serverApi = serverApi;
116+
assertTrue(useOpMsg() || responseExpected);
115117
}
116118

117119
/**
@@ -187,7 +189,11 @@ private String getSequenceIdentifier(final ByteBuf byteBuf) {
187189
}
188190

189191
boolean isResponseExpected() {
190-
return !useOpMsg() || requireOpMsgResponse();
192+
if (responseExpected) {
193+
return true;
194+
} else {
195+
return payload != null && payload.isOrdered() && payload.hasAnotherSplit();
196+
}
191197
}
192198

193199
MongoNamespace getNamespace() {
@@ -240,7 +246,7 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOu
240246

241247
private int getOpMsgFlagBits() {
242248
int flagBits = 0;
243-
if (!requireOpMsgResponse()) {
249+
if (!isResponseExpected()) {
244250
flagBits = 1 << 1;
245251
}
246252
if (exhaustAllowed) {
@@ -249,14 +255,6 @@ private int getOpMsgFlagBits() {
249255
return flagBits;
250256
}
251257

252-
private boolean requireOpMsgResponse() {
253-
if (responseExpected) {
254-
return true;
255-
} else {
256-
return payload != null && payload.hasAnotherSplit();
257-
}
258-
}
259-
260258
private boolean isDirectConnectionToReplicaSetMember() {
261259
return clusterConnectionMode == SINGLE
262260
&& getSettings().getServerType() != SHARD_ROUTER

driver-core/src/main/com/mongodb/internal/connection/IdHoldingBsonWriter.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.internal.connection;
1818

19+
import com.mongodb.lang.Nullable;
1920
import org.bson.BsonBinary;
2021
import org.bson.BsonBinaryWriter;
2122
import org.bson.BsonBoolean;
@@ -57,11 +58,17 @@ public class IdHoldingBsonWriter extends LevelCountingBsonWriter {
5758
private LevelCountingBsonWriter idBsonBinaryWriter;
5859
private BasicOutputBuffer outputBuffer;
5960
private String currentFieldName;
61+
private final BsonValue fallbackId;
6062
private BsonValue id;
6163
private boolean idFieldIsAnArray = false;
6264

63-
public IdHoldingBsonWriter(final BsonWriter bsonWriter) {
65+
/**
66+
* @param fallbackId The "_id" field value to use if the top-level document written via this {@link BsonWriter}
67+
* does not have "_id". If {@code null}, then a new {@link BsonObjectId} is generated instead.
68+
*/
69+
public IdHoldingBsonWriter(final BsonWriter bsonWriter, @Nullable final BsonObjectId fallbackId) {
6470
super(bsonWriter);
71+
this.fallbackId = fallbackId;
6572
}
6673

6774
@Override
@@ -99,7 +106,7 @@ public void writeEndDocument() {
99106
}
100107

101108
if (getCurrentLevel() == 0 && id == null) {
102-
id = new BsonObjectId();
109+
id = fallbackId == null ? new BsonObjectId() : fallbackId;
103110
writeObjectId(ID_FIELD_NAME, id.asObjectId().getValue());
104111
}
105112
super.writeEndDocument();
@@ -408,6 +415,15 @@ public void flush() {
408415
super.flush();
409416
}
410417

418+
/**
419+
* Returns either the value of the "_id" field from the top-level document written via this {@link BsonWriter},
420+
* provided that the document is not {@link RawBsonDocument},
421+
* or the generated {@link BsonObjectId}.
422+
* If the document is {@link RawBsonDocument}, then returns {@code null}.
423+
* <p>
424+
* {@linkplain #flush() Flushing} is not required before calling this method.</p>
425+
*/
426+
@Nullable
411427
public BsonValue getId() {
412428
return id;
413429
}

driver-core/src/main/com/mongodb/internal/connection/SplittablePayload.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.mongodb.internal.bulk.WriteRequestWithIndex;
2424
import org.bson.BsonDocument;
2525
import org.bson.BsonDocumentWrapper;
26+
import org.bson.BsonObjectId;
2627
import org.bson.BsonValue;
2728
import org.bson.BsonWriter;
2829
import org.bson.codecs.BsonValueCodecProvider;
@@ -37,6 +38,7 @@
3738
import java.util.stream.Collectors;
3839

3940
import static com.mongodb.assertions.Assertions.assertNotNull;
41+
import static com.mongodb.assertions.Assertions.assertTrue;
4042
import static com.mongodb.assertions.Assertions.isTrue;
4143
import static com.mongodb.assertions.Assertions.notNull;
4244
import static com.mongodb.internal.connection.SplittablePayload.Type.INSERT;
@@ -57,6 +59,7 @@ public final class SplittablePayload {
5759
private final WriteRequestEncoder writeRequestEncoder = new WriteRequestEncoder();
5860
private final Type payloadType;
5961
private final List<WriteRequestWithIndex> writeRequestWithIndexes;
62+
private final boolean ordered;
6063
private final Map<Integer, BsonValue> insertedIds = new HashMap<>();
6164
private int position = 0;
6265

@@ -91,9 +94,10 @@ public enum Type {
9194
* @param payloadType the payload type
9295
* @param writeRequestWithIndexes the writeRequests
9396
*/
94-
public SplittablePayload(final Type payloadType, final List<WriteRequestWithIndex> writeRequestWithIndexes) {
97+
public SplittablePayload(final Type payloadType, final List<WriteRequestWithIndex> writeRequestWithIndexes, final boolean ordered) {
9598
this.payloadType = notNull("batchType", payloadType);
9699
this.writeRequestWithIndexes = notNull("writeRequests", writeRequestWithIndexes);
100+
this.ordered = ordered;
97101
}
98102

99103
/**
@@ -117,7 +121,7 @@ public String getPayloadName() {
117121
}
118122

119123
boolean hasPayload() {
120-
return writeRequestWithIndexes.size() > 0;
124+
return !writeRequestWithIndexes.isEmpty();
121125
}
122126

123127
public int size() {
@@ -137,10 +141,6 @@ public List<BsonDocument> getPayload() {
137141
.collect(Collectors.toList());
138142
}
139143

140-
public List<WriteRequestWithIndex> getWriteRequestWithIndexes() {
141-
return writeRequestWithIndexes;
142-
}
143-
144144
/**
145145
* @return the current position in the payload
146146
*/
@@ -160,16 +160,22 @@ public void setPosition(final int position) {
160160
* @return true if there are more values after the current position
161161
*/
162162
public boolean hasAnotherSplit() {
163+
// this method must be not called before this payload having been encoded
164+
assertTrue(position > 0);
163165
return writeRequestWithIndexes.size() > position;
164166
}
165167

168+
boolean isOrdered() {
169+
return ordered;
170+
}
171+
166172
/**
167173
* @return a new SplittablePayload containing only the values after the current position.
168174
*/
169175
public SplittablePayload getNextSplit() {
170176
isTrue("hasAnotherSplit", hasAnotherSplit());
171177
List<WriteRequestWithIndex> nextPayLoad = writeRequestWithIndexes.subList(position, writeRequestWithIndexes.size());
172-
return new SplittablePayload(payloadType, nextPayLoad);
178+
return new SplittablePayload(payloadType, nextPayLoad, ordered);
173179
}
174180

175181
/**
@@ -191,10 +197,23 @@ public void encode(final BsonWriter writer, final WriteRequestWithIndex writeReq
191197
InsertRequest insertRequest = (InsertRequest) writeRequestWithIndex.getWriteRequest();
192198
BsonDocument document = insertRequest.getDocument();
193199

194-
IdHoldingBsonWriter idHoldingBsonWriter = new IdHoldingBsonWriter(writer);
195-
getCodec(document).encode(idHoldingBsonWriter, document,
196-
EncoderContext.builder().isEncodingCollectibleDocument(true).build());
197-
insertedIds.put(writeRequestWithIndex.getIndex(), idHoldingBsonWriter.getId());
200+
BsonValue documentId = insertedIds.compute(
201+
writeRequestWithIndex.getIndex(),
202+
(writeRequestIndex, writeRequestDocumentId) -> {
203+
IdHoldingBsonWriter idHoldingBsonWriter = new IdHoldingBsonWriter(
204+
writer,
205+
// Reuse `writeRequestDocumentId` if it may have been generated
206+
// by `IdHoldingBsonWriter` in a previous attempt.
207+
// If its type is not `BsonObjectId`, we know it could not have been generated.
208+
writeRequestDocumentId instanceof BsonObjectId ? writeRequestDocumentId.asObjectId() : null);
209+
getCodec(document).encode(idHoldingBsonWriter, document,
210+
EncoderContext.builder().isEncodingCollectibleDocument(true).build());
211+
return idHoldingBsonWriter.getId();
212+
});
213+
if (documentId == null) {
214+
// we must add an entry anyway because we rely on all the indexes being present
215+
insertedIds.put(writeRequestWithIndex.getIndex(), null);
216+
}
198217
} else if (writeRequestWithIndex.getType() == WriteRequest.Type.UPDATE
199218
|| writeRequestWithIndex.getType() == WriteRequest.Type.REPLACE) {
200219
UpdateRequest update = (UpdateRequest) writeRequestWithIndex.getWriteRequest();

driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.MongoBulkWriteException;
20-
import com.mongodb.MongoClientException;
2120
import com.mongodb.MongoInternalException;
2221
import com.mongodb.MongoNamespace;
2322
import com.mongodb.WriteConcern;
@@ -65,6 +64,7 @@
6564
import static com.mongodb.internal.bulk.WriteRequest.Type.REPLACE;
6665
import static com.mongodb.internal.bulk.WriteRequest.Type.UPDATE;
6766
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
67+
import static com.mongodb.internal.operation.MixedBulkWriteOperation.commandWriteConcern;
6868
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
6969
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
7070
import static com.mongodb.internal.operation.WriteConcernHelper.createWriteConcernError;
@@ -101,12 +101,7 @@ static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
101101
final List<? extends WriteRequest> writeRequests,
102102
final OperationContext operationContext,
103103
@Nullable final BsonValue comment, @Nullable final BsonDocument variables) {
104-
SessionContext sessionContext = operationContext.getSessionContext();
105-
if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !sessionContext.hasActiveTransaction()
106-
&& !writeConcern.isAcknowledged()) {
107-
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
108-
}
109-
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext);
104+
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, operationContext.getSessionContext());
110105
List<WriteRequestWithIndex> writeRequestsWithIndex = new ArrayList<>();
111106
boolean writeRequestsAreRetryable = true;
112107
for (int i = 0; i < writeRequests.size(); i++) {
@@ -159,7 +154,7 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti
159154

160155
this.indexMap = indexMap;
161156
this.unprocessed = unprocessedItems;
162-
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems);
157+
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems, ordered);
163158
this.operationContext = operationContext;
164159
this.comment = comment;
165160
this.variables = variables;
@@ -169,9 +164,8 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti
169164
if (!payloadItems.isEmpty()) {
170165
command.put(getCommandName(batchType), new BsonString(namespace.getCollectionName()));
171166
command.put("ordered", new BsonBoolean(ordered));
172-
if (!writeConcern.isServerDefault() && !sessionContext.hasActiveTransaction()) {
173-
command.put("writeConcern", writeConcern.asDocument());
174-
}
167+
commandWriteConcern(writeConcern, sessionContext).ifPresent(value ->
168+
command.put("writeConcern", value.asDocument()));
175169
if (bypassDocumentValidation != null) {
176170
command.put("bypassDocumentValidation", new BsonBoolean(bypassDocumentValidation));
177171
}

0 commit comments

Comments
 (0)