Skip to content

Commit 7d27816

Browse files
feat: create the backbone of counting errors per connection each minute. (#2094)
* Create the backbone of counting errors per connection each minute. * Clean up creating new interceptors and StatsRecorderWrapper ctor. * Rename setting background task and fix imports. * Temporarily skip exporting per connection metrics to fix test failures. * Temporarily share the tests for debugging purposes * Temporarily add the test for debugging. * Remove the new ExecutorProvider and fix integration test failures. * Update unit tests to reflect the new setup. * Clean up and add tests. * Clean comments and add a TODO. * Improve tests and comments. * Address comments and refactor by defining new classes. * Fix code formatting. * Refactor classes and move to better packages. * Clean up classes and address comments. * Update the scheduler object. * Apply cleanups. * Fix unit tests and avoid hanging when getting error in close(). * Fix code formatting. * Improve error handling in the close() method. * Improve exception logging. * Fix code formatting. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 2607fff commit 7d27816

File tree

10 files changed

+543
-17
lines changed

10 files changed

+543
-17
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,4 @@ api_key
4242
artman-genfiles
4343

4444
.flattened-pom.xml
45+
dependency-reduced-pom.xml

google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/BigtableCreateTimeSeriesExporter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public void export(Collection<Metric> metrics) {
5151
if (!metric.getMetricDescriptor().getName().contains("bigtable")) {
5252
continue;
5353
}
54+
// TODO: temporarily skip exporting per connection metrics.
55+
if (metric.getMetricDescriptor().getName().contains("per_connection_error_count")) {
56+
continue;
57+
}
5458

5559
projectToTimeSeries =
5660
metric.getTimeSeriesList().stream()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.stats;
17+
18+
import com.google.api.core.InternalApi;
19+
import io.opencensus.stats.MeasureMap;
20+
import io.opencensus.stats.StatsRecorder;
21+
import io.opencensus.tags.TagContext;
22+
import io.opencensus.tags.TagContextBuilder;
23+
import io.opencensus.tags.TagKey;
24+
import io.opencensus.tags.TagValue;
25+
import io.opencensus.tags.Tagger;
26+
import io.opencensus.tags.Tags;
27+
import java.util.Map;
28+
29+
/** A wrapper to record built-in metrics for connection metrics not tied to operations/RPCs. */
30+
@InternalApi("For internal use only")
31+
public class StatsRecorderWrapperForConnection {
32+
private final StatsRecorder statsRecorder;
33+
private final TagContext tagContext;
34+
private MeasureMap perConnectionErrorCountMeasureMap;
35+
36+
public StatsRecorderWrapperForConnection(
37+
Map<String, String> statsAttributes, StatsRecorder statsRecorder) {
38+
this.statsRecorder = statsRecorder;
39+
40+
this.perConnectionErrorCountMeasureMap = statsRecorder.newMeasureMap();
41+
42+
Tagger tagger = Tags.getTagger();
43+
TagContextBuilder tagContextBuilder = tagger.toBuilder(tagger.getCurrentTagContext());
44+
for (Map.Entry<String, String> entry : statsAttributes.entrySet()) {
45+
tagContextBuilder.putLocal(TagKey.create(entry.getKey()), TagValue.create(entry.getValue()));
46+
}
47+
this.tagContext = tagContextBuilder.build();
48+
}
49+
50+
public void putAndRecordPerConnectionErrorCount(long errorCount) {
51+
perConnectionErrorCountMeasureMap.put(
52+
BuiltinMeasureConstants.PER_CONNECTION_ERROR_COUNT, errorCount);
53+
54+
perConnectionErrorCountMeasureMap.record(tagContext);
55+
perConnectionErrorCountMeasureMap = statsRecorder.newMeasureMap();
56+
}
57+
}

google-cloud-bigtable-stats/src/main/java/com/google/cloud/bigtable/stats/StatsWrapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public static StatsRecorderWrapper createRecorder(
4040
operationType, spanName, statsAttributes, Stats.getStatsRecorder());
4141
}
4242

43+
public static StatsRecorderWrapperForConnection createRecorderForConnection(
44+
Map<String, String> statsAttributes) {
45+
return new StatsRecorderWrapperForConnection(statsAttributes, Stats.getStatsRecorder());
46+
}
47+
4348
// This is used in integration tests to get the tag value strings from view manager because Stats
4449
// is relocated to com.google.bigtable.veneer.repackaged.io.opencensus.
4550
@InternalApi("Visible for testing")

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.stub;
1717

18+
import com.google.api.core.ApiFunction;
1819
import com.google.api.core.BetaApi;
1920
import com.google.api.core.InternalApi;
2021
import com.google.api.gax.batching.Batcher;
@@ -94,6 +95,7 @@
9495
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
9596
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
9697
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
98+
import com.google.cloud.bigtable.data.v2.stub.metrics.ErrorCountPerConnectionMetricTracker;
9799
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
98100
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
99101
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
@@ -117,6 +119,7 @@
117119
import com.google.common.collect.ImmutableList;
118120
import com.google.common.collect.ImmutableMap;
119121
import com.google.protobuf.ByteString;
122+
import io.grpc.ManagedChannelBuilder;
120123
import io.opencensus.stats.Stats;
121124
import io.opencensus.stats.StatsRecorder;
122125
import io.opencensus.tags.TagKey;
@@ -149,7 +152,6 @@
149152
public class EnhancedBigtableStub implements AutoCloseable {
150153
private static final String CLIENT_NAME = "Bigtable";
151154
private static final long FLOW_CONTROL_ADJUSTING_INTERVAL_MS = TimeUnit.SECONDS.toMillis(20);
152-
153155
private final EnhancedBigtableStubSettings settings;
154156
private final ClientContext clientContext;
155157

@@ -176,7 +178,6 @@ public class EnhancedBigtableStub implements AutoCloseable {
176178

177179
public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
178180
throws IOException {
179-
180181
settings = settings.toBuilder().setTracerFactory(createBigtableTracerFactory(settings)).build();
181182
ClientContext clientContext = createClientContext(settings);
182183

@@ -204,10 +205,27 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set
204205
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
205206
: null;
206207

207-
if (builder.getEnableRoutingCookie() && transportProvider != null) {
208-
// TODO: this also need to be added to BigtableClientFactory
209-
// patch cookies interceptor
210-
transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor()));
208+
ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker;
209+
if (transportProvider != null) {
210+
errorCountPerConnectionMetricTracker =
211+
new ErrorCountPerConnectionMetricTracker(createBuiltinAttributes(builder));
212+
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldChannelConfigurator =
213+
transportProvider.getChannelConfigurator();
214+
transportProvider.setChannelConfigurator(
215+
managedChannelBuilder -> {
216+
if (settings.getEnableRoutingCookie()) {
217+
managedChannelBuilder.intercept(new CookiesInterceptor());
218+
}
219+
220+
managedChannelBuilder.intercept(errorCountPerConnectionMetricTracker.getInterceptor());
221+
222+
if (oldChannelConfigurator != null) {
223+
managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder);
224+
}
225+
return managedChannelBuilder;
226+
});
227+
} else {
228+
errorCountPerConnectionMetricTracker = null;
211229
}
212230

213231
// Inject channel priming
@@ -233,7 +251,12 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set
233251
builder.setTransportChannelProvider(transportProvider.build());
234252
}
235253

236-
return ClientContext.create(builder.build());
254+
ClientContext clientContext = ClientContext.create(builder.build());
255+
if (errorCountPerConnectionMetricTracker != null) {
256+
errorCountPerConnectionMetricTracker.startConnectionErrorCountTracker(
257+
clientContext.getExecutor());
258+
}
259+
return clientContext;
237260
}
238261

