diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..09320fc
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,23 @@
+# Binaries for programs and plugins
+*.exe
+*.exe~
+*.dll
+*.so
+*.dylib
+
+# Test binary, built with `go test -c`
+*.test
+
+# Output of the go coverage tool, specifically when used with LiteIDE
+*.out
+
+# Dependency directories (remove the comment below to include it)
+vendor/
+
+# IDEs and editors
+.vscode
+.idea
+.env
+.tool-versions
+
+cover.out
diff --git a/README.md b/README.md
index 8458fcf..a344eff 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,14 @@
-# Arduino AWS S3 exporter
+# Arduino AWS S3 CSV exporter
 
 This project provides a way to extract time series samples from Arduino cloud, publishing to a S3 destination bucket.
-Things can be filterd by tags.
+Data is extracted at the configured resolution by a scheduled Lambda function; the samples are then stored in CSV files and saved to S3.
 
-## Deployment schema
+## Architecture
 
-S3 exporter is based on a Go lambda function triggered by periodic events from EventBridge.
-Job is configured to extract samples for a 60min time window.
-One file is created per run, containing all samples for the given hour. Time series samples are exported in UTC.
+The S3 exporter is based on a Go Lambda function triggered by periodic events from EventBridge.
+The job is configured to extract samples for a 60-minute time window, with a default resolution of 5 minutes.
+One file is created per execution, containing all samples for the selected things. Time series samples are exported in the UTC timezone.
+By default, all Arduino things present in the account are exported; they can be filtered via the tag configuration.
 
 CSV produced has the following structure:
 ```console
@@ -28,19 +29,40 @@ Files are organized in S3 bucket by date and files of the same day are grouped.
 ## Deployment via Cloud Formation Template
 
 It is possible to deploy required resources via [cloud formation template](deployment/cloud-formation-template/deployment.yaml)
 
-Required steps to deploy project:
-* compile lambda
-```console
-foo@bar:~$ ./compile-lambda.sh
-arduino-s3-integration-lambda.zip archive created
-```
-* Save zip file on an S3 bucket accessible by the AWS account
-* Start creation of a new cloud formation stack provising the [cloud formation template](deployment/cloud-formation-template/deployment.yaml)
-* Fill all required parameters (mandatory: Arduino API key and secret, S3 bucket and key where code has been uploaded, destination S3 bucket. Optionally, tag filter for filtering things, organization identifier and samples resolution)
+
+CFT deployment requires:
+* an AWS account with rights to create a new CFT stack. The account must be able to create:
+  * S3 buckets
+  * IAM Roles
+  * Lambda functions
+  * EventBridge rules
+  * SSM parameters (Parameter store)
+
+Before stack creation, two S3 buckets have to be created:
+* a temporary bucket where the Lambda binaries and the CFT can be uploaded
+* the CSV destination bucket, where all generated files will be uploaded
+Both buckets must be in the same region where the stack will be created.
+
+To deploy the stack, follow these steps:
+* Download the [lambda code binaries](deployment/binaries/arduino-s3-integration-lambda.zip) and the [Cloud Formation Template](deployment/cloud-formation-template/deployment.yaml)
+* Upload the CFT and the binary zip file to an S3 bucket accessible by the AWS account. For the CFT yaml file, copy the Object URL (it will be required in the next step).
+
+![object URL](docs/objecturl.png)
+
+* Start the creation of a new cloud formation stack, following these steps:
+
+![CFT 1](docs/cft-stack-1.png)
+
+* Fill in all required parameters.
+**Mandatory**: Arduino API key and secret, the S3 bucket where the code has been uploaded, and the destination S3 bucket.
+**Optional**: tag filter for filtering things, organization identifier, and samples resolution.
+
+![CFT 2](docs/cft-stack-2.png)
 
 ### Configuration parameters
 
-Here is a list of all configuration properties available in 'Parameter store'.
+Here is a list of all configuration properties available in 'Parameter store'.
+These parameters are filled in by the CFT and can be adjusted later if needed (for example, for API key rotation).
 
 | Parameter | Description |
 | --------- | ----------- |
@@ -51,6 +73,28 @@ Here is a list of all configuration properties available in 'Parameter store'.
 | /arduino/s3-importer/iot/samples-resolution-seconds | (optional) samples resolution (default: 300s) |
 | /arduino/s3-importer/destination-bucket | S3 destination bucket |
 
-### Policies
+### Tag filtering
+
+It is possible to restrict the export to only the Things of interest.
+Use tag filtering if you need to limit the data export to a specific set of Things.
 
-See policies defined in [cloud formation template](deployment/cloud-formation-template/deployment.yaml)
+* Add a tag in the Arduino Cloud UI to all Things you want to export. To do that, select a thing, go to the 'metadata' section, and add a new tag.
+
+
+![tag 1](docs/tag-1.png)
+
+![tag 2](docs/tag-2.png)
+
+* Configure the tag filter during CFT creation or by editing the '/arduino/s3-importer/iot/filter/tags' parameter.
+
+![tag filter](docs/tag-filter.png)
+
+### Building code
+
+The code requires Go 1.22.
+To compile the code:
+
+```console
+foo@bar:~$ ./compile-lambda.sh
+arduino-s3-integration-lambda.zip archive created
+```
\ No newline at end of file
diff --git a/business/tsextractor/tsextractor.go b/business/tsextractor/tsextractor.go
index a07423d..34c39d7 100755
--- a/business/tsextractor/tsextractor.go
+++ b/business/tsextractor/tsextractor.go
@@ -81,18 +81,27 @@ func (a *TsExtractor) ExportTSToS3(
 			defer func() { <-tokens }()
 			defer wg.Done()
 
-			// Populate numeric time series data
-			err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer)
-			if err != nil {
-				a.logger.Error("Error populating time series data: ", err)
-				return
-			}
-
-			// Populate string time series data, if any
-			err = a.populateStringTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer)
-			if err != nil {
-				a.logger.Error("Error populating string time series data: ", err)
-				return
+			if resolution <= 0 {
+				// Populate raw time series data
+				err := a.populateRawTSDataIntoS3(ctx, from, to, thingID, thing, writer)
+				if err != nil {
+					a.logger.Error("Error populating raw time series data: ", err)
+					return
+				}
+			} else {
+				// Populate numeric time series data
+				err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer)
+				if err != nil {
+					a.logger.Error("Error populating time series data: ", err)
+					return
+				}
+
+				// Populate string time series data, if any
+				err = a.populateStringTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer)
+				if err != nil {
+					a.logger.Error("Error populating string time series data: ", err)
+					return
+				}
 			}
 		}(thingID, thing, writer)
 	}
@@ -121,6 +130,10 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
 	resolution int,
 	writer *csv.CsvWriter) error {
 
+	if resolution <= 60 {
+		resolution = 60
+	}
+
 	var batched *iotclient.ArduinoSeriesBatch
 	var err error
 	var retry bool
@@ -254,9 +267,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3(
 			if value == nil {
 				continue
 			}
-			if strValue, ok := value.(string); ok {
-				samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, strValue))
-			}
+			samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, interfaceToString(value)))
 		}
 	}
 
@@ -270,3 +281,78 @@
 
 	return nil
 }
+
+func (a *TsExtractor) populateRawTSDataIntoS3(
+	ctx context.Context,
+	from time.Time,
+	to time.Time,
+	thingID string,
+	thing iotclient.ArduinoThing,
+	writer *csv.CsvWriter) error {
+
+	var batched *iotclient.ArduinoSeriesRawBatch
+	var err error
+	var retry bool
+	for i := 0; i < 3; i++ {
+		batched, retry, err = a.iotcl.GetRawTimeSeriesByThing(ctx, thingID, from, to)
+		if !retry {
+			break
+		} else {
+			// This is due to a rate limit on the IoT API, we need to wait a bit before retrying
+			a.logger.Infof("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
+			time.Sleep(1 * time.Second)
+		}
+	}
+	if err != nil {
+		return err
+	}
+
+	sampleCount := int64(0)
+	samples := [][]string{}
+	for _, response := range batched.Responses {
+		if response.CountValues == 0 {
+			continue
+		}
+
+		propertyID := strings.Replace(response.Query, "property.", "", 1)
+		a.logger.Infof("Thing %s - Query %s Property %s - %d values\n", thingID, response.Query, propertyID, response.CountValues)
+		sampleCount += response.CountValues
+
+		propertyName := extractPropertyName(thing, propertyID)
+
+		for i := 0; i < len(response.Times); i++ {
+
+			ts := response.Times[i]
+			value := response.Values[i]
+			if value == nil {
+				continue
+			}
+			samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, interfaceToString(value)))
+		}
+	}
+
+	// Write samples to csv output file
+	if len(samples) > 0 {
+		if err := writer.Write(samples); err != nil {
+			return err
+		}
+		a.logger.Debugf("Thing %s [%s] raw data saved %d values\n", thingID, thing.Name, sampleCount)
+	}
+
+	return nil
+}
+
+func interfaceToString(value interface{}) string {
+	switch v := value.(type) {
+	case string:
+		return v
+	case int:
+		return strconv.Itoa(v)
+	case float64:
+		return strconv.FormatFloat(v, 'f', 3, 64)
+	case bool:
+		return strconv.FormatBool(v)
+	default:
+		return fmt.Sprintf("%v", v)
+	}
+}
diff --git a/compile-lambda.sh b/compile-lambda.sh
index 2be4c94..7af6ca9 100755
--- a/compile-lambda.sh
+++ b/compile-lambda.sh
@@ -1,6 +1,8 @@
 #!/bin/bash
 
+mkdir -p deployment/binaries
 GOOS=linux CGO_ENABLED=0 go build -o bootstrap -tags lambda.norpc lambda.go
 zip arduino-s3-integration-lambda.zip bootstrap
+mv arduino-s3-integration-lambda.zip deployment/binaries/
 rm bootstrap
-echo "arduino-s3-integration-lambda.zip archive created"
+echo "deployment/binaries/arduino-s3-integration-lambda.zip archive created"
diff --git a/deployment/binaries/arduino-s3-integration-lambda.zip b/deployment/binaries/arduino-s3-integration-lambda.zip
new file mode 100644
index 0000000..664e1b6
Binary files /dev/null and b/deployment/binaries/arduino-s3-integration-lambda.zip differ
diff --git a/deployment/cloud-formation-template/deployment.yaml b/deployment/cloud-formation-template/deployment.yaml
index 56445fd..b75a9e0 100644
--- a/deployment/cloud-formation-template/deployment.yaml
+++ b/deployment/cloud-formation-template/deployment.yaml
@@ -4,7 +4,7 @@ Description: Arduino S3 data exporter. For deployment and architectural details,
 Parameters:
   LambdaFunctionName:
     Type: String
-    Default: 'arduino-s3-data-exporter'
+    Default: 'arduino-s3-csv-data-exporter'
     Description: Name of the Lambda function.
 
   LambdaCodeS3Bucket:
@@ -38,12 +38,11 @@ Parameters:
   Resolution:
     Type: Number
     Default: 300
-    Description: Samples resolution in seconds. Default is 5 minutes (300s).
+    Description: Samples resolution in seconds. Default is 5 minutes (300s). Set to -1 to export raw data.
 
   DestinationS3Bucket:
     Type: String
-    Default: 'aws-arduino-data-export'
-    Description: S3 bucket where saved samples will be stored.
+    Description: S3 bucket where CSV files will be stored.
 
 Resources:
@@ -164,6 +163,7 @@ Resources:
       Targets:
         - Arn: !GetAtt LambdaFunction.Arn
           Id: LambdaTarget
+          Input: '{}'
       State: ENABLED
 
   # Permission for EventBridge to invoke Lambda
diff --git a/docs/cft-stack-1.png b/docs/cft-stack-1.png
new file mode 100644
index 0000000..2b7b117
Binary files /dev/null and b/docs/cft-stack-1.png differ
diff --git a/docs/cft-stack-2.png b/docs/cft-stack-2.png
new file mode 100644
index 0000000..2ea43a9
Binary files /dev/null and b/docs/cft-stack-2.png differ
diff --git a/docs/objecturl.png b/docs/objecturl.png
new file mode 100644
index 0000000..575e87d
Binary files /dev/null and b/docs/objecturl.png differ
diff --git a/docs/tag-1.png b/docs/tag-1.png
new file mode 100644
index 0000000..76c2589
Binary files /dev/null and b/docs/tag-1.png differ
diff --git a/docs/tag-2.png b/docs/tag-2.png
new file mode 100644
index 0000000..8141e31
Binary files /dev/null and b/docs/tag-2.png differ
diff --git a/docs/tag-filter.png b/docs/tag-filter.png
new file mode 100644
index 0000000..55c6c92
Binary files /dev/null and b/docs/tag-filter.png differ
diff --git a/internal/iot/client.go b/internal/iot/client.go
index 0720bae..468342a 100755
--- a/internal/iot/client.go
+++ b/internal/iot/client.go
@@ -43,6 +43,27 @@ func NewClient(key, secret, organization string) (*Client, error) {
 	return cl, nil
 }
 
+func (cl *Client) setup(client, secret, organization string) error {
+	baseURL := GetArduinoAPIBaseURL()
+
+	// Configure a token source given the user's credentials.
+	cl.token = NewUserTokenSource(client, secret, baseURL)
+
+	config := iotclient.NewConfiguration()
+	if organization != "" {
+		config.AddDefaultHeader("X-Organization", organization)
+	}
+	config.Servers = iotclient.ServerConfigurations{
+		{
+			URL:         fmt.Sprintf("%s/iot", baseURL),
+			Description: "IoT API endpoint",
+		},
+	}
+	cl.api = iotclient.NewAPIClient(config)
+
+	return nil
+}
+
 // DeviceList retrieves and returns a list of all Arduino IoT Cloud devices
 // belonging to the user performing the request.
 func (cl *Client) DeviceList(ctx context.Context, tags map[string]string) ([]iotclient.ArduinoDevicev2, error) {
@@ -214,50 +235,6 @@ func (cl *Client) ThingTagsDelete(ctx context.Context, id string, keys []string)
 	return nil
 }
 
-func (cl *Client) GetTimeSeries(ctx context.Context, properties []string, from, to time.Time, interval int64) (*iotclient.ArduinoSeriesBatch, bool, error) {
-	if len(properties) == 0 {
-		return nil, false, fmt.Errorf("no properties provided")
-	}
-
-	ctx, err := ctxWithToken(ctx, cl.token)
-	if err != nil {
-		return nil, false, err
-	}
-
-	requests := make([]iotclient.BatchQueryRequestMediaV1, 0, len(properties))
-	for _, prop := range properties {
-		if prop == "" {
-			continue
-		}
-		requests = append(requests, iotclient.BatchQueryRequestMediaV1{
-			From:     from,
-			Interval: &interval,
-			Q:        fmt.Sprintf("property.%s", prop),
-			To:       to,
-		})
-	}
-
-	if len(requests) == 0 {
-		return nil, false, fmt.Errorf("no valid properties provided")
-	}
-
-	batchQueryRequestsMediaV1 := iotclient.BatchQueryRequestsMediaV1{
-		Requests: requests,
-	}
-
-	request := cl.api.SeriesV2Api.SeriesV2BatchQuery(ctx)
-	request = request.BatchQueryRequestsMediaV1(batchQueryRequestsMediaV1)
-	ts, httpResponse, err := cl.api.SeriesV2Api.SeriesV2BatchQueryExecute(request)
-	if err != nil {
-		err = fmt.Errorf("retrieving time series: %w", errorDetail(err))
-		if httpResponse != nil && httpResponse.StatusCode == 429 { // Retry if rate limited
-			return nil, true, err
-		}
-		return nil, false, err
-	}
-	return ts, false, nil
-}
-
 func (cl *Client) GetTimeSeriesByThing(ctx context.Context, thingID string, from, to time.Time, interval int64) (*iotclient.ArduinoSeriesBatch, bool, error) {
 	if thingID == "" {
 		return nil, false, fmt.Errorf("no thing provided")
@@ -309,15 +286,17 @@ func (cl *Client) GetTimeSeriesStringSampling(ctx context.Context, properties []
 	}
 
 	requests := make([]iotclient.BatchQuerySampledRequestMediaV1, 0, len(properties))
+	limit := int64(1000)
 	for _, prop := range properties {
 		if prop == "" {
 			continue
 		}
 		requests = append(requests, iotclient.BatchQuerySampledRequestMediaV1{
-			From:     &from,
-			Interval: &interval,
-			Q:        fmt.Sprintf("property.%s", prop),
-			To:       &to,
+			From:        &from,
+			Interval:    &interval,
+			Q:           fmt.Sprintf("property.%s", prop),
+			To:          &to,
+			SeriesLimit: &limit,
 		})
 	}
 
@@ -342,23 +321,41 @@ func (cl *Client) GetTimeSeriesStringSampling(ctx context.Context, properties []
 	return ts, false, nil
 }
 
-func (cl *Client) setup(client, secret, organization string) error {
-	baseURL := GetArduinoAPIBaseURL()
-
-	// Configure a token source given the user's credentials.
-	cl.token = NewUserTokenSource(client, secret, baseURL)
+func (cl *Client) GetRawTimeSeriesByThing(ctx context.Context, thingID string, from, to time.Time) (*iotclient.ArduinoSeriesRawBatch, bool, error) {
+	if thingID == "" {
+		return nil, false, fmt.Errorf("no thing provided")
+	}
 
-	config := iotclient.NewConfiguration()
-	if organization != "" {
-		config.AddDefaultHeader("X-Organization", organization)
+	ctx, err := ctxWithToken(ctx, cl.token)
+	if err != nil {
+		return nil, false, err
 	}
-	config.Servers = iotclient.ServerConfigurations{
+
+	requests := []iotclient.BatchQueryRawRequestMediaV1{
 		{
-			URL:         fmt.Sprintf("%s/iot", baseURL),
-			Description: "IoT API endpoint",
+			From: &from,
+			Q:    fmt.Sprintf("thing.%s", thingID),
+			To:   &to,
 		},
 	}
-	cl.api = iotclient.NewAPIClient(config)
 
-	return nil
+	if len(requests) == 0 {
+		return nil, false, fmt.Errorf("no valid properties provided")
+	}
+
+	batchQueryRequestsMediaV1 := iotclient.BatchQueryRawRequestsMediaV1{
+		Requests: requests,
+	}
+
+	request := cl.api.SeriesV2Api.SeriesV2BatchQueryRaw(ctx)
+	request = request.BatchQueryRawRequestsMediaV1(batchQueryRequestsMediaV1)
+	ts, httpResponse, err := cl.api.SeriesV2Api.SeriesV2BatchQueryRawExecute(request)
+	if err != nil {
+		err = fmt.Errorf("retrieving raw time series: %w", errorDetail(err))
+		if httpResponse != nil && httpResponse.StatusCode == 429 { // Retry if rate limited
+			return nil, true, err
+		}
+		return nil, false, err
+	}
+	return ts, false, nil
 }
diff --git a/lambda.go b/lambda.go
index 39e37eb..5a55df5 100755
--- a/lambda.go
+++ b/lambda.go
@@ -83,9 +83,9 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
 		res := SamplesResolutionSeconds
 		resolution = &res
 	}
-	if *resolution < 60 || *resolution > 3600 {
+	if *resolution > 3600 {
 		logger.Errorf("Resolution %d is invalid", *resolution)
-		return nil, errors.New("resolution must be between 60 and 3600")
+		return nil, errors.New("resolution must be between -1 and 3600")
 	}
 
 	logger.Infoln("------ Running import...")
@@ -103,6 +103,11 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
 	if tags != nil {
 		logger.Infoln("tags:", *tags)
 	}
+	if *resolution <= 0 {
+		logger.Infoln("resolution: raw")
+	} else {
+		logger.Infoln("resolution:", *resolution, "seconds")
+	}
 
 	err = importer.StartImport(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, TimeExtractionWindowMinutes, *destinationS3Bucket)
 	if err != nil {
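For reviewers, here is a minimal sketch (not part of the patch) of the value conversions performed by the new `interfaceToString` helper added in `business/tsextractor/tsextractor.go`. The test file and test name are hypothetical and assume the helper remains unexported in the `tsextractor` package.

```go
package tsextractor

import "testing"

// Hypothetical table-driven check of interfaceToString: strings pass through
// unchanged, ints go through strconv.Itoa, float64 values are formatted with
// three decimals, and bools become "true"/"false".
func TestInterfaceToString(t *testing.T) {
	cases := []struct {
		in   interface{}
		want string
	}{
		{"on", "on"},
		{42, "42"},
		{25.5, "25.500"},
		{true, "true"},
	}
	for _, c := range cases {
		if got := interfaceToString(c.in); got != c.want {
			t.Errorf("interfaceToString(%v) = %q, want %q", c.in, got, c.want)
		}
	}
}
```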