Commit 23f672a

heitorlessa authored and rubenfonseca committed

docs(streaming): add reading ahead and backwards section

1 parent 8586c2a commit 23f672a

File tree

5 files changed

+85
-5
lines changed


docs/utilities/streaming.md

+36
@@ -93,6 +93,42 @@ We provide popular built-in transformations that you can apply against your streaming data

## Advanced

### Reading ahead or backwards

`S3Object` implements the [Python I/O interface](https://docs.python.org/3/tutorial/inputoutput.html){target="_blank"}. This means you can use `seek` to start reading the contents of your file from any given position, saving processing time.
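Because `seek` follows standard Python I/O semantics, you can try the positioning rules locally with `io.BytesIO` as a stand-in for `S3Object` (a minimal sketch; the sample bytes are illustrative):

```python
import io

# Any seekable binary stream follows the same seek()/tell() rules as S3Object
data = b"id,name,location\n1,Ruben Fonseca, Denmark\n"
buf = io.BytesIO(data)

buf.seek(0, io.SEEK_END)  # jump to the end of the stream
size = buf.tell()         # the absolute position here equals the total size in bytes

buf.seek(0, io.SEEK_SET)  # absolute positioning: back to the beginning
header = buf.readline()   # reads up to and including the first newline
print(size, header)
```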
#### Reading backwards

For example, imagine you have a large CSV file where each row has a non-uniform size in bytes, and you want to read and process only the last row.

```csv title="non_uniform_sample.csv"
--8<-- "examples/streaming/src/non_uniform_sample.csv"
```

Suppose you found out the last row has exactly 30 bytes. We can use `seek()` to jump to the end of the file, go back 30 bytes, then transform that portion to CSV.

```python title="Reading only the last CSV row" hl_lines="16 18"
--8<-- "examples/streaming/src/s3_csv_stream_non_uniform_seek.py"
```
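The same technique can be sketched locally, with `io.BytesIO` standing in for `S3Object` and `csv.DictReader` for `CsvTransform` (the 30-byte size matches the last row of the sample above, newline included):

```python
import csv
import io

CSV_HEADERS = ["id", "name", "location"]
LAST_ROW_SIZE = 30  # size in bytes of the last row, newline included

data = (
    b"id,name,location\n"
    b"1,Ruben Fonseca, Denmark\n"
    b"2,Heitor Lessa, Netherlands\n"
    b"3,Leandro Damascena, Portugal\n"
)
buf = io.BytesIO(data)

buf.seek(0, io.SEEK_END)                           # jump to the end of the file
buf.seek(buf.tell() - LAST_ROW_SIZE, io.SEEK_SET)  # go back exactly 30 bytes

# Parse only the remaining bytes as CSV
reader = csv.DictReader(io.TextIOWrapper(buf, encoding="utf-8"), fieldnames=CSV_HEADERS)
for last_row in reader:
    print(last_row["location"].strip())
```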
#### Reading ahead

!!! question "What if we want to skip the first N rows?"

You can also solve this with `seek`, but let's use a large uniform CSV file to make it easier to grasp.

```csv title="uniform_sample.csv"
--8<-- "examples/streaming/src/uniform_sample.csv"
```

Each data row has 8 bytes, the header line has 21 bytes, and every line ends with a 1-byte newline.

You want to skip the first 100 data rows.

```python hl_lines="28 31" title="Skipping the first 100 rows"
--8<-- "examples/streaming/src/s3_csv_stream_seek.py"
```
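A local sketch of the offset arithmetic (`io.BytesIO` in place of `S3Object`; row sizes match the uniform sample, and skipping 3 of 5 rows stands in for the 100 in the example above):

```python
import csv
import io

data = b"reading,position,type\n" + b"21.3,5,+\n" * 5  # 5 uniform data rows
ROW_SIZE = 8 + 1                     # 8 bytes of data + 1 byte newline
HEADER_SIZE = data.index(b"\n") + 1  # header text plus its newline (22 here)
rows_to_skip = 3

buf = io.BytesIO(data)
# One absolute seek past the header and the first N data rows
buf.seek(HEADER_SIZE + rows_to_skip * ROW_SIZE, io.SEEK_SET)

reader = csv.DictReader(
    io.TextIOWrapper(buf, encoding="utf-8"),
    fieldnames=["reading", "position", "type"],
)
remaining = [row["reading"] for row in reader]
print(remaining)
```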
### Custom options for data transformations

We will propagate additional options to the underlying implementation for each transform class.
examples/streaming/src/non_uniform_sample.csv

+4

@@ -0,0 +1,4 @@
id,name,location
1,Ruben Fonseca, Denmark
2,Heitor Lessa, Netherlands
3,Leandro Damascena, Portugal
examples/streaming/src/s3_csv_stream_non_uniform_seek.py

+26

@@ -0,0 +1,26 @@
import io
from typing import Dict

from aws_lambda_powertools.utilities.streaming.s3_object import S3Object
from aws_lambda_powertools.utilities.streaming.transformations import CsvTransform
from aws_lambda_powertools.utilities.typing import LambdaContext

LAST_ROW_SIZE = 30
CSV_HEADERS = ["id", "name", "location"]


def lambda_handler(event: Dict[str, str], context: LambdaContext):
    sample_csv = S3Object(bucket=event["bucket"], key="sample.csv")

    # Jump to the end of the file
    sample_csv.seek(0, io.SEEK_END)
    # Move back exactly 30 bytes from the end of the file
    sample_csv.seek(sample_csv.tell() - LAST_ROW_SIZE, io.SEEK_SET)

    # Transform the remaining portion of data into CSV with our headers
    sample_csv.transform(CsvTransform(fieldnames=CSV_HEADERS), in_place=True)

    # We will only read the last portion of the file from S3,
    # as we're only interested in the last 'location' of our dataset
    for last_row in sample_csv:
        print(last_row["location"])

examples/streaming/src/s3_csv_stream_seek.py

+15-5
@@ -8,18 +8,28 @@
 """
 Assuming every CSV row after the header always has 8 bytes of data + a 1-byte newline:
+reading,position,type
 21.3,5,+
 23.4,4,+
 21.3,0,-
+...
 """

+CSV_HEADERS = ["reading", "position", "type"]
+ROW_SIZE = 8 + 1  # 8 bytes of data + 1 byte newline
+HEADER_SIZE = 21 + 1  # 21 bytes of header text + 1 byte newline
+LINES_TO_JUMP = 100
+

 def lambda_handler(event: Dict[str, str], context: LambdaContext):
-    s3 = S3Object(bucket=event["bucket"], key=event["key"])
+    sample_csv = S3Object(bucket=event["bucket"], key=event["key"])
+
+    # Skip the header line
+    sample_csv.seek(HEADER_SIZE, io.SEEK_SET)

     # Jump 100 lines of 9 bytes each (8 bytes of data + 1 byte newline)
-    s3.seek(100 * 9, io.SEEK_SET)
+    sample_csv.seek(LINES_TO_JUMP * ROW_SIZE, io.SEEK_CUR)

-    s3.transform(CsvTransform(), in_place=True)
-    for obj in s3:
-        print(obj)
+    sample_csv.transform(CsvTransform(), in_place=True)
+    for row in sample_csv:
+        print(row["reading"])
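Note that `seek(offset, io.SEEK_SET)` positions from the start of the stream, so two consecutive absolute seeks do not accumulate; after skipping the header, the jump over N rows should be relative (`io.SEEK_CUR`) or folded into a single absolute offset. A minimal illustration with `io.BytesIO` (sizes are illustrative):

```python
import io

HEADER_SIZE, ROW_SIZE, N = 22, 9, 5
buf = io.BytesIO(b"x" * 100)

buf.seek(HEADER_SIZE, io.SEEK_SET)
buf.seek(N * ROW_SIZE, io.SEEK_SET)  # absolute: the header offset is discarded
absolute_pos = buf.tell()

buf.seek(HEADER_SIZE, io.SEEK_SET)
buf.seek(N * ROW_SIZE, io.SEEK_CUR)  # relative: the two offsets compose
relative_pos = buf.tell()

print(absolute_pos, relative_pos)
```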
examples/streaming/src/uniform_sample.csv

+4

@@ -0,0 +1,4 @@
reading,position,type
21.3,5,+
23.4,4,+
21.3,0,-

0 commit comments