Skip to content

Commit a2805a6

Browse files
committed
Make SplittablePayload extend OpMsgSequence
Justification: This brings SplittablePayload closer in design to DualSplittablePayloads, reducing a potential source of confusion for future readers. We could go further in this direction, but this is a start.
1 parent 7d7f3eb commit a2805a6

File tree

8 files changed

+46
-47
lines changed

8 files changed

+46
-47
lines changed

driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,8 @@ boolean isResponseExpected() {
206206
if (responseExpected) {
207207
return true;
208208
} else {
209-
if (sequences instanceof ValidatableSplittablePayload) {
210-
ValidatableSplittablePayload validatableSplittablePayload = (ValidatableSplittablePayload) sequences;
211-
SplittablePayload payload = validatableSplittablePayload.getSplittablePayload();
209+
if (sequences instanceof SplittablePayload) {
210+
SplittablePayload payload = (SplittablePayload) sequences;
212211
return payload.isOrdered() && payload.hasAnotherSplit();
213212
} else if (sequences instanceof DualSplittablePayloads) {
214213
return assertNotNull(dualSplittablePayloadsRequireResponse);
@@ -235,13 +234,12 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final ByteBufferBsonOut
235234
ArrayList<BsonElement> extraElements = getExtraElements(operationContext);
236235

237236
int commandDocumentSizeInBytes = writeDocument(command, bsonOutput, commandFieldNameValidator, true);
238-
if (sequences instanceof ValidatableSplittablePayload) {
237+
if (sequences instanceof SplittablePayload) {
239238
appendElementsToDocument(bsonOutput, commandStartPosition, extraElements);
240-
ValidatableSplittablePayload validatableSplittablePayload = (ValidatableSplittablePayload) sequences;
241-
SplittablePayload payload = validatableSplittablePayload.getSplittablePayload();
239+
SplittablePayload payload = (SplittablePayload) sequences;
242240
writeOpMsgSectionWithPayloadType1(bsonOutput, payload.getPayloadName(), () -> {
243241
writePayload(
244-
new BsonBinaryWriter(bsonOutput, validatableSplittablePayload.getFieldNameValidator()),
242+
new BsonBinaryWriter(bsonOutput, payload.getFieldNameValidator()),
245243
bsonOutput, getSettings(), messageStartPosition, payload, getSettings().getMaxDocumentSize()
246244
);
247245
return null;

driver-core/src/main/com/mongodb/internal/connection/SplittablePayload.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.bson.BsonObjectId;
2727
import org.bson.BsonValue;
2828
import org.bson.BsonWriter;
29+
import org.bson.FieldNameValidator;
2930
import org.bson.codecs.BsonValueCodecProvider;
3031
import org.bson.codecs.Codec;
3132
import org.bson.codecs.Encoder;
@@ -54,8 +55,9 @@
5455
*
5556
* <p>This class is not part of the public API and may be removed or changed at any time</p>
5657
*/
57-
public final class SplittablePayload {
58+
public final class SplittablePayload extends OpMsgSequences {
5859
private static final CodecRegistry REGISTRY = fromProviders(new BsonValueCodecProvider());
60+
private final FieldNameValidator fieldNameValidator;
5961
private final WriteRequestEncoder writeRequestEncoder = new WriteRequestEncoder();
6062
private final Type payloadType;
6163
private final List<WriteRequestWithIndex> writeRequestWithIndexes;
@@ -94,10 +96,16 @@ public enum Type {
9496
* @param payloadType the payload type
9597
* @param writeRequestWithIndexes the writeRequests
9698
*/
97-
public SplittablePayload(final Type payloadType, final List<WriteRequestWithIndex> writeRequestWithIndexes, final boolean ordered) {
99+
public SplittablePayload(final Type payloadType, final List<WriteRequestWithIndex> writeRequestWithIndexes, final boolean ordered,
100+
final FieldNameValidator fieldNameValidator) {
98101
this.payloadType = notNull("batchType", payloadType);
99102
this.writeRequestWithIndexes = notNull("writeRequests", writeRequestWithIndexes);
100103
this.ordered = ordered;
104+
this.fieldNameValidator = fieldNameValidator;
105+
}
106+
107+
public FieldNameValidator getFieldNameValidator() {
108+
return fieldNameValidator;
101109
}
102110

103111
/**
@@ -175,7 +183,7 @@ boolean isOrdered() {
175183
public SplittablePayload getNextSplit() {
176184
isTrue("hasAnotherSplit", hasAnotherSplit());
177185
List<WriteRequestWithIndex> nextPayLoad = writeRequestWithIndexes.subList(position, writeRequestWithIndexes.size());
178-
return new SplittablePayload(payloadType, nextPayLoad, ordered);
186+
return new SplittablePayload(payloadType, nextPayLoad, ordered, fieldNameValidator);
179187
}
180188

181189
/**

driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@
6363
import static com.mongodb.internal.bulk.WriteRequest.Type.INSERT;
6464
import static com.mongodb.internal.bulk.WriteRequest.Type.REPLACE;
6565
import static com.mongodb.internal.bulk.WriteRequest.Type.UPDATE;
66-
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
6766
import static com.mongodb.internal.operation.CommandOperationHelper.commandWriteConcern;
67+
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
6868
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
6969
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
7070
import static com.mongodb.internal.operation.WriteConcernHelper.createWriteConcernError;
@@ -154,7 +154,7 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti
154154

155155
this.indexMap = indexMap;
156156
this.unprocessed = unprocessedItems;
157-
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems, ordered);
157+
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems, ordered, getFieldNameValidator());
158158
this.operationContext = operationContext;
159159
this.comment = comment;
160160
this.variables = variables;

driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import com.mongodb.internal.connection.MongoWriteConcernWithResponseException;
4040
import com.mongodb.internal.connection.OperationContext;
4141
import com.mongodb.internal.connection.ProtocolHelper;
42-
import com.mongodb.internal.connection.ValidatableSplittablePayload;
4342
import com.mongodb.internal.operation.retry.AttachmentKeys;
4443
import com.mongodb.internal.session.SessionContext;
4544
import com.mongodb.internal.validator.NoOpFieldNameValidator;
@@ -422,8 +421,7 @@ private BsonDocument executeCommand(
422421
final Connection connection,
423422
final BulkWriteBatch batch) {
424423
return connection.command(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(),
425-
operationContext, shouldExpectResponse(batch, effectiveWriteConcern),
426-
new ValidatableSplittablePayload(batch.getPayload(), batch.getFieldNameValidator()));
424+
operationContext, shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload());
427425
}
428426

429427
private void executeCommandAsync(
@@ -433,8 +431,7 @@ private void executeCommandAsync(
433431
final BulkWriteBatch batch,
434432
final SingleResultCallback<BsonDocument> callback) {
435433
connection.commandAsync(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(),
436-
operationContext, shouldExpectResponse(batch, effectiveWriteConcern),
437-
new ValidatableSplittablePayload(batch.getPayload(), batch.getFieldNameValidator()), callback);
434+
operationContext, shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload(), callback);
438435
}
439436

440437
private boolean shouldExpectResponse(final BulkWriteBatch batch, final WriteConcern effectiveWriteConcern) {

driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ class CommandMessageSpecification extends Specification {
154154
MessageSettings.builder().maxWireVersion(maxWireVersion).build(), true,
155155
payload == null
156156
? OpMsgSequences.EmptyOpMsgSequences.INSTANCE
157-
: new ValidatableSplittablePayload(payload, NoOpFieldNameValidator.INSTANCE),
157+
: payload,
158158
ClusterConnectionMode.MULTIPLE, null)
159159
def output = new ByteBufferBsonOutput(new SimpleBufferProvider())
160160
message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE,
@@ -177,7 +177,8 @@ class CommandMessageSpecification extends Specification {
177177
new BsonDocument('insert', new BsonString('coll')),
178178
new SplittablePayload(INSERT, [new BsonDocument('_id', new BsonInt32(1)),
179179
new BsonDocument('_id', new BsonInt32(2))]
180-
.withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true),
180+
.withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true,
181+
NoOpFieldNameValidator.INSTANCE),
181182
],
182183
[
183184
LATEST_WIRE_VERSION,
@@ -198,9 +199,9 @@ class CommandMessageSpecification extends Specification {
198199
new BsonDocument('_id', new BsonInt32(3)).append('c', new BsonBinary(new byte[450])),
199200
new BsonDocument('_id', new BsonInt32(4)).append('b', new BsonBinary(new byte[441])),
200201
new BsonDocument('_id', new BsonInt32(5)).append('c', new BsonBinary(new byte[451]))]
201-
.withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true)
202+
.withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true, fieldNameValidator)
202203
def message = new CommandMessage(namespace, insertCommand, fieldNameValidator, ReadPreference.primary(), messageSettings,
203-
false, new ValidatableSplittablePayload(payload, fieldNameValidator), ClusterConnectionMode.MULTIPLE, null)
204+
false, payload, ClusterConnectionMode.MULTIPLE, null)
204205
def output = new ByteBufferBsonOutput(new SimpleBufferProvider())
205206
def sessionContext = Stub(SessionContext) {
206207
getReadConcern() >> ReadConcern.DEFAULT
@@ -224,7 +225,7 @@ class CommandMessageSpecification extends Specification {
224225
when:
225226
payload = payload.getNextSplit()
226227
message = new CommandMessage(namespace, insertCommand, fieldNameValidator, ReadPreference.primary(), messageSettings,
227-
false, new ValidatableSplittablePayload(payload, fieldNameValidator), ClusterConnectionMode.MULTIPLE, null)
228+
false, payload, ClusterConnectionMode.MULTIPLE, null)
228229
output.truncateToPosition(0)
229230
message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE, sessionContext, Stub(TimeoutContext), null))
230231
byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray()))
@@ -242,7 +243,7 @@ class CommandMessageSpecification extends Specification {
242243
when:
243244
payload = payload.getNextSplit()
244245
message = new CommandMessage(namespace, insertCommand, fieldNameValidator, ReadPreference.primary(), messageSettings,
245-
false, new ValidatableSplittablePayload(payload, fieldNameValidator), ClusterConnectionMode.MULTIPLE, null)
246+
false, payload, ClusterConnectionMode.MULTIPLE, null)
246247
output.truncateToPosition(0)
247248
message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE, sessionContext, Stub(TimeoutContext), null))
248249
byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray()))
@@ -260,7 +261,7 @@ class CommandMessageSpecification extends Specification {
260261
when:
261262
payload = payload.getNextSplit()
262263
message = new CommandMessage(namespace, insertCommand, fieldNameValidator, ReadPreference.primary(), messageSettings,
263-
false, new ValidatableSplittablePayload(payload, fieldNameValidator), ClusterConnectionMode.MULTIPLE, null)
264+
false, payload, ClusterConnectionMode.MULTIPLE, null)
264265
output.truncateToPosition(0)
265266
message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE,
266267
sessionContext,
@@ -288,9 +289,9 @@ class CommandMessageSpecification extends Specification {
288289
def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonBinary(new byte[900])),
289290
new BsonDocument('b', new BsonBinary(new byte[450])),
290291
new BsonDocument('c', new BsonBinary(new byte[450]))]
291-
.withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true)
292+
.withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true, fieldNameValidator)
292293
def message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings,
293-
false, new ValidatableSplittablePayload(payload, fieldNameValidator), ClusterConnectionMode.MULTIPLE, null)
294+
false, payload, ClusterConnectionMode.MULTIPLE, null)
294295
def output = new ByteBufferBsonOutput(new SimpleBufferProvider())
295296
def sessionContext = Stub(SessionContext) {
296297
getReadConcern() >> ReadConcern.DEFAULT
@@ -315,7 +316,7 @@ class CommandMessageSpecification extends Specification {
315316
when:
316317
payload = payload.getNextSplit()
317318
message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings,
318-
false, new ValidatableSplittablePayload(payload, fieldNameValidator), ClusterConnectionMode.MULTIPLE, null)
319+
false, payload, ClusterConnectionMode.MULTIPLE, null)
319320
output.truncateToPosition(0)
320321
message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE, sessionContext,
321322
Stub(TimeoutContext), null))
@@ -339,9 +340,9 @@ class CommandMessageSpecification extends Specification {
339340
def messageSettings = MessageSettings.builder().maxDocumentSize(900)
340341
.maxWireVersion(LATEST_WIRE_VERSION).build()
341342
def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonBinary(new byte[900]))]
342-
.withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true)
343+
.withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true, fieldNameValidator)
343344
def message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings,
344-
false, new ValidatableSplittablePayload(payload, fieldNameValidator), ClusterConnectionMode.MULTIPLE, null)
345+
false, payload, ClusterConnectionMode.MULTIPLE, null)
345346
def output = new ByteBufferBsonOutput(new SimpleBufferProvider())
346347
def sessionContext = Stub(SessionContext) {
347348
getReadConcern() >> ReadConcern.DEFAULT
@@ -362,9 +363,9 @@ class CommandMessageSpecification extends Specification {
362363
given:
363364
def messageSettings = MessageSettings.builder().serverType(ServerType.SHARD_ROUTER)
364365
.maxWireVersion(FOUR_DOT_ZERO_WIRE_VERSION).build()
365-
def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonInt32(1))], true)
366+
def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonInt32(1))], true, fieldNameValidator)
366367
def message = new CommandMessage(namespace, command, fieldNameValidator, ReadPreference.primary(), messageSettings,
367-
false, new ValidatableSplittablePayload(payload, fieldNameValidator), ClusterConnectionMode.MULTIPLE, null)
368+
false, payload, ClusterConnectionMode.MULTIPLE, null)
368369
def output = new ByteBufferBsonOutput(new SimpleBufferProvider())
369370
def sessionContext = Stub(SessionContext) {
370371
getReadConcern() >> ReadConcern.DEFAULT

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/CryptConnection.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.mongodb.internal.connection.OperationContext;
2929
import com.mongodb.internal.connection.SplittablePayload;
3030
import com.mongodb.internal.connection.SplittablePayloadBsonWriter;
31-
import com.mongodb.internal.connection.ValidatableSplittablePayload;
3231
import com.mongodb.internal.time.Timeout;
3332
import com.mongodb.internal.validator.MappedFieldNameValidator;
3433
import com.mongodb.lang.Nullable;
@@ -113,10 +112,9 @@ public <T> void commandAsync(final String database, final BsonDocument command,
113112
try {
114113
SplittablePayload payload = null;
115114
FieldNameValidator payloadFieldNameValidator = null;
116-
if (sequences instanceof ValidatableSplittablePayload) {
117-
ValidatableSplittablePayload validatableSplittablePayload = (ValidatableSplittablePayload) sequences;
118-
payload = validatableSplittablePayload.getSplittablePayload();
119-
payloadFieldNameValidator = validatableSplittablePayload.getFieldNameValidator();
115+
if (sequences instanceof SplittablePayload) {
116+
payload = (SplittablePayload) sequences;
117+
payloadFieldNameValidator = payload.getFieldNameValidator();
120118
}
121119
BasicOutputBuffer bsonOutput = new BasicOutputBuffer();
122120
BsonBinaryWriter bsonBinaryWriter = new BsonBinaryWriter(

driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.mongodb.internal.connection.OperationContext;
2727
import com.mongodb.internal.connection.SplittablePayload;
2828
import com.mongodb.internal.connection.SplittablePayloadBsonWriter;
29-
import com.mongodb.internal.connection.ValidatableSplittablePayload;
3029
import com.mongodb.internal.time.Timeout;
3130
import com.mongodb.internal.validator.MappedFieldNameValidator;
3231
import com.mongodb.lang.Nullable;
@@ -101,10 +100,9 @@ public <T> T command(final String database, final BsonDocument command, final Fi
101100

102101
SplittablePayload payload = null;
103102
FieldNameValidator payloadFieldNameValidator = null;
104-
if (sequences instanceof ValidatableSplittablePayload) {
105-
ValidatableSplittablePayload validatableSplittablePayload = (ValidatableSplittablePayload) sequences;
106-
payload = validatableSplittablePayload.getSplittablePayload();
107-
payloadFieldNameValidator = validatableSplittablePayload.getFieldNameValidator();
103+
if (sequences instanceof SplittablePayload) {
104+
payload = (SplittablePayload) sequences;
105+
payloadFieldNameValidator = payload.getFieldNameValidator();
108106
}
109107
BasicOutputBuffer bsonOutput = new BasicOutputBuffer();
110108
BsonBinaryWriter bsonBinaryWriter = new BsonBinaryWriter(new BsonWriterSettings(),

0 commit comments

Comments
 (0)