Skip to content

Commit 411a169

Browse files
Merge pull request #953 from cloudflare/harris/deferred-proxy-coroutine
Implement `KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING`
2 parents baf7c0b + e404eda commit 411a169

13 files changed

+623
-131
lines changed

WORKSPACE

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ bazel_skylib_workspace()
2424

2525
http_archive(
2626
name = "capnp-cpp",
27-
sha256 = "610154de03297c1dabf9b2a096ad2272015cfb410b5b31294179dbca3dd2f515",
28-
strip_prefix = "capnproto-capnproto-60a81f3/c++",
27+
sha256 = "56177a2b3e03a1ee3518a0663a537d0eeb601fd23e4c4926a0b7ae65aab6e194",
28+
strip_prefix = "capnproto-capnproto-78e751a/c++",
2929
type = "tgz",
30-
urls = ["https://github.com/capnproto/capnproto/tarball/60a81f38bff869109bc58516c42ecd814a21a8eb"],
30+
urls = ["https://github.com/capnproto/capnproto/tarball/78e751abceb4d376da5a200e986011f881f965a4"],
3131
)
3232

3333
http_archive(

src/workerd/api/blob.c++

+2-2
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,8 @@ public:
183183
}
184184

185185
// We can't defer the write to the proxy stage since it depends on `blob` which lives in the
186-
// isolate.
187-
co_return newNoopDeferredProxy();
186+
// isolate, so we don't `KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING`.
187+
co_return;
188188
}
189189

190190
private:

src/workerd/api/cache.c++

