Skip to content

Commit 16f19dd

Browse files
authored
feat: wire and expose profiler api to the StreamWirter/JsonStreamWriter (#2561)
* Add profiler for request execution details. The usage of the new API will be added in the next PR * Add profiler for request execution details. The usage of the new API will be added in the next PR * add new code change * feat: wire profiler to the actual codebase * .
1 parent 5691bd5 commit 16f19dd

12 files changed

+416
-128
lines changed

google-cloud-bigquerystorage/clirr-ignored-differences.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,5 +193,9 @@
193193
<className>com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter$Builder</className>
194194
<method>com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter$Builder setTraceIdBase(java.lang.String)</method>
195195
</difference>
196+
<difference>
197+
<differenceType>1001</differenceType>
198+
<className>com/google/cloud/bigquery/storage/v1/StreamWriter$SingleConnectionOrConnectionPool</className>
199+
</difference>
196200
</differences>
197201

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 72 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,7 @@
4545
import java.io.IOException;
4646
import java.time.Duration;
4747
import java.time.Instant;
48-
import java.util.ArrayList;
49-
import java.util.Comparator;
50-
import java.util.Deque;
51-
import java.util.HashMap;
52-
import java.util.LinkedList;
53-
import java.util.List;
54-
import java.util.Map;
55-
import java.util.Set;
56-
import java.util.UUID;
48+
import java.util.*;
5749
import java.util.concurrent.ConcurrentHashMap;
5850
import java.util.concurrent.ExecutorService;
5951
import java.util.concurrent.Executors;
@@ -511,6 +503,8 @@ private boolean shouldWaitForBackoff(AppendRequestAndResponse requestWrapper) {
511503

512504
private void waitForBackoffIfNecessary(AppendRequestAndResponse requestWrapper) {
513505
lock.lock();
506+
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
507+
RequestProfiler.OperationName.RETRY_BACKOFF, requestWrapper.requestUniqueId);
514508
try {
515509
Condition condition = lock.newCondition();
516510
while (shouldWaitForBackoff(requestWrapper)) {
@@ -519,6 +513,8 @@ private void waitForBackoffIfNecessary(AppendRequestAndResponse requestWrapper)
519513
} catch (InterruptedException e) {
520514
throw new IllegalStateException(e);
521515
} finally {
516+
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
517+
RequestProfiler.OperationName.RETRY_BACKOFF, requestWrapper.requestUniqueId);
522518
lock.unlock();
523519
}
524520
}
@@ -539,6 +535,8 @@ private void addMessageToWaitingQueue(
539535
++this.inflightRequests;
540536
this.inflightBytes += requestWrapper.messageSize;
541537
hasMessageInWaitingQueue.signal();
538+
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
539+
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
542540
if (addToFront) {
543541
waitingRequestQueue.addFirst(requestWrapper);
544542
} else {
@@ -547,7 +545,8 @@ private void addMessageToWaitingQueue(
547545
}
548546

549547
/** Schedules the writing of rows at given offset. */
550-
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset) {
548+
ApiFuture<AppendRowsResponse> append(
549+
StreamWriter streamWriter, ProtoRows rows, long offset, String requestUniqueId) {
551550
if (this.location != null && !this.location.equals(streamWriter.getLocation())) {
552551
throw new StatusRuntimeException(
553552
Status.fromCode(Code.INVALID_ARGUMENT)
@@ -584,7 +583,7 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
584583
requestBuilder.setDefaultMissingValueInterpretation(
585584
streamWriter.getDefaultValueInterpretation());
586585
}
587-
return appendInternal(streamWriter, requestBuilder.build());
586+
return appendInternal(streamWriter, requestBuilder.build(), requestUniqueId);
588587
}
589588

590589
Boolean isUserClosed() {
@@ -601,9 +600,9 @@ String getWriteLocation() {
601600
}
602601

603602
private ApiFuture<AppendRowsResponse> appendInternal(
604-
StreamWriter streamWriter, AppendRowsRequest message) {
603+
StreamWriter streamWriter, AppendRowsRequest message, String requestUniqueId) {
605604
AppendRequestAndResponse requestWrapper =
606-
new AppendRequestAndResponse(message, streamWriter, this.retrySettings);
605+
new AppendRequestAndResponse(message, streamWriter, this.retrySettings, requestUniqueId);
607606
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
608607
requestWrapper.appendResult.setException(
609608
new StatusRuntimeException(
@@ -650,11 +649,14 @@ private ApiFuture<AppendRowsResponse> appendInternal(
650649
writerId));
651650
return requestWrapper.appendResult;
652651
}
653-
652+
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
653+
RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId);
654654
++this.inflightRequests;
655655
this.inflightBytes += requestWrapper.messageSize;
656656
waitingRequestQueue.addLast(requestWrapper);
657657
hasMessageInWaitingQueue.signal();
658+
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
659+
RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId);
658660
try {
659661
maybeWaitForInflightQuota();
660662
} catch (StatusRuntimeException ex) {
@@ -663,6 +665,8 @@ private ApiFuture<AppendRowsResponse> appendInternal(
663665
this.inflightBytes -= requestWrapper.messageSize;
664666
throw ex;
665667
}
668+
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
669+
RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId);
666670
return requestWrapper.appendResult;
667671
} finally {
668672
this.lock.unlock();
@@ -826,7 +830,12 @@ private void appendLoop() {
826830
// prepended as they need to be sent before new requests.
827831
while (!inflightRequestQueue.isEmpty()) {
828832
AppendRequestAndResponse requestWrapper = inflightRequestQueue.pollLast();
833+
// Consider the backend latency as completed for the current request.
834+
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
835+
RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
829836
requestWrapper.requestSendTimeStamp = null;
837+
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
838+
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
830839
waitingRequestQueue.addFirst(requestWrapper);
831840
}
832841

@@ -836,6 +845,8 @@ private void appendLoop() {
836845
}
837846
while (!this.waitingRequestQueue.isEmpty()) {
838847
AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
848+
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
849+
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
839850
waitForBackoffIfNecessary(requestWrapper);
840851
this.inflightRequestQueue.add(requestWrapper);
841852
localQueue.addLast(requestWrapper);
@@ -876,7 +887,9 @@ private void appendLoop() {
876887
}
877888
while (!localQueue.isEmpty()) {
878889
localQueue.peekFirst().setRequestSendQueueTime();
879-
AppendRowsRequest originalRequest = localQueue.pollFirst().message;
890+
AppendRequestAndResponse wrapper = localQueue.pollFirst();
891+
AppendRowsRequest originalRequest = wrapper.message;
892+
String requestUniqueId = wrapper.requestUniqueId;
880893
AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();
881894
// Always respect the first writer schema seen by the loop.
882895
if (writerSchema == null) {
@@ -918,6 +931,9 @@ private void appendLoop() {
918931
}
919932
firstRequestForTableOrSchemaSwitch = false;
920933

934+
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
935+
RequestProfiler.OperationName.RESPONSE_LATENCY, requestUniqueId);
936+
921937
// Send should only throw an exception if there is a problem with the request. The catch
922938
// block will handle this case, and return the exception with the result.
923939
// Otherwise send will return:
@@ -1196,6 +1212,8 @@ private void requestCallback(AppendRowsResponse response) {
11961212
}
11971213
if (!this.inflightRequestQueue.isEmpty()) {
11981214
requestWrapper = pollFirstInflightRequestQueue();
1215+
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
1216+
RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
11991217
} else if (inflightCleanuped) {
12001218
// It is possible when requestCallback is called, the inflight queue is already drained
12011219
// because we timed out waiting for done.
@@ -1227,35 +1245,40 @@ private void requestCallback(AppendRowsResponse response) {
12271245
// the current request callback finishes.
12281246
threadPool.submit(
12291247
() -> {
1230-
if (response.hasError()) {
1231-
Exceptions.StorageException storageException =
1232-
Exceptions.toStorageException(response.getError(), null);
1233-
log.fine(String.format("Got error message: %s", response.toString()));
1234-
if (storageException != null) {
1235-
requestWrapper.appendResult.setException(storageException);
1236-
} else if (response.getRowErrorsCount() > 0) {
1237-
Map<Integer, String> rowIndexToErrorMessage = new HashMap<>();
1238-
for (int i = 0; i < response.getRowErrorsCount(); i++) {
1239-
RowError rowError = response.getRowErrors(i);
1240-
rowIndexToErrorMessage.put(
1241-
Math.toIntExact(rowError.getIndex()), rowError.getMessage());
1248+
try {
1249+
if (response.hasError()) {
1250+
Exceptions.StorageException storageException =
1251+
Exceptions.toStorageException(response.getError(), null);
1252+
log.fine(String.format("Got error message: %s", response.toString()));
1253+
if (storageException != null) {
1254+
requestWrapper.appendResult.setException(storageException);
1255+
} else if (response.getRowErrorsCount() > 0) {
1256+
Map<Integer, String> rowIndexToErrorMessage = new HashMap<>();
1257+
for (int i = 0; i < response.getRowErrorsCount(); i++) {
1258+
RowError rowError = response.getRowErrors(i);
1259+
rowIndexToErrorMessage.put(
1260+
Math.toIntExact(rowError.getIndex()), rowError.getMessage());
1261+
}
1262+
AppendSerializationError exception =
1263+
new AppendSerializationError(
1264+
response.getError().getCode(),
1265+
response.getError().getMessage(),
1266+
streamName,
1267+
rowIndexToErrorMessage);
1268+
requestWrapper.appendResult.setException(exception);
1269+
} else {
1270+
StatusRuntimeException exception =
1271+
new StatusRuntimeException(
1272+
Status.fromCodeValue(response.getError().getCode())
1273+
.withDescription(response.getError().getMessage()));
1274+
requestWrapper.appendResult.setException(exception);
12421275
}
1243-
AppendSerializationError exception =
1244-
new AppendSerializationError(
1245-
response.getError().getCode(),
1246-
response.getError().getMessage(),
1247-
streamName,
1248-
rowIndexToErrorMessage);
1249-
requestWrapper.appendResult.setException(exception);
12501276
} else {
1251-
StatusRuntimeException exception =
1252-
new StatusRuntimeException(
1253-
Status.fromCodeValue(response.getError().getCode())
1254-
.withDescription(response.getError().getMessage()));
1255-
requestWrapper.appendResult.setException(exception);
1277+
requestWrapper.appendResult.set(response);
12561278
}
1257-
} else {
1258-
requestWrapper.appendResult.set(response);
1279+
} finally {
1280+
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
1281+
RequestProfiler.OperationName.TOTAL_LATENCY, requestWrapper.requestUniqueId);
12591282
}
12601283
});
12611284
}
@@ -1367,6 +1390,9 @@ static final class AppendRequestAndResponse {
13671390
Instant blockMessageSendDeadline;
13681391

13691392
Integer retryCount;
1393+
1394+
// Unique identifier for the request.
1395+
String requestUniqueId;
13701396
ExponentialRetryAlgorithm retryAlgorithm;
13711397

13721398
// The writer that issues the call of the request.
@@ -1379,11 +1405,15 @@ static final class AppendRequestAndResponse {
13791405
Instant requestSendTimeStamp;
13801406

13811407
AppendRequestAndResponse(
1382-
AppendRowsRequest message, StreamWriter streamWriter, RetrySettings retrySettings) {
1408+
AppendRowsRequest message,
1409+
StreamWriter streamWriter,
1410+
RetrySettings retrySettings,
1411+
String requestUniqueId) {
13831412
this.appendResult = SettableApiFuture.create();
13841413
this.message = message;
13851414
this.messageSize = message.getProtoRows().getSerializedSize();
13861415
this.streamWriter = streamWriter;
1416+
this.requestUniqueId = requestUniqueId;
13871417
this.blockMessageSendDeadline = Instant.now();
13881418
this.retryCount = 0;
13891419
// To be set after first retry

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -234,11 +234,6 @@ public static void setOptions(Settings settings) {
234234
ConnectionWorkerPool.settings = settings;
235235
}
236236

237-
/** Distributes the writing of a message to an underlying connection. */
238-
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows) {
239-
return append(streamWriter, rows, -1);
240-
}
241-
242237
ConnectionWorker getConnectionWorker(StreamWriter streamWriter) {
243238
ConnectionWorker connectionWorker;
244239
lock.lock();
@@ -280,12 +275,13 @@ ConnectionWorker getConnectionWorker(StreamWriter streamWriter) {
280275
}
281276

282277
/** Distributes the writing of a message to an underlying connection. */
283-
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset) {
278+
ApiFuture<AppendRowsResponse> append(
279+
StreamWriter streamWriter, ProtoRows rows, long offset, String uniqueRequestId) {
284280
// We are in multiplexing mode after entering the following logic.
285281
ConnectionWorker connectionWorker = getConnectionWorker(streamWriter);
286282
Stopwatch stopwatch = Stopwatch.createStarted();
287283
ApiFuture<AppendRowsResponse> responseFuture =
288-
connectionWorker.append(streamWriter, rows, offset);
284+
connectionWorker.append(streamWriter, rows, offset, uniqueRequestId);
289285
return ApiFutures.transform(
290286
responseFuture,
291287
// Add callback for update schema

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,15 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
383383
return this;
384384
}
385385

386+
/**
387+
* Enable a latency profiler that would periodically generate a detailed latency report for the
388+
* top latency requests. This is currently an experimental API.
389+
*/
390+
public Builder setEnableLatencyProfiler(boolean enableLatencyProfiler) {
391+
this.schemaAwareStreamWriterBuilder.setEnableLatencyProfiler(enableLatencyProfiler);
392+
return this;
393+
}
394+
386395
/**
387396
* Sets the default missing value interpretation value if the column is not presented in the
388397
* missing_value_interpretations map.

0 commit comments

Comments
 (0)