From 2ebd408319b59a993a330514202a156b8bae7774 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Thu, 19 Sep 2024 14:46:16 +0200 Subject: [PATCH 1/6] started work --- business/tsextractor/tsextractor.go | 43 ++++++++++++++--------------- internal/iot/client.go | 43 +++++++++++++++++++++++++++++ internal/iot/mocks/iot_api.go | 39 +++++++++++++++++++++++++- 3 files changed, 101 insertions(+), 24 deletions(-) diff --git a/business/tsextractor/tsextractor.go b/business/tsextractor/tsextractor.go index 22c4d62..9745d5c 100755 --- a/business/tsextractor/tsextractor.go +++ b/business/tsextractor/tsextractor.go @@ -97,7 +97,7 @@ func (a *TsExtractor) ExportTSToFile( } for thingID, thing := range thingsMap { - if thing.Properties == nil || len(thing.Properties) == 0 { + if len(thing.Properties) == 0 { a.logger.Warn("Skipping thing with no properties: ", thingID) continue } @@ -105,13 +105,13 @@ func (a *TsExtractor) ExportTSToFile( tokens <- struct{}{} wg.Add(1) - go func(thingID string, thing iotclient.ArduinoThing, writer *csv.CsvWriter) { + go func(thing iotclient.ArduinoThing, writer *csv.CsvWriter) { defer func() { <-tokens }() defer wg.Done() if isRawResolution(resolution) { // Populate raw time series data - err := a.populateRawTSDataIntoS3(ctx, from, to, thingID, thing, writer) + err := a.populateRawTSDataIntoS3(ctx, from, to, thing, writer) if err != nil { a.logger.Error("Error populating raw time series data: ", err) errorChannel <- err @@ -119,7 +119,7 @@ func (a *TsExtractor) ExportTSToFile( } } else { // Populate numeric time series data - err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, aggregationStat, writer) + err := a.populateNumericTSDataIntoS3(ctx, from, to, thing, resolution, aggregationStat, writer) if err != nil { a.logger.Error("Error populating time series data: ", err) errorChannel <- err @@ -127,14 +127,14 @@ func (a *TsExtractor) ExportTSToFile( } // Populate string time series data, if any - err = a.populateStringTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer) + err = a.populateStringTSDataIntoS3(ctx, from, to, thing, resolution, writer) if err != nil { a.logger.Error("Error populating string time series data: ", err) errorChannel <- err return } } - }(thingID, thing, writer) + }(thing, writer) } // Wait for all routines termination @@ -171,7 +171,6 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( ctx context.Context, from time.Time, to time.Time, - thingID string, thing iotclient.ArduinoThing, resolution int, aggregationStat string, @@ -185,12 +184,12 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( var err error var retry bool for i := 0; i < retryCount; i++ { - batched, retry, err = a.iotcl.GetTimeSeriesByThing(ctx, thingID, from, to, int64(resolution), aggregationStat) + batched, retry, err = a.iotcl.GetTimeSeriesByThing(ctx, thing.Id, from, to, int64(resolution), aggregationStat) if !retry { break } else { // This is due to a rate limit on the IoT API, we need to wait a bit before retrying - a.logger.Warnf("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID) + a.logger.Warnf("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thing.Id) randomRateLimitingSleep() } } @@ -206,7 +205,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( } propertyID := strings.Replace(response.Query, "property.", "", 1) - a.logger.Debugf("Thing %s - Property %s - %d values\n", thingID, propertyID, response.CountValues) + a.logger.Debugf("Thing %s - Property %s - %d values\n", thing.Id, propertyID, response.CountValues) sampleCount += response.CountValues propertyName, propertyType := extractPropertyNameAndType(thing, propertyID) @@ -215,7 +214,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( ts := response.Times[i] value := response.Values[i] - samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, strconv.FormatFloat(value, 'f', -1, 64), aggregationStat)) + samples = append(samples, composeRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, strconv.FormatFloat(value, 'f', -1, 64), aggregationStat)) } } @@ -224,7 +223,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( if err := writer.Write(samples); err != nil { return err } - a.logger.Debugf("Thing %s [%s] saved %d values\n", thingID, thing.Name, sampleCount) + a.logger.Debugf("Thing %s [%s] saved %d values\n", thing.Id, thing.Name, sampleCount) } return nil @@ -279,7 +278,6 @@ func (a *TsExtractor) populateStringTSDataIntoS3( ctx context.Context, from time.Time, to time.Time, - thingID string, thing iotclient.ArduinoThing, resolution int, writer *csv.CsvWriter) error { @@ -305,7 +303,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3( break } else { // This is due to a rate limit on the IoT API, we need to wait a bit before retrying - a.logger.Warnf("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID) + a.logger.Warnf("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thing.Id) randomRateLimitingSleep() } } @@ -321,7 +319,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3( } propertyID := strings.Replace(response.Query, "property.", "", 1) - a.logger.Debugf("Thing %s - String Property %s - %d values\n", thingID, propertyID, response.CountValues) + a.logger.Debugf("Thing %s - String Property %s - %d values\n", thing.Id, propertyID, response.CountValues) sampleCount += response.CountValues propertyName, propertyType := extractPropertyNameAndType(thing, propertyID) @@ -333,7 +331,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3( if value == nil { continue } - samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value), "")) + samples = append(samples, composeRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value), "")) } } @@ -342,7 +340,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3( if err := writer.Write(samples); err != nil { return err } - a.logger.Debugf("Thing %s [%s] string properties saved %d values\n", thingID, thing.Name, sampleCount) + a.logger.Debugf("Thing %s [%s] string properties saved %d values\n", thing.Id, thing.Name, sampleCount) } return nil @@ -352,7 +350,6 @@ func (a *TsExtractor) populateRawTSDataIntoS3( ctx context.Context, from time.Time, to time.Time, - thingID string, thing iotclient.ArduinoThing, writer *csv.CsvWriter) error { @@ -360,12 +357,12 @@ func (a *TsExtractor) populateRawTSDataIntoS3( var err error var retry bool for i := 0; i < retryCount; i++ { - batched, retry, err = a.iotcl.GetRawTimeSeriesByThing(ctx, thingID, from, to) + batched, retry, err = a.iotcl.GetRawTimeSeriesByThing(ctx, thing.Id, from, to) if !retry { break } else { // This is due to a rate limit on the IoT API, we need to wait a bit before retrying - a.logger.Warnf("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID) + a.logger.Warnf("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thing.Id) randomRateLimitingSleep() } } @@ -381,7 +378,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3( } propertyID := strings.Replace(response.Query, "property.", "", 1) - a.logger.Debugf("Thing %s - Query %s Property %s - %d values\n", thingID, response.Query, propertyID, response.CountValues) + a.logger.Debugf("Thing %s - Query %s Property %s - %d values\n", thing.Id, response.Query, propertyID, response.CountValues) sampleCount += response.CountValues propertyName, propertyType := extractPropertyNameAndType(thing, propertyID) @@ -393,7 +390,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3( if value == nil { continue } - samples = append(samples, composeRawRow(ts, thingID, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value))) + samples = append(samples, composeRawRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value))) } } @@ -402,7 +399,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3( if err := writer.Write(samples); err != nil { return err } - a.logger.Debugf("Thing %s [%s] raw data saved %d values\n", thingID, thing.Name, sampleCount) + a.logger.Debugf("Thing %s [%s] raw data saved %d values\n", thing.Id, thing.Name, sampleCount) } return nil diff --git a/internal/iot/client.go b/internal/iot/client.go index 0c61f23..28871ef 100755 --- a/internal/iot/client.go +++ b/internal/iot/client.go @@ -31,6 +31,7 @@ type API interface { GetTimeSeriesByThing(ctx context.Context, thingID string, from, to time.Time, interval int64, aggregationStat string) (*iotclient.ArduinoSeriesBatch, bool, error) GetTimeSeriesStringSampling(ctx context.Context, properties []string, from, to time.Time, interval int32) (*iotclient.ArduinoSeriesBatchSampled, bool, error) GetRawTimeSeriesByThing(ctx context.Context, thingID string, from, to time.Time) (*iotclient.ArduinoSeriesRawBatch, bool, error) + GetPropertiesLastValue(ctx context.Context, properties []string, thingId string) (*iotclient.ArduinoSeriesRawBatchLastvalue, bool, error) } // Client can perform actions on Arduino IoT Cloud. @@ -232,3 +233,45 @@ func (cl *Client) GetRawTimeSeriesByThing(ctx context.Context, thingID string, f } return ts, false, nil } + +func (cl *Client) GetPropertiesLastValue(ctx context.Context, properties []string, thingId string) (*iotclient.ArduinoSeriesRawBatchLastvalue, bool, error) { + if len(properties) == 0 { + return nil, false, fmt.Errorf("no properties provided") + } + + ctx, err := ctxWithToken(ctx, cl.token) + if err != nil { + return nil, false, err + } + + requests := make([]iotclient.BatchQueryRawLastValueRequestMediaV1, 0, len(properties)) + for _, prop := range properties { + if prop == "" { + continue + } + requests = append(requests, iotclient.BatchQueryRawLastValueRequestMediaV1{ + PropertyId: prop, + ThingId: thingId, + }) + } + + if len(requests) == 0 { + return nil, false, fmt.Errorf("no valid properties provided") + } + + batchQueryRequestsMediaV1 := iotclient.BatchLastValueRequestsMediaV1{ + Requests: requests, + } + + request := cl.api.SeriesV2Api.SeriesV2BatchQueryRawLastValue(ctx) + request = request.BatchLastValueRequestsMediaV1(batchQueryRequestsMediaV1) + ts, httpResponse, err := cl.api.SeriesV2Api.SeriesV2BatchQueryRawLastValueExecute(request) + if err != nil { + err = fmt.Errorf("retrieving time series last value: %w", errorDetail(err)) + if httpResponse != nil && httpResponse.StatusCode == 429 { // Retry if rate limited + return nil, true, err + } + return nil, false, err + } + return ts, false, nil +} diff --git a/internal/iot/mocks/iot_api.go b/internal/iot/mocks/iot_api.go index 28374ae..0635314 100644 --- a/internal/iot/mocks/iot_api.go +++ b/internal/iot/mocks/iot_api.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.44.1. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package mocks @@ -17,6 +17,43 @@ type API struct { mock.Mock } +// GetPropertiesLastValue provides a mock function with given fields: ctx, properties, thingId +func (_m *API) GetPropertiesLastValue(ctx context.Context, properties []string, thingId string) (*iot.ArduinoSeriesRawBatchLastvalue, bool, error) { + ret := _m.Called(ctx, properties, thingId) + + if len(ret) == 0 { + panic("no return value specified for GetPropertiesLastValue") + } + + var r0 *iot.ArduinoSeriesRawBatchLastvalue + var r1 bool + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, []string, string) (*iot.ArduinoSeriesRawBatchLastvalue, bool, error)); ok { + return rf(ctx, properties, thingId) + } + if rf, ok := ret.Get(0).(func(context.Context, []string, string) *iot.ArduinoSeriesRawBatchLastvalue); ok { + r0 = rf(ctx, properties, thingId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*iot.ArduinoSeriesRawBatchLastvalue) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []string, string) bool); ok { + r1 = rf(ctx, properties, thingId) + } else { + r1 = ret.Get(1).(bool) + } + + if rf, ok := ret.Get(2).(func(context.Context, []string, string) error); ok { + r2 = rf(ctx, properties, thingId) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // GetRawTimeSeriesByThing provides a mock function with given fields: ctx, thingID, from, to func (_m *API) GetRawTimeSeriesByThing(ctx context.Context, thingID string, from time.Time, to time.Time) (*iot.ArduinoSeriesRawBatch, bool, error) { ret := _m.Called(ctx, thingID, from, to) From a0f62cb313e65647e3054569963484eb09443a2c Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Thu, 19 Sep 2024 15:23:30 +0200 Subject: [PATCH 2/6] populate last value --- business/tsextractor/tsextractor.go | 110 +++++++++++++++++++---- business/tsextractor/tsextractor_test.go | 2 +- internal/iot/client.go | 47 +--------- internal/iot/mocks/iot_api.go | 37 -------- 4 files changed, 94 insertions(+), 102 deletions(-) diff --git a/business/tsextractor/tsextractor.go b/business/tsextractor/tsextractor.go index 9745d5c..5aef464 100755 --- a/business/tsextractor/tsextractor.go +++ b/business/tsextractor/tsextractor.go @@ -20,6 +20,7 @@ import ( "encoding/json" "errors" "fmt" + "slices" "strconv" "strings" "sync" @@ -95,10 +96,10 @@ func (a *TsExtractor) ExportTSToFile( } else { a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to, " - aggregation: ", aggregationStat) } - for thingID, thing := range thingsMap { + for _, thing := range thingsMap { if len(thing.Properties) == 0 { - a.logger.Warn("Skipping thing with no properties: ", thingID) + a.logger.Warn("Skipping thing with no properties: ", thing.Id) continue } @@ -109,31 +110,51 @@ func (a *TsExtractor) ExportTSToFile( defer func() { <-tokens }() defer wg.Done() - if isRawResolution(resolution) { + detectedProperties := []string{} + isRaw := isRawResolution(resolution) + if isRaw { // Populate raw time series data - err := a.populateRawTSDataIntoS3(ctx, from, to, thing, writer) + populatedProperties, err := a.populateRawTSDataIntoS3(ctx, from, to, thing, writer) if err != nil { a.logger.Error("Error populating raw time series data: ", err) errorChannel <- err return } + if len(populatedProperties) > 0 { + detectedProperties = append(detectedProperties, populatedProperties...) + } } else { // Populate numeric time series data - err := a.populateNumericTSDataIntoS3(ctx, from, to, thing, resolution, aggregationStat, writer) + populatedProperties, err := a.populateNumericTSDataIntoS3(ctx, from, to, thing, resolution, aggregationStat, writer) if err != nil { a.logger.Error("Error populating time series data: ", err) errorChannel <- err return } + if len(populatedProperties) > 0 { + detectedProperties = append(detectedProperties, populatedProperties...) + } // Populate string time series data, if any - err = a.populateStringTSDataIntoS3(ctx, from, to, thing, resolution, writer) + populatedProperties, err = a.populateStringTSDataIntoS3(ctx, from, to, thing, resolution, writer) if err != nil { a.logger.Error("Error populating string time series data: ", err) errorChannel <- err return } + if len(populatedProperties) > 0 { + detectedProperties = append(detectedProperties, populatedProperties...) + } } + + // Populate last value samples for ON_CHANGE properties, if needed + err = a.populateLastValueSamplesForOnChangeProperties(isRaw, thing, detectedProperties, writer) + if err != nil { + a.logger.Error("Error populating last value data: ", err) + errorChannel <- err + return + } + }(thing, writer) } @@ -174,12 +195,13 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( thing iotclient.ArduinoThing, resolution int, aggregationStat string, - writer *csv.CsvWriter) error { + writer *csv.CsvWriter) ([]string, error) { if resolution <= 60 { resolution = 60 } + populatedProperties := []string{} var batched *iotclient.ArduinoSeriesBatch var err error var retry bool @@ -194,7 +216,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( } } if err != nil { - return err + return nil, err } sampleCount := int64(0) @@ -214,6 +236,9 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( ts := response.Times[i] value := response.Values[i] + if !slices.Contains(populatedProperties, propertyID) { + populatedProperties = append(populatedProperties, propertyID) + } samples = append(samples, composeRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, strconv.FormatFloat(value, 'f', -1, 64), aggregationStat)) } } @@ -221,12 +246,12 @@ func (a *TsExtractor) populateNumericTSDataIntoS3( // Write samples to csv ouput file if len(samples) > 0 { if err := writer.Write(samples); err != nil { - return err + return nil, err } a.logger.Debugf("Thing %s [%s] saved %d values\n", thing.Id, thing.Name, sampleCount) } - return nil + return populatedProperties, nil } func composeRow(ts time.Time, thingID string, thingName string, propertyID string, propertyName string, propertyType string, value string, aggregation string) []string { @@ -280,7 +305,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3( to time.Time, thing iotclient.ArduinoThing, resolution int, - writer *csv.CsvWriter) error { + writer *csv.CsvWriter) ([]string, error) { // Filter properties by char type stringProperties := []string{} @@ -291,9 +316,10 @@ func (a *TsExtractor) populateStringTSDataIntoS3( } if len(stringProperties) == 0 { - return nil + return nil, nil } + populatedProperties := []string{} var batched *iotclient.ArduinoSeriesBatchSampled var err error var retry bool @@ -308,7 +334,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3( } } if err != nil { - return err + return nil, err } sampleCount := int64(0) @@ -331,6 +357,9 @@ func (a *TsExtractor) populateStringTSDataIntoS3( if value == nil { continue } + if !slices.Contains(populatedProperties, propertyID) { + populatedProperties = append(populatedProperties, propertyID) + } samples = append(samples, composeRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value), "")) } } @@ -338,12 +367,12 @@ func (a *TsExtractor) populateStringTSDataIntoS3( // Write samples to csv ouput file if len(samples) > 0 { if err := writer.Write(samples); err != nil { - return err + return nil, err } a.logger.Debugf("Thing %s [%s] string properties saved %d values\n", thing.Id, thing.Name, sampleCount) } - return nil + return populatedProperties, nil } func (a *TsExtractor) populateRawTSDataIntoS3( @@ -351,8 +380,9 @@ func (a *TsExtractor) populateRawTSDataIntoS3( from time.Time, to time.Time, thing iotclient.ArduinoThing, - writer *csv.CsvWriter) error { + writer *csv.CsvWriter) ([]string, error) { + populatedProperties := []string{} var batched *iotclient.ArduinoSeriesRawBatch var err error var retry bool @@ -367,7 +397,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3( } } if err != nil { - return err + return nil, err } sampleCount := int64(0) @@ -390,6 +420,9 @@ func (a *TsExtractor) populateRawTSDataIntoS3( if value == nil { continue } + if !slices.Contains(populatedProperties, propertyID) { + populatedProperties = append(populatedProperties, propertyID) + } samples = append(samples, composeRawRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value))) } } @@ -397,12 +430,12 @@ func (a *TsExtractor) populateRawTSDataIntoS3( // Write samples to csv ouput file if len(samples) > 0 { if err := writer.Write(samples); err != nil { - return err + return nil, err } a.logger.Debugf("Thing %s [%s] raw data saved %d values\n", thing.Id, thing.Name, sampleCount) } - return nil + return populatedProperties, nil } func (a *TsExtractor) interfaceToString(value interface{}) string { @@ -426,3 +459,42 @@ func (a *TsExtractor) interfaceToString(value interface{}) string { return fmt.Sprintf("%v", v) } } + +func (a *TsExtractor) populateLastValueSamplesForOnChangeProperties( + isRaw bool, + thing iotclient.ArduinoThing, + propertiesWithExtractedValue []string, + writer *csv.CsvWriter) error { + + // Check if there are ON_CHANGE properties + if len(thing.Properties) == 0 { + return nil + } + samples := [][]string{} + sampleCount := 0 + for _, prop := range thing.Properties { + if prop.UpdateStrategy == "ON_CHANGE" && !slices.Contains(propertiesWithExtractedValue, prop.Id) { + if prop.ValueUpdatedAt == nil { + continue + } + var toAdd []string + if isRaw { + toAdd = composeRawRow(*prop.ValueUpdatedAt, thing.Id, thing.Name, prop.Id, prop.Name, prop.Type, a.interfaceToString(prop.LastValue)) + } else { + toAdd = composeRow(*prop.ValueUpdatedAt, thing.Id, thing.Name, prop.Id, prop.Name, prop.Type, a.interfaceToString(prop.LastValue), "LAST_VALUE") + } + samples = append(samples, toAdd) + sampleCount++ + } + } + + // Write samples to csv ouput file + if len(samples) > 0 { + if err := writer.Write(samples); err != nil { + return err + } + a.logger.Debugf("Thing %s [%s] last value data saved %d values\n", thing.Id, thing.Name, sampleCount) + } + + return nil +} diff --git a/business/tsextractor/tsextractor_test.go b/business/tsextractor/tsextractor_test.go index e4feedf..353fe39 100644 --- a/business/tsextractor/tsextractor_test.go +++ b/business/tsextractor/tsextractor_test.go @@ -180,7 +180,7 @@ func TestExtractionFlow_rawResolution(t *testing.T) { Times: []time.Time{now.Add(-time.Minute * 2), now.Add(-time.Minute * 1), now}, Values: []any{"a", "b", "c"}, CountValues: 3, - }, + }, } samples := iotclient.ArduinoSeriesRawBatch{ Responses: responses, diff --git a/internal/iot/client.go b/internal/iot/client.go index 28871ef..23a7934 100755 --- a/internal/iot/client.go +++ b/internal/iot/client.go @@ -31,7 +31,6 @@ type API interface { GetTimeSeriesByThing(ctx context.Context, thingID string, from, to time.Time, interval int64, aggregationStat string) (*iotclient.ArduinoSeriesBatch, bool, error) GetTimeSeriesStringSampling(ctx context.Context, properties []string, from, to time.Time, interval int32) (*iotclient.ArduinoSeriesBatchSampled, bool, error) GetRawTimeSeriesByThing(ctx context.Context, thingID string, from, to time.Time) (*iotclient.ArduinoSeriesRawBatch, bool, error) - GetPropertiesLastValue(ctx context.Context, properties []string, thingId string) (*iotclient.ArduinoSeriesRawBatchLastvalue, bool, error) } // Client can perform actions on Arduino IoT Cloud. @@ -74,14 +73,14 @@ func (cl *Client) setup(client, secret, organizationId string) error { } // ThingList returns a list of things on Arduino IoT Cloud. -func (cl *Client) ThingList(ctx context.Context, ids []string, device *string, props bool, tags map[string]string) ([]iotclient.ArduinoThing, error) { +func (cl *Client) ThingList(ctx context.Context, ids []string, device *string, showProperties bool, tags map[string]string) ([]iotclient.ArduinoThing, error) { ctx, err := ctxWithToken(ctx, cl.token) if err != nil { return nil, err } request := cl.api.ThingsV2Api.ThingsV2List(ctx) - request = request.ShowProperties(props) + request = request.ShowProperties(showProperties) if ids != nil { request = request.Ids(ids) @@ -233,45 +232,3 @@ func (cl *Client) GetRawTimeSeriesByThing(ctx context.Context, thingID string, f } return ts, false, nil } - -func (cl *Client) GetPropertiesLastValue(ctx context.Context, properties []string, thingId string) (*iotclient.ArduinoSeriesRawBatchLastvalue, bool, error) { - if len(properties) == 0 { - return nil, false, fmt.Errorf("no properties provided") - } - - ctx, err := ctxWithToken(ctx, cl.token) - if err != nil { - return nil, false, err - } - - requests := make([]iotclient.BatchQueryRawLastValueRequestMediaV1, 0, len(properties)) - for _, prop := range properties { - if prop == "" { - continue - } - requests = append(requests, iotclient.BatchQueryRawLastValueRequestMediaV1{ - PropertyId: prop, - ThingId: thingId, - }) - } - - if len(requests) == 0 { - return nil, false, fmt.Errorf("no valid properties provided") - } - - batchQueryRequestsMediaV1 := iotclient.BatchLastValueRequestsMediaV1{ - Requests: requests, - } - - request := cl.api.SeriesV2Api.SeriesV2BatchQueryRawLastValue(ctx) - request = request.BatchLastValueRequestsMediaV1(batchQueryRequestsMediaV1) - ts, httpResponse, err := cl.api.SeriesV2Api.SeriesV2BatchQueryRawLastValueExecute(request) - if err != nil { - err = fmt.Errorf("retrieving time series last value: %w", errorDetail(err)) - if httpResponse != nil && httpResponse.StatusCode == 429 { // Retry if rate limited - return nil, true, err - } - return nil, false, err - } - return ts, false, nil -} diff --git a/internal/iot/mocks/iot_api.go b/internal/iot/mocks/iot_api.go index 0635314..844f912 100644 --- a/internal/iot/mocks/iot_api.go +++ b/internal/iot/mocks/iot_api.go @@ -17,43 +17,6 @@ type API struct { mock.Mock } -// GetPropertiesLastValue provides a mock function with given fields: ctx, properties, thingId -func (_m *API) GetPropertiesLastValue(ctx context.Context, properties []string, thingId string) (*iot.ArduinoSeriesRawBatchLastvalue, bool, error) { - ret := _m.Called(ctx, properties, thingId) - - if len(ret) == 0 { - panic("no return value specified for GetPropertiesLastValue") - } - - var r0 *iot.ArduinoSeriesRawBatchLastvalue - var r1 bool - var r2 error - if rf, ok := ret.Get(0).(func(context.Context, []string, string) (*iot.ArduinoSeriesRawBatchLastvalue, bool, error)); ok { - return rf(ctx, properties, thingId) - } - if rf, ok := ret.Get(0).(func(context.Context, []string, string) *iot.ArduinoSeriesRawBatchLastvalue); ok { - r0 = rf(ctx, properties, thingId) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*iot.ArduinoSeriesRawBatchLastvalue) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, []string, string) bool); ok { - r1 = rf(ctx, properties, thingId) - } else { - r1 = ret.Get(1).(bool) - } - - if rf, ok := ret.Get(2).(func(context.Context, []string, string) error); ok { - r2 = rf(ctx, properties, thingId) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - // GetRawTimeSeriesByThing provides a mock function with given fields: ctx, thingID, from, to func (_m *API) GetRawTimeSeriesByThing(ctx context.Context, thingID string, from time.Time, to time.Time) (*iot.ArduinoSeriesRawBatch, bool, error) { ret := _m.Called(ctx, thingID, from, to) From a7906033bc9ad47f29389d14b53c251f6ffb46e5 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Thu, 19 Sep 2024 15:28:32 +0200 Subject: [PATCH 3/6] Added last value --- business/tsextractor/tsextractor_test.go | 26 ++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/business/tsextractor/tsextractor_test.go b/business/tsextractor/tsextractor_test.go index 353fe39..ebd23aa 100644 --- a/business/tsextractor/tsextractor_test.go +++ b/business/tsextractor/tsextractor_test.go @@ -70,6 +70,7 @@ func TestExtractionFlow_defaultAggregation(t *testing.T) { thingId := "91f30213-2bd7-480a-b1dc-f31b01840e7e" propertyId := "c86f4ed9-7f52-4bd3-bdc6-b2936bec68ac" propertyStringId := "a86f4ed9-7f52-4bd3-bdc6-b2936bec68bb" + propertyIdOnChange := "b77f4ed5-7f52-4bd3-bdc6-b2936bec12de" // Init client iotcl := iotMocks.NewAPI(t) @@ -106,7 +107,8 @@ func TestExtractionFlow_defaultAggregation(t *testing.T) { tsextractorClient := New(iotcl, logger) - propCount := int64(2) + lastValueTime := now.Add(-time.Minute * 1) + propCount := int64(3) thingsMap := make(map[string]iotclient.ArduinoThing) thingsMap[thingId] = iotclient.ArduinoThing{ Id: thingId, @@ -122,6 +124,14 @@ func TestExtractionFlow_defaultAggregation(t *testing.T) { Id: propertyStringId, Type: "CHARSTRING", }, + { + Name: "pOnChange", + Id: propertyIdOnChange, + Type: "FLOAT", + UpdateStrategy: "ON_CHANGE", + LastValue: 2.34, + ValueUpdatedAt: &lastValueTime, + }, }, PropertiesCount: &propCount, } @@ -149,6 +159,7 @@ func TestExtractionFlow_defaultAggregation(t *testing.T) { "91f30213-2bd7-480a-b1dc-f31b01840e7e,test,a86f4ed9-7f52-4bd3-bdc6-b2936bec68bb,pstringVar,CHARSTRING,a,", "91f30213-2bd7-480a-b1dc-f31b01840e7e,test,a86f4ed9-7f52-4bd3-bdc6-b2936bec68bb,pstringVar,CHARSTRING,b,", "91f30213-2bd7-480a-b1dc-f31b01840e7e,test,a86f4ed9-7f52-4bd3-bdc6-b2936bec68bb,pstringVar,CHARSTRING,c,", + "91f30213-2bd7-480a-b1dc-f31b01840e7e,test,b77f4ed5-7f52-4bd3-bdc6-b2936bec12de,pOnChange,FLOAT,2.34,LAST_VALUE", } for _, entry := range entries { assert.Contains(t, string(content), entry) @@ -162,6 +173,7 @@ func TestExtractionFlow_rawResolution(t *testing.T) { thingId := "91f30213-2bd7-480a-b1dc-f31b01840e7e" propertyId := "c86f4ed9-7f52-4bd3-bdc6-b2936bec68ac" propertyStringId := "a86f4ed9-7f52-4bd3-bdc6-b2936bec68bb" + propertyIdOnChange := "b77f4ed5-7f52-4bd3-bdc6-b2936bec12de" // Init client iotcl := iotMocks.NewAPI(t) @@ -189,7 +201,8 @@ func TestExtractionFlow_rawResolution(t *testing.T) { tsextractorClient := New(iotcl, logger) - propCount := int64(2) + lastValueTime := now.Add(-time.Minute * 1) + propCount := int64(3) thingsMap := make(map[string]iotclient.ArduinoThing) thingsMap[thingId] = iotclient.ArduinoThing{ Id: thingId, @@ -205,6 +218,14 @@ func TestExtractionFlow_rawResolution(t *testing.T) { Id: propertyStringId, Type: "CHARSTRING", }, + { + Name: "pOnChange", + Id: propertyIdOnChange, + Type: "FLOAT", + UpdateStrategy: "ON_CHANGE", + LastValue: 2.34, + ValueUpdatedAt: &lastValueTime, + }, }, PropertiesCount: &propCount, } @@ -232,6 +253,7 @@ func TestExtractionFlow_rawResolution(t *testing.T) { "91f30213-2bd7-480a-b1dc-f31b01840e7e,test,a86f4ed9-7f52-4bd3-bdc6-b2936bec68bb,pstringVar,CHARSTRING,a", "91f30213-2bd7-480a-b1dc-f31b01840e7e,test,a86f4ed9-7f52-4bd3-bdc6-b2936bec68bb,pstringVar,CHARSTRING,b", "91f30213-2bd7-480a-b1dc-f31b01840e7e,test,a86f4ed9-7f52-4bd3-bdc6-b2936bec68bb,pstringVar,CHARSTRING,c", + "91f30213-2bd7-480a-b1dc-f31b01840e7e,test,b77f4ed5-7f52-4bd3-bdc6-b2936bec12de,pOnChange,FLOAT,2.34", } for _, entry := range entries { assert.Contains(t, string(content), entry) From 3a1a2258030126c0d7001c8adbfab094467ed9a2 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Thu, 19 Sep 2024 15:32:10 +0200 Subject: [PATCH 4/6] Dependency update --- go.mod | 20 ++++++++++---------- go.sum | 40 ++++++++++++++++++++-------------------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index 5201961..048eab9 100755 --- a/go.mod +++ b/go.mod @@ -6,9 +6,9 @@ require ( github.com/arduino/iot-client-go/v2 v2.0.4 github.com/aws/aws-lambda-go v1.47.0 github.com/aws/aws-sdk-go-v2 v1.30.5 - github.com/aws/aws-sdk-go-v2/config v1.27.11 - github.com/aws/aws-sdk-go-v2/service/s3 v1.61.1 - github.com/aws/aws-sdk-go-v2/service/ssm v1.50.1 + github.com/aws/aws-sdk-go-v2/config v1.27.35 + github.com/aws/aws-sdk-go-v2/service/s3 v1.62.0 + github.com/aws/aws-sdk-go-v2/service/ssm v1.53.0 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.9.0 golang.org/x/oauth2 v0.21.0 @@ -16,24 +16,24 @@ require ( require ( github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.17.11 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.33 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.22.8 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.8 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.30.8 // indirect github.com/aws/smithy-go v1.20.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - golang.org/x/sys v0.20.0 // indirect + golang.org/x/sys v0.25.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index e18e4d8..8d4f974 100755 --- a/go.sum +++ b/go.sum @@ -6,18 +6,18 @@ github.com/aws/aws-sdk-go-v2 v1.30.5 h1:mWSRTwQAb0aLE17dSzztCVJWI9+cRMgqebndjwDy github.com/aws/aws-sdk-go-v2 v1.30.5/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 h1:70PVAiL15/aBMh5LThwgXdSQorVr91L127ttckI9QQU= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4/go.mod h1:/MQxMqci8tlqDH+pjmoLu1i0tbWCUP1hhyMRuFxpQCw= -github.com/aws/aws-sdk-go-v2/config v1.27.11 h1:f47rANd2LQEYHda2ddSCKYId18/8BhSRM4BULGmfgNA= -github.com/aws/aws-sdk-go-v2/config v1.27.11/go.mod h1:SMsV78RIOYdve1vf36z8LmnszlRWkwMQtomCAI0/mIE= -github.com/aws/aws-sdk-go-v2/credentials v1.17.11 h1:YuIB1dJNf1Re822rriUOTxopaHHvIq0l/pX3fwO+Tzs= -github.com/aws/aws-sdk-go-v2/credentials v1.17.11/go.mod h1:AQtFPsDH9bI2O+71anW6EKL+NcD7LG3dpKGMV4SShgo= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 h1:FVJ0r5XTHSmIHJV6KuDmdYhEpvlHpiSd38RQWhut5J4= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1/go.mod h1:zusuAeqezXzAB24LGuzuekqMAEgWkVYukBec3kr3jUg= +github.com/aws/aws-sdk-go-v2/config v1.27.35 h1:jeFgiWYNV0vrgdZqB4kZBjYNdy0IKkwrAjr2fwpHIig= +github.com/aws/aws-sdk-go-v2/config v1.27.35/go.mod h1:qnpEvTq8ZfjrCqmJGRfWZuF+lGZ/vG8LK2K0L/TY1gQ= +github.com/aws/aws-sdk-go-v2/credentials v1.17.33 h1:lBHAQQznENv0gLHAZ73ONiTSkCtr8q3pSqWrpbBBZz0= +github.com/aws/aws-sdk-go-v2/credentials v1.17.33/go.mod h1:MBuqCUOT3ChfLuxNDGyra67eskx7ge9e3YKYBce7wpI= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 h1:pfQ2sqNpMVK6xz2RbqLEL0GH87JOwSxPV2rzm8Zsb74= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13/go.mod h1:NG7RXPUlqfsCLLFfi0+IpKN4sCB9D9fw/qTaSB+xRoU= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 h1:pI7Bzt0BJtYA0N/JEC6B8fJ4RBrEMi1LBrkMdFYNSnQ= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17/go.mod h1:Dh5zzJYMtxfIjYW+/evjQ8uj2OyR/ve2KROHGHlSFqE= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 h1:Mqr/V5gvrhA2gvgnF42Zh5iMiQNcOYthFYwCyrnuWlc= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17/go.mod h1:aLJpZlCmjE+V+KtN1q1uyZkfnUWpQGpbsn89XPKyzfU= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17 h1:Roo69qTpfu8OlJ2Tb7pAYVuF0CpuUMB0IYWwYP/4DZM= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17/go.mod h1:NcWPxQzGM1USQggaTVwz6VpqMZPX1CvDJLDh6jnOCa4= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI= @@ -28,16 +28,16 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19 h1:rfprUlsd github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19/go.mod h1:SCWkEdRq8/7EK60NcvvQ6NXKuTcchAD4ROAsC37VEZE= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17 h1:u+EfGmksnJc/x5tq3A+OD7LrMbSSR/5TrKLvkdy/fhY= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17/go.mod h1:VaMx6302JHax2vHJWgRo+5n9zvbacs3bLU/23DNQrTY= -github.com/aws/aws-sdk-go-v2/service/s3 v1.61.1 h1:6ZRIbdMbN83W2/EIAU5z8FQZpmuULsBojTaok+uBEIg= -github.com/aws/aws-sdk-go-v2/service/s3 v1.61.1/go.mod h1:5FmD/Dqq57gP+XwaUnd5WFPipAuzrf0HmupX27Gvjvc= -github.com/aws/aws-sdk-go-v2/service/ssm v1.50.1 h1:vgpeoBRWw22qcb1xo3eJFkuulwPI4E/xQgIGi0gtVUs= -github.com/aws/aws-sdk-go-v2/service/ssm v1.50.1/go.mod h1:Ebk/HZmGhxWKDVxM4+pwbxGjm3RQOQLMjAEosI3ss9Q= -github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 h1:vN8hEbpRnL7+Hopy9dzmRle1xmDc7o8tmY0klsr175w= -github.com/aws/aws-sdk-go-v2/service/sso v1.20.5/go.mod h1:qGzynb/msuZIE8I75DVRCUXw3o3ZyBmUvMwQ2t/BrGM= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 h1:Jux+gDDyi1Lruk+KHF91tK2KCuY61kzoCpvtvJJBtOE= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak= -github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n2HZPkcKgPAi1phU= -github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw= +github.com/aws/aws-sdk-go-v2/service/s3 v1.62.0 h1:rd/aA3iDq1q7YsL5sc4dEwChutH7OZF9Ihfst6pXQzI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.62.0/go.mod h1:5FmD/Dqq57gP+XwaUnd5WFPipAuzrf0HmupX27Gvjvc= +github.com/aws/aws-sdk-go-v2/service/ssm v1.53.0 h1:+btWuHF/6IuNrGgSZTWW4zs3Xz22/1xiv6LDhw10Xao= +github.com/aws/aws-sdk-go-v2/service/ssm v1.53.0/go.mod h1:nUSNPaG8mv5rIu7EclHnFqZOjhreEUwRKENtKTtJ9aw= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.8 h1:JRwuL+S1Qe1owZQoxblV7ORgRf2o0SrtzDVIbaVCdQ0= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.8/go.mod h1:eEygMHnTKH/3kNp9Jr1n3PdejuSNcgwLe1dWgQtO0VQ= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.8 h1:+HpGETD9463PFSj7lX5+eq7aLDs85QUIA+NBkeAsscA= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.8/go.mod h1:bCbAxKDqNvkHxRaIMnyVPXPo+OaPRwvmgzMxbz1VKSA= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.8 h1:bAi+4p5EKnni+jrfcAhb7iHFQ24bthOAV9t0taf3DCE= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.8/go.mod h1:NXi1dIAGteSaRLqYgarlhP/Ij0cFT+qmCwiJqWh/U5o= github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -62,8 +62,8 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8 golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= From c4586d108655fde921acf5acf43821ca653d84f2 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Thu, 19 Sep 2024 15:49:22 +0200 Subject: [PATCH 5/6] Removing not allowed properties --- business/tsextractor/tsextractor.go | 13 +- business/tsextractor/tsextractor_test.go | 30 ++- internal/iot/types.go | 284 +++++++++++++++++++++++ 3 files changed, 313 insertions(+), 14 deletions(-) create mode 100644 internal/iot/types.go diff --git a/business/tsextractor/tsextractor.go b/business/tsextractor/tsextractor.go index 5aef464..f5d682f 100755 --- a/business/tsextractor/tsextractor.go +++ b/business/tsextractor/tsextractor.go @@ -296,7 +296,7 @@ func extractPropertyNameAndType(thing iotclient.ArduinoThing, propertyID string) } func isStringProperty(ptype string) bool { - return ptype == "CHARSTRING" || ptype == "LOCATION" + return iot.IsPropertyString(ptype) || iot.IsPropertyLocation(ptype) } func (a *TsExtractor) populateStringTSDataIntoS3( @@ -460,6 +460,10 @@ func (a *TsExtractor) interfaceToString(value interface{}) string { } } +func isLastValueAllowedProperty(prop iotclient.ArduinoProperty) bool { + return prop.UpdateStrategy == "ON_CHANGE" && (isStringProperty(prop.Type) || iot.IsPropertyBool(prop.Type) || iot.IsPropertyNumberType(prop.Type)) +} + func (a *TsExtractor) populateLastValueSamplesForOnChangeProperties( isRaw bool, thing iotclient.ArduinoThing, @@ -473,15 +477,16 @@ func (a *TsExtractor) populateLastValueSamplesForOnChangeProperties( samples := [][]string{} sampleCount := 0 for _, prop := range thing.Properties { - if prop.UpdateStrategy == "ON_CHANGE" && !slices.Contains(propertiesWithExtractedValue, prop.Id) { + if isLastValueAllowedProperty(prop) && !slices.Contains(propertiesWithExtractedValue, prop.Id) { if prop.ValueUpdatedAt == nil { continue } + propName, propType := extractPropertyNameAndType(thing, prop.Id) var toAdd []string if isRaw { - toAdd = composeRawRow(*prop.ValueUpdatedAt, thing.Id, thing.Name, prop.Id, prop.Name, prop.Type, a.interfaceToString(prop.LastValue)) + toAdd = composeRawRow(*prop.ValueUpdatedAt, thing.Id, thing.Name, prop.Id, propName, propType, a.interfaceToString(prop.LastValue)) } else { - toAdd = composeRow(*prop.ValueUpdatedAt, thing.Id, thing.Name, prop.Id, prop.Name, prop.Type, a.interfaceToString(prop.LastValue), "LAST_VALUE") + toAdd = composeRow(*prop.ValueUpdatedAt, thing.Id, thing.Name, prop.Id, propName, propType, a.interfaceToString(prop.LastValue), "LAST_VALUE") } samples = append(samples, toAdd) sampleCount++ diff --git a/business/tsextractor/tsextractor_test.go b/business/tsextractor/tsextractor_test.go index ebd23aa..cee0ebf 100644 --- a/business/tsextractor/tsextractor_test.go +++ b/business/tsextractor/tsextractor_test.go @@ -125,13 +125,13 @@ func TestExtractionFlow_defaultAggregation(t *testing.T) { Type: "CHARSTRING", }, { - Name: "pOnChange", - Id: propertyIdOnChange, - Type: "FLOAT", + Name: "pOnChange", + Id: propertyIdOnChange, + Type: "FLOAT", UpdateStrategy: "ON_CHANGE", - LastValue: 2.34, + LastValue: 2.34, ValueUpdatedAt: &lastValueTime, - }, + }, }, PropertiesCount: &propCount, } @@ -174,6 +174,7 @@ func TestExtractionFlow_rawResolution(t *testing.T) { propertyId := "c86f4ed9-7f52-4bd3-bdc6-b2936bec68ac" propertyStringId := "a86f4ed9-7f52-4bd3-bdc6-b2936bec68bb" propertyIdOnChange := "b77f4ed5-7f52-4bd3-bdc6-b2936bec12de" + propertyIdNotImport := "b88f4ed5-7f52-4bd3-bdc6-b2936bec12de" // Init client iotcl := iotMocks.NewAPI(t) @@ -219,13 +220,21 @@ func TestExtractionFlow_rawResolution(t *testing.T) { Type: "CHARSTRING", }, { - Name: "pOnChange", - Id: propertyIdOnChange, - Type: "FLOAT", + Name: "pOnChange", + Id: propertyIdOnChange, + Type: "FLOAT", + UpdateStrategy: "ON_CHANGE", + LastValue: 2.34, + ValueUpdatedAt: &lastValueTime, + }, + { + Name: "pNOIMPORT", + Id: propertyIdNotImport, + Type: "SCHEDULER", UpdateStrategy: "ON_CHANGE", - LastValue: 2.34, + LastValue: "{}", ValueUpdatedAt: &lastValueTime, - }, + }, }, PropertiesCount: &propCount, } @@ -257,5 +266,6 @@ func TestExtractionFlow_rawResolution(t *testing.T) { } for _, entry := range entries { assert.Contains(t, string(content), entry) + assert.NotContains(t, string(content), "pNOIMPORT") } } diff --git a/internal/iot/types.go b/internal/iot/types.go new file mode 100644 index 0000000..ab9bdb5 --- /dev/null +++ b/internal/iot/types.go @@ -0,0 +1,284 @@ +// This file is part of arduino aws-sitewise-integration. +// +// Copyright 2024 ARDUINO SA (http://www.arduino.cc/) +// +// This software is released under the Mozilla Public License Version 2.0, +// which covers the main part of aws-sitewise-integration. +// The terms of this license can be found at: +// https://www.mozilla.org/media/MPL/2.0/index.815ca599c9df.txt +// +// You can be released from the requirements of the above licenses by purchasing +// a commercial license. Buying such a license is mandatory if you want to +// modify or otherwise use the software for commercial activities involving the +// Arduino software without disclosing the source code of your own applications. +// To purchase a commercial license, send an email to license@arduino.cc. + +package iot + +type Type string + +const ( + Analog Type = "ANALOG" + CharString Type = "CHARSTRING" + Float Type = "FLOAT" + Int Type = "INT" + LenghtC Type = "LENGHT_C" + LenghtI Type = "LENGHT_I" + LenghtM Type = "LENGHT_M" + Percentage Type = "PERCENTAGE" + Status Type = "STATUS" + TemperatureC Type = "TEMPERATURE_C" + TemperatureF Type = "TEMPERATURE_F" + Meter Type = "METER" + Kilogram Type = "KILOGRAM" + Gram Type = "GRAM" + Second Type = "SECOND" + Ampere Type = "AMPERE" + Kelvin Type = "KELVIN" + Candela Type = "CANDELA" + Mole Type = "MOLE" + Hertz Type = "HERTZ" + Radian Type = "RADIAN" + Steradian Type = "STERADIAN" + Newton Type = "NEWTON" + Pascal Type = "PASCAL" + Joule Type = "JOULE" + Watt Type = "WATT" + Coulomb Type = "COULOMB" + Volt Type = "VOLT" + Farad Type = "FARAD" + Ohm Type = "OHM" + Siemens Type = "SIEMENS" + Weber Type = "WEBER" + Tesla Type = "TESLA" + Henry Type = "HENRY" + DegreesCelsius Type = "DEGREES_CELSIUS" + Lumen Type = "LUMEN" + Lux Type = "LUX" + Becquerel Type = "BECQUEREL" + Gray Type = "GRAY" + Sievert Type = "SIEVERT" + Katal Type = "KATAL" + SquareMeter Type = "SQUARE_METER" + CubicMeter Type = "CUBIC_METER" + Liter Type = "LITER" + MeterPerSecond Type = "METER_PER_SECOND" + MeterPerSquareSecond Type = "METER_PER_SQUARE_SECOND" + CubicMeterPerSecond Type = "CUBIC_METER_PER_SECOND" + LiterPerSecond Type = "LITER_PER_SECOND" + WattPerSquareMeter Type = "WATT_PER_SQUARE_METER" + CandelaPerSquareMeter Type = "CANDELA_PER_SQUARE_METER" + Bit Type = "BIT" + BitPerSecond Type = "BIT_PER_SECOND" + DegreesLatitude Type = "DEGREES_LATITUDE" + DegreesLongitude Type = "DEGREES_LONGITUDE" + PhValue Type = "PH_VALUE" + Decibel Type = "DECIBEL" + Decibel1w Type = "DECIBEL_1W" + Bel Type = "BEL" + Count Type = "COUNT" + RatioDiv Type = "RATIO_DIV" + RatioMod Type = "RATIO_MOD" + PercentageRelativeHumidity Type = "PERCENTAGE_RELATIVE_HUMIDITY" + PercentageBatteryLevel Type = "PERCENTAGE_BATTERY_LEVEL" + SecondsBatteryLevel Type = "SECONDS_BATTERY_LEVEL" + EventRateSecond Type = "EVENT_RATE_SECOND" + EventRateMinute Type = "EVENT_RATE_MINUTE" + HeartRate Type = "HEART_RATE" + HeartBeats Type = "HEART_BEATS" + SiemensPerMeter Type = "SIEMENS_PER_METER" + // Complex properties + Location Type = "LOCATION" + ColorHSB Type = "COLOR_HSB" + ColorRGB Type = "COLOR_RGB" + GenericComplexProperty = "GENERIC_COMPLEX_PROPERTY" + Schedule Type = "SCHEDULE" + // Alexa Properties + HomeColoredLight = "HOME_COLORED_LIGHT" + HomeDimmedLight = "HOME_DIMMED_LIGHT" + HomeLight Type = "HOME_LIGHT" + HomeContactSensor = "HOME_CONTACT_SENSOR" + HomeMotionSensor = "HOME_MOTION_SENSOR" + HomeSmartPlugType = "HOME_SMART_PLUG" + HomeTemperature = "HOME_TEMPERATURE" + HomeTemperatureC = "HOME_TEMPERATURE_C" + HomeTemperatureF = "HOME_TEMPERATURE_F" + HomeSwitch Type = "HOME_SWITCH" + HomeTelevision = "HOME_TELEVISION" + // New Types based on dimensions + Energy Type = "ENERGY" + Force Type = "FORCE" + Temperature Type = "TEMPERATURE" + Power Type = "POWER" + ElectricCurrent Type = "ELECTRIC_CURRENT" + ElectricPotential = "ELECTRIC_POTENTIAL" + ElectricalResistance = "ELECTRICAL_RESISTANCE" + Capacitance Type = "CAPACITANCE" + Time Type = "TIME" + Frequency Type = "FREQUENCY" + DataRate Type = "DATA_RATE" + Acceleration Type = "ACCELERATION" + Area Type = "AREA" + Length Type = "LENGTH" + Velocity Type = "VELOCITY" + Mass Type = "MASS" + Volume Type = "VOLUME" + FlowRate Type = "FLOW_RATE" + Angle Type = "ANGLE" + Illuminance Type = "ILLUMINANCE" + LuminousFlux Type = "LUMINOUS_FLUX" + Luminance Type = "LUMINANCE" + LuminousIntensity = "LUMINOUS_INTENSITY" + LogarithmicQuantity = "LOGARITHMIC_QUANTITY" + Pressure Type = "PRESSURE" + InformationContent = "INFORMATION_CONTENT" +) + +var floatPropertyTypes = []Type{ + Analog, + Float, + LenghtC, + LenghtI, + LenghtM, + Percentage, + TemperatureC, + TemperatureF, + Meter, + Kilogram, + Gram, + Second, + Ampere, + Kelvin, + Candela, + Mole, + Hertz, + Radian, + Steradian, + Newton, + Pascal, + Joule, + Watt, + Coulomb, + Volt, + Farad, + Ohm, + Siemens, + Weber, + Tesla, + Henry, + DegreesCelsius, + Lumen, + Lux, + Becquerel, + Gray, + Sievert, + Katal, + SquareMeter, + CubicMeter, + Liter, + MeterPerSecond, + MeterPerSquareSecond, + CubicMeterPerSecond, + LiterPerSecond, + WattPerSquareMeter, + CandelaPerSquareMeter, + Bit, + BitPerSecond, + DegreesLatitude, + DegreesLongitude, + PhValue, + Decibel, + Decibel1w, + Bel, + RatioDiv, + RatioMod, + PercentageRelativeHumidity, + PercentageBatteryLevel, + SecondsBatteryLevel, + EventRateSecond, + EventRateMinute, + HeartRate, + HeartBeats, + SiemensPerMeter, + HomeTemperature, + HomeTemperatureC, + HomeTemperatureF, + Energy, + Force, + Temperature, + Power, + ElectricCurrent, + ElectricPotential, + ElectricalResistance, + Capacitance, + Frequency, + DataRate, + Acceleration, + Area, + Length, + Velocity, + Mass, + Volume, + FlowRate, + Angle, + Illuminance, + LuminousFlux, + Luminance, + LuminousIntensity, + LogarithmicQuantity, + Pressure, +} + +var intPropertyTypes = []Type{ + Int, + Count, + Time, + InformationContent, +} + +var booleanPropertyTypes = []Type{ + Status, + HomeLight, + HomeSwitch, + HomeContactSensor, + HomeMotionSensor, +} + +func IsPropertyFloat(pType string) bool { + for _, tpy := range floatPropertyTypes { + if pType == string(tpy) { + return true + } + } + return false +} + +func IsPropertyInt(pType string) bool { + for _, tpy := range intPropertyTypes { + if pType == string(tpy) { + return true + } + } + return false +} + +func IsPropertyNumberType(pType string) bool { + return IsPropertyFloat(pType) || IsPropertyInt(pType) +} + +func IsPropertyString(pType string) bool { + return pType == "CHARSTRING" +} + +func IsPropertyLocation(pType string) bool { + return Type(pType) == Location +} + +func IsPropertyBool(pType string) bool { + for _, tpy := range booleanPropertyTypes { + if pType == string(tpy) { + return true + } + } + return false +} From 6b3924246dbd84e1caecda7ce25a3aa271129b5f Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Fri, 20 Sep 2024 09:19:23 +0200 Subject: [PATCH 6/6] code refactoring --- app/exporter/exporter.go | 59 ++++++++++++++++++----------- business/tsextractor/tsextractor.go | 2 +- lambda.go | 6 ++- resources/test/localexecution.go | 7 +++- 4 files changed, 48 insertions(+), 26 deletions(-) diff --git a/app/exporter/exporter.go b/app/exporter/exporter.go index 070a254..49390b7 100755 --- a/app/exporter/exporter.go +++ b/app/exporter/exporter.go @@ -28,40 +28,53 @@ import ( "github.com/sirupsen/logrus" ) -func StartExporter( - ctx context.Context, - logger *logrus.Entry, - key, secret, orgid string, - tagsF *string, - resolution, timeWindowMinutes int, - destinationS3Bucket string, - aggregationStat string, - compress, enableAlignTimeWindow bool) error { +type samplesExporter struct { + iotClient *iot.Client + logger *logrus.Entry + tagsF *string + compress bool + enableAlignTimeWindow bool +} - // Init client +func New(key, secret, orgid string, tagsF *string, compress, enableAlignTimeWindow bool, logger *logrus.Entry) (*samplesExporter, error) { iotcl, err := iot.NewClient(key, secret, orgid) if err != nil { - return err + return nil, err } - if tagsF != nil { - logger.Infoln("Filtering things linked to configured account using tags: ", *tagsF) + return &samplesExporter{ + iotClient: iotcl, + logger: logger, + tagsF: tagsF, + compress: compress, + enableAlignTimeWindow: enableAlignTimeWindow, + }, nil +} + +func (s *samplesExporter) StartExporter( + ctx context.Context, + resolution, timeWindowMinutes int, + destinationS3Bucket string, + aggregationStat string) error { + + if s.tagsF != nil { + s.logger.Infoln("Filtering things linked to configured account using tags: ", *s.tagsF) } else { - logger.Infoln("Importing all things linked to configured account") + s.logger.Infoln("Importing all things linked to configured account") } - things, err := iotcl.ThingList(ctx, nil, nil, true, utils.ParseTags(tagsF)) + things, err := s.iotClient.ThingList(ctx, nil, nil, true, utils.ParseTags(s.tagsF)) if err != nil { return err } thingsMap := make(map[string]iotclient.ArduinoThing, len(things)) for _, thing := range things { - logger.Infoln(" Thing: ", thing.Id, thing.Name) + s.logger.Infoln(" Thing: ", thing.Id, thing.Name) thingsMap[thing.Id] = thing } // Extract data points from thing and push to S3 - tsextractorClient := tsextractor.New(iotcl, logger) + tsextractorClient := tsextractor.New(s.iotClient, s.logger) // Open s3 output writer s3cl, err := s3.NewS3Client(destinationS3Bucket) @@ -69,12 +82,12 @@ func StartExporter( return err } - if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat, enableAlignTimeWindow); err != nil { + if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat, s.enableAlignTimeWindow); err != nil { if writer != nil { writer.Close() defer writer.Delete() } - logger.Error("Error aligning time series samples: ", err) + s.logger.Error("Error aligning time series samples: ", err) return err } else { writer.Close() @@ -82,20 +95,20 @@ func StartExporter( fileToUpload := writer.GetFilePath() destinationKeyFormat := "%s/%s.csv" - if compress { - logger.Infof("Compressing file: %s\n", fileToUpload) + if s.compress { + s.logger.Infof("Compressing file: %s\n", fileToUpload) compressedFile, err := utils.GzipFileCompression(fileToUpload) if err != nil { return err } fileToUpload = compressedFile - logger.Infof("Generated compressed file: %s\n", fileToUpload) + s.logger.Infof("Generated compressed file: %s\n", fileToUpload) destinationKeyFormat = "%s/%s.csv.gz" defer func(f string) { os.Remove(f) }(fileToUpload) } destinationKey := fmt.Sprintf(destinationKeyFormat, from.Format("2006-01-02"), from.Format("2006-01-02-15-04")) - logger.Infof("Uploading file %s to bucket %s/%s\n", fileToUpload, s3cl.DestinationBucket(), destinationKey) + s.logger.Infof("Uploading file %s to bucket %s/%s\n", fileToUpload, s3cl.DestinationBucket(), destinationKey) if err := s3cl.WriteFile(ctx, destinationKey, fileToUpload); err != nil { return err } diff --git a/business/tsextractor/tsextractor.go b/business/tsextractor/tsextractor.go index f5d682f..75bf04b 100755 --- a/business/tsextractor/tsextractor.go +++ b/business/tsextractor/tsextractor.go @@ -360,7 +360,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3( if !slices.Contains(populatedProperties, propertyID) { populatedProperties = append(populatedProperties, propertyID) } - samples = append(samples, composeRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value), "")) + samples = append(samples, composeRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value), "SAMPLED")) } } diff --git a/lambda.go b/lambda.go index a7dfa49..0133b0d 100755 --- a/lambda.go +++ b/lambda.go @@ -188,7 +188,11 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err logger.Infoln("file compression enabled:", enabledCompression) logger.Infoln("align time window:", enableAlignTimeWindow) - err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, *extractionWindowMinutes, *destinationS3Bucket, *aggregationStat, enabledCompression, enableAlignTimeWindow) + tsExporter, err := exporter.New(*apikey, *apiSecret, organizationId, tags, enabledCompression, enableAlignTimeWindow, logger) + if err != nil { + return nil, err + } + err = tsExporter.StartExporter(ctx, *resolution, *extractionWindowMinutes, *destinationS3Bucket, *aggregationStat) if err != nil { message := "Error detected during data export" return &message, err diff --git a/resources/test/localexecution.go b/resources/test/localexecution.go index 6baa9ef..11bda6b 100644 --- a/resources/test/localexecution.go +++ b/resources/test/localexecution.go @@ -89,10 +89,15 @@ func HandleRequest(ctx context.Context, dev bool) (*string, error) { logger.Infoln("tags:", *tags) } - err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, TimeExtractionWindowMinutes, *destinationS3Bucket, "MAX", true, true) + tsExporter, err := exporter.New(*apikey, *apiSecret, organizationId, tags, true, true, logger) if err != nil { return nil, err } + err = tsExporter.StartExporter(ctx, *resolution, TimeExtractionWindowMinutes, *destinationS3Bucket, "MAX") + if err != nil { + message := "Error detected during data export" + return &message, err + } message := "Data exported successfully" return &message, nil