+45-70
Original file line numberDiff line numberDiff line change
@@ -332,9 +332,10 @@ jsg::Promise<void> Cache::put(jsg::Lock& js, Request::Info requestOrUrl,
332332
// destruction timing in the event of an error; see below.
333333

334334
// The next step is a bit complicated as it occurs in two separate async flows.
335-
// First, we await the serialization promise, then construct a DeferredProxy.
336-
// That DeferredProxy contains the second async flow that actually handles the
337-
// request and response.
335+
// First, we await the serialization promise, then enter "deferred proxying" by issuing
336+
// `KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING` from our coroutine. Everything after that
337+
// `KJ_CO_MAGIC` constitutes the second async flow that actually handles the request and
338+
// response.
338339
//
339340
// Weird: It's important that these objects be torn down in the right order and that the
340341
// DeferredProxy promise is handled separately from the inner promise.
@@ -385,62 +386,9 @@ jsg::Promise<void> Cache::put(jsg::Lock& js, Request::Info requestOrUrl,
385386
// will properly treat it as pending I/O, but only *after* the outer promise completes. This
386387
// gets us everything we want.
387388
//
388-
// Hence, what you see here: we first await the serializePromise then add all our
389-
// additional work onto the inner "deferred proxy" promise. Then we `awaitDeferredProxy()`
390-
// the whole thing.
391-
392-
// We use the same exception handling logic for both the promise for the DeferredProxy
393-
// and the inner promise handling the request/response.
394-
static auto constexpr handleException = [](kj::Exception&& exception) {
395-
if (exception.getType() != kj::Exception::Type::DISCONNECTED) {
396-
kj::throwFatalException(kj::mv(exception));
397-
}
398-
// If the origin or the cache disconnected, we don't treat this as an error, as put()
399-
// doesn't guarantee that it stores anything anyway.
400-
//
401-
// TODO(someday): I (Kenton) don't undestand why we'd explicitly want to hide this
402-
// error, even though hiding it is technically not a violation of the contract. To me
403-
// this seems undesirable, especially when it was the origin that failed. The caller
404-
// can always choose to ignore errors if they want (and many do, by passing to
405-
// waitUntil()). However, there is at least one test which depends on this behavior,
406-
// and probably production Workers in the wild, so I'm not changing it for now.
407-
};
408-
409-
// Here we handle the inner promise -- the one held by the DeferredProxy that is
410-
// returned below by the handleSerialize() method.
411-
static auto constexpr handleResponse =
412-
[](DeferredProxy<void>& deferred,
413-
kj::Promise<void> writePayloadHeadersPromise,
414-
kj::Promise<void> pumpRequestBodyPromise,
415-
kj::Own<kj::AsyncOutputStream> bodyStream,
416-
kj::Promise<kj::HttpClient::Response> responsePromise,
417-
kj::Own<kj::HttpClient> client,
418-
kj::Own<kj::AsyncInputStream> payloadStream) -> kj::Promise<void> {
419-
try {
420-
co_await deferred.proxyTask;
421-
// Make sure headers get written even if the body was empty -- see comments earlier.
422-
co_await writePayloadHeadersPromise;
423-
// Make sure the request body is done being pumped and had no errors. If serialization
424-
// completed successfully, then this should also complete immediately thereafter.
425-
co_await pumpRequestBodyPromise;
426-
// It is important to destroy the bodyStream before actually waiting on the
427-
// responsePromise to ensure that the terminal chunk is written since the bodyStream
428-
// may only write the terminal chunk in the streams destructor.
429-
bodyStream = nullptr;
430-
payloadStream = nullptr;
431-
auto response = co_await responsePromise;
432-
// We expect to see either 204 (success) or 413 (failure). Any other status code is a
433-
// violation of the contract between us and the cache, and is an internal
434-
// error, which we log. However, there's no need to throw, since the Cache API is an
435-
// ephemeral K/V store, and we never guaranteed the script we'd actually cache anything.
436-
if (response.statusCode != 204 && response.statusCode != 413) {
437-
LOG_CACHE_ERROR_ONCE(
438-
"Response to Cache API PUT was neither 204 nor 413: ", response);
439-
}
440-
} catch (...) {
441-
handleException(kj::getCaughtExceptionAsKj());
442-
}
443-
};
389+
// Hence, what you see here: we first await the serializePromise, then enter deferred proxying
390+
// with our magic `KJ_CO_MAGIC`, then perform all our additional work. Then we
391+
// `awaitDeferredProxy()` the whole thing.
444392

445393
// Here we handle the promise for the DeferredProxy itself.
446394
static auto constexpr handleSerialize = [](
@@ -477,18 +425,45 @@ jsg::Promise<void> Cache::put(jsg::Lock& js, Request::Info requestOrUrl,
477425
});
478426
try {
479427
auto deferred = co_await serialize;
480-
co_return DeferredProxy<void> {
481-
handleResponse(deferred,
482-
kj::mv(writePayloadHeadersPromise),
483-
kj::mv(pumpRequestBodyPromise),
484-
kj::mv(bodyStream),
485-
kj::mv(responsePromise),
486-
kj::mv(httpClient),
487-
kj::mv(payloadStream))
488-
};
428+
429+
// With our `serialize` promise having resolved to a DeferredProxy, we can now enter
430+
// deferred proxying ourselves.
431+
KJ_CO_MAGIC BEGIN_DEFERRED_PROXYING;
432+
433+
co_await deferred.proxyTask;
434+
// Make sure headers get written even if the body was empty -- see comments earlier.
435+
co_await writePayloadHeadersPromise;
436+
// Make sure the request body is done being pumped and had no errors. If serialization
437+
// completed successfully, then this should also complete immediately thereafter.
438+
co_await pumpRequestBodyPromise;
439+
// It is important to destroy the bodyStream before actually waiting on the
440+
// responsePromise to ensure that the terminal chunk is written since the bodyStream
441+
// may only write the terminal chunk in the streams destructor.
442+
bodyStream = nullptr;
443+
payloadStream = nullptr;
444+
auto response = co_await responsePromise;
445+
// We expect to see either 204 (success) or 413 (failure). Any other status code is a
446+
// violation of the contract between us and the cache, and is an internal
447+
// error, which we log. However, there's no need to throw, since the Cache API is an
448+
// ephemeral K/V store, and we never guaranteed the script we'd actually cache anything.
449+
if (response.statusCode != 204 && response.statusCode != 413) {
450+
LOG_CACHE_ERROR_ONCE(
451+
"Response to Cache API PUT was neither 204 nor 413: ", response);
452+
}
489453
} catch (...) {
490-
handleException(kj::getCaughtExceptionAsKj());
491-
co_return newNoopDeferredProxy();
454+
auto exception = kj::getCaughtExceptionAsKj();
455+
if (exception.getType() != kj::Exception::Type::DISCONNECTED) {
456+
kj::throwFatalException(kj::mv(exception));
457+
}
458+
// If the origin or the cache disconnected, we don't treat this as an error, as put()
459+
// doesn't guarantee that it stores anything anyway.
460+
//
461+
// TODO(someday): I (Kenton) don't undestand why we'd explicitly want to hide this
462+
// error, even though hiding it is technically not a violation of the contract. To me
463+
// this seems undesirable, especially when it was the origin that failed. The caller
464+
// can always choose to ignore errors if they want (and many do, by passing to
465+
// waitUntil()). However, there is at least one test which depends on this behavior,
466+
// and probably production Workers in the wild, so I'm not changing it for now.
492467
}
493468
};
494469

src/workerd/api/crypto-impl.h

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
// Don't include this file unless your name is "crypto*.c++".
99

1010
#include "crypto.h"
11+
#include <workerd/api/util.h>
1112
#include <kj/encoding.h>
1213
#include <openssl/evp.h>
1314
#include <openssl/bio.h>

0 commit comments

Comments
 (0)