Skip to content

Commit c5aa695

Browse files
committed
chore: add initial tests for s3_seekable_io
1 parent 1f8c11a commit c5aa695

File tree

3 files changed

+211
-3
lines changed

3 files changed

+211
-3
lines changed

aws_lambda_powertools/utilities/streaming/_s3_seekable_io.py

+10-3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ def __init__(
3939
# Holds the current position in the stream
4040
self._position = 0
4141

42+
# Stores the closed state of the stream
43+
self._closed: bool = False
44+
4245
# Caches the size of the object
4346
self._size: Optional[int] = None
4447

@@ -88,6 +91,7 @@ def raw_stream(self) -> StreamingBody:
8891
range_header = "bytes=%d-" % self._position
8992
logging.debug(f"Starting new stream at {range_header}...")
9093
self._raw_stream = self.s3_object.get(Range=range_header)["Body"]
94+
self._closed = False
9195

9296
return self._raw_stream
9397

@@ -140,11 +144,13 @@ def readline(self, size: Optional[int] = None) -> bytes:
140144

141145
def readlines(self, hint: int = -1) -> List[bytes]:
142146
# boto3's StreamingResponse doesn't implement the "hint" parameter
143-
return self.raw_stream.readlines()
147+
data = self.raw_stream.readlines()
148+
self._position += sum(len(line) for line in data)
149+
return data
144150

145151
@property
146152
def closed(self) -> bool:
147-
return self.raw_stream.closed
153+
return self._closed
148154

149155
def __next__(self):
150156
return self.raw_stream.__next__()
@@ -155,11 +161,12 @@ def __iter__(self):
155161
def __enter__(self):
156162
return self
157163

158-
def __exit__(self, **kwargs):
164+
def __exit__(self, *kwargs):
159165
self.close()
160166

161167
def close(self) -> None:
162168
self.raw_stream.close()
169+
self._closed = True
163170

164171
def fileno(self) -> int:
165172
raise NotImplementedError()

tests/functional/streaming/__init__.py

Whitespace-only changes.
+201
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
import io
2+
3+
import boto3
4+
import pytest
5+
from botocore import stub
6+
from botocore.response import StreamingBody
7+
8+
from aws_lambda_powertools.utilities.streaming._s3_seekable_io import _S3SeekableIO
9+
10+
11+
@pytest.fixture
12+
def s3_resource():
13+
return boto3.resource("s3")
14+
15+
16+
@pytest.fixture
17+
def s3_seekable_obj(s3_resource):
18+
return _S3SeekableIO(bucket="bucket", key="key", boto3_s3_resource=s3_resource)
19+
20+
21+
@pytest.fixture
22+
def s3_resource_stub(s3_resource):
23+
s3_stub = stub.Stubber(s3_resource.meta.client)
24+
s3_stub.activate()
25+
return s3_stub
26+
27+
28+
def test_seekable(s3_seekable_obj):
29+
assert s3_seekable_obj.seekable() is True
30+
31+
32+
def test_readable(s3_seekable_obj):
33+
assert s3_seekable_obj.readable() is True
34+
35+
36+
def test_writeable(s3_seekable_obj):
37+
assert s3_seekable_obj.writable() is False
38+
39+
40+
def test_tell_is_zero(s3_seekable_obj):
41+
assert s3_seekable_obj.tell() == 0
42+
43+
44+
def test_seek_set_changes_position(s3_seekable_obj):
45+
assert s3_seekable_obj.seek(300, io.SEEK_SET) == 300
46+
assert s3_seekable_obj.tell() == 300
47+
48+
49+
def test_seek_cur_changes_position(s3_seekable_obj):
50+
assert s3_seekable_obj.seek(200, io.SEEK_CUR) == 200
51+
assert s3_seekable_obj.seek(100, io.SEEK_CUR) == 300
52+
assert s3_seekable_obj.tell() == 300
53+
54+
55+
def test_seek_end(s3_seekable_obj, s3_resource_stub):
56+
s3_resource_stub.add_response("head_object", {"ContentLength": 1000})
57+
58+
assert s3_seekable_obj.seek(0, io.SEEK_END) == 1000
59+
assert s3_seekable_obj.tell() == 1000
60+
61+
62+
def test_size(s3_seekable_obj, s3_resource_stub):
63+
s3_resource_stub.add_response("head_object", {"ContentLength": 1000})
64+
65+
assert s3_seekable_obj.size == 1000
66+
67+
68+
def test_raw_stream_fetches_with_range_header(s3_seekable_obj, s3_resource_stub):
69+
s3_resource_stub.add_response(
70+
"get_object",
71+
{"Body": ""},
72+
{"Bucket": s3_seekable_obj.bucket, "Key": s3_seekable_obj.key, "Range": "bytes=0-"},
73+
)
74+
75+
assert s3_seekable_obj.raw_stream is not None
76+
77+
78+
def test_raw_stream_fetches_with_range_header_after_seek(s3_seekable_obj, s3_resource_stub):
79+
s3_seekable_obj.seek(100, io.SEEK_SET)
80+
81+
s3_resource_stub.add_response(
82+
"get_object",
83+
{"Body": ""},
84+
{"Bucket": s3_seekable_obj.bucket, "Key": s3_seekable_obj.key, "Range": "bytes=100-"},
85+
)
86+
87+
assert s3_seekable_obj.raw_stream is not None
88+
89+
90+
def test_read(s3_seekable_obj, s3_resource_stub):
91+
payload = b"hello world"
92+
streaming_body = StreamingBody(raw_stream=io.BytesIO(payload), content_length=len(payload))
93+
94+
s3_resource_stub.add_response(
95+
"get_object",
96+
{"Body": streaming_body},
97+
{"Bucket": s3_seekable_obj.bucket, "Key": s3_seekable_obj.key, "Range": "bytes=0-"},
98+
)
99+
100+
assert s3_seekable_obj.read(5) == b"hello"
101+
assert s3_seekable_obj.read(1) == b" "
102+
assert s3_seekable_obj.read(10) == b"world"
103+
assert s3_seekable_obj.tell() == len(payload)
104+
105+
106+
def test_readline(s3_seekable_obj, s3_resource_stub):
107+
payload = b"hello world\nworld hello"
108+
streaming_body = StreamingBody(raw_stream=io.BytesIO(payload), content_length=len(payload))
109+
110+
s3_resource_stub.add_response(
111+
"get_object",
112+
{"Body": streaming_body},
113+
{"Bucket": s3_seekable_obj.bucket, "Key": s3_seekable_obj.key, "Range": "bytes=0-"},
114+
)
115+
116+
assert s3_seekable_obj.readline() == b"hello world\n"
117+
assert s3_seekable_obj.readline() == b"world hello"
118+
assert s3_seekable_obj.tell() == len(payload)
119+
120+
121+
def test_readlines(s3_seekable_obj, s3_resource_stub):
122+
payload = b"hello world\nworld hello"
123+
streaming_body = StreamingBody(raw_stream=io.BytesIO(payload), content_length=len(payload))
124+
125+
s3_resource_stub.add_response(
126+
"get_object",
127+
{"Body": streaming_body},
128+
{"Bucket": s3_seekable_obj.bucket, "Key": s3_seekable_obj.key, "Range": "bytes=0-"},
129+
)
130+
131+
assert s3_seekable_obj.readlines() == [b"hello world\n", b"world hello"]
132+
assert s3_seekable_obj.tell() == len(payload)
133+
134+
135+
def test_closed(s3_seekable_obj, s3_resource_stub):
136+
payload = b"test"
137+
streaming_body = StreamingBody(raw_stream=io.BytesIO(payload), content_length=len(payload))
138+
139+
s3_resource_stub.add_response(
140+
"get_object",
141+
{"Body": streaming_body},
142+
{"Bucket": s3_seekable_obj.bucket, "Key": s3_seekable_obj.key, "Range": "bytes=0-"},
143+
)
144+
145+
s3_seekable_obj.close()
146+
assert s3_seekable_obj.closed is True
147+
148+
149+
def test_next(s3_seekable_obj, s3_resource_stub):
150+
payload = b"test"
151+
streaming_body = StreamingBody(raw_stream=io.BytesIO(payload), content_length=len(payload))
152+
153+
s3_resource_stub.add_response(
154+
"get_object",
155+
{"Body": streaming_body},
156+
{"Bucket": s3_seekable_obj.bucket, "Key": s3_seekable_obj.key, "Range": "bytes=0-"},
157+
)
158+
159+
assert next(s3_seekable_obj) == b"test"
160+
with pytest.raises(StopIteration):
161+
next(s3_seekable_obj)
162+
163+
164+
def test_context_manager(s3_seekable_obj, s3_resource_stub):
165+
payload = b"test"
166+
streaming_body = StreamingBody(raw_stream=io.BytesIO(payload), content_length=len(payload))
167+
168+
s3_resource_stub.add_response(
169+
"get_object",
170+
{"Body": streaming_body},
171+
{"Bucket": s3_seekable_obj.bucket, "Key": s3_seekable_obj.key, "Range": "bytes=0-"},
172+
)
173+
174+
with s3_seekable_obj as f:
175+
assert f.read(4) == b"test"
176+
177+
assert s3_seekable_obj.closed is True
178+
179+
180+
def test_fileno(s3_seekable_obj):
181+
with pytest.raises(NotImplementedError):
182+
s3_seekable_obj.fileno()
183+
184+
185+
def test_flush(s3_seekable_obj):
186+
with pytest.raises(NotImplementedError):
187+
s3_seekable_obj.flush()
188+
189+
190+
def test_isatty(s3_seekable_obj):
191+
assert s3_seekable_obj.isatty() is False
192+
193+
194+
def test_truncate(s3_seekable_obj):
195+
with pytest.raises(NotImplementedError):
196+
s3_seekable_obj.truncate()
197+
198+
199+
def test_write(s3_seekable_obj):
200+
with pytest.raises(NotImplementedError):
201+
s3_seekable_obj.write(b"data")

0 commit comments

Comments
 (0)