Skip to content

Commit 026f9a3

Browse files
committed
Implement KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING
This commit adds a new coroutine return type, DeferredProxyPromise<T>, which is derived from kj::Promise<DeferredProxy<T>>, and allows us to `KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING` from inside the coroutine to fulfill the outer promise early.
1 parent ccbbb36 commit 026f9a3

File tree

2 files changed

+292
-0
lines changed

2 files changed

+292
-0
lines changed
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
#include "util.h"
2+
#include <kj/test.h>
3+
4+
namespace workerd::api {
5+
namespace {
6+
7+
KJ_TEST("DeferredProxyPromise<T>: early co_return implicitly fulfills outer promise") {
8+
kj::EventLoop loop;
9+
kj::WaitScope waitScope(loop);
10+
11+
{
12+
// Implicit void co_return.
13+
auto coro = []() -> DeferredProxyPromise<void> {
14+
co_await kj::Promise<void>(kj::READY_NOW);
15+
};
16+
auto promise = coro();
17+
KJ_EXPECT(promise.poll(waitScope));
18+
auto proxyTask = promise.wait(waitScope).proxyTask;
19+
KJ_EXPECT(proxyTask.poll(waitScope));
20+
proxyTask.wait(waitScope);
21+
}
22+
{
23+
// Explicit void co_return.
24+
auto coro = []() -> DeferredProxyPromise<void> {
25+
co_return;
26+
};
27+
auto promise = coro();
28+
KJ_EXPECT(promise.poll(waitScope));
29+
auto proxyTask = promise.wait(waitScope).proxyTask;
30+
KJ_EXPECT(proxyTask.poll(waitScope));
31+
proxyTask.wait(waitScope);
32+
}
33+
{
34+
// Valueful co_return.
35+
auto coro = []() -> DeferredProxyPromise<int> {
36+
co_return 123;
37+
};
38+
auto promise = coro();
39+
KJ_EXPECT(promise.poll(waitScope));
40+
auto proxyTask = promise.wait(waitScope).proxyTask;
41+
KJ_EXPECT(proxyTask.poll(waitScope));
42+
KJ_EXPECT(proxyTask.wait(waitScope) == 123);
43+
}
44+
}
45+
46+
KJ_TEST("DeferredProxyPromise<T>: `co_yield BEGIN_DEFERRED_PROXYING` fulfills outer promise") {
47+
kj::EventLoop loop;
48+
kj::WaitScope waitScope(loop);
49+
50+
auto paf1 = kj::newPromiseAndFulfiller<void>();
51+
auto paf2 = kj::newPromiseAndFulfiller<int>();
52+
53+
auto coro = [&]() -> DeferredProxyPromise<int> {
54+
co_await paf1.promise;
55+
KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING;
56+
co_return co_await paf2.promise;
57+
};
58+
59+
auto promise = coro();
60+
61+
// paf1 unfulfilled, so we don't have a DeferredProxy<T> yet.
62+
KJ_EXPECT(!promise.poll(waitScope));
63+
64+
paf1.fulfiller->fulfill();
65+
66+
KJ_EXPECT(promise.poll(waitScope));
67+
auto proxyTask = promise.wait(waitScope).proxyTask;
68+
69+
// paf2 unfulfilled, so we don't have a T yet.
70+
KJ_EXPECT(!proxyTask.poll(waitScope));
71+
72+
paf2.fulfiller->fulfill(123);
73+
74+
KJ_EXPECT(proxyTask.poll(waitScope));
75+
KJ_EXPECT(proxyTask.wait(waitScope) == 123);
76+
}
77+
78+
KJ_TEST("DeferredProxyPromise<T>: unhandled exception before `co_yield BEGIN_DEFERRED_PROXYING`") {
79+
kj::EventLoop loop;
80+
kj::WaitScope waitScope(loop);
81+
82+
auto paf = kj::newPromiseAndFulfiller<void>();
83+
84+
auto coro = [&]() -> DeferredProxyPromise<int> {
85+
co_await paf.promise;
86+
KJ_FAIL_ASSERT("promise should have been rejected");
87+
};
88+
89+
auto promise = coro();
90+
91+
// paf unfulfilled, so we don't have a DeferredProxy<T> yet.
92+
KJ_EXPECT(!promise.poll(waitScope));
93+
94+
paf.fulfiller->reject(KJ_EXCEPTION(FAILED, "test error"));
95+
96+
KJ_EXPECT(promise.poll(waitScope));
97+
KJ_EXPECT_THROW_MESSAGE("test error", promise.wait(waitScope));
98+
}
99+
100+
KJ_TEST("DeferredProxyPromise<T>: unhandled exception after `co_yield BEGIN_DEFERRED_PROXYING`") {
101+
kj::EventLoop loop;
102+
kj::WaitScope waitScope(loop);
103+
104+
auto paf1 = kj::newPromiseAndFulfiller<void>();
105+
auto paf2 = kj::newPromiseAndFulfiller<int>();
106+
107+
auto coro = [&]() -> DeferredProxyPromise<int> {
108+
co_await paf1.promise;
109+
KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING;
110+
co_return co_await paf2.promise;
111+
};
112+
113+
auto promise = coro();
114+
115+
// paf1 unfulfilled, so we don't have a DeferredProxy<T> yet.
116+
KJ_EXPECT(!promise.poll(waitScope));
117+
118+
paf1.fulfiller->fulfill();
119+
120+
KJ_EXPECT(promise.poll(waitScope));
121+
auto proxyTask = promise.wait(waitScope).proxyTask;
122+
123+
// paf2 unfulfilled, so we don't have a T yet.
124+
KJ_EXPECT(!proxyTask.poll(waitScope));
125+
126+
paf2.fulfiller->reject(KJ_EXCEPTION(FAILED, "test error"));
127+
128+
KJ_EXPECT(proxyTask.poll(waitScope));
129+
KJ_EXPECT_THROW_MESSAGE("test error", proxyTask.wait(waitScope));
130+
}
131+
132+
KJ_TEST("DeferredProxyPromise<T>: can be `co_await`ed from another coroutine") {
133+
kj::EventLoop loop;
134+
kj::WaitScope waitScope(loop);
135+
136+
auto paf1 = kj::newPromiseAndFulfiller<void>();
137+
auto paf2 = kj::newPromiseAndFulfiller<int>();
138+
139+
auto nestedCoro = [&]() -> DeferredProxyPromise<int> {
140+
co_await paf1.promise;
141+
KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING;
142+
co_return co_await paf2.promise;
143+
};
144+
145+
auto coro = [&]() -> DeferredProxyPromise<int> {
146+
auto deferred = co_await nestedCoro();
147+
KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING;
148+
co_return co_await deferred.proxyTask;
149+
};
150+
151+
auto promise = coro();
152+
153+
// paf1 unfulfilled, so we don't have a DeferredProxy<T> yet.
154+
KJ_EXPECT(!promise.poll(waitScope));
155+
156+
paf1.fulfiller->fulfill();
157+
158+
KJ_EXPECT(promise.poll(waitScope));
159+
auto proxyTask = promise.wait(waitScope).proxyTask;
160+
161+
// paf2 unfulfilled, so we don't have a T yet.
162+
KJ_EXPECT(!proxyTask.poll(waitScope));
163+
164+
paf2.fulfiller->fulfill(123);
165+
166+
KJ_EXPECT(proxyTask.poll(waitScope));
167+
KJ_EXPECT(proxyTask.wait(waitScope) == 123);
168+
}
169+
170+
} // namespace
171+
} // namespace workerd::api

