Skip to content

Commit 95b2356

Browse files
authored
chore: add new request idempotency header x-goog-gcs-idempotency-token (#2027)
Page Bucket's token will be taken care of by the generator updates that will come in the near future.
1 parent 427f330 commit 95b2356

14 files changed

+602
-63
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicCopyWriter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import com.google.api.gax.grpc.GrpcCallContext;
1920
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2021
import com.google.api.gax.rpc.UnaryCallable;
2122
import com.google.cloud.RestorableState;
@@ -78,7 +79,9 @@ public void copyChunk() {
7879
RewriteObjectRequest.newBuilder()
7980
.setRewriteToken(mostRecentResponse.getRewriteToken())
8081
.build();
81-
mostRecentResponse = Retrying.run(options, alg, () -> callable.call(req), Decoder.identity());
82+
GrpcCallContext retryContext = Retrying.newCallContext();
83+
mostRecentResponse =
84+
Retrying.run(options, alg, () -> callable.call(req, retryContext), Decoder.identity());
8285
}
8386
}
8487

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,8 @@ UnbufferedWritableByteChannelSession<WriteObjectResponse> build() {
281281
return new UnbufferedWriteSession<>(
282282
requireNonNull(start, "start must be non null"),
283283
bindFunction(
284-
WriteFlushStrategy.fsyncEveryFlush(write, deps, alg), ResumableWrite::identity)
284+
WriteFlushStrategy.fsyncEveryFlush(write, deps, alg, Retrying::newCallContext),
285+
ResumableWrite::identity)
285286
.andThen(StorageByteChannels.writable()::createSynchronized));
286287
}
287288
}
@@ -309,7 +310,8 @@ BufferedWritableByteChannelSession<WriteObjectResponse> build() {
309310
return new BufferedWriteSession<>(
310311
requireNonNull(start, "start must be non null"),
311312
bindFunction(
312-
WriteFlushStrategy.fsyncEveryFlush(write, deps, alg), ResumableWrite::identity)
313+
WriteFlushStrategy.fsyncEveryFlush(write, deps, alg, Retrying::newCallContext),
314+
ResumableWrite::identity)
313315
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
314316
.andThen(StorageByteChannels.writable()::createSynchronized));
315317
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 81 additions & 45 deletions
Large diffs are not rendered by default.

google-cloud-storage/src/main/java/com/google/cloud/storage/Retrying.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,19 @@
2020

2121
import com.google.api.core.ApiClock;
2222
import com.google.api.core.NanoClock;
23+
import com.google.api.gax.grpc.GrpcCallContext;
2324
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
2425
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2526
import com.google.api.gax.retrying.RetrySettings;
2627
import com.google.cloud.RetryHelper.RetryHelperException;
2728
import com.google.cloud.storage.Conversions.Decoder;
2829
import com.google.cloud.storage.spi.v1.HttpRpcContext;
30+
import com.google.common.collect.ImmutableList;
31+
import com.google.common.collect.ImmutableMap;
32+
import java.util.UUID;
2933
import java.util.concurrent.Callable;
3034
import java.util.function.Function;
35+
import org.checkerframework.checker.nullness.qual.NonNull;
3136

3237
final class Retrying {
3338

@@ -115,6 +120,14 @@ static <T, U> U run(
115120
}
116121
}
117122

