Skip to content

Commit 2f45fa2

Browse files
authored
fix: Remove singleton access to request profiler. Use instance of hook instead. (#2567)
* Add profiler for request execution details. The usage of the new API will be added in the next PR * Add profiler for request execution details. The usage of the new API will be added in the next PR * add new code change * . * Remove directly access to singleton
1 parent ba31fc5 commit 2f45fa2

File tree

9 files changed

+195
-158
lines changed

9 files changed

+195
-158
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,8 @@ class ConnectionWorker implements AutoCloseable {
247247
*/
248248
private final RetrySettings retrySettings;
249249

250+
private final RequestProfiler.RequestProfilerHook requestProfilerHook;
251+
250252
private static String projectMatching = "projects/[^/]+/";
251253
private static Pattern streamPatternProject = Pattern.compile(projectMatching);
252254

@@ -386,7 +388,8 @@ public ConnectionWorker(
386388
String traceId,
387389
@Nullable String compressorName,
388390
BigQueryWriteSettings clientSettings,
389-
RetrySettings retrySettings)
391+
RetrySettings retrySettings,
392+
boolean enableRequestProfiler)
390393
throws IOException {
391394
this.lock = new ReentrantLock();
392395
this.hasMessageInWaitingQueue = lock.newCondition();
@@ -410,6 +413,7 @@ public ConnectionWorker(
410413
this.compressorName = compressorName;
411414
this.retrySettings = retrySettings;
412415
this.telemetryAttributes = buildOpenTelemetryAttributes();
416+
this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(enableRequestProfiler);
413417
registerOpenTelemetryMetrics();
414418

415419
// Always recreate a client for connection worker.
@@ -503,7 +507,7 @@ private boolean shouldWaitForBackoff(AppendRequestAndResponse requestWrapper) {
503507

504508
private void waitForBackoffIfNecessary(AppendRequestAndResponse requestWrapper) {
505509
lock.lock();
506-
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
510+
requestProfilerHook.startOperation(
507511
RequestProfiler.OperationName.RETRY_BACKOFF, requestWrapper.requestUniqueId);
508512
try {
509513
Condition condition = lock.newCondition();
@@ -513,7 +517,7 @@ private void waitForBackoffIfNecessary(AppendRequestAndResponse requestWrapper)
513517
} catch (InterruptedException e) {
514518
throw new IllegalStateException(e);
515519
} finally {
516-
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
520+
requestProfilerHook.endOperation(
517521
RequestProfiler.OperationName.RETRY_BACKOFF, requestWrapper.requestUniqueId);
518522
lock.unlock();
519523
}
@@ -535,7 +539,7 @@ private void addMessageToWaitingQueue(
535539
++this.inflightRequests;
536540
this.inflightBytes += requestWrapper.messageSize;
537541
hasMessageInWaitingQueue.signal();
538-
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
542+
requestProfilerHook.startOperation(
539543
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
540544
if (addToFront) {
541545
waitingRequestQueue.addFirst(requestWrapper);
@@ -649,13 +653,12 @@ private ApiFuture<AppendRowsResponse> appendInternal(
649653
writerId));
650654
return requestWrapper.appendResult;
651655
}
652-
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
653-
RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId);
656+
requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId);
654657
++this.inflightRequests;
655658
this.inflightBytes += requestWrapper.messageSize;
656659
waitingRequestQueue.addLast(requestWrapper);
657660
hasMessageInWaitingQueue.signal();
658-
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
661+
requestProfilerHook.startOperation(
659662
RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId);
660663
try {
661664
maybeWaitForInflightQuota();
@@ -665,7 +668,7 @@ private ApiFuture<AppendRowsResponse> appendInternal(
665668
this.inflightBytes -= requestWrapper.messageSize;
666669
throw ex;
667670
}
668-
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
671+
requestProfilerHook.endOperation(
669672
RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId);
670673
return requestWrapper.appendResult;
671674
} finally {
@@ -831,10 +834,10 @@ private void appendLoop() {
831834
while (!inflightRequestQueue.isEmpty()) {
832835
AppendRequestAndResponse requestWrapper = inflightRequestQueue.pollLast();
833836
// Consider the backend latency as completed for the current request.
834-
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
837+
requestProfilerHook.endOperation(
835838
RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
836839
requestWrapper.requestSendTimeStamp = null;
837-
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
840+
requestProfilerHook.startOperation(
838841
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
839842
waitingRequestQueue.addFirst(requestWrapper);
840843
}
@@ -845,7 +848,7 @@ private void appendLoop() {
845848
}
846849
while (!this.waitingRequestQueue.isEmpty()) {
847850
AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
848-
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
851+
requestProfilerHook.endOperation(
849852
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
850853
waitForBackoffIfNecessary(requestWrapper);
851854
this.inflightRequestQueue.add(requestWrapper);
@@ -931,7 +934,7 @@ private void appendLoop() {
931934
}
932935
firstRequestForTableOrSchemaSwitch = false;
933936

934-
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
937+
requestProfilerHook.startOperation(
935938
RequestProfiler.OperationName.RESPONSE_LATENCY, requestUniqueId);
936939

937940
// Send should only throw an exception if there is a problem with the request. The catch
@@ -1212,7 +1215,7 @@ private void requestCallback(AppendRowsResponse response) {
12121215
}
12131216
if (!this.inflightRequestQueue.isEmpty()) {
12141217
requestWrapper = pollFirstInflightRequestQueue();
1215-
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
1218+
requestProfilerHook.endOperation(
12161219
RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
12171220
} else if (inflightCleanuped) {
12181221
// It is possible when requestCallback is called, the inflight queue is already drained
@@ -1277,7 +1280,7 @@ private void requestCallback(AppendRowsResponse response) {
12771280
requestWrapper.appendResult.set(response);
12781281
}
12791282
} finally {
1280-
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
1283+
requestProfilerHook.endOperation(
12811284
RequestProfiler.OperationName.TOTAL_LATENCY, requestWrapper.requestUniqueId);
12821285
}
12831286
});

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ public abstract static class Builder {
205205
/** Static setting for connection pool. */
206206
private static Settings settings = Settings.builder().build();
207207

208+
private final boolean enableRequestProfiler;
209+
208210
ConnectionWorkerPool(
209211
long maxInflightRequests,
210212
long maxInflightBytes,
@@ -213,7 +215,8 @@ public abstract static class Builder {
213215
String traceId,
214216
@Nullable String comperssorName,
215217
BigQueryWriteSettings clientSettings,
216-
RetrySettings retrySettings) {
218+
RetrySettings retrySettings,
219+
boolean enableRequestProfiler) {
217220
this.maxInflightRequests = maxInflightRequests;
218221
this.maxInflightBytes = maxInflightBytes;
219222
this.maxRetryDuration = maxRetryDuration;
@@ -223,6 +226,7 @@ public abstract static class Builder {
223226
this.clientSettings = clientSettings;
224227
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
225228
this.retrySettings = retrySettings;
229+
this.enableRequestProfiler = enableRequestProfiler;
226230
}
227231

228232
/**
@@ -404,7 +408,8 @@ private ConnectionWorker createConnectionWorker(
404408
traceId,
405409
compressorName,
406410
clientSettings,
407-
retrySettings);
411+
retrySettings,
412+
enableRequestProfiler);
408413
connectionWorkerPool.add(connectionWorker);
409414
log.info(
410415
String.format(

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/RequestProfiler.java

Lines changed: 69 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ enum OperationName {
8080
private static final int MAX_CACHED_REQUEST = 100000;
8181

8282
// Singleton for easier access.
83-
static final RequestProfiler REQUEST_PROFILER_SINGLETON = new RequestProfiler();
83+
private static final RequestProfiler REQUEST_PROFILER_SINGLETON = new RequestProfiler();
8484

8585
// Tunable static variable indicate how many top longest latency requests we should consider.
8686
private static final int DEFAULT_TOP_K = 20;
@@ -172,33 +172,35 @@ void flushAndPrintReport() {
172172
// Periodically trigger the report generation.
173173
void startPeriodicalReportFlushing() {
174174
this.enableProfiiler = true;
175-
this.flushThread =
176-
new Thread(
177-
new Runnable() {
178-
@Override
179-
public void run() {
180-
try {
181-
while (true) {
182-
try {
183-
TimeUnit.MILLISECONDS.sleep(FLUSH_PERIOD.toMillis());
184-
} catch (InterruptedException e) {
185-
log.warning("Flush report thread is interrupted by " + e.toString());
186-
throw new RuntimeException(e);
175+
if (this.flushThread == null || !this.flushThread.isAlive()) {
176+
this.flushThread =
177+
new Thread(
178+
new Runnable() {
179+
@Override
180+
public void run() {
181+
try {
182+
while (true) {
183+
try {
184+
TimeUnit.MILLISECONDS.sleep(FLUSH_PERIOD.toMillis());
185+
} catch (InterruptedException e) {
186+
log.warning("Flush report thread is interrupted by " + e.toString());
187+
throw new RuntimeException(e);
188+
}
189+
flushAndPrintReport();
187190
}
188-
flushAndPrintReport();
191+
} catch (Exception ex) {
192+
// Mute any exception thrown from profiler process as we don't want to
193+
// interrupt normal operations.
194+
log.warning(
195+
"Exception thrown request profiler ignored, this is suggesting faulty "
196+
+ "implementation of "
197+
+ "RequestProfiler, exception context: "
198+
+ ex.toString());
189199
}
190-
} catch (Exception ex) {
191-
// Mute any exception thrown from profiler process as we don't want to
192-
// interrupt normal operations.
193-
log.warning(
194-
"Exception thrown request profiler ignored, this is suggesting faulty "
195-
+ "implementation of "
196-
+ "RequestProfiler, exception context: "
197-
+ ex.toString());
198200
}
199-
}
200-
});
201-
this.flushThread.start();
201+
});
202+
this.flushThread.start();
203+
}
202204
}
203205

204206
String flushAndGenerateReportText() {
@@ -402,7 +404,48 @@ void internalDisableAndClearProfiler() {
402404
FLUSH_PERIOD = DEFAULT_FLUSH_PERIOD;
403405
}
404406

405-
public static void disableAndClearProfiler() {
407+
public static void disableAndResetProfiler() {
406408
REQUEST_PROFILER_SINGLETON.internalDisableAndClearProfiler();
407409
}
410+
411+
/**
412+
* A hook for easier access to request profiler. Otherwise we have to trigger tedious if clauses
413+
* to check whether profiler is enabled before every caller's trigger of the request profiler.
414+
* This is because profiler is shared statically across instances.
415+
*/
416+
static class RequestProfilerHook {
417+
private boolean enableRequestProfiler = false;
418+
419+
RequestProfilerHook(boolean enableRequestProfiler) {
420+
this.enableRequestProfiler = enableRequestProfiler;
421+
}
422+
423+
// Mimic the api exposed by the main request profiler.
424+
void startOperation(OperationName operationName, String requestUniqueId) {
425+
if (this.enableRequestProfiler) {
426+
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(operationName, requestUniqueId);
427+
}
428+
}
429+
430+
// Mimic the api exposed by the main request profiler.
431+
void endOperation(OperationName operationName, String requestUniqueId) {
432+
if (this.enableRequestProfiler) {
433+
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(operationName, requestUniqueId);
434+
}
435+
}
436+
437+
void startPeriodicalReportFlushing() {
438+
if (this.enableRequestProfiler) {
439+
RequestProfiler.REQUEST_PROFILER_SINGLETON.startPeriodicalReportFlushing();
440+
}
441+
}
442+
443+
String flushAndGenerateReportText() {
444+
return RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
445+
}
446+
447+
void enableProfiler() {
448+
REQUEST_PROFILER_SINGLETON.enableProfiler();
449+
}
450+
}
408451
}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ public class SchemaAwareStreamWriter<T> implements AutoCloseable {
7171
// the user provides the table schema, we should always use that schema.
7272
private final boolean skipRefreshStreamWriter;
7373

74+
// Provide access to the request profiler.
75+
private final RequestProfiler.RequestProfilerHook requestProfilerHook;
76+
7477
/**
7578
* Constructs the SchemaAwareStreamWriter
7679
*
@@ -103,8 +106,10 @@ private SchemaAwareStreamWriter(Builder<T> builder)
103106
streamWriterBuilder.setDefaultMissingValueInterpretation(
104107
builder.defaultMissingValueInterpretation);
105108
streamWriterBuilder.setClientId(builder.clientId);
109+
streamWriterBuilder.setEnableLatencyProfiler(builder.enableRequestProfiler);
110+
requestProfilerHook = new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler);
106111
if (builder.enableRequestProfiler) {
107-
streamWriterBuilder.setEnableLatencyProfiler(builder.enableRequestProfiler);
112+
requestProfilerHook.startPeriodicalReportFlushing();
108113
}
109114
this.streamWriter = streamWriterBuilder.build();
110115
this.streamName = builder.streamName;
@@ -127,12 +132,12 @@ private SchemaAwareStreamWriter(Builder<T> builder)
127132
public ApiFuture<AppendRowsResponse> append(Iterable<T> items)
128133
throws IOException, DescriptorValidationException {
129134
String requestUniqueId = generateRequestUniqueId();
130-
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
135+
requestProfilerHook.startOperation(
131136
RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId);
132137
try {
133138
return appendWithUniqueId(items, -1, requestUniqueId);
134139
} catch (Exception ex) {
135-
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
140+
requestProfilerHook.endOperation(
136141
RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId);
137142
throw ex;
138143
}
@@ -197,12 +202,12 @@ private List<DynamicMessage> buildMessage(Iterable<T> items)
197202
public ApiFuture<AppendRowsResponse> append(Iterable<T> items, long offset)
198203
throws IOException, DescriptorValidationException {
199204
String requestUniqueId = generateRequestUniqueId();
200-
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
205+
requestProfilerHook.startOperation(
201206
RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId);
202207
try {
203208
return appendWithUniqueId(items, offset, requestUniqueId);
204209
} catch (Exception ex) {
205-
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
210+
requestProfilerHook.endOperation(
206211
RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId);
207212
throw ex;
208213
}
@@ -213,7 +218,7 @@ ApiFuture<AppendRowsResponse> appendWithUniqueId(
213218
throws DescriptorValidationException, IOException {
214219
// Handle schema updates in a Thread-safe way by locking down the operation
215220
synchronized (this) {
216-
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
221+
requestProfilerHook.startOperation(
217222
RequestProfiler.OperationName.JSON_TO_PROTO_CONVERSION, requestUniqueId);
218223
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
219224
try {
@@ -246,7 +251,7 @@ ApiFuture<AppendRowsResponse> appendWithUniqueId(
246251
rowIndexToErrorMessage);
247252
}
248253
} finally {
249-
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
254+
requestProfilerHook.endOperation(
250255
RequestProfiler.OperationName.JSON_TO_PROTO_CONVERSION, requestUniqueId);
251256
}
252257
return this.streamWriter.appendWithUniqueId(rowsBuilder.build(), offset, requestUniqueId);
@@ -529,9 +534,6 @@ private Builder(
529534
this.skipRefreshStreamWriter = true;
530535
}
531536
this.toProtoConverter = toProtoConverter;
532-
if (this.enableRequestProfiler) {
533-
RequestProfiler.REQUEST_PROFILER_SINGLETON.startPeriodicalReportFlushing();
534-
}
535537
}
536538

537539
/**

0 commit comments

Comments
 (0)