Skip to content

Commit a0f62cb

Browse files
committed
populate last value
1 parent 2ebd408 commit a0f62cb

File tree

4 files changed

+94
-102
lines changed

4 files changed

+94
-102
lines changed

business/tsextractor/tsextractor.go

Lines changed: 91 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"encoding/json"
2121
"errors"
2222
"fmt"
23+
"slices"
2324
"strconv"
2425
"strings"
2526
"sync"
@@ -95,10 +96,10 @@ func (a *TsExtractor) ExportTSToFile(
9596
} else {
9697
a.logger.Infoln("=====> Exporting data. Time window: ", timeWindowInMinutes, "m (resolution: ", resolution, "s). From ", from, " to ", to, " - aggregation: ", aggregationStat)
9798
}
98-
for thingID, thing := range thingsMap {
99+
for _, thing := range thingsMap {
99100

100101
if len(thing.Properties) == 0 {
101-
a.logger.Warn("Skipping thing with no properties: ", thingID)
102+
a.logger.Warn("Skipping thing with no properties: ", thing.Id)
102103
continue
103104
}
104105

@@ -109,31 +110,51 @@ func (a *TsExtractor) ExportTSToFile(
109110
defer func() { <-tokens }()
110111
defer wg.Done()
111112

112-
if isRawResolution(resolution) {
113+
detectedProperties := []string{}
114+
isRaw := isRawResolution(resolution)
115+
if isRaw {
113116
// Populate raw time series data
114-
err := a.populateRawTSDataIntoS3(ctx, from, to, thing, writer)
117+
populatedProperties, err := a.populateRawTSDataIntoS3(ctx, from, to, thing, writer)
115118
if err != nil {
116119
a.logger.Error("Error populating raw time series data: ", err)
117120
errorChannel <- err
118121
return
119122
}
123+
if len(populatedProperties) > 0 {
124+
detectedProperties = append(detectedProperties, populatedProperties...)
125+
}
120126
} else {
121127
// Populate numeric time series data
122-
err := a.populateNumericTSDataIntoS3(ctx, from, to, thing, resolution, aggregationStat, writer)
128+
populatedProperties, err := a.populateNumericTSDataIntoS3(ctx, from, to, thing, resolution, aggregationStat, writer)
123129
if err != nil {
124130
a.logger.Error("Error populating time series data: ", err)
125131
errorChannel <- err
126132
return
127133
}
134+
if len(populatedProperties) > 0 {
135+
detectedProperties = append(detectedProperties, populatedProperties...)
136+
}
128137

129138
// Populate string time series data, if any
130-
err = a.populateStringTSDataIntoS3(ctx, from, to, thing, resolution, writer)
139+
populatedProperties, err = a.populateStringTSDataIntoS3(ctx, from, to, thing, resolution, writer)
131140
if err != nil {
132141
a.logger.Error("Error populating string time series data: ", err)
133142
errorChannel <- err
134143
return
135144
}
145+
if len(populatedProperties) > 0 {
146+
detectedProperties = append(detectedProperties, populatedProperties...)
147+
}
136148
}
149+
150+
// Populate last value samples for ON_CHANGE properties, if needed
151+
err = a.populateLastValueSamplesForOnChangeProperties(isRaw, thing, detectedProperties, writer)
152+
if err != nil {
153+
a.logger.Error("Error populating last value data: ", err)
154+
errorChannel <- err
155+
return
156+
}
157+
137158
}(thing, writer)
138159
}
139160

@@ -174,12 +195,13 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
174195
thing iotclient.ArduinoThing,
175196
resolution int,
176197
aggregationStat string,
177-
writer *csv.CsvWriter) error {
198+
writer *csv.CsvWriter) ([]string, error) {
178199

179200
if resolution <= 60 {
180201
resolution = 60
181202
}
182203

204+
populatedProperties := []string{}
183205
var batched *iotclient.ArduinoSeriesBatch
184206
var err error
185207
var retry bool
@@ -194,7 +216,7 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
194216
}
195217
}
196218
if err != nil {
197-
return err
219+
return nil, err
198220
}
199221

200222
sampleCount := int64(0)
@@ -214,19 +236,22 @@ func (a *TsExtractor) populateNumericTSDataIntoS3(
214236

215237
ts := response.Times[i]
216238
value := response.Values[i]
239+
if !slices.Contains(populatedProperties, propertyID) {
240+
populatedProperties = append(populatedProperties, propertyID)
241+
}
217242
samples = append(samples, composeRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, strconv.FormatFloat(value, 'f', -1, 64), aggregationStat))
218243
}
219244
}
220245

221246
// Write samples to csv ouput file
222247
if len(samples) > 0 {
223248
if err := writer.Write(samples); err != nil {
224-
return err
249+
return nil, err
225250
}
226251
a.logger.Debugf("Thing %s [%s] saved %d values\n", thing.Id, thing.Name, sampleCount)
227252
}
228253

