Skip to content

Commit 23a7e4a

Browse files
committed
repo init
0 parents  commit 23a7e4a

File tree

17 files changed

+1876
-0
lines changed

17 files changed

+1876
-0
lines changed

LICENSE

Lines changed: 373 additions & 0 deletions
Large diffs are not rendered by default.

README.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# AWS IoT S3 importer
2+
3+
This project provides a way to extract time series samples from Arduino cloud, publishing to an S3 destination bucket.
4+
Things can be filterd by tags.
5+
6+
## Deployment schema
7+
8+
Imported is based on a Go lambda function triggered by periodic events from EventBridge.
9+
Job is configured to extract samples for a 60min time window: trigger is configured accordingly on EventBridge.
10+
11+
### Policies
12+
13+
See policies defined in [cloud formation template](deployment/cloud-formation-template/deployment.yaml)
14+
15+
### Configuration parameters
16+
17+
| Parameter | Description |
18+
| --------- | ----------- |
19+
| /arduino/s3-importer/iot/api-key | IoT API key |
20+
| /arduino/s3-importer/iot/api-secret | IoT API secret |
21+
| /arduino/s3-importer/iot/org-id | (optional) organization id |
22+
| /arduino/s3-importer/iot/filter/tags | (optional) tags filtering. Syntax: tag=value,tag2=value2 |
23+
| /arduino/s3-importer/iot/samples-resolution-seconds | (optional) samples resolution (default: 300s) |
24+
| /arduino/s3-importer/destination-bucket | S3 destination bucket |
25+
26+
## Deployment via Cloud Formation Template
27+
28+
It is possible to deploy required resources via [cloud formation template](deployment/cloud-formation-template/deployment.yaml)
29+
Required steps to deploy project:
30+
* compile lambda
31+
```console
32+
foo@bar:~$ ./compile-lambda.sh
33+
arduino-s3-integration-lambda.zip archive created
34+
```
35+
* Save zip file on an S3 bucket accessible by the AWS account
36+
* Start creation of a new cloud formation stack provising the [cloud formation template](deployment/cloud-formation-template/deployment.yaml)
37+
* Fill all required parameters (mandatory: Arduino API key and secret, S3 bucket and key where code has been uploaded, destination S3 bucket. Optionally, tag filter for filtering things, organization identifier and samples resolution)

app/importer/importer.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// This file is part of arduino aws-s3-integration.
2+
//
3+
// Copyright 2024 ARDUINO SA (http://www.arduino.cc/)
4+
//
5+
// This software is released under the Mozilla Public License Version 2.0,
6+
// which covers the main part of aws-s3-integration.
7+
// The terms of this license can be found at:
8+
// https://www.mozilla.org/media/MPL/2.0/index.815ca599c9df.txt
9+
//
10+
// You can be released from the requirements of the above licenses by purchasing
11+
// a commercial license. Buying such a license is mandatory if you want to
12+
// modify or otherwise use the software for commercial activities involving the
13+
// Arduino software without disclosing the source code of your own applications.
14+
// To purchase a commercial license, send an email to [email protected].
15+
16+
package importer
17+
18+
import (
19+
"context"
20+
21+
"github.com/arduino/aws-s3-integration/business/tsextractor"
22+
"github.com/arduino/aws-s3-integration/internal/iot"
23+
"github.com/arduino/aws-s3-integration/internal/utils"
24+
iotclient "github.com/arduino/iot-client-go/v2"
25+
"github.com/sirupsen/logrus"
26+
)
27+
28+
func StartImport(ctx context.Context, logger *logrus.Entry, key, secret, orgid string, tagsF *string, resolution, timeWindowMinutes int, destinationS3Bucket string) error {
29+
30+
// Init client
31+
iotcl, err := iot.NewClient(key, secret, orgid)
32+
if err != nil {
33+
return err
34+
}
35+
36+
if tagsF == nil {
37+
logger.Infoln("Things - searching with no filter")
38+
} else {
39+
logger.Infoln("Things - searching by tags: ", *tagsF)
40+
}
41+
things, err := iotcl.ThingList(ctx, nil, nil, true, utils.ParseTags(tagsF))
42+
if err != nil {
43+
return err
44+
}
45+
thingsMap := make(map[string]iotclient.ArduinoThing, len(things))
46+
for _, thing := range things {
47+
logger.Infoln(" Thing: ", thing.Id, thing.Name)
48+
thingsMap[thing.Id] = thing
49+
}
50+
51+
// Extract data points from thing and push to S3
52+
tsextractorClient := tsextractor.New(iotcl, logger)
53+
if err := tsextractorClient.ExportTSToS3(ctx, timeWindowMinutes, thingsMap, resolution, destinationS3Bucket); err != nil {
54+
logger.Error("Error aligning time series samples: ", err)
55+
}
56+
57+
return nil
58+
}