239262
public static ApiTracerFactory createBigtableTracerFactory(
@@ -254,13 +277,7 @@ public static ApiTracerFactory createBigtableTracerFactory(
254277
.put(RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(instanceId))
255278
.put(RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, TagValue.create(appProfileId))
256279
.build();
257-
ImmutableMap<String, String> builtinAttributes =
258-
ImmutableMap.<String, String>builder()
259-
.put("project_id", projectId)
260-
.put("instance", instanceId)
261-
.put("app_profile", appProfileId)
262-
.put("client_name", "bigtable-java/" + Version.VERSION)
263-
.build();
280+
ImmutableMap<String, String> builtinAttributes = createBuiltinAttributes(settings.toBuilder());
264281

265282
return new CompositeTracerFactory(
266283
ImmutableList.of(
@@ -283,6 +300,16 @@ public static ApiTracerFactory createBigtableTracerFactory(
283300
settings.getTracerFactory()));
284301
}
285302

303+
private static ImmutableMap<String, String> createBuiltinAttributes(
304+
EnhancedBigtableStubSettings.Builder builder) {
305+
return ImmutableMap.<String, String>builder()
306+
.put("project_id", builder.getProjectId())
307+
.put("instance", builder.getInstanceId())
308+
.put("app_profile", builder.getAppProfileId())
309+
.put("client_name", "bigtable-java/" + Version.VERSION)
310+
.build();
311+
}
312+
286313
private static void patchCredentials(EnhancedBigtableStubSettings.Builder settings)
287314
throws IOException {
288315
int i = settings.getEndpoint().lastIndexOf(":");

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,6 @@ private Builder() {
652652
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
653653
this.enableRoutingCookie = true;
654654
this.enableRetryInfo = true;
655-
656655
// Defaults provider
657656
BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder();
658657

@@ -772,7 +771,6 @@ private Builder(EnhancedBigtableStubSettings settings) {
772771
jwtAudienceMapping = settings.jwtAudienceMapping;
773772
enableRoutingCookie = settings.enableRoutingCookie;
774773
enableRetryInfo = settings.enableRetryInfo;
775-
776774
// Per method settings.
777775
readRowsSettings = settings.readRowsSettings.toBuilder();
778776
readRowSettings = settings.readRowSettings.toBuilder();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.metrics;
17+
18+
import io.grpc.CallOptions;
19+
import io.grpc.Channel;
20+
import io.grpc.ClientCall;
21+
import io.grpc.ClientInterceptor;
22+
import io.grpc.ForwardingClientCall;
23+
import io.grpc.ForwardingClientCallListener;
24+
import io.grpc.Metadata;
25+
import io.grpc.MethodDescriptor;
26+
import io.grpc.Status;
27+
import java.util.concurrent.atomic.LongAdder;
28+
import java.util.logging.Level;
29+
import java.util.logging.Logger;
30+
31+
/** An interceptor which counts the number of failed responses for a channel. */
32+
class ConnectionErrorCountInterceptor implements ClientInterceptor {
33+
private static final Logger LOG =
34+
Logger.getLogger(ConnectionErrorCountInterceptor.class.toString());
35+
private final LongAdder numOfErrors;
36+
private final LongAdder numOfSuccesses;
37+
38+
ConnectionErrorCountInterceptor() {
39+
numOfErrors = new LongAdder();
40+
numOfSuccesses = new LongAdder();
41+
}
42+
43+
@Override
44+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
45+
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
46+
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
47+
channel.newCall(methodDescriptor, callOptions)) {
48+
@Override
49+
public void start(Listener<RespT> responseListener, Metadata headers) {
50+
super.start(
51+
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
52+
responseListener) {
53+
@Override
54+
public void onClose(Status status, Metadata trailers) {
55+
// Connection accounting is non-critical, so we log the exception, but let normal
56+
// processing proceed.
57+
try {
58+
handleOnCloseUnsafe(status);
59+
} catch (Throwable t) {
60+
if (t instanceof InterruptedException) {
61+
Thread.currentThread().interrupt();
62+
}
63+
LOG.log(
64+
Level.WARNING, "Unexpected error while updating connection error stats", t);
65+
}
66+
super.onClose(status, trailers);
67+
}
68+
69+
private void handleOnCloseUnsafe(Status status) {
70+
if (status.isOk()) {
71+
numOfSuccesses.increment();
72+
} else {
73+
numOfErrors.increment();
74+
}
75+
}
76+
},
77+
headers);
78+
}
79+
};
80+
}
81+
82+
long getAndResetNumOfErrors() {
83+
return numOfErrors.sumThenReset();
84+
}
85+
86+
long getAndResetNumOfSuccesses() {
87+
return numOfSuccesses.sumThenReset();
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.metrics;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.cloud.bigtable.stats.StatsRecorderWrapperForConnection;
20+
import com.google.cloud.bigtable.stats.StatsWrapper;
21+
import com.google.common.annotations.VisibleForTesting;
22+
import com.google.common.collect.ImmutableMap;
23+
import io.grpc.ClientInterceptor;
24+
import java.util.Collections;
25+
import java.util.Set;
26+
import java.util.WeakHashMap;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.TimeUnit;
29+
30+
/* Background task that goes through all connections and updates the errors_per_connection metric. */
31+
@InternalApi("For internal use only")
32+
public class ErrorCountPerConnectionMetricTracker implements Runnable {
33+
private static final Integer PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS = 60;
34+
private final Set<ConnectionErrorCountInterceptor> connectionErrorCountInterceptors;
35+
private final Object interceptorsLock = new Object();
36+
// This is not final so that it can be updated and mocked during testing.
37+
private StatsRecorderWrapperForConnection statsRecorderWrapperForConnection;
38+
39+
@VisibleForTesting
40+
void setStatsRecorderWrapperForConnection(
41+
StatsRecorderWrapperForConnection statsRecorderWrapperForConnection) {
42+
this.statsRecorderWrapperForConnection = statsRecorderWrapperForConnection;
43+
}
44+
45+
public ErrorCountPerConnectionMetricTracker(ImmutableMap<String, String> builtinAttributes) {
46+
connectionErrorCountInterceptors =
47+
Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
48+
49+
this.statsRecorderWrapperForConnection =
50+
StatsWrapper.createRecorderForConnection(builtinAttributes);
51+
}
52+
53+
public void startConnectionErrorCountTracker(ScheduledExecutorService scheduler) {
54+
scheduler.scheduleAtFixedRate(
55+
this, 0, PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS, TimeUnit.SECONDS);
56+
}
57+
58+
public ClientInterceptor getInterceptor() {
59+
ConnectionErrorCountInterceptor connectionErrorCountInterceptor =
60+
new ConnectionErrorCountInterceptor();
61+
synchronized (interceptorsLock) {
62+
connectionErrorCountInterceptors.add(connectionErrorCountInterceptor);
63+
}
64+
return connectionErrorCountInterceptor;
65+
}
66+
67+
@Override
68+
public void run() {
69+
synchronized (interceptorsLock) {
70+
for (ConnectionErrorCountInterceptor interceptor : connectionErrorCountInterceptors) {
71+
long errors = interceptor.getAndResetNumOfErrors();
72+
long successes = interceptor.getAndResetNumOfSuccesses();
73+
// We avoid keeping track of inactive connections (i.e., without any failed or successful
74+
// requests).
75+
if (errors > 0 || successes > 0) {
76+
// TODO: add a metric to also keep track of the number of successful requests per each
77+
// connection.
78+
statsRecorderWrapperForConnection.putAndRecordPerConnectionErrorCount(errors);
79+
}
80+
}
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)