
Added raw data extraction support #2


Merged: 6 commits (Sep 5, 2024)
23 changes: 23 additions & 0 deletions .gitignore
@@ -0,0 +1,23 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
vendor/

# IDEs and editors
.vscode
.idea
.env
.tool-versions

cover.out
80 changes: 62 additions & 18 deletions README.md
@@ -1,13 +1,14 @@
# Arduino AWS S3 CSV exporter

This project provides a way to extract time series samples from Arduino cloud, publishing them to an S3 destination bucket.
Things can be filtered by tags.
Data is extracted at the given resolution via a scheduled Lambda function, then stored in CSV files and saved to S3.

## Deployment schema
## Architecture

The S3 exporter is based on a Go Lambda function triggered by periodic events from EventBridge.
The job is configured to extract samples for a 60-minute time window, with a default resolution of 5 minutes.
One file is created per execution, containing all samples for the selected things. Time series samples are exported in the UTC timezone.
By default, all Arduino things present in the account are exported; it is possible to filter them via the tags configuration.

The produced CSV has the following structure:
@@ -28,19 +29,40 @@ Files are organized in the S3 bucket by date; files from the same day are grouped.
## Deployment via Cloud Formation Template

It is possible to deploy the required resources via the [cloud formation template](deployment/cloud-formation-template/deployment.yaml).

CFT deployment requires:
* an AWS account with rights to create a new CFT stack. The account must have rights to create:
* S3 buckets
* IAM roles
* Lambda functions
* EventBridge rules
* SSM parameters (Parameter Store)

Before stack creation, two S3 buckets have to be created:
* a temporary bucket, where the lambda binaries and the CFT can be uploaded
* the CSV destination bucket, where all generated files will be uploaded

Both buckets must be in the same region where the stack will be created.

To deploy the stack, follow these steps:
* Download the [lambda code binaries](deployment/binaries/arduino-s3-integration-lambda.zip) and the [Cloud Formation Template](deployment/cloud-formation-template/deployment.yaml)
* Upload the CFT and the binary zip file to an S3 bucket accessible by the AWS account. For the CFT yaml file, copy the Object URL (it will be required in the next step).

![object URL](docs/objecturl.png)

* Start the creation of a new cloud formation stack. Follow these steps:

![CFT 1](docs/cft-stack-1.png)

* Fill all required parameters.
<br/>**Mandatory**: Arduino API key and secret, S3 bucket where code has been uploaded, destination S3 bucket
<br/>**Optional**: tag filter for filtering things, organization identifier and samples resolution

![CFT 2](docs/cft-stack-2.png)

### Configuration parameters

Here is a list of all configuration properties available in 'Parameter store'.
These parameters are filled in by the CFT and can be adjusted later if needed (for example, for API key rotation).

| Parameter | Description |
| --------- | ----------- |
@@ -51,6 +73,28 @@ Here is a list of all configuration properties available in 'Parameter store'.
| /arduino/s3-importer/iot/samples-resolution-seconds | (optional) samples resolution (default: 300s) |
| /arduino/s3-importer/destination-bucket | S3 destination bucket |
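The resolution parameter doubles as a mode switch: the template's parameter description states that `-1` selects raw data export, and the Go code treats any resolution `<= 0` as raw mode. A minimal sketch of that interpretation (`parseResolution` is a hypothetical helper, not the exporter's actual code):

```go
package main

import (
	"fmt"
	"strconv"
)

// parseResolution interprets the samples-resolution-seconds parameter:
// a value <= 0 (for example -1) selects raw data export, any positive
// value is an aggregation resolution in seconds.
// Hypothetical helper, for illustration only.
func parseResolution(param string) (seconds int, raw bool, err error) {
	seconds, err = strconv.Atoi(param)
	if err != nil {
		return 0, false, err
	}
	return seconds, seconds <= 0, nil
}

func main() {
	s, raw, _ := parseResolution("300")
	fmt.Println(s, raw) // 300 false
	_, raw, _ = parseResolution("-1")
	fmt.Println(raw) // true
}
```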

### Tag filtering

You can use tag filtering if you need to reduce data export to a specific set of Things.

* Add a tag in the Arduino Cloud UI to all the Things you want to export. To do that, select a thing, go to the 'metadata' section, and add a new tag.


![tag 1](docs/tag-1.png)

![tag 2](docs/tag-2.png)

* Configure the tag filter during CFT creation, or later by editing the '/arduino/s3-importer/iot/filter/tags' parameter.

![tag filter](docs/tag-filter.png)
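Conceptually, the tag filter is a match between the configured filter and each thing's tag set. The sketch below illustrates the idea with a hypothetical `matchesTagFilter` helper and an assumed `key=value` comma-separated filter syntax; the real exporter's filter format and matching logic may differ:

```go
package main

import (
	"fmt"
	"strings"
)

// matchesTagFilter reports whether a thing's tags satisfy a filter of the
// assumed form "key1=value1,key2=value2". An empty filter matches all
// things. Hypothetical helper, for illustration only.
func matchesTagFilter(filter string, tags map[string]string) bool {
	if filter == "" {
		return true // no filter configured: export every thing
	}
	for _, pair := range strings.Split(filter, ",") {
		kv := strings.SplitN(strings.TrimSpace(pair), "=", 2)
		if len(kv) != 2 || tags[kv[0]] != kv[1] {
			return false
		}
	}
	return true
}

func main() {
	tags := map[string]string{"env": "prod"}
	fmt.Println(matchesTagFilter("env=prod", tags)) // true
	fmt.Println(matchesTagFilter("env=dev", tags))  // false
}
```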

### Building code

The code requires Go 1.22.
To compile the code:

```console
foo@bar:~$ ./compile-lambda.sh
arduino-s3-integration-lambda.zip archive created
```
116 changes: 101 additions & 15 deletions business/tsextractor/tsextractor.go
@@ -81,18 +81,27 @@ func (a *TsExtractor) ExportTSToS3(
defer func() { <-tokens }()
defer wg.Done()

if resolution <= 0 {
// Populate raw time series data
err := a.populateRawTSDataIntoS3(ctx, from, to, thingID, thing, writer)
if err != nil {
a.logger.Error("Error populating raw time series data: ", err)
return
}
} else {
// Populate numeric time series data
err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer)
if err != nil {
a.logger.Error("Error populating time series data: ", err)
return
}

// Populate string time series data, if any
err = a.populateStringTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer)
if err != nil {
a.logger.Error("Error populating string time series data: ", err)
return
}
}
}(thingID, thing, writer)
}
@@ -121,6 +130,10 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
resolution int,
writer *csv.CsvWriter) error {

if resolution <= 60 {
resolution = 60
}

var batched *iotclient.ArduinoSeriesBatch
var err error
var retry bool
@@ -254,9 +267,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3(
if value == nil {
continue
}
samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, interfaceToString(value)))
}
}

@@ -270,3 +281,78 @@ func (a *TsExtractor) populateStringTSDataIntoS3(

return nil
}

func (a *TsExtractor) populateRawTSDataIntoS3(
ctx context.Context,
from time.Time,
to time.Time,
thingID string,
thing iotclient.ArduinoThing,
writer *csv.CsvWriter) error {

var batched *iotclient.ArduinoSeriesRawBatch
var err error
var retry bool
for i := 0; i < 3; i++ {
batched, retry, err = a.iotcl.GetRawTimeSeriesByThing(ctx, thingID, from, to)
if !retry {
break
} else {
// This is due to a rate limit on the IoT API, we need to wait a bit before retrying
a.logger.Infof("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID)
time.Sleep(1 * time.Second)
}
}
if err != nil {
return err
}

sampleCount := int64(0)
samples := [][]string{}
for _, response := range batched.Responses {
if response.CountValues == 0 {
continue
}

propertyID := strings.Replace(response.Query, "property.", "", 1)
a.logger.Infof("Thing %s - Query %s Property %s - %d values\n", thingID, response.Query, propertyID, response.CountValues)
sampleCount += response.CountValues

propertyName := extractPropertyName(thing, propertyID)

for i := 0; i < len(response.Times); i++ {

ts := response.Times[i]
value := response.Values[i]
if value == nil {
continue
}
samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, interfaceToString(value)))
}
}

// Write samples to the csv output file
if len(samples) > 0 {
if err := writer.Write(samples); err != nil {
return err
}
a.logger.Debugf("Thing %s [%s] raw data saved %d values\n", thingID, thing.Name, sampleCount)
}

return nil
}

func interfaceToString(value interface{}) string {
switch v := value.(type) {
case string:
return v
case int:
return strconv.Itoa(v)
case float64:
return strconv.FormatFloat(v, 'f', 3, 64)
case bool:
return strconv.FormatBool(v)
default:
return fmt.Sprintf("%v", v)
}
}
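The `interfaceToString` conversion added above can be exercised in isolation. This self-contained sketch reproduces the function from the diff so its formatting rules are visible, in particular that `float64` values are rendered with three decimal places:

```go
package main

import (
	"fmt"
	"strconv"
)

// interfaceToString is reproduced from the diff above so the formatting
// rules can be run standalone.
func interfaceToString(value interface{}) string {
	switch v := value.(type) {
	case string:
		return v
	case int:
		return strconv.Itoa(v)
	case float64:
		return strconv.FormatFloat(v, 'f', 3, 64) // 3 decimal places
	case bool:
		return strconv.FormatBool(v)
	default:
		return fmt.Sprintf("%v", v)
	}
}

func main() {
	fmt.Println(interfaceToString(21.5)) // 21.500
	fmt.Println(interfaceToString(true)) // true
	fmt.Println(interfaceToString(7))    // 7
}
```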
4 changes: 3 additions & 1 deletion compile-lambda.sh
@@ -1,6 +1,8 @@
#!/bin/bash

mkdir -p deployment/binaries
GOOS=linux CGO_ENABLED=0 go build -o bootstrap -tags lambda.norpc lambda.go
zip arduino-s3-integration-lambda.zip bootstrap
mv arduino-s3-integration-lambda.zip deployment/binaries/
rm bootstrap
echo "deployment/binaries/arduino-s3-integration-lambda.zip archive created"
8 changes: 4 additions & 4 deletions deployment/cloud-formation-template/deployment.yaml
@@ -4,7 +4,7 @@ Description: Arduino S3 data exporter. For deployment and architectural details,
Parameters:
LambdaFunctionName:
Type: String
Default: 'arduino-s3-csv-data-exporter'
Description: Name of the Lambda function.

LambdaCodeS3Bucket:
@@ -38,12 +38,11 @@ Parameters:
Resolution:
Type: Number
Default: 300
Description: Samples resolution in seconds. Default is 5 minutes (300s). Set to -1 to export raw data.

DestinationS3Bucket:
Type: String
Default: 'aws-arduino-data-export'
Description: S3 bucket where CSV files will be stored.

Resources:

@@ -164,6 +163,7 @@ Resources:
Targets:
- Arn: !GetAtt LambdaFunction.Arn
Id: LambdaTarget
Input: '{}'
State: ENABLED

# Permission for EventBridge to invoke Lambda
Binary file added docs/cft-stack-1.png
Binary file added docs/cft-stack-2.png
Binary file added docs/objecturl.png
Binary file added docs/tag-1.png
Binary file added docs/tag-2.png
Binary file added docs/tag-filter.png