diff --git a/README.md b/README.md index 5d96b1b..4b5aa7d 100644 --- a/README.md +++ b/README.md @@ -15,11 +15,11 @@ By default, all Arduino things present in the account are exported: it is possib CSV produced has the following structure: ```console -timestamp,thing_id,thing_name,property_id,property_name,property_type,value -2024-09-04T11:00:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,3 -2024-09-04T11:01:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,7 -2024-09-04T11:02:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,11 -2024-09-04T11:03:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,15 +timestamp,thing_id,thing_name,property_id,property_name,property_type,value,aggregation_statistic +2024-09-04T11:00:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,3,AVG +2024-09-04T11:01:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,7,AVG +2024-09-04T11:02:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,11,AVG +2024-09-04T11:03:00Z,07846f3c-37ae-4722-a3f5-65d7b4449ad3,H7,137c02d0-b50f-47fb-a2eb-b6d23884ec51,m3,FLOAT,15,AVG ``` Files are organized by date and files of the same day are grouped. @@ -76,6 +76,9 @@ These parameters are filled by CFT at stack creation time and can be adjusted la | /arduino/s3-exporter/{stack-name}/iot/samples-resolution | (optional) samples aggregation resolution (1/5/15 minutes, 1 hour, raw) | | /arduino/s3-exporter/{stack-name}/destination-bucket | S3 destination bucket | | /arduino/s3-exporter/{stack-name}/iot/scheduling | Execution scheduling | +| /arduino/s3-exporter/{stack-name}/iot/aggregation-statistic | Aggregation statistic | + +It is possible to compress (with gzip) files before uploading to S3. 
To enable compression, add the `ENABLE_COMPRESSION` environment variable to the Lambda configuration and set it to `true`; any other value, or leaving the variable unset, keeps compression disabled. ### Tag filtering diff --git a/app/importer/importer.go b/app/exporter/exporter.go similarity index 55% rename from app/importer/importer.go rename to app/exporter/exporter.go index b0e46ad..d945273 100755 --- a/app/importer/importer.go +++ b/app/exporter/exporter.go @@ -13,19 +13,22 @@ // Arduino software without disclosing the source code of your own applications. // To purchase a commercial license, send an email to license@arduino.cc. -package importer +package exporter import ( "context" + "fmt" + "os" "github.com/arduino/aws-s3-integration/business/tsextractor" "github.com/arduino/aws-s3-integration/internal/iot" + "github.com/arduino/aws-s3-integration/internal/s3" "github.com/arduino/aws-s3-integration/internal/utils" iotclient "github.com/arduino/iot-client-go/v2" "github.com/sirupsen/logrus" ) -func StartImport(ctx context.Context, logger *logrus.Entry, key, secret, orgid string, tagsF *string, resolution, timeWindowMinutes int, destinationS3Bucket string) error { +func StartExporter(ctx context.Context, logger *logrus.Entry, key, secret, orgid string, tagsF *string, resolution, timeWindowMinutes int, destinationS3Bucket string, aggregationStat string, compress bool) error { // Init client iotcl, err := iot.NewClient(key, secret, orgid) @@ -50,8 +53,39 @@ func StartImport(ctx context.Context, logger *logrus.Entry, key, secret, orgid s // Extract data points from thing and push to S3 tsextractorClient := tsextractor.New(iotcl, logger) - if err := tsextractorClient.ExportTSToS3(ctx, timeWindowMinutes, thingsMap, resolution, destinationS3Bucket); err != nil { + + // Open s3 output writer + s3cl, err := s3.NewS3Client(destinationS3Bucket) + if err != nil { + return err + } + + if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat); err != nil { logger.Error("Error aligning time series 
samples: ", err) + return err + } else { + writer.Close() + defer writer.Delete() + + fileToUpload := writer.GetFilePath() + destinationKeyFormat := "%s/%s.csv" + if compress { + logger.Infof("Compressing file: %s\n", fileToUpload) + compressedFile, err := utils.GzipFileCompression(fileToUpload) + if err != nil { + return err + } + fileToUpload = compressedFile + logger.Infof("Generated compressed file: %s\n", fileToUpload) + destinationKeyFormat = "%s/%s.csv.gz" + defer func(f string) { os.Remove(f) }(fileToUpload) + } + + destinationKey := fmt.Sprintf(destinationKeyFormat, from.Format("2006-01-02"), from.Format("2006-01-02-15-04")) + logger.Infof("Uploading file %s to bucket %s/%s\n", fileToUpload, s3cl.DestinationBucket(), destinationKey) + if err := s3cl.WriteFile(ctx, destinationKey, fileToUpload); err != nil { + return err + } } return nil diff --git a/business/tsextractor/tsextractor.go b/business/tsextractor/tsextractor.go index 03d16fb..412a536 100755 --- a/business/tsextractor/tsextractor.go +++ b/business/tsextractor/tsextractor.go @@ -17,6 +17,7 @@ package tsextractor import ( "context" + "encoding/json" "fmt" "strconv" "strings" @@ -28,7 +29,6 @@ import ( "github.com/arduino/aws-s3-integration/internal/csv" "github.com/arduino/aws-s3-integration/internal/iot" - "github.com/arduino/aws-s3-integration/internal/s3" iotclient "github.com/arduino/iot-client-go/v2" "github.com/sirupsen/logrus" ) @@ -37,11 +37,11 @@ const importConcurrency = 10 const retryCount = 5 type TsExtractor struct { - iotcl *iot.Client + iotcl iot.API logger *logrus.Entry } -func New(iotcl *iot.Client, logger *logrus.Entry) *TsExtractor { +func New(iotcl iot.API, logger *logrus.Entry) *TsExtractor { return &TsExtractor{iotcl: iotcl, logger: logger} } @@ -51,36 +51,42 @@ func computeTimeAlignment(resolutionSeconds, timeWindowInMinutes int) (time.Time resolutionSeconds = 300 // Align to 5 minutes } to := time.Now().Truncate(time.Duration(resolutionSeconds) * time.Second).UTC() + if 
resolutionSeconds <= 900 { + // Shift time window to avoid missing data + to = to.Add(-time.Duration(300) * time.Second) + } from := to.Add(-time.Duration(timeWindowInMinutes) * time.Minute) return from, to } -func (a *TsExtractor) ExportTSToS3( +func isRawResolution(resolution int) bool { + return resolution <= 0 +} + +func (a *TsExtractor) ExportTSToFile( ctx context.Context, timeWindowInMinutes int, thingsMap map[string]iotclient.ArduinoThing, resolution int, - destinationS3Bucket string) error { + aggregationStat string) (*csv.CsvWriter, time.Time, error) { // Truncate time to given resolution from, to := computeTimeAlignment(resolution, timeWindowInMinutes) - // Open s3 output writer - s3cl, err := s3.NewS3Client(destinationS3Bucket) - if err != nil { - return err - } - // Open csv output writer - writer, err := csv.NewWriter(from, a.logger) + writer, err := csv.NewWriter(from, a.logger, isRawResolution(resolution)) if err != nil { - return err + return nil, from, err } var wg sync.WaitGroup tokens := make(chan struct{}, importConcurrency) - a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to) + if isRawResolution(resolution) { + a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to, " - aggregation: raw") + } else { + a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). 
From ", from, " to ", to, " - aggregation: ", aggregationStat) + } for thingID, thing := range thingsMap { if thing.Properties == nil || len(thing.Properties) == 0 { @@ -95,7 +101,7 @@ func (a *TsExtractor) ExportTSToS3( defer func() { <-tokens }() defer wg.Done() - if resolution <= 0 { + if isRawResolution(resolution) { // Populate raw time series data err := a.populateRawTSDataIntoS3(ctx, from, to, thingID, thing, writer) if err != nil { @@ -104,7 +110,7 @@ func (a *TsExtractor) ExportTSToS3( } } else { // Populate numeric time series data - err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer) + err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, aggregationStat, writer) if err != nil { a.logger.Error("Error populating time series data: ", err) return @@ -124,17 +130,7 @@ func (a *TsExtractor) ExportTSToS3( a.logger.Infoln("Waiting for all data extraction jobs to terminate...") wg.Wait() - // Close csv output writer and upload to s3 - writer.Close() - defer writer.Delete() - - destinationKey := fmt.Sprintf("%s/%s.csv", from.Format("2006-01-02"), from.Format("2006-01-02-15-04")) - a.logger.Infof("Uploading file %s to bucket %s\n", destinationKey, s3cl.DestinationBucket()) - if err := s3cl.WriteFile(ctx, destinationKey, writer.GetFilePath()); err != nil { - return err - } - - return nil + return writer, from, nil } func randomRateLimitingSleep() { @@ -155,6 +151,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( thingID string, thing iotclient.ArduinoThing, resolution int, + aggregationStat string, writer *csv.CsvWriter) error { if resolution <= 60 { @@ -165,7 +162,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( var err error var retry bool for i := 0; i < retryCount; i++ { - batched, retry, err = a.iotcl.GetTimeSeriesByThing(ctx, thingID, from, to, int64(resolution)) + batched, retry, err = a.iotcl.GetTimeSeriesByThing(ctx, thingID, from, to, int64(resolution), aggregationStat) if !retry { 
break } else { @@ -195,7 +192,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( ts := response.Times[i] value := response.Values[i] - samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, strconv.FormatFloat(value, 'f', -1, 64))) + samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, strconv.FormatFloat(value, 'f', -1, 64), aggregationStat)) } } @@ -210,7 +207,20 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( return nil } -func composeRow(ts time.Time, thingID string, thingName string, propertyID string, propertyName string, propertyType string, value string) []string { +func composeRow(ts time.Time, thingID string, thingName string, propertyID string, propertyName string, propertyType string, value string, aggregation string) []string { + row := make([]string, 8) + row[0] = ts.UTC().Format(time.RFC3339) + row[1] = thingID + row[2] = thingName + row[3] = propertyID + row[4] = propertyName + row[5] = propertyType + row[6] = value + row[7] = aggregation + return row +} + +func composeRawRow(ts time.Time, thingID string, thingName string, propertyID string, propertyName string, propertyType string, value string) []string { row := make([]string, 7) row[0] = ts.UTC().Format(time.RFC3339) row[1] = thingID @@ -300,7 +310,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3( if value == nil { continue } - samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, interfaceToString(value))) + samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value), "")) } } @@ -360,7 +370,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3( if value == nil { continue } - samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, interfaceToString(value))) + samples = append(samples, composeRawRow(ts, thingID, thing.Name, 
propertyID, propertyName, propertyType, a.interfaceToString(value))) } } @@ -375,7 +385,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3( return nil } -func interfaceToString(value interface{}) string { +func (a *TsExtractor) interfaceToString(value interface{}) string { switch v := value.(type) { case string: return v @@ -385,6 +395,13 @@ func interfaceToString(value interface{}) string { return strconv.FormatFloat(v, 'f', -1, 64) case bool: return strconv.FormatBool(v) + case map[string]any: + encoded, err := json.Marshal(v) + if err != nil { + a.logger.Error("Error encoding map to json: ", err) + return fmt.Sprintf("%v", v) + } + return string(encoded) default: return fmt.Sprintf("%v", v) } diff --git a/business/tsextractor/tsextractor_test.go b/business/tsextractor/tsextractor_test.go index 57400f5..218cb13 100644 --- a/business/tsextractor/tsextractor_test.go +++ b/business/tsextractor/tsextractor_test.go @@ -1,9 +1,18 @@ package tsextractor import ( + "context" + "fmt" + "io" + "os" "testing" + "time" + iotMocks "github.com/arduino/aws-s3-integration/internal/iot/mocks" + iotclient "github.com/arduino/iot-client-go/v2" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestTimeAlignment_HourlyTimeWindows(t *testing.T) { @@ -35,3 +44,73 @@ func TestTimeAlignment_raw_HourlyTimeWindows(t *testing.T) { from, to := computeTimeAlignment(-1, 60) assert.Equal(t, int64(3600), to.Unix()-from.Unix()) } + +func toPtr(val string) *string { + return &val +} + +func TestExtractionFlow_defaultAggregation(t *testing.T) { + logger := logrus.NewEntry(logrus.New()) + ctx := context.Background() + + thingId := "91f30213-2bd7-480a-b1dc-f31b01840e7e" + propertyId := "c86f4ed9-7f52-4bd3-bdc6-b2936bec68ac" + + // Init client + iotcl := iotMocks.NewAPI(t) + + now := time.Now() + responses := []iotclient.ArduinoSeriesResponse{ + { + Aggregation: toPtr("AVG"), + Query: fmt.Sprintf("property.%s", propertyId), + Times: 
[]time.Time{now.Add(-time.Minute * 1), now}, + Values: []float64{1.0, 2.0}, + CountValues: 2, + }, + } + samples := iotclient.ArduinoSeriesBatch{ + Responses: responses, + } + iotcl.On("GetTimeSeriesByThing", ctx, thingId, mock.Anything, mock.Anything, int64(300), "AVG").Return(&samples, false, nil) + + tsextractorClient := New(iotcl, logger) + + propCount := int64(1) + thingsMap := make(map[string]iotclient.ArduinoThing) + thingsMap[thingId] = iotclient.ArduinoThing{ + Id: thingId, + Name: "test", + Properties: []iotclient.ArduinoProperty{ + { + Name: "ptest", + Id: propertyId, + Type: "FLOAT", + }, + }, + PropertiesCount: &propCount, + } + + writer, from, err := tsextractorClient.ExportTSToFile(ctx, 60, thingsMap, 300, "AVG") + assert.NoError(t, err) + assert.NotNil(t, writer) + assert.NotNil(t, from) + + writer.Close() + defer writer.Delete() + + outF, err := os.Open(writer.GetFilePath()) + assert.NoError(t, err) + defer outF.Close() + content, err := io.ReadAll(outF) + assert.NoError(t, err) + entries := []string{ + "timestamp,thing_id,thing_name,property_id,property_name,property_type,value", + "91f30213-2bd7-480a-b1dc-f31b01840e7e,test,c86f4ed9-7f52-4bd3-bdc6-b2936bec68ac,ptest,FLOAT,1,AVG", + "91f30213-2bd7-480a-b1dc-f31b01840e7e,test,c86f4ed9-7f52-4bd3-bdc6-b2936bec68ac,ptest,FLOAT,2,AVG", + } + for _, entry := range entries { + assert.Contains(t, string(content), entry) + } + fmt.Println(string(content)) +} diff --git a/deployment/cloud-formation-template/deployment.yaml b/deployment/cloud-formation-template/deployment.yaml index 57099a6..a26405d 100644 --- a/deployment/cloud-formation-template/deployment.yaml +++ b/deployment/cloud-formation-template/deployment.yaml @@ -46,6 +46,17 @@ Parameters: - 1 hour Default: 5 minutes + ResolutionAggregationStatistic: + Type: String + Description: "Aggregation statistic for data extraction. It is not applicable for 'raw' resolution." 
+ AllowedValues: + - AVG + - MIN + - MAX + - PCT_75 + - PCT_90 + Default: AVG + TagFilter: Type: String Default: '' @@ -177,6 +188,15 @@ Resources: Ref: ExecutionScheduling Tier: Standard + AggregationStatisticParameter: + Type: AWS::SSM::Parameter + Properties: + Name: !Sub /arduino/s3-exporter/${AWS::StackName}/iot/aggregation-statistic + Type: String + Value: + Ref: ResolutionAggregationStatistic + Tier: Standard + # EventBridge Rule to trigger Lambda every hour EventBridgeRule: Type: AWS::Events::Rule diff --git a/go.mod b/go.mod index 0d4359f..5201961 100755 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/arduino/aws-s3-integration -go 1.22.6 +go 1.22 require ( github.com/arduino/iot-client-go/v2 v2.0.4 @@ -33,6 +33,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect golang.org/x/sys v0.20.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 000b46a..e18e4d8 100755 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= diff --git a/internal/csv/csv.go b/internal/csv/csv.go index 18aa870..00ce201 100644 --- 
a/internal/csv/csv.go +++ b/internal/csv/csv.go @@ -15,16 +15,22 @@ const ( baseTmpStorage = "/tmp" ) -var csvHeader = []string{"timestamp", "thing_id", "thing_name", "property_id", "property_name", "property_type", "value"} +var csvHeader = []string{"timestamp", "thing_id", "thing_name", "property_id", "property_name", "property_type", "value", "aggregation_statistic"} +var csvHeaderRaw = []string{"timestamp", "thing_id", "thing_name", "property_id", "property_name", "property_type", "value"} -func NewWriter(destinationHour time.Time, logger *logrus.Entry) (*CsvWriter, error) { +func NewWriter(destinationHour time.Time, logger *logrus.Entry, isRawData bool) (*CsvWriter, error) { filePath := fmt.Sprintf("%s/%s.csv", baseTmpStorage, destinationHour.Format("2006-01-02-15-04")) file, err := os.Create(filePath) if err != nil { logger.Fatalf("failed creating file: %s", err) } writer := csv.NewWriter(file) - if err := writer.Write(csvHeader); err != nil { + + header := csvHeader + if isRawData { + header = csvHeaderRaw + } + if err := writer.Write(header); err != nil { logger.Fatalf("failed writing record to file: %s", err) } return &CsvWriter{ @@ -32,6 +38,7 @@ func NewWriter(destinationHour time.Time, logger *logrus.Entry) (*CsvWriter, err logger: logger, csvWriter: writer, filePath: filePath, + isRawData: isRawData, }, nil } @@ -41,6 +48,7 @@ type CsvWriter struct { logger *logrus.Entry csvWriter *csv.Writer filePath string + isRawData bool } func (c *CsvWriter) Write(records [][]string) error { diff --git a/internal/iot/client.go b/internal/iot/client.go index aa9ca72..0c61f23 100755 --- a/internal/iot/client.go +++ b/internal/iot/client.go @@ -25,6 +25,14 @@ import ( var ErrOtaAlreadyInProgress = fmt.Errorf("ota already in progress") +//go:generate mockery --name API --filename iot_api.go +type API interface { + ThingList(ctx context.Context, ids []string, device *string, props bool, tags map[string]string) ([]iotclient.ArduinoThing, error) + 
GetTimeSeriesByThing(ctx context.Context, thingID string, from, to time.Time, interval int64, aggregationStat string) (*iotclient.ArduinoSeriesBatch, bool, error) + GetTimeSeriesStringSampling(ctx context.Context, properties []string, from, to time.Time, interval int32) (*iotclient.ArduinoSeriesBatchSampled, bool, error) + GetRawTimeSeriesByThing(ctx context.Context, thingID string, from, to time.Time) (*iotclient.ArduinoSeriesRawBatch, bool, error) +} + // Client can perform actions on Arduino IoT Cloud. type Client struct { api *iotclient.APIClient @@ -64,104 +72,6 @@ func (cl *Client) setup(client, secret, organizationId string) error { return nil } -// DeviceList retrieves and returns a list of all Arduino IoT Cloud devices -// belonging to the user performing the request. -func (cl *Client) DeviceList(ctx context.Context, tags map[string]string) ([]iotclient.ArduinoDevicev2, error) { - ctx, err := ctxWithToken(ctx, cl.token) - if err != nil { - return nil, err - } - - request := cl.api.DevicesV2Api.DevicesV2List(ctx) - if tags != nil { - t := make([]string, 0, len(tags)) - for key, val := range tags { - // Use the 'key:value' format required from the backend - t = append(t, key+":"+val) - } - request = request.Tags(t) - } - devices, _, err := cl.api.DevicesV2Api.DevicesV2ListExecute(request) - if err != nil { - err = fmt.Errorf("listing devices: %w", errorDetail(err)) - return nil, err - } - return devices, nil -} - -// DeviceShow allows to retrieve a specific device, given its id, -// from Arduino IoT Cloud. 
-func (cl *Client) DeviceShow(ctx context.Context, id string) (*iotclient.ArduinoDevicev2, error) { - ctx, err := ctxWithToken(ctx, cl.token) - if err != nil { - return nil, err - } - - request := cl.api.DevicesV2Api.DevicesV2Show(ctx, id) - dev, _, err := cl.api.DevicesV2Api.DevicesV2ShowExecute(request) - if err != nil { - err = fmt.Errorf("retrieving device, %w", errorDetail(err)) - return nil, err - } - return dev, nil -} - -// DeviceTagsCreate allows to create or overwrite tags on a device of Arduino IoT Cloud. -func (cl *Client) DeviceTagsCreate(ctx context.Context, id string, tags map[string]string) error { - ctx, err := ctxWithToken(ctx, cl.token) - if err != nil { - return err - } - - for key, val := range tags { - t := iotclient.Tag{Key: key, Value: val} - request := cl.api.DevicesV2TagsApi.DevicesV2TagsUpsert(ctx, id) - request = request.Tag(t) - _, err := cl.api.DevicesV2TagsApi.DevicesV2TagsUpsertExecute(request) - if err != nil { - err = fmt.Errorf("cannot create tag %s: %w", key, errorDetail(err)) - return err - } - } - return nil -} - -// DeviceTagsDelete deletes the tags of a device of Arduino IoT Cloud, -// given the device id and the keys of the tags. -func (cl *Client) DeviceTagsDelete(ctx context.Context, id string, keys []string) error { - ctx, err := ctxWithToken(ctx, cl.token) - if err != nil { - return err - } - - for _, key := range keys { - request := cl.api.DevicesV2TagsApi.DevicesV2TagsDelete(ctx, id, key) - _, err := cl.api.DevicesV2TagsApi.DevicesV2TagsDeleteExecute(request) - if err != nil { - err = fmt.Errorf("cannot delete tag %s: %w", key, errorDetail(err)) - return err - } - } - return nil -} - -// ThingShow allows to retrieve a specific thing, given its id, -// from Arduino IoT Cloud. 
-func (cl *Client) ThingShow(ctx context.Context, id string) (*iotclient.ArduinoThing, error) { - ctx, err := ctxWithToken(ctx, cl.token) - if err != nil { - return nil, err - } - - request := cl.api.ThingsV2Api.ThingsV2Show(ctx, id) - thing, _, err := cl.api.ThingsV2Api.ThingsV2ShowExecute(request) - if err != nil { - err = fmt.Errorf("retrieving thing, %w", errorDetail(err)) - return nil, err - } - return thing, nil -} - // ThingList returns a list of things on Arduino IoT Cloud. func (cl *Client) ThingList(ctx context.Context, ids []string, device *string, props bool, tags map[string]string) ([]iotclient.ArduinoThing, error) { ctx, err := ctxWithToken(ctx, cl.token) @@ -197,45 +107,7 @@ func (cl *Client) ThingList(ctx context.Context, ids []string, device *string, p return things, nil } -// ThingTagsCreate allows to create or overwrite tags on a thing of Arduino IoT Cloud. -func (cl *Client) ThingTagsCreate(ctx context.Context, id string, tags map[string]string) error { - ctx, err := ctxWithToken(ctx, cl.token) - if err != nil { - return err - } - - for key, val := range tags { - t := iotclient.Tag{Key: key, Value: val} - request := cl.api.ThingsV2TagsApi.ThingsV2TagsUpsert(ctx, id) - _, err := cl.api.ThingsV2TagsApi.ThingsV2TagsUpsertExecute(request.Tag(t)) - if err != nil { - err = fmt.Errorf("cannot create tag %s: %w", key, errorDetail(err)) - return err - } - } - return nil -} - -// ThingTagsDelete deletes the tags of a thing of Arduino IoT Cloud, -// given the thing id and the keys of the tags. 
-func (cl *Client) ThingTagsDelete(ctx context.Context, id string, keys []string) error { - ctx, err := ctxWithToken(ctx, cl.token) - if err != nil { - return err - } - - for _, key := range keys { - request := cl.api.ThingsV2TagsApi.ThingsV2TagsDelete(ctx, id, key) - _, err := cl.api.ThingsV2TagsApi.ThingsV2TagsDeleteExecute(request) - if err != nil { - err = fmt.Errorf("cannot delete tag %s: %w", key, errorDetail(err)) - return err - } - } - return nil -} - -func (cl *Client) GetTimeSeriesByThing(ctx context.Context, thingID string, from, to time.Time, interval int64) (*iotclient.ArduinoSeriesBatch, bool, error) { +func (cl *Client) GetTimeSeriesByThing(ctx context.Context, thingID string, from, to time.Time, interval int64, aggregationStat string) (*iotclient.ArduinoSeriesBatch, bool, error) { if thingID == "" { return nil, false, fmt.Errorf("no thing provided") } @@ -247,10 +119,11 @@ func (cl *Client) GetTimeSeriesByThing(ctx context.Context, thingID string, from requests := []iotclient.BatchQueryRequestMediaV1{ { - From: from, - Interval: &interval, - Q: fmt.Sprintf("thing.%s", thingID), - To: to, + From: from, + Interval: &interval, + Q: fmt.Sprintf("thing.%s", thingID), + To: to, + Aggregation: &aggregationStat, }, } diff --git a/internal/iot/mocks/iot_api.go b/internal/iot/mocks/iot_api.go new file mode 100644 index 0000000..28374ae --- /dev/null +++ b/internal/iot/mocks/iot_api.go @@ -0,0 +1,173 @@ +// Code generated by mockery v2.44.1. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + iot "github.com/arduino/iot-client-go/v2" + + mock "github.com/stretchr/testify/mock" + + time "time" +) + +// API is an autogenerated mock type for the API type +type API struct { + mock.Mock +} + +// GetRawTimeSeriesByThing provides a mock function with given fields: ctx, thingID, from, to +func (_m *API) GetRawTimeSeriesByThing(ctx context.Context, thingID string, from time.Time, to time.Time) (*iot.ArduinoSeriesRawBatch, bool, error) { + ret := _m.Called(ctx, thingID, from, to) + + if len(ret) == 0 { + panic("no return value specified for GetRawTimeSeriesByThing") + } + + var r0 *iot.ArduinoSeriesRawBatch + var r1 bool + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, string, time.Time, time.Time) (*iot.ArduinoSeriesRawBatch, bool, error)); ok { + return rf(ctx, thingID, from, to) + } + if rf, ok := ret.Get(0).(func(context.Context, string, time.Time, time.Time) *iot.ArduinoSeriesRawBatch); ok { + r0 = rf(ctx, thingID, from, to) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*iot.ArduinoSeriesRawBatch) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, time.Time, time.Time) bool); ok { + r1 = rf(ctx, thingID, from, to) + } else { + r1 = ret.Get(1).(bool) + } + + if rf, ok := ret.Get(2).(func(context.Context, string, time.Time, time.Time) error); ok { + r2 = rf(ctx, thingID, from, to) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// GetTimeSeriesByThing provides a mock function with given fields: ctx, thingID, from, to, interval, aggregationStat +func (_m *API) GetTimeSeriesByThing(ctx context.Context, thingID string, from time.Time, to time.Time, interval int64, aggregationStat string) (*iot.ArduinoSeriesBatch, bool, error) { + ret := _m.Called(ctx, thingID, from, to, interval, aggregationStat) + + if len(ret) == 0 { + panic("no return value specified for GetTimeSeriesByThing") + } + + var r0 *iot.ArduinoSeriesBatch + var r1 bool + var r2 error + if rf, 
ok := ret.Get(0).(func(context.Context, string, time.Time, time.Time, int64, string) (*iot.ArduinoSeriesBatch, bool, error)); ok { + return rf(ctx, thingID, from, to, interval, aggregationStat) + } + if rf, ok := ret.Get(0).(func(context.Context, string, time.Time, time.Time, int64, string) *iot.ArduinoSeriesBatch); ok { + r0 = rf(ctx, thingID, from, to, interval, aggregationStat) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*iot.ArduinoSeriesBatch) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, time.Time, time.Time, int64, string) bool); ok { + r1 = rf(ctx, thingID, from, to, interval, aggregationStat) + } else { + r1 = ret.Get(1).(bool) + } + + if rf, ok := ret.Get(2).(func(context.Context, string, time.Time, time.Time, int64, string) error); ok { + r2 = rf(ctx, thingID, from, to, interval, aggregationStat) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// GetTimeSeriesStringSampling provides a mock function with given fields: ctx, properties, from, to, interval +func (_m *API) GetTimeSeriesStringSampling(ctx context.Context, properties []string, from time.Time, to time.Time, interval int32) (*iot.ArduinoSeriesBatchSampled, bool, error) { + ret := _m.Called(ctx, properties, from, to, interval) + + if len(ret) == 0 { + panic("no return value specified for GetTimeSeriesStringSampling") + } + + var r0 *iot.ArduinoSeriesBatchSampled + var r1 bool + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, []string, time.Time, time.Time, int32) (*iot.ArduinoSeriesBatchSampled, bool, error)); ok { + return rf(ctx, properties, from, to, interval) + } + if rf, ok := ret.Get(0).(func(context.Context, []string, time.Time, time.Time, int32) *iot.ArduinoSeriesBatchSampled); ok { + r0 = rf(ctx, properties, from, to, interval) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*iot.ArduinoSeriesBatchSampled) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []string, time.Time, time.Time, int32) bool); ok { + r1 = 
rf(ctx, properties, from, to, interval) + } else { + r1 = ret.Get(1).(bool) + } + + if rf, ok := ret.Get(2).(func(context.Context, []string, time.Time, time.Time, int32) error); ok { + r2 = rf(ctx, properties, from, to, interval) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// ThingList provides a mock function with given fields: ctx, ids, device, props, tags +func (_m *API) ThingList(ctx context.Context, ids []string, device *string, props bool, tags map[string]string) ([]iot.ArduinoThing, error) { + ret := _m.Called(ctx, ids, device, props, tags) + + if len(ret) == 0 { + panic("no return value specified for ThingList") + } + + var r0 []iot.ArduinoThing + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, []string, *string, bool, map[string]string) ([]iot.ArduinoThing, error)); ok { + return rf(ctx, ids, device, props, tags) + } + if rf, ok := ret.Get(0).(func(context.Context, []string, *string, bool, map[string]string) []iot.ArduinoThing); ok { + r0 = rf(ctx, ids, device, props, tags) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]iot.ArduinoThing) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []string, *string, bool, map[string]string) error); ok { + r1 = rf(ctx, ids, device, props, tags) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewAPI creates a new instance of API. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewAPI(t interface { + mock.TestingT + Cleanup(func()) +}) *API { + mock := &API{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/s3/mocks/s3_api.go b/internal/s3/mocks/s3_api.go new file mode 100644 index 0000000..0f65dc2 --- /dev/null +++ b/internal/s3/mocks/s3_api.go @@ -0,0 +1,64 @@ +// Code generated by mockery v2.44.1. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// API is an autogenerated mock type for the API type +type API struct { + mock.Mock +} + +// DestinationBucket provides a mock function with given fields: +func (_m *API) DestinationBucket() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for DestinationBucket") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// WriteFile provides a mock function with given fields: ctx, key, filePath +func (_m *API) WriteFile(ctx context.Context, key string, filePath string) error { + ret := _m.Called(ctx, key, filePath) + + if len(ret) == 0 { + panic("no return value specified for WriteFile") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, key, filePath) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewAPI creates a new instance of API. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
// GzipFileCompression compresses the file at origFilePath with gzip and
// writes the result next to it as origFilePath + ".gz". The original file is
// left untouched. On success it returns the path of the compressed file.
//
// A gzip stream is only complete once the gzip writer has been closed (Close
// flushes pending data and writes the gzip footer), so close errors on both
// the gzip writer and the destination file are returned to the caller rather
// than being silently dropped by a defer. If anything fails after the
// destination file has been created, the partial file is removed and an
// empty path is returned.
func GzipFileCompression(origFilePath string) (string, error) {
	src, err := os.Open(origFilePath)
	if err != nil {
		return "", err
	}
	// Read-only handle: a close error cannot lose data, so ignoring it is safe.
	defer src.Close()

	destFilePath := origFilePath + ".gz"
	dest, err := os.Create(destFilePath)
	if err != nil {
		return "", err
	}

	if err := gzipCopy(dest, src); err != nil {
		// Best-effort cleanup; the compression error is the one worth reporting.
		_ = dest.Close()
		_ = os.Remove(destFilePath)
		return "", fmt.Errorf("compressing %q: %w", origFilePath, err)
	}

	// Closing the file can surface deferred write errors, so it must be checked
	// before the archive is declared valid.
	if err := dest.Close(); err != nil {
		_ = os.Remove(destFilePath)
		return "", fmt.Errorf("closing %q: %w", destFilePath, err)
	}

	return destFilePath, nil
}

// gzipCopy streams src through a gzip writer into dst and finalizes the gzip
// stream, returning the first error encountered.
func gzipCopy(dst io.Writer, src io.Reader) error {
	zw := gzip.NewWriter(dst)
	if _, err := io.Copy(zw, src); err != nil {
		// The stream is already broken; the copy error takes precedence.
		_ = zw.Close()
		return err
	}
	return zw.Close()
}
"github.com/aws/aws-lambda-go/lambda" "github.com/sirupsen/logrus" @@ -52,6 +52,7 @@ const ( SamplesResoStack = PerStackArduinoPrefix + "/iot/samples-resolution" SchedulingStack = PerStackArduinoPrefix + "/iot/scheduling" DestinationS3BucketStack = PerStackArduinoPrefix + "/destination-bucket" + AggregationStatStack = PerStackArduinoPrefix + "/iot/aggregation-statistic" SamplesResolutionSeconds = 300 DefaultTimeExtractionWindowMinutes = 60 @@ -61,6 +62,7 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err logger := logrus.NewEntry(logrus.New()) stackName := os.Getenv("STACK_NAME") + compressFile := os.Getenv("ENABLE_COMPRESSION") var apikey *string var apiSecret *string @@ -68,6 +70,7 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err var tags *string var orgId *string var err error + var aggregationStat *string logger.Infoln("------ Reading parameters from SSM") paramReader, err := parameters.New() @@ -94,6 +97,7 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err if tagsParam != nil { tags = tagsParam } + aggregationStat, _ = paramReader.ReadConfigByStack(AggregationStatStack, stackName) } else { apikey, err = paramReader.ReadConfig(IoTApiKey) if err != nil { @@ -121,6 +125,10 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err if apikey == nil || apiSecret == nil { return nil, errors.New("key and secret are required") } + if aggregationStat == nil { + avgAggregation := "AVG" + aggregationStat = &avgAggregation + } // Resolve resolution resolution, err := configureExtractionResolution(logger, paramReader, stackName) @@ -140,6 +148,11 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err resolution = &defReso } + enabledCompression := false + if compressFile == "true" { + enabledCompression = true + } + logger.Infoln("------ Running import") if event.Dev || os.Getenv("DEV") == "true" { logger.Infoln("Running 
in dev mode") @@ -160,9 +173,11 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err } else { logger.Infoln("resolution:", *resolution, "seconds") } + logger.Infoln("aggregation statistic:", *aggregationStat) logger.Infoln("data extraction time windows:", extractionWindowMinutes, "minutes") + logger.Infoln("file compression enabled:", enabledCompression) - err = importer.StartImport(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, *extractionWindowMinutes, *destinationS3Bucket) + err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, *extractionWindowMinutes, *destinationS3Bucket, *aggregationStat, enabledCompression) if err != nil { return nil, err } @@ -193,17 +208,18 @@ func configureExtractionResolution(logger *logrus.Entry, paramReader *parameters return resolution, nil } } + val := SamplesResolutionSeconds switch *res { case "raw": val = -1 - case "1m": + case "1 minute": val = 60 - case "5m": + case "5 minutes": val = 300 - case "15m": + case "15 minutes": val = 900 - case "1h": + case "1 hour": val = 3600 } resolution = &val diff --git a/resources/test/localexecution.go b/resources/test/localexecution.go index 3db78a6..c2ef272 100644 --- a/resources/test/localexecution.go +++ b/resources/test/localexecution.go @@ -5,7 +5,7 @@ import ( "errors" "os" - "github.com/arduino/aws-s3-integration/app/importer" + "github.com/arduino/aws-s3-integration/app/exporter" "github.com/arduino/aws-s3-integration/internal/parameters" "github.com/sirupsen/logrus" ) @@ -89,7 +89,7 @@ func HandleRequest(ctx context.Context, dev bool) (*string, error) { logger.Infoln("tags:", *tags) } - err = importer.StartImport(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, TimeExtractionWindowMinutes, *destinationS3Bucket) + err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, TimeExtractionWindowMinutes, *destinationS3Bucket, "MAX", 
true) if err != nil { return nil, err }