Skip to content

Commit e6cc5f6

Browse files
fix: aggregate batching throttling latency per attempt and reset it between attempts (#1905)
* fix: aggregate batching throttling latency per attempt and reset it between attempts

  This should improve reporting of latency when bulk mutation throttling is enabled. Also:
  - fix tests to properly close the batcher
  - simplify tests to avoid unnecessary mocking
  - improve test failure messaging

  Change-Id: I53748c5e54ebbbe2a896f8ea0ce6c39a8f5fa297

* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent f4fe6a0 commit e6cc5f6

File tree

4 files changed: 41 additions and 61 deletions

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -273,7 +273,8 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
273273
}
274274
}
275275

276-
recorder.putClientBlockingLatencies(totalClientBlockingTime.get());
276+
// Make sure to reset the blocking time after recording it for the next attempt
277+
recorder.putClientBlockingLatencies(totalClientBlockingTime.getAndSet(0));
277278

278279
// Patch the status until it's fixed in gax. When an attempt failed,
279280
// it'll throw a ServerStreamingAttemptException. Unwrap the exception

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -59,6 +59,9 @@ class MetricsTracer extends BigtableTracer {
5959

6060
private volatile int attempt = 0;
6161

62+
private volatile boolean reportBatchingLatency = false;
63+
private volatile long batchThrottledLatency = 0;
64+
6265
MetricsTracer(
6366
OperationType operationType,
6467
Tagger tagger,
@@ -167,6 +170,14 @@ private void recordAttemptCompletion(@Nullable Throwable throwable) {
167170
RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY,
168171
attemptTimer.elapsed(TimeUnit.MILLISECONDS));
169172

173+
if (reportBatchingLatency) {
174+
measures.put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, batchThrottledLatency);
175+
176+
// Reset batch throttling latency for next attempt. This can't be done in attemptStarted
177+
// because batching flow control will add batching latency before the attempt has started.
178+
batchThrottledLatency = 0;
179+
}
180+
170181
// Patch the throwable until it's fixed in gax. When an attempt failed,
171182
// it'll throw a ServerStreamingAttemptException. Unwrap the exception
172183
// so it could get processed by extractStatus
@@ -216,11 +227,8 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
216227

217228
@Override
218229
public void batchRequestThrottled(long totalThrottledMs) {
219-
MeasureMap measures =
220-
stats
221-
.newMeasureMap()
222-
.put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, totalThrottledMs);
223-
measures.record(newTagCtxBuilder().build());
230+
reportBatchingLatency = true;
231+
batchThrottledLatency += totalThrottledMs;
224232
}
225233