229-
return nil
254+
return populatedProperties, nil
230255
}
231256

232257
func composeRow(ts time.Time, thingID string, thingName string, propertyID string, propertyName string, propertyType string, value string, aggregation string) []string {
@@ -280,7 +305,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3(
280305
to time.Time,
281306
thing iotclient.ArduinoThing,
282307
resolution int,
283-
writer *csv.CsvWriter) error {
308+
writer *csv.CsvWriter) ([]string, error) {
284309

285310
// Filter properties by char type
286311
stringProperties := []string{}
@@ -291,9 +316,10 @@ func (a *TsExtractor) populateStringTSDataIntoS3(
291316
}
292317

293318
if len(stringProperties) == 0 {
294-
return nil
319+
return nil, nil
295320
}
296321

322+
populatedProperties := []string{}
297323
var batched *iotclient.ArduinoSeriesBatchSampled
298324
var err error
299325
var retry bool
@@ -308,7 +334,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3(
308334
}
309335
}
310336
if err != nil {
311-
return err
337+
return nil, err
312338
}
313339

314340
sampleCount := int64(0)
@@ -331,28 +357,32 @@ func (a *TsExtractor) populateStringTSDataIntoS3(
331357
if value == nil {
332358
continue
333359
}
360+
if !slices.Contains(populatedProperties, propertyID) {
361+
populatedProperties = append(populatedProperties, propertyID)
362+
}
334363
samples = append(samples, composeRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value), ""))
335364
}
336365
}
337366

338367
// Write samples to csv ouput file
339368
if len(samples) > 0 {
340369
if err := writer.Write(samples); err != nil {
341-
return err
370+
return nil, err
342371
}
343372
a.logger.Debugf("Thing %s [%s] string properties saved %d values\n", thing.Id, thing.Name, sampleCount)
344373
}
345374

346-
return nil
375+
return populatedProperties, nil
347376
}
348377

349378
func (a *TsExtractor) populateRawTSDataIntoS3(
350379
ctx context.Context,
351380
from time.Time,
352381
to time.Time,
353382
thing iotclient.ArduinoThing,
354-
writer *csv.CsvWriter) error {
383+
writer *csv.CsvWriter) ([]string, error) {
355384

385+
populatedProperties := []string{}
356386
var batched *iotclient.ArduinoSeriesRawBatch
357387
var err error
358388
var retry bool
@@ -367,7 +397,7 @@ func (a *TsExtractor) populateRawTSDataIntoS3(
367397
}
368398
}
369399
if err != nil {
370-
return err
400+
return nil, err
371401
}
372402

373403
sampleCount := int64(0)
@@ -390,19 +420,22 @@ func (a *TsExtractor) populateRawTSDataIntoS3(
390420
if value == nil {
391421
continue
392422
}
423+
if !slices.Contains(populatedProperties, propertyID) {
424+
populatedProperties = append(populatedProperties, propertyID)
425+
}
393426
samples = append(samples, composeRawRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value)))
394427
}
395428
}
396429

397430
// Write samples to csv ouput file
398431
if len(samples) > 0 {
399432
if err := writer.Write(samples); err != nil {
400-
return err
433+
return nil, err
401434
}
402435
a.logger.Debugf("Thing %s [%s] raw data saved %d values\n", thing.Id, thing.Name, sampleCount)
403436
}
404437

405-
return nil
438+
return populatedProperties, nil
406439
}
407440

408441
func (a *TsExtractor) interfaceToString(value interface{}) string {
@@ -426,3 +459,42 @@ func (a *TsExtractor) interfaceToString(value interface{}) string {
426459
return fmt.Sprintf("%v", v)
427460
}
428461
}
462+
463+
func (a *TsExtractor) populateLastValueSamplesForOnChangeProperties(
464+
isRaw bool,
465+
thing iotclient.ArduinoThing,
466+
propertiesWithExtractedValue []string,
467+
writer *csv.CsvWriter) error {
468+
469+
// Check if there are ON_CHANGE properties
470+
if len(thing.Properties) == 0 {
471+
return nil
472+
}
473+
samples := [][]string{}
474+
sampleCount := 0
475+
for _, prop := range thing.Properties {
476+
if prop.UpdateStrategy == "ON_CHANGE" && !slices.Contains(propertiesWithExtractedValue, prop.Id) {
477+
if prop.ValueUpdatedAt == nil {
478+
continue
479+
}
480+
var toAdd []string
481+
if isRaw {
482+
toAdd = composeRawRow(*prop.ValueUpdatedAt, thing.Id, thing.Name, prop.Id, prop.Name, prop.Type, a.interfaceToString(prop.LastValue))
483+
} else {
484+
toAdd = composeRow(*prop.ValueUpdatedAt, thing.Id, thing.Name, prop.Id, prop.Name, prop.Type, a.interfaceToString(prop.LastValue), "LAST_VALUE")
485+
}
486+
samples = append(samples, toAdd)
487+
sampleCount++
488+
}
489+
}
490+
491+
// Write samples to csv ouput file
492+
if len(samples) > 0 {
493+
if err := writer.Write(samples); err != nil {
494+
return err
495+
}
496+
a.logger.Debugf("Thing %s [%s] last value data saved %d values\n", thing.Id, thing.Name, sampleCount)
497+
}
498+
499+
return nil
500+
}

