Skip to content

Commit aa63a8a

Browse files
committed
Replace DeferredProxyPromise<T> with kj::Promise<DeferredProxy<T>>
1 parent 5ea1835 commit aa63a8a

File tree

8 files changed

+128
-135
lines changed

8 files changed

+128
-135
lines changed

src/workerd/api/blob.c++

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,8 @@ public:
183183
}
184184

185185
// We can't defer the write to the proxy stage since it depends on `blob` which lives in the
186-
// isolate.
187-
co_return newNoopDeferredProxy();
186+
// isolate, so we don't `KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING`.
187+
co_return;
188188
}
189189

190190
private:

src/workerd/api/cache.c++

Lines changed: 45 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -332,9 +332,10 @@ jsg::Promise<void> Cache::put(jsg::Lock& js, Request::Info requestOrUrl,
332332
// destruction timing in the event of an error; see below.
333333

334334
// The next step is a bit complicated as it occurs in two separate async flows.
335-
// First, we await the serialization promise, then construct a DeferredProxy.
336-
// That DeferredProxy contains the second async flow that actually handles the
337-
// request and response.
335+
// First, we await the serialization promise, then enter "deferred proxying" by issuing
336+
// `KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING` from our coroutine. Everything after that
337+
// `KJ_CO_MAGIC` constitutes the second async flow that actually handles the request and
338+
// response.
338339
//
339340
// Weird: It's important that these objects be torn down in the right order and that the
340341
// DeferredProxy promise is handled separately from the inner promise.
@@ -385,62 +386,9 @@ jsg::Promise<void> Cache::put(jsg::Lock& js, Request::Info requestOrUrl,
385386
// will properly treat it as pending I/O, but only *after* the outer promise completes. This
386387
// gets us everything we want.
387388
//
388-
// Hence, what you see here: we first await the serializePromise then add all our
389-
// additional work onto the inner "deferred proxy" promise. Then we `awaitDeferredProxy()`
390-
// the whole thing.
391-
392-
// We use the same exception handling logic for both the promise for the DeferredProxy
393-
// and the inner promise handling the request/response.
394-
static auto constexpr handleException = [](kj::Exception&& exception) {
395-
if (exception.getType() != kj::Exception::Type::DISCONNECTED) {
396-
kj::throwFatalException(kj::mv(exception));
397-
}
398-
// If the origin or the cache disconnected, we don't treat this as an error, as put()
399-
// doesn't guarantee that it stores anything anyway.
400-
//
401-
// TODO(someday): I (Kenton) don't understand why we'd explicitly want to hide this
402-
// error, even though hiding it is technically not a violation of the contract. To me
403-
// this seems undesirable, especially when it was the origin that failed. The caller
404-
// can always choose to ignore errors if they want (and many do, by passing to
405-
// waitUntil()). However, there is at least one test which depends on this behavior,
406-
// and probably production Workers in the wild, so I'm not changing it for now.
407-
};
408-
409-
// Here we handle the inner promise -- the one held by the DeferredProxy that is
410-
// returned below by the handleSerialize() method.
411-
static auto constexpr handleResponse =
412-
[](DeferredProxy<void>& deferred,
413-
kj::Promise<void> writePayloadHeadersPromise,
414-
kj::Promise<void> pumpRequestBodyPromise,
415-
kj::Own<kj::AsyncOutputStream> bodyStream,
416-
kj::Promise<kj::HttpClient::Response> responsePromise,
417-
kj::Own<kj::HttpClient> client,
418-
kj::Own<kj::AsyncInputStream> payloadStream) -> kj::Promise<void> {
419-
try {
420-
co_await deferred.proxyTask;
421-
// Make sure headers get written even if the body was empty -- see comments earlier.
422-
co_await writePayloadHeadersPromise;
423-
// Make sure the request body is done being pumped and had no errors. If serialization
424-
// completed successfully, then this should also complete immediately thereafter.
425-
co_await pumpRequestBodyPromise;
426-
// It is important to destroy the bodyStream before actually waiting on the
427-
// responsePromise to ensure that the terminal chunk is written since the bodyStream
428-
// may only write the terminal chunk in the stream's destructor.
429-
bodyStream = nullptr;
430-
payloadStream = nullptr;
431-
auto response = co_await responsePromise;
432-
// We expect to see either 204 (success) or 413 (failure). Any other status code is a
433-
// violation of the contract between us and the cache, and is an internal
434-
// error, which we log. However, there's no need to throw, since the Cache API is an
435-
// ephemeral K/V store, and we never guaranteed the script we'd actually cache anything.
436-
if (response.statusCode != 204 && response.statusCode != 413) {
437-
LOG_CACHE_ERROR_ONCE(
438-
"Response to Cache API PUT was neither 204 nor 413: ", response);
439-
}
440-
} catch (...) {
441-
handleException(kj::getCaughtExceptionAsKj());
442-
}
443-
};
389+
// Hence, what you see here: we first await the serializePromise, then enter deferred proxying
390+
// with our magic `KJ_CO_MAGIC`, then perform all our additional work. Then we
391+
// `awaitDeferredProxy()` the whole thing.
444392

445393
// Here we handle the promise for the DeferredProxy itself.
446394
static auto constexpr handleSerialize = [](
@@ -477,18 +425,45 @@ jsg::Promise<void> Cache::put(jsg::Lock& js, Request::Info requestOrUrl,
477425
});
478426
try {
479427
auto deferred = co_await serialize;
480-
co_return DeferredProxy<void> {
481-
handleResponse(deferred,
482-
kj::mv(writePayloadHeadersPromise),
483-
kj::mv(pumpRequestBodyPromise),
484-
kj::mv(bodyStream),
485-
kj::mv(responsePromise),
486-
kj::mv(httpClient),
487-
kj::mv(payloadStream))
488-
};
428+
429+
// With our `serialize` promise having resolved to a DeferredProxy, we can now enter
430+
// deferred proxying ourselves.
431+
KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING;
432+
433+
co_await deferred.proxyTask;
434+
// Make sure headers get written even if the body was empty -- see comments earlier.
435+
co_await writePayloadHeadersPromise;
436+
// Make sure the request body is done being pumped and had no errors. If serialization
437+
// completed successfully, then this should also complete immediately thereafter.
438+
co_await pumpRequestBodyPromise;
439+
// It is important to destroy the bodyStream before actually waiting on the
440+
// responsePromise to ensure that the terminal chunk is written since the bodyStream
441+
// may only write the terminal chunk in the stream's destructor.
442+
bodyStream = nullptr;
443+
payloadStream = nullptr;
444+
auto response = co_await responsePromise;
445+
// We expect to see either 204 (success) or 413 (failure). Any other status code is a
446+
// violation of the contract between us and the cache, and is an internal
447+
// error, which we log. However, there's no need to throw, since the Cache API is an
448+
// ephemeral K/V store, and we never guaranteed the script we'd actually cache anything.
449+
if (response.statusCode != 204 && response.statusCode != 413) {
450+
LOG_CACHE_ERROR_ONCE(
451+
"Response to Cache API PUT was neither 204 nor 413: ", response);
452+
}
489453
} catch (...) {
490-
handleException(kj::getCaughtExceptionAsKj());
491-
co_return newNoopDeferredProxy();
454+
auto exception = kj::getCaughtExceptionAsKj();
455+
if (exception.getType() != kj::Exception::Type::DISCONNECTED) {
456+
kj::throwFatalException(kj::mv(exception));
457+
}
458+
// If the origin or the cache disconnected, we don't treat this as an error, as put()
459+
// doesn't guarantee that it stores anything anyway.
460+
//
461+
// TODO(someday): I (Kenton) don't understand why we'd explicitly want to hide this
462+
// error, even though hiding it is technically not a violation of the contract. To me
463+
// this seems undesirable, especially when it was the origin that failed. The caller
464+
// can always choose to ignore errors if they want (and many do, by passing to
465+
// waitUntil()). However, there is at least one test which depends on this behavior,
466+
// and probably production Workers in the wild, so I'm not changing it for now.
492467
}
493468
};
494469

src/workerd/api/deferred-proxy-test.c++

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
namespace workerd::api {
55
namespace {
66

7-
KJ_TEST("DeferredProxyPromise<T>: early co_return implicitly fulfills outer promise") {
7+
KJ_TEST("kj::Promise<DeferredProxy<T>>: early co_return implicitly fulfills outer promise") {
88
kj::EventLoop loop;
99
kj::WaitScope waitScope(loop);
1010

1111
{
1212
// Implicit void co_return.
13-
auto coro = []() -> DeferredProxyPromise<void> {
13+
auto coro = []() -> kj::Promise<DeferredProxy<void>> {
1414
co_await kj::Promise<void>(kj::READY_NOW);
1515
};
1616
auto promise = coro();
@@ -21,7 +21,7 @@ KJ_TEST("DeferredProxyPromise<T>: early co_return implicitly fulfills outer prom
2121
}
2222
{
2323
// Explicit void co_return.
24-
auto coro = []() -> DeferredProxyPromise<void> {
24+
auto coro = []() -> kj::Promise<DeferredProxy<void>> {
2525
co_return;
2626
};
2727
auto promise = coro();
@@ -32,7 +32,7 @@ KJ_TEST("DeferredProxyPromise<T>: early co_return implicitly fulfills outer prom
3232
}
3333
{
3434
// Valueful co_return.
35-
auto coro = []() -> DeferredProxyPromise<int> {
35+
auto coro = []() -> kj::Promise<DeferredProxy<int>> {
3636
co_return 123;
3737
};
3838
auto promise = coro();
@@ -43,14 +43,15 @@ KJ_TEST("DeferredProxyPromise<T>: early co_return implicitly fulfills outer prom
4343
}
4444
}
4545

46-
KJ_TEST("DeferredProxyPromise<T>: `co_yield BEGIN_DEFERRED_PROXYING` fulfills outer promise") {
46+
KJ_TEST("kj::Promise<DeferredProxy<T>>: `KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING` fulfills outer "
47+
"promise") {
4748
kj::EventLoop loop;
4849
kj::WaitScope waitScope(loop);
4950

5051
auto paf1 = kj::newPromiseAndFulfiller<void>();
5152
auto paf2 = kj::newPromiseAndFulfiller<int>();
5253

53-
auto coro = [&]() -> DeferredProxyPromise<int> {
54+
auto coro = [&]() -> kj::Promise<DeferredProxy<int>> {
5455
co_await paf1.promise;
5556
KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING;
5657
co_return co_await paf2.promise;
@@ -75,13 +76,14 @@ KJ_TEST("DeferredProxyPromise<T>: `co_yield BEGIN_DEFERRED_PROXYING` fulfills ou
7576
KJ_EXPECT(proxyTask.wait(waitScope) == 123);
7677
}
7778

78-
KJ_TEST("DeferredProxyPromise<T>: unhandled exception before `co_yield BEGIN_DEFERRED_PROXYING`") {
79+
KJ_TEST("kj::Promise<DeferredProxy<T>>: unhandled exception before "
80+
"`KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING`") {
7981
kj::EventLoop loop;
8082
kj::WaitScope waitScope(loop);
8183

8284
auto paf = kj::newPromiseAndFulfiller<void>();
8385

84-
auto coro = [&]() -> DeferredProxyPromise<int> {
86+
auto coro = [&]() -> kj::Promise<DeferredProxy<int>> {
8587
co_await paf.promise;
8688
KJ_FAIL_ASSERT("promise should have been rejected");
8789
};
@@ -97,14 +99,15 @@ KJ_TEST("DeferredProxyPromise<T>: unhandled exception before `co_yield BEGIN_DEF
9799
KJ_EXPECT_THROW_MESSAGE("test error", promise.wait(waitScope));
98100
}
99101

100-
KJ_TEST("DeferredProxyPromise<T>: unhandled exception after `co_yield BEGIN_DEFERRED_PROXYING`") {
102+
KJ_TEST("kj::Promise<DeferredProxy<T>>: unhandled exception after "
103+
"`KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING`") {
101104
kj::EventLoop loop;
102105
kj::WaitScope waitScope(loop);
103106

104107
auto paf1 = kj::newPromiseAndFulfiller<void>();
105108
auto paf2 = kj::newPromiseAndFulfiller<int>();
106109

107-
auto coro = [&]() -> DeferredProxyPromise<int> {
110+
auto coro = [&]() -> kj::Promise<DeferredProxy<int>> {
108111
co_await paf1.promise;
109112
KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING;
110113
co_return co_await paf2.promise;
@@ -129,20 +132,20 @@ KJ_TEST("DeferredProxyPromise<T>: unhandled exception after `co_yield BEGIN_DEFE
129132
KJ_EXPECT_THROW_MESSAGE("test error", proxyTask.wait(waitScope));
130133
}
131134

132-
KJ_TEST("DeferredProxyPromise<T>: can be `co_await`ed from another coroutine") {
135+
KJ_TEST("kj::Promise<DeferredProxy<T>>: can be `co_await`ed from another coroutine") {
133136
kj::EventLoop loop;
134137
kj::WaitScope waitScope(loop);
135138

136139
auto paf1 = kj::newPromiseAndFulfiller<void>();
137140
auto paf2 = kj::newPromiseAndFulfiller<int>();
138141

139-
auto nestedCoro = [&]() -> DeferredProxyPromise<int> {
142+
auto nestedCoro = [&]() -> kj::Promise<DeferredProxy<int>> {
140143
co_await paf1.promise;
141144
KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING;
142145
co_return co_await paf2.promise;
143146
};
144147

145-
auto coro = [&]() -> DeferredProxyPromise<int> {
148+
auto coro = [&]() -> kj::Promise<DeferredProxy<int>> {
146149
auto deferred = co_await nestedCoro();
147150
KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING;
148151
co_return co_await deferred.proxyTask;

src/workerd/api/deferred-proxy.h

Lines changed: 59 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -61,69 +61,84 @@ inline kj::Promise<DeferredProxy<void>> addNoopDeferredProxy(kj::Promise<void> p
6161
// ---------------------------------------------------------
6262
// Deferred proxy coroutine integration
6363

64-
class BeginDeferredProxyingConstant final {};
65-
constexpr BeginDeferredProxyingConstant BEGIN_DEFERRED_PROXYING {};
66-
// A magic constant which a DeferredProxyPromise<T> coroutine can `co_yield` to indicate that the
67-
// deferred proxying phase of its operation has begun.
64+
// If a coroutine returns a kj::Promise<DeferredProxy<T>>, the coroutine implementation gains the
65+
// following features:
66+
//
67+
// - `KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING` fulfills the outer kj::Promise<DeferredProxy<T>>. The
68+
// resulting DeferredProxy<T> object contains a `proxyTask` Promise which owns the coroutine.
69+
//
70+
// - `co_return` implicitly fulfills the outer Promise for the DeferredProxy<T> (if it has not
71+
// already been fulfilled by the magic `KJ_CO_MAGIC` described above), then fulfills the inner
72+
// `proxyTask`.
73+
//
74+
// - Unhandled exceptions reject the outer kj::Promise<DeferredProxy<T>> (if it has not already
75+
// been fulfilled by the magic `KJ_CO_MAGIC` described above), then reject the inner `proxyTask`.
76+
//
77+
// It is not possible to write a "regular" coroutine which returns kj::Promise<DeferredProxy<T>>;
78+
// that is, `co_return DeferredProxy<T> { ... }` is a compile error. You must initiate deferred
79+
// proxying using `KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING`.
80+
81+
template <typename T, typename... Args>
82+
class DeferredProxyCoroutine;
83+
// The coroutine adapter class, required for the compiler to know how to create coroutines
84+
// returning kj::Promise<DeferredProxy<T>>. We declare it here so we can name it in our
85+
// `coroutine_traits` specialization below.
6886

69-
template <typename T>
70-
class DeferredProxyPromise: public kj::Promise<DeferredProxy<T>> {
71-
// A "strong typedef" for a kj::Promise<DeferredProxy<T>>. DeferredProxyPromise<T> is intended to
72-
// be used as the return type for coroutines, in which case the coroutine implementation gains the
73-
// following features:
74-
//
75-
// - `co_yield BEGIN_DEFERRED_PROXYING` fulfills the outer kj::Promise<DeferredProxy<T>>. The
76-
// resulting DeferredProxy<T> object contains a `proxyTask` Promise which owns the coroutine.
77-
//
78-
// - `co_return` implicitly fulfills the outer Promise for the DeferredProxy<T> (if it has not
79-
// already been fulfilled by the magic `co_yield` described above), then fulfills the inner
80-
// `proxyTask`.
81-
//
82-
// - Unhandled exceptions reject the outer kj::Promise<DeferredProxy<T>> (if it has not already
83-
// been fulfilled by the magic `co_yield` described above), then reject the inner `proxyTask`.
87+
} // namespace workerd::api
8488

85-
public:
86-
DeferredProxyPromise(kj::Promise<DeferredProxy<T>> promise)
87-
: kj::Promise<DeferredProxy<T>>(kj::mv(promise)) {}
88-
// Allow conversion from a regular Promise. This allows our `promise_type::get_return_object()`
89-
// implementation to be implemented as a regular Promise-returning coroutine.
90-
91-
class Coroutine;
92-
using promise_type = Coroutine;
93-
// The coroutine adapter class, required for the compiler to know how to create coroutines
94-
// returning DeferredProxyPromise<T>. Since the whole point of DeferredProxyPromise<T> is to serve
95-
// as a coroutine return type, there's not really any point hiding promise_type inside of a
96-
// coroutine_traits specialization, like kj::Promise<T> does.
89+
namespace KJ_COROUTINE_STD_NAMESPACE {
90+
// Enter the `std` or `std::experimental` namespace, depending on whether we're using C++20
91+
// coroutines or the Coroutines TS.
92+
93+
template <class T, class... Args>
94+
struct coroutine_traits<kj::Promise<workerd::api::DeferredProxy<T>>, Args...> {
95+
using promise_type = workerd::api::DeferredProxyCoroutine<T, Args...>;
9796
};
9897

98+
} // namespace KJ_COROUTINE_STD_NAMESPACE
99+
100+
namespace workerd::api {
101+
102+
class BeginDeferredProxyingConstant final {};
103+
constexpr BeginDeferredProxyingConstant BEGIN_DEFERRED_PROXYING {};
104+
// A magic constant a kj::Promise<DeferredProxy<T>> coroutine can `KJ_CO_MAGIC` to indicate that the
105+
// deferred proxying phase of its operation has begun.
106+
99107
template <typename T>
100-
class DeferredProxyPromise<T>::Coroutine:
101-
public kj::_::CoroutineMixin<DeferredProxyPromise<T>::Coroutine, T> {
108+
class DeferredProxyCoroutine: public kj::_::CoroutineMixin<DeferredProxyCoroutine<T>, T> {
102109
// The coroutine adapter type for kj::Promise<DeferredProxy<T>>. Most of the work is forwarded to the
103110
// regular kj::Promise<T> coroutine adapter.
104111

105112
using InnerCoroutineAdapter =
106113
typename kj::_::stdcoro::coroutine_traits<kj::Promise<T>, Args...>::promise_type;
107114

108115
public:
109-
using Handle = kj::_::stdcoro::coroutine_handle<Coroutine>;
116+
using Handle = kj::_::stdcoro::coroutine_handle<DeferredProxyCoroutine>;
110117

111-
Coroutine(kj::SourceLocation location = {}): inner(Handle::from_promise(*this), location) {}
118+
DeferredProxyCoroutine(kj::SourceLocation location = {})
119+
: inner(Handle::from_promise(*this), location) {}
112120

113121
kj::Promise<DeferredProxy<T>> get_return_object() {
114122
// We need to return a RAII object which will destroy this (as in, `this`) coroutine adapter.
115123
// The logic which calls `coroutine_handle<>::destroy()` is tucked away in our inner coroutine
116124
// adapter, however, leading to the weird situation where the `inner.get_return_object()`
117-
// Promise owns `this`. Thus, we cannot store the inner promise in our own coroutine adapter
118-
// class, because that would cause a reference cycle. Fortunately, we can implement our own
119-
// `get_return_object()` as a regular Promise-returning coroutine and keep the inner Promise in
120-
// our coroutine frame, giving the caller transitive ownership of the DeferredProxyPromise<T>
121-
// coroutine by way of the kj::Promise<DeferredProxy<T>> coroutine. Later on, when the outer
122-
// Promise is fulfilled, the caller will gain direct ownership of the DeferredProxyPromise<T>
123-
// coroutine via the `proxyTask` promise.
125+
// Promise owns `this`. So, returning a Promise with `inner.get_return_object()` in a `.then()`
126+
// gives transitive ownership of the coroutine to the caller.
127+
//
128+
// Note that we must use a `.then()` instead of a coroutine to implement this, because trying to
129+
// write a coroutine returning `kj::Promise<DeferredProxy<T>>` will just recurse into this
130+
// coroutine adapter class.
131+
//
132+
// TODO(perf): We could avoid the `newPromiseAndFulfiller()` and `.then()` overhead by having
133+
// DeferredProxyCoroutine<T> implement PromiseNode. `this` could own `proxyTask` temporarily,
134+
// forwarding `PromiseNode::destroy()` calls to `proxyTask`'s PromiseNode (if it still
135+
// exists), which takes care of the ownership reference cycle. `yield_value()`, `fulfill()`,
136+
// and `unhandled_exception()` could arm our OnReadyEvent, and our `get()` implementation would
137+
// return `proxyTask`, or a stored exception.
124138
auto proxyTask = inner.get_return_object();
125-
co_await beginDeferredProxying.promise;
126-
co_return DeferredProxy<T> { kj::mv(proxyTask) };
139+
return beginDeferredProxying.promise.then([proxyTask=kj::mv(proxyTask)]() mutable {
140+
return DeferredProxy<T> { kj::mv(proxyTask) };
141+
});
127142
}
128143

129144
auto initial_suspend() { return inner.initial_suspend(); }

src/workerd/api/http.c++

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ public:
458458
if (end) co_await output.end();
459459
}
460460

461-
co_return newNoopDeferredProxy();
461+
co_return;
462462
}
463463

464464
private:

0 commit comments

Comments
 (0)