123+
@NonNull
124+
static GrpcCallContext newCallContext() {
125+
return GrpcCallContext.createDefault()
126+
.withExtraHeaders(
127+
ImmutableMap.of(
128+
"x-goog-gcs-idempotency-token", ImmutableList.of(UUID.randomUUID().toString())));
129+
}
130+
118131
static ResultRetryAlgorithm<?> neverRetry() {
119132
return new BasicResultRetryAlgorithm<Object>() {
120133
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/Utils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import com.google.api.client.util.DateTime;
2222
import com.google.api.core.InternalApi;
23+
import com.google.api.gax.grpc.GrpcCallContext;
24+
import com.google.api.gax.rpc.ApiCallContext;
2325
import com.google.cloud.storage.Conversions.Codec;
2426
import com.google.cloud.storage.UnifiedOpts.NamedField;
2527
import com.google.common.annotations.VisibleForTesting;
@@ -289,4 +291,14 @@ private static int crc32cDecode(String from) {
289291
private static String crc32cEncode(int from) {
290292
return BaseEncoding.base64().encode(Ints.toByteArray(from));
291293
}
294+
295+
/**
296+
* Type preserving method for {@link GrpcCallContext#merge(ApiCallContext)}
297+
*
298+
* @see GrpcCallContext#merge(ApiCallContext)
299+
*/
300+
@NonNull
301+
static GrpcCallContext merge(@NonNull GrpcCallContext l, @NonNull GrpcCallContext r) {
302+
return (GrpcCallContext) l.merge(r);
303+
}
292304
}

google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.ExecutionException;
3232
import java.util.function.Consumer;
3333
import java.util.function.LongConsumer;
34+
import java.util.function.Supplier;
3435
import org.checkerframework.checker.nullness.qual.NonNull;
3536
import org.checkerframework.checker.nullness.qual.Nullable;
3637

@@ -57,12 +58,19 @@ private WriteFlushStrategy() {}
5758
static FlusherFactory fsyncEveryFlush(
5859
ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write,
5960
RetryingDependencies deps,
60-
ResultRetryAlgorithm<?> alg) {
61+
ResultRetryAlgorithm<?> alg,
62+
Supplier<GrpcCallContext> baseContextSupplier) {
6163
return (String bucketName,
6264
LongConsumer committedTotalBytesCallback,
6365
Consumer<WriteObjectResponse> onSuccessCallback) ->
6466
new FsyncEveryFlusher(
65-
write, deps, alg, bucketName, committedTotalBytesCallback, onSuccessCallback);
67+
write,
68+
deps,
69+
alg,
70+
bucketName,
71+
committedTotalBytesCallback,
72+
onSuccessCallback,
73+
baseContextSupplier);
6674
}
6775

6876
/**
@@ -79,14 +87,14 @@ static FlusherFactory fsyncOnClose(
7987
new FsyncOnClose(write, bucketName, committedTotalBytesCallback, onSuccessCallback);
8088
}
8189

82-
private static GrpcCallContext contextWithBucketName(String bucketName) {
83-
GrpcCallContext ret = GrpcCallContext.createDefault();
90+
private static GrpcCallContext contextWithBucketName(
91+
String bucketName, GrpcCallContext baseContext) {
8492
if (bucketName != null && !bucketName.isEmpty()) {
85-
return ret.withExtraHeaders(
93+
return baseContext.withExtraHeaders(
8694
ImmutableMap.of(
8795
"x-goog-request-params", ImmutableList.of(String.format("bucket=%s", bucketName))));
8896
}
89-
return ret;
97+
return baseContext;
9098
}
9199

92100
/**
@@ -138,20 +146,23 @@ private static final class FsyncEveryFlusher implements Flusher {
138146
private final String bucketName;
139147
private final LongConsumer sizeCallback;
140148
private final Consumer<WriteObjectResponse> completeCallback;
149+
private final Supplier<GrpcCallContext> baseContextSupplier;
141150

142151
private FsyncEveryFlusher(
143152
ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write,
144153
RetryingDependencies deps,
145154
ResultRetryAlgorithm<?> alg,
146155
String bucketName,
147156
LongConsumer sizeCallback,
148-
Consumer<WriteObjectResponse> completeCallback) {
157+
Consumer<WriteObjectResponse> completeCallback,
158+
Supplier<GrpcCallContext> baseContextSupplier) {
149159
this.write = write;
150160
this.deps = deps;
151161
this.alg = alg;
152162
this.bucketName = bucketName;
153163
this.sizeCallback = sizeCallback;
154164
this.completeCallback = completeCallback;
165+
this.baseContextSupplier = baseContextSupplier;
155166
}
156167

157168
public void flush(@NonNull List<WriteObjectRequest> segments) {
@@ -160,7 +171,8 @@ public void flush(@NonNull List<WriteObjectRequest> segments) {
160171
alg,
161172
() -> {
162173
Observer observer = new Observer(sizeCallback, completeCallback);
163-
GrpcCallContext internalContext = contextWithBucketName(bucketName);
174+
GrpcCallContext internalContext =
175+
contextWithBucketName(bucketName, baseContextSupplier.get());
164176
ApiStreamObserver<WriteObjectRequest> write =
165177
this.write.withDefaultCallContext(internalContext).clientStreamingCall(observer);
166178

@@ -230,7 +242,8 @@ private void ensureOpen() {
230242
if (stream == null) {
231243
synchronized (this) {
232244
if (stream == null) {
233-
GrpcCallContext internalContext = contextWithBucketName(bucketName);
245+
GrpcCallContext internalContext =
246+
contextWithBucketName(bucketName, GrpcCallContext.createDefault());
234247
stream =
235248
this.write
236249
.withDefaultCallContext(internalContext)

google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ public void intercept(HttpRequest request) throws IOException {
182182
newValue = invocationEntry;
183183
}
184184
headers.set("x-goog-api-client", newValue);
185+
headers.set("x-goog-gcs-idempotency-token", invocationId);
185186
}
186187
}
187188
}

google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ public void resumableUpload() throws IOException, InterruptedException, Executio
191191
WriteFlushStrategy.fsyncEveryFlush(
192192
sc.writeObjectCallable(),
193193
RetryingDependencies.attemptOnce(),
194-
Retrying.neverRetry()));
194+
Retrying.neverRetry(),
195+
Retrying::newCallContext));
195196
ArrayList<String> debugMessages = new ArrayList<>();
196197
try {
197198
ImmutableList<ByteBuffer> buffers = TestUtils.subDivide(bytes, 10);
@@ -279,7 +280,8 @@ public void resumableUpload_chunkAutomaticRetry()
279280
public boolean shouldRetry(Throwable t, Object ignore) {
280281
return TestUtils.findThrowable(DataLossException.class, t) != null;
281282
}
282-
}))) {
283+
},
284+
Retrying::newCallContext))) {
283285
writeCtx = c.getWriteCtx();
284286
ImmutableList<ByteBuffer> buffers = TestUtils.subDivide(bytes, 10);
285287
c.write(buffers.get(0));

google-cloud-storage/src/test/java/com/google/cloud/storage/WriteFlushStrategyTest.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.common.truth.Truth.assertThat;
2020

21+
import com.google.api.gax.grpc.GrpcCallContext;
2122
import com.google.api.gax.rpc.ApiCallContext;
2223
import com.google.api.gax.rpc.ApiStreamObserver;
2324
import com.google.api.gax.rpc.ClientStreamingCallable;
@@ -47,7 +48,10 @@ public void bucketNameAddedToXGoogRequestParams_nonNull_nonEmpty_fsyncEveryFlush
4748
doTest(
4849
write ->
4950
WriteFlushStrategy.fsyncEveryFlush(
50-
write, RetryingDependencies.attemptOnce(), Retrying.neverRetry()),
51+
write,
52+
RetryingDependencies.attemptOnce(),
53+
Retrying.neverRetry(),
54+
GrpcCallContext::createDefault),
5155
"bucket-name",
5256
expectedHeaderNonNullNonEmpty);
5357
}
@@ -62,7 +66,10 @@ public void bucketNameNotAddedToXGoogRequestParams_nonNull_empty_fsyncEveryFlush
6266
doTest(
6367
write ->
6468
WriteFlushStrategy.fsyncEveryFlush(
65-
write, RetryingDependencies.attemptOnce(), Retrying.neverRetry()),
69+
write,
70+
RetryingDependencies.attemptOnce(),
71+
Retrying.neverRetry(),
72+
GrpcCallContext::createDefault),
6673
"",
6774
expectedHeaderNonNullEmpty);
6875
}
@@ -77,7 +84,10 @@ public void bucketNameNotAddedToXGoogRequestParams_null_fsyncEveryFlush() {
7784
doTest(
7885
write ->
7986
WriteFlushStrategy.fsyncEveryFlush(
80-
write, RetryingDependencies.attemptOnce(), Retrying.neverRetry()),
87+
write,
88+
RetryingDependencies.attemptOnce(),
89+
Retrying.neverRetry(),
90+
GrpcCallContext::createDefault),
8191
null,
8292
expectedHeaderNull);
8393
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage.it;
18+
19+
import static com.google.common.truth.Truth.assertWithMessage;
20+
21+
import com.google.common.collect.ImmutableList;
22+
import com.google.common.truth.IterableSubject;
23+
import io.grpc.Attributes;
24+
import io.grpc.CallOptions;
25+
import io.grpc.Channel;
26+
import io.grpc.ClientCall;
27+
import io.grpc.ClientInterceptor;
28+
import io.grpc.ClientStreamTracer;
29+
import io.grpc.ClientStreamTracer.StreamInfo;
30+
import io.grpc.Metadata;
31+
import io.grpc.MethodDescriptor;
32+
import java.util.ArrayList;
33+
import java.util.Collections;
34+
import java.util.List;
35+
import java.util.Objects;
36+
37+
final class GrpcRequestAuditing implements ClientInterceptor {
38+
39+
private final List<Metadata> requestHeaders;
40+
41+
GrpcRequestAuditing() {
42+
requestHeaders = Collections.synchronizedList(new ArrayList<>());
43+
}
44+
45+
void clear() {
46+
requestHeaders.clear();
47+
}
48+
49+
@Override
50+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
51+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
52+
CallOptions withStreamTracerFactory = callOptions.withStreamTracerFactory(new Factory());
53+
return next.newCall(method, withStreamTracerFactory);
54+
}
55+
56+
public <T> IterableSubject assertRequestHeader(Metadata.Key<T> key) {
57+
ImmutableList<Object> actual =
58+
requestHeaders.stream()
59+
.map(m -> m.get(key))
60+
.filter(Objects::nonNull)
61+
.distinct()
62+
.collect(ImmutableList.toImmutableList());
63+
return assertWithMessage(String.format("Headers %s", key.name())).that(actual);
64+
}
65+
66+
private final class Factory extends ClientStreamTracer.Factory {
67+
@Override
68+
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
69+
return new Tracer();
70+
}
71+
}
72+
73+
private final class Tracer extends ClientStreamTracer {
74+
75+
@Override
76+
public void streamCreated(Attributes transportAttrs, Metadata headers) {
77+
requestHeaders.add(headers);
78+
}
79+
}
80+
}

0 commit comments

Comments
 (0)