business/tsextractor/tsextractor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func TestExtractionFlow_rawResolution(t *testing.T) {
180180
Times: []time.Time{now.Add(-time.Minute * 2), now.Add(-time.Minute * 1), now},
181181
Values: []any{"a", "b", "c"},
182182
CountValues: 3,
183-
},
183+
},
184184
}
185185
samples := iotclient.ArduinoSeriesRawBatch{
186186
Responses: responses,

internal/iot/client.go

Lines changed: 2 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ type API interface {
3131
GetTimeSeriesByThing(ctx context.Context, thingID string, from, to time.Time, interval int64, aggregationStat string) (*iotclient.ArduinoSeriesBatch, bool, error)
3232
GetTimeSeriesStringSampling(ctx context.Context, properties []string, from, to time.Time, interval int32) (*iotclient.ArduinoSeriesBatchSampled, bool, error)
3333
GetRawTimeSeriesByThing(ctx context.Context, thingID string, from, to time.Time) (*iotclient.ArduinoSeriesRawBatch, bool, error)
34-
GetPropertiesLastValue(ctx context.Context, properties []string, thingId string) (*iotclient.ArduinoSeriesRawBatchLastvalue, bool, error)
3534
}
3635

3736
// Client can perform actions on Arduino IoT Cloud.
@@ -74,14 +73,14 @@ func (cl *Client) setup(client, secret, organizationId string) error {
7473
}
7574

7675
// ThingList returns a list of things on Arduino IoT Cloud.
77-
func (cl *Client) ThingList(ctx context.Context, ids []string, device *string, props bool, tags map[string]string) ([]iotclient.ArduinoThing, error) {
76+
func (cl *Client) ThingList(ctx context.Context, ids []string, device *string, showProperties bool, tags map[string]string) ([]iotclient.ArduinoThing, error) {
7877
ctx, err := ctxWithToken(ctx, cl.token)
7978
if err != nil {
8079
return nil, err
8180
}
8281

8382
request := cl.api.ThingsV2Api.ThingsV2List(ctx)
84-
request = request.ShowProperties(props)
83+
request = request.ShowProperties(showProperties)
8584

8685
if ids != nil {
8786
request = request.Ids(ids)
@@ -233,45 +232,3 @@ func (cl *Client) GetRawTimeSeriesByThing(ctx context.Context, thingID string, f
233232
}
234233
return ts, false, nil
235234
}
236-
237-
func (cl *Client) GetPropertiesLastValue(ctx context.Context, properties []string, thingId string) (*iotclient.ArduinoSeriesRawBatchLastvalue, bool, error) {
238-
if len(properties) == 0 {
239-
return nil, false, fmt.Errorf("no properties provided")
240-
}
241-
242-
ctx, err := ctxWithToken(ctx, cl.token)
243-
if err != nil {
244-
return nil, false, err
245-
}
246-
247-
requests := make([]iotclient.BatchQueryRawLastValueRequestMediaV1, 0, len(properties))
248-
for _, prop := range properties {
249-
if prop == "" {
250-
continue
251-
}
252-
requests = append(requests, iotclient.BatchQueryRawLastValueRequestMediaV1{
253-
PropertyId: prop,
254-
ThingId: thingId,
255-
})
256-
}
257-
258-
if len(requests) == 0 {
259-
return nil, false, fmt.Errorf("no valid properties provided")
260-
}
261-
262-
batchQueryRequestsMediaV1 := iotclient.BatchLastValueRequestsMediaV1{
263-
Requests: requests,
264-
}
265-
266-
request := cl.api.SeriesV2Api.SeriesV2BatchQueryRawLastValue(ctx)
267-
request = request.BatchLastValueRequestsMediaV1(batchQueryRequestsMediaV1)
268-
ts, httpResponse, err := cl.api.SeriesV2Api.SeriesV2BatchQueryRawLastValueExecute(request)
269-
if err != nil {
270-
err = fmt.Errorf("retrieving time series last value: %w", errorDetail(err))
271-
if httpResponse != nil && httpResponse.StatusCode == 429 { // Retry if rate limited
272-
return nil, true, err
273-
}
274-
return nil, false, err
275-
}
276-
return ts, false, nil
277-
}

internal/iot/mocks/iot_api.go

Lines changed: 0 additions & 37 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)