Skip to content

Commit 6685aa3

Browse files
chore: remodel unary callables as server streaming callables with an adapter at the end (#2403)
* chore: remodel unary callables as server streaming callables with an adapter at the end Change-Id: I8708dff0e192d7647ef2cb361fc0992e1ddd2b24 * test + fixes Change-Id: Id4c56656a829f5f4c7ab1170f5f980cf3cc3760c * chore: generate libraries at Mon Nov 4 22:30:01 UTC 2024 * oops Change-Id: I1bd8c318b3272925cd6b81601d7b1d7c772a853f * more tests Change-Id: I1c45f2058cadc1acb9c6abd87222be9eb233778c * avoid multiple cancels Change-Id: I4e05efaac6ae60f5827c6d666c3c6f6cebebaa54 * chore: generate libraries at Tue Nov 5 00:23:44 UTC 2024 * fix fallback Change-Id: I654e70f0b34f5d4c3071ba3c2fed64ea183a865e * chore: generate libraries at Tue Nov 5 00:42:37 UTC 2024 * proper fallback Change-Id: Ic0106f3c6983edbb032aeba6e107e4324952397d * Use transforming callable Change-Id: I8d8474050e40cd819d3be2a5b251448f6eb8c94f * fix npe Change-Id: Ib589ca063369e26ef214eb89099e459981dafe83 * clean up logic Change-Id: I4504c47143000d97554a96469d5f3fd368d08ef1 * oops, messed up splitting commits, this should've been part of this pr not the next Change-Id: I16a35e19c50b7b7b855f4299cf41f0607b3e90bd * typo Change-Id: I8202e935975e1a55606265c502fe7573b8a4acb0 * disable watchdog for the new ReadRow callable chain Change-Id: I4522719a65f24d27fb9dccde031c3b1cc04042c2 --------- Co-authored-by: cloud-java-bot <[email protected]>
1 parent bcf60c2 commit 6685aa3

File tree

6 files changed

+620
-22
lines changed

6 files changed

+620
-22
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub;
17+
18+
import com.google.api.core.AbstractApiFuture;
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.gax.grpc.GrpcStatusCode;
21+
import com.google.api.gax.rpc.ApiCallContext;
22+
import com.google.api.gax.rpc.InternalException;
23+
import com.google.api.gax.rpc.ResponseObserver;
24+
import com.google.api.gax.rpc.ServerStreamingCallable;
25+
import com.google.api.gax.rpc.StreamController;
26+
import com.google.api.gax.rpc.UnaryCallable;
27+
import com.google.api.gax.tracing.ApiTracer;
28+
import com.google.api.gax.tracing.ApiTracerFactory;
29+
import com.google.api.gax.tracing.SpanName;
30+
import com.google.common.base.Preconditions;
31+
import io.grpc.Status;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.logging.Level;
34+
import java.util.logging.Logger;
35+
import javax.annotation.Nullable;
36+
37+
/**
38+
* Helper to convert a fake {@link ServerStreamingCallable} (ie only up to 1 response) into a {@link
39+
* UnaryCallable}. It is intended to be the outermost callable of a chain.
40+
*
41+
* <p>Responsibilities:
42+
*
43+
* <ul>
44+
* <li>Operation level metrics
45+
* <li>Configuring the default call context
46+
* <li>Converting the result to a future
47+
*/
48+
class BigtableUnaryOperationCallable<ReqT, RespT> extends UnaryCallable<ReqT, RespT> {
49+
private static final Logger LOGGER =
50+
Logger.getLogger(BigtableUnaryOperationCallable.class.getName());
51+
Logger logger = LOGGER;
52+
53+
private final ServerStreamingCallable<ReqT, RespT> inner;
54+
private final ApiCallContext defaultCallContext;
55+
private final ApiTracerFactory tracerFactory;
56+
private final SpanName spanName;
57+
private final boolean allowNoResponse;
58+
59+
public BigtableUnaryOperationCallable(
60+
ServerStreamingCallable<ReqT, RespT> inner,
61+
ApiCallContext defaultCallContext,
62+
ApiTracerFactory tracerFactory,
63+
SpanName spanName,
64+
boolean allowNoResponse) {
65+
this.inner = inner;
66+
this.defaultCallContext = defaultCallContext;
67+
this.tracerFactory = tracerFactory;
68+
this.spanName = spanName;
69+
this.allowNoResponse = allowNoResponse;
70+
}
71+
72+
@Override
73+
public ApiFuture<RespT> futureCall(ReqT req, ApiCallContext apiCallContext) {
74+
apiCallContext = defaultCallContext.merge(apiCallContext);
75+
76+
ApiTracer apiTracer =
77+
tracerFactory.newTracer(
78+
apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary);
79+
80+
apiCallContext = apiCallContext.withTracer(apiTracer);
81+
82+
UnaryFuture f = new UnaryFuture(apiTracer, allowNoResponse);
83+
inner.call(req, f, apiCallContext);
84+
return f;
85+
}
86+
87+
class UnaryFuture extends AbstractApiFuture<RespT> implements ResponseObserver<RespT> {
88+
private final ApiTracer tracer;
89+
private final boolean allowNoResponse;
90+
91+
private StreamController controller;
92+
private final AtomicBoolean upstreamCancelled = new AtomicBoolean();
93+
private boolean responseReceived;
94+
private @Nullable RespT response;
95+
96+
private UnaryFuture(ApiTracer tracer, boolean allowNoResponse) {
97+
this.tracer = Preconditions.checkNotNull(tracer, "tracer can't be null");
98+
this.allowNoResponse = allowNoResponse;
99+
this.responseReceived = false;
100+
}
101+
102+
@Override
103+
public void onStart(StreamController controller) {
104+
this.controller = controller;
105+
controller.disableAutoInboundFlowControl();
106+
// Request 2 to detect protocol bugs
107+
controller.request(2);
108+
}
109+
110+
/**
111+
* Immediately cancel the future state and try to cancel the underlying operation. Will return
112+
* false if the future is already resolved.
113+
*/
114+
@Override
115+
public boolean cancel(boolean mayInterruptIfRunning) {
116+
if (super.cancel(mayInterruptIfRunning)) {
117+
cancelUpstream();
118+
return true;
119+
}
120+
return false;
121+
}
122+
123+
private void cancelUpstream() {
124+
if (upstreamCancelled.compareAndSet(false, true)) {
125+
controller.cancel();
126+
}
127+
}
128+
129+
@Override
130+
public void onResponse(RespT resp) {
131+
tracer.responseReceived();
132+
133+
// happy path - buffer the only responsse
134+
if (!responseReceived) {
135+
responseReceived = true;
136+
this.response = resp;
137+
return;
138+
}
139+
140+
String msg =
141+
String.format(
142+
"Received multiple responses for a %s unary operation. Previous: %s, New: %s",
143+
spanName, response, resp);
144+
logger.log(Level.WARNING, msg);
145+
146+
InternalException error =
147+
new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
148+
if (setException(error)) {
149+
tracer.operationFailed(error);
150+
}
151+
152+
cancelUpstream();
153+
}
154+
155+
@Override
156+
public void onError(Throwable throwable) {
157+
if (this.setException(throwable)) {
158+
tracer.operationFailed(throwable);
159+
} else if (isCancelled()) {
160+
tracer.operationCancelled();
161+
}
162+
// The future might've been resolved due to double response
163+
}
164+
165+
@Override
166+
public void onComplete() {
167+
if (allowNoResponse || responseReceived) {
168+
if (set(response)) {
169+
tracer.operationSucceeded();
170+
return;
171+
}
172+
} else {
173+
String msg = spanName + " unary operation completed without a response message";
174+
InternalException e =
175+
new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
176+
177+
if (setException(e)) {
178+
tracer.operationFailed(e);
179+
return;
180+
}
181+
}
182+
183+
// check cancellation race
184+
if (isCancelled()) {
185+
tracer.operationCancelled();
186+
}
187+
}
188+
}
189+
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 113 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import com.google.api.gax.retrying.RetryAlgorithm;
4242
import com.google.api.gax.retrying.RetryingExecutorWithContext;
4343
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
44+
import com.google.api.gax.retrying.SimpleStreamResumptionStrategy;
45+
import com.google.api.gax.retrying.StreamResumptionStrategy;
4446
import com.google.api.gax.rpc.ApiCallContext;
4547
import com.google.api.gax.rpc.Callables;
4648
import com.google.api.gax.rpc.ClientContext;
@@ -136,6 +138,7 @@
136138
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
137139
import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm;
138140
import com.google.common.annotations.VisibleForTesting;
141+
import com.google.common.base.Functions;
139142
import com.google.common.base.MoreObjects;
140143
import com.google.common.base.Preconditions;
141144
import com.google.common.collect.ImmutableList;
@@ -155,6 +158,7 @@
155158
import java.io.IOException;
156159
import java.net.URI;
157160
import java.net.URISyntaxException;
161+
import java.time.Duration;
158162
import java.util.Collections;
159163
import java.util.List;
160164
import java.util.Map;
@@ -559,27 +563,54 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
559563
* </ul>
560564
*/
561565
public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
562-
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
563-
createReadRowsBaseCallable(
564-
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
565-
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
566-
.setRetrySettings(settings.readRowSettings().getRetrySettings())
567-
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
568-
.build(),
569-
rowAdapter);
570-
571-
ReadRowsUserCallable<RowT> readRowCallable =
572-
new ReadRowsUserCallable<>(readRowsCallable, requestContext);
573-
574-
ReadRowsFirstCallable<RowT> firstRow = new ReadRowsFirstCallable<>(readRowCallable);
575-
576-
UnaryCallable<Query, RowT> traced =
577-
new TracedUnaryCallable<>(
578-
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));
579-
580-
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
566+
if (!EnhancedBigtableStubSettings.SKIP_TRAILERS) {
567+
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
568+
createReadRowsBaseCallable(
569+
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
570+
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
571+
.setRetrySettings(settings.readRowSettings().getRetrySettings())
572+
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
573+
.build(),
574+
rowAdapter);
575+
576+
ReadRowsUserCallable<RowT> readRowCallable =
577+
new ReadRowsUserCallable<>(readRowsCallable, requestContext);
578+
ReadRowsFirstCallable<RowT> firstRow = new ReadRowsFirstCallable<>(readRowCallable);
579+
UnaryCallable<Query, RowT> traced =
580+
new TracedUnaryCallable<>(
581+
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));
582+
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
583+
} else {
584+
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
585+
createReadRowsBaseCallable(
586+
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
587+
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
588+
.setRetrySettings(settings.readRowSettings().getRetrySettings())
589+
.setIdleTimeoutDuration(Duration.ZERO)
590+
.setWaitTimeoutDuration(Duration.ZERO)
591+
.build(),
592+
rowAdapter,
593+
new SimpleStreamResumptionStrategy<>());
594+
ServerStreamingCallable<Query, RowT> readRowCallable =
595+
new TransformingServerStreamingCallable<>(
596+
readRowsCallable,
597+
(query) -> query.limit(1).toProto(requestContext),
598+
Functions.identity());
599+
600+
return new BigtableUnaryOperationCallable<>(
601+
readRowCallable,
602+
clientContext.getDefaultCallContext(),
603+
clientContext.getTracerFactory(),
604+
getSpanName("ReadRow"),
605+
/*allowNoResponses=*/ true);
606+
}
581607
}
582608

