
Align extraction time to time window #11


Merged
merged 4 commits into from
Sep 18, 2024
19 changes: 11 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# Arduino AWS S3 CSV exporter

This project provides a way to extract time series samples from Arduino cloud, publishing to a S3 destination bucket.
Data are extracted at the given resolution via a scheduled Lambda function. Then samples are stored in CSV files and saved to S3.
Data is extracted at the given resolution via a scheduled Lambda function. Samples are stored in CSV files and saved to S3.
By default, data extraction runs every hour (configurable), extracting samples aggregated at a 5-minute resolution (configurable).
Aggregation is performed as average over aggregation period.
Non numeric values like strings are sampled at the given resolution.
Aggregation is performed as an average over the aggregation period. Non-numeric values, such as strings, are sampled at the given resolution.

## Architecture

S3 exporter is based on a Go lambda function triggered by periodic event from EventBridge.
Job is configured to extract samples for a 60min time window with the default resolution of 5min.
S3 exporter is based on a Go Lambda function triggered by a periodic event from EventBridge.
The function is triggered at a fixed rate (by default, every hour), starting from the deployment time.
The rate also defines the extraction time window: with a 1-hour schedule, one hour of data is extracted.
One file is created per execution and contains all samples for the selected things. Time series samples are exported in the UTC timezone.
By default, all Arduino things present in the account are exported: it is possible to filter them via [tags](#tag-filtering).

Expand All @@ -29,6 +29,9 @@ Files are organized by date and files of the same day are grouped.
<bucket>:2024-09-04/2024-09-04-12-00.csv
```

Data extraction is aligned with the function execution time.
It is possible to align extraction with the time window instead (for example, to export the last complete hour) by configuring the `/arduino/s3-exporter/{stack-name}/iot/align_with_time_window` property.

## Deployment via Cloud Formation Template

It is possible to deploy required resources via [cloud formation template](deployment/cloud-formation-template/deployment.yaml)
Expand Down Expand Up @@ -74,11 +77,11 @@ These parameters are filled by CFT at stack creation time and can be adjusted la
| /arduino/s3-exporter/{stack-name}/iot/org-id | (optional) organization id |
| /arduino/s3-exporter/{stack-name}/iot/filter/tags | (optional) tags filtering. Syntax: tag=value,tag2=value2 |
| /arduino/s3-exporter/{stack-name}/iot/samples-resolution | (optional) samples aggregation resolution (1/5/15 minutes, 1 hour, raw) |
| /arduino/s3-exporter/{stack-name}/destination-bucket | S3 destination bucket |
| /arduino/s3-exporter/{stack-name}/iot/scheduling | Execution scheduling |
| /arduino/s3-exporter/{stack-name}/iot/align_with_time_window | Align data extraction with time windows (for example, the last complete hour) |
| /arduino/s3-exporter/{stack-name}/iot/aggregation-statistic | Aggregation statistic |

It is possible to compress (with gzip) files before uploading to S3. To enable compression, add ENABLE_COMPRESSION env variable to lambda configuration (with value true/false).
| /arduino/s3-exporter/{stack-name}/destination-bucket | S3 destination bucket |
| /arduino/s3-exporter/{stack-name}/enable_compression | Compress CSV files with gzip before uploading to S3 bucket |
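For example, the alignment and compression flags above can be toggled after deployment with the AWS CLI (replace `{stack-name}` with your stack's name; parameter paths as listed in the table):

```shell
# Enable alignment of extraction to the time window (export the last complete hour)
aws ssm put-parameter \
  --name "/arduino/s3-exporter/{stack-name}/iot/align_with_time_window" \
  --type String --value "true" --overwrite

# Enable gzip compression of the exported CSV files
aws ssm put-parameter \
  --name "/arduino/s3-exporter/{stack-name}/enable_compression" \
  --type String --value "true" --overwrite
```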

### Tag filtering

Expand Down
23 changes: 18 additions & 5 deletions app/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,28 @@ import (
"github.com/sirupsen/logrus"
)

func StartExporter(ctx context.Context, logger *logrus.Entry, key, secret, orgid string, tagsF *string, resolution, timeWindowMinutes int, destinationS3Bucket string, aggregationStat string, compress bool) error {
func StartExporter(
ctx context.Context,
logger *logrus.Entry,
key, secret, orgid string,
tagsF *string,
resolution, timeWindowMinutes int,
destinationS3Bucket string,
aggregationStat string,
compress, enableAlignTimeWindow bool) error {

// Init client
iotcl, err := iot.NewClient(key, secret, orgid)
if err != nil {
return err
}

if tagsF == nil {
logger.Infoln("Things - searching with no filter")
if tagsF != nil {
logger.Infoln("Filtering things linked to configured account using tags: ", *tagsF)
} else {
logger.Infoln("Things - searching by tags: ", *tagsF)
logger.Infoln("Importing all things linked to configured account")
}

things, err := iotcl.ThingList(ctx, nil, nil, true, utils.ParseTags(tagsF))
if err != nil {
return err
Expand All @@ -60,7 +69,11 @@ func StartExporter(ctx context.Context, logger *logrus.Entry, key, secret, orgid
return err
}

if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat); err != nil {
if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat, enableAlignTimeWindow); err != nil {
if writer != nil {
writer.Close()
defer writer.Delete()
}
logger.Error("Error aligning time series samples: ", err)
return err
} else {
Expand Down
33 changes: 28 additions & 5 deletions business/tsextractor/tsextractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package tsextractor
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -45,13 +46,18 @@ func New(iotcl iot.API, logger *logrus.Entry) *TsExtractor {
return &TsExtractor{iotcl: iotcl, logger: logger}
}

func computeTimeAlignment(resolutionSeconds, timeWindowInMinutes int) (time.Time, time.Time) {
func computeTimeAlignment(resolutionSeconds, timeWindowInMinutes int, enableAlignTimeWindow bool) (time.Time, time.Time) {
// Compute time alignment
if resolutionSeconds <= 60 {
resolutionSeconds = 300 // Align to 5 minutes
}
to := time.Now().Truncate(time.Duration(resolutionSeconds) * time.Second).UTC()
if resolutionSeconds <= 900 {

timeAlignmentSeconds := resolutionSeconds
if enableAlignTimeWindow {
timeAlignmentSeconds = timeWindowInMinutes * 60
}
to := time.Now().Truncate(time.Duration(timeAlignmentSeconds) * time.Second).UTC()
if !enableAlignTimeWindow && resolutionSeconds <= 900 {
// Shift time window to avoid missing data
to = to.Add(-time.Duration(300) * time.Second)
}
Expand All @@ -68,10 +74,11 @@ func (a *TsExtractor) ExportTSToFile(
timeWindowInMinutes int,
thingsMap map[string]iotclient.ArduinoThing,
resolution int,
aggregationStat string) (*csv.CsvWriter, time.Time, error) {
aggregationStat string,
enableAlignTimeWindow bool) (*csv.CsvWriter, time.Time, error) {

// Truncate time to given resolution
from, to := computeTimeAlignment(resolution, timeWindowInMinutes)
from, to := computeTimeAlignment(resolution, timeWindowInMinutes, enableAlignTimeWindow)

// Open csv output writer
writer, err := csv.NewWriter(from, a.logger, isRawResolution(resolution))
Expand All @@ -81,6 +88,7 @@ func (a *TsExtractor) ExportTSToFile(

var wg sync.WaitGroup
tokens := make(chan struct{}, importConcurrency)
errorChannel := make(chan error, len(thingsMap))

if isRawResolution(resolution) {
a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to, " - aggregation: raw")
Expand All @@ -106,20 +114,23 @@ func (a *TsExtractor) ExportTSToFile(
err := a.populateRawTSDataIntoS3(ctx, from, to, thingID, thing, writer)
if err != nil {
a.logger.Error("Error populating raw time series data: ", err)
errorChannel <- err
return
}
} else {
// Populate numeric time series data
err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, aggregationStat, writer)
if err != nil {
a.logger.Error("Error populating time series data: ", err)
errorChannel <- err
return
}

// Populate string time series data, if any
err = a.populateStringTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer)
if err != nil {
a.logger.Error("Error populating string time series data: ", err)
errorChannel <- err
return
}
}
Expand All @@ -129,6 +140,18 @@ func (a *TsExtractor) ExportTSToFile(
// Wait for all routines termination
a.logger.Infoln("Waiting for all data extraction jobs to terminate...")
wg.Wait()
close(errorChannel)

// Check if there were errors
detectedErrors := false
for err := range errorChannel {
if err != nil {
detectedErrors = true
a.logger.Error(err)
}
}
if detectedErrors {
return writer, from, errors.New("errors detected during data export")
}

return writer, from, nil
}
Expand Down
28 changes: 21 additions & 7 deletions business/tsextractor/tsextractor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,46 @@ import (
)

func TestTimeAlignment_HourlyTimeWindows(t *testing.T) {
// Test the time alignment with hourly time windows
from, to := computeTimeAlignment(3600, 60)
// Test the time alignment with hourly time windows, not aligned
nowTruncated := time.Now().UTC().Truncate(time.Duration(300) * time.Second).Add(-time.Duration(300) * time.Second)
fromTruncated := nowTruncated.Add(-time.Hour)
from, to := computeTimeAlignment(300, 60, false)
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
assert.Equal(t, nowTruncated, to)
assert.Equal(t, fromTruncated, from)
}

func TestTimeAlignment_HourlyTimeWindows_aligned(t *testing.T) {
// Test the time alignment with hourly time windows, complete last hour
nowTruncated := time.Now().UTC().Truncate(time.Hour)
fromTruncated := nowTruncated.Add(-time.Hour)
from, to := computeTimeAlignment(300, 60, true)
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
assert.Equal(t, nowTruncated, to)
assert.Equal(t, fromTruncated, from)
}

func TestTimeAlignment_15minTimeWindows(t *testing.T) {
// Test the time alignment with 15min time windows
from, to := computeTimeAlignment(3600, 15)
from, to := computeTimeAlignment(3600, 15, false)
assert.Equal(t, int64(900), to.Unix()-from.Unix())
}

func TestTimeAlignment_15min_HourlyTimeWindows(t *testing.T) {
// Test the time alignment with hourly time windows and 15min resolution
from, to := computeTimeAlignment(900, 60)
from, to := computeTimeAlignment(900, 60, false)
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
}

func TestTimeAlignment_5min_HourlyTimeWindows(t *testing.T) {
// Test the time alignment with hourly time windows and 5min resolution
from, to := computeTimeAlignment(300, 60)
from, to := computeTimeAlignment(300, 60, false)
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
}

func TestTimeAlignment_raw_HourlyTimeWindows(t *testing.T) {
// Test the time alignment with hourly time windows and raw resolution
from, to := computeTimeAlignment(-1, 60)
from, to := computeTimeAlignment(-1, 60, false)
assert.Equal(t, int64(3600), to.Unix()-from.Unix())
}

Expand Down Expand Up @@ -91,7 +105,7 @@ func TestExtractionFlow_defaultAggregation(t *testing.T) {
PropertiesCount: &propCount,
}

writer, from, err := tsextractorClient.ExportTSToFile(ctx, 60, thingsMap, 300, "AVG")
writer, from, err := tsextractorClient.ExportTSToFile(ctx, 60, thingsMap, 300, "AVG", false)
assert.NoError(t, err)
assert.NotNil(t, writer)
assert.NotNil(t, from)
Expand Down
16 changes: 16 additions & 0 deletions deployment/cloud-formation-template/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,22 @@ Resources:
Ref: ResolutionAggregationStatistic
Tier: Standard

CompressionParameter:
Type: AWS::SSM::Parameter
Properties:
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/enable_compression
Type: String
Value: "false"
Tier: Standard

AlignExtractionParameter:
Type: AWS::SSM::Parameter
Properties:
Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/align_with_time_window
Type: String
Value: "false"
Tier: Standard

# EventBridge Rule to trigger Lambda every hour
EventBridgeRule:
Type: AWS::Events::Rule
Expand Down
32 changes: 22 additions & 10 deletions lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type AWSS3ImportTrigger struct {
const (
GlobalArduinoPrefix = "/arduino/s3-importer"

// Compatibility parameters for backward compatibility
// Parameters for backward compatibility
IoTApiKey = GlobalArduinoPrefix + "/iot/api-key"
IoTApiSecret = GlobalArduinoPrefix + "/iot/api-secret"
IoTApiOrgId = GlobalArduinoPrefix + "/iot/org-id"
Expand All @@ -53,6 +53,8 @@ const (
SchedulingStack = PerStackArduinoPrefix + "/iot/scheduling"
DestinationS3BucketStack = PerStackArduinoPrefix + "/destination-bucket"
AggregationStatStack = PerStackArduinoPrefix + "/iot/aggregation-statistic"
AlignWithTimeWindowStack = PerStackArduinoPrefix + "/iot/align_with_time_window"
EnableCompressionStack = PerStackArduinoPrefix + "/enable_compression"

SamplesResolutionSeconds = 300
DefaultTimeExtractionWindowMinutes = 60
Expand All @@ -61,8 +63,8 @@ const (
func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, error) {

logger := logrus.NewEntry(logrus.New())

stackName := os.Getenv("STACK_NAME")
compressFile := os.Getenv("ENABLE_COMPRESSION")

var apikey *string
var apiSecret *string
Expand All @@ -71,6 +73,8 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
var orgId *string
var err error
var aggregationStat *string
enabledCompression := false
enableAlignTimeWindow := false

logger.Infoln("------ Reading parameters from SSM")
paramReader, err := parameters.New()
Expand Down Expand Up @@ -98,6 +102,17 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
tags = tagsParam
}
aggregationStat, _ = paramReader.ReadConfigByStack(AggregationStatStack, stackName)

alignTs, _ := paramReader.ReadConfigByStack(AlignWithTimeWindowStack, stackName)
if alignTs != nil && *alignTs == "true" {
enableAlignTimeWindow = true
}

compression, _ := paramReader.ReadConfigByStack(EnableCompressionStack, stackName)
if compression != nil && *compression == "true" {
enabledCompression = true
}

} else {
apikey, err = paramReader.ReadConfig(IoTApiKey)
if err != nil {
Expand Down Expand Up @@ -148,11 +163,6 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
resolution = &defReso
}

enabledCompression := false
if compressFile == "true" {
enabledCompression = true
}

logger.Infoln("------ Running import")
if event.Dev || os.Getenv("DEV") == "true" {
logger.Infoln("Running in dev mode")
Expand All @@ -174,12 +184,14 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
logger.Infoln("resolution:", *resolution, "seconds")
}
logger.Infoln("aggregation statistic:", *aggregationStat)
logger.Infoln("data extraction time windows:", extractionWindowMinutes, "minutes")
logger.Infoln("data extraction time window:", *extractionWindowMinutes, "minutes")
logger.Infoln("file compression enabled:", enabledCompression)
logger.Infoln("align time window:", enableAlignTimeWindow)

err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, *extractionWindowMinutes, *destinationS3Bucket, *aggregationStat, enabledCompression)
err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, *extractionWindowMinutes, *destinationS3Bucket, *aggregationStat, enabledCompression, enableAlignTimeWindow)
if err != nil {
return nil, err
message := "Error detected during data export"
return &message, err
}

message := "Data exported successfully"
Expand Down
2 changes: 1 addition & 1 deletion resources/test/localexecution.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func HandleRequest(ctx context.Context, dev bool) (*string, error) {
logger.Infoln("tags:", *tags)
}

err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, TimeExtractionWindowMinutes, *destinationS3Bucket, "MAX", true)
err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, TimeExtractionWindowMinutes, *destinationS3Bucket, "MAX", true, true)
if err != nil {
return nil, err
}
Expand Down