Skip to content

Eliminate temporary buffer allocations during bson serialization #1628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 8, 2025
2 changes: 1 addition & 1 deletion bson/src/main/org/bson/BsonBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void doWriteNull() {
public void doWriteObjectId(final ObjectId value) {
bsonOutput.writeByte(BsonType.OBJECT_ID.getValue());
writeCurrentName();
bsonOutput.writeBytes(value.toByteArray());
bsonOutput.writeObjectId(value);
}

@Override
Expand Down
108 changes: 79 additions & 29 deletions bson/src/main/org/bson/io/BasicOutputBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

import org.bson.ByteBuf;
import org.bson.ByteBufNIO;
import org.bson.types.ObjectId;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static java.lang.String.format;
Expand All @@ -32,8 +35,12 @@
* A BSON output stream that stores the output in a single, un-pooled byte array.
*/
public class BasicOutputBuffer extends OutputBuffer {
private byte[] buffer;
private int position;

/**
* This ByteBuffer allows us to write ObjectIDs without allocating a temporary array per object, and enables us
* to leverage JVM intrinsics for writing little-endian numeric values.
*/
private ByteBuffer buffer;

/**
* Construct an instance with a default initial byte array size.
Expand All @@ -48,7 +55,8 @@ public BasicOutputBuffer() {
* @param initialSize the initial size of the byte array
*/
public BasicOutputBuffer(final int initialSize) {
buffer = new byte[initialSize];
// Allocate heap buffer to ensure we can access underlying array
buffer = ByteBuffer.allocate(initialSize).order(LITTLE_ENDIAN);
}

/**
Expand All @@ -58,50 +66,76 @@ public BasicOutputBuffer(final int initialSize) {
* @since 3.3
*/
public byte[] getInternalBuffer() {
return buffer;
return buffer.array();
}

@Override
public void write(final byte[] b) {
writeBytes(b, 0, b.length);
}

@Override
public byte[] toByteArray() {
ensureOpen();
return Arrays.copyOf(buffer.array(), buffer.position());
}

@Override
public void writeInt32(final int value) {
ensureOpen();
ensure(4);
buffer.putInt(value);
}

@Override
public void writeInt32(final int position, final int value) {
ensureOpen();
checkPosition(position, 4);
buffer.putInt(position, value);
}

@Override
public void writeInt64(final long value) {
ensureOpen();
ensure(8);
buffer.putLong(value);
}

@Override
public void writeObjectId(final ObjectId value) {
ensureOpen();
write(b, 0, b.length);
ensure(12);
value.putToByteBuffer(buffer);
}

@Override
public void writeBytes(final byte[] bytes, final int offset, final int length) {
ensureOpen();

ensure(length);
System.arraycopy(bytes, offset, buffer, position, length);
position += length;
buffer.put(bytes, offset, length);
}

@Override
public void writeByte(final int value) {
ensureOpen();

ensure(1);
buffer[position++] = (byte) (0xFF & value);
buffer.put((byte) (0xFF & value));
}

@Override
protected void write(final int absolutePosition, final int value) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that the writeInt32(final int value) method has been updated to use ByteBuffer's bulk operation putInt(...), this method is no longer covered by tests. Previously, it was indirectly tested through writeInt32. Could you add a dedicated test case for this?

Copy link
Contributor Author

@Edarke Edarke Apr 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, updated in 9a187bd

ensureOpen();
checkPosition(absolutePosition, 1);

if (absolutePosition < 0) {
throw new IllegalArgumentException(format("position must be >= 0 but was %d", absolutePosition));
}
if (absolutePosition > position - 1) {
throw new IllegalArgumentException(format("position must be <= %d but was %d", position - 1, absolutePosition));
}

buffer[absolutePosition] = (byte) (0xFF & value);
buffer.put(absolutePosition, (byte) (0xFF & value));
}

@Override
public int getPosition() {
ensureOpen();
return position;
return buffer.position();
}

/**
Expand All @@ -110,29 +144,32 @@ public int getPosition() {
@Override
public int getSize() {
ensureOpen();
return position;
return buffer.position();
}

@Override
public int pipe(final OutputStream out) throws IOException {
ensureOpen();
out.write(buffer, 0, position);
return position;
out.write(buffer.array(), 0, buffer.position());
return buffer.position();
}

@Override
public void truncateToPosition(final int newPosition) {
ensureOpen();
if (newPosition > position || newPosition < 0) {
if (newPosition > buffer.position() || newPosition < 0) {
throw new IllegalArgumentException();
}
position = newPosition;
// The cast is required for compatibility with JDK 9+ where ByteBuffer's position method is inherited from Buffer.
((Buffer) buffer).position(newPosition);
}

@Override
public List<ByteBuf> getByteBuffers() {
ensureOpen();
return Arrays.asList(new ByteBufNIO(ByteBuffer.wrap(buffer, 0, position).duplicate().order(LITTLE_ENDIAN)));
// Create a flipped copy of the buffer for reading. Note that ByteBufNIO overwrites the endian-ness.
ByteBuffer flipped = ByteBuffer.wrap(buffer.array(), 0, buffer.position());
return Collections.singletonList(new ByteBufNIO(flipped));
}

@Override
Expand All @@ -147,19 +184,32 @@ private void ensureOpen() {
}

private void ensure(final int more) {
int need = position + more;
if (need <= buffer.length) {
int length = buffer.position();
int need = length + more;
if (need <= buffer.capacity()) {
return;
}

int newSize = buffer.length * 2;
int newSize = length * 2;
if (newSize < need) {
newSize = need + 128;
}

byte[] n = new byte[newSize];
System.arraycopy(buffer, 0, n, 0, position);
buffer = n;
ByteBuffer tmp = ByteBuffer.allocate(newSize).order(LITTLE_ENDIAN);
tmp.put(buffer.array(), 0, length); // Avoids covariant call to flip on jdk8
this.buffer = tmp;
}

/**
* Ensures that `absolutePosition` is a valid index in `this.buffer` and there is room to write at
* least `bytesToWrite` bytes.
*/
private void checkPosition(final int absolutePosition, final int bytesToWrite) {
if (absolutePosition < 0) {
throw new IllegalArgumentException(format("position must be >= 0 but was %d", absolutePosition));
}
if (absolutePosition > buffer.position() - bytesToWrite) {
throw new IllegalArgumentException(format("position must be <= %d but was %d", buffer.position() - bytesToWrite, absolutePosition));
}
}
}
Loading