Skip to content

Commit fa745e8

Browse files
authored
test: add benchmark for long-running sessions clean up task. (#2564)
Adding benchmarks for long running sessions cleanup feature introduced as part of - https://togithub.com/googleapis/java-spanner/pull/2419 .
1 parent 7468a14 commit fa745e8

File tree

2 files changed

+328
-1
lines changed

2 files changed

+328
-1
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,6 @@ public boolean isTrackStackTraceOfSessionCheckout() {
235235
return trackStackTraceOfSessionCheckout;
236236
}
237237

238-
@VisibleForTesting
239238
Duration getWaitForMinSessions() {
240239
return waitForMinSessions;
241240
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
21+
import com.google.api.gax.rpc.TransportChannelProvider;
22+
import com.google.cloud.NoCredentials;
23+
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
24+
import com.google.cloud.spanner.SessionPoolOptions.ActionOnInactiveTransaction;
25+
import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions;
26+
import com.google.common.util.concurrent.Futures;
27+
import com.google.common.util.concurrent.ListenableFuture;
28+
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
29+
import com.google.common.util.concurrent.MoreExecutors;
30+
import com.google.spanner.v1.BatchCreateSessionsRequest;
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import java.util.Random;
34+
import java.util.concurrent.Executors;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicInteger;
37+
import org.openjdk.jmh.annotations.AuxCounters;
38+
import org.openjdk.jmh.annotations.Benchmark;
39+
import org.openjdk.jmh.annotations.BenchmarkMode;
40+
import org.openjdk.jmh.annotations.Fork;
41+
import org.openjdk.jmh.annotations.Level;
42+
import org.openjdk.jmh.annotations.Measurement;
43+
import org.openjdk.jmh.annotations.Mode;
44+
import org.openjdk.jmh.annotations.OutputTimeUnit;
45+
import org.openjdk.jmh.annotations.Param;
46+
import org.openjdk.jmh.annotations.Scope;
47+
import org.openjdk.jmh.annotations.Setup;
48+
import org.openjdk.jmh.annotations.State;
49+
import org.openjdk.jmh.annotations.TearDown;
50+
import org.openjdk.jmh.annotations.Warmup;
51+
import org.threeten.bp.Duration;
52+
53+
/**
54+
* Benchmarks for long-running sessions scenarios. The simulated execution times are based on
55+
* reasonable estimates and are primarily intended to keep the benchmarks comparable with each other
56+
* before and after changes have been made to the pool. The benchmarks are bound to the Maven
57+
* profile `benchmark` and can be executed like this: <code>
58+
* mvn clean test -DskipTests -Pbenchmark -Dbenchmark.name=LongRunningSessionsBenchmark
59+
* </code>
60+
*/
61+
@BenchmarkMode(Mode.AverageTime)
62+
@Fork(value = 1, warmups = 0)
63+
@Measurement(batchSize = 1, iterations = 1, timeUnit = TimeUnit.MILLISECONDS)
64+
@Warmup(batchSize = 0, iterations = 0)
65+
@OutputTimeUnit(TimeUnit.SECONDS)
66+
public class LongRunningSessionsBenchmark {
67+
private static final String TEST_PROJECT = "my-project";
68+
private static final String TEST_INSTANCE = "my-instance";
69+
private static final String TEST_DATABASE = "my-database";
70+
private static final int HOLD_SESSION_TIME = 100;
71+
private static final int LONG_HOLD_SESSION_TIME = 10000; // 10 seconds
72+
private static final int RND_WAIT_TIME_BETWEEN_REQUESTS = 100;
73+
private static final Random RND = new Random();
74+
75+
@State(Scope.Thread)
76+
@AuxCounters(org.openjdk.jmh.annotations.AuxCounters.Type.EVENTS)
77+
public static class BenchmarkState {
78+
private StandardBenchmarkMockServer mockServer;
79+
private Spanner spanner;
80+
private DatabaseClientImpl client;
81+
private AtomicInteger longRunningSessions;
82+
83+
@Param({"100"})
84+
int minSessions;
85+
86+
@Param({"400"})
87+
int maxSessions;
88+
89+
@Param({"4"})
90+
int numChannels;
91+
92+
/** AuxCounter for number of RPCs. */
93+
public int numBatchCreateSessionsRpcs() {
94+
return mockServer.countRequests(BatchCreateSessionsRequest.class);
95+
}
96+
97+
/** AuxCounter for number of sessions created. */
98+
public int sessionsCreated() {
99+
return mockServer.getMockSpanner().numSessionsCreated();
100+
}
101+
102+
@Setup(Level.Invocation)
103+
public void setup() throws Exception {
104+
mockServer = new StandardBenchmarkMockServer();
105+
longRunningSessions = new AtomicInteger();
106+
TransportChannelProvider channelProvider = mockServer.start();
107+
108+
/**
109+
* This ensures that the background thread responsible for cleaning long-running sessions
110+
* executes every 10s. Any transaction for which session has not been used for more than 2s
111+
* will be treated as long-running.
112+
*/
113+
InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions =
114+
InactiveTransactionRemovalOptions.newBuilder()
115+
.setActionOnInactiveTransaction(ActionOnInactiveTransaction.WARN_AND_CLOSE)
116+
.setExecutionFrequency(Duration.ofSeconds(10))
117+
.setIdleTimeThreshold(Duration.ofSeconds(2))
118+
.build();
119+
SpannerOptions options =
120+
SpannerOptions.newBuilder()
121+
.setProjectId(TEST_PROJECT)
122+
.setChannelProvider(channelProvider)
123+
.setNumChannels(numChannels)
124+
.setCredentials(NoCredentials.getInstance())
125+
.setSessionPoolOption(
126+
SessionPoolOptions.newBuilder()
127+
.setMinSessions(minSessions)
128+
.setMaxSessions(maxSessions)
129+
.setWaitForMinSessions(Duration.ofSeconds(5))
130+
.setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions)
131+
.build())
132+
.build();
133+
134+
spanner = options.getService();
135+
client =
136+
(DatabaseClientImpl)
137+
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
138+
}
139+
140+
@TearDown(Level.Invocation)
141+
public void teardown() throws Exception {
142+
spanner.close();
143+
mockServer.shutdown();
144+
}
145+
}
146+
147+
/**
148+
* Measures the time needed to execute a burst of read requests.
149+
*
150+
* <p>Some read requests will be long-running and will cause session leaks. Such sessions will be
151+
* removed by the session maintenance background task if SessionPool Option
152+
* ActionOnInactiveTransaction is set as WARN_AND_CLOSE.
153+
*
154+
* @param server
155+
* @throws Exception
156+
*/
157+
@Benchmark
158+
public void burstRead(final BenchmarkState server) throws Exception {
159+
int totalQueries = server.maxSessions * 8;
160+
int parallelThreads = server.maxSessions * 2;
161+
final DatabaseClient client =
162+
server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
163+
SessionPool pool = ((DatabaseClientImpl) client).pool;
164+
assertThat(pool.totalSessions()).isEqualTo(server.minSessions);
165+
166+
ListeningScheduledExecutorService service =
167+
MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads));
168+
List<ListenableFuture<?>> futures = new ArrayList<>(totalQueries);
169+
for (int i = 0; i < totalQueries; i++) {
170+
futures.add(
171+
service.submit(
172+
() -> {
173+
Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS));
174+
try (ResultSet rs =
175+
client.singleUse().executeQuery(StandardBenchmarkMockServer.SELECT1)) {
176+
while (rs.next()) {
177+
// introduce random sleep times to have long-running sessions
178+
randomWait(server);
179+
}
180+
return null;
181+
}
182+
}));
183+
}
184+
// explicitly run the maintenance cycle to clean up any dangling long-running sessions.
185+
pool.poolMaintainer.maintainPool();
186+
187+
Futures.allAsList(futures).get();
188+
service.shutdown();
189+
assertNumLeakedSessionsRemoved(server, pool);
190+
}
191+
192+
/**
193+
* Measures the time needed to execute a burst of write requests (PDML).
194+
*
195+
* <p>Some write requests will be long-running. The test asserts that no sessions are removed by
196+
* the session maintenance background task with SessionPool Option ActionOnInactiveTransaction set
197+
* as WARN_AND_CLOSE. This is because PDML writes are expected to be long-running.
198+
*
199+
* @param server
200+
* @throws Exception
201+
*/
202+
@Benchmark
203+
public void burstWrite(final BenchmarkState server) throws Exception {
204+
int totalWrites = server.maxSessions * 8;
205+
int parallelThreads = server.maxSessions * 2;
206+
final DatabaseClient client =
207+
server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
208+
SessionPool pool = ((DatabaseClientImpl) client).pool;
209+
assertThat(pool.totalSessions()).isEqualTo(server.minSessions);
210+
211+
ListeningScheduledExecutorService service =
212+
MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads));
213+
List<ListenableFuture<?>> futures = new ArrayList<>(totalWrites);
214+
for (int i = 0; i < totalWrites; i++) {
215+
futures.add(
216+
service.submit(
217+
() -> {
218+
// introduce random sleep times so that some sessions are long-running sessions
219+
randomWaitForMockServer(server);
220+
client.executePartitionedUpdate(StandardBenchmarkMockServer.UPDATE_STATEMENT);
221+
}));
222+
}
223+
// explicitly run the maintenance cycle to clean up any dangling long-running sessions.
224+
pool.poolMaintainer.maintainPool();
225+
226+
Futures.allAsList(futures).get();
227+
service.shutdown();
228+
assertThat(pool.numLeakedSessionsRemoved())
229+
.isEqualTo(0); // no sessions should be cleaned up in case of partitioned updates.
230+
}
231+
232+
/**
233+
* Measures the time needed to execute a burst of read and write requests.
234+
*
235+
* <p>Some read requests will be long-running and will cause session leaks. Such sessions will be
236+
* removed by the session maintenance background task if SessionPool Option
237+
* ActionOnInactiveTransaction is set as WARN_AND_CLOSE.
238+
*
239+
* <p>Some write requests will be long-running. The test asserts that no sessions are removed by
240+
* the session maintenance background task with SessionPool Option ActionOnInactiveTransaction set
241+
* as WARN_AND_CLOSE. This is because PDML writes are expected to be long-running.
242+
*
243+
* @param server
244+
* @throws Exception
245+
*/
246+
@Benchmark
247+
public void burstReadAndWrite(final BenchmarkState server) throws Exception {
248+
int totalWrites = server.maxSessions * 4;
249+
int totalReads = server.maxSessions * 4;
250+
int parallelThreads = server.maxSessions * 2;
251+
final DatabaseClient client =
252+
server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
253+
SessionPool pool = ((DatabaseClientImpl) client).pool;
254+
assertThat(pool.totalSessions()).isEqualTo(server.minSessions);
255+
256+
ListeningScheduledExecutorService service =
257+
MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads));
258+
List<ListenableFuture<?>> futures = new ArrayList<>(totalReads + totalWrites);
259+
for (int i = 0; i < totalWrites; i++) {
260+
futures.add(
261+
service.submit(
262+
() -> {
263+
// introduce random sleep times so that some sessions are long-running sessions
264+
randomWaitForMockServer(server);
265+
client.executePartitionedUpdate(StandardBenchmarkMockServer.UPDATE_STATEMENT);
266+
}));
267+
}
268+
for (int i = 0; i < totalReads; i++) {
269+
futures.add(
270+
service.submit(
271+
() -> {
272+
Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS));
273+
try (ResultSet rs =
274+
client.singleUse().executeQuery(StandardBenchmarkMockServer.SELECT1)) {
275+
while (rs.next()) {
276+
// introduce random sleep times to have long-running sessions
277+
randomWait(server);
278+
}
279+
return null;
280+
}
281+
}));
282+
}
283+
// explicitly run the maintenance cycle to clean up any dangling long-running sessions.
284+
pool.poolMaintainer.maintainPool();
285+
286+
Futures.allAsList(futures).get();
287+
service.shutdown();
288+
assertNumLeakedSessionsRemoved(server, pool);
289+
}
290+
291+
private void randomWait(final BenchmarkState server) throws InterruptedException {
292+
if (RND.nextBoolean()) {
293+
server.longRunningSessions.incrementAndGet();
294+
Thread.sleep(LONG_HOLD_SESSION_TIME);
295+
} else {
296+
Thread.sleep(HOLD_SESSION_TIME);
297+
}
298+
}
299+
300+
private void randomWaitForMockServer(final BenchmarkState server) {
301+
if (RND.nextBoolean()) {
302+
server.longRunningSessions.incrementAndGet();
303+
server
304+
.mockServer
305+
.getMockSpanner()
306+
.setExecuteStreamingSqlExecutionTime(
307+
SimulatedExecutionTime.ofMinimumAndRandomTime(LONG_HOLD_SESSION_TIME, 0));
308+
} else {
309+
server
310+
.mockServer
311+
.getMockSpanner()
312+
.setExecuteStreamingSqlExecutionTime(
313+
SimulatedExecutionTime.ofMinimumAndRandomTime(HOLD_SESSION_TIME, 0));
314+
}
315+
}
316+
317+
private void assertNumLeakedSessionsRemoved(final BenchmarkState server, final SessionPool pool) {
318+
final SessionPoolOptions sessionPoolOptions =
319+
server.spanner.getOptions().getSessionPoolOptions();
320+
assertThat(server.longRunningSessions.get()).isNotEqualTo(0);
321+
if (sessionPoolOptions.warnAndCloseInactiveTransactions()
322+
|| sessionPoolOptions.closeInactiveTransactions()) {
323+
assertThat(pool.numLeakedSessionsRemoved()).isGreaterThan(0);
324+
} else if (sessionPoolOptions.warnInactiveTransactions()) {
325+
assertThat(pool.numLeakedSessionsRemoved()).isEqualTo(0);
326+
}
327+
}
328+
}

0 commit comments

Comments
 (0)