Skip to content

Commit 428e294

Browse files
arpan14olavloitegcf-owl-bot[bot]
authored
feat: add session pool option for modelling a timeout around session acquisition. (#2641)
* feat: add session pool option for modelling a timeout around session acquisition. * docs: modify existing documentation. * Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java Co-authored-by: Knut Olav Løite <[email protected]> * Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java Co-authored-by: Knut Olav Løite <[email protected]> * chore: address review comments. * fix: lint errors. * fix: broken unit test. * chore: add more unit test. * fix: review comments. * fix: NPE in unit test. * Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java Co-authored-by: Knut Olav Løite <[email protected]> * chore: fix review comments. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Knut Olav Løite <[email protected]> Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent f03ce56 commit 428e294

File tree

5 files changed

+349
-16
lines changed

5 files changed

+349
-16
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

+20-15
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
import com.google.common.base.Supplier;
6464
import com.google.common.collect.ImmutableList;
6565
import com.google.common.collect.ImmutableMap;
66-
import com.google.common.collect.ImmutableSet;
6766
import com.google.common.util.concurrent.ForwardingListenableFuture;
6867
import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
6968
import com.google.common.util.concurrent.ListenableFuture;
@@ -120,17 +119,6 @@ class SessionPool {
120119
private static final Logger logger = Logger.getLogger(SessionPool.class.getName());
121120
private static final Tracer tracer = Tracing.getTracer();
122121
static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession";
123-
static final ImmutableSet<ErrorCode> SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES =
124-
ImmutableSet.of(
125-
ErrorCode.UNKNOWN,
126-
ErrorCode.INVALID_ARGUMENT,
127-
ErrorCode.PERMISSION_DENIED,
128-
ErrorCode.UNAUTHENTICATED,
129-
ErrorCode.RESOURCE_EXHAUSTED,
130-
ErrorCode.FAILED_PRECONDITION,
131-
ErrorCode.OUT_OF_RANGE,
132-
ErrorCode.UNIMPLEMENTED,
133-
ErrorCode.INTERNAL);
134122

135123
/**
136124
* If the {@link SessionPoolOptions#getWaitForMinSessions()} duration is greater than zero, waits
@@ -1675,7 +1663,8 @@ public PooledSession get() {
16751663
while (true) {
16761664
Span span = tracer.spanBuilder(WAIT_FOR_SESSION).startSpan();
16771665
try (Scope waitScope = tracer.withSpan(span)) {
1678-
PooledSession s = pollUninterruptiblyWithTimeout(currentTimeout);
1666+
PooledSession s =
1667+
pollUninterruptiblyWithTimeout(currentTimeout, options.getAcquireSessionTimeout());
16791668
if (s == null) {
16801669
// Set the status to DEADLINE_EXCEEDED and retry.
16811670
numWaiterTimeouts.incrementAndGet();
@@ -1685,6 +1674,11 @@ public PooledSession get() {
16851674
return s;
16861675
}
16871676
} catch (Exception e) {
1677+
if (e instanceof SpannerException
1678+
&& ErrorCode.RESOURCE_EXHAUSTED.equals(((SpannerException) e).getErrorCode())) {
1679+
numWaiterTimeouts.incrementAndGet();
1680+
tracer.getCurrentSpan().setStatus(Status.RESOURCE_EXHAUSTED);
1681+
}
16881682
TraceUtil.setWithFailure(span, e);
16891683
throw e;
16901684
} finally {
@@ -1693,15 +1687,26 @@ public PooledSession get() {
16931687
}
16941688
}
16951689

1696-
private PooledSession pollUninterruptiblyWithTimeout(long timeoutMillis) {
1690+
private PooledSession pollUninterruptiblyWithTimeout(
1691+
long timeoutMillis, Duration acquireSessionTimeout) {
16971692
boolean interrupted = false;
16981693
try {
16991694
while (true) {
17001695
try {
1701-
return waiter.get(timeoutMillis, TimeUnit.MILLISECONDS);
1696+
return acquireSessionTimeout == null
1697+
? waiter.get(timeoutMillis, TimeUnit.MILLISECONDS)
1698+
: waiter.get(acquireSessionTimeout.toMillis(), TimeUnit.MILLISECONDS);
17021699
} catch (InterruptedException e) {
17031700
interrupted = true;
17041701
} catch (TimeoutException e) {
1702+
if (acquireSessionTimeout != null) {
1703+
throw SpannerExceptionFactory.newSpannerException(
1704+
ErrorCode.RESOURCE_EXHAUSTED,
1705+
"Timed out after waiting "
1706+
+ acquireSessionTimeout.toMillis()
1707+
+ "ms for acquiring session. To mitigate error SessionPoolOptions#setAcquireSessionTimeout(Duration) to set a higher timeout"
1708+
+ " or increase the number of sessions in the session pool.");
1709+
}
17051710
return null;
17061711
} catch (ExecutionException e) {
17071712
throw SpannerExceptionFactory.newSpannerException(e.getCause());

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

+41-1
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,16 @@ public class SessionPoolOptions {
5252
private final ActionOnSessionLeak actionOnSessionLeak;
5353
private final boolean trackStackTraceOfSessionCheckout;
5454
private final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions;
55-
private final long initialWaitForSessionTimeoutMillis;
55+
56+
/**
57+
* Use {@link #acquireSessionTimeout} instead to specify the total duration to wait while
58+
* acquiring session for a transaction.
59+
*/
60+
@Deprecated private final long initialWaitForSessionTimeoutMillis;
61+
5662
private final boolean autoDetectDialect;
5763
private final Duration waitForMinSessions;
64+
private final Duration acquireSessionTimeout;
5865

5966
/** Property for allowing mocking of session maintenance clock. */
6067
private final Clock poolMaintainerClock;
@@ -78,6 +85,7 @@ private SessionPoolOptions(Builder builder) {
7885
this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter;
7986
this.autoDetectDialect = builder.autoDetectDialect;
8087
this.waitForMinSessions = builder.waitForMinSessions;
88+
this.acquireSessionTimeout = builder.acquireSessionTimeout;
8189
this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions;
8290
this.poolMaintainerClock = builder.poolMaintainerClock;
8391
}
@@ -105,6 +113,7 @@ public boolean equals(Object o) {
105113
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter)
106114
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect)
107115
&& Objects.equals(this.waitForMinSessions, other.waitForMinSessions)
116+
&& Objects.equals(this.acquireSessionTimeout, other.acquireSessionTimeout)
108117
&& Objects.equals(
109118
this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions)
110119
&& Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock);
@@ -128,6 +137,7 @@ public int hashCode() {
128137
this.removeInactiveSessionAfter,
129138
this.autoDetectDialect,
130139
this.waitForMinSessions,
140+
this.acquireSessionTimeout,
131141
this.inactiveTransactionRemovalOptions,
132142
this.poolMaintainerClock);
133143
}
@@ -239,6 +249,11 @@ Duration getWaitForMinSessions() {
239249
return waitForMinSessions;
240250
}
241251