src/workerd/api/util.h

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,127 @@ inline kj::Promise<DeferredProxy<void>> addNoopDeferredProxy(kj::Promise<void> p
152152
co_return newNoopDeferredProxy();
153153
}
154154

155+
// ---------------------------------------------------------
156+
// Deferred proxy coroutine integration
157+
158+
class BeginDeferredProxyingConstant final {};
159+
constexpr BeginDeferredProxyingConstant BEGIN_DEFERRED_PROXYING {};
160+
// A magic constant which a DeferredProxyPromise<T> coroutine can `co_yield` to indicate that the
161+
// deferred proxying phase of its operation has begun.
162+
163+
template <typename T>
164+
class DeferredProxyPromise: public kj::Promise<DeferredProxy<T>> {
165+
// A "strong typedef" for a kj::Promise<DeferredProxy<T>>. DeferredProxyPromise<T> is intended to
166+
// be used as the return type for coroutines, in which case the coroutine implementation gains the
167+
// following features:
168+
//
169+
// - `co_yield BEGIN_DEFERRED_PROXYING` fulfills the outer kj::Promise<DeferredProxy<T>>. The
170+
// resulting DeferredProxy<T> object contains a `proxyTask` Promise which owns the coroutine.
171+
//
172+
// - `co_return` implicitly fulfills the outer Promise for the DeferredProxy<T> (if it has not
173+
// already been fulfilled by the magic `co_yield` described above), then fulfills the inner
174+
// `proxyTask`.
175+
//
176+
// - Unhandled exceptions reject the outer kj::Promise<DeferredProxy<T>> (if it has not already
177+
// been fulfilled by the magic `co_yield` described above), then reject the inner `proxyTask`.
178+
179+
public:
180+
DeferredProxyPromise(kj::Promise<DeferredProxy<T>> promise)
181+
: kj::Promise<DeferredProxy<T>>(kj::mv(promise)) {}
182+
// Allow conversion from a regular Promise. This allows our `promise_type::get_return_object()`
183+
// implementation to be implemented as a regular Promise-returning coroutine.
184+
185+
class Coroutine;
186+
using promise_type = Coroutine;
187+
// The coroutine adapter class, required for the compiler to know how to create coroutines
188+
// returning DeferredProxyPromise<T>. Since the whole point of DeferredProxyPromise<T> is to serve
189+
// as a coroutine return type, there's not really any point hiding promise_type inside of a
190+
// coroutine_traits specialization, like kj::Promise<T> does.
191+
};
192+
193+
template <typename T>
194+
class DeferredProxyPromise<T>::Coroutine:
195+
public kj::_::CoroutineMixin<DeferredProxyPromise<T>::Coroutine, T> {
196+
// The coroutine adapter type for DeferredProxyPromise<T>. Most of the work is forwarded to the
197+
// regular kj::Promise<T> coroutine adapter.
198+
199+
public:
200+
using Handle = kj::_::stdcoro::coroutine_handle<Coroutine>;
201+
202+
Coroutine(kj::SourceLocation location = {}): inner(Handle::from_promise(*this), location) {}
203+
204+
kj::Promise<DeferredProxy<T>> get_return_object() {
205+
// We need to return a RAII object which will destroy this (as in, `this`) coroutine adapter.
206+
// The logic which calls `coroutine_handle<>::destroy()` is tucked away in our inner coroutine
207+
// adapter, however, leading to the weird situation where the `inner.get_return_object()`
208+
// Promise owns `this`. Thus, we cannot store the inner promise in our own coroutine adapter
209+
// class, because that would cause a reference cycle. Fortunately, we can implement our own
210+
// `get_return_object()` as a regular Promise-returning coroutine and keep the inner Promise in
211+
// our coroutine frame, giving the caller transitive ownership of the DeferredProxyPromise<T>
212+
// coroutine by way of the kj::Promise<DeferredProxy<T>> coroutine. Later on, when the outer
213+
// Promise is fulfilled, the caller will gain direct ownership of the DeferredProxyPromise<T>
214+
// coroutine via the `proxyTask` promise.
215+
auto proxyTask = inner.get_return_object();
216+
co_await beginDeferredProxying.promise;
217+
co_return DeferredProxy<T> { kj::mv(proxyTask) };
218+
}
219+
220+
auto initial_suspend() { return inner.initial_suspend(); }
221+
auto final_suspend() noexcept { return inner.final_suspend(); }
222+
// Just trivially forward these.
223+
224+
void unhandled_exception() {
225+
// If the outer promise hasn't yet been fulfilled, it needs to be rejected now.
226+
if (beginDeferredProxying.fulfiller->isWaiting()) {
227+
beginDeferredProxying.fulfiller->reject(kj::getCaughtExceptionAsKj());
228+
}
229+
230+
inner.unhandled_exception();
231+
}
232+
233+
kj::_::stdcoro::suspend_never yield_value(decltype(BEGIN_DEFERRED_PROXYING)) {
234+
// This allows us to write `co_yield` within a DeferredProxyPromise<T> coroutine to fulfill
235+
// the coroutine's outer promise with a DeferredProxy<T>.
236+
// This could alternatively be an await_transform() with a magic parameter type.
237+
if (beginDeferredProxying.fulfiller->isWaiting()) {
238+
beginDeferredProxying.fulfiller->fulfill();
239+
}
240+
241+
return {};
242+
}
243+
244+
void fulfill(kj::_::FixVoid<T>&& value) {
245+
// Required by CoroutineMixin implementation to implement `co_return`.
246+
247+
// Fulfill the outer promise if it hasn't already been fulfilled.
248+
if (beginDeferredProxying.fulfiller->isWaiting()) {
249+
beginDeferredProxying.fulfiller->fulfill();
250+
}
251+
252+
inner.fulfill(kj::mv(value));
253+
}
254+
255+
template <typename U>
256+
auto await_transform(U&& awaitable) {
257+
// Trivially forward everything, so we can await anything a kj::Promise<T> can.
258+
return inner.await_transform(kj::fwd<U>(awaitable));
259+
}
260+
261+
operator kj::_::CoroutineBase&() { return inner; }
262+
// Required by Awaiter<T>::await_suspend() to support awaiting Promises.
263+
264+
private:
265+
typename kj::_::stdcoro::coroutine_traits<kj::Promise<T>>::promise_type inner;
266+
// We defer the majority of the implementation to the regular kj::Promise<T> coroutine adapter.
267+
268+
kj::PromiseFulfillerPair<void> beginDeferredProxying = kj::newPromiseAndFulfiller<void>();
269+
// Our `get_return_object()` function returns a kj::Promise<DeferredProxy<T>>, waits on this
270+
// `beginDeferredProxying.promise`, then fulfills its Promise with the result of
271+
// `inner.get_return_object()`.
272+
};
273+
274+
// =======================================================================================
275+
155276
kj::Maybe<jsg::V8Ref<v8::Object>> cloneRequestCf(
156277
jsg::Lock& js, kj::Maybe<jsg::V8Ref<v8::Object>> maybeCf);
157278

0 commit comments

Comments
 (0)