Skip to content

Commit 7a2716c

Browse files
authored
Configurable aggregation statistic and file compression (#10)
1 parent 9201a3e commit 7a2716c

File tree

15 files changed

+526
-195
lines changed

15 files changed

+526
-195
lines changed

README.md

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ By default, all Arduino things present in the account are exported: it is possib
1515

1616
The produced CSV has the following structure:
1717
```console
18-
timestamp,thing_id,thing_name,property_id,property_name,property_type,value
19-
2024-09-04T11:00:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,3
20-
2024-09-04T11:01:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,7
21-
2024-09-04T11:02:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,11
22-
2024-09-04T11:03:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,15
18+
timestamp,thing_id,thing_name,property_id,property_name,property_type,value,aggregation_statistic
19+
2024-09-04T11:00:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,3,AVG
20+
2024-09-04T11:01:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,7,AVG
21+
2024-09-04T11:02:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,11,AVG
22+
2024-09-04T11:03:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,15,AVG
2323
```
2424

2525
Files are organized by date, and files from the same day are grouped together.
@@ -76,6 +76,9 @@ These parameters are filled by CFT at stack creation time and can be adjusted la
7676
| /arduino/s3-exporter/{stack-name}/iot/samples-resolution | (optional) samples aggregation resolution (1/5/15 minutes, 1 hour, raw) |
7777
| /arduino/s3-exporter/{stack-name}/destination-bucket | S3 destination bucket |
7878
| /arduino/s3-exporter/{stack-name}/iot/scheduling | Execution scheduling |
79+
| /arduino/s3-exporter/{stack-name}/iot/aggregation-statistic | Aggregation statistic |
80+
81+
Files can be compressed with gzip before being uploaded to S3. To enable compression, add the ENABLE_COMPRESSION environment variable to the Lambda configuration (with value true/false).
7982

8083
### Tag filtering
8184

app/importer/importer.go renamed to app/exporter/exporter.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,22 @@
1313
// Arduino software without disclosing the source code of your own applications.
1414
// To purchase a commercial license, send an email to license@arduino.cc.
1515

16-
package importer
16+
package exporter
1717

1818
import (
1919
"context"
20+
"fmt"
21+
"os"
2022

2123
"github.com/arduino/aws-s3-integration/business/tsextractor"
2224
"github.com/arduino/aws-s3-integration/internal/iot"
25+
"github.com/arduino/aws-s3-integration/internal/s3"
2326
"github.com/arduino/aws-s3-integration/internal/utils"
2427
iotclient "github.com/arduino/iot-client-go/v2"
2528
"github.com/sirupsen/logrus"
2629
)
2730

28-
func StartImport(ctx context.Context, logger *logrus.Entry, key, secret, orgid string, tagsF *string, resolution, timeWindowMinutes int, destinationS3Bucket string) error {
31+
func StartExporter(ctx context.Context, logger *logrus.Entry, key, secret, orgid string, tagsF *string, resolution, timeWindowMinutes int, destinationS3Bucket string, aggregationStat string, compress bool) error {
2932

3033
// Init client
3134
iotcl, err := iot.NewClient(key, secret, orgid)
@@ -50,8 +53,39 @@ func StartImport(ctx context.Context, logger *logrus.Entry, key, secret, orgid s
5053

5154
// Extract data points from thing and push to S3
5255
tsextractorClient := tsextractor.New(iotcl, logger)
53-
if err := tsextractorClient.ExportTSToS3(ctx, timeWindowMinutes, thingsMap, resolution, destinationS3Bucket); err != nil {
56+
57+
// Open s3 output writer
58+
s3cl, err := s3.NewS3Client(destinationS3Bucket)
59+
if err != nil {
60+
return err
61+
}
62+
63+
if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat); err != nil {
5464
logger.Error("Error aligning time series samples: ", err)
65+
return err
66+
} else {
67+
writer.Close()
68+
defer writer.Delete()
69+
70+
fileToUpload := writer.GetFilePath()
71+
destinationKeyFormat := "%s/%s.csv"
72+
if compress {
73+
logger.Infof("Compressing file: %s\n", fileToUpload)
74+
compressedFile, err := utils.GzipFileCompression(fileToUpload)
75+
if err != nil {
76+
return err
77+
}
78+
fileToUpload = compressedFile
79+
logger.Infof("Generated compressed file: %s\n", fileToUpload)
80+
destinationKeyFormat = "%s/%s.csv.gz"
81+
defer func(f string) { os.Remove(f) }(fileToUpload)
82+
}
83+
84+
destinationKey := fmt.Sprintf(destinationKeyFormat, from.Format("2006-01-02"), from.Format("2006-01-02-15-04"))
85+
logger.Infof("Uploading file %s to bucket %s/%s\n", fileToUpload, s3cl.DestinationBucket(), destinationKey)
86+
if err := s3cl.WriteFile(ctx, destinationKey, fileToUpload); err != nil {
87+
return err
88+
}
5589
}
5690

5791
return nil

business/tsextractor/tsextractor.go

Lines changed: 50 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package tsextractor
1717

1818
import (
1919
"context"
20+
"encoding/json"
2021
"fmt"
2122
"strconv"
2223
"strings"
@@ -28,7 +29,6 @@ import (
2829

2930
"github.com/arduino/aws-s3-integration/internal/csv"
3031
"github.com/arduino/aws-s3-integration/internal/iot"
31-
"github.com/arduino/aws-s3-integration/internal/s3"
3232
iotclient "github.com/arduino/iot-client-go/v2"
3333
"github.com/sirupsen/logrus"
3434
)
@@ -37,11 +37,11 @@ const importConcurrency = 10
3737
const retryCount = 5
3838

3939
type TsExtractor struct {
40-
iotcl *iot.Client
40+
iotcl iot.API
4141
logger *logrus.Entry
4242
}
4343

44-
func New(iotcl *iot.Client, logger *logrus.Entry) *TsExtractor {
44+
func New(iotcl iot.API, logger *logrus.Entry) *TsExtractor {
4545
return &TsExtractor{iotcl: iotcl, logger: logger}
4646
}
4747

@@ -51,36 +51,42 @@ func computeTimeAlignment(resolutionSeconds, timeWindowInMinutes int) (time.Time
5151
resolutionSeconds = 300 // Align to 5 minutes
5252
}
5353
to := time.Now().Truncate(time.Duration(resolutionSeconds) * time.Second).UTC()
54+
if resolutionSeconds <= 900 {
55+
// Shift time window to avoid missing data
56+
to = to.Add(-time.Duration(300) * time.Second)
57+
}
5458
from := to.Add(-time.Duration(timeWindowInMinutes) * time.Minute)
5559
return from, to
5660
}
5761

58-
func (a *TsExtractor) ExportTSToS3(
62+
func isRawResolution(resolution int) bool {
63+
return resolution <= 0
64+
}
65+
66+
func (a *TsExtractor) ExportTSToFile(
5967
ctx context.Context,
6068
timeWindowInMinutes int,
6169
thingsMap map[string]iotclient.ArduinoThing,
6270
resolution int,
63-
destinationS3Bucket string) error {
71+
aggregationStat string) (*csv.CsvWriter, time.Time, error) {
6472

6573
// Truncate time to given resolution
6674
from, to := computeTimeAlignment(resolution, timeWindowInMinutes)
6775

68-
// Open s3 output writer
69-
s3cl, err := s3.NewS3Client(destinationS3Bucket)
70-
if err != nil {
71-
return err
72-
}
73-
7476
// Open csv output writer
75-
writer, err := csv.NewWriter(from, a.logger)
77+
writer, err := csv.NewWriter(from, a.logger, isRawResolution(resolution))
7678
if err != nil {
77-
return err
79+
return nil, from, err
7880
}
7981

8082
var wg sync.WaitGroup
8183
tokens := make(chan struct{}, importConcurrency)
8284

83-
a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to)
85+
if isRawResolution(resolution) {
86+
a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to, " - aggregation: raw")
87+
} else {
88+
a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to, " - aggregation: ", aggregationStat)
89+
}
8490
for thingID, thing := range thingsMap {
8591

8692
if thing.Properties == nil || len(thing.Properties) == 0 {
@@ -95,7 +101,7 @@ func (a *TsExtractor) ExportTSToS3(
95101
defer func() { <-tokens }()
96102
defer wg.Done()
97103

98-
if resolution <= 0 {
104+
if isRawResolution(resolution) {
99105
// Populate raw time series data
100106
err := a.populateRawTSDataIntoS3(ctx, from, to, thingID, thing, writer)
101107
if err != nil {
@@ -104,7 +110,7 @@ func (a *TsExtractor) ExportTSToS3(
104110
}
105111
} else {
106112
// Populate numeric time series data
107-
err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer)
113+
err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, aggregationStat, writer)
108114
if err != nil {
109115
a.logger.Error("Error populating time series data: ", err)
110116
return
@@ -124,17 +130,7 @@ func (a *TsExtractor) ExportTSToS3(
124130
a.logger.Infoln("Waiting for all data extraction jobs to terminate...")
125131
wg.Wait()
126132

127-
// Close csv output writer and upload to s3
128-
writer.Close()
129-
defer writer.Delete()
130-
131-
destinationKey := fmt.Sprintf("%s/%s.csv", from.Format("2006-01-02"), from.Format("2006-01-02-15-04"))
132-
a.logger.Infof("Uploading file %s to bucket %s\n", destinationKey, s3cl.DestinationBucket())
133-
if err := s3cl.WriteFile(ctx, destinationKey, writer.GetFilePath()); err != nil {
134-
return err
135-
}
136-
137-
return nil
133+
return writer, from, nil
138134
}
139135

140136
func randomRateLimitingSleep() {
@@ -155,6 +151,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
155151
thingID string,
156152
thing iotclient.ArduinoThing,
157153
resolution int,
154+
aggregationStat string,
158155
writer *csv.CsvWriter) error {
159156

160157
if resolution <= 60 {
@@ -165,7 +162,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
165162
var err error
166163
var retry bool
167164
for i := 0; i < retryCount; i++ {
168-
batched, retry, err = a.iotcl.GetTimeSeriesByThing(ctx, thingID, from, to, int64(resolution))
165+
batched, retry, err = a.iotcl.GetTimeSeriesByThing(ctx, thingID, from, to, int64(resolution), aggregationStat)
169166
if !retry {
170167
break
171168
} else {
@@ -195,7 +192,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
195192

196193
ts := response.Times[i]
197194
value := response.Values[i]
198-
samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, strconv.FormatFloat(value, 'f', -1, 64)))
195+
samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, strconv.FormatFloat(value, 'f', -1, 64), aggregationStat))
199196
}
200197
}
201198

@@ -210,7 +207,20 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
210207
return nil
211208
}
212209

213-
func composeRow(ts time.Time, thingID string, thingName string, propertyID string, propertyName string, propertyType string, value string) []string {
210+
func composeRow(ts time.Time, thingID string, thingName string, propertyID string, propertyName string, propertyType string, value string, aggregation string) []string {
211+
row := make([]string, 8)
212+
row[0] = ts.UTC().Format(time.RFC3339)
213+
row[1] = thingID
214+
row[2] = thingName
215+
row[3] = propertyID
216+
row[4] = propertyName
217+
row[5] = propertyType
218+
row[6] = value
219+
row[7] = aggregation
220+
return row
221+
}
222+
223+
func composeRawRow(ts time.Time, thingID string, thingName string, propertyID string, propertyName string, propertyType string, value string) []string {
214224
row := make([]string, 7)
215225
row[0] = ts.UTC().Format(time.RFC3339)
216226
row[1] = thingID
@@ -300,7 +310,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3(
300310
if value == nil {
301311
continue
302312
}
303-
samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, interfaceToString(value)))
313+
samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value), ""))
304314
}
305315
}
306316

@@ -360,7 +370,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3(
360370
if value == nil {
361371
continue
362372
}
363-
samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, interfaceToString(value)))
373+
samples = append(samples, composeRawRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value)))
364374
}
365375
}
366376

