Skip to content

Commit 8ea7bd1

Browse files
rajatbhattaarpan14
andauthored
feat: add support for BatchWriteAtLeastOnce (#2520)
* feat: add support for BatchWriteAtleastOnce * test: add batchwrite() support to MockSpannerServiceImpl * test: add commit timestamp to proto * test: add commit timestamp to proto * test: add commit timestamp to proto * consume the stream in tests * refactor tests * refactor tests * test if mutations are correctly applied * null check * skip for emulator * add method documentation * add method documentation * add method documentation * remove autogenerated code * remove autogenerated tests * batchWriteAtleastOnce -> batchWriteAtLeastOnce * batchWriteAtleastOnceWithOptions -> batchWriteAtLeastOnceWithOptions * changes based on updated batch write API * add copyright and doc * address review comments * address review comments * add more documentation --------- Co-authored-by: Arpan Mishra <[email protected]>
1 parent 4143bb9 commit 8ea7bd1

File tree

12 files changed

+586
-58
lines changed

12 files changed

+586
-58
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java

+52
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import com.google.api.gax.rpc.ServerStream;
1920
import com.google.cloud.Timestamp;
2021
import com.google.cloud.spanner.Options.RpcPriority;
2122
import com.google.cloud.spanner.Options.TransactionOption;
2223
import com.google.cloud.spanner.Options.UpdateOption;
24+
import com.google.spanner.v1.BatchWriteResponse;
2325

2426
/**
2527
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
@@ -191,6 +193,56 @@ CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption.
191193
CommitResponse writeAtLeastOnceWithOptions(
192194
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException;
193195

196+
/**
197+
* Applies batch of mutation groups in a collection of efficient transactions. The mutation groups
198+
* are applied non-atomically in an unspecified order and thus, they must be independent of each
199+
* other. Partial failure is possible, i.e., some mutation groups may have been applied
200+
* successfully, while some may have failed. The results of individual batches are streamed into
201+
* the response as and when the batches are applied.
202+
*
203+
* <p>One BatchWriteResponse can contain the results for multiple MutationGroups. Inspect the
204+
* indexes field to determine the MutationGroups that the BatchWriteResponse is for.
205+
*
206+
* <p>The mutation groups may be applied more than once. This can lead to failures if the mutation
207+
* groups are non-idempotent. For example, an insert that is replayed can return an {@link
208+
* ErrorCode#ALREADY_EXISTS} error. For this reason, users of the library may prefer to use {@link
209+
* #write(Iterable)} instead. However, {@code batchWriteAtLeastOnce()} method may be appropriate
210+
* for non-atomically committing multiple mutation groups in a single RPC with low latency.
211+
*
212+
* <p>Example of BatchWriteAtLeastOnce
213+
*
214+
* <pre>{@code
215+
* Iterable<MutationGroup> mutationGroups =
216+
* ImmutableList.of(
217+
* MutationGroup.of(
218+
* Mutation.newInsertBuilder("FOO1").set("ID").to(1L).set("NAME").to("Bar1").build(),
219+
* Mutation.newInsertBuilder("FOO2").set("ID").to(2L).set("NAME").to("Bar2").build()),
220+
* MutationGroup.of(
221+
* Mutation.newInsertBuilder("FOO3").set("ID").to(3L).set("NAME").to("Bar3").build(),
222+
* Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build()),
223+
* MutationGroup.of(
224+
* Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build(),
225+
* Mutation.newInsertBuilder("FOO5").set("ID").to(5L).set("NAME").to("Bar5").build()),
226+
* MutationGroup.of(
227+
* Mutation.newInsertBuilder("FOO6").set("ID").to(6L).set("NAME").to("Bar6").build()));
228+
* ServerStream<BatchWriteResponse> responses =
229+
* dbClient.batchWriteAtLeastOnce(mutationGroups, Options.tag("batch-write-tag"));
230+
* for (BatchWriteResponse response : responses) {
231+
* // Do something when a response is received.
232+
* }
233+
* }</pre>
234+
*
235+
* Options for a transaction can include:
236+
*
237+
* <ul>
238+
* <li>{@link Options#priority(com.google.cloud.spanner.Options.RpcPriority)}: The {@link
239+
* RpcPriority} to use for the batch write request.
240+
* <li>{@link Options#tag(String)}: The transaction tag to use for the batch write request.
241+
* </ul>
242+
*/
243+
ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
244+
Iterable<MutationGroup> mutationGroups, TransactionOption... options) throws SpannerException;
245+
194246
/**
195247
* Returns a context in which a single read can be performed using {@link TimestampBound#strong()}
196248
* concurrency. This method will return a {@link ReadContext} that will not return the read

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

+17
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import com.google.api.gax.rpc.ServerStream;
1920
import com.google.cloud.Timestamp;
2021
import com.google.cloud.spanner.Options.TransactionOption;
2122
import com.google.cloud.spanner.Options.UpdateOption;
@@ -24,6 +25,7 @@
2425
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.common.base.Function;
2627
import com.google.common.util.concurrent.ListenableFuture;
28+
import com.google.spanner.v1.BatchWriteResponse;
2729
import io.opencensus.common.Scope;
2830
import io.opencensus.trace.Span;
2931
import io.opencensus.trace.Tracer;
@@ -106,6 +108,21 @@ public CommitResponse writeAtLeastOnceWithOptions(
106108
}
107109
}
108110

111+
@Override
112+
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
113+
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
114+
throws SpannerException {
115+
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
116+
try (Scope s = tracer.withSpan(span)) {
117+
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
118+
} catch (RuntimeException e) {
119+
TraceUtil.setWithFailure(span, e);
120+
throw e;
121+
} finally {
122+
span.end(TraceUtil.END_SPAN_OPTIONS);
123+
}
124+
}
125+
109126
@Override
110127
public ReadContext singleUse() {
111128
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import com.google.common.base.Preconditions;
20+
import com.google.common.collect.ImmutableList;
21+
import com.google.spanner.v1.BatchWriteRequest;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
25+
/** Represents a group of Cloud Spanner mutations to be committed together. */
26+
public class MutationGroup {
27+
private final ImmutableList<Mutation> mutations;
28+
29+
private MutationGroup(ImmutableList<Mutation> mutations) {
30+
this.mutations = mutations;
31+
}
32+
33+
/** Creates a {@code MutationGroup} given a vararg of mutations. */
34+
public static MutationGroup of(Mutation... mutations) {
35+
Preconditions.checkArgument(mutations.length > 0, "Should pass in at least one mutation.");
36+
return new MutationGroup(ImmutableList.copyOf(mutations));
37+
}
38+
39+
/** Creates a {@code MutationGroup} given an iterable of mutations. */
40+
public static MutationGroup of(Iterable<Mutation> mutations) {
41+
return new MutationGroup(ImmutableList.copyOf(mutations));
42+
}
43+
44+
/** Returns corresponding mutations for this MutationGroup. */
45+
public ImmutableList<Mutation> getMutations() {
46+
return mutations;
47+
}
48+
49+
static BatchWriteRequest.MutationGroup toProto(final MutationGroup mutationGroup) {
50+
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
51+
Mutation.toProto(mutationGroup.getMutations(), mutationsProto);
52+
return BatchWriteRequest.MutationGroup.newBuilder().addAllMutations(mutationsProto).build();
53+
}
54+
55+
static List<BatchWriteRequest.MutationGroup> toListProto(
56+
final Iterable<MutationGroup> mutationGroups) {
57+
List<BatchWriteRequest.MutationGroup> mutationGroupsProto = new ArrayList<>();
58+
for (MutationGroup group : mutationGroups) {
59+
mutationGroupsProto.add(toProto(group));
60+
}
61+
return mutationGroupsProto;
62+
}
63+
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

+45-10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.core.SettableApiFuture;
24+
import com.google.api.gax.rpc.ServerStream;
2425
import com.google.cloud.Timestamp;
2526
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
2627
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
@@ -35,6 +36,8 @@
3536
import com.google.common.util.concurrent.MoreExecutors;
3637
import com.google.protobuf.ByteString;
3738
import com.google.protobuf.Empty;
39+
import com.google.spanner.v1.BatchWriteRequest;
40+
import com.google.spanner.v1.BatchWriteResponse;
3841
import com.google.spanner.v1.BeginTransactionRequest;
3942
import com.google.spanner.v1.CommitRequest;
4043
import com.google.spanner.v1.RequestOptions;
@@ -160,7 +163,6 @@ public CommitResponse writeAtLeastOnceWithOptions(
160163
Iterable<Mutation> mutations, TransactionOption... transactionOptions)
161164
throws SpannerException {
162165
setActive(null);
163-
Options commitRequestOptions = Options.fromTransactionOptions(transactionOptions);
164166
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
165167
Mutation.toProto(mutations, mutationsProto);
166168
final CommitRequest.Builder requestBuilder =
@@ -172,15 +174,9 @@ public CommitResponse writeAtLeastOnceWithOptions(
172174
.setSingleUseTransaction(
173175
TransactionOptions.newBuilder()
174176
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
175-
if (commitRequestOptions.hasPriority() || commitRequestOptions.hasTag()) {
176-
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
177-
if (commitRequestOptions.hasPriority()) {
178-
requestOptionsBuilder.setPriority(commitRequestOptions.priority());
179-
}
180-
if (commitRequestOptions.hasTag()) {
181-
requestOptionsBuilder.setTransactionTag(commitRequestOptions.tag());
182-
}
183-
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
177+
RequestOptions commitRequestOptions = getRequestOptions(transactionOptions);
178+
if (commitRequestOptions != null) {
179+
requestBuilder.setRequestOptions(commitRequestOptions);
184180
}
185181
CommitRequest request = requestBuilder.build();
186182
Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan();
@@ -195,6 +191,45 @@ public CommitResponse writeAtLeastOnceWithOptions(
195191
}
196192
}
197193

194+
private RequestOptions getRequestOptions(TransactionOption... transactionOptions) {
195+
Options requestOptions = Options.fromTransactionOptions(transactionOptions);
196+
if (requestOptions.hasPriority() || requestOptions.hasTag()) {
197+
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
198+
if (requestOptions.hasPriority()) {
199+
requestOptionsBuilder.setPriority(requestOptions.priority());
200+
}
201+
if (requestOptions.hasTag()) {
202+
requestOptionsBuilder.setTransactionTag(requestOptions.tag());
203+
}
204+
return requestOptionsBuilder.build();
205+
}
206+
return null;
207+
}
208+
209+
@Override
210+
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
211+
Iterable<MutationGroup> mutationGroups, TransactionOption... transactionOptions)
212+
throws SpannerException {
213+
setActive(null);
214+
List<BatchWriteRequest.MutationGroup> mutationGroupsProto =
215+
MutationGroup.toListProto(mutationGroups);
216+
final BatchWriteRequest.Builder requestBuilder =
217+
BatchWriteRequest.newBuilder().setSession(name).addAllMutationGroups(mutationGroupsProto);
218+
RequestOptions batchWriteRequestOptions = getRequestOptions(transactionOptions);
219+
if (batchWriteRequestOptions != null) {
220+
requestBuilder.setRequestOptions(batchWriteRequestOptions);
221+
}
222+
Span span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE).startSpan();
223+
try (Scope s = tracer.withSpan(span)) {
224+
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options);
225+
} catch (Throwable e) {
226+
TraceUtil.setWithFailure(span, e);
227+
throw SpannerExceptionFactory.newSpannerException(e);
228+
} finally {
229+
span.end(TraceUtil.END_SPAN_OPTIONS);
230+
}
231+
}
232+
198233
@Override
199234
public ReadContext singleUse() {
200235
return singleUse(TimestampBound.strong());

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

+25
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.google.api.core.ApiFutures;
4444
import com.google.api.core.SettableApiFuture;
4545
import com.google.api.gax.core.ExecutorProvider;
46+
import com.google.api.gax.rpc.ServerStream;
4647
import com.google.cloud.Timestamp;
4748
import com.google.cloud.grpc.GrpcTransportOptions;
4849
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
@@ -69,6 +70,7 @@
6970
import com.google.common.util.concurrent.MoreExecutors;
7071
import com.google.common.util.concurrent.SettableFuture;
7172
import com.google.protobuf.Empty;
73+
import com.google.spanner.v1.BatchWriteResponse;
7274
import com.google.spanner.v1.ResultSetStats;
7375
import io.opencensus.common.Scope;
7476
import io.opencensus.metrics.DerivedLongCumulative;
@@ -1172,6 +1174,17 @@ public CommitResponse writeAtLeastOnceWithOptions(
11721174
}
11731175
}
11741176

1177+
@Override
1178+
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
1179+
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
1180+
throws SpannerException {
1181+
try {
1182+
return get().batchWriteAtLeastOnce(mutationGroups, options);
1183+
} finally {
1184+
close();
1185+
}
1186+
}
1187+
11751188
@Override
11761189
public ReadContext singleUse() {
11771190
try {
@@ -1465,6 +1478,18 @@ public CommitResponse writeAtLeastOnceWithOptions(
14651478
}
14661479
}
14671480

1481+
@Override
1482+
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
1483+
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
1484+
throws SpannerException {
1485+
try {
1486+
markUsed();
1487+
return delegate.batchWriteAtLeastOnce(mutationGroups, options);
1488+
} catch (SpannerException e) {
1489+
throw lastException = e;
1490+
}
1491+
}
1492+
14681493
@Override
14691494
public long executePartitionedUpdate(Statement stmt, UpdateOption... options)
14701495
throws SpannerException {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
6666
static final String COMMIT = "CloudSpannerOperation.Commit";
6767
static final String QUERY = "CloudSpannerOperation.ExecuteStreamingQuery";
6868
static final String READ = "CloudSpannerOperation.ExecuteStreamingRead";
69+
static final String BATCH_WRITE = "CloudSpannerOperation.BatchWrite";
6970

7071
private static final Object CLIENT_ID_LOCK = new Object();
7172

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

+10
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@
157157
import com.google.spanner.admin.instance.v1.UpdateInstanceMetadata;
158158
import com.google.spanner.admin.instance.v1.UpdateInstanceRequest;
159159
import com.google.spanner.v1.BatchCreateSessionsRequest;
160+
import com.google.spanner.v1.BatchWriteRequest;
161+
import com.google.spanner.v1.BatchWriteResponse;
160162
import com.google.spanner.v1.BeginTransactionRequest;
161163
import com.google.spanner.v1.CommitRequest;
162164
import com.google.spanner.v1.CommitResponse;
@@ -1684,6 +1686,14 @@ public ServerStream<PartialResultSet> executeStreamingPartitionedDml(
16841686
return partitionedDmlStub.executeStreamingSqlCallable().call(request, context);
16851687
}
16861688

1689+
@Override
1690+
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
1691+
BatchWriteRequest request, @Nullable Map<Option, ?> options) {
1692+
GrpcCallContext context =
1693+
newCallContext(options, request.getSession(), request, SpannerGrpc.getBatchWriteMethod());
1694+
return spannerStub.batchWriteCallable().call(request, context);
1695+
}
1696+
16871697
@Override
16881698
public StreamingCall executeQuery(
16891699
ExecuteSqlRequest request,

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java

+3
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,9 @@ ApiFuture<ResultSet> executeQueryAsync(
403403
ServerStream<PartialResultSet> executeStreamingPartitionedDml(
404404
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);
405405

406+
ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
407+
BatchWriteRequest request, @Nullable Map<Option, ?> options);
408+
406409
/**
407410
* Executes a query with streaming result.
408411
*

0 commit comments

Comments
 (0)