Skip to content

Optimize String write #1651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Apr 17, 2025
20 changes: 20 additions & 0 deletions bson/src/main/org/bson/ByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,26 @@ public interface ByteBuf {
*/
byte[] array();

/**
* <p>States whether this buffer is backed by an accessible byte array.</p>
*
* <p>If this method returns {@code true} then the {@link #array()} and {@link #arrayOffset()} methods may safely be invoked.</p>
*
* @return {@code true} if, and only if, this buffer is backed by an array and is not read-only
* @since 5.5
*/
boolean hasArray();

/**
* Returns the offset of the first byte within the backing byte array of
* this buffer.
*
* @throws java.nio.ReadOnlyBufferException If this buffer is backed by an array but is read-only
* @throws UnsupportedOperationException if this buffer is not backed by an accessible array
* @since 5.5
*/
int arrayOffset();

/**
* Returns this buffer's limit.
*
Expand Down
10 changes: 10 additions & 0 deletions bson/src/main/org/bson/ByteBufNIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ public byte[] array() {
return buf.array();
}

@Override
public boolean hasArray() {
return buf.hasArray();
}

@Override
public int arrayOffset() {
return buf.arrayOffset();
}

@Override
public int limit() {
return buf.limit();
Expand Down
2 changes: 1 addition & 1 deletion bson/src/main/org/bson/io/OutputBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void writeLong(final long value) {
writeInt64(value);
}

private int writeCharacters(final String str, final boolean checkForNullCharacters) {
protected int writeCharacters(final String str, final boolean checkForNullCharacters) {
int len = str.length();
int total = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.mongodb.internal.connection;

import org.bson.BsonSerializationException;
import org.bson.ByteBuf;
import org.bson.io.OutputBuffer;

Expand All @@ -25,8 +26,10 @@
import java.util.ArrayList;
import java.util.List;

import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static java.lang.String.format;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
Expand Down Expand Up @@ -100,11 +103,17 @@ private ByteBuf getCurrentByteBuffer() {
return getByteBufferAtIndex(curBufferIndex);
}

private ByteBuf getNextByteBuffer() {
assertFalse(bufferList.get(curBufferIndex).hasRemaining());
return getByteBufferAtIndex(++curBufferIndex);
}

private ByteBuf getByteBufferAtIndex(final int index) {
if (bufferList.size() < index + 1) {
bufferList.add(bufferProvider.getBuffer(index >= (MAX_SHIFT - INITIAL_SHIFT)
? MAX_BUFFER_SIZE
: Math.min(INITIAL_BUFFER_SIZE << index, MAX_BUFFER_SIZE)));
ByteBuf buffer = bufferProvider.getBuffer(index >= (MAX_SHIFT - INITIAL_SHIFT)
? MAX_BUFFER_SIZE
: Math.min(INITIAL_BUFFER_SIZE << index, MAX_BUFFER_SIZE));
bufferList.add(buffer);
}
return bufferList.get(index);
}
Expand Down Expand Up @@ -147,6 +156,16 @@ public List<ByteBuf> getByteBuffers() {
return buffers;
}

public List<ByteBuf> getDuplicateByteBuffers() {
ensureOpen();

List<ByteBuf> buffers = new ArrayList<>(bufferList.size());
for (final ByteBuf cur : bufferList) {
buffers.add(cur.duplicate().order(ByteOrder.LITTLE_ENDIAN));
}
return buffers;
}


@Override
public int pipe(final OutputStream out) throws IOException {
Expand All @@ -155,14 +174,18 @@ public int pipe(final OutputStream out) throws IOException {
byte[] tmp = new byte[INITIAL_BUFFER_SIZE];

int total = 0;
for (final ByteBuf cur : getByteBuffers()) {
ByteBuf dup = cur.duplicate();
while (dup.hasRemaining()) {
int numBytesToCopy = Math.min(dup.remaining(), tmp.length);
dup.get(tmp, 0, numBytesToCopy);
out.write(tmp, 0, numBytesToCopy);
List<ByteBuf> byteBuffers = getByteBuffers();
try {
for (final ByteBuf cur : byteBuffers) {
while (cur.hasRemaining()) {
int numBytesToCopy = Math.min(cur.remaining(), tmp.length);
cur.get(tmp, 0, numBytesToCopy);
out.write(tmp, 0, numBytesToCopy);
}
total += cur.limit();
}
total += dup.limit();
} finally {
byteBuffers.forEach(ByteBuf::release);
}
return total;
}
Expand Down Expand Up @@ -282,4 +305,129 @@ private static final class BufferPositionPair {
this.position = position;
}
}

protected int writeCharacters(final String str, final boolean checkNullTermination) {
int len = str.length();
int sp = 0;
int prevPos = position;

ByteBuf buf = getCurrentByteBuffer();
int currBufferPos = buf.position();
int limit = buf.limit();
int remaining = limit - currBufferPos;

if (buf.hasArray()) {
byte[] dst = buf.array();
int arrayOffset = buf.arrayOffset();
if (remaining >= str.length() + 1) {
sp = writeOnArrayAscii(str, dst, arrayOffset + currBufferPos, checkNullTermination);
currBufferPos += sp;
if (sp == len) {
dst[arrayOffset + currBufferPos++] = 0;
position += sp + 1;
buf.position(currBufferPos);
return sp + 1;
}
position += sp;
buf.position(currBufferPos);
}
}

while (sp < len) {
remaining = limit - currBufferPos;
int c = str.charAt(sp);

if (checkNullTermination && c == 0x0) {
throw new BsonSerializationException(
format("BSON cstring '%s' is not valid because it contains a null character " + "at index %d", str, sp));
}

if (c < 0x80) {
if (remaining == 0) {
buf = getNextByteBuffer();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still suggest to give at shot at the PR I made which use a separate index to access single bytes in the internalNio Buffer within the Netty buffers, for two reasons:

  • NIO ByteBuffer can benefit from additional optimizations from the JVM since it's a known type to it
  • Netty buffer read/write both move forward the internal indexes and force Netty to verify accessibility of the buffer for each operation, which have some Java Memory Model effects (.e.g. any subsequent load has to happen for real, each time!)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point - accessing the NIO buffer directly sounds like a potential win. I’m aiming to keep this PR focused and incremental for easier review and integration. We could consider Netty-specific optimizations in a follow-up PR/ scope, once we have Netty benchmarks running in CI

currBufferPos = 0;
limit = buf.limit();
}
buf.put((byte) c);
currBufferPos++;
position++;
} else if (c < 0x800) {
if (remaining < 2) {
write((byte) (0xc0 + (c >> 6)));
write((byte) (0x80 + (c & 0x3f)));

buf = getCurrentByteBuffer();
currBufferPos = buf.position();
limit = buf.limit();
} else {
buf.put((byte) (0xc0 + (c >> 6)));
buf.put((byte) (0x80 + (c & 0x3f)));
currBufferPos += 2;
position += 2;
}
} else {
c = Character.codePointAt(str, sp);
if (c < 0x10000) {
if (remaining < 3) {
write((byte) (0xe0 + (c >> 12)));
write((byte) (0x80 + ((c >> 6) & 0x3f)));
write((byte) (0x80 + (c & 0x3f)));

buf = getCurrentByteBuffer();
currBufferPos = buf.position();
limit = buf.limit();
} else {
buf.put((byte) (0xe0 + (c >> 12)));
buf.put((byte) (0x80 + ((c >> 6) & 0x3f)));
buf.put((byte) (0x80 + (c & 0x3f)));
currBufferPos += 3;
position += 3;
}
} else {
if (remaining < 4) {
write((byte) (0xf0 + (c >> 18)));
write((byte) (0x80 + ((c >> 12) & 0x3f)));
write((byte) (0x80 + ((c >> 6) & 0x3f)));
write((byte) (0x80 + (c & 0x3f)));

buf = getCurrentByteBuffer();
currBufferPos = buf.position();
limit = buf.limit();
} else {
buf.put((byte) (0xf0 + (c >> 18)));
buf.put((byte) (0x80 + ((c >> 12) & 0x3f)));
buf.put((byte) (0x80 + ((c >> 6) & 0x3f)));
buf.put((byte) (0x80 + (c & 0x3f)));
currBufferPos += 4;
position += 4;
}
}
}
sp += Character.charCount(c);
}

getCurrentByteBuffer().put((byte) 0);
position++;
return position - prevPos;
}

private static int writeOnArrayAscii(final String str,
final byte[] dst,
final int arrayPosition,
final boolean checkNullTermination) {
int pos = arrayPosition;
int sp = 0;
for (; sp < str.length(); sp++, pos++) {
char c = str.charAt(sp);
if (checkNullTermination && c == 0) {
throw new BsonSerializationException(
format("BSON cstring '%s' is not valid because it contains a null character " + "at index %d", str, sp));
}
if (c >= 0x80) {
break;
}
dst[pos] = (byte) c;
}
return sp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,16 @@ public byte[] array() {
throw new UnsupportedOperationException("Not implemented yet!");
}

@Override
public boolean hasArray() {
return false;
}

@Override
public int arrayOffset() {
throw new UnsupportedOperationException("Not implemented yet!");
}

@Override
public ByteBuf limit(final int newLimit) {
if (newLimit < 0 || newLimit > capacity()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ public byte[] array() {
return proxied.array();
}

@Override
public boolean hasArray() {
return proxied.hasArray();
}

@Override
public int arrayOffset() {
return proxied.arrayOffset();
}

@Override
public int limit() {
if (isWriting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,7 @@ class ByteBufSpecification extends Specification {
@Override
ByteBuf getBuffer(final int size) {
io.netty.buffer.ByteBuf buffer = allocator.directBuffer(size, size)
try {
new NettyByteBuf(buffer.retain())
} finally {
buffer.release();
}
new NettyByteBuf(buffer)
}
}
}
Loading