
Commit e235d39

feat: add s3 streaming utility

1 parent f9752df

15 files changed: +497 −0 lines changed

aws_lambda_powertools/utilities/streaming/__init__.py

Whitespace-only changes.
@@ -0,0 +1,241 @@

import io
import logging
import typing
from typing import TYPE_CHECKING, List, Literal, Optional, Sequence, Union, overload

import boto3
from botocore.response import StreamingBody

from aws_lambda_powertools.utilities.streaming.transformations.base import (
    BaseTransform,
    T,
)
from aws_lambda_powertools.utilities.streaming.transformations.gzip import GzipTransform
from aws_lambda_powertools.utilities.streaming.transformations.json import JsonTransform

if TYPE_CHECKING:
    from mypy_boto3_s3.service_resource import Object, S3ServiceResource

logger = logging.getLogger(__name__)


# Seekable, read-only file-like proxy over an S3 object: seeking invalidates the current
# ranged GET stream so the next read reopens it at the new offset.
class _S3Proxy(io.RawIOBase):
    def __init__(
        self, bucket: str, key: str, version_id: Optional[str] = None, boto3_s3_resource: Optional["S3ServiceResource"] = None
    ):
        self.bucket = bucket
        self.key = key
        self.version_id = version_id

        self._position = 0
        self._size: Optional[int] = None

        self._s3_object: Optional["Object"] = None
        self._s3_resource: Optional["S3ServiceResource"] = boto3_s3_resource
        self._raw_stream: Optional[StreamingBody] = None

    @property
    def s3_resource(self) -> "S3ServiceResource":
        if self._s3_resource is None:
            self._s3_resource = boto3.resource("s3")
        return self._s3_resource

    @property
    def s3_object(self) -> "Object":
        if self._s3_object is None:
            if self.version_id is not None:
                self._s3_object = self.s3_resource.ObjectVersion(
                    bucket_name=self.bucket, object_key=self.key, id=self.version_id
                ).Object()
            else:
                self._s3_object = self.s3_resource.Object(bucket_name=self.bucket, key=self.key)

        return self._s3_object

    @property
    def size(self) -> int:
        if self._size is None:
            self._size = self.s3_object.content_length
        return self._size

    @property
    def raw_stream(self) -> StreamingBody:
        if self._raw_stream is None:
            range_header = "bytes=%d-" % self._position
            logger.debug(f"Starting new stream at {range_header}...")
            self._raw_stream = self.s3_object.get(Range=range_header)["Body"]

        return self._raw_stream

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        current_position = self._position

        if whence == io.SEEK_SET:
            self._position = offset
        elif whence == io.SEEK_CUR:
            self._position += offset
        elif whence == io.SEEK_END:
            self._position = self.size + offset
        else:
            raise ValueError(f"invalid whence ({whence}, should be {io.SEEK_SET}, {io.SEEK_CUR}, {io.SEEK_END})")

        # If we changed the position in the stream, we should invalidate the existing stream
        # and open a new one on the next read
        if current_position != self._position and self._raw_stream is not None:
            self._raw_stream.close()
            self._raw_stream = None

        return self._position

    def seekable(self) -> bool:
        return True

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def read(self, size: Optional[int] = -1) -> Optional[bytes]:
        size = None if size == -1 else size
        data = self.raw_stream.read(size)
        if data is not None:
            self._position += len(data)
        return data

    def readline(self, size: Optional[int] = None) -> bytes:
        data = self.raw_stream.readline(size)
        self._position += len(data)
        return data

    @property
    def closed(self) -> bool:
        return self.raw_stream.closed

    def __next__(self):
        return self.raw_stream.__next__()

    def __iter__(self):
        return self.raw_stream.__iter__()


# File-like wrapper around _S3Proxy that pipes reads through optional data
# transformations (gzip, JSON, or user-supplied transforms).
class S3Object(io.RawIOBase):
    def __init__(
        self,
        bucket: str,
        key: str,
        version_id: Optional[str] = None,
        boto3_s3_resource: Optional["S3ServiceResource"] = None,
        gunzip: Optional[bool] = False,
        json: Optional[bool] = False,
    ):
        self.bucket = bucket
        self.key = key
        self.version_id = version_id
        self.raw_stream = _S3Proxy(bucket=bucket, key=key, version_id=version_id, boto3_s3_resource=boto3_s3_resource)

        self._transformed_stream: Optional[io.RawIOBase] = None
        self._data_transformations: List[BaseTransform] = []
        if gunzip:
            self._data_transformations.append(GzipTransform())
        if json:
            self._data_transformations.append(JsonTransform())

    @property
    def size(self) -> int:
        return self.raw_stream.size

    @property
    def transformed_stream(self) -> io.RawIOBase:
        if self._transformed_stream is None:
            # Apply all the transformations
            transformed_stream = self.raw_stream
            for transformation in self._data_transformations:
                transformed_stream = transformation.transform(transformed_stream)

            self._transformed_stream = transformed_stream

        return self._transformed_stream

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        return self.raw_stream.seek(offset, whence)

    def seekable(self) -> bool:
        return self.raw_stream.seekable()

    def readable(self) -> bool:
        return self.raw_stream.readable()

    def writable(self) -> bool:
        return self.raw_stream.writable()

    def tell(self) -> int:
        return self.raw_stream.tell()

    @property
    def closed(self) -> bool:
        return self.raw_stream.closed

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def close(self):
        self.raw_stream.close()

        # Also close transformed stream if there are any transformations
        if self.raw_stream != self._transformed_stream and self._transformed_stream is not None:
            self._transformed_stream.close()

    def read(self, size: int = -1) -> Optional[bytes]:
        return self.transformed_stream.read(size)

    def readline(self, size: Optional[int] = None) -> bytes:
        return self.transformed_stream.readline(size)

    def __next__(self):
        return self.transformed_stream.__next__()

    def __iter__(self):
        return self.transformed_stream.__iter__()

    @overload
    def transform(
        self, transformations: Union[BaseTransform[T], Sequence[BaseTransform[T]]], in_place: Literal[True]
    ) -> None:
        pass

    @overload
    def transform(
        self, transformations: Union[BaseTransform[T], Sequence[BaseTransform[T]]], in_place: Literal[False]
    ) -> T:
        pass

    @overload
    def transform(self, transformations: Union[BaseTransform[T], Sequence[BaseTransform[T]]]) -> T:
        pass

    def transform(
        self, transformations: Union[BaseTransform[T], Sequence[BaseTransform[T]]], in_place: Optional[bool] = False
    ) -> Optional[T]:
        if self.tell() != 0:
            raise ValueError(f"Cannot add transformations to a read object. Already read {self.tell()} bytes")

        # Make transformations always be a sequence to make mypy happy
        if not isinstance(transformations, Sequence):
            transformations = [transformations]

        if in_place:
            self._data_transformations.extend(transformations)

            # Invalidate transformed stream
            self._transformed_stream = None
            return None
        else:
            # Tell MyPy that raw_stream actually implements T (bound to io.RawIOBase)
            stream = typing.cast(T, self.raw_stream)
            for transformation in transformations:
                stream = transformation.transform(stream)
            return stream
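
A brief usage sketch of the classes above. The bucket and key are placeholders, and the import path for S3Object is an assumption, since this excerpt does not show the file name of the module:

# Assumed import path for the module above; adjust to wherever S3Object actually lives.
from aws_lambda_powertools.utilities.streaming.s3_object import S3Object
from aws_lambda_powertools.utilities.streaming.transformations import GzipTransform, JsonTransform

# Built-in shortcuts: stream a gzipped JSON Lines object, decompressing and decoding on the fly.
# "example-bucket" and the key are placeholders.
s3 = S3Object(bucket="example-bucket", key="logs/events.jsonl.gz", gunzip=True, json=True)
for record in s3:
    print(record)  # each item is a dict produced by JsonDeserializer

# Equivalent explicit form: in_place=True appends to the pipeline and returns None,
# in_place=False leaves the object untouched and returns the new transformed stream.
s3 = S3Object(bucket="example-bucket", key="logs/events.jsonl.gz")
s3.transform([GzipTransform(), JsonTransform()], in_place=True)
print(next(iter(s3)))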
aws_lambda_powertools/utilities/streaming/transformations/__init__.py

@@ -0,0 +1,6 @@

from aws_lambda_powertools.utilities.streaming.transformations.base import BaseTransform
from aws_lambda_powertools.utilities.streaming.transformations.gzip import GzipTransform
from aws_lambda_powertools.utilities.streaming.transformations.json import JsonTransform
from aws_lambda_powertools.utilities.streaming.transformations.zip import ZipTransform

__all__ = ["BaseTransform", "GzipTransform", "JsonTransform", "ZipTransform"]
aws_lambda_powertools/utilities/streaming/transformations/base.py

@@ -0,0 +1,14 @@

import io
from abc import abstractmethod
from typing import Generic, TypeVar

T = TypeVar("T", bound=io.RawIOBase)


class BaseTransform(Generic[T]):
    def __init__(self, **kwargs):
        self.kwargs = kwargs

    @abstractmethod
    def transform(self, input_stream: io.RawIOBase) -> T:
        pass
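
BaseTransform is the extension point for user-defined transformations: constructor kwargs are stored and transform() wraps the incoming stream. A hypothetical CsvTransform (not part of this commit) sketches what a subclass might look like:

import csv
import io

from aws_lambda_powertools.utilities.streaming.transformations.base import BaseTransform


class CsvTransform(BaseTransform):
    # Hypothetical example: expose the stream as CSV rows (dicts keyed by header).
    def transform(self, input_stream: io.RawIOBase) -> csv.DictReader:
        encoding = self.kwargs.get("encoding", "utf-8")
        # DictReader expects text, so wrap the byte stream in a text layer first.
        text_stream = io.TextIOWrapper(input_stream, encoding=encoding)
        return csv.DictReader(text_stream)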
aws_lambda_powertools/utilities/streaming/transformations/gzip.py

@@ -0,0 +1,9 @@

import io
from gzip import GzipFile

from aws_lambda_powertools.utilities.streaming.transformations.base import BaseTransform


class GzipTransform(BaseTransform):
    def transform(self, input_stream: io.RawIOBase) -> GzipFile:
        return GzipFile(fileobj=input_stream, mode="rb", **self.kwargs)
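
GzipTransform simply hands the stream to gzip.GzipFile, forwarding any constructor kwargs. A self-contained sketch with an in-memory payload (no S3 involved):

import gzip
import io

from aws_lambda_powertools.utilities.streaming.transformations import GzipTransform

# Compress a small payload in memory and decompress it through the transform.
payload = io.BytesIO(gzip.compress(b"hello world\n"))
decompressed = GzipTransform().transform(payload)
assert decompressed.readline() == b"hello world\n"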
aws_lambda_powertools/utilities/streaming/transformations/json.py

@@ -0,0 +1,38 @@

import io
import json
from json import JSONDecodeError
from typing import Optional

from aws_lambda_powertools.utilities.streaming.transformations.base import BaseTransform


class JsonDeserializer(io.RawIOBase):
    def __init__(self, input_stream: io.RawIOBase):
        self.input = input_stream

    def read(self, size: int = -1) -> Optional[bytes]:
        return self.input.read(size)

    def readline(self, size: Optional[int] = None) -> bytes:
        return self.input.readline(size)

    def read_object(self) -> dict:
        obj: dict = {}

        while not self.input.closed:
            line = self.input.__next__()
            try:
                obj = json.loads(line)
            except JSONDecodeError:
                continue
            break

        return obj

    def __next__(self):
        return self.read_object()


class JsonTransform(BaseTransform):
    def transform(self, input_stream: io.RawIOBase) -> JsonDeserializer:
        return JsonDeserializer(input_stream=input_stream)
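
JsonDeserializer consumes the wrapped stream line by line, returning one dict per line that parses as JSON and skipping lines that don't. A self-contained sketch with an in-memory JSON Lines payload (no S3 involved):

import io

from aws_lambda_powertools.utilities.streaming.transformations import JsonTransform

# One JSON document per line; the malformed middle line is skipped by read_object().
payload = io.BytesIO(b'{"id": 1}\nnot json\n{"id": 2}\n')
deserializer = JsonTransform().transform(payload)
print(deserializer.read_object())  # {'id': 1}
print(deserializer.read_object())  # {'id': 2}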
aws_lambda_powertools/utilities/streaming/transformations/zip.py

@@ -0,0 +1,11 @@

import io
import typing
from zipfile import ZipFile

from aws_lambda_powertools.utilities.streaming.transformations.base import BaseTransform


class ZipTransform(BaseTransform):
    def transform(self, input_stream: io.RawIOBase) -> ZipFile:
        input_as_io = typing.cast(typing.IO[bytes], input_stream)
        return ZipFile(input_as_io, mode="r", **self.kwargs)
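
zipfile.ZipFile needs random access to read the archive's central directory, which works here because _S3Proxy is seekable. A self-contained sketch with an in-memory archive (no S3 involved):

import io
import zipfile

from aws_lambda_powertools.utilities.streaming.transformations import ZipTransform

# Build a small archive in memory to stand in for the S3 object.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, mode="w") as archive:
    archive.writestr("hello.txt", "hello from the archive\n")
buffer.seek(0)

zf = ZipTransform().transform(buffer)
print(zf.namelist())                  # ['hello.txt']
print(zf.read("hello.txt").decode())  # hello from the archive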
