@@ -59,8 +59,10 @@ public class GaugeManager {
59
59
private final TransportManager transportManager ;
60
60
61
61
@ Nullable private GaugeMetadataManager gaugeMetadataManager ;
62
- @ Nullable private ScheduledFuture gaugeManagerDataCollectionJob = null ;
62
+ @ Nullable private ScheduledFuture <?> gaugeManagerDataCollectionJob = null ;
63
63
@ Nullable private String sessionId = null ;
64
+
65
+ private boolean isCollectingGauges = false ;
64
66
private ApplicationProcessState applicationProcessState =
65
67
ApplicationProcessState .APPLICATION_PROCESS_STATE_UNKNOWN ;
66
68
@@ -103,6 +105,34 @@ public static synchronized GaugeManager getInstance() {
103
105
return instance ;
104
106
}
105
107
108
+ public void updateGaugeLogging (
109
+ String sessionId , ApplicationProcessState applicationProcessState , long collectionFrequency ) {
110
+ try {
111
+ gaugeManagerDataCollectionJob =
112
+ gaugeManagerExecutor
113
+ .get ()
114
+ .scheduleWithFixedDelay (
115
+ () -> {
116
+ syncFlush (sessionId , applicationProcessState );
117
+ },
118
+ /* initialDelay= */ collectionFrequency
119
+ * APPROX_NUMBER_OF_DATA_POINTS_PER_GAUGE_METRIC ,
120
+ /* period= */ collectionFrequency * APPROX_NUMBER_OF_DATA_POINTS_PER_GAUGE_METRIC ,
121
+ TimeUnit .MILLISECONDS );
122
+
123
+ } catch (RejectedExecutionException e ) {
124
+ logger .warn ("Unable to start collecting Gauges: " + e .getMessage ());
125
+ }
126
+ }
127
+
128
+ public long updateGaugeCollection (
129
+ ApplicationProcessState applicationProcessState , Timer gaugeCollectionTimer ) {
130
+ if (isCollectingGauges ) {
131
+ stopCollectingGauges ();
132
+ }
133
+ return startCollectingGauges (applicationProcessState , gaugeCollectionTimer );
134
+ }
135
+
106
136
/**
107
137
* Starts the collection of available gauges for the given {@code sessionId} and {@code
108
138
* applicationProcessState}. The collected Gauge Metrics will be flushed at regular intervals.
@@ -190,7 +220,7 @@ private long startCollectingGauges(ApplicationProcessState appState, Timer refer
190
220
* this.stopCollectingGauges()} should always be called from the same thread.
191
221
*/
192
222
public void stopCollectingGauges () {
193
- if (this .sessionId == null ) {
223
+ if (! this .isCollectingGauges ) {
194
224
return ;
195
225
}
196
226
@@ -219,6 +249,7 @@ public void stopCollectingGauges() {
219
249
TimeUnit .MILLISECONDS );
220
250
221
251
this .sessionId = null ;
252
+ this .isCollectingGauges = false ;
222
253
this .applicationProcessState = ApplicationProcessState .APPLICATION_PROCESS_STATE_UNKNOWN ;
223
254
}
224
255
@@ -253,16 +284,16 @@ private void syncFlush(String sessionId, ApplicationProcessState appState) {
253
284
/**
254
285
* Log the Gauge Metadata information to the transport.
255
286
*
256
- * @param aqsSessionId The {@link PerfSession#aqsSessionId ()} ()} to which the collected Gauge Metrics
287
+ * @param sessionId The {@link PerfSession#sessionId ()} ()} to which the collected Gauge Metrics
257
288
* should be associated with.
258
289
* @param appState The {@link ApplicationProcessState} for which these gauges are collected.
259
290
* @return true if GaugeMetadata was logged, false otherwise.
260
291
*/
261
- public boolean logGaugeMetadata (String aqsSessionId , ApplicationProcessState appState ) {
292
+ public boolean logGaugeMetadata (String sessionId , ApplicationProcessState appState ) {
262
293
if (gaugeMetadataManager != null ) {
263
294
GaugeMetric gaugeMetric =
264
295
GaugeMetric .newBuilder ()
265
- .setSessionId (aqsSessionId )
296
+ .setSessionId (sessionId )
266
297
.setGaugeMetadata (getGaugeMetadata ())
267
298
.build ();
268
299
transportManager .log (gaugeMetric , appState );
@@ -335,6 +366,22 @@ public void collectGaugeMetricOnce(Timer referenceTime) {
335
366
collectGaugeMetricOnce (cpuGaugeCollector .get (), memoryGaugeCollector .get (), referenceTime );
336
367
}
337
368
369
+ public void logExistingGaugeMetrics (
370
+ String sessionId , ApplicationProcessState applicationProcessState ) {
371
+ // Flush any data that was collected and associate it with the given session ID and
372
+ // applicationProcessState.
373
+ @ SuppressWarnings ("FutureReturnValueIgnored" )
374
+ ScheduledFuture <?> unusedFuture =
375
+ gaugeManagerExecutor
376
+ .get ()
377
+ .schedule (
378
+ () -> {
379
+ syncFlush (sessionId , applicationProcessState );
380
+ },
381
+ TIME_TO_WAIT_BEFORE_FLUSHING_GAUGES_QUEUE_MS ,
382
+ TimeUnit .MILLISECONDS );
383
+ }
384
+
338
385
private static void collectGaugeMetricOnce (
339
386
CpuGaugeCollector cpuGaugeCollector ,
340
387
MemoryGaugeCollector memoryGaugeCollector ,
@@ -405,4 +452,10 @@ private long getMemoryGaugeCollectionFrequencyMs(
405
452
return memoryGaugeCollectionFrequency ;
406
453
}
407
454
}
455
+
456
+ private long getGaugeLoggingFrequency (ApplicationProcessState applicationProcessState ) {
457
+ return Math .min (
458
+ getMemoryGaugeCollectionFrequencyMs (applicationProcessState ),
459
+ getCpuGaugeCollectionFrequencyMs (applicationProcessState ));
460
+ }
408
461
}
0 commit comments