Skip to content

Commit 81b5a20

Browse files
committed
Combine extended flow message into composite message objects to improve flushing
[resolves #138]
1 parent c6e1d56 commit 81b5a20

File tree

10 files changed

+193
-83
lines changed

10 files changed

+193
-83
lines changed

src/jmh/java/io/r2dbc/postgresql/StatementBenchmarks.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
@BenchmarkMode(Mode.Throughput)
4141
@OutputTimeUnit(TimeUnit.SECONDS)
4242
@Testable
43-
public class StatementBenchmarks {
43+
public class StatementBenchmarks extends BenchmarkSettings {
4444

4545
private static PostgresqlServerExtension extension = new PostgresqlServerExtension();
4646

src/main/java/io/r2dbc/postgresql/ExtendedQueryPostgresqlStatement.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ final class ExtendedQueryPostgresqlStatement implements PostgresqlStatement {
6969
this.sql = Assert.requireNonNull(sql, "sql must not be null");
7070
this.statementCache = Assert.requireNonNull(statementCache, "statementCache must not be null");
7171
this.forceBinary = forceBinary;
72-
7372
this.bindings = new Bindings(expectedSize(sql));
7473
}
7574

@@ -183,7 +182,7 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {
183182
ExceptionFactory factory = ExceptionFactory.withSql(sql);
184183
return this.statementCache.getName(this.bindings.first(), sql)
185184
.flatMapMany(name -> ExtendedQueryMessageFlow
186-
.execute(Flux.fromIterable(this.bindings.bindings), this.client, this.portalNameSupplier, name, sql, this.forceBinary))
185+
.execute(this.bindings.bindings, this.client, this.portalNameSupplier, name, sql, this.forceBinary))
187186
.filter(RESULT_FRAME_FILTER)
188187
.windowUntil(CloseComplete.class::isInstance)
189188
.map(messages -> PostgresqlResult.toResult(this.codecs, messages, factory));

src/main/java/io/r2dbc/postgresql/client/ExtendedQueryMessageFlow.java

+134-11
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.r2dbc.postgresql.client;
1818

19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.ByteBufAllocator;
1921
import io.netty.buffer.Unpooled;
2022
import io.r2dbc.postgresql.message.Format;
2123
import io.r2dbc.postgresql.message.backend.BackendMessage;
@@ -35,7 +37,7 @@
3537

3638
import java.util.Collection;
3739
import java.util.Collections;
38-
import java.util.List;
40+
import java.util.Objects;
3941
import java.util.function.Predicate;
4042
import java.util.regex.Pattern;
4143

@@ -71,15 +73,22 @@ private ExtendedQueryMessageFlow() {
7173
* @return the messages received in response to the exchange
7274
* @throws IllegalArgumentException if {@code bindings}, {@code client}, {@code portalNameSupplier}, or {@code statementName} is {@code null}
7375
*/
74-
public static Flux<BackendMessage> execute(Publisher<Binding> bindings, Client client, PortalNameSupplier portalNameSupplier, String statementName, String query, boolean forceBinary) {
76+
public static Flux<BackendMessage> execute(Collection<Binding> bindings, Client client, PortalNameSupplier portalNameSupplier, String statementName, String query, boolean forceBinary) {
7577
Assert.requireNonNull(bindings, "bindings must not be null");
7678
Assert.requireNonNull(client, "client must not be null");
7779
Assert.requireNonNull(portalNameSupplier, "portalNameSupplier must not be null");
7880
Assert.requireNonNull(statementName, "statementName must not be null");
7981

80-
return client.exchange(Flux.from(bindings)
81-
.flatMap(binding -> toBindFlow(binding, portalNameSupplier, statementName, query, forceBinary))
82-
.concatWith(Mono.just(Sync.INSTANCE)));
82+
if (bindings.size() == 1) {
83+
84+
Binding binding = bindings.iterator().next();
85+
return client.exchange(toBindFlow(binding, portalNameSupplier, statementName, query, forceBinary, true));
86+
} else {
87+
88+
return client.exchange(Flux.fromIterable(bindings)
89+
.flatMap(binding -> toBindFlow(binding, portalNameSupplier, statementName, query, forceBinary, false))
90+
.concatWith(Mono.just(Sync.INSTANCE)));
91+
}
8392
}
8493

8594
/**
@@ -92,7 +101,7 @@ public static Flux<BackendMessage> execute(Publisher<Binding> bindings, Client c
92101
* @return the messages received in response to this exchange
93102
* @throws IllegalArgumentException if {@code client}, {@code name}, {@code query}, or {@code types} is {@code null}
94103
*/
95-
public static Flux<BackendMessage> parse(Client client, String name, String query, List<Integer> types) {
104+
public static Flux<BackendMessage> parse(Client client, String name, String query, int[] types) {
96105
Assert.requireNonNull(client, "client must not be null");
97106
Assert.requireNonNull(name, "name must not be null");
98107
Assert.requireNonNull(query, "query must not be null");
@@ -110,24 +119,138 @@ private static Collection<Format> resultFormat(boolean forceBinary) {
110119
}
111120
}
112121

113-
private static Flux<FrontendMessage> toBindFlow(Binding binding, PortalNameSupplier portalNameSupplier, String statementName, String query, boolean forceBinary) {
122+
private static Publisher<FrontendMessage> toBindFlow(Binding binding, PortalNameSupplier portalNameSupplier, String statementName, String query, boolean forceBinary, boolean inlineSync) {
114123
String portal = portalNameSupplier.get();
115-
116-
return Flux.fromIterable(binding.getParameterValues())
124+
return binding.parameterValues()
117125
.flatMap(f -> {
118126
if (f == Parameter.NULL_VALUE) {
119127
return Flux.just(Bind.NULL_VALUE);
128+
} else if (f instanceof Mono) {
129+
return f;
120130
} else {
121131
return Flux.from(f)
122132
.reduce(Unpooled.compositeBuffer(), (c, b) -> c.addComponent(true, b));
123133
}
124134
})
125135
.collectList()
126-
.flatMapMany(values -> {
136+
.map(values -> {
127137
Bind bind = new Bind(portal, binding.getParameterFormats(), values, resultFormat(forceBinary), statementName);
128138

129-
return Flux.just(bind, new Describe(portal, PORTAL), new Execute(portal, NO_LIMIT), new Close(portal, PORTAL));
139+
if (inlineSync) {
140+
return new BindDescribeExecuteSyncClose(bind, portal);
141+
}
142+
143+
return new BindDescribeExecuteClose(bind, portal);
130144
}).doOnSubscribe(ignore -> QueryLogger.logQuery(query));
131145
}
132146

147+
public static class BindDescribeExecuteSyncClose implements FrontendMessage {
148+
149+
private final Bind bind;
150+
151+
private final String portal;
152+
153+
public BindDescribeExecuteSyncClose(Bind bind, String portal) {
154+
155+
this.bind = bind;
156+
this.portal = portal;
157+
}
158+
159+
@Override
160+
public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
161+
162+
return Mono.fromSupplier(() -> {
163+
164+
ByteBuf buffer = byteBufAllocator.buffer(4 * 32);
165+
166+
this.bind.encode(buffer);
167+
Describe.encode(buffer, this.portal, PORTAL);
168+
Execute.encode(buffer, this.portal, NO_LIMIT);
169+
Sync.INSTANCE.encode(buffer);
170+
Close.encode(buffer, this.portal, PORTAL);
171+
return buffer;
172+
});
173+
}
174+
175+
@Override
176+
public boolean equals(Object o) {
177+
if (this == o) {
178+
return true;
179+
}
180+
if (!(o instanceof BindDescribeExecuteSyncClose)) {
181+
return false;
182+
}
183+
BindDescribeExecuteSyncClose that = (BindDescribeExecuteSyncClose) o;
184+
return Objects.equals(this.bind, that.bind) &&
185+
Objects.equals(this.portal, that.portal);
186+
}
187+
188+
@Override
189+
public int hashCode() {
190+
return Objects.hash(this.bind, this.portal);
191+
}
192+
193+
@Override
194+
public String toString() {
195+
return "BindDescExecSyncClose{" +
196+
"bind=" + this.bind +
197+
", portal='" + this.portal + '\'' +
198+
'}';
199+
}
200+
}
201+
202+
public static class BindDescribeExecuteClose implements FrontendMessage {
203+
204+
private final Bind bind;
205+
206+
private final String portal;
207+
208+
public BindDescribeExecuteClose(Bind bind, String portal) {
209+
210+
this.bind = bind;
211+
this.portal = portal;
212+
}
213+
214+
@Override
215+
public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
216+
217+
return Mono.fromSupplier(() -> {
218+
219+
ByteBuf buffer = byteBufAllocator.buffer(4 * 32);
220+
221+
this.bind.encode(buffer);
222+
Describe.encode(buffer, this.portal, PORTAL);
223+
Execute.encode(buffer, this.portal, NO_LIMIT);
224+
Close.encode(buffer, this.portal, PORTAL);
225+
return buffer;
226+
});
227+
}
228+
229+
@Override
230+
public boolean equals(Object o) {
231+
if (this == o) {
232+
return true;
233+
}
234+
if (!(o instanceof BindDescribeExecuteClose)) {
235+
return false;
236+
}
237+
BindDescribeExecuteClose that = (BindDescribeExecuteClose) o;
238+
return Objects.equals(this.bind, that.bind) &&
239+
Objects.equals(this.portal, that.portal);
240+
}
241+
242+
@Override
243+
public int hashCode() {
244+
return Objects.hash(this.bind, this.portal);
245+
}
246+
247+
@Override
248+
public String toString() {
249+
return "BindDescExecClose{" +
250+
"bind=" + this.bind +
251+
", portal='" + this.portal + '\'' +
252+
'}';
253+
}
254+
}
255+
133256
}

src/main/java/io/r2dbc/postgresql/message/frontend/Bind.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -116,15 +116,17 @@ public void encode(ByteBuf byteBuf) {
116116
this.parameterFormats.forEach(format -> writeShort(byteBuf, format.getDiscriminator()));
117117

118118
writeShort(byteBuf, this.parameters.size());
119-
this.parameters.forEach(parameters -> {
120-
if (parameters == NULL_VALUE) {
119+
120+
for (ByteBuf parameter : this.parameters) {
121+
122+
if (parameter == NULL_VALUE) {
121123
writeInt(byteBuf, NULL);
122124
} else {
123-
writeInt(byteBuf, parameters.readableBytes());
124-
writeBytes(byteBuf, parameters);
125-
parameters.release();
125+
writeInt(byteBuf, parameter.readableBytes());
126+
writeBytes(byteBuf, parameter);
127+
parameter.release();
126128
}
127-
});
129+
}
128130

129131
writeShort(byteBuf, this.resultFormats.size());
130132
this.resultFormats.forEach(format -> writeShort(byteBuf, format.getDiscriminator()));

src/main/java/io/r2dbc/postgresql/message/frontend/Close.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,17 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
7171

7272
@Override
7373
public void encode(ByteBuf byteBuf) {
74+
encode(byteBuf, this.name, this.type);
75+
}
7476

77+
public static void encode(ByteBuf byteBuf, String name, ExecutionType type) {
7578
writeByte(byteBuf, 'C');
7679

7780
int writerIndex = byteBuf.writerIndex();
7881

7982
writeLengthPlaceholder(byteBuf);
80-
writeByte(byteBuf, this.type.getDiscriminator());
81-
writeCStringUTF8(byteBuf, this.name);
83+
writeByte(byteBuf, type.getDiscriminator());
84+
writeCStringUTF8(byteBuf, name);
8285
writeSize(byteBuf, writerIndex);
8386
}
8487

src/main/java/io/r2dbc/postgresql/message/frontend/Describe.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,17 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
7171
@Override
7272
public void encode(ByteBuf byteBuf) {
7373

74+
encode(byteBuf, name, type);
75+
}
76+
77+
public static void encode(ByteBuf byteBuf, String name, ExecutionType type) {
7478
writeByte(byteBuf, 'D');
7579

7680
int writerIndex = byteBuf.writerIndex();
7781

7882
writeLengthPlaceholder(byteBuf);
79-
writeByte(byteBuf, this.type.getDiscriminator());
80-
writeCStringUTF8(byteBuf, this.name);
83+
writeByte(byteBuf, type.getDiscriminator());
84+
writeCStringUTF8(byteBuf, name);
8185
writeSize(byteBuf, writerIndex);
8286
}
8387

src/main/java/io/r2dbc/postgresql/message/frontend/Execute.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,17 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
7676

7777
@Override
7878
public void encode(ByteBuf byteBuf) {
79+
encode(byteBuf, this.name, this.rows);
80+
}
7981

82+
public static void encode(ByteBuf byteBuf, String name, int rows) {
8083
writeByte(byteBuf, 'E');
8184

8285
int writerIndex = byteBuf.writerIndex();
8386

8487
writeLengthPlaceholder(byteBuf);
85-
writeCStringUTF8(byteBuf, this.name);
86-
writeInt(byteBuf, this.rows);
88+
writeCStringUTF8(byteBuf, name);
89+
writeInt(byteBuf, rows);
8790
writeSize(byteBuf, writerIndex);
8891
}
8992

@@ -103,7 +106,7 @@ public boolean equals(Object o) {
103106
@Override
104107
public int hashCode() {
105108

106-
return Objects.hash(name, rows);
109+
return Objects.hash(this.name, this.rows);
107110
}
108111

109112
@Override

src/main/java/io/r2dbc/postgresql/message/frontend/Parse.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.reactivestreams.Publisher;
2323
import reactor.core.publisher.Mono;
2424

25-
import java.util.List;
25+
import java.util.Arrays;
2626
import java.util.Objects;
2727

2828
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeByte;
@@ -49,7 +49,7 @@ public final class Parse implements FrontendMessage {
4949

5050
private final String name;
5151

52-
private final List<Integer> parameters;
52+
private final int[] parameters;
5353

5454
private final String query;
5555

@@ -63,7 +63,7 @@ public final class Parse implements FrontendMessage {
6363
* @see #UNNAMED_STATEMENT
6464
* @see #UNSPECIFIED
6565
*/
66-
public Parse(String name, List<Integer> parameters, String query) {
66+
public Parse(String name, int[] parameters, String query) {
6767
this.name = Assert.requireNonNull(name, "name must not be null");
6868
this.parameters = Assert.requireNonNull(parameters, "parameters must not be null");
6969
this.query = Assert.requireNonNull(query, "query must not be null");
@@ -81,8 +81,11 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
8181
writeCStringUTF8(out, this.name);
8282
writeCStringUTF8(out, this.query);
8383

84-
writeShort(out, this.parameters.size());
85-
this.parameters.forEach(parameter -> writeInt(out, parameter));
84+
writeShort(out, this.parameters.length);
85+
86+
for (int parameter : this.parameters) {
87+
writeInt(out, parameter);
88+
}
8689

8790
return writeSize(out);
8891
});
@@ -98,7 +101,7 @@ public boolean equals(Object o) {
98101
}
99102
Parse that = (Parse) o;
100103
return Objects.equals(this.name, that.name) &&
101-
Objects.equals(this.parameters, that.parameters) &&
104+
Arrays.equals(this.parameters, that.parameters) &&
102105
Objects.equals(this.query, that.query);
103106
}
104107

@@ -111,7 +114,7 @@ public int hashCode() {
111114
public String toString() {
112115
return "Parse{" +
113116
"name='" + this.name + '\'' +
114-
", parameters=" + this.parameters +
117+
", parameters=" + Arrays.toString(this.parameters) +
115118
", query='" + this.query + '\'' +
116119
'}';
117120
}

0 commit comments

Comments
 (0)