Skip to content

Commit cb6c2f4

Browse files
committed
Use CompositeFrontendMessage to group multiple messages in a single TCP packet
Extended flow parse+flush,execute+flush, execute+close+sync, and bind+describe messages are now sent as single TCP packet to improve TCP efficiency. [#138]
1 parent 9c958e3 commit cb6c2f4

9 files changed

+184
-80
lines changed

src/main/java/io/r2dbc/postgresql/client/ExtendedQueryMessageFlow.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
3030
import io.r2dbc.postgresql.message.frontend.Bind;
3131
import io.r2dbc.postgresql.message.frontend.Close;
32+
import io.r2dbc.postgresql.message.frontend.CompositeFrontendMessage;
3233
import io.r2dbc.postgresql.message.frontend.Describe;
3334
import io.r2dbc.postgresql.message.frontend.Execute;
3435
import io.r2dbc.postgresql.message.frontend.ExecutionType;
@@ -111,7 +112,7 @@ public static Flux<BackendMessage> execute(Binding binding, Client client, Porta
111112
* @return the resulting message stream
112113
*/
113114
private static Flux<BackendMessage> fetchAll(Flux<FrontendMessage> bindFlow, Client client, String portal) {
114-
return client.exchange(bindFlow.concatWithValues(new Execute(portal, NO_LIMIT), new Close(portal, PORTAL), Sync.INSTANCE))
115+
return client.exchange(bindFlow.concatWithValues(new CompositeFrontendMessage(new Execute(portal, NO_LIMIT), new Close(portal, PORTAL), Sync.INSTANCE)))
115116
.as(Operators::discardOnCancel);
116117
}
117118

@@ -130,7 +131,7 @@ private static Flux<BackendMessage> fetchCursored(Flux<FrontendMessage> bindFlow
130131
FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
131132
AtomicBoolean isCanceled = new AtomicBoolean(false);
132133

133-
return client.exchange(bindFlow.concatWithValues(new Execute(portal, fetchSize), Flush.INSTANCE).concatWith(requestsProcessor))
134+
return client.exchange(bindFlow.concatWithValues(new CompositeFrontendMessage(new Execute(portal, fetchSize), Flush.INSTANCE)).concatWith(requestsProcessor))
134135
.handle((BackendMessage message, SynchronousSink<BackendMessage> sink) -> {
135136
if (message instanceof CommandComplete) {
136137
requestsSink.next(new Close(portal, PORTAL));
@@ -177,7 +178,7 @@ public static Flux<BackendMessage> parse(Client client, String name, String quer
177178
ParseComplete will be received if parse was successful
178179
ReadyForQuery will be received as a response to Sync, which was send in case of error in parsing
179180
*/
180-
return client.exchange(PARSE_TAKE_UNTIL, Flux.just(new Parse(name, types, query), Flush.INSTANCE))
181+
return client.exchange(PARSE_TAKE_UNTIL, Flux.just(new CompositeFrontendMessage(new Parse(name, types, query), Flush.INSTANCE)))
181182
.doOnNext(message -> {
182183
if (message instanceof ErrorResponse) {
183184
/*
@@ -202,7 +203,7 @@ public static Flux<BackendMessage> closeStatement(Client client, String name) {
202203
Assert.requireNonNull(client, "client must not be null");
203204
Assert.requireNonNull(name, "name must not be null");
204205

205-
return client.exchange(Flux.just(new Close(name, ExecutionType.STATEMENT), Sync.INSTANCE))
206+
return client.exchange(Flux.just(new CompositeFrontendMessage(new Close(name, ExecutionType.STATEMENT), Sync.INSTANCE)))
206207
.takeUntil(CloseComplete.class::isInstance);
207208
}
208209

@@ -221,7 +222,7 @@ private static Flux<FrontendMessage> toBindFlow(ConnectionContext connectionCont
221222
.flatMapMany(values -> {
222223
Bind bind = new Bind(portal, binding.getParameterFormats(), values, resultFormat(forceBinary), statementName);
223224

224-
return Flux.<FrontendMessage>just(bind, new Describe(portal, PORTAL));
225+
return Flux.<FrontendMessage>just(new CompositeFrontendMessage(bind, new Describe(portal, PORTAL)));
225226
}).doOnSubscribe(ignore -> QueryLogger.logQuery(connectionContext, query));
226227
}
227228

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.message.frontend;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.ByteBufAllocator;
21+
import org.reactivestreams.Publisher;
22+
import reactor.core.publisher.Mono;
23+
24+
import java.util.Arrays;
25+
import java.util.List;
26+
import java.util.Objects;
27+
28+
/**
29+
* {@link FrontendMessage} that aggregates {@link FrontendMessage}, specifically {@link DirectEncoder} messages to send multiple {@link FrontendMessage} in a single TCP packet.
30+
*
31+
* @since 0.8.6
32+
*/
33+
public final class CompositeFrontendMessage implements FrontendMessage, FrontendMessage.DirectEncoder {
34+
35+
private final List<DirectEncoder> messages;
36+
37+
public CompositeFrontendMessage(DirectEncoder... messages) {
38+
this(Arrays.asList(messages));
39+
}
40+
41+
public CompositeFrontendMessage(List<DirectEncoder> messages) {
42+
this.messages = messages;
43+
}
44+
45+
public boolean contains(FrontendMessage message) {
46+
return this.messages.contains(message);
47+
}
48+
49+
@Override
50+
public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
51+
return Mono.fromSupplier(() -> {
52+
53+
ByteBuf buffer = byteBufAllocator.buffer();
54+
encode(buffer);
55+
56+
return buffer;
57+
});
58+
}
59+
60+
@Override
61+
public void encode(ByteBuf byteBuf) {
62+
63+
for (DirectEncoder message : this.messages) {
64+
message.encode(byteBuf);
65+
}
66+
}
67+
68+
@Override
69+
public boolean equals(Object o) {
70+
if (this == o) {
71+
return true;
72+
}
73+
if (!(o instanceof CompositeFrontendMessage)) {
74+
return false;
75+
}
76+
CompositeFrontendMessage that = (CompositeFrontendMessage) o;
77+
return Objects.equals(this.messages, that.messages);
78+
}
79+
80+
@Override
81+
public int hashCode() {
82+
return Objects.hash(this.messages);
83+
}
84+
85+
@Override
86+
public String toString() {
87+
return this.messages.toString();
88+
}
89+
90+
}

src/main/java/io/r2dbc/postgresql/message/frontend/Parse.java

+18-11
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
/**
3636
* The Parse message.
3737
*/
38-
public final class Parse implements FrontendMessage {
38+
public final class Parse implements FrontendMessage, FrontendMessage.DirectEncoder {
3939

4040
/**
4141
* The unnamed statement.
@@ -76,18 +76,25 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
7676
return Mono.fromSupplier(() -> {
7777
ByteBuf out = byteBufAllocator.ioBuffer();
7878

79-
writeByte(out, 'P');
80-
writeLengthPlaceholder(out);
81-
writeCStringUTF8(out, this.name);
82-
writeCStringUTF8(out, this.query);
79+
encode(out);
80+
return out;
81+
});
82+
}
8383

84-
writeShort(out, this.parameters.length);
85-
for (int parameter : this.parameters) {
86-
writeInt(out, parameter);
87-
}
84+
@Override
85+
public void encode(ByteBuf byteBuf) {
8886

89-
return writeSize(out);
90-
});
87+
writeByte(byteBuf, 'P');
88+
writeLengthPlaceholder(byteBuf);
89+
writeCStringUTF8(byteBuf, this.name);
90+
writeCStringUTF8(byteBuf, this.query);
91+
92+
writeShort(byteBuf, this.parameters.length);
93+
for (int parameter : this.parameters) {
94+
writeInt(byteBuf, parameter);
95+
}
96+
97+
writeSize(byteBuf);
9198
}
9299

93100
@Override

src/test/java/io/r2dbc/postgresql/BoundedStatementCacheUnitTests.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.r2dbc.postgresql.message.backend.ErrorResponse;
2525
import io.r2dbc.postgresql.message.backend.ParseComplete;
2626
import io.r2dbc.postgresql.message.frontend.Close;
27+
import io.r2dbc.postgresql.message.frontend.CompositeFrontendMessage;
2728
import io.r2dbc.postgresql.message.frontend.ExecutionType;
2829
import io.r2dbc.postgresql.message.frontend.Flush;
2930
import io.r2dbc.postgresql.message.frontend.Parse;
@@ -64,17 +65,17 @@ void constructorNoClient() {
6465
void getName() {
6566
// @formatter:off
6667
Client client = TestClient.builder()
67-
.expectRequest(new Parse("S_0", new int[]{100}, "test-query-0"), Flush.INSTANCE)
68+
.expectRequest(new CompositeFrontendMessage(new Parse("S_0", new int[]{100}, "test-query-0"), Flush.INSTANCE))
6869
.thenRespond(ParseComplete.INSTANCE)
69-
.expectRequest(new Parse("S_1", new int[]{200}, "test-query-1"), Flush.INSTANCE)
70+
.expectRequest(new CompositeFrontendMessage(new Parse("S_1", new int[]{200}, "test-query-1"), Flush.INSTANCE))
7071
.thenRespond(ParseComplete.INSTANCE)
71-
.expectRequest(new Close("S_0", ExecutionType.STATEMENT), Sync.INSTANCE)
72+
.expectRequest(new CompositeFrontendMessage(new Close("S_0", ExecutionType.STATEMENT), Sync.INSTANCE))
7273
.thenRespond(CloseComplete.INSTANCE)
73-
.expectRequest(new Parse("S_2", new int[]{200}, "test-query-2"), Flush.INSTANCE)
74+
.expectRequest(new CompositeFrontendMessage(new Parse("S_2", new int[]{200}, "test-query-2"), Flush.INSTANCE))
7475
.thenRespond(ParseComplete.INSTANCE)
75-
.expectRequest(new Close("S_2", ExecutionType.STATEMENT), Sync.INSTANCE)
76+
.expectRequest(new CompositeFrontendMessage(new Close("S_2", ExecutionType.STATEMENT), Sync.INSTANCE))
7677
.thenRespond(CloseComplete.INSTANCE)
77-
.expectRequest(new Parse("S_3", new int[]{100}, "test-query-0"), Flush.INSTANCE)
78+
.expectRequest(new CompositeFrontendMessage(new Parse("S_3", new int[]{100}, "test-query-0"), Flush.INSTANCE))
7879
.thenRespond(ParseComplete.INSTANCE)
7980
.build();
8081
// @formatter:on
@@ -123,7 +124,7 @@ void getName() {
123124
void getNameErrorResponse() {
124125
// @formatter:off
125126
Client client = TestClient.builder()
126-
.expectRequest(new Parse("S_0", new int[]{100}, "test-query"), Flush.INSTANCE)
127+
.expectRequest(new CompositeFrontendMessage(new Parse("S_0", new int[]{100}, "test-query"), Flush.INSTANCE))
127128
.thenRespond(new ErrorResponse(Collections.emptyList()))
128129
.build();
129130
// @formatter:on

src/test/java/io/r2dbc/postgresql/DisabledStatementCacheUnitTests.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.r2dbc.postgresql.client.TestClient;
2323
import io.r2dbc.postgresql.message.backend.ErrorResponse;
2424
import io.r2dbc.postgresql.message.backend.ParseComplete;
25+
import io.r2dbc.postgresql.message.frontend.CompositeFrontendMessage;
2526
import io.r2dbc.postgresql.message.frontend.Flush;
2627
import io.r2dbc.postgresql.message.frontend.Parse;
2728
import io.r2dbc.spi.R2dbcNonTransientResourceException;
@@ -51,13 +52,13 @@ void constructorNoClient() {
5152
void getName() {
5253
// @formatter:off
5354
Client client = TestClient.builder()
54-
.expectRequest(new Parse("", new int[]{100}, "test-query"), Flush.INSTANCE)
55+
.expectRequest(new CompositeFrontendMessage(new Parse("", new int[]{100}, "test-query"), Flush.INSTANCE))
5556
.thenRespond(ParseComplete.INSTANCE)
56-
.expectRequest(new Parse("", new int[]{100}, "test-query"), Flush.INSTANCE)
57+
.expectRequest(new CompositeFrontendMessage(new Parse("", new int[]{100}, "test-query"), Flush.INSTANCE))
5758
.thenRespond(ParseComplete.INSTANCE)
58-
.expectRequest(new Parse("", new int[]{200}, "test-query"), Flush.INSTANCE)
59+
.expectRequest(new CompositeFrontendMessage(new Parse("", new int[]{200}, "test-query"), Flush.INSTANCE))
5960
.thenRespond(ParseComplete.INSTANCE)
60-
.expectRequest(new Parse("", new int[]{200}, "test-query-2"), Flush.INSTANCE)
61+
.expectRequest(new CompositeFrontendMessage(new Parse("", new int[]{200}, "test-query-2"), Flush.INSTANCE))
6162
.thenRespond(ParseComplete.INSTANCE)
6263
.build();
6364
// @formatter:on
@@ -89,7 +90,7 @@ void getName() {
8990
void getNameErrorResponse() {
9091
// @formatter:off
9192
Client client = TestClient.builder()
92-
.expectRequest(new Parse("", new int[]{100}, "test-query"), Flush.INSTANCE)
93+
.expectRequest(new CompositeFrontendMessage(new Parse("", new int[]{100}, "test-query"), Flush.INSTANCE))
9394
.thenRespond(new ErrorResponse(Collections.emptyList()))
9495
.build();
9596
// @formatter:on

0 commit comments

Comments
 (0)