Skip to content

Commit 5691bd5

Browse files
authored
feat: add profiler for request execution details for write api connection worker (#2555)
* Add profiler for request execution details. The usage of the new API will be added in the next PR * Add profiler for request execution details. The usage of the new API will be added in the next PR
1 parent cd0c17b commit 5691bd5

File tree

2 files changed

+559
-0
lines changed

2 files changed

+559
-0
lines changed
Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigquery.storage.v1;
17+
18+
import java.time.Duration;
19+
import java.util.ArrayList;
20+
import java.util.Collections;
21+
import java.util.Comparator;
22+
import java.util.Iterator;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.Map.Entry;
26+
import java.util.PriorityQueue;
27+
import java.util.Queue;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.ConcurrentLinkedDeque;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicLong;
32+
import java.util.logging.Logger;
33+
34+
/**
35+
* A profiler that would periodically generate a report for the past period with the latency report
36+
* for the slowest requests. This is used for debugging only.
37+
*
38+
* <pre>
39+
* The report will contain the execution details of the TOP_K slowest requests, one example:
40+
*
41+
* INFO: During the last 60000 milliseconds at system time 1720825020138, in total 2 requests finished. Total dropped request is 0. The top 10 long latency requests details report:
42+
* -----------------------------
43+
* Request uuid: request_1 with total time 1000 milliseconds
44+
* Operation name json_to_proto_conversion starts at: 1720566109971, ends at: 1720566109971, total time: 200 milliseconds
45+
* Operation name backend_latency starts at: 1720566109971, ends at: 1720566109971, total time: 800 milliseconds
46+
* -----------------------------
47+
* Request uuid: request_2 with total time 500 milliseconds
48+
* Operation name json_to_proto_conversion starts at: 1720566109971, ends at: 1720566109971, total time: 250 milliseconds
49+
* Operation name backend_latency starts at: 1720566109971, ends at: 1720566109971, total time: 250 milliseconds
50+
* ...
51+
* </pre>
52+
*/
53+
public class RequestProfiler {
54+
enum OperationName {
55+
// The total end to end latency for a request.
56+
TOTAL_LATENCY("append_request_total_latency"),
57+
// Json to proto conversion time.
58+
JSON_TO_PROTO_CONVERSION("json_to_proto_conversion"),
59+
// Time spent to fetch the table schema when user didn't provide it.
60+
SCHEMA_FETCHING("schema_fetching"),
61+
// Time spent within wait queue before it get picked up.
62+
WAIT_QUEUE("wait_queue"),
63+
// Time spent within backend + the time spent over network.
64+
RESPONSE_LATENCY("response_latency");
65+
private final String operationName;
66+
67+
OperationName(String operationName) {
68+
this.operationName = operationName;
69+
}
70+
}
71+
72+
private static final Logger log = Logger.getLogger(RequestProfiler.class.getName());
73+
74+
// Discard the requests if we are caching too many requests.
75+
private static final int MAX_CACHED_REQUEST = 100000;
76+
77+
// Singleton for easier access.
78+
public static final RequestProfiler REQUEST_PROFILER_SINGLETON = new RequestProfiler();
79+
80+
// Tunable static variable indicate how many top longest latency requests we should consider.
81+
private static int TOP_K = 10;
82+
83+
// Tunable static variable indicate how often the report should be generated.
84+
private static Duration FLUSH_PERIOD = Duration.ofMinutes(1);
85+
86+
// From request uuid to the profiler of individual request. This will be cleaned up periodically.
87+
private final Map<String, IndividualRequestProfiler> idToIndividualOperation =
88+
new ConcurrentHashMap<>();
89+
90+
private Thread flushThread;
91+
92+
// Count the total number of dropped operations.
93+
AtomicLong droppedOperationCount = new AtomicLong(0);
94+
95+
// Mark an operation for a given request id to be start.
96+
void startOperation(OperationName operationName, String requestUniqueId) {
97+
if (!idToIndividualOperation.containsKey(requestUniqueId)) {
98+
if (idToIndividualOperation.size() > MAX_CACHED_REQUEST) {
99+
log.warning(
100+
String.format(
101+
"startOperation is triggered for request_id: %s that's hasn't "
102+
+ "seen before, this is possible when "
103+
+ "we are recording too much ongoing requests. So far we has dropped %s operations.",
104+
requestUniqueId, droppedOperationCount));
105+
droppedOperationCount.incrementAndGet();
106+
return;
107+
}
108+
idToIndividualOperation.put(requestUniqueId, new IndividualRequestProfiler(requestUniqueId));
109+
}
110+
idToIndividualOperation.get(requestUniqueId).startOperation(operationName);
111+
}
112+
113+
// Mark an operation for a given request id to be end.
114+
void endOperation(OperationName operationName, String requestUniqueId) {
115+
if (!idToIndividualOperation.containsKey(requestUniqueId)) {
116+
log.warning(
117+
String.format(
118+
"endOperation is triggered for request_id: %s that's hasn't "
119+
+ "seen before, this is possible when "
120+
+ "we are recording too much ongoing requests. So far we has dropped %s operations.",
121+
requestUniqueId, droppedOperationCount));
122+
return;
123+
}
124+
idToIndividualOperation.get(requestUniqueId).endOperation(operationName);
125+
}
126+
127+
void flushReport() {
128+
log.info(flushAndGenerateReportText());
129+
}
130+
131+
// Periodically trigger the report generation.
132+
void startPeriodicalReportFlushing() {
133+
this.flushThread =
134+
new Thread(
135+
new Runnable() {
136+
@Override
137+
public void run() {
138+
while (true) {
139+
try {
140+
TimeUnit.MILLISECONDS.sleep(FLUSH_PERIOD.toMillis());
141+
} catch (InterruptedException e) {
142+
log.warning("Flush report thread is interrupted by " + e.toString());
143+
throw new RuntimeException(e);
144+
}
145+
flushReport();
146+
}
147+
}
148+
});
149+
this.flushThread.start();
150+
}
151+
152+
String flushAndGenerateReportText() {
153+
RequestProfilerComparator comparator = new RequestProfilerComparator();
154+
155+
// Find the top k requests with the longest latency.
156+
PriorityQueue<IndividualRequestProfiler> minHeap =
157+
new PriorityQueue<IndividualRequestProfiler>(comparator);
158+
Iterator<Entry<String, IndividualRequestProfiler>> iterator =
159+
idToIndividualOperation.entrySet().iterator();
160+
int finishedRequestCount = 0;
161+
// Iterate through all the requests stats, add to min heap if that's a finished request and has
162+
// longer total
163+
// latency than the least amount of latency in the min heap.
164+
while (iterator.hasNext()) {
165+
Entry<String, IndividualRequestProfiler> individualRequestProfiler = iterator.next();
166+
if (!individualRequestProfiler.getValue().finalized) {
167+
continue;
168+
}
169+
finishedRequestCount++;
170+
if (minHeap.size() < TOP_K
171+
|| individualRequestProfiler.getValue().totalTime > minHeap.peek().totalTime) {
172+
minHeap.add(individualRequestProfiler.getValue());
173+
}
174+
if (minHeap.size() > TOP_K) {
175+
minHeap.poll();
176+
}
177+
// Remove during using iterator is safe.
178+
iterator.remove();
179+
}
180+
181+
// Generate report for the TOP_K longest requests.
182+
String reportText =
183+
String.format(
184+
"During the last %s milliseconds at system time %s, in total %s requests finished. Total dropped "
185+
+ "request is %s. The top %s long latency requests details report:\n",
186+
FLUSH_PERIOD.toMillis(),
187+
System.currentTimeMillis(),
188+
finishedRequestCount,
189+
droppedOperationCount.getAndSet(0),
190+
TOP_K);
191+
if (minHeap.isEmpty()) {
192+
reportText += "-----------------------------\n";
193+
reportText += "\t0 requests finished during the last period.";
194+
} else {
195+
// Print the report for the top k requests.
196+
ArrayList<String> reportList = new ArrayList<>();
197+
while (minHeap.size() > 0) {
198+
reportList.add("-----------------------------\n" + minHeap.poll().generateReport());
199+
}
200+
// Output in reverse order to make sure the longest latency request shows up in front.
201+
for (int i = 0; i < reportList.size(); i++) {
202+
reportText += reportList.get(reportList.size() - i - 1);
203+
}
204+
}
205+
return reportText;
206+
}
207+
208+
// Min heap comparator
209+
class RequestProfilerComparator implements Comparator<IndividualRequestProfiler> {
210+
@Override
211+
public int compare(IndividualRequestProfiler x, IndividualRequestProfiler y) {
212+
if (x.totalTime > y.totalTime) {
213+
return 1;
214+
} else if (x.totalTime < y.totalTime) {
215+
return -1;
216+
}
217+
return 0;
218+
}
219+
}
220+
221+
/**
222+
* Record the profiling information for each individual request. Act like a buffer of the past
223+
* requests, either finished or not finished.
224+
*/
225+
private static final class IndividualRequestProfiler {
226+
// From operation name to the list of time spent each time we do this operation.
227+
// e.g. some operation is retried two times, resulting in two time recorded in the queue.
228+
private final Map<OperationName, Queue<Long>> timeRecorderMap;
229+
230+
// All current finished operations.
231+
private final List<IndividualOperation> finishedOperations;
232+
233+
private final String requestUniqueId;
234+
235+
// TOTAL_REQUEST has been marked as finished for this request. In this state `finalized` will
236+
// be true and totalTime will have non zero value.
237+
private long totalTime;
238+
private boolean finalized;
239+
240+
IndividualRequestProfiler(String requestUniqueId) {
241+
this.timeRecorderMap = new ConcurrentHashMap<>();
242+
this.finishedOperations = Collections.synchronizedList(new ArrayList<IndividualOperation>());
243+
this.requestUniqueId = requestUniqueId;
244+
}
245+
246+
void startOperation(OperationName operationName) {
247+
timeRecorderMap.putIfAbsent(operationName, new ConcurrentLinkedDeque<>());
248+
// Please be aware that System.currentTimeMillis() is not accurate in Windows system.
249+
timeRecorderMap.get(operationName).add(System.currentTimeMillis());
250+
}
251+
252+
void endOperation(OperationName operationName) {
253+
if (!timeRecorderMap.containsKey(operationName)) {
254+
String warningMessage =
255+
String.format(
256+
"Operation %s ignored for request %s due to "
257+
+ "startOperation() is not called before calling endOperation().",
258+
operationName, requestUniqueId);
259+
log.warning(warningMessage);
260+
return;
261+
}
262+
long startTime = timeRecorderMap.get(operationName).poll();
263+
long endTime = System.currentTimeMillis();
264+
long totalTime = endTime - startTime;
265+
finishedOperations.add(new IndividualOperation(operationName, startTime, endTime, totalTime));
266+
if (operationName == OperationName.TOTAL_LATENCY) {
267+
finalized = true;
268+
this.totalTime = totalTime;
269+
}
270+
}
271+
272+
String generateReport() {
273+
String message =
274+
"\tRequest uuid: "
275+
+ requestUniqueId
276+
+ " with total time "
277+
+ this.totalTime
278+
+ " milliseconds\n";
279+
for (int i = 0; i < finishedOperations.size(); i++) {
280+
if (finishedOperations.get(i).operationName == OperationName.TOTAL_LATENCY) {
281+
continue;
282+
}
283+
message += "\t\t";
284+
message += finishedOperations.get(i).format();
285+
message += "\n";
286+
}
287+
return message;
288+
}
289+
290+
// Record the stats of individual operation.
291+
private static final class IndividualOperation {
292+
OperationName operationName;
293+
294+
// Runtime stats for individual operation.
295+
long totalTime;
296+
long startTimestamp;
297+
long endTimestamp;
298+
299+
IndividualOperation(
300+
OperationName operationName, long startTimestamp, long endTimestamp, long totalTime) {
301+
this.operationName = operationName;
302+
this.startTimestamp = startTimestamp;
303+
this.endTimestamp = endTimestamp;
304+
this.totalTime = totalTime;
305+
}
306+
307+
String format() {
308+
return String.format(
309+
"Operation name %s starts at: %s, ends at: " + "%s, total time: %s milliseconds",
310+
operationName.operationName, startTimestamp, endTimestamp, totalTime);
311+
}
312+
}
313+
}
314+
315+
// Sets how many top latency requests to log during every reportss period.
316+
public static void setTopKRequestsToLog(int topK) {
317+
TOP_K = topK;
318+
}
319+
320+
// Sets the report period of the profiler.
321+
public static void setReportPeriod(Duration flushPeriod) {
322+
FLUSH_PERIOD = flushPeriod;
323+
}
324+
}

0 commit comments

Comments
 (0)