Skip to content

Commit ce274c0

Browse files
authored
Added configurable scheduling selection and moving to rate scheduling (#8)
1 parent c4e7031 commit ce274c0

File tree

12 files changed

+300
-79
lines changed

12 files changed

+300
-79
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ vendor/
2323
cover.out
2424

2525
dist/
26+
deployment/binaries/
2627

2728
coverage_unit.txt

README.md

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
This project provides a way to extract time series samples from Arduino cloud, publishing to a S3 destination bucket.
44
Data are extracted at the given resolution via a scheduled Lambda function. Then samples are stored in CSV files and saved to S3.
5-
By default, data extraction is performed every hour, extracting samples aggregated at 5min resolution. Non numeric values like strings are sampled at the given resolution.
5+
By default, data extraction is performed every hour (configurable), extracting samples aggregated at 5min resolution (configurable).
6+
Aggregation is performed as average over aggregation period.
7+
Non numeric values like strings are sampled at the given resolution.
68

79
## Architecture
810

@@ -22,9 +24,9 @@ timestamp,thing_id,thing_name,property_id,property_name,property_type,value
2224

2325
Files are organized by date and files of the same day are grouped.
2426
```
25-
<bucket>:2024-09-04/2024-09-04-10.csv
26-
<bucket>:2024-09-04/2024-09-04-11.csv
27-
<bucket>:2024-09-04/2024-09-04-12.csv
27+
<bucket>:2024-09-04/2024-09-04-10-00.csv
28+
<bucket>:2024-09-04/2024-09-04-11-00.csv
29+
<bucket>:2024-09-04/2024-09-04-12-00.csv
2830
```
2931

3032
## Deployment via Cloud Formation Template
@@ -45,7 +47,7 @@ Before stack creation, two S3 buckets have to be created:
4547
bucket must be in the same region where stack will be created.
4648

4749
Follow these steps to deploy a new stack:
48-
* download [lambda code binaries](deployment/binaries/arduino-s3-integration-lambda.zip) and [Cloud Formation Template](deployment/cloud-formation-template/deployment.yaml)
50+
* download [lambda code binaries](https://github.com/arduino/aws-s3-integration/releases) and [Cloud Formation Template](deployment/cloud-formation-template/deployment.yaml)
4951
* upload CFT and binary zip file on an S3 bucket accessible by the AWS account. For the CFT yaml file, copy the Object URL (it will be required in next step).
5052

5153
![object URL](docs/objecturl.png)
@@ -67,12 +69,13 @@ These parameters are filled by CFT at stack creation time and can be adjusted la
6769

6870
| Parameter | Description |
6971
| --------- | ----------- |
70-
| /arduino/s3-importer/iot/api-key | IoT API key |
71-
| /arduino/s3-importer/iot/api-secret | IoT API secret |
72-
| /arduino/s3-importer/iot/org-id | (optional) organization id |
73-
| /arduino/s3-importer/iot/filter/tags | (optional) tags filtering. Syntax: tag=value,tag2=value2 |
74-
| /arduino/s3-importer/iot/samples-resolution-seconds | (optional) samples resolution (default: 300s) |
75-
| /arduino/s3-importer/destination-bucket | S3 destination bucket |
72+
| /arduino/s3-exporter/<stack-name>/iot/api-key | IoT API key |
73+
| /arduino/s3-exporter/<stack-name>/iot/api-secret | IoT API secret |
74+
| /arduino/s3-exporter/<stack-name>/iot/org-id | (optional) organization id |
75+
| /arduino/s3-exporter/<stack-name>/iot/filter/tags | (optional) tags filtering. Syntax: tag=value,tag2=value2 |
76+
| /arduino/s3-exporter/<stack-name>/iot/samples-resolution | (optional) samples aggregation resolution (1/5/15 minutes, 1 hour, raw) |
77+
| /arduino/s3-exporter/<stack-name>/destination-bucket | S3 destination bucket |
78+
| /arduino/s3-exporter/<stack-name>/iot/scheduling | Execution scheduling |
7679

7780
### Tag filtering
7881

@@ -85,7 +88,7 @@ You can use tag filtering if you need to reduce export to a specific set of Thin
8588

8689
![tag 1](docs/tag-1.png)
8790

88-
* Configure tag filter during CFT creation of by editing '/arduino/s3-importer/iot/filter/tags' parameter (syntax: tag1=value1,tag2=value2).
91+
* Configure tag filter during CFT creation of by editing '/arduino/s3-exporter/<stack-name>/iot/filter/tags' parameter (syntax: tag1=value1,tag2=value2).
8992

9093
![tag filter](docs/tag-filter.png)
9194

business/tsextractor/tsextractor.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,25 @@ func New(iotcl *iot.Client, logger *logrus.Entry) *TsExtractor {
4545
return &TsExtractor{iotcl: iotcl, logger: logger}
4646
}
4747

48+
func computeTimeAlignment(resolutionSeconds, timeWindowInMinutes int) (time.Time, time.Time) {
49+
// Compute time alignment
50+
if resolutionSeconds <= 60 {
51+
resolutionSeconds = 300 // Align to 5 minutes
52+
}
53+
to := time.Now().Truncate(time.Duration(resolutionSeconds) * time.Second).UTC()
54+
from := to.Add(-time.Duration(timeWindowInMinutes) * time.Minute)
55+
return from, to
56+
}
57+
4858
func (a *TsExtractor) ExportTSToS3(
4959
ctx context.Context,
5060
timeWindowInMinutes int,
5161
thingsMap map[string]iotclient.ArduinoThing,
5262
resolution int,
5363
destinationS3Bucket string) error {
5464

55-
to := time.Now().Truncate(time.Hour).UTC()
56-
from := to.Add(-time.Duration(timeWindowInMinutes) * time.Minute)
65+
// Truncate time to given resolution
66+
from, to := computeTimeAlignment(resolution, timeWindowInMinutes)
5767

5868
// Open s3 output writer
5969
s3cl, err := s3.NewS3Client(destinationS3Bucket)
@@ -70,16 +80,16 @@ func (a *TsExtractor) ExportTSToS3(
7080
var wg sync.WaitGroup
7181
tokens := make(chan struct{}, importConcurrency)
7282

73-
a.logger.Infoln("=====> Export perf data - time window: ", timeWindowInMinutes, " minutes")
83+
a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to)
7484
for thingID, thing := range thingsMap {
7585

7686
if thing.Properties == nil || len(thing.Properties) == 0 {
7787
a.logger.Warn("Skipping thing with no properties: ", thingID)
7888
continue
7989
}
8090

81-
wg.Add(1)
8291
tokens <- struct{}{}
92+
wg.Add(1)
8393

8494
go func(thingID string, thing iotclient.ArduinoThing, writer *csv.CsvWriter) {
8595
defer func() { <-tokens }()
@@ -111,13 +121,14 @@ func (a *TsExtractor) ExportTSToS3(
111121
}
112122

113123
// Wait for all routines termination
124+
a.logger.Infoln("Waiting for all data extraction jobs to terminate...")
114125
wg.Wait()
115126

116127
// Close csv output writer and upload to s3
117128
writer.Close()
118129
defer writer.Delete()
119130

120-
destinationKey := fmt.Sprintf("%s/%s.csv", from.Format("2006-01-02"), from.Format("2006-01-02-15"))
131+
destinationKey := fmt.Sprintf("%s/%s.csv", from.Format("2006-01-02"), from.Format("2006-01-02-15-04"))
121132
a.logger.Infof("Uploading file %s to bucket %s\n", destinationKey, s3cl.DestinationBucket())
122133
if err := s3cl.WriteFile(ctx, destinationKey, writer.GetFilePath()); err != nil {
123134
return err
@@ -159,7 +170,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
159170
break
160171
} else {
161172
// This is due to a rate limit on the IoT API, we need to wait a bit before retrying
162-
a.logger.Infof("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
173+
a.logger.Warnf("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
163174
randomRateLimitingSleep()
164175
}
165176
}
@@ -261,7 +272,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3(
261272
break
262273
} else {
263274
// This is due to a rate limit on the IoT API, we need to wait a bit before retrying
264-
a.logger.Infof("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
275+
a.logger.Warnf("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
265276
randomRateLimitingSleep()
266277
}
267278
}
@@ -321,7 +332,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3(
321332
break
322333
} else {
323334
// This is due to a rate limit on the IoT API, we need to wait a bit before retrying
324-
a.logger.Infof("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
335+
a.logger.Warnf("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
325336
randomRateLimitingSleep()
326337
}
327338
}
@@ -337,7 +348,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3(
337348
}
338349

339350
propertyID := strings.Replace(response.Query, "property.", "", 1)
340-
a.logger.Infof("Thing %s - Query %s Property %s - %d values\n", thingID, response.Query, propertyID, response.CountValues)
351+
a.logger.Debugf("Thing %s - Query %s Property %s - %d values\n", thingID, response.Query, propertyID, response.CountValues)
341352
sampleCount += response.CountValues
342353

343354
propertyName, propertyType := extractPropertyNameAndType(thing, propertyID)
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package tsextractor
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestTimeAlignment_HourlyTimeWindows(t *testing.T) {
10+
// Test the time alignment with hourly time windows
11+
from, to := computeTimeAlignment(3600, 60)
12+
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
13+
}
14+
15+
func TestTimeAlignment_15minTimeWindows(t *testing.T) {
16+
// Test the time alignment with hourly time windows
17+
from, to := computeTimeAlignment(3600, 15)
18+
assert.Equal(t, int64(900), to.Unix()-from.Unix())
19+
}
20+
21+
func TestTimeAlignment_15min_HourlyTimeWindows(t *testing.T) {
22+
// Test the time alignment with hourly time windows and 15min resolution
23+
from, to := computeTimeAlignment(900, 60)
24+
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
25+
}
26+
27+
func TestTimeAlignment_5min_HourlyTimeWindows(t *testing.T) {
28+
// Test the time alignment with hourly time windows and 5min resolution
29+
from, to := computeTimeAlignment(300, 60)
30+
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
31+
}
32+
33+
func TestTimeAlignment_raw_HourlyTimeWindows(t *testing.T) {
34+
// Test the time alignment with hourly time windows and 5min resolution
35+
from, to := computeTimeAlignment(-1, 60)
36+
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
37+
}
Binary file not shown.

deployment/cloud-formation-template/deployment.yaml

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,6 @@ AWSTemplateFormatVersion: '2010-09-09'
22
Description: Arduino S3 data exporter. For deployment and architectural details, see https://github.com/arduino/aws-s3-integration
33

44
Parameters:
5-
LambdaFunctionName:
6-
Type: String
7-
Default: 'arduino-s3-csv-data-exporter'
8-
Description: Name of the Lambda function.
9-
105
LambdaCodeS3Bucket:
116
Type: String
127
Description: S3 bucket where the Lambda function ZIP file is stored.
@@ -30,16 +25,32 @@ Parameters:
3025
Default: '<empty>'
3126
Description: Arduino Organization ID (optional).
3227

28+
ExecutionScheduling:
29+
Type: String
30+
Description: "Choose the execution scheduling for the data export"
31+
AllowedValues:
32+
- 5 minutes
33+
- 15 minutes
34+
- 1 hour
35+
- 1 day
36+
Default: 1 hour
37+
38+
Resolution:
39+
Type: String
40+
Description: "Samples resolution data extraction resolution. 'raw' and '1 minute' are not supported for '1 day' scheduling"
41+
AllowedValues:
42+
- raw
43+
- 1 minute
44+
- 5 minutes
45+
- 15 minutes
46+
- 1 hour
47+
Default: 5 minutes
48+
3349
TagFilter:
3450
Type: String
3551
Default: '<empty>'
3652
Description: Filter things to import by tag (optional). Format> tag1=value1,tag2=value2
3753

38-
Resolution:
39-
Type: Number
40-
Default: 300
41-
Description: Samples resolution in seconds. Default is 5 minutes (300s). Set to -1 to export raw data.
42-
4354
DestinationS3Bucket:
4455
Type: String
4556
Description: S3 bucket where CSV files will be stored.
@@ -73,7 +84,7 @@ Resources:
7384
- ssm:GetParameter
7485
- ssm:GetParameters
7586
- ssm:GetParametersByPath
76-
Resource: arn:aws:ssm:*:*:parameter/arduino/s3-importer/*
87+
Resource: arn:aws:ssm:*:*:parameter/arduino/s3-*
7788
- Effect: Allow
7889
Action:
7990
- s3:PutObject
@@ -87,8 +98,7 @@ Resources:
8798
LambdaFunction:
8899
Type: AWS::Lambda::Function
89100
Properties:
90-
FunctionName:
91-
Ref: LambdaFunctionName
101+
FunctionName: !Sub arduino-s3-csv-data-exporter-${AWS::StackName}
92102
Handler: bootstrap
93103
Role: !GetAtt ArduinoS3LambdaExecutionRole.Arn
94104
Code:
@@ -99,12 +109,15 @@ Resources:
99109
Runtime: provided.al2
100110
Timeout: 900
101111
MemorySize: 256
112+
Environment:
113+
Variables:
114+
STACK_NAME: !Sub ${AWS::StackName}
102115

103116
# Parameters in Parameter Store
104117
ApiKeyParameter:
105118
Type: AWS::SSM::Parameter
106119
Properties:
107-
Name: /arduino/s3-importer/iot/api-key
120+
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/api-key
108121
Type: String
109122
Value:
110123
Ref: IotApiKey
@@ -113,7 +126,7 @@ Resources:
113126
ApiSecretParameter:
114127
Type: AWS::SSM::Parameter
115128
Properties:
116-
Name: /arduino/s3-importer/iot/api-secret
129+
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/api-secret
117130
Type: String
118131
Value:
119132
Ref: IotApiSecret
@@ -122,7 +135,7 @@ Resources:
122135
OrgIdParameter:
123136
Type: AWS::SSM::Parameter
124137
Properties:
125-
Name: /arduino/s3-importer/iot/org-id
138+
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/org-id
126139
Type: String
127140
Value:
128141
Ref: IotOrgId
@@ -131,7 +144,7 @@ Resources:
131144
FilterTagsParameter:
132145
Type: AWS::SSM::Parameter
133146
Properties:
134-
Name: /arduino/s3-importer/iot/filter/tags
147+
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/filter/tags
135148
Type: String
136149
Value:
137150
Ref: TagFilter
@@ -140,7 +153,7 @@ Resources:
140153
ResolutionParameter:
141154
Type: AWS::SSM::Parameter
142155
Properties:
143-
Name: /arduino/s3-importer/iot/samples-resolution-seconds
156+
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/samples-resolution
144157
Type: String
145158
Value:
146159
Ref: Resolution
@@ -149,17 +162,27 @@ Resources:
149162
DestinationS3BucketParameter:
150163
Type: AWS::SSM::Parameter
151164
Properties:
152-
Name: /arduino/s3-importer/destination-bucket
165+
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/destination-bucket
153166
Type: String
154167
Value:
155168
Ref: DestinationS3Bucket
156169
Tier: Standard
157170

171+
ExecutionSchedulingParameter:
172+
Type: AWS::SSM::Parameter
173+
Properties:
174+
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/scheduling
175+
Type: String
176+
Value:
177+
Ref: ExecutionScheduling
178+
Tier: Standard
179+
158180
# EventBridge Rule to trigger Lambda every hour
159181
EventBridgeRule:
160182
Type: AWS::Events::Rule
161183
Properties:
162-
ScheduleExpression: cron(10 * * * ? *)
184+
ScheduleExpression:
185+
Fn::Sub: "rate(${ExecutionScheduling})"
163186
Targets:
164187
- Arn: !GetAtt LambdaFunction.Arn
165188
Id: LambdaTarget
@@ -170,8 +193,7 @@ Resources:
170193
LambdaPermissionForEventBridge:
171194
Type: AWS::Lambda::Permission
172195
Properties:
173-
FunctionName:
174-
Ref: LambdaFunctionName
196+
FunctionName: !Sub arduino-s3-csv-data-exporter-${AWS::StackName}
175197
Action: lambda:InvokeFunction
176198
Principal: events.amazonaws.com
177199
SourceArn: !GetAtt EventBridgeRule.Arn

docs/cft-stack-2.png

-24.1 KB
Loading

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ require (
1010
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.1
1111
github.com/aws/aws-sdk-go-v2/service/ssm v1.50.1
1212
github.com/sirupsen/logrus v1.9.3
13+
github.com/stretchr/testify v1.9.0
1314
golang.org/x/oauth2 v0.21.0
1415
)
1516

@@ -29,6 +30,9 @@ require (
2930
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect
3031
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect
3132
github.com/aws/smithy-go v1.20.4 // indirect
33+
github.com/davecgh/go-spew v1.1.1 // indirect
3234
github.com/jmespath/go-jmespath v0.4.0 // indirect
35+
github.com/pmezard/go-difflib v1.0.0 // indirect
3336
golang.org/x/sys v0.20.0 // indirect
37+
gopkg.in/yaml.v3 v3.0.1 // indirect
3438
)

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
6262
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
6363
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
6464
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
65+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
6566
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
6667
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
6768
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

0 commit comments

Comments
 (0)