Configurable aggregation statistic and file compression #10

Merged · 10 commits · Sep 17, 2024
13 changes: 8 additions & 5 deletions README.md
@@ -15,11 +15,11 @@ By default, all Arduino things present in the account are exported: it is possib

CSV produced has the following structure:
```console
timestamp,thing_id,thing_name,property_id,property_name,property_type,value
2024-09-04T11:00:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,3
2024-09-04T11:01:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,7
2024-09-04T11:02:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,11
2024-09-04T11:03:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,15
timestamp,thing_id,thing_name,property_id,property_name,property_type,value,aggregation_statistic
2024-09-04T11:00:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,3,AVG
2024-09-04T11:01:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,7,AVG
2024-09-04T11:02:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,11,AVG
2024-09-04T11:03:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,15,AVG
```

Files are organized by date, and files from the same day are grouped together.
@@ -76,6 +76,9 @@ These parameters are filled by CFT at stack creation time and can be adjusted la
| /arduino/s3-exporter/{stack-name}/iot/samples-resolution | (optional) samples aggregation resolution (1/5/15 minutes, 1 hour, raw) |
| /arduino/s3-exporter/{stack-name}/destination-bucket | S3 destination bucket |
| /arduino/s3-exporter/{stack-name}/iot/scheduling | Execution scheduling |
| /arduino/s3-exporter/{stack-name}/iot/aggregation-statistic | Aggregation statistic |

Files can be compressed with gzip before being uploaded to S3. To enable compression, add the ENABLE_COMPRESSION environment variable to the Lambda configuration (with value true/false).
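
For reference, a minimal sketch of what a gzip helper like the `utils.GzipFileCompression` call used by this change could look like. This is an assumption about its behaviour (compress the file next to the original and return the new path); the actual implementation in the repository may differ:

```go
package utils

import (
	"compress/gzip"
	"io"
	"os"
)

// gzipFile compresses src into src+".gz" and returns the path of the
// compressed file. Sketch only: naming and error handling are assumptions.
func gzipFile(src string) (string, error) {
	in, err := os.Open(src)
	if err != nil {
		return "", err
	}
	defer in.Close()

	dst := src + ".gz"
	out, err := os.Create(dst)
	if err != nil {
		return "", err
	}
	defer out.Close()

	gz := gzip.NewWriter(out)
	if _, err := io.Copy(gz, in); err != nil {
		return "", err
	}
	// Close flushes the gzip footer; skipping it produces a truncated archive.
	return dst, gz.Close()
}
```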

### Tag filtering

40 changes: 37 additions & 3 deletions app/importer/importer.go → app/exporter/exporter.go
@@ -13,19 +13,22 @@
// Arduino software without disclosing the source code of your own applications.
// To purchase a commercial license, send an email to [email protected].

package importer
package exporter

import (
"context"
"fmt"
"os"

"github.com/arduino/aws-s3-integration/business/tsextractor"
"github.com/arduino/aws-s3-integration/internal/iot"
"github.com/arduino/aws-s3-integration/internal/s3"
"github.com/arduino/aws-s3-integration/internal/utils"
iotclient "github.com/arduino/iot-client-go/v2"
"github.com/sirupsen/logrus"
)

func StartImport(ctx context.Context, logger *logrus.Entry, key, secret, orgid string, tagsF *string, resolution, timeWindowMinutes int, destinationS3Bucket string) error {
func StartExporter(ctx context.Context, logger *logrus.Entry, key, secret, orgid string, tagsF *string, resolution, timeWindowMinutes int, destinationS3Bucket string, aggregationStat string, compress bool) error {

// Init client
iotcl, err := iot.NewClient(key, secret, orgid)
@@ -50,8 +53,39 @@ func StartImport(ctx context.Context, logger *logrus.Entry, key, secret, orgid s

// Extract data points from thing and push to S3
tsextractorClient := tsextractor.New(iotcl, logger)
if err := tsextractorClient.ExportTSToS3(ctx, timeWindowMinutes, thingsMap, resolution, destinationS3Bucket); err != nil {

// Open s3 output writer
s3cl, err := s3.NewS3Client(destinationS3Bucket)
if err != nil {
return err
}

if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat); err != nil {
logger.Error("Error aligning time series samples: ", err)
return err
} else {
writer.Close()
defer writer.Delete()

fileToUpload := writer.GetFilePath()
destinationKeyFormat := "%s/%s.csv"
if compress {
logger.Infof("Compressing file: %s\n", fileToUpload)
compressedFile, err := utils.GzipFileCompression(fileToUpload)
if err != nil {
return err
}
fileToUpload = compressedFile
logger.Infof("Generated compressed file: %s\n", fileToUpload)
destinationKeyFormat = "%s/%s.csv.gz"
defer func(f string) { os.Remove(f) }(fileToUpload)
}

destinationKey := fmt.Sprintf(destinationKeyFormat, from.Format("2006-01-02"), from.Format("2006-01-02-15-04"))
logger.Infof("Uploading file %s to bucket %s/%s\n", fileToUpload, s3cl.DestinationBucket(), destinationKey)
if err := s3cl.WriteFile(ctx, destinationKey, fileToUpload); err != nil {
return err
}
}

return nil
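
For context, a hypothetical caller of the new `StartExporter` signature might look like the sketch below. The environment variable names and the surrounding `main` function are assumptions for illustration, not the actual Lambda wiring in this repository:

```go
package main

import (
	"context"
	"os"

	"github.com/arduino/aws-s3-integration/app/exporter"
	"github.com/sirupsen/logrus"
)

func main() {
	logger := logrus.NewEntry(logrus.New())
	ctx := context.Background()
	compress := os.Getenv("ENABLE_COMPRESSION") == "true"

	err := exporter.StartExporter(
		ctx, logger,
		os.Getenv("IOT_API_KEY"),   // assumed variable names
		os.Getenv("IOT_API_SECRET"),
		os.Getenv("ORG_ID"),
		nil,                     // no tag filter
		300,                     // aggregation resolution, in seconds
		60,                      // extraction time window, in minutes
		"my-destination-bucket", // assumed bucket name
		"AVG",                   // aggregation statistic
		compress,
	)
	if err != nil {
		logger.Fatal(err)
	}
}
```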
83 changes: 50 additions & 33 deletions business/tsextractor/tsextractor.go
@@ -17,6 +17,7 @@ package tsextractor

import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
@@ -28,7 +29,6 @@ import (

"github.com/arduino/aws-s3-integration/internal/csv"
"github.com/arduino/aws-s3-integration/internal/iot"
"github.com/arduino/aws-s3-integration/internal/s3"
iotclient "github.com/arduino/iot-client-go/v2"
"github.com/sirupsen/logrus"
)
@@ -37,11 +37,11 @@ const importConcurrency = 10
const retryCount = 5

type TsExtractor struct {
iotcl *iot.Client
iotcl iot.API
logger *logrus.Entry
}

func New(iotcl *iot.Client, logger *logrus.Entry) *TsExtractor {
func New(iotcl iot.API, logger *logrus.Entry) *TsExtractor {
return &TsExtractor{iotcl: iotcl, logger: logger}
}

@@ -51,36 +51,42 @@ func computeTimeAlignment(resolutionSeconds, timeWindowInMinutes int) (time.Time
resolutionSeconds = 300 // Align to 5 minutes
}
to := time.Now().Truncate(time.Duration(resolutionSeconds) * time.Second).UTC()
if resolutionSeconds <= 900 {
// Shift time window to avoid missing data
to = to.Add(-time.Duration(300) * time.Second)
}
from := to.Add(-time.Duration(timeWindowInMinutes) * time.Minute)
return from, to
}
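
To make the shifted window above concrete: for resolutions of 15 minutes or less, `to` is truncated to the resolution boundary and then pushed back 5 minutes so that still-open aggregation buckets are not exported, while the window length is preserved. A check in the style of the existing tests in `tsextractor_test.go` could look like this (a sketch, assuming the same `testing`, `time`, and `assert` imports already used in that file):

```go
func TestTimeAlignment_shiftedWindow(t *testing.T) {
	// 5-minute resolution, 60-minute window.
	from, to := computeTimeAlignment(300, 60)

	// The window length is preserved...
	assert.Equal(t, int64(3600), to.Unix()-from.Unix())
	// ...while "to" sits at least 5 minutes in the past, so samples that
	// are still being aggregated are not exported.
	assert.True(t, time.Since(to) >= 5*time.Minute)
}
```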

func (a *TsExtractor) ExportTSToS3(
func isRawResolution(resolution int) bool {
return resolution <= 0
}

func (a *TsExtractor) ExportTSToFile(
ctx context.Context,
timeWindowInMinutes int,
thingsMap map[string]iotclient.ArduinoThing,
resolution int,
destinationS3Bucket string) error {
aggregationStat string) (*csv.CsvWriter, time.Time, error) {

// Truncate time to given resolution
from, to := computeTimeAlignment(resolution, timeWindowInMinutes)

// Open s3 output writer
s3cl, err := s3.NewS3Client(destinationS3Bucket)
if err != nil {
return err
}

// Open csv output writer
writer, err := csv.NewWriter(from, a.logger)
writer, err := csv.NewWriter(from, a.logger, isRawResolution(resolution))
if err != nil {
return err
return nil, from, err
}

var wg sync.WaitGroup
tokens := make(chan struct{}, importConcurrency)

a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to)
if isRawResolution(resolution) {
a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to, " - aggregation: raw")
} else {
a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to, " - aggregation: ", aggregationStat)
}
for thingID, thing := range thingsMap {

if thing.Properties == nil || len(thing.Properties) == 0 {
@@ -95,7 +101,7 @@ func (a *TsExtractor) ExportTSToS3(
defer func() { <-tokens }()
defer wg.Done()

if resolution <= 0 {
if isRawResolution(resolution) {
// Populate raw time series data
err := a.populateRawTSDataIntoS3(ctx, from, to, thingID, thing, writer)
if err != nil {
@@ -104,7 +110,7 @@ func (a *TsExtractor) ExportTSToS3(
}
} else {
// Populate numeric time series data
err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer)
err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, aggregationStat, writer)
if err != nil {
a.logger.Error("Error populating time series data: ", err)
return
@@ -124,17 +130,7 @@
a.logger.Infoln("Waiting for all data extraction jobs to terminate...")
wg.Wait()

// Close csv output writer and upload to s3
writer.Close()
defer writer.Delete()

destinationKey := fmt.Sprintf("%s/%s.csv", from.Format("2006-01-02"), from.Format("2006-01-02-15-04"))
a.logger.Infof("Uploading file %s to bucket %s\n", destinationKey, s3cl.DestinationBucket())
if err := s3cl.WriteFile(ctx, destinationKey, writer.GetFilePath()); err != nil {
return err
}

return nil
return writer, from, nil
}

func randomRateLimitingSleep() {
@@ -155,6 +151,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
thingID string,
thing iotclient.ArduinoThing,
resolution int,
aggregationStat string,
writer *csv.CsvWriter) error {

if resolution <= 60 {
@@ -165,7 +162,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
var err error
var retry bool
for i := 0; i < retryCount; i++ {
batched, retry, err = a.iotcl.GetTimeSeriesByThing(ctx, thingID, from, to, int64(resolution))
batched, retry, err = a.iotcl.GetTimeSeriesByThing(ctx, thingID, from, to, int64(resolution), aggregationStat)
if !retry {
break
} else {
@@ -195,7 +192,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(

ts := response.Times[i]
value := response.Values[i]
samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, strconv.FormatFloat(value, 'f', -1, 64)))
samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, strconv.FormatFloat(value, 'f', -1, 64), aggregationStat))
}
}

@@ -210,7 +207,20 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
return nil
}

func composeRow(ts time.Time, thingID string, thingName string, propertyID string, propertyName string, propertyType string, value string) []string {
func composeRow(ts time.Time, thingID string, thingName string, propertyID string, propertyName string, propertyType string, value string, aggregation string) []string {
row := make([]string, 8)
row[0] = ts.UTC().Format(time.RFC3339)
row[1] = thingID
row[2] = thingName
row[3] = propertyID
row[4] = propertyName
row[5] = propertyType
row[6] = value
row[7] = aggregation
return row
}

func composeRawRow(ts time.Time, thingID string, thingName string, propertyID string, propertyName string, propertyType string, value string) []string {
row := make([]string, 7)
row[0] = ts.UTC().Format(time.RFC3339)
row[1] = thingID
@@ -300,7 +310,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3(
if value == nil {
continue
}
samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, interfaceToString(value)))
samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value), ""))
}
}

@@ -360,7 +370,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3(
if value == nil {
continue
}
samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, interfaceToString(value)))
samples = append(samples, composeRawRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value)))
}
}

@@ -375,7 +385,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3(
return nil
}

func interfaceToString(value interface{}) string {
func (a *TsExtractor) interfaceToString(value interface{}) string {
switch v := value.(type) {
case string:
return v
@@ -385,6 +395,13 @@ func interfaceToString(value interface{}) string {
return strconv.FormatFloat(v, 'f', -1, 64)
case bool:
return strconv.FormatBool(v)
case map[string]any:
encoded, err := json.Marshal(v)
if err != nil {
a.logger.Error("Error encoding map to json: ", err)
return fmt.Sprintf("%v", v)
}
return string(encoded)
default:
return fmt.Sprintf("%v", v)
}
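
The new `map[string]any` branch means object-typed property values end up as JSON text in the CSV `value` column. A small, self-contained illustration (the input values are made up):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	value := map[string]any{"lat": 45.07, "lon": 7.68}
	encoded, _ := json.Marshal(value) // map keys are emitted in sorted order
	fmt.Println(string(encoded))      // {"lat":45.07,"lon":7.68}
}
```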
79 changes: 79 additions & 0 deletions business/tsextractor/tsextractor_test.go
@@ -1,9 +1,18 @@
package tsextractor

import (
"context"
"fmt"
"io"
"os"
"testing"
"time"

iotMocks "github.com/arduino/aws-s3-integration/internal/iot/mocks"
iotclient "github.com/arduino/iot-client-go/v2"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestTimeAlignment_HourlyTimeWindows(t *testing.T) {
@@ -35,3 +44,73 @@ func TestTimeAlignment_raw_HourlyTimeWindows(t *testing.T) {
from, to := computeTimeAlignment(-1, 60)
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
}

func toPtr(val string) *string {
return &val
}

func TestExtractionFlow_defaultAggregation(t *testing.T) {
logger := logrus.NewEntry(logrus.New())
ctx := context.Background()

thingId := "91f30213-2bd7-480a-b1dc-f31b01840e7e"
propertyId := "c86f4ed9-7f52-4bd3-bdc6-b2936bec68ac"

// Init client
iotcl := iotMocks.NewAPI(t)

now := time.Now()
responses := []iotclient.ArduinoSeriesResponse{
{
Aggregation: toPtr("AVG"),
Query: fmt.Sprintf("property.%s", propertyId),
Times: []time.Time{now.Add(-time.Minute * 1), now},
Values: []float64{1.0, 2.0},
CountValues: 2,
},
}
samples := iotclient.ArduinoSeriesBatch{
Responses: responses,
}
iotcl.On("GetTimeSeriesByThing", ctx, thingId, mock.Anything, mock.Anything, int64(300), "AVG").Return(&samples, false, nil)

tsextractorClient := New(iotcl, logger)

propCount := int64(1)
thingsMap := make(map[string]iotclient.ArduinoThing)
thingsMap[thingId] = iotclient.ArduinoThing{
Id: thingId,
Name: "test",
Properties: []iotclient.ArduinoProperty{
{
Name: "ptest",
Id: propertyId,
Type: "FLOAT",
},
},
PropertiesCount: &propCount,
}

writer, from, err := tsextractorClient.ExportTSToFile(ctx, 60, thingsMap, 300, "AVG")
assert.NoError(t, err)
assert.NotNil(t, writer)
assert.NotNil(t, from)

writer.Close()
defer writer.Delete()

outF, err := os.Open(writer.GetFilePath())
assert.NoError(t, err)
defer outF.Close()
content, err := io.ReadAll(outF)
assert.NoError(t, err)
entries := []string{
"timestamp,thing_id,thing_name,property_id,property_name,property_type,value",
"91f30213-2bd7-480a-b1dc-f31b01840e7e,test,c86f4ed9-7f52-4bd3-bdc6-b2936bec68ac,ptest,FLOAT,1,AVG",
"91f30213-2bd7-480a-b1dc-f31b01840e7e,test,c86f4ed9-7f52-4bd3-bdc6-b2936bec68ac,ptest,FLOAT,2,AVG",
}
for _, entry := range entries {
assert.Contains(t, string(content), entry)
}
fmt.Println(string(content))
}