226234
private TagContextBuilder newTagCtxBuilder() {

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java

Lines changed: 21 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -20,10 +20,8 @@
2020
import static org.mockito.Mockito.doAnswer;
2121
import static org.mockito.Mockito.when;
2222

23-
import com.google.api.gax.batching.BatchResource;
2423
import com.google.api.gax.batching.Batcher;
2524
import com.google.api.gax.batching.BatcherImpl;
26-
import com.google.api.gax.batching.BatchingDescriptor;
2725
import com.google.api.gax.batching.FlowController;
2826
import com.google.api.gax.grpc.GrpcCallContext;
2927
import com.google.api.gax.rpc.ApiCallContext;
@@ -387,45 +385,38 @@ public Object answer(InvocationOnMock invocation) {
387385
.when(mockService)
388386
.readRows(any(ReadRowsRequest.class), any());
389387

390-
try (Batcher batcher =
388+
try (Batcher<ByteString, Row> batcher =
391389
stub.newBulkReadRowsBatcher(Query.create(TABLE_ID), GrpcCallContext.createDefault())) {
392390
batcher.add(ByteString.copyFromUtf8("row1"));
393-
batcher.sendOutstanding();
394-
395-
long throttledTimeMetric =
396-
StatsTestUtils.getAggregationValueAsLong(
397-
localStats,
398-
RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW,
399-
ImmutableMap.of(
400-
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
401-
PROJECT_ID,
402-
INSTANCE_ID,
403-
APP_PROFILE_ID);
404-
assertThat(throttledTimeMetric).isEqualTo(0);
405391
}
392+
393+
long throttledTimeMetric =
394+
StatsTestUtils.getAggregationValueAsLong(
395+
localStats,
396+
RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW,
397+
ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
398+
PROJECT_ID,
399+
INSTANCE_ID,
400+
APP_PROFILE_ID);
401+
assertThat(throttledTimeMetric).isEqualTo(0);
406402
}
407403

408404
@Test
409405
public void testBatchMutateRowsThrottledTime() throws Exception {
410406
FlowController flowController = Mockito.mock(FlowController.class);
411-
BatchingDescriptor batchingDescriptor = Mockito.mock(MutateRowsBatchingDescriptor.class);
412-
when(batchingDescriptor.createResource(any())).thenReturn(new FakeBatchResource());
413-
when(batchingDescriptor.createEmptyResource()).thenReturn(new FakeBatchResource());
407+
MutateRowsBatchingDescriptor batchingDescriptor = new MutateRowsBatchingDescriptor();
408+
414409
// Mock throttling
415410
final long throttled = 50;
416411
doAnswer(
417-
new Answer() {
418-
@Override
419-
public Object answer(InvocationOnMock invocation) throws Throwable {
420-
Thread.sleep(throttled);
421-
return null;
422-
}
412+
invocation -> {
413+
Thread.sleep(throttled);
414+
return null;
423415
})
424416
.when(flowController)
425417
.reserve(any(Long.class), any(Long.class));
426418
when(flowController.getMaxElementCountLimit()).thenReturn(null);
427419
when(flowController.getMaxRequestBytesLimit()).thenReturn(null);
428-
when(batchingDescriptor.newRequestBuilder(any())).thenCallRealMethod();
429420

430421
doAnswer(
431422
new Answer() {
@@ -444,18 +435,18 @@ public Object answer(InvocationOnMock invocation) {
444435

445436
ApiCallContext defaultContext = GrpcCallContext.createDefault();
446437

447-
Batcher batcher =
448-
new BatcherImpl(
438+
try (Batcher<RowMutationEntry, Void> batcher =
439+
new BatcherImpl<>(
449440
batchingDescriptor,
450441
stub.bulkMutateRowsCallable().withDefaultCallContext(defaultContext),
451442
BulkMutation.create(TABLE_ID),
452443
settings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings(),
453444
Executors.newSingleThreadScheduledExecutor(),
454445
flowController,
455-
defaultContext);
446+
defaultContext)) {
456447

457-
batcher.add(RowMutationEntry.create("key"));
458-
batcher.sendOutstanding();
448+
batcher.add(RowMutationEntry.create("key").deleteRow());
449+
}
459450

460451
long throttledTimeMetric =
461452
StatsTestUtils.getAggregationValueAsLong(
@@ -473,29 +464,4 @@ public Object answer(InvocationOnMock invocation) {
473464
private static <T> StreamObserver<T> anyObserver(Class<T> returnType) {
474465
return (StreamObserver<T>) any(returnType);
475466
}
476-
477-
private class FakeBatchResource implements BatchResource {
478-
479-
FakeBatchResource() {}
480-
481-
@Override
482-
public BatchResource add(BatchResource resource) {
483-
return new FakeBatchResource();
484-
}
485-
486-
@Override
487-
public long getElementCount() {
488-
return 1;
489-
}
490-
491-
@Override
492-
public long getByteCount() {
493-
return 1;
494-
}
495-
496-
@Override
497-
public boolean shouldFlush(long maxElementThreshold, long maxBytesThreshold) {
498-
return false;
499-
}
500-
}
501467
}

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -299,6 +299,11 @@ public static long getAggregationValueAsLong(
299299

300300
AggregationData aggregationData = aggregationMap.get(tagValues);
301301

302+
if (aggregationData == null) {
303+
throw new RuntimeException(
304+
"Failed to find metric for: " + tags + ". Current aggregation data: " + aggregationMap);
305+
}
306+
302307
return aggregationData.match(
303308
new io.opencensus.common.Function<AggregationData.SumDataDouble, Long>() {
304309
@Override

0 commit comments

Comments
 (0)