business/tsextractor/tsextractor.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// This file is part of arduino aws-s3-integration.
2+
//
3+
// Copyright 2024 ARDUINO SA (http://www.arduino.cc/)
4+
//
5+
// This software is released under the Mozilla Public License Version 2.0,
6+
// which covers the main part of aws-s3-integration.
7+
// The terms of this license can be found at:
8+
// https://www.mozilla.org/media/MPL/2.0/index.815ca599c9df.txt
9+
//
10+
// You can be released from the requirements of the above licenses by purchasing
11+
// a commercial license. Buying such a license is mandatory if you want to
12+
// modify or otherwise use the software for commercial activities involving the
13+
// Arduino software without disclosing the source code of your own applications.
14+
// To purchase a commercial license, send an email to [email protected].
15+
16+
package tsextractor
17+
18+
import (
19+
"context"
20+
"fmt"
21+
"strconv"
22+
"strings"
23+
"sync"
24+
"time"
25+
26+
"github.com/arduino/aws-s3-integration/internal/csv"
27+
"github.com/arduino/aws-s3-integration/internal/iot"
28+
"github.com/arduino/aws-s3-integration/internal/s3"
29+
iotclient "github.com/arduino/iot-client-go/v2"
30+
"github.com/sirupsen/logrus"
31+
)
32+
33+
const importConcurrency = 10
34+
35+
type TsExtractor struct {
36+
iotcl *iot.Client
37+
logger *logrus.Entry
38+
}
39+
40+
func New(iotcl *iot.Client, logger *logrus.Entry) *TsExtractor {
41+
return &TsExtractor{iotcl: iotcl, logger: logger}
42+
}
43+
44+
func (a *TsExtractor) ExportTSToS3(
45+
ctx context.Context,
46+
timeWindowInMinutes int,
47+
thingsMap map[string]iotclient.ArduinoThing,
48+
resolution int,
49+
destinationS3Bucket string) error {
50+
51+
to := time.Now().Truncate(time.Hour).UTC()
52+
from := to.Add(-time.Duration(timeWindowInMinutes) * time.Minute)
53+
54+
// Open s3 output writer
55+
s3cl, err := s3.NewS3Client(destinationS3Bucket)
56+
if err != nil {
57+
return err
58+
}
59+
60+
// Open csv output writer
61+
writer, err := csv.NewWriter(from, a.logger)
62+
if err != nil {
63+
return err
64+
}
65+
66+
var wg sync.WaitGroup
67+
tokens := make(chan struct{}, importConcurrency)
68+
69+
a.logger.Infoln("=====> Export perf data - time window: ", timeWindowInMinutes, " minutes")
70+
for thingID, thing := range thingsMap {
71+
72+
if thing.Properties == nil || len(thing.Properties) == 0 {
73+
a.logger.Warn("Skipping thing with no properties: ", thingID)
74+
continue
75+
}
76+
77+
wg.Add(1)
78+
tokens <- struct{}{}
79+
80+
go func(thingID string, thing iotclient.ArduinoThing, writer *csv.CsvWriter) {
81+
defer func() { <-tokens }()
82+
defer wg.Done()
83+
84+
err := a.populateTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer)
85+
if err != nil {
86+
a.logger.Error("Error populating time series data: ", err)
87+
return
88+
}
89+
}(thingID, thing, writer)
90+
}
91+
92+
// Wait for all routines termination
93+
wg.Wait()
94+
95+
// Close csv output writer and upload to s3
96+
writer.Close()
97+
defer writer.Delete()
98+
99+
destinationKey := fmt.Sprintf("%s/%s.csv", from.Format("2006-01-02"), from.Format("2006-01-02-15"))
100+
if err := s3cl.WriteFile(ctx, destinationKey, writer.GetFilePath()); err != nil {
101+
return err
102+
}
103+
104+
return nil
105+
}
106+
107+
func (a *TsExtractor) populateTSDataIntoS3(
108+
ctx context.Context,
109+
from time.Time,
110+
to time.Time,
111+
thingID string,
112+
thing iotclient.ArduinoThing,
113+
resolution int,
114+
writer *csv.CsvWriter) error {
115+
116+
var batched *iotclient.ArduinoSeriesBatch
117+
var err error
118+
var retry bool
119+
for i := 0; i < 3; i++ {
120+
batched, retry, err = a.iotcl.GetTimeSeriesByThing(ctx, thingID, from, to, int64(resolution))
121+
if !retry {
122+
break
123+
} else {
124+
// This is due to a rate limit on the IoT API, we need to wait a bit before retrying
125+
a.logger.Infof("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
126+
time.Sleep(1 * time.Second)
127+
}
128+
}
129+
if err != nil {
130+
return err
131+
}
132+
133+
sampleCount := int64(0)
134+
samples := [][]string{}
135+
for _, response := range batched.Responses {
136+
if response.CountValues == 0 {
137+
continue
138+
}
139+
140+
propertyID := strings.Replace(response.Query, "property.", "", 1)
141+
a.logger.Debugf("Thing %s - Property %s - %d values\n", thingID, propertyID, response.CountValues)
142+
sampleCount += response.CountValues
143+
144+
propertyName := extractPropertyName(thing, propertyID)
145+
146+
for i := 0; i < len(response.Times); i++ {
147+
148+
ts := response.Times[i]
149+
value := response.Values[i]
150+
row := make([]string, 6)
151+
row[0] = ts.UTC().Format(time.RFC3339)
152+
row[1] = thingID
153+
row[2] = thing.Name
154+
row[3] = propertyID
155+
row[4] = propertyName
156+
row[5] = strconv.FormatFloat(value, 'f', 3, 64)
157+
158+
samples = append(samples, row)
159+
}
160+
}
161+
162+
// Write samples to csv ouput file
163+
if len(samples) > 0 {
164+
if err := writer.Write(samples); err != nil {
165+
return err
166+
}
167+
a.logger.Debugf("Thing %s [%s] saved %d values\n", thingID, thing.Name, sampleCount)
168+
}
169+
170+
return nil
171+
}
172+
173+
func extractPropertyName(thing iotclient.ArduinoThing, propertyID string) string {
174+
propertyName := ""
175+
for _, prop := range thing.Properties {
176+
if prop.Id == propertyID {
177+
propertyName = prop.Name
178+
break
179+
}
180+
}
181+
return propertyName
182+
}

compile-lambda.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/bin/bash
2+
3+
GOOS=linux CGO_ENABLED=0 go build -o bootstrap -tags lambda.norpc lambda.go
4+
zip arduino-s3-integration-lambda.zip bootstrap
5+
rm bootstrap
6+
echo "arduino-s3-integration-lambda.zip archive created"

0 commit comments

Comments
 (0)