252+
@VisibleForTesting
253+
Duration getAcquireSessionTimeout() {
254+
return acquireSessionTimeout;
255+
}
256+
242257
public static Builder newBuilder() {
243258
return new Builder();
244259
}
@@ -424,6 +439,7 @@ public static class Builder {
424439
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);
425440
private boolean autoDetectDialect = false;
426441
private Duration waitForMinSessions = Duration.ZERO;
442+
private Duration acquireSessionTimeout = Duration.ofSeconds(60);
427443

428444
private Clock poolMaintainerClock;
429445

@@ -446,6 +462,7 @@ private Builder(SessionPoolOptions options) {
446462
this.removeInactiveSessionAfter = options.removeInactiveSessionAfter;
447463
this.autoDetectDialect = options.autoDetectDialect;
448464
this.waitForMinSessions = options.waitForMinSessions;
465+
this.acquireSessionTimeout = options.acquireSessionTimeout;
449466
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
450467
this.poolMaintainerClock = options.poolMaintainerClock;
451468
}
@@ -538,6 +555,10 @@ public Builder setFailIfPoolExhausted() {
538555
/**
539556
* If all sessions are in use and there is no more room for creating new sessions, block for a
540557
* session to become available. Default behavior is same.
558+
*
559+
* <p>By default the requests are blocked for 60s and will fail with a `SpannerException` with
560+
* error code `ResourceExhausted` if this timeout is exceeded. If you wish to block for a
561+
* different period use the option {@link Builder#setAcquireSessionTimeout(Duration)} ()}
541562
*/
542563
public Builder setBlockIfPoolExhausted() {
543564
this.actionOnExhaustion = ActionOnExhaustion.BLOCK;
@@ -695,6 +716,25 @@ public Builder setWaitForMinSessions(Duration waitForMinSessions) {
695716
return this;
696717
}
697718

719+
/**
720+
* If greater than zero, we wait for said duration when no sessions are available in the {@link
721+
* SessionPool}. The default is a 60s timeout. Set the value to null to disable the timeout.
722+
*/
723+
public Builder setAcquireSessionTimeout(Duration acquireSessionTimeout) {
724+
try {
725+
if (acquireSessionTimeout != null) {
726+
Preconditions.checkArgument(
727+
acquireSessionTimeout.toMillis() > 0,
728+
"acquireSessionTimeout should be greater than 0 ns");
729+
}
730+
} catch (ArithmeticException ex) {
731+
throw new IllegalArgumentException(
732+
"acquireSessionTimeout in millis should be lesser than Long.MAX_VALUE");
733+
}
734+
this.acquireSessionTimeout = acquireSessionTimeout;
735+
return this;
736+
}
737+
698738
/** Build a SessionPoolOption object */
699739
public SessionPoolOptions build() {
700740
validate();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.spanner;
17+
18+
import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_RESULTSET;
19+
import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT;
20+
import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertThrows;
23+
24+
import com.google.api.gax.grpc.testing.LocalChannelProvider;
25+
import com.google.cloud.NoCredentials;
26+
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
27+
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
28+
import com.google.common.util.concurrent.Futures;
29+
import com.google.common.util.concurrent.ListenableFuture;
30+
import com.google.common.util.concurrent.ListeningExecutorService;
31+
import com.google.common.util.concurrent.MoreExecutors;
32+
import io.grpc.Server;
33+
import io.grpc.inprocess.InProcessServerBuilder;
34+
import java.io.IOException;
35+
import java.util.ArrayList;
36+
import java.util.List;
37+
import java.util.concurrent.Executors;
38+
import java.util.concurrent.ScheduledThreadPoolExecutor;
39+
import java.util.concurrent.atomic.AtomicInteger;
40+
import org.junit.After;
41+
import org.junit.AfterClass;
42+
import org.junit.Before;
43+
import org.junit.BeforeClass;
44+
import org.junit.Test;
45+
import org.junit.experimental.categories.Category;
46+
import org.junit.runner.RunWith;
47+
import org.junit.runners.JUnit4;
48+
import org.threeten.bp.Duration;
49+
50+
@Category(SlowTest.class)
51+
@RunWith(JUnit4.class)
52+
public class BatchCreateSessionsSlowTest {
53+
private static final String TEST_PROJECT = "my-project";
54+
private static final String TEST_DATABASE_ROLE = "my-role";
55+
private static MockSpannerServiceImpl mockSpanner;
56+
private static Server server;
57+
private static LocalChannelProvider channelProvider;
58+
private Spanner spanner;
59+
60+
@BeforeClass
61+
public static void startStaticServer() throws IOException {
62+
mockSpanner = new MockSpannerServiceImpl();
63+
mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions.
64+
mockSpanner.putStatementResult(
65+
StatementResult.query(SELECT1, MockSpannerTestUtil.SELECT1_RESULTSET));
66+
mockSpanner.putStatementResult(
67+
StatementResult.query(READ_ONE_KEY_VALUE_STATEMENT, READ_ONE_KEY_VALUE_RESULTSET));
68+
69+
String uniqueName = InProcessServerBuilder.generateName();
70+
server =
71+
InProcessServerBuilder.forName(uniqueName)
72+
// We need to use a real executor for timeouts to occur.
73+
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
74+
.addService(mockSpanner)
75+
.build()
76+
.start();
77+
channelProvider = LocalChannelProvider.create(uniqueName);
78+
}
79+
80+
@AfterClass
81+
public static void stopServer() throws InterruptedException {
82+
server.shutdown();
83+
server.awaitTermination();
84+
}
85+
86+
@Before
87+
public void setUp() {
88+
spanner =
89+
SpannerOptions.newBuilder()
90+
.setProjectId(TEST_PROJECT)
91+
.setDatabaseRole(TEST_DATABASE_ROLE)
92+
.setChannelProvider(channelProvider)
93+
.setCredentials(NoCredentials.getInstance())
94+
.setSessionPoolOption(SessionPoolOptions.newBuilder().setFailOnSessionLeak().build())
95+
.build()
96+
.getService();
97+
}
98+
99+
@After
100+
public void tearDown() {
101+
mockSpanner.unfreeze();
102+
spanner.close();
103+
mockSpanner.reset();
104+
mockSpanner.removeAllExecutionTimes();
105+
}
106+
107+
@Test
108+
public void testBatchCreateSessionsTimesOut_whenDeadlineExceeded() throws Exception {
109+
// Simulate a minimum execution time of 1000 milliseconds for the BatchCreateSessions RPC.
110+
mockSpanner.setBatchCreateSessionsExecutionTime(
111+
SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
112+
SpannerOptions.Builder builder =
113+
SpannerOptions.newBuilder()
114+
.setProjectId("my-project")
115+
.setChannelProvider(channelProvider)
116+
.setCredentials(NoCredentials.getInstance());
117+
// Set the timeout and retry settings for BatchCreateSessions to a simple
118+
// single-attempt-and-timeout after 100ms.
119+
builder
120+
.getSpannerStubSettingsBuilder()
121+
.batchCreateSessionsSettings()
122+
.setSimpleTimeoutNoRetries(Duration.ofMillis(100));
123+
124+
try (Spanner spanner = builder.build().getService()) {
125+
DatabaseId databaseId = DatabaseId.of("my-project", "my-instance", "my-database");
126+
DatabaseClient client = spanner.getDatabaseClient(databaseId);
127+
128+
ListeningExecutorService service =
129+
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1000));
130+
List<ListenableFuture<Void>> futures = new ArrayList<>(5000);
131+
AtomicInteger counter = new AtomicInteger();
132+
for (int i = 0; i < 5000; i++) {
133+
final int index = i;
134+
futures.add(
135+
service.submit(
136+
() -> {
137+
// The following call is non-blocking and will not generate an exception.
138+
ResultSet rs = client.singleUse().executeQuery(SELECT1);
139+
// Actually trying to get any results will cause an exception.
140+
// The DEADLINE_EXCEEDED error of the BatchCreateSessions RPC is in this case
141+
// propagated to
142+
// the application.
143+
SpannerException e = assertThrows(SpannerException.class, rs::next);
144+
assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode());
145+
System.out.printf("finished test %d\n", counter.incrementAndGet());
146+
147+
return null;
148+
}));
149+
}
150+
service.shutdown();
151+
assertEquals(5000, Futures.allAsList(futures).get().size());
152+
}
153+
}
154+
155+
@Test
156+
public void testBatchCreateSessionsTimesOut_whenResourceExhausted() throws Exception {
157+
// Simulate a minimum execution time of 2000 milliseconds for the BatchCreateSessions RPC.
158+
mockSpanner.setBatchCreateSessionsExecutionTime(
159+
SimulatedExecutionTime.ofMinimumAndRandomTime(2000, 0));
160+
// Add a timeout for the max amount of time (60ms) that a request waits when a session is
161+
// unavailable.
162+
SessionPoolOptions sessionPoolOptions =
163+
SessionPoolOptions.newBuilder().setAcquireSessionTimeout(Duration.ofMillis(60)).build();
164+
SpannerOptions.Builder builder =
165+
SpannerOptions.newBuilder()
166+
.setProjectId("my-project")
167+
.setChannelProvider(channelProvider)
168+
.setCredentials(NoCredentials.getInstance())
169+
.setSessionPoolOption(sessionPoolOptions);
170+
// Set the timeout and retry settings for BatchCreateSessions to a simple
171+
// single-attempt-and-timeout after 1000ms. This will ensure that session acquisition timeout of
172+
// 60ms will kick for all requests before the overall request RPC timeout is breached.
173+
builder
174+
.getSpannerStubSettingsBuilder()
175+
.batchCreateSessionsSettings()
176+
.setSimpleTimeoutNoRetries(Duration.ofMillis(1000));
177+
178+
try (Spanner spanner = builder.build().getService()) {
179+
DatabaseId databaseId = DatabaseId.of("my-project", "my-instance", "my-database");
180+
DatabaseClient client = spanner.getDatabaseClient(databaseId);
181+
182+
ListeningExecutorService service =
183+
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1000));
184+
List<ListenableFuture<Void>> futures = new ArrayList<>(5000);
185+
AtomicInteger counter = new AtomicInteger();
186+
for (int i = 0; i < 5000; i++) {
187+
final int index = i;
188+
futures.add(
189+
service.submit(
190+
() -> {
191+
// The following call is non-blocking and will not generate an exception.
192+
ResultSet rs = client.singleUse().executeQuery(SELECT1);
193+
// Actually trying to get any results will cause an exception.
194+
// When number of requests > MAX_SESSIONS, post setAcquireSessionTimeout
195+
// a few requests will timeout with RESOURCE_EXHAUSTED error.
196+
SpannerException e = assertThrows(SpannerException.class, rs::next);
197+
assertEquals(ErrorCode.RESOURCE_EXHAUSTED, e.getErrorCode());
198+
System.out.printf("finished test %d\n", counter.incrementAndGet());
199+
200+
return null;
201+
}));
202+
}
203+
service.shutdown();
204+
assertEquals(5000, Futures.allAsList(futures).get().size());
205+
}
206+
}
207+
}

0 commit comments

Comments
 (0)