Skip to content

Commit 015bd98

Browse files
authored
chore(spanner): handle commit retry protocol extension for mux rw (#3449)
In a read-write transaction using a multiplexed session with (read/query + mutation) operations, the CommitResponse from the backend during the commit RPC may include a `MultiplexedSessionRetry` field (indicated by a precommit token). This field signals that the commit RPC should be retried once using the new precommit token. During this retry, mutations should not be resent, as they were already buffered in spanFE during the initial commit RPC call.
1 parent 28e67f9 commit 015bd98

File tree

3 files changed

+128
-11
lines changed

3 files changed

+128
-11
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,9 @@ ApiFuture<CommitResponse> commitAsync() {
409409
}
410410
builder.addAllMutations(mutationsProto);
411411
finishOps.addListener(
412-
new CommitRunnable(res, finishOps, builder), MoreExecutors.directExecutor());
412+
new CommitRunnable(
413+
res, finishOps, builder, /* retryAttemptDueToCommitProtocolExtension = */ false),
414+
MoreExecutors.directExecutor());
413415
return res;
414416
}
415417

@@ -418,14 +420,17 @@ private final class CommitRunnable implements Runnable {
418420
private final SettableApiFuture<CommitResponse> res;
419421
private final ApiFuture<Void> prev;
420422
private final CommitRequest.Builder requestBuilder;
423+
private final boolean retryAttemptDueToCommitProtocolExtension;
421424

422425
CommitRunnable(
423426
SettableApiFuture<CommitResponse> res,
424427
ApiFuture<Void> prev,
425-
CommitRequest.Builder requestBuilder) {
428+
CommitRequest.Builder requestBuilder,
429+
boolean retryAttemptDueToCommitProtocolExtension) {
426430
this.res = res;
427431
this.prev = prev;
428432
this.requestBuilder = requestBuilder;
433+
this.retryAttemptDueToCommitProtocolExtension = retryAttemptDueToCommitProtocolExtension;
429434
}
430435

431436
@Override
@@ -459,6 +464,13 @@ public void run() {
459464
// Set the precommit token in the CommitRequest for multiplexed sessions.
460465
requestBuilder.setPrecommitToken(getLatestPrecommitToken());
461466
}
467+
if (retryAttemptDueToCommitProtocolExtension) {
468+
// When a retry occurs due to the commit protocol extension, clear all mutations because
469+
// they were already buffered in SpanFE during the previous attempt.
470+
requestBuilder.clearMutations();
471+
span.addAnnotation(
472+
"Retrying commit operation with a new precommit token obtained from the previous CommitResponse");
473+
}
462474
final CommitRequest commitRequest = requestBuilder.build();
463475
span.addAnnotation("Starting Commit");
464476
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture;
@@ -479,6 +491,29 @@ public void run() {
479491
return;
480492
}
481493
com.google.spanner.v1.CommitResponse proto = commitFuture.get();
494+
495+
// If the CommitResponse includes a precommit token, the client will retry the
496+
// commit RPC once with the new token and clear any existing mutations.
497+
// This case is applicable only when the read-write transaction uses multiplexed
498+
// session.
499+
if (proto.hasPrecommitToken() && !retryAttemptDueToCommitProtocolExtension) {
500+
// track the latest pre commit token
501+
onPrecommitToken(proto.getPrecommitToken());
502+
span.addAnnotation(
503+
"Commit operation will be retried with new precommit token as the CommitResponse includes a MultiplexedSessionRetry field");
504+
opSpan.end();
505+
506+
// Retry the commit RPC with the latest precommit token from CommitResponse.
507+
new CommitRunnable(
508+
res,
509+
prev,
510+
requestBuilder,
511+
/* retryAttemptDueToCommitProtocolExtension = */ true)
512+
.run();
513+
514+
// Exit to prevent further processing in this attempt.
515+
return;
516+
}
482517
if (!proto.hasCommitTimestamp()) {
483518
throw newSpannerException(
484519
ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName());

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,7 @@ private static void checkStreamException(
603603
private ConcurrentMap<ByteString, Boolean> isPartitionedDmlTransaction =
604604
new ConcurrentHashMap<>();
605605
private ConcurrentMap<ByteString, Boolean> abortedTransactions = new ConcurrentHashMap<>();
606+
private ConcurrentMap<ByteString, Boolean> commitRetryTransactions = new ConcurrentHashMap<>();
606607
private final AtomicBoolean abortNextTransaction = new AtomicBoolean();
607608
private final AtomicBoolean abortNextStatement = new AtomicBoolean();
608609
private final AtomicBoolean ignoreNextInlineBeginRequest = new AtomicBoolean();
@@ -2045,15 +2046,23 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
20452046
return;
20462047
}
20472048
simulateAbort(session, request.getTransactionId());
2048-
commitTransaction(transaction.getId());
2049-
CommitResponse.Builder responseBuilder =
2050-
CommitResponse.newBuilder().setCommitTimestamp(getCurrentGoogleTimestamp());
2051-
if (request.getReturnCommitStats()) {
2052-
responseBuilder.setCommitStats(
2053-
com.google.spanner.v1.CommitResponse.CommitStats.newBuilder()
2054-
// This is not really always equal, but at least it returns a value.
2055-
.setMutationCount(request.getMutationsCount())
2056-
.build());
2049+
CommitResponse.Builder responseBuilder = CommitResponse.newBuilder();
2050+
Optional<Boolean> commitRetry =
2051+
Optional.fromNullable(commitRetryTransactions.get(request.getTransactionId()));
2052+
if (commitRetry.or(Boolean.FALSE) && session.getMultiplexed()) {
2053+
responseBuilder.setPrecommitToken(
2054+
getCommitResponsePrecommitToken(request.getTransactionId()));
2055+
commitRetryTransactions.remove(request.getTransactionId());
2056+
} else {
2057+
commitTransaction(transaction.getId());
2058+
responseBuilder.setCommitTimestamp(getCurrentGoogleTimestamp());
2059+
if (request.getReturnCommitStats()) {
2060+
responseBuilder.setCommitStats(
2061+
com.google.spanner.v1.CommitResponse.CommitStats.newBuilder()
2062+
// This is not really always equal, but at least it returns a value.
2063+
.setMutationCount(request.getMutationsCount())
2064+
.build());
2065+
}
20572066
}
20582067
responseObserver.onNext(responseBuilder.build());
20592068
responseObserver.onCompleted();
@@ -2134,6 +2143,14 @@ void markAbortedTransaction(ByteString transactionId) {
21342143
transactionSequenceNo.remove(transactionId);
21352144
}
21362145

2146+
public void markCommitRetryOnTransaction(ByteString transactionId) {
2147+
Transaction transaction = transactions.get(transactionId);
2148+
if (transaction == null || !isReadWriteTransaction(transactionId)) {
2149+
return;
2150+
}
2151+
commitRetryTransactions.putIfAbsent(transactionId, Boolean.TRUE);
2152+
}
2153+
21372154
@Override
21382155
public void partitionQuery(
21392156
PartitionQueryRequest request, StreamObserver<PartitionResponse> responseObserver) {
@@ -2527,6 +2544,11 @@ static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken
25272544
return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", transactionId);
25282545
}
25292546

2547+
static MultiplexedSessionPrecommitToken getCommitResponsePrecommitToken(
2548+
ByteString transactionId) {
2549+
return getPrecommitToken("CommitResponsePrecommitToken", transactionId);
2550+
}
2551+
25302552
static MultiplexedSessionPrecommitToken getPrecommitToken(
25312553
String value, ByteString transactionId) {
25322554
transactionSequenceNo.putIfAbsent(transactionId, new AtomicInteger(0));

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1575,6 +1575,66 @@ public void testOtherUnimplementedError_ReadWriteTransactionStillUsesMultiplexed
15751575
assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
15761576
}
15771577

1578+
@Test
1579+
public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() {
1580+
// This test simulates the commit retry protocol extension which occurs when a read-write
1581+
// transaction contains read/query + mutation operations.
1582+
DatabaseClientImpl client =
1583+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1584+
1585+
client
1586+
.readWriteTransaction()
1587+
.run(
1588+
transaction -> {
1589+
try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) {
1590+
//noinspection StatementWithEmptyBody
1591+
while (resultSet.next()) {
1592+
// ignore
1593+
}
1594+
}
1595+
1596+
Mutation mutation =
1597+
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build();
1598+
transaction.buffer(mutation);
1599+
1600+
TransactionContextImpl impl = (TransactionContextImpl) transaction;
1601+
// Force the Commit RPC to return a CommitResponse with MultiplexedSessionRetry field
1602+
// set.
1603+
// This scenario is only possible when a read-write transaction contains read/query +
1604+
// mutation operations.
1605+
mockSpanner.markCommitRetryOnTransaction(impl.transactionId);
1606+
return null;
1607+
});
1608+
1609+
List<ExecuteSqlRequest> executeSqlRequests =
1610+
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
1611+
assertEquals(1, executeSqlRequests.size());
1612+
// Verify the request is executed using multiplexed sessions
1613+
assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed());
1614+
1615+
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
1616+
assertEquals(2, commitRequests.size());
1617+
assertNotNull(commitRequests.get(0).getPrecommitToken());
1618+
assertEquals(
1619+
ByteString.copyFromUtf8("PartialResultSetPrecommitToken"),
1620+
commitRequests.get(0).getPrecommitToken().getPrecommitToken());
1621+
// Verify that the first request has mutations set
1622+
assertTrue(commitRequests.get(0).getMutationsCount() > 0);
1623+
1624+
// Second CommitRequest should contain the latest precommit token received via the
1625+
// CommitResponse in previous attempt.
1626+
assertNotNull(commitRequests.get(1).getPrecommitToken());
1627+
assertEquals(
1628+
ByteString.copyFromUtf8("CommitResponsePrecommitToken"),
1629+
commitRequests.get(1).getPrecommitToken().getPrecommitToken());
1630+
// Verify that the commit retry request does not have any mutations set
1631+
assertEquals(0, commitRequests.get(1).getMutationsCount());
1632+
1633+
assertNotNull(client.multiplexedSessionDatabaseClient);
1634+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
1635+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
1636+
}
1637+
15781638
private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
15791639
assertNotNull(client.multiplexedSessionDatabaseClient);
15801640
SessionReference sessionReference =

0 commit comments

Comments
 (0)