Skip to content

Commit c9f9bf7

Browse files
committed
Polishing
Rename LimitedStatementCache to BoundedStatementCache. Synchronize cache access. Update documentation. Extract PgBouncerIntegrationTests into its own top-level class. [#223][#225][closes #277]
1 parent 99cf367 commit c9f9bf7

12 files changed

+399
-260
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
7979
| `autodetectExtensions` | Whether to auto-detect and register `Extension`s from the class path. Defaults to `true`. _(Optional)_
8080
| `fetchSize` | The default number of rows to return when fetching results. Defaults to `0` for unlimited. _(Optional)_
8181
| `forceBinary` | Whether to force binary transfer. Defaults to `false`. _(Optional)_
82+
| `preparedStatementCacheQueries` | Determine the number of queries that are cached in each connection. The default is `-1`, meaning there's no limit. The value of `-1` disables the cache. Any other value specifies the cache size.
8283
| `options` | A `Map<String, String>` of connection parameters. These are applied to each database connection created by the `ConnectionFactory`. Useful for setting generic [PostgreSQL connection parameters][psql-runtime-config]. _(Optional)_
8384
| `schema` | The search path to set. _(Optional)_
8485
| `sslMode` | SSL mode to use, see `SSLMode` enum. Supported values: `DISABLE`, `ALLOW`, `PREFER`, `REQUIRE`, `VERIFY_CA`, `VERIFY_FULL`. _(Optional)_
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Copyright 2019-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.r2dbc.postgresql.client.Binding;
20+
import io.r2dbc.postgresql.client.Client;
21+
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
22+
import io.r2dbc.postgresql.util.Assert;
23+
import reactor.core.publisher.Mono;
24+
import reactor.util.annotation.Nullable;
25+
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
28+
import java.util.Collection;
29+
import java.util.Iterator;
30+
import java.util.LinkedHashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
35+
/**
36+
* Bounded (size-limited) {@link StatementCache}.
37+
*/
38+
final class BoundedStatementCache implements StatementCache {
39+
40+
private final Map<CacheKey, String> cache = new LinkedHashMap<>(16, 0.75f, true);
41+
42+
private final Client client;
43+
44+
private final AtomicInteger counter = new AtomicInteger();
45+
46+
private final int limit;
47+
48+
public BoundedStatementCache(Client client, int limit) {
49+
this.client = Assert.requireNonNull(client, "client must not be null");
50+
if (limit <= 0) {
51+
throw new IllegalArgumentException("statement cache limit must be greater than zero");
52+
}
53+
this.limit = limit;
54+
}
55+
56+
@Override
57+
public Mono<String> getName(Binding binding, String sql) {
58+
Assert.requireNonNull(binding, "binding must not be null");
59+
Assert.requireNonNull(sql, "sql must not be null");
60+
CacheKey key = new CacheKey(sql, binding.getParameterTypes());
61+
String name = get(key);
62+
if (name != null) {
63+
return Mono.just(name);
64+
}
65+
66+
Mono<Void> closeLastStatement = Mono.defer(() -> {
67+
if (getCacheSize() < this.limit) {
68+
return Mono.empty();
69+
}
70+
String lastAccessedStatementName = getAndRemoveEldest();
71+
ExceptionFactory factory = ExceptionFactory.withSql(lastAccessedStatementName);
72+
return ExtendedQueryMessageFlow
73+
.closeStatement(this.client, lastAccessedStatementName)
74+
.handle(factory::handleErrorResponse)
75+
.then();
76+
});
77+
78+
return closeLastStatement.then(parse(sql, binding.getParameterTypes()))
79+
.doOnNext(preparedName -> put(key, preparedName));
80+
}
81+
82+
83+
/**
84+
* Synchronized cache access: Return all statement names.
85+
*
86+
* @return statement names.
87+
*/
88+
Collection<String> getCachedStatementNames() {
89+
synchronized (this.cache) {
90+
List<String> names = new ArrayList<>(this.cache.size());
91+
names.addAll(cache.values());
92+
return names;
93+
}
94+
}
95+
96+
/**
97+
* Synchronized cache access: Retrieve statement name by key.
98+
*
99+
* @param key
100+
* @return statement name by key
101+
*/
102+
@Nullable
103+
private String get(CacheKey key) {
104+
synchronized (this.cache) {
105+
return this.cache.get(key);
106+
}
107+
}
108+
109+
/**
110+
* Synchronized cache access: Least recently used entry.
111+
*
112+
* @return least recently used entry
113+
*/
114+
private String getAndRemoveEldest() {
115+
synchronized (this.cache) {
116+
Iterator<Map.Entry<CacheKey, String>> iterator = this.cache.entrySet().iterator();
117+
String entry = iterator.next().getValue();
118+
iterator.remove();
119+
return entry;
120+
}
121+
}
122+
123+
/**
124+
* Synchronized cache access: Store prepared statement.
125+
*/
126+
private void put(CacheKey key, String preparedName) {
127+
synchronized (this.cache) {
128+
this.cache.put(key, preparedName);
129+
}
130+
}
131+
132+
/**
133+
* Synchronized cache access: Return the cache size.
134+
*
135+
* @return the cache size.
136+
*/
137+
private int getCacheSize() {
138+
synchronized (this.cache) {
139+
return this.cache.size();
140+
}
141+
}
142+
143+
@Override
144+
public String toString() {
145+
return "LimitedStatementCache{" +
146+
"cache=" + this.cache +
147+
", counter=" + this.counter +
148+
", client=" + this.client +
149+
", limit=" + this.limit +
150+
'}';
151+
}
152+
153+
private Mono<String> parse(String sql, int[] types) {
154+
String name = "S_" + this.counter.getAndIncrement();
155+
156+
ExceptionFactory factory = ExceptionFactory.withSql(name);
157+
return ExtendedQueryMessageFlow
158+
.parse(this.client, name, sql, types)
159+
.handle(factory::handleErrorResponse)
160+
.then(Mono.just(name))
161+
.cache();
162+
}
163+
164+
static class CacheKey {
165+
166+
String sql;
167+
168+
int[] parameterTypes;
169+
170+
public CacheKey(String sql, int[] parameterTypes) {
171+
this.sql = sql;
172+
this.parameterTypes = parameterTypes;
173+
}
174+
175+
@Override
176+
public boolean equals(Object o) {
177+
if (this == o) {
178+
return true;
179+
}
180+
if (!(o instanceof CacheKey)) {
181+
return false;
182+
}
183+
184+
CacheKey cacheKey = (CacheKey) o;
185+
186+
if (this.sql != null ? !this.sql.equals(cacheKey.sql) : cacheKey.sql != null) {
187+
return false;
188+
}
189+
return Arrays.equals(this.parameterTypes, cacheKey.parameterTypes);
190+
}
191+
192+
@Override
193+
public int hashCode() {
194+
int result = this.sql != null ? this.sql.hashCode() : 0;
195+
result = 31 * result + Arrays.hashCode(this.parameterTypes);
196+
return result;
197+
}
198+
}
199+
}

src/main/java/io/r2dbc/postgresql/IndefiniteStatementCache.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public Mono<String> getName(Binding binding, String sql) {
6868

6969
Mono<String> mono = typedMap.get(binding.getParameterTypes());
7070
if (mono == null) {
71-
mono = this.parse(sql, binding.getParameterTypes());
71+
mono = parse(sql, binding.getParameterTypes());
7272
typedMap.put(binding.getParameterTypes(), mono);
7373
}
7474

src/main/java/io/r2dbc/postgresql/LimitedStatementCache.java

-98
This file was deleted.

0 commit comments

Comments
 (0)