Skip to content

Commit def5d63

Browse files
committed
Pre-encode static messages
[#138]
1 parent 0c6408d commit def5d63

File tree

11 files changed

+197
-70
lines changed

11 files changed

+197
-70
lines changed

src/main/java/io/r2dbc/postgresql/message/frontend/Bind.java

+35-24
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
/**
4040
* The Bind message.
4141
*/
42-
public final class Bind implements FrontendMessage {
42+
public final class Bind implements FrontendMessage, FrontendMessage.DirectEncoder {
4343

4444
/**
4545
* A marker indicating a {@code NULL} value.
@@ -95,30 +95,41 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
9595
return Mono.fromSupplier(() -> {
9696
ByteBuf out = byteBufAllocator.ioBuffer();
9797

98-
writeByte(out, 'B');
99-
writeLengthPlaceholder(out);
100-
writeCStringUTF8(out, this.name);
101-
writeCStringUTF8(out, this.source);
102-
103-
writeShort(out, this.parameterFormats.size());
104-
this.parameterFormats.forEach(format -> writeShort(out, format.getDiscriminator()));
105-
106-
writeShort(out, this.parameters.size());
107-
this.parameters.forEach(parameters -> {
108-
if (parameters == NULL_VALUE) {
109-
writeInt(out, NULL);
110-
} else {
111-
writeInt(out, parameters.readableBytes());
112-
writeBytes(out, parameters);
113-
parameters.release();
114-
}
115-
});
116-
117-
writeShort(out, this.resultFormats.size());
118-
this.resultFormats.forEach(format -> writeShort(out, format.getDiscriminator()));
119-
120-
return writeSize(out);
98+
encode(out);
99+
100+
return out;
101+
});
102+
}
103+
104+
@Override
105+
public void encode(ByteBuf byteBuf) {
106+
107+
writeByte(byteBuf, 'B');
108+
109+
int writerIndex = byteBuf.writerIndex();
110+
111+
writeLengthPlaceholder(byteBuf);
112+
writeCStringUTF8(byteBuf, this.name);
113+
writeCStringUTF8(byteBuf, this.source);
114+
115+
writeShort(byteBuf, this.parameterFormats.size());
116+
this.parameterFormats.forEach(format -> writeShort(byteBuf, format.getDiscriminator()));
117+
118+
writeShort(byteBuf, this.parameters.size());
119+
this.parameters.forEach(parameters -> {
120+
if (parameters == NULL_VALUE) {
121+
writeInt(byteBuf, NULL);
122+
} else {
123+
writeInt(byteBuf, parameters.readableBytes());
124+
writeBytes(byteBuf, parameters);
125+
parameters.release();
126+
}
121127
});
128+
129+
writeShort(byteBuf, this.resultFormats.size());
130+
this.resultFormats.forEach(format -> writeShort(byteBuf, format.getDiscriminator()));
131+
132+
writeSize(byteBuf, writerIndex);
122133
}
123134

124135
@Override

src/main/java/io/r2dbc/postgresql/message/frontend/Close.java

+16-6
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
/**
3333
* The Close message.
3434
*/
35-
public final class Close implements FrontendMessage {
35+
public final class Close implements FrontendMessage, FrontendMessage.DirectEncoder {
3636

3737
/**
3838
* The unnamed statement or portal.
@@ -63,15 +63,25 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
6363
return Mono.fromSupplier(() -> {
6464
ByteBuf out = byteBufAllocator.ioBuffer();
6565

66-
writeByte(out, 'C');
67-
writeLengthPlaceholder(out);
68-
writeByte(out, this.type.getDiscriminator());
69-
writeCStringUTF8(out, this.name);
66+
encode(out);
7067

71-
return writeSize(out);
68+
return out;
7269
});
7370
}
7471

72+
@Override
73+
public void encode(ByteBuf byteBuf) {
74+
75+
writeByte(byteBuf, 'C');
76+
77+
int writerIndex = byteBuf.writerIndex();
78+
79+
writeLengthPlaceholder(byteBuf);
80+
writeByte(byteBuf, this.type.getDiscriminator());
81+
writeCStringUTF8(byteBuf, this.name);
82+
writeSize(byteBuf, writerIndex);
83+
}
84+
7585
@Override
7686
public boolean equals(Object o) {
7787
if (this == o) {

src/main/java/io/r2dbc/postgresql/message/frontend/CopyDone.java

+17-5
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,29 @@
2323
import reactor.core.publisher.Mono;
2424

2525
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.MESSAGE_OVERHEAD;
26+
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeArray;
2627
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeByte;
2728
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeLengthPlaceholder;
2829
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeSize;
2930

3031
/**
3132
* The CopyDone message.
3233
*/
33-
public final class CopyDone implements FrontendMessage {
34+
public final class CopyDone implements FrontendMessage, FrontendMessage.DirectEncoder {
3435

3536
/**
3637
* A static singleton instance that should always be used.
3738
*/
3839
public static final CopyDone INSTANCE = new CopyDone();
3940

41+
private final byte[] message = writeArray(buffer -> {
42+
43+
writeByte(buffer, 'c');
44+
writeLengthPlaceholder(buffer);
45+
46+
return writeSize(buffer);
47+
});
48+
4049
private CopyDone() {
4150
}
4251

@@ -47,13 +56,16 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
4756
return Mono.fromSupplier(() -> {
4857
ByteBuf out = byteBufAllocator.ioBuffer(MESSAGE_OVERHEAD);
4958

50-
writeByte(out, 'c');
51-
writeLengthPlaceholder(out);
52-
53-
return writeSize(out);
59+
encode(out);
60+
return out;
5461
});
5562
}
5663

64+
@Override
65+
public void encode(ByteBuf byteBuf) {
66+
byteBuf.writeBytes(this.message);
67+
}
68+
5769
@Override
5870
public String toString() {
5971
return "CopyDone{}";

src/main/java/io/r2dbc/postgresql/message/frontend/Describe.java

+16-7
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
/**
3333
* The Describe message.
3434
*/
35-
public final class Describe implements FrontendMessage {
35+
public final class Describe implements FrontendMessage, FrontendMessage.DirectEncoder {
3636

3737
/**
3838
* The unnamed statement or portal.
@@ -63,15 +63,24 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
6363
return Mono.fromSupplier(() -> {
6464
ByteBuf out = byteBufAllocator.ioBuffer();
6565

66-
writeByte(out, 'D');
67-
writeLengthPlaceholder(out);
68-
writeByte(out, this.type.getDiscriminator());
69-
writeCStringUTF8(out, this.name);
70-
71-
return writeSize(out);
66+
encode(out);
67+
return out;
7268
});
7369
}
7470

71+
@Override
72+
public void encode(ByteBuf byteBuf) {
73+
74+
writeByte(byteBuf, 'D');
75+
76+
int writerIndex = byteBuf.writerIndex();
77+
78+
writeLengthPlaceholder(byteBuf);
79+
writeByte(byteBuf, this.type.getDiscriminator());
80+
writeCStringUTF8(byteBuf, this.name);
81+
writeSize(byteBuf, writerIndex);
82+
}
83+
7584
@Override
7685
public boolean equals(Object o) {
7786
if (this == o) {

src/main/java/io/r2dbc/postgresql/message/frontend/Execute.java

+15-7
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
/**
3434
* The Execute message.
3535
*/
36-
public final class Execute implements FrontendMessage {
36+
public final class Execute implements FrontendMessage, FrontendMessage.DirectEncoder {
3737

3838
/**
3939
* No limit on returned rows.
@@ -69,14 +69,22 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
6969

7070
return Mono.fromSupplier(() -> {
7171
ByteBuf out = byteBufAllocator.ioBuffer();
72+
encode(out);
73+
return out;
74+
});
75+
}
76+
77+
@Override
78+
public void encode(ByteBuf byteBuf) {
7279

73-
writeByte(out, 'E');
74-
writeLengthPlaceholder(out);
75-
writeCStringUTF8(out, this.name);
76-
writeInt(out, this.rows);
80+
writeByte(byteBuf, 'E');
7781

78-
return writeSize(out);
79-
});
82+
int writerIndex = byteBuf.writerIndex();
83+
84+
writeLengthPlaceholder(byteBuf);
85+
writeCStringUTF8(byteBuf, this.name);
86+
writeInt(byteBuf, this.rows);
87+
writeSize(byteBuf, writerIndex);
8088
}
8189

8290
@Override

src/main/java/io/r2dbc/postgresql/message/frontend/Flush.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,29 @@
2323
import reactor.core.publisher.Mono;
2424

2525
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.MESSAGE_OVERHEAD;
26+
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeArray;
2627
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeByte;
2728
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeLengthPlaceholder;
2829
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeSize;
2930

3031
/**
3132
* The Flush message.
3233
*/
33-
public final class Flush implements FrontendMessage {
34+
public final class Flush implements FrontendMessage, FrontendMessage.DirectEncoder {
3435

3536
/**
3637
* A static singleton instance that should always be used.
3738
*/
3839
public static final Flush INSTANCE = new Flush();
3940

41+
private final byte[] message = writeArray(buffer -> {
42+
43+
writeByte(buffer, 'H');
44+
writeLengthPlaceholder(buffer);
45+
46+
return writeSize(buffer);
47+
});
48+
4049
private Flush() {
4150
}
4251

@@ -47,13 +56,17 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
4756
return Mono.fromSupplier(() -> {
4857
ByteBuf out = byteBufAllocator.ioBuffer(MESSAGE_OVERHEAD);
4958

50-
writeByte(out, 'H');
51-
writeLengthPlaceholder(out);
59+
encode(out);
5260

53-
return writeSize(out);
61+
return out;
5462
});
5563
}
5664

65+
@Override
66+
public void encode(ByteBuf byteBuf) {
67+
byteBuf.writeBytes(this.message);
68+
}
69+
5770
@Override
5871
public String toString() {
5972
return "Flush{}";

src/main/java/io/r2dbc/postgresql/message/frontend/FrontendMessage.java

+14
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,18 @@ public interface FrontendMessage {
3434
*/
3535
Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator);
3636

37+
38+
/**
39+
* Interface for messages that can be directly encoded without producing a {@link Publisher} first.
40+
*/
41+
interface DirectEncoder {
42+
43+
/**
44+
* Encode a message directly by writing its content to a {@link ByteBuf}.
45+
*
46+
* @param byteBuf the target {@link ByteBuf} to write into
47+
*/
48+
void encode(ByteBuf byteBuf);
49+
}
50+
3751
}

src/main/java/io/r2dbc/postgresql/message/frontend/FrontendMessageUtils.java

+14
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
package io.r2dbc.postgresql.message.frontend;
1818

1919
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.UnpooledByteBufAllocator;
2021
import io.r2dbc.postgresql.util.Assert;
2122

2223
import java.nio.ByteBuffer;
24+
import java.util.function.Function;
2325

2426
import static io.netty.util.CharsetUtil.UTF_8;
2527

@@ -118,4 +120,16 @@ static ByteBuf writeSize(ByteBuf out, int startIndex) {
118120
return out;
119121
}
120122

123+
static byte[] writeArray(Function<ByteBuf, ByteBuf> writeFunction) {
124+
125+
ByteBuf buffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer();
126+
ByteBuf result = writeFunction.apply(buffer);
127+
128+
byte[] bytes = new byte[result.readableBytes()];
129+
result.readBytes(bytes);
130+
result.release();
131+
132+
return bytes;
133+
}
134+
121135
}

0 commit comments

Comments
 (0)