Skip to content

Commit fc1c7f3

Browse files
committed
Merge branch 'Squiry-prepared-statement-cache-configuration'
[resolves pgjdbc#223]
2 parents e5f35c0 + 3c872e7 commit fc1c7f3

21 files changed

+1021
-137
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
7878
| `applicationName` | The name of the application connecting to the database. Defaults to `r2dbc-postgresql`. _(Optional)_
7979
| `autodetectExtensions` | Whether to auto-detect and register `Extension`s from the class path. Defaults to `true`. _(Optional)_
8080
| `forceBinary` | Whether to force binary transfer. Defaults to `false`. _(Optional)_
81+
| `preparedStatementCacheQueries` | Determine the number of queries that are cached in each connection. The default is `-1`, meaning there's no limit. The value of `-1` disables the cache. Any other value specifies the cache size.
8182
| `options` | A `Map<String, String>` of connection parameters. These are applied to each database connection created by the `ConnectionFactory`. Useful for setting generic [PostgreSQL connection parameters][psql-runtime-config]. _(Optional)_
8283
| `schema` | The schema to set. _(Optional)_
8384
| `sslMode` | SSL mode to use, see `SSLMode` enum. Supported values: `DISABLE`, `ALLOW`, `PREFER`, `REQUIRE`, `VERIFY_CA`, `VERIFY_FULL`. _(Optional)_
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.r2dbc.postgresql.client.Binding;
20+
import io.r2dbc.postgresql.client.Client;
21+
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
22+
import io.r2dbc.postgresql.util.Assert;
23+
import reactor.core.publisher.Mono;
24+
import reactor.util.annotation.Nullable;
25+
import reactor.util.function.Tuple2;
26+
import reactor.util.function.Tuples;
27+
28+
import java.util.ArrayList;
29+
import java.util.Collection;
30+
import java.util.Iterator;
31+
import java.util.LinkedHashMap;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
36+
/**
37+
* Bounded (size-limited) {@link StatementCache}.
38+
*/
39+
final class BoundedStatementCache implements StatementCache {
40+
41+
private final Map<Tuple2<String, List<Integer>>, String> cache = new LinkedHashMap<>(16, 0.75f, true);
42+
43+
private final Client client;
44+
45+
private final AtomicInteger counter = new AtomicInteger();
46+
47+
private final int limit;
48+
49+
public BoundedStatementCache(Client client, int limit) {
50+
this.client = Assert.requireNonNull(client, "client must not be null");
51+
if (limit <= 0) {
52+
throw new IllegalArgumentException("statement cache limit must be greater than zero");
53+
}
54+
this.limit = limit;
55+
}
56+
57+
@Override
58+
public Mono<String> getName(Binding binding, String sql) {
59+
Assert.requireNonNull(binding, "binding must not be null");
60+
Assert.requireNonNull(sql, "sql must not be null");
61+
Tuple2<String, List<Integer>> key = Tuples.of(sql, binding.getParameterTypes());
62+
String name = get(key);
63+
if (name != null) {
64+
return Mono.just(name);
65+
}
66+
67+
Mono<Void> closeLastStatement = Mono.defer(() -> {
68+
if (getCacheSize() < this.limit) {
69+
return Mono.empty();
70+
}
71+
String lastAccessedStatementName = getAndRemoveEldest();
72+
ExceptionFactory factory = ExceptionFactory.withSql(lastAccessedStatementName);
73+
return ExtendedQueryMessageFlow
74+
.closeStatement(this.client, lastAccessedStatementName)
75+
.handle(factory::handleErrorResponse)
76+
.then();
77+
});
78+
79+
return closeLastStatement.then(this.parse(sql, binding.getParameterTypes()))
80+
.doOnNext(preparedName -> put(key, preparedName));
81+
}
82+
83+
84+
/**
85+
* Synchronized cache access: Return all statement names.
86+
*
87+
* @return statement names.
88+
*/
89+
Collection<String> getCachedStatementNames() {
90+
synchronized (this.cache) {
91+
List<String> names = new ArrayList<>(this.cache.size());
92+
names.addAll(cache.values());
93+
return names;
94+
}
95+
}
96+
97+
/**
98+
* Synchronized cache access: Retrieve statement name by key.
99+
*
100+
* @param key
101+
* @return statement name by key
102+
*/
103+
@Nullable
104+
private String get(Tuple2<String, List<Integer>> key) {
105+
synchronized (this.cache) {
106+
return this.cache.get(key);
107+
}
108+
}
109+
110+
/**
111+
* Synchronized cache access: Least recently used entry.
112+
*
113+
* @return least recently used entry
114+
*/
115+
private String getAndRemoveEldest() {
116+
synchronized (this.cache) {
117+
Iterator<Map.Entry<Tuple2<String, List<Integer>>, String>> iterator = this.cache.entrySet().iterator();
118+
String entry = iterator.next().getValue();
119+
iterator.remove();
120+
return entry;
121+
}
122+
}
123+
124+
/**
125+
* Synchronized cache access: Store prepared statement.
126+
*/
127+
private void put(Tuple2<String, List<Integer>> key, String preparedName) {
128+
synchronized (this.cache) {
129+
this.cache.put(key, preparedName);
130+
}
131+
}
132+
133+
/**
134+
* Synchronized cache access: Return the cache size.
135+
*
136+
* @return the cache size.
137+
*/
138+
private int getCacheSize() {
139+
synchronized (this.cache) {
140+
return this.cache.size();
141+
}
142+
}
143+
144+
@Override
145+
public String toString() {
146+
return "LimitedStatementCache{" +
147+
"cache=" + this.cache +
148+
", counter=" + this.counter +
149+
", client=" + this.client +
150+
", limit=" + this.limit +
151+
'}';
152+
}
153+
154+
private Mono<String> parse(String sql, List<Integer> types) {
155+
String name = String.format("S_%d", this.counter.getAndIncrement());
156+
157+
ExceptionFactory factory = ExceptionFactory.withSql(name);
158+
return ExtendedQueryMessageFlow
159+
.parse(this.client, name, sql, types)
160+
.handle(factory::handleErrorResponse)
161+
.then(Mono.just(name))
162+
.cache();
163+
}
164+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.r2dbc.postgresql.client.Binding;
20+
import io.r2dbc.postgresql.client.Client;
21+
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
22+
import io.r2dbc.postgresql.util.Assert;
23+
import reactor.core.publisher.Mono;
24+
25+
class DisabledStatementCache implements StatementCache {
26+
27+
private static final String UNNAMED_STATEMENT_NAME = "";
28+
29+
private final Client client;
30+
31+
DisabledStatementCache(Client client) {
32+
this.client = Assert.requireNonNull(client, "client must not be null");
33+
}
34+
35+
@Override
36+
public Mono<String> getName(Binding binding, String sql) {
37+
Assert.requireNonNull(binding, "binding must not be null");
38+
Assert.requireNonNull(sql, "sql must not be null");
39+
String name = UNNAMED_STATEMENT_NAME;
40+
41+
ExceptionFactory factory = ExceptionFactory.withSql(name);
42+
return ExtendedQueryMessageFlow
43+
.parse(this.client, name, sql, binding.getParameterTypes())
44+
.handle(factory::handleErrorResponse)
45+
.then(Mono.just(name));
46+
}
47+
48+
@Override
49+
public String toString() {
50+
return "DisabledStatementCache{" +
51+
"client=" + this.client +
52+
'}';
53+
}
54+
}

src/main/java/io/r2dbc/postgresql/IndefiniteStatementCache.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@ public Mono<String> getName(Binding binding, String sql) {
4646
Assert.requireNonNull(binding, "binding must not be null");
4747
Assert.requireNonNull(sql, "sql must not be null");
4848

49-
return this.cache.computeIfAbsent(Tuples.of(sql, binding.getParameterTypes()),
50-
tuple -> this.parse(tuple.getT1(), tuple.getT2()));
49+
synchronized (this.cache) {
50+
return this.cache.computeIfAbsent(Tuples.of(sql, binding.getParameterTypes()),
51+
tuple -> this.parse(tuple.getT1(), tuple.getT2()));
52+
}
5153
}
5254

5355
@Override

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionConfiguration.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,12 @@ public final class PostgresqlConnectionConfiguration {
7878

7979
private final SSLConfig sslConfig;
8080

81+
private final int preparedStatementCacheQueries;
82+
8183
private PostgresqlConnectionConfiguration(String applicationName, boolean autodetectExtensions,
8284
@Nullable Duration connectTimeout, @Nullable String database, List<Extension> extensions, boolean forceBinary, @Nullable String host,
8385
@Nullable Map<String, String> options, @Nullable CharSequence password, int port, @Nullable String schema, @Nullable String socket, String username,
84-
SSLConfig sslConfig) {
86+
SSLConfig sslConfig, int preparedStatementCacheQueries) {
8587
this.applicationName = Assert.requireNonNull(applicationName, "applicationName must not be null");
8688
this.autodetectExtensions = autodetectExtensions;
8789
this.connectTimeout = connectTimeout;
@@ -96,6 +98,7 @@ private PostgresqlConnectionConfiguration(String applicationName, boolean autode
9698
this.socket = socket;
9799
this.username = Assert.requireNonNull(username, "username must not be null");
98100
this.sslConfig = sslConfig;
101+
this.preparedStatementCacheQueries = preparedStatementCacheQueries;
99102
}
100103

101104
/**
@@ -215,6 +218,10 @@ SSLConfig getSslConfig() {
215218
return this.sslConfig;
216219
}
217220

221+
int getPreparedStatementCacheQueries() {
222+
return this.preparedStatementCacheQueries;
223+
}
224+
218225
private static String obfuscate(int length) {
219226

220227
StringBuilder builder = new StringBuilder();
@@ -284,6 +291,8 @@ public static final class Builder {
284291
@Nullable
285292
private String username;
286293

294+
private int preparedStatementCacheQueries = -1;
295+
287296
private Builder() {
288297
}
289298

@@ -330,7 +339,7 @@ public PostgresqlConnectionConfiguration build() {
330339
}
331340

332341
return new PostgresqlConnectionConfiguration(this.applicationName, this.autodetectExtensions, this.connectTimeout, this.database, this.extensions, this.forceBinary, this.host,
333-
this.options, this.password, this.port, this.schema, this.socket, this.username, this.createSslConfig());
342+
this.options, this.password, this.port, this.schema, this.socket, this.username, this.createSslConfig(), this.preparedStatementCacheQueries);
334343
}
335344

336345
/**
@@ -569,6 +578,19 @@ public Builder username(String username) {
569578
return this;
570579
}
571580

581+
/**
582+
* Configure the preparedStatementCacheQueries. The default is {@code -1}, meaning there's no limit. The value of {@code 0} disables the cache. Any other value specifies the cache size.
583+
*
584+
* @param preparedStatementCacheQueries the preparedStatementCacheQueries
585+
* @return this {@link Builder}
586+
* @throws IllegalArgumentException if {@code username} is {@code null}
587+
* @since 0.8.1
588+
*/
589+
public Builder preparedStatementCacheQueries(int preparedStatementCacheQueries) {
590+
this.preparedStatementCacheQueries = preparedStatementCacheQueries;
591+
return this;
592+
}
593+
572594
@Override
573595
public String toString() {
574596
return "Builder{" +
@@ -591,6 +613,7 @@ public String toString() {
591613
", sslCert='" + this.sslCert + '\'' +
592614
", sslKey='" + this.sslKey + '\'' +
593615
", sslHostnameVerifier='" + this.sslHostnameVerifier + '\'' +
616+
", preparedStatementCacheQueries='" + this.preparedStatementCacheQueries + '\'' +
594617
'}';
595618
}
596619

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactory.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,10 @@ private Mono<PostgresqlConnection> doCreateConnection(boolean forReplication, @N
169169
isolationLevelMono = getIsolationLevel(client, codecs);
170170
}
171171

172+
StatementCache statementCache = StatementCache.fromPreparedStatementCacheQueries(client, this.configuration.getPreparedStatementCacheQueries());
173+
172174
return isolationLevelMono
173-
.map(it -> new PostgresqlConnection(client, codecs, DefaultPortalNameSupplier.INSTANCE, new IndefiniteStatementCache(client), it, this.configuration.isForceBinary()))
175+
.map(it -> new PostgresqlConnection(client, codecs, DefaultPortalNameSupplier.INSTANCE, statementCache, it, this.configuration.isForceBinary()))
174176
.delayUntil(connection -> {
175177
return prepareConnection(connection, client.getByteBufAllocator(), codecs);
176178
})

0 commit comments

Comments
 (0)