RFC: S3 Streaming utility #1692


Closed
rubenfonseca opened this issue Nov 8, 2022 · 2 comments · Fixed by #1719

rubenfonseca commented Nov 8, 2022

Is this related to an existing feature request or issue?

No response

Which AWS Lambda Powertools utility does this relate to?

New utility

Summary

Implement an S3 streaming utility so we can process S3 files that are bigger than memory or temporary storage.

Use case

As a user processing S3 files inside my Lambda, I would like to be able to consume large S3 files in a streaming fashion. I want Powertools to handle all the streaming and IO for me, so I can focus on writing the logic that handles the data.

Ideally, I would be able to enable plugins to further process the streaming data: for instance, the data on S3 could be compressed or formatted as JSON Lines. I want Powertools to handle all the unpacking/deserialization for me.

Current experience

Currently, one can tap into the StreamingBody response from boto3 in order to stream bytes from S3:

stream = s3.Object(bucket_name=bucket, key=key).get()["Body"]

One can also compose this stream with data transformations, like inflating a gzip stream:

from gzip import GzipFile

gz = GzipFile(fileobj=stream, mode="rb")
for line in gz:
    ...

However, this raw solution has the following problems/limitations:

  • The raw stream from boto3 is not seekable (necessary for things like zipfile; see the illustration after this list)
  • Poor DX when plugging multiple data transformations
  • No transparent error handling for retryable exceptions
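As a minimal illustration of the seekability limitation (the bucket and key names are made up), zipfile needs random access to read the central directory at the end of the archive, which the forward-only boto3 stream cannot provide:

import zipfile

import boto3

s3 = boto3.resource("s3")
stream = s3.Object(bucket_name="example-bucket", key="archive.zip").get()["Body"]

# zipfile seeks to the central directory at the end of the archive,
# but StreamingBody only supports forward reads, so this raises
archive = zipfile.ZipFile(stream)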

Proposal

Introduce a new utility called streaming and a new class S3Object that would implement the RawIOBase interface.

class S3Object(io.RawIOBase):
    def __init__(self, bucket: str, key: str, version_id: Optional[str] = None, boto3_client=None):
        ...

The code would implement all the necessary methods to satisfy the RawIOBase protocol.
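A minimal sketch of what satisfying that protocol could look like, using ranged GETs so only the requested bytes are ever fetched. The class name, attributes, and the absence of retries here are illustrative assumptions, not the final API:

import io

import boto3


class RangeSeekableS3(io.RawIOBase):
    # Illustrative sketch only, not the final Powertools API
    def __init__(self, bucket: str, key: str, boto3_client=None):
        self._client = boto3_client or boto3.client("s3")
        self._bucket = bucket
        self._key = key
        self._position = 0
        # One HEAD request up front so SEEK_END and EOF checks work
        self._size = self._client.head_object(Bucket=bucket, Key=key)["ContentLength"]

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return True

    def tell(self) -> int:
        return self._position

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        if whence == io.SEEK_SET:
            self._position = offset
        elif whence == io.SEEK_CUR:
            self._position += offset
        elif whence == io.SEEK_END:
            self._position = self._size + offset
        return self._position

    def read(self, size: int = -1) -> bytes:
        # Clamp the requested range to the object size
        end = self._size if size < 0 else min(self._position + size, self._size)
        if self._position >= end:
            return b""  # EOF
        # Fetch only the bytes the caller asked for (HTTP Range is inclusive)
        response = self._client.get_object(
            Bucket=self._bucket,
            Key=self._key,
            Range=f"bytes={self._position}-{end - 1}",
        )
        data = response["Body"].read()
        self._position += len(data)
        return data

A complete implementation would also provide readinto(), so the object can be wrapped in io.BufferedReader for efficient small reads.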

Additionally, create a plug-in system/middleware to plug additional data transformations on the stream. Common transformations could be included directly on the class constructor:

S3Object(..., gunzip=True, json=True)

While still providing a way to implement custom data transformations:

class CustomTransform(BaseTransform):
    def transform(self, source: io.BufferedIOBase) -> io.BufferedIOBase:
        ...

s3 = S3Object(...)
s3.add_transform(CustomTransform())
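For instance, assuming the BaseTransform protocol sketched above, a gzip transform could be as small as this (GzipTransform is a hypothetical name):

import io
from gzip import GzipFile


class GzipTransform(BaseTransform):
    def transform(self, source: io.BufferedIOBase) -> io.BufferedIOBase:
        # GzipFile decompresses lazily as the caller reads,
        # so no data is buffered up front
        return GzipFile(fileobj=source, mode="rb")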

User experience

A user can use the utility from the Lambda handler:

def lambda_handler(event: dict, context: LambdaContext):
    s3_obj = S3Object(bucket=event.get("bucket"), key=event.get("key"), gunzip=True)
    for line in s3_obj:
        ...  # do something with line
Under the hood, the utility would take care of fetching only the necessary bytes as the data is consumed, and of retrying on connection timeouts.
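A rough sketch of what that transparent retrying could look like (the helper name and retry budget are assumptions; botocore's ConnectionError is one plausible exception to trap):

import botocore.exceptions

MAX_ATTEMPTS = 3  # illustrative retry budget


def read_range_with_retries(client, bucket: str, key: str, start: int, end: int) -> bytes:
    # Hypothetical helper: re-issue the ranged GET if the connection drops mid-read
    last_error = None
    for _ in range(MAX_ATTEMPTS):
        try:
            response = client.get_object(Bucket=bucket, Key=key, Range=f"bytes={start}-{end}")
            return response["Body"].read()
        except botocore.exceptions.ConnectionError as exc:
            last_error = exc  # transient network failure: retry from the same offset
    raise last_error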

Out of scope

  • Staging the data into temporary storage (Lambda's ephemeral storage, EFS, etc.) before processing: this can be added later if there's interest.
  • Processing parallelization: it would increase the complexity of the solution and can be added later.

Future work

Once the basics are in place, we could further optimize the IO under the hood by implementing read-ahead, async and parallel IO, etc. This could result in improved performance in future versions of Powertools, with no breaking API changes.

Potential challenges

Implementation is not trivial, so we need a good set of tests, including E2E tests.

Dependencies and Integrations

No response

Alternative solutions

When fetching an S3 object with boto3, there's a private accessor to the underlying urllib3.response.HTTPResponse. This could be used to stream the results, and could be enough if we're not implementing seek.

>>> s3_obj['Body']._raw_stream
<urllib3.response.HTTPResponse object at 0x1040e8460>
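A hedged sketch of that alternative (the bucket and key are made up, and _raw_stream is private botocore internals that may change between versions):

import boto3

s3 = boto3.client("s3")
body = s3.get_object(Bucket="example-bucket", Key="big-file.jsonl")["Body"]

# urllib3's HTTPResponse.stream() yields chunks without loading the whole object
for chunk in body._raw_stream.stream(64 * 1024):
    ...  # process chunk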


github-actions bot commented

⚠️ COMMENT VISIBILITY WARNING ⚠️

This issue is now closed. Please be mindful that future comments are hard for our team to see.

If you need more assistance, please either tag a team member or open a new issue that references this one.

If you wish to keep having a conversation with other community members under this issue, feel free to do so.

github-actions bot commented

This is now released in version 2.4.0!
