Skip to content

Add new AsyncResponseTransformer: toPublisher() #2837

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Jan 29, 2022

Conversation

Bennett-Lynch
Copy link
Contributor

@Bennett-Lynch Bennett-Lynch commented Nov 10, 2021

Description

Add new AsyncResponseTransformer: toPublisher(). This transformer makes it more convenient for users to directly consume a streaming-response payload (i.e., S3 GetObject) with async clients. This also allows users of Reactor/RxJava to more easily consume a streaming response (e.g., via Flux#from(Publisher)).

Motivation and Context

AsyncResponseTransformer currently lacks a static factory method that would allow a user to stream the response body data. The only methods currently available are toFile() and toBytes(). This lacks parity when compared to AsyncRequestBody#fromPublisher(), which allows declaring a request body from an arbitrary Publisher<ByteBuffer>. It also lacks parity when compared to the sync client's response equivalent, ResponseTransformer#toInputStream(), which streams a response body to an InputStream.

Related Issues

  1. Return a Publisher<ByteBuffer> from streaming operations #391
  2. Line-by-line publisher of S3 object content #1253
  3. Support S3TransferManager upload using Flux<ByteBuffer> #2714

Modifications

  1. Create new AsyncResponseTransformer#toPublisher() static factory method that publishes the response body content through a ResponsePublisher, which is an SdkPublisher that also contains a reference to the SdkResponse returned by the service.
  2. Update generated async clients' response-streaming operations to publish metrics explicitly at end-of-stream, rather than end-of-future. This is necessary because the provided transformer implementation determines when the associated future is completed. For to-file and to-bytes transformers, it is not completed until the end of the response body. But for the new to-publisher transformer, the future is completed when receiving the response header.
  3. To facilitate the above requirement, move NotifyingAsyncResponseTransformer from being a TM-internal-API to a SDK-protected-API. Refactor NotifyingAsyncResponseTransformer into 3 separate listeners interfaces: AsyncResponseTransformerListener, PublisherListener, SubscriberListener. Each listener type allows wrapping the associated interface to listen in on events during the request/response lifecycle. Also convert NotifyingAsyncRequestBody to SDK-protected AsyncRequestBodyListener.
  4. Update TransferManager's TransferProgressUpdater to use the new transformer-listener logic to notify of a completedTransfer only after stream completion. Update TransferManager's TransferListenerInvoker to guarantee one-time callback execution, now that there are multiple possible origins of success and failure.
  5. Add integration tests for TransferManager and S3AsyncClient using the new toPublisher feature.

License

  • I confirm that this pull request can be released under the Apache 2 license

Bennett Lynch and others added 4 commits November 10, 2021 08:29
## Motivation

AsyncResponseTransformer currently lacks a static factory method that
would allow a user to stream the response body data. The only methods
currently available are toFile() and toBytes(). This lacks parity when
compared to AsyncRequestBody#fromPublisher(), which allows declaring a
body from an arbitrary ByteBuffer Publisher, and the sync client
equivalent, ResponseTransformer#toInputStream(), which streams a
response body to an InputStream.
@sonarqubecloud
Copy link

sonarqubecloud bot commented Nov 10, 2021

SonarCloud Quality Gate failed.    Quality Gate failed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 3 Code Smells

37.8% 37.8% Coverage
0.0% 0.0% Duplication

@Bennett-Lynch
Copy link
Contributor Author

@pkgonan Let me know if you think this would be useful. It's related to #2714, but for downloads rather than uploads. It's possible to implement your own AsyncResponseTransformer to do this today, but offering it as part of the SDK should help simplify that use case.

@pkgonan
Copy link

pkgonan commented Nov 16, 2021

@Bennett-Lynch
If there is an upload, there must also be a download. Thanks a lot. It looks very useful.

@Bennett-Lynch Bennett-Lynch changed the title Add new AsyncResponseTransformer: toPublisher() <Draft> Add new AsyncResponseTransformer: toPublisher() Nov 20, 2021
@pkgonan
Copy link

pkgonan commented Dec 19, 2021

@Bennett-Lynch
Hi. When do you expect this feature done?

@Bennett-Lynch
Copy link
Contributor Author

Bennett-Lynch commented Dec 22, 2021

Hi @pkgonan. I don't have an estimate to share at this time, but if this is a feature you could benefit from, please let me know and I will try to find cycles to prioritize it.

In the meantime, if you need the functionality immediately, you should be able to copy the classes that were introduced in this PR. These changes don't require any substantial changes to the inner workings of the SDK and instead are a mostly standalone and pluggable implementation of the AsyncResponseTransformer interface. I understand copying classes is not an ideal solution, but I just wanted to mention it as a temporary solution.

@pkgonan
Copy link

pkgonan commented Dec 22, 2021

@Bennett-Lynch Hi. Yes I need this feature. We are considering this feature in new projects. Therefore, I was constantly checking the progress of this PR.

@Bennett-Lynch Bennett-Lynch marked this pull request as ready for review January 27, 2022 04:18
@Bennett-Lynch Bennett-Lynch requested a review from a team as a code owner January 27, 2022 04:18
@Bennett-Lynch Bennett-Lynch changed the title <Draft> Add new AsyncResponseTransformer: toPublisher() Add new AsyncResponseTransformer: toPublisher() Jan 27, 2022
Copy link
Contributor

@zoewangg zoewangg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make sure integ tests pass before merging?

@sonarqubecloud
Copy link

SonarCloud Quality Gate failed.    Quality Gate failed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 10 Code Smells

87.1% 87.1% Coverage
3.7% 3.7% Duplication

@Bennett-Lynch Bennett-Lynch merged commit 782b49d into aws:master Jan 29, 2022
@Bennett-Lynch
Copy link
Contributor Author

This change has been released as part of version 2.17.122.

@pkgonan
Copy link

pkgonan commented Sep 7, 2023

@Bennett-Lynch
Hi.

When using this feature, the LeasedConcurrency metric will continue to rise.

Perhaps there is a leak in your connection.

aws-sdk-java-automation added a commit that referenced this pull request Dec 15, 2023
…69a1f8ffe

Pull request: release <- staging/03e02562-7185-4eda-ba03-71c69a1f8ffe
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants