Skip to content

Commit 87462fc

Browse files
Squirymp911de
authored andcommitted
Prepared statement cache size configuration
Added preparedStatementCacheQueries parameter, which controls cache behaviour. When cache limit is reached last used statement is closed before parsing new one. When cache limit is 0 caching of statement is disabled and unnamed statements are used in extended message flow. Negative values stand for IndefiniteStatementCache (default one). [#223][#225][#277]
1 parent 0bdcca5 commit 87462fc

11 files changed

+537
-7
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.r2dbc.postgresql.client.Binding;
20+
import io.r2dbc.postgresql.client.Client;
21+
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
22+
import io.r2dbc.postgresql.util.Assert;
23+
import reactor.core.publisher.Mono;
24+
25+
class DisabledStatementCache implements StatementCache {
26+
27+
private static final String UNNAMED_STATEMENT_NAME = "";
28+
29+
private final Client client;
30+
31+
DisabledStatementCache(Client client) {
32+
this.client = Assert.requireNonNull(client, "client must not be null");
33+
}
34+
35+
@Override
36+
public Mono<String> getName(Binding binding, String sql) {
37+
Assert.requireNonNull(binding, "binding must not be null");
38+
Assert.requireNonNull(sql, "sql must not be null");
39+
String name = UNNAMED_STATEMENT_NAME;
40+
41+
ExceptionFactory factory = ExceptionFactory.withSql(name);
42+
return ExtendedQueryMessageFlow
43+
.parse(this.client, name, sql, binding.getParameterTypes())
44+
.handle(factory::handleErrorResponse)
45+
.then(Mono.just(name));
46+
}
47+
48+
@Override
49+
public String toString() {
50+
return "DisabledStatementCache{" +
51+
"client=" + this.client +
52+
'}';
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.r2dbc.postgresql.client.Binding;
20+
import io.r2dbc.postgresql.client.Client;
21+
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
22+
import io.r2dbc.postgresql.util.Assert;
23+
import reactor.core.publisher.Mono;
24+
import reactor.util.function.Tuple2;
25+
import reactor.util.function.Tuples;
26+
27+
import java.util.Arrays;
28+
import java.util.HashMap;
29+
import java.util.LinkedHashMap;
30+
import java.util.List;
31+
import java.util.concurrent.atomic.AtomicInteger;
32+
import java.util.stream.Collectors;
33+
34+
public class LimitedStatementCache implements StatementCache {
35+
36+
private final HashMap<Tuple2<String, List<Integer>>, String> cache = new LinkedHashMap<>(16, 0.75f, true);
37+
38+
private final Client client;
39+
40+
private final AtomicInteger counter = new AtomicInteger();
41+
42+
private final int limit;
43+
44+
public LimitedStatementCache(Client client, int limit) {
45+
this.client = Assert.requireNonNull(client, "client must not be null");
46+
if (limit <= 0) {
47+
throw new IllegalArgumentException("statement cache limit must be greater than zero");
48+
}
49+
this.limit = limit;
50+
}
51+
52+
@Override
53+
public Mono<String> getName(Binding binding, String sql) {
54+
Assert.requireNonNull(binding, "binding must not be null");
55+
Assert.requireNonNull(sql, "sql must not be null");
56+
Tuple2<String, List<Integer>> key = Tuples.of(sql, Arrays.stream(binding.getParameterTypes()).boxed().collect(Collectors.toList()));
57+
String name = this.cache.get(key);
58+
if (name != null) {
59+
return Mono.just(name);
60+
}
61+
62+
Mono<Void> closeLastStatement = Mono.defer(() -> {
63+
if (this.cache.size() < this.limit) {
64+
return Mono.empty();
65+
}
66+
String lastAccessedStatementName = this.cache.values().iterator().next();
67+
ExceptionFactory factory = ExceptionFactory.withSql(lastAccessedStatementName);
68+
return ExtendedQueryMessageFlow
69+
.closeStatement(this.client, lastAccessedStatementName)
70+
.handle(factory::handleErrorResponse)
71+
.then();
72+
});
73+
74+
return closeLastStatement.then(this.parse(sql, binding.getParameterTypes()))
75+
.doOnNext(preparedName -> this.cache.put(key, preparedName));
76+
}
77+
78+
@Override
79+
public String toString() {
80+
return "LimitedStatementCache{" +
81+
"cache=" + this.cache +
82+
", counter=" + this.counter +
83+
", client=" + this.client +
84+
", limit=" + this.limit +
85+
'}';
86+
}
87+
88+
private Mono<String> parse(String sql, int[] types) {
89+
String name = String.format("S_%d", this.counter.getAndIncrement());
90+
91+
ExceptionFactory factory = ExceptionFactory.withSql(name);
92+
return ExtendedQueryMessageFlow
93+
.parse(this.client, name, sql, types)
94+
.handle(factory::handleErrorResponse)
95+
.then(Mono.just(name))
96+
.cache();
97+
}
98+
}

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionConfiguration.java

+35-2
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,14 @@ public final class PostgresqlConnectionConfiguration {
8383

8484
private final SSLConfig sslConfig;
8585

86+
private final int preparedStatementCacheQueries;
87+
8688
private PostgresqlConnectionConfiguration(String applicationName, boolean autodetectExtensions,
8789
@Nullable Duration connectTimeout, @Nullable String database, List<Extension> extensions,
8890
ToIntFunction<String> fetchSize, boolean forceBinary,
8991
@Nullable String host,
9092
@Nullable Map<String, String> options, @Nullable CharSequence password, int port, @Nullable String schema, @Nullable String socket, String username,
91-
SSLConfig sslConfig) {
93+
SSLConfig sslConfig, int preparedStatementCacheQueries) {
9294
this.applicationName = Assert.requireNonNull(applicationName, "applicationName must not be null");
9395
this.autodetectExtensions = autodetectExtensions;
9496
this.connectTimeout = connectTimeout;
@@ -108,6 +110,7 @@ private PostgresqlConnectionConfiguration(String applicationName, boolean autode
108110
this.socket = socket;
109111
this.username = Assert.requireNonNull(username, "username must not be null");
110112
this.sslConfig = sslConfig;
113+
this.preparedStatementCacheQueries = preparedStatementCacheQueries;
111114
}
112115

113116
/**
@@ -239,6 +242,21 @@ SSLConfig getSslConfig() {
239242
return this.sslConfig;
240243
}
241244

245+
int getPreparedStatementCacheQueries() {
246+
return this.preparedStatementCacheQueries;
247+
}
248+
249+
private static String obfuscate(int length) {
250+
251+
StringBuilder builder = new StringBuilder();
252+
253+
for (int i = 0; i < length; i++) {
254+
builder.append("*");
255+
}
256+
257+
return builder.toString();
258+
}
259+
242260
/**
243261
* A builder for {@link PostgresqlConnectionConfiguration} instances.
244262
* <p>
@@ -299,6 +317,8 @@ public static final class Builder {
299317
@Nullable
300318
private String username;
301319

320+
private int preparedStatementCacheQueries = -1;
321+
302322
private Builder() {
303323
}
304324

@@ -346,7 +366,7 @@ public PostgresqlConnectionConfiguration build() {
346366

347367
return new PostgresqlConnectionConfiguration(this.applicationName, this.autodetectExtensions, this.connectTimeout, this.database, this.extensions, this.fetchSize, this.forceBinary,
348368
this.host,
349-
this.options, this.password, this.port, this.schema, this.socket, this.username, this.createSslConfig());
369+
this.options, this.password, this.port, this.schema, this.socket, this.username, this.createSslConfig(), this.preparedStatementCacheQueries);
350370
}
351371

352372
/**
@@ -589,6 +609,18 @@ public Builder sslPassword(@Nullable CharSequence sslPassword) {
589609
return this;
590610
}
591611

612+
/**
613+
* Configure the preparedStatementCacheQueries.
614+
*
615+
* @param preparedStatementCacheQueries the preparedStatementCacheQueries
616+
* @return this {@link Builder}
617+
* @throws IllegalArgumentException if {@code username} is {@code null}
618+
*/
619+
public Builder preparedStatementCacheQueries(int preparedStatementCacheQueries) {
620+
this.preparedStatementCacheQueries = preparedStatementCacheQueries;
621+
return this;
622+
}
623+
592624
@Override
593625
public String toString() {
594626
return "Builder{" +
@@ -612,6 +644,7 @@ public String toString() {
612644
", sslCert='" + this.sslCert + '\'' +
613645
", sslKey='" + this.sslKey + '\'' +
614646
", sslHostnameVerifier='" + this.sslHostnameVerifier + '\'' +
647+
", preparedStatementCacheQueries='" + this.preparedStatementCacheQueries + '\'' +
615648
'}';
616649
}
617650

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ private Mono<PostgresqlConnection> doCreateConnection(boolean forReplication, @N
158158
.flatMap(client -> {
159159

160160
DefaultCodecs codecs = new DefaultCodecs(client.getByteBufAllocator());
161-
StatementCache statementCache = new IndefiniteStatementCache(client);
161+
StatementCache statementCache = StatementCache.fromPreparedStatementCacheQueries(client, this.configuration.getPreparedStatementCacheQueries());
162162

163163
// early connection object to retrieve initialization details
164164
PostgresqlConnection earlyConnection = new PostgresqlConnection(client, codecs, DefaultPortalNameSupplier.INSTANCE, statementCache, IsolationLevel.READ_COMMITTED,

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryProvider.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
import javax.net.ssl.HostnameVerifier;
2727
import java.util.LinkedHashMap;
28-
import java.util.Locale;
2928
import java.util.Map;
3029
import java.util.function.Function;
3130

@@ -125,6 +124,12 @@ public final class PostgresqlConnectionFactoryProvider implements ConnectionFact
125124
*/
126125
public static final Option<String> SSL_ROOT_CERT = Option.valueOf("sslRootCert");
127126

127+
/**
128+
* Determine the number of queries that are cached in each connection.
129+
* The default is -1, meaning there's no limit. The value of 0 disables the cache.
130+
*/
131+
public static final Option<Integer> PREPARED_STATEMENT_CACHE_QUERIES = Option.valueOf("preparedStatementCacheQueries");
132+
128133
/**
129134
* Connection options which are applied once after the connection has been created.
130135
*/
@@ -188,6 +193,11 @@ private static PostgresqlConnectionConfiguration createConfiguration(ConnectionF
188193
builder.forceBinary(convertToBoolean(forceBinary));
189194
}
190195

196+
Object preparedStatementCacheQueries = connectionFactoryOptions.getValue(PREPARED_STATEMENT_CACHE_QUERIES);
197+
if (preparedStatementCacheQueries != null) {
198+
builder.preparedStatementCacheQueries(convertToInt(preparedStatementCacheQueries));
199+
}
200+
191201
Object options = connectionFactoryOptions.getValue(Option.valueOf("options"));
192202
if (options != null) {
193203
builder.options(convertToMap(options));
@@ -274,7 +284,7 @@ private static boolean convertToBoolean(Object value) {
274284
private static int convertToInt(Object value) {
275285
return value instanceof Integer ? (int) value : Integer.parseInt(value.toString());
276286
}
277-
287+
278288
@SuppressWarnings("unchecked")
279289
private static Map<String, String> convertToMap(Object options) {
280290
if (options instanceof Map) {

src/main/java/io/r2dbc/postgresql/StatementCache.java

+10
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,20 @@
1717
package io.r2dbc.postgresql;
1818

1919
import io.r2dbc.postgresql.client.Binding;
20+
import io.r2dbc.postgresql.client.Client;
2021
import reactor.core.publisher.Mono;
2122

2223
interface StatementCache {
2324

2425
Mono<String> getName(Binding binding, String sql);
2526

27+
static StatementCache fromPreparedStatementCacheQueries(Client client, int preparedStatementCacheQueries) {
28+
if (preparedStatementCacheQueries < 0) {
29+
return new IndefiniteStatementCache(client);
30+
}
31+
if (preparedStatementCacheQueries == 0) {
32+
return new DisabledStatementCache(client);
33+
}
34+
return new LimitedStatementCache(client, preparedStatementCacheQueries);
35+
}
2636
}

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,7 @@ public Mono<Void> close() {
193193
if (this.isClosed.compareAndSet(false, true)) {
194194

195195
if (!connected || this.processId == null) {
196-
this.connection.dispose();
197-
return this.connection.onDispose();
196+
return closeConnection();
198197
}
199198

200199
return Flux.just(Terminate.INSTANCE)
@@ -209,6 +208,11 @@ public Mono<Void> close() {
209208
});
210209
}
211210

211+
private Mono<? extends Void> closeConnection() {
212+
this.connection.dispose();
213+
return this.connection.onDispose();
214+
}
215+
212216
@Override
213217
public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests) {
214218
Assert.requireNonNull(takeUntil, "takeUntil must not be null");

0 commit comments

Comments
 (0)