Skip to content

Commit 5a0ec46

Browse files
authored
Align extraction time to time window (#11)
1 parent 34496b2 commit 5a0ec46

File tree

7 files changed

+117
-36
lines changed

7 files changed

+117
-36
lines changed

README.md

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
# Arduino AWS S3 CSV exporter
22

33
This project provides a way to extract time series samples from Arduino cloud, publishing to a S3 destination bucket.
4-
Data are extracted at the given resolution via a scheduled Lambda function. Then samples are stored in CSV files and saved to S3.
4+
Data are extracted at the given resolution via a scheduled Lambda function. Samples are stored in CSV files and saved to S3.
55
By default, data extraction is performed every hour (configurable), extracting samples aggregated at 5min resolution (configurable).
6-
Aggregation is performed as average over aggregation period.
7-
Non numeric values like strings are sampled at the given resolution.
6+
Aggregation is performed as average over aggregation period. Non numeric values like strings are sampled at the given resolution.
87

98
## Architecture
109

11-
S3 exporter is based on a Go lambda function triggered by periodic event from EventBridge.
12-
Job is configured to extract samples for a 60min time window with the default resolution of 5min.
10+
S3 exporter is based on a Go Lambda function triggered by a periodic event from EventBridge.
11+
Function is triggered at a fixed rate (by default, 1 hour), starting from the deployment time.
12+
The rate also defines the time extraction window. So, with a 1-hour schedule, one hour of data is extracted.
1313
One file is created per execution and contains all samples for selected things. Time series samples are exported at UTC timezone.
1414
By default, all Arduino things present in the account are exported: it is possible to filter them via [tags](#tag-filtering).
1515

@@ -29,6 +29,9 @@ Files are organized by date and files of the same day are grouped.
2929
<bucket>:2024-09-04/2024-09-04-12-00.csv
3030
```
3131

32+
Data extraction is aligned with function execution time.
33+
It is possible to align the extracted data with the extraction time window (for example, exporting the last complete hour) by configuring the `/arduino/s3-exporter/{stack-name}/iot/align_with_time_window` property.
34+
3235
## Deployment via Cloud Formation Template
3336

3437
It is possible to deploy required resources via [cloud formation template](deployment/cloud-formation-template/deployment.yaml)
@@ -74,11 +77,11 @@ These parameters are filled by CFT at stack creation time and can be adjusted la
7477
| /arduino/s3-exporter/{stack-name}/iot/org-id | (optional) organization id |
7578
| /arduino/s3-exporter/{stack-name}/iot/filter/tags | (optional) tags filtering. Syntax: tag=value,tag2=value2 |
7679
| /arduino/s3-exporter/{stack-name}/iot/samples-resolution | (optional) samples aggregation resolution (1/5/15 minutes, 1 hour, raw) |
77-
| /arduino/s3-exporter/{stack-name}/destination-bucket | S3 destination bucket |
7880
| /arduino/s3-exporter/{stack-name}/iot/scheduling | Execution scheduling |
81+
| /arduino/s3-exporter/{stack-name}/iot/align_with_time_window | Align data extraction with time windows (for example, last complete hour) |
7982
| /arduino/s3-exporter/{stack-name}/iot/aggregation-statistic | Aggregation statistic |
80-
81-
It is possible to compress (with gzip) files before uploading to S3. To enable compression, add ENABLE_COMPRESSION env variable to lambda configuration (with value true/false).
83+
| /arduino/s3-exporter/{stack-name}/destination-bucket | S3 destination bucket |
84+
| /arduino/s3-exporter/{stack-name}/enable_compression | Compress CSV files with gzip before uploading to S3 bucket |
8285

8386
### Tag filtering
8487

app/exporter/exporter.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,28 @@ import (
2828
"github.com/sirupsen/logrus"
2929
)
3030

31-
func StartExporter(ctx context.Context, logger *logrus.Entry, key, secret, orgid string, tagsF *string, resolution, timeWindowMinutes int, destinationS3Bucket string, aggregationStat string, compress bool) error {
31+
func StartExporter(
32+
ctx context.Context,
33+
logger *logrus.Entry,
34+
key, secret, orgid string,
35+
tagsF *string,
36+
resolution, timeWindowMinutes int,
37+
destinationS3Bucket string,
38+
aggregationStat string,
39+
compress, enableAlignTimeWindow bool) error {
3240

3341
// Init client
3442
iotcl, err := iot.NewClient(key, secret, orgid)
3543
if err != nil {
3644
return err
3745
}
3846

39-
if tagsF == nil {
40-
logger.Infoln("Things - searching with no filter")
47+
if tagsF != nil {
48+
logger.Infoln("Filtering things linked to configured account using tags: ", *tagsF)
4149
} else {
42-
logger.Infoln("Things - searching by tags: ", *tagsF)
50+
logger.Infoln("Importing all things linked to configured account")
4351
}
52+
4453
things, err := iotcl.ThingList(ctx, nil, nil, true, utils.ParseTags(tagsF))
4554
if err != nil {
4655
return err
@@ -60,7 +69,11 @@ func StartExporter(ctx context.Context, logger *logrus.Entry, key, secret, orgid
6069
return err
6170
}
6271

63-
if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat); err != nil {
72+
if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat, enableAlignTimeWindow); err != nil {
73+
if writer != nil {
74+
writer.Close()
75+
defer writer.Delete()
76+
}
6477
logger.Error("Error aligning time series samples: ", err)
6578
return err
6679
} else {

business/tsextractor/tsextractor.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package tsextractor
1818
import (
1919
"context"
2020
"encoding/json"
21+
"errors"
2122
"fmt"
2223
"strconv"
2324
"strings"
@@ -45,13 +46,18 @@ func New(iotcl iot.API, logger *logrus.Entry) *TsExtractor {
4546
return &TsExtractor{iotcl: iotcl, logger: logger}
4647
}
4748

48-
func computeTimeAlignment(resolutionSeconds, timeWindowInMinutes int) (time.Time, time.Time) {
49+
func computeTimeAlignment(resolutionSeconds, timeWindowInMinutes int, enableAlignTimeWindow bool) (time.Time, time.Time) {
4950
// Compute time alignment
5051
if resolutionSeconds <= 60 {
5152
resolutionSeconds = 300 // Align to 5 minutes
5253
}
53-
to := time.Now().Truncate(time.Duration(resolutionSeconds) * time.Second).UTC()
54-
if resolutionSeconds <= 900 {
54+
55+
timeAlignmentSeconds := resolutionSeconds
56+
if enableAlignTimeWindow {
57+
timeAlignmentSeconds = timeWindowInMinutes * 60
58+
}
59+
to := time.Now().Truncate(time.Duration(timeAlignmentSeconds) * time.Second).UTC()
60+
if !enableAlignTimeWindow && resolutionSeconds <= 900 {
5561
// Shift time window to avoid missing data
5662
to = to.Add(-time.Duration(300) * time.Second)
5763
}
@@ -68,10 +74,11 @@ func (a *TsExtractor) ExportTSToFile(
6874
timeWindowInMinutes int,
6975
thingsMap map[string]iotclient.ArduinoThing,
7076
resolution int,
71-
aggregationStat string) (*csv.CsvWriter, time.Time, error) {
77+
aggregationStat string,
78+
enableAlignTimeWindow bool) (*csv.CsvWriter, time.Time, error) {
7279

7380
// Truncate time to given resolution
74-
from, to := computeTimeAlignment(resolution, timeWindowInMinutes)
81+
from, to := computeTimeAlignment(resolution, timeWindowInMinutes, enableAlignTimeWindow)
7582

7683
// Open csv output writer
7784
writer, err := csv.NewWriter(from, a.logger, isRawResolution(resolution))
@@ -81,6 +88,7 @@ func (a *TsExtractor) ExportTSToFile(
8188

8289
var wg sync.WaitGroup
8390
tokens := make(chan struct{}, importConcurrency)
91+
errorChannel := make(chan error, len(thingsMap))
8492

8593
if isRawResolution(resolution) {
8694
a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to, " - aggregation: raw")
@@ -106,20 +114,23 @@ func (a *TsExtractor) ExportTSToFile(
106114
err := a.populateRawTSDataIntoS3(ctx, from, to, thingID, thing, writer)
107115
if err != nil {
108116
a.logger.Error("Error populating raw time series data: ", err)
117+
errorChannel <- err
109118
return
110119
}
111120
} else {
112121
// Populate numeric time series data
113122
err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, aggregationStat, writer)
114123
if err != nil {
115124
a.logger.Error("Error populating time series data: ", err)
125+
errorChannel <- err
116126
return
117127
}
118128

119129
// Populate string time series data, if any
120130
err = a.populateStringTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer)
121131
if err != nil {
122132
a.logger.Error("Error populating string time series data: ", err)
133+
errorChannel <- err
123134
return
124135
}
125136
}
@@ -129,6 +140,18 @@ func (a *TsExtractor) ExportTSToFile(
129140
// Wait for all routines termination
130141
a.logger.Infoln("Waiting for all data extraction jobs to terminate...")
131142
wg.Wait()
143+
close(errorChannel)
144+
145+
// Check if there were errors
146+
detectedErrors := false
147+
for err := range errorChannel {
148+
if err != nil {
149+
a.logger.Error(err)
150+
}
151+
}
152+
if detectedErrors {
153+
return writer, from, errors.New("errors detected during data export")
154+
}
132155

133156
return writer, from, nil
134157
}

business/tsextractor/tsextractor_test.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,46 @@ import (
1616
)
1717

1818
func TestTimeAlignment_HourlyTimeWindows(t *testing.T) {
19-
// Test the time alignment with hourly time windows
20-
from, to := computeTimeAlignment(3600, 60)
19+
// Test the time alignment with hourly time windows, not aligned
20+
nowTruncated := time.Now().UTC().Truncate(time.Duration(300) * time.Second).Add(-time.Duration(300) * time.Second)
21+
fromTuncated := nowTruncated.Add(-time.Hour)
22+
from, to := computeTimeAlignment(300, 60, false)
23+
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
24+
assert.Equal(t, nowTruncated, to)
25+
assert.Equal(t, fromTuncated, from)
26+
}
27+
28+
func TestTimeAlignment_HourlyTimeWindows_aligned(t *testing.T) {
29+
// Test the time alignment with hourly time windows, complete last hour
30+
nowTruncated := time.Now().UTC().Truncate(time.Hour)
31+
fromTuncated := nowTruncated.Add(-time.Hour)
32+
from, to := computeTimeAlignment(300, 60, true)
2133
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
34+
assert.Equal(t, nowTruncated, to)
35+
assert.Equal(t, fromTuncated, from)
2236
}
2337

2438
func TestTimeAlignment_15minTimeWindows(t *testing.T) {
2539
// Test the time alignment with hourly time windows
26-
from, to := computeTimeAlignment(3600, 15)
40+
from, to := computeTimeAlignment(3600, 15, false)
2741
assert.Equal(t, int64(900), to.Unix()-from.Unix())
2842
}
2943

3044
func TestTimeAlignment_15min_HourlyTimeWindows(t *testing.T) {
3145
// Test the time alignment with hourly time windows and 15min resolution
32-
from, to := computeTimeAlignment(900, 60)
46+
from, to := computeTimeAlignment(900, 60, false)
3347
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
3448
}
3549

3650
func TestTimeAlignment_5min_HourlyTimeWindows(t *testing.T) {
3751
// Test the time alignment with hourly time windows and 5min resolution
38-
from, to := computeTimeAlignment(300, 60)
52+
from, to := computeTimeAlignment(300, 60, false)
3953
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
4054
}
4155

4256
func TestTimeAlignment_raw_HourlyTimeWindows(t *testing.T) {
4357
// Test the time alignment with hourly time windows and 5min resolution
44-
from, to := computeTimeAlignment(-1, 60)
58+
from, to := computeTimeAlignment(-1, 60, false)
4559
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
4660
}
4761

@@ -91,7 +105,7 @@ func TestExtractionFlow_defaultAggregation(t *testing.T) {
91105
PropertiesCount: &propCount,
92106
}
93107

94-
writer, from, err := tsextractorClient.ExportTSToFile(ctx, 60, thingsMap, 300, "AVG")
108+
writer, from, err := tsextractorClient.ExportTSToFile(ctx, 60, thingsMap, 300, "AVG", false)
95109
assert.NoError(t, err)
96110
assert.NotNil(t, writer)
97111
assert.NotNil(t, from)

deployment/cloud-formation-template/deployment.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,22 @@ Resources:
197197
Ref: ResolutionAggregationStatistic
198198
Tier: Standard
199199

200+
CompressionParameter:
201+
Type: AWS::SSM::Parameter
202+
Properties:
203+
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/enable_compression
204+
Type: String
205+
Value: "false"
206+
Tier: Standard
207+
208+
AlignExtractionParameter:
209+
Type: AWS::SSM::Parameter
210+
Properties:
211+
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/align_with_time_window
212+
Type: String
213+
Value: "false"
214+
Tier: Standard
215+
200216
# EventBridge Rule to trigger Lambda every hour
201217
EventBridgeRule:
202218
Type: AWS::Events::Rule

lambda.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type AWSS3ImportTrigger struct {
3333
const (
3434
GlobalArduinoPrefix = "/arduino/s3-importer"
3535

36-
// Compatibility parameters for backward compatibility
36+
// Parameters for backward compatibility
3737
IoTApiKey = GlobalArduinoPrefix + "/iot/api-key"
3838
IoTApiSecret = GlobalArduinoPrefix + "/iot/api-secret"
3939
IoTApiOrgId = GlobalArduinoPrefix + "/iot/org-id"
@@ -53,6 +53,8 @@ const (
5353
SchedulingStack = PerStackArduinoPrefix + "/iot/scheduling"
5454
DestinationS3BucketStack = PerStackArduinoPrefix + "/destination-bucket"
5555
AggregationStatStack = PerStackArduinoPrefix + "/iot/aggregation-statistic"
56+
AlignWithTimeWindowStack = PerStackArduinoPrefix + "/iot/align_with_time_window"
57+
EnableCompressionStack = PerStackArduinoPrefix + "/enable_compression"
5658

5759
SamplesResolutionSeconds = 300
5860
DefaultTimeExtractionWindowMinutes = 60
@@ -61,8 +63,8 @@ const (
6163
func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, error) {
6264

6365
logger := logrus.NewEntry(logrus.New())
66+
6467
stackName := os.Getenv("STACK_NAME")
65-
compressFile := os.Getenv("ENABLE_COMPRESSION")
6668

6769
var apikey *string
6870
var apiSecret *string
@@ -71,6 +73,8 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
7173
var orgId *string
7274
var err error
7375
var aggregationStat *string
76+
enabledCompression := false
77+
enableAlignTimeWindow := false
7478

7579
logger.Infoln("------ Reading parameters from SSM")
7680
paramReader, err := parameters.New()
@@ -98,6 +102,17 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
98102
tags = tagsParam
99103
}
100104
aggregationStat, _ = paramReader.ReadConfigByStack(AggregationStatStack, stackName)
105+
106+
alignTs, _ := paramReader.ReadConfigByStack(AlignWithTimeWindowStack, stackName)
107+
if alignTs != nil && *alignTs == "true" {
108+
enableAlignTimeWindow = true
109+
}
110+
111+
compression, _ := paramReader.ReadConfigByStack(EnableCompressionStack, stackName)
112+
if compression != nil && *compression == "true" {
113+
enabledCompression = true
114+
}
115+
101116
} else {
102117
apikey, err = paramReader.ReadConfig(IoTApiKey)
103118
if err != nil {
@@ -148,11 +163,6 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
148163
resolution = &defReso
149164
}
150165

151-
enabledCompression := false
152-
if compressFile == "true" {
153-
enabledCompression = true
154-
}
155-
156166
logger.Infoln("------ Running import")
157167
if event.Dev || os.Getenv("DEV") == "true" {
158168
logger.Infoln("Running in dev mode")
@@ -174,12 +184,14 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
174184
logger.Infoln("resolution:", *resolution, "seconds")
175185
}
176186
logger.Infoln("aggregation statistic:", *aggregationStat)
177-
logger.Infoln("data extraction time windows:", extractionWindowMinutes, "minutes")
187+
logger.Infoln("data extraction time window:", *extractionWindowMinutes, "minutes")
178188
logger.Infoln("file compression enabled:", enabledCompression)
189+
logger.Infoln("align time window:", enableAlignTimeWindow)
179190

180-
err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, *extractionWindowMinutes, *destinationS3Bucket, *aggregationStat, enabledCompression)
191+
err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, *extractionWindowMinutes, *destinationS3Bucket, *aggregationStat, enabledCompression, enableAlignTimeWindow)
181192
if err != nil {
182-
return nil, err
193+
message := "Error detected during data export"
194+
return &message, err
183195
}
184196

185197
message := "Data exported successfully"

resources/test/localexecution.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func HandleRequest(ctx context.Context, dev bool) (*string, error) {
8989
logger.Infoln("tags:", *tags)
9090
}
9191

92-
err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, TimeExtractionWindowMinutes, *destinationS3Bucket, "MAX", true)
92+
err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, TimeExtractionWindowMinutes, *destinationS3Bucket, "MAX", true, true)
9393
if err != nil {
9494
return nil, err
9595
}

0 commit comments

Comments
 (0)