@@ -12,38 +12,36 @@ namespace workerd {
12
12
13
13
using kj::uint ;
14
14
15
+ // A double-buffered batch queue which enforces an upper bound on buffer growth.
16
+ //
17
+ // Objects of this type have two buffers -- the push buffer and the pop buffer -- and support
18
+ // `push()` and `pop()` operations. `push()` adds elements to the push buffer. `pop()` swaps the
19
+ // push and the pop buffers and returns a RAII object which provides a view onto the pop buffer.
20
+ // When the RAII object is destroyed, it resets the size and capacity of the pop buffer.
21
+ //
22
+ // This class is useful when the cost of context switching between producers and consumers is
23
+ // high and/or when you must be able to gracefully handle bursts of pushes, such as when
24
+ // transferring objects between threads. Note that this class implements no cross-thread
25
+ // synchronization itself, but it can become an effective multiple-producer, single-consumer queue
26
+ // when wrapped as a `kj::MutexGuarded<BatchQueue<T>>`.
15
27
template <typename T>
16
28
class BatchQueue {
17
- // A double-buffered batch queue which enforces an upper bound on buffer growth.
18
- //
19
- // Objects of this type have two buffers -- the push buffer and the pop buffer -- and support
20
- // `push()` and `pop()` operations. `push()` adds elements to the push buffer. `pop()` swaps the
21
- // push and the pop buffers and returns a RAII object which provides a view onto the pop buffer.
22
- // When the RAII object is destroyed, it resets the size and capacity of the pop buffer.
23
- //
24
- // This class is useful when the cost of context switching between producers and consumers is
25
- // high and/or when you must be able to gracefully handle bursts of pushes, such as when
26
- // transferring objects between threads. Note that this class implements no cross-thread
27
- // synchronization itself, but it can become an effective multiple-producer, single-consumer queue
28
- // when wrapped as a `kj::MutexGuarded<BatchQueue<T>>`.
29
-
30
29
public:
30
+ // `initialCapacity` is the number of elements of type T for which we should allocate space in the
31
+ // initial buffers, and any reconstructed buffers. Buffers will be reconstructed if they are
32
+ // observed to grow beyond `maxCapacity` after a completed pop operation.
31
33
explicit BatchQueue (uint initialCapacity, uint maxCapacity)
32
34
: pushBuffer(initialCapacity),
33
35
popBuffer(initialCapacity),
34
36
initialCapacity(initialCapacity),
35
37
maxCapacity(maxCapacity) {}
36
- // `initialCapacity` is the number of elements of type T for which we should allocate space in the
37
- // initial buffers, and any reconstructed buffers. Buffers will be reconstructed if they are
38
- // observed to grow beyond `maxCapacity` after a completed pop operation.
39
38
39
+ // This is the return type of `pop()` (in fact, `pop()` is the only way to construct a non-empty
40
+ // Batch). Default-constructible, moveable, and non-copyable.
41
+ //
42
+ // A Batch can be converted to an ArrayPtr<T>. When a Batch is destroyed, it clears the pop
43
+ // buffer and resets the pop buffer capacity to `initialCapacity` if necessary.
40
44
class Batch {
41
- // This is the return type of `pop()` (in fact, `pop()` is the only way to construct a non-empty
42
- // Batch). Default-constructible, moveable, and non-copyable.
43
- //
44
- // A Batch can be converted to an ArrayPtr<T>. When a Batch is destroyed, it clears the pop
45
- // buffer and resets the pop buffer capacity to `initialCapacity` if necessary.
46
-
47
45
public:
48
46
Batch () = default ;
49
47
Batch (Batch&&) = default ;
@@ -67,18 +65,17 @@ class BatchQueue {
67
65
// It's a Maybe so we can support move operations.
68
66
};
69
67
68
+ // If a batch is available, swap the buffers and return a Batch object backed by the pop buffer.
69
+ // The caller should destroy the Batch object as soon as they are done with it. Destruction will
70
+ // clear the pop buffer and, if necessary, reconstruct it to stay under `maxCapacity`.
71
+ //
72
+ // Throws if `pop()` is called again before the previous Batch object was destroyed. Note
73
+ // that this exception is only reliable if the previous `pop()` returned a non-empty Batch.
74
+ //
75
+ // `pop()` accesses both buffers, so it must be synchronized with `push()` operations across
76
+ // threads. Batch objects and `push()` access different buffers, so they require no explicit
77
+ // cross-thread synchronization with each other.
70
78
Batch pop () {
71
- // If a batch is available, swap the buffers and return a Batch object backed by the pop buffer.
72
- // The caller should destroy the Batch object as soon as they are done with it. Destruction will
73
- // clear the pop buffer and, if necessary, reconstruct it to stay under `maxCapacity`.
74
- //
75
- // Throws if `pop()` is called again before the previous Batch object was destroyed. Note
76
- // that this exception is only reliable if the previous `pop()` returned a non-empty Batch.
77
- //
78
- // `pop()` accesses both buffers, so it must be synchronized with `push()` operations across
79
- // threads. Batch objects and `push()` access different buffers, so they require no explicit
80
- // cross-thread synchronization with each other.
81
-
82
79
KJ_REQUIRE (popBuffer.empty (), " pop()'s previous result not yet destroyed." );
83
80
84
81
Batch batch;
@@ -91,16 +88,14 @@ class BatchQueue {
91
88
return batch;
92
89
}
93
90
91
+ // Add an item to the current batch.
94
92
template <typename U>
95
93
void push (U&& value) {
96
- // Add an item to the current batch.
97
-
98
94
pushBuffer.add (kj::fwd<U>(value));
99
95
}
100
96
101
97
auto empty () const { return pushBuffer.empty (); }
102
98
auto size () const { return pushBuffer.size (); }
103
- // Push buffer metadata.
104
99
105
100
private:
106
101
kj::Vector<T> pushBuffer;
0 commit comments