@@ -375,7 +385,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3(
375385
return nil
376386
}
377387

378-
func interfaceToString(value interface{}) string {
388+
func (a *TsExtractor) interfaceToString(value interface{}) string {
379389
switch v := value.(type) {
380390
case string:
381391
return v
@@ -385,6 +395,13 @@ func interfaceToString(value interface{}) string {
385395
return strconv.FormatFloat(v, 'f', -1, 64)
386396
case bool:
387397
return strconv.FormatBool(v)
398+
case map[string]any:
399+
encoded, err := json.Marshal(v)
400+
if err != nil {
401+
a.logger.Error("Error encoding map to json: ", err)
402+
return fmt.Sprintf("%v", v)
403+
}
404+
return string(encoded)
388405
default:
389406
return fmt.Sprintf("%v", v)
390407
}

business/tsextractor/tsextractor_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
11
package tsextractor
22

33
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"os"
48
"testing"
9+
"time"
510

11+
iotMocks "github.com/arduino/aws-s3-integration/internal/iot/mocks"
12+
iotclient "github.com/arduino/iot-client-go/v2"
13+
"github.com/sirupsen/logrus"
614
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/mock"
716
)
817

918
func TestTimeAlignment_HourlyTimeWindows(t *testing.T) {
@@ -35,3 +44,73 @@ func TestTimeAlignment_raw_HourlyTimeWindows(t *testing.T) {
3544
from, to := computeTimeAlignment(-1, 60)
3645
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
3746
}
47+
48+
func toPtr(val string) *string {
49+
return &val
50+
}
51+
52+
func TestExtractionFlow_defaultAggregation(t *testing.T) {
53+
logger := logrus.NewEntry(logrus.New())
54+
ctx := context.Background()
55+
56+
thingId := "91f30213-2bd7-480a-b1dc-f31b01840e7e"
57+
propertyId := "c86f4ed9-7f52-4bd3-bdc6-b2936bec68ac"
58+
59+
// Init client
60+
iotcl := iotMocks.NewAPI(t)
61+
62+
now := time.Now()
63+
responses := []iotclient.ArduinoSeriesResponse{
64+
{
65+
Aggregation: toPtr("AVG"),
66+
Query: fmt.Sprintf("property.%s", propertyId),
67+
Times: []time.Time{now.Add(-time.Minute * 1), now},
68+
Values: []float64{1.0, 2.0},
69+
CountValues: 2,
70+
},
71+
}
72+
samples := iotclient.ArduinoSeriesBatch{
73+
Responses: responses,
74+
}
75+
iotcl.On("GetTimeSeriesByThing", ctx, thingId, mock.Anything, mock.Anything, int64(300), "AVG").Return(&samples, false, nil)
76+
77+
tsextractorClient := New(iotcl, logger)
78+
79+
propCount := int64(1)
80+
thingsMap := make(map[string]iotclient.ArduinoThing)
81+
thingsMap[thingId] = iotclient.ArduinoThing{
82+
Id: thingId,
83+
Name: "test",
84+
Properties: []iotclient.ArduinoProperty{
85+
{
86+
Name: "ptest",
87+
Id: propertyId,
88+
Type: "FLOAT",
89+
},
90+
},
91+
PropertiesCount: &propCount,
92+
}
93+
94+
writer, from, err := tsextractorClient.ExportTSToFile(ctx, 60, thingsMap, 300, "AVG")
95+
assert.NoError(t, err)
96+
assert.NotNil(t, writer)
97+
assert.NotNil(t, from)
98+
99+
writer.Close()
100+
defer writer.Delete()
101+
102+
outF, err := os.Open(writer.GetFilePath())
103+
assert.NoError(t, err)
104+
defer outF.Close()
105+
content, err := io.ReadAll(outF)
106+
assert.NoError(t, err)
107+
entries := []string{
108+
"timestamp,thing_id,thing_name,property_id,property_name,property_type,value",
109+
"91f30213-2bd7-480a-b1dc-f31b01840e7e,test,c86f4ed9-7f52-4bd3-bdc6-b2936bec68ac,ptest,FLOAT,1,AVG",
110+
"91f30213-2bd7-480a-b1dc-f31b01840e7e,test,c86f4ed9-7f52-4bd3-bdc6-b2936bec68ac,ptest,FLOAT,2,AVG",
111+
}
112+
for _, entry := range entries {
113+
assert.Contains(t, string(content), entry)
114+
}
115+
fmt.Println(string(content))
116+
}

0 commit comments

Comments
 (0)