609+
private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRowsBaseCallable(
610+
ServerStreamingCallSettings<ReqT, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {
611+
return createReadRowsBaseCallable(
612+
readRowsSettings, rowAdapter, new ReadRowsResumptionStrategy<RowT>(rowAdapter));
613+
}
583614
/**
584615
* Creates a callable chain to handle ReadRows RPCs. The chain will:
585616
*
@@ -596,8 +627,9 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
596627
* <p>NOTE: the caller is responsible for adding tracing & metrics.
597628
*/
598629
private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRowsBaseCallable(
599-
ServerStreamingCallSettings<ReqT, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {
600-
630+
ServerStreamingCallSettings<ReqT, Row> readRowsSettings,
631+
RowAdapter<RowT> rowAdapter,
632+
StreamResumptionStrategy<ReadRowsRequest, RowT> resumptionStrategy) {
601633
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> base =
602634
GrpcRawCallableFactory.createServerStreamingCallable(
603635
GrpcCallSettings.<ReadRowsRequest, ReadRowsResponse>newBuilder()
@@ -625,7 +657,7 @@ private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRo
625657
// ReadRowsRequest -> ReadRowsResponse callable).
626658
ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
627659
ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
628-
.setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
660+
.setResumptionStrategy(resumptionStrategy)
629661
.setRetryableCodes(readRowsSettings.getRetryableCodes())
630662
.setRetrySettings(readRowsSettings.getRetrySettings())
631663
.setIdleTimeout(readRowsSettings.getIdleTimeout())
@@ -1264,6 +1296,21 @@ private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnar
12641296
UnaryCallSettings<ReqT, RespT> callSettings,
12651297
Function<ReqT, BaseReqT> requestTransformer,
12661298
Function<BaseRespT, RespT> responseTranformer) {
1299+
if (EnhancedBigtableStubSettings.SKIP_TRAILERS) {
1300+
return createUnaryCallableNew(
1301+
methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
1302+
} else {
1303+
return createUnaryCallableOld(
1304+
methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
1305+
}
1306+
}
1307+
1308+
private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnaryCallableOld(
1309+
MethodDescriptor<BaseReqT, BaseRespT> methodDescriptor,
1310+
RequestParamsExtractor<BaseReqT> headerParamsFn,
1311+
UnaryCallSettings<ReqT, RespT> callSettings,
1312+
Function<ReqT, BaseReqT> requestTransformer,
1313+
Function<BaseRespT, RespT> responseTranformer) {
12671314

12681315
UnaryCallable<BaseReqT, BaseRespT> base =
12691316
GrpcRawCallableFactory.createUnaryCallable(
@@ -1300,6 +1347,50 @@ public ApiFuture<RespT> futureCall(ReqT reqT, ApiCallContext apiCallContext) {
13001347
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
13011348
}
13021349

1350+
private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnaryCallableNew(
1351+
MethodDescriptor<BaseReqT, BaseRespT> methodDescriptor,
1352+
RequestParamsExtractor<BaseReqT> headerParamsFn,
1353+
UnaryCallSettings<ReqT, RespT> callSettings,
1354+
Function<ReqT, BaseReqT> requestTransformer,
1355+
Function<BaseRespT, RespT> responseTranformer) {
1356+
1357+
ServerStreamingCallable<BaseReqT, BaseRespT> base =
1358+
GrpcRawCallableFactory.createServerStreamingCallable(
1359+
GrpcCallSettings.<BaseReqT, BaseRespT>newBuilder()
1360+
.setMethodDescriptor(methodDescriptor)
1361+
.setParamsExtractor(headerParamsFn)
1362+
.build(),
1363+
callSettings.getRetryableCodes());
1364+
1365+
base = new StatsHeadersServerStreamingCallable<>(base);
1366+
1367+
base = new BigtableTracerStreamingCallable<>(base);
1368+
1369+
base = withRetries(base, convertUnaryToServerStreamingSettings(callSettings));
1370+
1371+
ServerStreamingCallable<ReqT, RespT> transformed =
1372+
new TransformingServerStreamingCallable<>(base, requestTransformer, responseTranformer);
1373+
1374+
return new BigtableUnaryOperationCallable<>(
1375+
transformed,
1376+
clientContext.getDefaultCallContext(),
1377+
clientContext.getTracerFactory(),
1378+
getSpanName(methodDescriptor.getBareMethodName()),
1379+
/* allowNoResponse= */ false);
1380+
}
1381+
1382+
private static <ReqT, RespT>
1383+
ServerStreamingCallSettings<ReqT, RespT> convertUnaryToServerStreamingSettings(
1384+
UnaryCallSettings<?, ?> unarySettings) {
1385+
return ServerStreamingCallSettings.<ReqT, RespT>newBuilder()
1386+
.setResumptionStrategy(new SimpleStreamResumptionStrategy<>())
1387+
.setRetryableCodes(unarySettings.getRetryableCodes())
1388+
.setRetrySettings(unarySettings.getRetrySettings())
1389+
.setIdleTimeoutDuration(Duration.ZERO)
1390+
.setWaitTimeoutDuration(Duration.ZERO)
1391+
.build();
1392+
}
1393+
13031394
private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
13041395
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarm =
13051396
GrpcRawCallableFactory.createUnaryCallable(

0 commit comments

Comments
 (0)