Skip to content

Commit 3e0386a

Browse files
committed
Implement DeferredProxy coroutine as a PromiseNode
This allows us to avoid all extra allocations.
1 parent aa63a8a commit 3e0386a

File tree

1 file changed

+89
-36
lines changed

1 file changed

+89
-36
lines changed

src/workerd/api/deferred-proxy.h

Lines changed: 89 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,17 @@ constexpr BeginDeferredProxyingConstant BEGIN_DEFERRED_PROXYING {};
104104
// A magic constant which a DeferredProxyPromise<T> coroutine can `KJ_CO_MAGIC` to indicate that the
105105
// deferred proxying phase of its operation has begun.
106106

107-
template <typename T>
108-
class DeferredProxyCoroutine: public kj::_::CoroutineMixin<DeferredProxyCoroutine<T>, T> {
107+
template <typename T, typename C>
108+
concept CoroutineYieldValue = requires (T&& v, C coroutineAdapter) {
109+
// A concept which is true if C is a coroutine adapter which supports the `co_yield` operator for
110+
// type T. We could also check that the expression results in an awaitable, but that is already a
111+
// compile error in other ways.
112+
{ coroutineAdapter.yield_value(kj::fwd<T>(v)) };
113+
};
114+
115+
template <typename T, typename... Args>
116+
class DeferredProxyCoroutine: public kj::_::PromiseNode,
117+
public kj::_::CoroutineMixin<DeferredProxyCoroutine<T, Args...>, T> {
109118
// The coroutine adapter type for DeferredProxyPromise<T>. Most of the work is forwarded to the
110119
// regular kj::Promise<T> coroutine adapter.
111120

@@ -122,46 +131,36 @@ class DeferredProxyCoroutine: public kj::_::CoroutineMixin<DeferredProxyCoroutin
122131
// We need to return a RAII object which will destroy this (as in, `this`) coroutine adapter.
123132
// The logic which calls `coroutine_handle<>::destroy()` is tucked away in our inner coroutine
124133
// adapter, however, leading to the weird situation where the `inner.get_return_object()`
125-
// Promise owns `this`. So, returning a Promise with `inner.get_return_object()` in a `.then()`
126-
// gives transitive ownership of the coroutine to the caller.
134+
// Promise owns `this`. And `this` owns `inner.get_return_object()` transitively via `result`!
127135
//
128-
// Note that we must use a `.then()` instead of a coroutine to implement this, because trying to
129-
// write a coroutine returning `kj::Promise<DeferredProxy<T>>` will just recurse into this
130-
// coroutine adapter class.
131-
//
132-
// TODO(perf): We could avoid the `newPromiseAndFulfiller()` and `.then()` overhead by having
133-
// DeferredProxyCoroutine<T> implement PromiseNode. `this` could own `proxyTask` temporarily,
134-
// forwarding `PromiseNode::destroy()` calls to `proxyTask`'s PromiseNode (if it still
135-
// exists), which takes care of the ownership reference cycle. `yield_value()`, `fulfill()`,
136-
// and `unhandled_exception()`could arm our OnReadyEvent, and our `get()` implementation would
137-
// return `proxyTask`, or a stored exception.
138-
auto proxyTask = inner.get_return_object();
139-
return beginDeferredProxying.promise.then([proxyTask=kj::mv(proxyTask)]() mutable {
140-
return DeferredProxy<T> { kj::mv(proxyTask) };
141-
});
136+
// Fortunately, DeferredProxyCoroutine implements the PromiseNode interface, meaning when our
137+
// returned Promise is eventually dropped, our `PromiseNode::destroy()` implementation will be
138+
// called. This gives us the opportunity (that is, in `destroy()`) to destroy our
139+
// `inner.get_return_object()` Promise, breaking the ownership cycle and destroying `this`.
140+
141+
result.value = DeferredProxy<T> { inner.get_return_object() };
142+
return kj::_::PromiseNode::to<kj::Promise<DeferredProxy<T>>>(kj::_::OwnPromiseNode(this));
142143
}
143144

144145
auto initial_suspend() { return inner.initial_suspend(); }
145146
auto final_suspend() noexcept { return inner.final_suspend(); }
146147
// Just trivially forward these.
147148

148149
void unhandled_exception() {
149-
// If the outer promise hasn't yet been fulfilled, it needs to be rejected now.
150-
if (beginDeferredProxying.fulfiller->isWaiting()) {
151-
beginDeferredProxying.fulfiller->reject(kj::getCaughtExceptionAsKj());
152-
}
150+
// Reject our outer promise if it hasn't yet been fulfilled, then forward to the inner
151+
// implementation.
153152

153+
rejectOuterPromise();
154154
inner.unhandled_exception();
155155
}
156156

157157
kj::_::stdcoro::suspend_never yield_value(decltype(BEGIN_DEFERRED_PROXYING)) {
158158
// This allows us to write `KJ_CO_MAGIC` within a DeferredProxyPromise<T> coroutine to fulfill
159159
// the coroutine's outer promise with a DeferredProxy<T>.
160-
// This could alternatively be an await_transform() with a magic parameter type.
161-
if (beginDeferredProxying.fulfiller->isWaiting()) {
162-
beginDeferredProxying.fulfiller->fulfill();
163-
}
160+
//
161+
// This could alternatively be implemented as an await_transform() with a magic parameter type.
164162

163+
fulfillOuterPromise();
165164
return {};
166165
}
167166

@@ -175,11 +174,7 @@ class DeferredProxyCoroutine: public kj::_::CoroutineMixin<DeferredProxyCoroutin
175174
void fulfill(kj::_::FixVoid<T>&& value) {
176175
// Required by CoroutineMixin implementation to implement `co_return`.
177176

178-
// Fulfill the outer promise if it hasn't already been fulfilled.
179-
if (beginDeferredProxying.fulfiller->isWaiting()) {
180-
beginDeferredProxying.fulfiller->fulfill();
181-
}
182-
177+
fulfillOuterPromise();
183178
inner.fulfill(kj::mv(value));
184179
}
185180

@@ -193,13 +188,71 @@ class DeferredProxyCoroutine: public kj::_::CoroutineMixin<DeferredProxyCoroutin
193188
// Required by Awaiter<T>::await_suspend() to support awaiting Promises.
194189

195190
private:
196-
typename kj::_::stdcoro::coroutine_traits<kj::Promise<T>>::promise_type inner;
191+
void fulfillOuterPromise() {
192+
// Fulfill the outer promise if it hasn't already settled.
193+
194+
if (!deferredProxyingHasBegun) {
195+
// Our `result` is put in place already by `get_return_object()`, so all we have to do is arm
196+
// the event.
197+
onReadyEvent.arm();
198+
deferredProxyingHasBegun = true;
199+
}
200+
}
201+
202+
void rejectOuterPromise() {
203+
// Reject the outer promise if it hasn't already settled.
204+
205+
if (!deferredProxyingHasBegun) {
206+
result.addException(kj::getCaughtExceptionAsKj());
207+
onReadyEvent.arm();
208+
deferredProxyingHasBegun = true;
209+
}
210+
}
211+
212+
// PromiseNode implementation
213+
214+
void destroy() override {
215+
// The promise returned by `inner.get_return_object()` is what actually owns this coroutine
216+
// frame. We temporarily store that in `result` until our outer promise is fulfilled. So, to
217+
// destroy ourselves, we must manually drop `result`.
218+
//
219+
// On the other hand, if our outer promise has already been fulfilled, and `result` delivered to
220+
// wherever it is going, then someone else directly owns the coroutine now, not us, and it's
221+
// okay for this drop to be a no-op.
222+
223+
auto drop = kj::mv(result);
224+
}
225+
226+
void onReady(kj::_::Event* event) noexcept override {
227+
onReadyEvent.init(event);
228+
}
229+
230+
void get(kj::_::ExceptionOrValue& output) noexcept override {
231+
static_cast<decltype(result)&>(output) = kj::mv(result);
232+
}
233+
234+
void tracePromise(kj::_::TraceBuilder& builder, bool stopAtNextEvent) override {
235+
// The PromiseNode we're waiting on is whatever the coroutine is waiting on.
236+
static_cast<kj::_::PromiseNode&>(inner).tracePromise(builder, stopAtNextEvent);
237+
238+
// Maybe returning the address of get() will give us a function name with meaningful type
239+
// information.
240+
builder.add(getMethodStartAddress(implicitCast<PromiseNode&>(*this), &PromiseNode::get));
241+
}
242+
243+
InnerCoroutineAdapter inner;
197244
// We defer the majority of the implementation to the regular kj::Promise<T> coroutine adapter.
198245

199-
kj::PromiseFulfillerPair<void> beginDeferredProxying = kj::newPromiseAndFulfiller<void>();
200-
// Our `get_return_object()` function returns a kj::Promise<DeferredProxy<T>>, waits on this
201-
// `beginDeferredProxying.promise`, then fulfills its Promise with the result of
202-
// `inner.get_return_object()`.
246+
OnReadyEvent onReadyEvent;
247+
// Helper to arm the event which fires when the outer promise (that is, `this` PromiseNode) for
248+
// the DeferredProxy<T> is ready.
249+
250+
kj::_::ExceptionOr<DeferredProxy<T>> result;
251+
// Stores the result for the outer promise.
252+
253+
bool deferredProxyingHasBegun = false;
254+
// Set to true when deferred proxying has begun -- that is, when the outer DeferredProxy<T>
255+
// promise is fulfilled by calling `onReadyEvent.arm()`.
203256
};
204257

205258
} // namespace workerd::api

0 commit comments

Comments
 (0)