feat(streaming): add new s3 streaming utility #1719

Merged
merged 43 commits into from
Nov 24, 2022
43 commits
a8d8467
feat: add s3 streaming utility
rubenfonseca Nov 15, 2022
1fddf1e
chore: add streaming utility area and label
rubenfonseca Nov 15, 2022
28c6003
fix: use Literal from typing_extensions
rubenfonseca Nov 15, 2022
fc5bdbb
fix: re-wrote implementation and added documentation
rubenfonseca Nov 16, 2022
c11de5c
chore: add initial tests for s3_seekable_io
rubenfonseca Nov 16, 2022
8dc8b84
fix: renamed file
rubenfonseca Nov 16, 2022
77aba9b
fix: remove json transformation
rubenfonseca Nov 17, 2022
e29d46d
fix: remove json transformation from import
rubenfonseca Nov 17, 2022
be99144
chore: add ijson as a dev dependency
rubenfonseca Nov 17, 2022
822f597
chore: add s3 objects tests
rubenfonseca Nov 17, 2022
b70fb62
chore: update docs
rubenfonseca Nov 17, 2022
320c6a9
chore: add e2e tests
rubenfonseca Nov 18, 2022
4c867a3
fix: bug when using encoding and newline on CSV transform
rubenfonseca Nov 18, 2022
c66e045
chore(docs): moved s3 streaming under streaming in docs
rubenfonseca Nov 21, 2022
97ffd8a
Revert "chore(docs): moved s3 streaming under streaming in docs"
rubenfonseca Nov 21, 2022
3cf13e5
chore: added compat StreamingBody for older botocore
rubenfonseca Nov 21, 2022
49fc7b8
fix: boto3 import order
rubenfonseca Nov 21, 2022
cda826d
chore: use s3 client instead of s3 resource
rubenfonseca Nov 21, 2022
dbdef1e
fix: replace Union with |
rubenfonseca Nov 21, 2022
0f43797
fix: add descriptions on NotImplementedErrors
rubenfonseca Nov 21, 2022
c42395a
chore(docs): added programming documentation to the data transformations
rubenfonseca Nov 21, 2022
47b8345
fix: typo
rubenfonseca Nov 21, 2022
4c1ddd8
chore: add versioned bucket tests
rubenfonseca Nov 21, 2022
e405c92
fix: monkey patch botocore response correctly
rubenfonseca Nov 22, 2022
95f7036
fix: only patch StreamingBody if botocore is bellow 1.29.13
rubenfonseca Nov 22, 2022
fa4dec2
chore: improve docs
rubenfonseca Nov 22, 2022
3bfbb5c
chore: apply review feedback
rubenfonseca Nov 23, 2022
758b50f
fix: typos
rubenfonseca Nov 23, 2022
f232810
docs(streaming): copywriting for intro and key features
heitorlessa Nov 23, 2022
fb2909f
chore: pushed an example of seeking a CSV file
rubenfonseca Nov 23, 2022
f314597
docs(streaming): copywriting background; split up data transformations
heitorlessa Nov 23, 2022
a542773
docs(streaming): split up zip files to make limitation more explicit
heitorlessa Nov 23, 2022
c2a64e8
chore: add testing examples to streaming docs
rubenfonseca Nov 23, 2022
639f38e
fix: remove test
rubenfonseca Nov 23, 2022
f496d41
docs(streaming): copywriting on custom transform options
heitorlessa Nov 23, 2022
7c0cc3a
chore: line-editing on byot
heitorlessa Nov 23, 2022
a8e48c6
docs(streaming): combine testing your code to be more realistic
heitorlessa Nov 23, 2022
6ed058a
docs: add streaming as a new utility in features table
heitorlessa Nov 23, 2022
60d3d86
docs(index): add feature in project homepage
heitorlessa Nov 24, 2022
8586c2a
docs(limitations): add new section to call out known issues
heitorlessa Nov 24, 2022
23f672a
docs(streaming): add reading ahead and backwards section
heitorlessa Nov 24, 2022
28e2040
docs: fix seek positioning and byte size
heitorlessa Nov 24, 2022
793dc70
docs(streaming): fix semantic wording on read ahead vs skipping
heitorlessa Nov 24, 2022
2 changes: 2 additions & 0 deletions .github/boring-cyborg.yml
@@ -42,6 +42,8 @@ labelPRBasedOnFilePath:
  typing:
    - aws_lambda_powertools/utilities/typing/*
    - mypy.ini
  streaming:
    - aws_lambda_powertools/utilities/streaming/*
  commons:
    - aws_lambda_powertools/shared/*

2 changes: 1 addition & 1 deletion README.md
@@ -15,7 +15,6 @@ A suite of Python utilities for AWS Lambda functions to ease adopting best pract

![hero-image](https://user-images.githubusercontent.com/3340292/198254617-d0fdb672-86a6-4988-8a40-adf437135e0a.png)


## Features

* **[Tracing](https://awslabs.github.io/aws-lambda-powertools-python/latest/core/tracer/)** - Decorators and utilities to trace Lambda function handlers, and both synchronous and asynchronous functions
@@ -32,6 +31,7 @@ A suite of Python utilities for AWS Lambda functions to ease adopting best pract
* **[Parser](https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/parser/)** - Data parsing and deep validation using Pydantic
* **[Idempotency](https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/idempotency/)** - Convert your Lambda functions into idempotent operations which are safe to retry
* **[Feature Flags](https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/feature_flags/)** - A simple rule engine to evaluate when one or multiple features should be enabled depending on the input
* **[Streaming](https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/streaming/)** - Process datasets larger than the available memory by streaming them

### Installation

3 changes: 3 additions & 0 deletions aws_lambda_powertools/utilities/streaming/__init__.py
@@ -0,0 +1,3 @@
from aws_lambda_powertools.utilities.streaming.s3_object import S3Object

__all__ = ["S3Object"]
184 changes: 184 additions & 0 deletions aws_lambda_powertools/utilities/streaming/_s3_seekable_io.py
@@ -0,0 +1,184 @@
import io
import logging
from typing import IO, TYPE_CHECKING, AnyStr, Iterable, List, Optional

import boto3

from aws_lambda_powertools.utilities.streaming.compat import PowertoolsStreamingBody

if TYPE_CHECKING:
    from mypy_boto3_s3 import Client

logger = logging.getLogger(__name__)


class _S3SeekableIO(IO[bytes]):
    """
    _S3SeekableIO wraps botocore's StreamingBody to allow for seeking. Seeking is achieved by closing the
    existing connection and opening a new one, passing the correct HTTP Range header.

    Parameters
    ----------
    bucket: str
        The S3 bucket
    key: str
        The S3 key
    version_id: str, optional
        A version ID of the object, when the S3 bucket is versioned
    boto3_client: boto3 S3 Client, optional
        An optional boto3 S3 client. If missing, a new one will be created.
    sdk_options: dict, optional
        Dictionary of options that will be passed to the S3 Client get_object API call
    """

    def __init__(
        self,
        bucket: str,
        key: str,
        version_id: Optional[str] = None,
        boto3_client: Optional["Client"] = None,
        **sdk_options,
    ):
        self.bucket = bucket
        self.key = key

        # Holds the current position in the stream
        self._position = 0

        # Stores the closed state of the stream
        self._closed: bool = False

        # Caches the size of the object
        self._size: Optional[int] = None

        self._s3_client: Optional["Client"] = boto3_client
        self._raw_stream: Optional[PowertoolsStreamingBody] = None

        self._sdk_options = sdk_options
        self._sdk_options["Bucket"] = bucket
        self._sdk_options["Key"] = key
        if version_id is not None:
            self._sdk_options["VersionId"] = version_id

    @property
    def s3_client(self) -> "Client":
        """
        Returns a boto3 S3 client
        """
        if self._s3_client is None:
            self._s3_client = boto3.client("s3")
        return self._s3_client

    @property
    def size(self) -> int:
        """
        Retrieves the size of the S3 object
        """
        if self._size is None:
            logger.debug("Getting size of S3 object")
            self._size = self.s3_client.head_object(**self._sdk_options).get("ContentLength", 0)
        return self._size

    @property
    def raw_stream(self) -> PowertoolsStreamingBody:
        """
        Returns the botocore StreamingBody, starting the stream from the seeked position.
        """
        if self._raw_stream is None:
            range_header = f"bytes={self._position}-"
            logger.debug(f"Starting new stream at {range_header}")
            self._raw_stream = self.s3_client.get_object(Range=range_header, **self._sdk_options).get("Body")
            self._closed = False

        return self._raw_stream

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        """
        Seeks the current object, invalidating the underlying stream if the position changes.
        """
        current_position = self._position

        if whence == io.SEEK_SET:
            self._position = offset
        elif whence == io.SEEK_CUR:
            self._position += offset
        elif whence == io.SEEK_END:
            self._position = self.size + offset
        else:
            raise ValueError(f"invalid whence ({whence}, should be {io.SEEK_SET}, {io.SEEK_CUR}, {io.SEEK_END})")

        # Invalidate the existing stream, so a new one will be opened on the next IO operation.
        #
        # Some consumers of this class might call seek multiple times, without affecting the net position.
        # zipfile.ZipFile does this often. If we just blindly invalidated the stream, we would have to re-open
        # an S3 HTTP connection just to continue reading from the same position as before, which would be inefficient.
        #
        # So we only invalidate it if there's a net position change after seeking, and we have an existing S3 connection
        if current_position != self._position and self._raw_stream is not None:
            self._raw_stream.close()
            self._raw_stream = None

        return self._position

    def seekable(self) -> bool:
        return True

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def tell(self) -> int:
        return self._position

    def read(self, size: Optional[int] = -1) -> bytes:
        size = None if size == -1 else size
        data = self.raw_stream.read(size)
        if data is not None:
            self._position += len(data)
        return data

    def readline(self, size: Optional[int] = None) -> bytes:
        data = self.raw_stream.readline(size)
        self._position += len(data)
        return data

    def readlines(self, hint: int = -1) -> List[bytes]:
        # botocore's StreamingBody doesn't implement the "hint" parameter
        data = self.raw_stream.readlines()
        self._position += sum(len(line) for line in data)
        return data

    @property
    def closed(self) -> bool:
        return self._closed

    def __next__(self):
        return self.raw_stream.__next__()

    def __iter__(self):
        return self.raw_stream.__iter__()

    def __enter__(self):
        return self

    def __exit__(self, *kwargs):
        self.close()

    def close(self) -> None:
        self.raw_stream.close()
        self._closed = True

    def fileno(self) -> int:
        raise NotImplementedError("this stream is not backed by a file descriptor")

    def flush(self) -> None:
        raise NotImplementedError("this stream is not writable")

    def isatty(self) -> bool:
        return False

    def truncate(self, size: Optional[int] = 0) -> int:
        raise NotImplementedError("this stream is not writable")

    def write(self, data: AnyStr) -> int:
        raise NotImplementedError("this stream is not writable")

    def writelines(self, lines: Iterable[AnyStr]) -> None:
        raise NotImplementedError("this stream is not writable")
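
The seek-by-reopening approach in `_S3SeekableIO` can be illustrated without AWS access. The sketch below is a simplified stand-in, not the class from this PR: `FakeS3Client` and `RangeSeekableReader` are hypothetical names invented for this example, and the fake client only understands the open-ended `bytes=N-` range form. It shows that a stream is opened lazily on the first read, kept when a seek leaves the net position unchanged, and dropped and reopened with a new `Range` value once the position actually moves.

```python
import io
from typing import Dict, List, Optional


class FakeS3Client:
    """Hypothetical in-memory stand-in for an S3 client; records every Range requested."""

    def __init__(self, payload: bytes):
        self._payload = payload
        self.requested_ranges: List[str] = []

    def head_object(self, **kwargs) -> Dict[str, int]:
        return {"ContentLength": len(self._payload)}

    def get_object(self, Range: str, **kwargs) -> Dict[str, io.BytesIO]:
        self.requested_ranges.append(Range)
        # Only the open-ended "bytes=N-" form is handled in this sketch
        start = int(Range[len("bytes="):-1])
        return {"Body": io.BytesIO(self._payload[start:])}


class RangeSeekableReader:
    """Simplified illustration of seek-by-reopening; not the Powertools class."""

    def __init__(self, client: FakeS3Client, bucket: str, key: str):
        self._client = client
        self._options = {"Bucket": bucket, "Key": key}
        self._position = 0
        self._stream: Optional[io.BytesIO] = None

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        previous = self._position
        if whence == io.SEEK_SET:
            self._position = offset
        elif whence == io.SEEK_CUR:
            self._position += offset
        elif whence == io.SEEK_END:
            size = self._client.head_object(**self._options)["ContentLength"]
            self._position = size + offset
        else:
            raise ValueError(f"invalid whence ({whence})")
        # Drop the open stream only when the net position changed
        if previous != self._position and self._stream is not None:
            self._stream.close()
            self._stream = None
        return self._position

    def read(self, size: int = -1) -> bytes:
        if self._stream is None:
            # Lazily open a new stream starting at the current position
            response = self._client.get_object(Range=f"bytes={self._position}-", **self._options)
            self._stream = response["Body"]
        data = self._stream.read(size)
        self._position += len(data)
        return data


client = FakeS3Client(b"0123456789")
reader = RangeSeekableReader(client, bucket="example-bucket", key="example-key")

first = reader.read(3)        # opens a stream with Range "bytes=0-"
reader.seek(0, io.SEEK_CUR)   # no net change, so the stream is kept
second = reader.read(2)       # continues on the same stream
reader.seek(-2, io.SEEK_END)  # net change, so the stream is dropped
tail = reader.read()          # reopens with Range "bytes=8-"
```

After this runs, `client.requested_ranges` holds only two entries, one per stream that was actually opened. The no-op `seek(0, io.SEEK_CUR)` in the middle does not trigger a request, which is the behaviour the comment in `seek` describes for consumers such as `zipfile.ZipFile`.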