17
17
package com .google .cloud .spanner ;
18
18
19
19
import static com .google .cloud .spanner .SessionImpl .NO_CHANNEL_HINT ;
20
+ import static com .google .cloud .spanner .SpannerExceptionFactory .newSpannerException ;
20
21
21
22
import com .google .api .core .ApiFuture ;
22
23
import com .google .api .core .ApiFutures ;
27
28
import com .google .cloud .spanner .SpannerException .ResourceNotFoundException ;
28
29
import com .google .common .annotations .VisibleForTesting ;
29
30
import com .google .common .base .Preconditions ;
31
+ import com .google .common .util .concurrent .MoreExecutors ;
32
+ import com .google .spanner .v1 .BeginTransactionRequest ;
33
+ import com .google .spanner .v1 .RequestOptions ;
34
+ import com .google .spanner .v1 .Transaction ;
30
35
import java .time .Clock ;
31
36
import java .time .Duration ;
32
37
import java .time .Instant ;
@@ -92,6 +97,10 @@ void onError(SpannerException spannerException) {
92
97
// synchronizing, as it does not really matter exactly which error is set.
93
98
this .client .resourceNotFoundException .set ((ResourceNotFoundException ) spannerException );
94
99
}
100
+ // Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions if
101
+ // UNIMPLEMENTED with error message "Transaction type read_write not supported with
102
+ // multiplexed sessions" is returned.
103
+ this .client .maybeMarkUnimplementedForRW (spannerException );
95
104
}
96
105
97
106
@ Override
@@ -164,6 +173,12 @@ public void close() {
164
173
/** The current multiplexed session that is used by this client. */
165
174
private final AtomicReference <ApiFuture <SessionReference >> multiplexedSessionReference ;
166
175
176
+ /**
177
+ * The Transaction response returned by the BeginTransaction request with read-write when a
178
+ * multiplexed session is created during client initialization.
179
+ */
180
+ private final SettableApiFuture <Transaction > readWriteBeginTransactionReferenceFuture ;
181
+
167
182
/** The expiration date/time of the current multiplexed session. */
168
183
private final AtomicReference <Instant > expirationDate ;
169
184
@@ -190,6 +205,12 @@ public void close() {
190
205
*/
191
206
private final AtomicBoolean unimplemented = new AtomicBoolean (false );
192
207
208
+ /**
209
+ * This flag is set to true if the server return UNIMPLEMENTED when a read-write transaction is
210
+ * executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
211
+ */
212
+ @ VisibleForTesting final AtomicBoolean unimplementedForRW = new AtomicBoolean (false );
213
+
193
214
MultiplexedSessionDatabaseClient (SessionClient sessionClient ) {
194
215
this (sessionClient , Clock .systemUTC ());
195
216
}
@@ -217,6 +238,7 @@ public void close() {
217
238
this .tracer = sessionClient .getSpanner ().getTracer ();
218
239
final SettableApiFuture <SessionReference > initialSessionReferenceFuture =
219
240
SettableApiFuture .create ();
241
+ this .readWriteBeginTransactionReferenceFuture = SettableApiFuture .create ();
220
242
this .multiplexedSessionReference = new AtomicReference <>(initialSessionReferenceFuture );
221
243
this .sessionClient .asyncCreateMultiplexedSession (
222
244
new SessionConsumer () {
@@ -226,6 +248,16 @@ public void onSessionReady(SessionImpl session) {
226
248
// only start the maintainer if we actually managed to create a session in the first
227
249
// place.
228
250
maintainer .start ();
251
+
252
+ // initiate a begin transaction request to verify if read-write transactions are
253
+ // supported using multiplexed sessions.
254
+ if (sessionClient
255
+ .getSpanner ()
256
+ .getOptions ()
257
+ .getSessionPoolOptions ()
258
+ .getUseMultiplexedSessionForRW ()) {
259
+ verifyBeginTransactionWithRWOnMultiplexedSessionAsync (session .getName ());
260
+ }
229
261
}
230
262
231
263
@ Override
@@ -267,6 +299,70 @@ private void maybeMarkUnimplemented(Throwable t) {
267
299
}
268
300
}
269
301
302
+ private void maybeMarkUnimplementedForRW (SpannerException spannerException ) {
303
+ if (spannerException .getErrorCode () == ErrorCode .UNIMPLEMENTED
304
+ && verifyErrorMessage (
305
+ spannerException ,
306
+ "Transaction type read_write not supported with multiplexed sessions" )) {
307
+ unimplementedForRW .set (true );
308
+ }
309
+ }
310
+
311
+ private boolean verifyErrorMessage (SpannerException spannerException , String message ) {
312
+ if (spannerException .getCause () == null ) {
313
+ return false ;
314
+ }
315
+ if (spannerException .getCause ().getMessage () == null ) {
316
+ return false ;
317
+ }
318
+ return spannerException .getCause ().getMessage ().contains (message );
319
+ }
320
+
321
+ private void verifyBeginTransactionWithRWOnMultiplexedSessionAsync (String sessionName ) {
322
+ // TODO: Remove once this is guaranteed to be available.
323
+ // annotate the explict BeginTransactionRequest with a transaction tag
324
+ // "multiplexed-rw-background-begin-txn" to avoid storing this request on mock spanner.
325
+ // this is to safeguard other mock spanner tests whose BeginTransaction request count will
326
+ // otherwise increase by 1. Modifying the unit tests do not seem valid since this code is
327
+ // temporary and will be removed once the read-write on multiplexed session looks stable at
328
+ // backend.
329
+ BeginTransactionRequest .Builder requestBuilder =
330
+ BeginTransactionRequest .newBuilder ()
331
+ .setSession (sessionName )
332
+ .setOptions (
333
+ SessionImpl .createReadWriteTransactionOptions (
334
+ Options .fromTransactionOptions (), /* previousTransactionId = */ null ))
335
+ .setRequestOptions (
336
+ RequestOptions .newBuilder ()
337
+ .setTransactionTag ("multiplexed-rw-background-begin-txn" )
338
+ .build ());
339
+ final BeginTransactionRequest request = requestBuilder .build ();
340
+ final ApiFuture <Transaction > requestFuture ;
341
+ requestFuture =
342
+ sessionClient
343
+ .getSpanner ()
344
+ .getRpc ()
345
+ .beginTransactionAsync (request , /* options = */ null , /* routeToLeader = */ true );
346
+ requestFuture .addListener (
347
+ () -> {
348
+ try {
349
+ Transaction txn = requestFuture .get ();
350
+ if (txn .getId ().isEmpty ()) {
351
+ throw newSpannerException (
352
+ ErrorCode .INTERNAL , "Missing id in transaction\n " + sessionName );
353
+ }
354
+ readWriteBeginTransactionReferenceFuture .set (txn );
355
+ } catch (Exception e ) {
356
+ SpannerException spannerException = SpannerExceptionFactory .newSpannerException (e );
357
+ // Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions
358
+ // if UNIMPLEMENTED is returned.
359
+ maybeMarkUnimplementedForRW (spannerException );
360
+ readWriteBeginTransactionReferenceFuture .setException (e );
361
+ }
362
+ },
363
+ MoreExecutors .directExecutor ());
364
+ }
365
+
270
366
boolean isValid () {
271
367
return resourceNotFoundException .get () == null ;
272
368
}
@@ -283,6 +379,10 @@ boolean isMultiplexedSessionsSupported() {
283
379
return !this .unimplemented .get ();
284
380
}
285
381
382
+ boolean isMultiplexedSessionsForRWSupported () {
383
+ return !this .unimplementedForRW .get ();
384
+ }
385
+
286
386
void close () {
287
387
synchronized (this ) {
288
388
if (!this .isClosed ) {
@@ -308,6 +408,17 @@ SessionReference getCurrentSessionReference() {
308
408
}
309
409
}
310
410
411
+ @ VisibleForTesting
412
+ Transaction getReadWriteBeginTransactionReference () {
413
+ try {
414
+ return this .readWriteBeginTransactionReferenceFuture .get ();
415
+ } catch (ExecutionException executionException ) {
416
+ throw SpannerExceptionFactory .asSpannerException (executionException .getCause ());
417
+ } catch (InterruptedException interruptedException ) {
418
+ throw SpannerExceptionFactory .propagateInterrupt (interruptedException );
419
+ }
420
+ }
421
+
311
422
/**
312
423
* Returns true if the multiplexed session has been created. This client can be used before the
313
424
* session has been created, and will in that case use a delayed transaction that contains a
0 commit comments