Skip to content

Commit 1f8c11a

Browse files
committed
fix: re-wrote implementation and added documentation
1 parent a0e4f6f commit 1f8c11a

17 files changed

+476
-275
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from aws_lambda_powertools.utilities.streaming.s3_object import S3Object
2+
3+
__all__ = ["S3Object"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import io
2+
import logging
3+
from typing import IO, TYPE_CHECKING, AnyStr, Iterable, List, Optional
4+
5+
import boto3
6+
from botocore.response import StreamingBody
7+
8+
if TYPE_CHECKING:
9+
from mypy_boto3_s3 import S3ServiceResource
10+
from mypy_boto3_s3.service_resource import Object
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class _S3SeekableIO(IO[bytes]):
16+
"""
17+
_S3SeekableIO wraps boto3.StreamingBody to allow for seeking. Seeking is achieved by closing the
18+
existing connection and re-opening a new one, passing the correct HTTP Range header.
19+
20+
Parameters
21+
----------
22+
bucket: str
23+
The S3 bucket
24+
key: str
25+
The S3 key
26+
version_id: str, optional
27+
A version ID of the object, when the S3 bucket is versioned
28+
boto3_s3_resource: S3ServiceResource, optional
29+
An optional boto3 S3 resource. If missing, a new one will be created.
30+
"""
31+
32+
def __init__(
33+
self, bucket: str, key: str, version_id: Optional[str] = None, boto3_s3_resource=Optional["S3ServiceResource"]
34+
):
35+
self.bucket = bucket
36+
self.key = key
37+
self.version_id = version_id
38+
39+
# Holds the current position in the stream
40+
self._position = 0
41+
42+
# Caches the size of the object
43+
self._size: Optional[int] = None
44+
45+
self._s3_object: Optional["Object"] = None
46+
self._s3_resource: Optional["S3ServiceResource"] = boto3_s3_resource
47+
self._raw_stream: Optional[StreamingBody] = None
48+
49+
@property
50+
def s3_resource(self) -> "S3ServiceResource":
51+
"""
52+
Returns a boto3 S3ServiceResource
53+
"""
54+
if self._s3_resource is None:
55+
self._s3_resource = boto3.resource("s3")
56+
return self._s3_resource
57+
58+
@property
59+
def s3_object(self) -> "Object":
60+
"""
61+
Returns a boto3 S3Object
62+
"""
63+
if self._s3_object is None:
64+
if self.version_id is not None:
65+
self._s3_object = self.s3_resource.ObjectVersion(
66+
bucket_name=self.bucket, object_key=self.key, id=self.version_id
67+
).Object()
68+
else:
69+
self._s3_object = self.s3_resource.Object(bucket_name=self.bucket, key=self.key)
70+
71+
return self._s3_object
72+
73+
@property
74+
def size(self) -> int:
75+
"""
76+
Retrieves the size of the S3 object
77+
"""
78+
if self._size is None:
79+
self._size = self.s3_object.content_length
80+
return self._size
81+
82+
@property
83+
def raw_stream(self) -> StreamingBody:
84+
"""
85+
Returns the boto3 StreamingBody, starting the stream from the seeked position.
86+
"""
87+
if self._raw_stream is None:
88+
range_header = "bytes=%d-" % self._position
89+
logging.debug(f"Starting new stream at {range_header}...")
90+
self._raw_stream = self.s3_object.get(Range=range_header)["Body"]
91+
92+
return self._raw_stream
93+
94+
def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
95+
"""
96+
Seeks the current object, invalidating the underlying stream if the position changes.
97+
"""
98+
current_position = self._position
99+
100+
if whence == io.SEEK_SET:
101+
self._position = offset
102+
elif whence == io.SEEK_CUR:
103+
self._position += offset
104+
elif whence == io.SEEK_END:
105+
self._position = self.size + offset
106+
else:
107+
raise ValueError(f"invalid whence ({whence}, should be {io.SEEK_SET}, {io.SEEK_CUR}, {io.SEEK_END})")
108+
109+
# If we changed the position in the stream, we should invalidate the existing stream
110+
# and open a new one on the next read
111+
if current_position != self._position and self._raw_stream is not None:
112+
self._raw_stream.close()
113+
self._raw_stream = None
114+
115+
return self._position
116+
117+
def seekable(self) -> bool:
118+
return True
119+
120+
def readable(self) -> bool:
121+
return True
122+
123+
def writable(self) -> bool:
124+
return False
125+
126+
def tell(self) -> int:
127+
return self._position
128+
129+
def read(self, size: Optional[int] = -1) -> bytes:
130+
size = None if size == -1 else size
131+
data = self.raw_stream.read(size)
132+
if data is not None:
133+
self._position += len(data)
134+
return data
135+
136+
def readline(self, size: Optional[int] = None) -> bytes:
137+
data = self.raw_stream.readline(size)
138+
self._position += len(data)
139+
return data
140+
141+
def readlines(self, hint: int = -1) -> List[bytes]:
142+
# boto3's StreamingResponse doesn't implement the "hint" parameter
143+
return self.raw_stream.readlines()
144+
145+
@property
146+
def closed(self) -> bool:
147+
return self.raw_stream.closed
148+
149+
def __next__(self):
150+
return self.raw_stream.__next__()
151+
152+
def __iter__(self):
153+
return self.raw_stream.__iter__()
154+
155+
def __enter__(self):
156+
return self
157+
158+
def __exit__(self, **kwargs):
159+
self.close()
160+
161+
def close(self) -> None:
162+
self.raw_stream.close()
163+
164+
def fileno(self) -> int:
165+
raise NotImplementedError()
166+
167+
def flush(self) -> None:
168+
raise NotImplementedError()
169+
170+
def isatty(self) -> bool:
171+
return False
172+
173+
def truncate(self, size: Optional[int] = 0) -> int:
174+
raise NotImplementedError()
175+
176+
def write(self, data: AnyStr) -> int:
177+
raise NotImplementedError()
178+
179+
def writelines(self, lines: Iterable[AnyStr]) -> None:
180+
raise NotImplementedError()

0 commit comments

Comments
 (0)