Skip to content

Commit 7fc074f

Browse files
committed
Merge branch 'main' into string-read-optm
# Conflicts: # bson/src/main/org/bson/ByteBuf.java # bson/src/main/org/bson/ByteBufNIO.java # driver-core/src/main/com/mongodb/internal/connection/CompositeByteBuf.java # driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java
2 parents d43b83d + ea85540 commit 7fc074f

File tree

6 files changed

+1009
-159
lines changed

6 files changed

+1009
-159
lines changed

bson/src/main/org/bson/AbstractBsonWriter.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020
import org.bson.types.ObjectId;
2121

2222
import java.io.Closeable;
23+
import java.util.ArrayDeque;
2324
import java.util.Arrays;
25+
import java.util.Deque;
2426
import java.util.List;
2527
import java.util.Map;
26-
import java.util.Stack;
2728

2829
import static java.lang.String.format;
2930
import static org.bson.assertions.Assertions.notNull;
@@ -35,7 +36,7 @@
3536
*/
3637
public abstract class AbstractBsonWriter implements BsonWriter, Closeable {
3738
private final BsonWriterSettings settings;
38-
private final Stack<FieldNameValidator> fieldNameValidatorStack = new Stack<>();
39+
private final Deque<FieldNameValidator> fieldNameValidatorStack = new ArrayDeque<>();
3940
private State state;
4041
private Context context;
4142
private int serializationDepth;

bson/src/main/org/bson/BsonBinaryWriter.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
import org.bson.types.Decimal128;
2222
import org.bson.types.ObjectId;
2323

24+
import java.util.ArrayDeque;
25+
import java.util.Deque;
2426
import java.util.List;
25-
import java.util.Stack;
2627

2728
import static java.lang.Math.max;
2829
import static java.lang.String.format;
@@ -37,7 +38,7 @@ public class BsonBinaryWriter extends AbstractBsonWriter {
3738
private final BsonBinaryWriterSettings binaryWriterSettings;
3839

3940
private final BsonOutput bsonOutput;
40-
private final Stack<Integer> maxDocumentSizeStack = new Stack<>();
41+
private final Deque<Integer> maxDocumentSizeStack = new ArrayDeque<>();
4142
private static final int ARRAY_INDEXES_CACHE_SIZE = 1000;
4243
private static final byte[] ARRAY_INDEXES_BUFFER;
4344
private static final int[] ARRAY_INDEXES_OFFSETS;

bson/src/main/org/bson/io/OutputBuffer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public void writeLong(final long value) {
197197
writeInt64(value);
198198
}
199199

200-
private int writeCharacters(final String str, final boolean checkForNullCharacters) {
200+
protected int writeCharacters(final String str, final boolean checkForNullCharacters) {
201201
int len = str.length();
202202
int total = 0;
203203

driver-core/src/main/com/mongodb/internal/connection/ByteBufferBsonOutput.java

+208-14
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.internal.connection;
1818

19+
import org.bson.BsonSerializationException;
1920
import org.bson.ByteBuf;
2021
import org.bson.io.OutputBuffer;
2122

@@ -25,8 +26,10 @@
2526
import java.util.ArrayList;
2627
import java.util.List;
2728

29+
import static com.mongodb.assertions.Assertions.assertFalse;
2830
import static com.mongodb.assertions.Assertions.assertTrue;
2931
import static com.mongodb.assertions.Assertions.notNull;
32+
import static java.lang.String.format;
3033

3134
/**
3235
* <p>This class is not part of the public API and may be removed or changed at any time</p>
@@ -43,6 +46,7 @@ public class ByteBufferBsonOutput extends OutputBuffer {
4346
private int curBufferIndex = 0;
4447
private int position = 0;
4548
private boolean closed;
49+
private ByteBuf currentByteBuffer;
4650

4751
/**
4852
* Construct an instance that uses the given buffer provider to allocate byte buffers as needs as it grows.
@@ -169,20 +173,29 @@ public void writeByte(final int value) {
169173
}
170174

171175
private ByteBuf getCurrentByteBuffer() {
172-
ByteBuf curByteBuffer = getByteBufferAtIndex(curBufferIndex);
173-
if (curByteBuffer.hasRemaining()) {
174-
return curByteBuffer;
176+
if (currentByteBuffer == null) {
177+
currentByteBuffer = getByteBufferAtIndex(curBufferIndex);
178+
}
179+
if (currentByteBuffer.hasRemaining()) {
180+
return currentByteBuffer;
175181
}
176182

177183
curBufferIndex++;
178-
return getByteBufferAtIndex(curBufferIndex);
184+
currentByteBuffer = getByteBufferAtIndex(curBufferIndex);
185+
return currentByteBuffer;
186+
}
187+
188+
private ByteBuf getNextByteBuffer() {
189+
assertFalse(bufferList.get(curBufferIndex).hasRemaining());
190+
return getByteBufferAtIndex(++curBufferIndex);
179191
}
180192

181193
private ByteBuf getByteBufferAtIndex(final int index) {
182194
if (bufferList.size() < index + 1) {
183-
bufferList.add(bufferProvider.getBuffer(index >= (MAX_SHIFT - INITIAL_SHIFT)
184-
? MAX_BUFFER_SIZE
185-
: Math.min(INITIAL_BUFFER_SIZE << index, MAX_BUFFER_SIZE)));
195+
ByteBuf buffer = bufferProvider.getBuffer(index >= (MAX_SHIFT - INITIAL_SHIFT)
196+
? MAX_BUFFER_SIZE
197+
: Math.min(INITIAL_BUFFER_SIZE << index, MAX_BUFFER_SIZE));
198+
bufferList.add(buffer);
186199
}
187200
return bufferList.get(index);
188201
}
@@ -225,6 +238,16 @@ public List<ByteBuf> getByteBuffers() {
225238
return buffers;
226239
}
227240

241+
public List<ByteBuf> getDuplicateByteBuffers() {
242+
ensureOpen();
243+
244+
List<ByteBuf> buffers = new ArrayList<>(bufferList.size());
245+
for (final ByteBuf cur : bufferList) {
246+
buffers.add(cur.duplicate().order(ByteOrder.LITTLE_ENDIAN));
247+
}
248+
return buffers;
249+
}
250+
228251

229252
@Override
230253
public int pipe(final OutputStream out) throws IOException {
@@ -233,14 +256,18 @@ public int pipe(final OutputStream out) throws IOException {
233256
byte[] tmp = new byte[INITIAL_BUFFER_SIZE];
234257

235258
int total = 0;
236-
for (final ByteBuf cur : getByteBuffers()) {
237-
ByteBuf dup = cur.duplicate();
238-
while (dup.hasRemaining()) {
239-
int numBytesToCopy = Math.min(dup.remaining(), tmp.length);
240-
dup.get(tmp, 0, numBytesToCopy);
241-
out.write(tmp, 0, numBytesToCopy);
259+
List<ByteBuf> byteBuffers = getByteBuffers();
260+
try {
261+
for (final ByteBuf cur : byteBuffers) {
262+
while (cur.hasRemaining()) {
263+
int numBytesToCopy = Math.min(cur.remaining(), tmp.length);
264+
cur.get(tmp, 0, numBytesToCopy);
265+
out.write(tmp, 0, numBytesToCopy);
266+
}
267+
total += cur.limit();
242268
}
243-
total += dup.limit();
269+
} finally {
270+
byteBuffers.forEach(ByteBuf::release);
244271
}
245272
return total;
246273
}
@@ -259,6 +286,10 @@ public void truncateToPosition(final int newPosition) {
259286

260287
bufferList.get(bufferPositionPair.bufferIndex).position(bufferPositionPair.position);
261288

289+
if (bufferPositionPair.bufferIndex + 1 < bufferList.size()) {
290+
currentByteBuffer = null;
291+
}
292+
262293
while (bufferList.size() > bufferPositionPair.bufferIndex + 1) {
263294
ByteBuf buffer = bufferList.remove(bufferList.size() - 1);
264295
buffer.release();
@@ -286,6 +317,7 @@ public void close() {
286317
for (final ByteBuf cur : bufferList) {
287318
cur.release();
288319
}
320+
currentByteBuffer = null;
289321
bufferList.clear();
290322
closed = true;
291323
}
@@ -325,6 +357,7 @@ private void merge(final ByteBufferBsonOutput branch) {
325357
bufferList.addAll(branch.bufferList);
326358
curBufferIndex += branch.curBufferIndex + 1;
327359
position += branch.position;
360+
currentByteBuffer = null;
328361
}
329362

330363
public static final class Branch extends ByteBufferBsonOutput {
@@ -360,4 +393,165 @@ private static final class BufferPositionPair {
360393
this.position = position;
361394
}
362395
}
396+
397+
protected int writeCharacters(final String str, final boolean checkNullTermination) {
398+
int stringLength = str.length();
399+
int sp = 0;
400+
int prevPos = position;
401+
402+
ByteBuf curBuffer = getCurrentByteBuffer();
403+
int curBufferPos = curBuffer.position();
404+
int curBufferLimit = curBuffer.limit();
405+
int remaining = curBufferLimit - curBufferPos;
406+
407+
if (curBuffer.isBackedByArray()) {
408+
byte[] dst = curBuffer.array();
409+
int arrayOffset = curBuffer.arrayOffset();
410+
if (remaining >= str.length() + 1) {
411+
// Write ASCII characters directly to the array until we hit a non-ASCII character.
412+
sp = writeOnArrayAscii(str, dst, arrayOffset + curBufferPos, checkNullTermination);
413+
curBufferPos += sp;
414+
// If the whole string was written as ASCII, append the null terminator.
415+
if (sp == stringLength) {
416+
dst[arrayOffset + curBufferPos++] = 0;
417+
position += sp + 1;
418+
curBuffer.position(curBufferPos);
419+
return sp + 1;
420+
}
421+
// Otherwise, update the position to reflect the partial write.
422+
position += sp;
423+
curBuffer.position(curBufferPos);
424+
}
425+
}
426+
427+
// We get here, when the buffer is not backed by an array, or when the string contains at least one non-ASCII characters.
428+
return writeOnBuffers(str,
429+
checkNullTermination,
430+
sp,
431+
stringLength,
432+
curBufferLimit,
433+
curBufferPos,
434+
curBuffer,
435+
prevPos);
436+
}
437+
438+
private int writeOnBuffers(final String str,
439+
final boolean checkNullTermination,
440+
final int stringPointer,
441+
final int stringLength,
442+
final int bufferLimit,
443+
final int bufferPos,
444+
final ByteBuf buffer,
445+
final int prevPos) {
446+
int remaining;
447+
int sp = stringPointer;
448+
int curBufferPos = bufferPos;
449+
int curBufferLimit = bufferLimit;
450+
ByteBuf curBuffer = buffer;
451+
while (sp < stringLength) {
452+
remaining = curBufferLimit - curBufferPos;
453+
int c = str.charAt(sp);
454+
455+
if (checkNullTermination && c == 0x0) {
456+
throw new BsonSerializationException(
457+
format("BSON cstring '%s' is not valid because it contains a null character " + "at index %d", str, sp));
458+
}
459+
460+
if (c < 0x80) {
461+
if (remaining == 0) {
462+
curBuffer = getNextByteBuffer();
463+
curBufferPos = 0;
464+
curBufferLimit = curBuffer.limit();
465+
}
466+
curBuffer.put((byte) c);
467+
curBufferPos++;
468+
position++;
469+
} else if (c < 0x800) {
470+
if (remaining < 2) {
471+
// Not enough space: use write() to handle buffer boundary
472+
write((byte) (0xc0 + (c >> 6)));
473+
write((byte) (0x80 + (c & 0x3f)));
474+
475+
curBuffer = getCurrentByteBuffer();
476+
curBufferPos = curBuffer.position();
477+
curBufferLimit = curBuffer.limit();
478+
} else {
479+
curBuffer.put((byte) (0xc0 + (c >> 6)));
480+
curBuffer.put((byte) (0x80 + (c & 0x3f)));
481+
curBufferPos += 2;
482+
position += 2;
483+
}
484+
} else {
485+
// Handle multibyte characters (may involve surrogate pairs).
486+
c = Character.codePointAt(str, sp);
487+
/*
488+
Malformed surrogate pairs are encoded as-is (3 byte code unit) without substituting any code point.
489+
This known deviation from the spec and current functionality remains for backward compatibility.
490+
Ticket: JAVA-5575
491+
*/
492+
if (c < 0x10000) {
493+
if (remaining < 3) {
494+
write((byte) (0xe0 + (c >> 12)));
495+
write((byte) (0x80 + ((c >> 6) & 0x3f)));
496+
write((byte) (0x80 + (c & 0x3f)));
497+
498+
curBuffer = getCurrentByteBuffer();
499+
curBufferPos = curBuffer.position();
500+
curBufferLimit = curBuffer.limit();
501+
} else {
502+
curBuffer.put((byte) (0xe0 + (c >> 12)));
503+
curBuffer.put((byte) (0x80 + ((c >> 6) & 0x3f)));
504+
curBuffer.put((byte) (0x80 + (c & 0x3f)));
505+
curBufferPos += 3;
506+
position += 3;
507+
}
508+
} else {
509+
if (remaining < 4) {
510+
write((byte) (0xf0 + (c >> 18)));
511+
write((byte) (0x80 + ((c >> 12) & 0x3f)));
512+
write((byte) (0x80 + ((c >> 6) & 0x3f)));
513+
write((byte) (0x80 + (c & 0x3f)));
514+
515+
curBuffer = getCurrentByteBuffer();
516+
curBufferPos = curBuffer.position();
517+
curBufferLimit = curBuffer.limit();
518+
} else {
519+
curBuffer.put((byte) (0xf0 + (c >> 18)));
520+
curBuffer.put((byte) (0x80 + ((c >> 12) & 0x3f)));
521+
curBuffer.put((byte) (0x80 + ((c >> 6) & 0x3f)));
522+
curBuffer.put((byte) (0x80 + (c & 0x3f)));
523+
curBufferPos += 4;
524+
position += 4;
525+
}
526+
}
527+
}
528+
sp += Character.charCount(c);
529+
}
530+
531+
getCurrentByteBuffer().put((byte) 0);
532+
position++;
533+
return position - prevPos;
534+
}
535+
536+
private static int writeOnArrayAscii(final String str,
537+
final byte[] dst,
538+
final int arrayPosition,
539+
final boolean checkNullTermination) {
540+
int pos = arrayPosition;
541+
int sp = 0;
542+
// Fast common path: This tight loop is JIT-friendly (simple, no calls, few branches),
543+
// It might be unrolled for performance.
544+
for (; sp < str.length(); sp++, pos++) {
545+
char c = str.charAt(sp);
546+
if (checkNullTermination && c == 0) {
547+
throw new BsonSerializationException(
548+
format("BSON cstring '%s' is not valid because it contains a null character " + "at index %d", str, sp));
549+
}
550+
if (c >= 0x80) {
551+
break;
552+
}
553+
dst[pos] = (byte) c;
554+
}
555+
return sp;
556+
}
363557
}

driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufSpecification.groovy

+1-5
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,7 @@ class ByteBufSpecification extends Specification {
249249
@Override
250250
ByteBuf getBuffer(final int size) {
251251
io.netty.buffer.ByteBuf buffer = allocator.directBuffer(size, size)
252-
try {
253-
new NettyByteBuf(buffer.retain())
254-
} finally {
255-
buffer.release();
256-
}
252+
new NettyByteBuf(buffer)
257253
}
258254
}
259255
}

0 commit comments

Comments
 (0)