Skip to content

Commit 0ce3a11

Browse files
metrics: stabilize worker_park_count and worker_unpark_count (#7276)
1 parent 1ea9ce1 commit 0ce3a11

File tree

5 files changed

+171
-174
lines changed

5 files changed

+171
-174
lines changed

tokio/src/runtime/metrics/batch.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,9 @@ pub(crate) struct MetricsBatch {
1414
/// Instant at which work last resumed (continued after park).
1515
processing_scheduled_tasks_started_at: Instant,
1616

17-
#[cfg(tokio_unstable)]
1817
/// Number of times the worker parked.
1918
park_count: u64,
2019

21-
#[cfg(tokio_unstable)]
2220
/// Number of times the worker parked and unparked.
2321
park_unpark_count: u64,
2422

@@ -80,6 +78,8 @@ impl MetricsBatch {
8078
MetricsBatch {
8179
busy_duration_total: 0,
8280
processing_scheduled_tasks_started_at: now,
81+
park_count: 0,
82+
park_unpark_count: 0,
8383
}
8484
}
8585
},
@@ -120,7 +120,12 @@ impl MetricsBatch {
120120
cfg_metrics_variant! {
121121
stable: {
122122
#[inline(always)]
123-
fn submit_unstable(&mut self, _worker: &WorkerMetrics, _mean_poll_time: u64) {}
123+
fn submit_unstable(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) {
124+
worker.park_count.store(self.park_count, Relaxed);
125+
worker
126+
.park_unpark_count
127+
.store(self.park_unpark_count, Relaxed);
128+
}
124129
},
125130
unstable: {
126131
#[inline(always)]
@@ -153,7 +158,10 @@ impl MetricsBatch {
153158
cfg_metrics_variant! {
154159
stable: {
155160
/// The worker is about to park.
156-
pub(crate) fn about_to_park(&mut self) {}
161+
pub(crate) fn about_to_park(&mut self) {
162+
self.park_count += 1;
163+
self.park_unpark_count += 1;
164+
}
157165
},
158166
unstable: {
159167
/// The worker is about to park.
@@ -171,18 +179,9 @@ impl MetricsBatch {
171179
}
172180
}
173181
}
174-
175-
cfg_metrics_variant! {
176-
stable: {
177-
/// The worker was unparked.
178-
pub(crate) fn unparked(&mut self) {}
179-
},
180-
unstable: {
181-
/// The worker was unparked.
182-
pub(crate) fn unparked(&mut self) {
183-
self.park_unpark_count += 1;
184-
}
185-
}
182+
/// The worker was unparked.
183+
pub(crate) fn unparked(&mut self) {
184+
self.park_unpark_count += 1;
186185
}
187186

188187
/// Start processing a batch of tasks

tokio/src/runtime/metrics/runtime.rs

Lines changed: 97 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,103 @@ impl RuntimeMetrics {
143143
.load(Relaxed);
144144
Duration::from_nanos(nanos)
145145
}
146+
147+
/// Returns the total number of times the given worker thread has parked.
148+
///
149+
/// The worker park count starts at zero when the runtime is created and
150+
/// increases by one each time the worker parks the thread waiting for new
151+
/// inbound events to process. This usually means the worker has processed
152+
/// all pending work and is currently idle.
153+
///
154+
/// The counter is monotonically increasing. It is never decremented or
155+
/// reset to zero.
156+
///
157+
/// # Arguments
158+
///
159+
/// `worker` is the index of the worker being queried. The given value must
160+
/// be between 0 and `num_workers()`. The index uniquely identifies a single
161+
/// worker and will continue to identify the worker throughout the lifetime
162+
/// of the runtime instance.
163+
///
164+
/// # Panics
165+
///
166+
/// The method panics when `worker` represents an invalid worker, i.e. is
167+
/// greater than or equal to `num_workers()`.
168+
///
169+
/// # Examples
170+
///
171+
/// ```
172+
/// use tokio::runtime::Handle;
173+
///
174+
/// #[tokio::main]
175+
/// async fn main() {
176+
/// let metrics = Handle::current().metrics();
177+
///
178+
/// let n = metrics.worker_park_count(0);
179+
/// println!("worker 0 parked {} times", n);
180+
/// }
181+
/// ```
182+
pub fn worker_park_count(&self, worker: usize) -> u64 {
183+
self.handle
184+
.inner
185+
.worker_metrics(worker)
186+
.park_count
187+
.load(Relaxed)
188+
}
189+
190+
/// Returns the total number of times the given worker thread has parked
191+
/// and unparked.
192+
///
193+
/// The worker park/unpark count starts at zero when the runtime is created
194+
/// and increases by one each time the worker parks the thread waiting for
195+
/// new inbound events to process. This usually means the worker has processed
196+
/// all pending work and is currently idle. When new work becomes available,
197+
/// the worker is unparked and the park/unpark count is again increased by one.
198+
///
199+
/// An odd count means that the worker is currently parked.
200+
/// An even count means that the worker is currently active.
201+
///
202+
/// The counter is monotonically increasing. It is never decremented or
203+
/// reset to zero.
204+
///
205+
/// # Arguments
206+
///
207+
/// `worker` is the index of the worker being queried. The given value must
208+
/// be between 0 and `num_workers()`. The index uniquely identifies a single
209+
/// worker and will continue to identify the worker throughout the lifetime
210+
/// of the runtime instance.
211+
///
212+
/// # Panics
213+
///
214+
/// The method panics when `worker` represents an invalid worker, i.e. is
215+
/// greater than or equal to `num_workers()`.
216+
///
217+
/// # Examples
218+
///
219+
/// ```
220+
/// use tokio::runtime::Handle;
221+
///
222+
/// #[tokio::main]
223+
/// async fn main() {
224+
/// let metrics = Handle::current().metrics();
225+
/// let n = metrics.worker_park_unpark_count(0);
226+
///
227+
/// println!("worker 0 parked and unparked {} times", n);
228+
///
229+
/// if n % 2 == 0 {
230+
/// println!("worker 0 is active");
231+
/// } else {
232+
/// println!("worker 0 is parked");
233+
/// }
234+
/// }
235+
/// ```
236+
pub fn worker_park_unpark_count(&self, worker: usize) -> u64 {
237+
self.handle
238+
.inner
239+
.worker_metrics(worker)
240+
.park_unpark_count
241+
.load(Relaxed)
242+
}
146243
}
147244

148245
cfg_unstable_metrics! {
@@ -318,104 +415,6 @@ impl RuntimeMetrics {
318415
.load(Relaxed)
319416
}
320417

321-
/// Returns the total number of times the given worker thread has parked.
322-
///
323-
/// The worker park count starts at zero when the runtime is created and
324-
/// increases by one each time the worker parks the thread waiting for new
325-
/// inbound events to process. This usually means the worker has processed
326-
/// all pending work and is currently idle.
327-
///
328-
/// The counter is monotonically increasing. It is never decremented or
329-
/// reset to zero.
330-
///
331-
/// # Arguments
332-
///
333-
/// `worker` is the index of the worker being queried. The given value must
334-
/// be between 0 and `num_workers()`. The index uniquely identifies a single
335-
/// worker and will continue to identify the worker throughout the lifetime
336-
/// of the runtime instance.
337-
///
338-
/// # Panics
339-
///
340-
/// The method panics when `worker` represents an invalid worker, i.e. is
341-
/// greater than or equal to `num_workers()`.
342-
///
343-
/// # Examples
344-
///
345-
/// ```
346-
/// use tokio::runtime::Handle;
347-
///
348-
/// #[tokio::main]
349-
/// async fn main() {
350-
/// let metrics = Handle::current().metrics();
351-
///
352-
/// let n = metrics.worker_park_count(0);
353-
/// println!("worker 0 parked {} times", n);
354-
/// }
355-
/// ```
356-
pub fn worker_park_count(&self, worker: usize) -> u64 {
357-
self.handle
358-
.inner
359-
.worker_metrics(worker)
360-
.park_count
361-
.load(Relaxed)
362-
}
363-
364-
/// Returns the total number of times the given worker thread has parked
365-
/// and unparked.
366-
///
367-
/// The worker park/unpark count starts at zero when the runtime is created
368-
/// and increases by one each time the worker parks the thread waiting for
369-
/// new inbound events to process. This usually means the worker has processed
370-
/// all pending work and is currently idle. When new work becomes available,
371-
/// the worker is unparked and the park/unpark count is again increased by one.
372-
///
373-
/// An odd count means that the worker is currently parked.
374-
/// An even count means that the worker is currently active.
375-
///
376-
/// The counter is monotonically increasing. It is never decremented or
377-
/// reset to zero.
378-
///
379-
/// # Arguments
380-
///
381-
/// `worker` is the index of the worker being queried. The given value must
382-
/// be between 0 and `num_workers()`. The index uniquely identifies a single
383-
/// worker and will continue to identify the worker throughout the lifetime
384-
/// of the runtime instance.
385-
///
386-
/// # Panics
387-
///
388-
/// The method panics when `worker` represents an invalid worker, i.e. is
389-
/// greater than or equal to `num_workers()`.
390-
///
391-
/// # Examples
392-
///
393-
/// ```
394-
/// use tokio::runtime::Handle;
395-
///
396-
/// #[tokio::main]
397-
/// async fn main() {
398-
/// let metrics = Handle::current().metrics();
399-
/// let n = metrics.worker_park_unpark_count(0);
400-
///
401-
/// println!("worker 0 parked and unparked {} times", n);
402-
///
403-
/// if n % 2 == 0 {
404-
/// println!("worker 0 is active");
405-
/// } else {
406-
/// println!("worker 0 is parked");
407-
/// }
408-
/// }
409-
/// ```
410-
pub fn worker_park_unpark_count(&self, worker: usize) -> u64 {
411-
self.handle
412-
.inner
413-
.worker_metrics(worker)
414-
.park_unpark_count
415-
.load(Relaxed)
416-
}
417-
418-
419418
/// Returns the number of times the given worker thread unparked but
420419
/// performed no work before parking again.
421420
///

tokio/src/runtime/metrics/worker.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,9 @@ pub(crate) struct WorkerMetrics {
2828
/// Thread id of worker thread.
2929
thread_id: Mutex<Option<ThreadId>>,
3030

31-
#[cfg(tokio_unstable)]
3231
/// Number of times the worker parked.
3332
pub(crate) park_count: MetricAtomicU64,
3433

35-
#[cfg(tokio_unstable)]
3634
/// Number of times the worker parked and unparked.
3735
pub(crate) park_unpark_count: MetricAtomicU64,
3836

tokio/tests/rt_metrics.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use std::sync::mpsc;
55
use std::time::Duration;
66
use tokio::runtime::Runtime;
7+
use tokio::time;
78

89
#[test]
910
fn num_workers() {
@@ -125,6 +126,64 @@ fn worker_total_busy_duration() {
125126
}
126127
}
127128

129+
#[test]
130+
fn worker_park_count() {
131+
let rt = current_thread();
132+
let metrics = rt.metrics();
133+
rt.block_on(async {
134+
time::sleep(Duration::from_millis(1)).await;
135+
});
136+
drop(rt);
137+
assert!(1 <= metrics.worker_park_count(0));
138+
139+
let rt = threaded();
140+
let metrics = rt.metrics();
141+
rt.block_on(async {
142+
time::sleep(Duration::from_millis(1)).await;
143+
});
144+
drop(rt);
145+
assert!(1 <= metrics.worker_park_count(0));
146+
assert!(1 <= metrics.worker_park_count(1));
147+
}
148+
149+
#[test]
150+
fn worker_park_unpark_count() {
151+
let rt = current_thread();
152+
let metrics = rt.metrics();
153+
rt.block_on(rt.spawn(async {})).unwrap();
154+
drop(rt);
155+
assert!(2 <= metrics.worker_park_unpark_count(0));
156+
157+
let rt = threaded();
158+
let metrics = rt.metrics();
159+
160+
// Wait for workers to be parked after runtime startup.
161+
for _ in 0..100 {
162+
if 1 <= metrics.worker_park_unpark_count(0) && 1 <= metrics.worker_park_unpark_count(1) {
163+
break;
164+
}
165+
std::thread::sleep(std::time::Duration::from_millis(100));
166+
}
167+
assert_eq!(1, metrics.worker_park_unpark_count(0));
168+
assert_eq!(1, metrics.worker_park_unpark_count(1));
169+
170+
// Spawn a task to unpark and then park a worker.
171+
rt.block_on(rt.spawn(async {})).unwrap();
172+
for _ in 0..100 {
173+
if 3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1) {
174+
break;
175+
}
176+
std::thread::sleep(std::time::Duration::from_millis(100));
177+
}
178+
assert!(3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1));
179+
180+
// Both threads unpark for runtime shutdown.
181+
drop(rt);
182+
assert_eq!(0, metrics.worker_park_unpark_count(0) % 2);
183+
assert_eq!(0, metrics.worker_park_unpark_count(1) % 2);
184+
assert!(4 <= metrics.worker_park_unpark_count(0) || 4 <= metrics.worker_park_unpark_count(1));
185+
}
186+
128187
fn try_block_threaded(rt: &Runtime) -> Result<Vec<mpsc::Sender<()>>, mpsc::RecvTimeoutError> {
129188
let (tx, rx) = mpsc::channel();
130189

0 commit comments

Comments
 (0)