diff --git a/business/tsextractor/tsextractor.go b/business/tsextractor/tsextractor.go index 295e148..a07423d 100755 --- a/business/tsextractor/tsextractor.go +++ b/business/tsextractor/tsextractor.go @@ -81,11 +81,19 @@ func (a *TsExtractor) ExportTSToS3( defer func() { <-tokens }() defer wg.Done() - err := a.populateTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer) + // Populate numeric time series data + err := a.populateNumericTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer) if err != nil { a.logger.Error("Error populating time series data: ", err) return } + + // Populate string time series data, if any + err = a.populateStringTSDataIntoS3(ctx, from, to, thingID, thing, resolution, writer) + if err != nil { + a.logger.Error("Error populating string time series data: ", err) + return + } }(thingID, thing, writer) } @@ -104,7 +112,7 @@ func (a *TsExtractor) ExportTSToS3( return nil } -func (a *TsExtractor) populateTSDataIntoS3( +func (a *TsExtractor) populateNumericTSDataIntoS3( ctx context.Context, from time.Time, to time.Time, @@ -147,15 +155,7 @@ func (a *TsExtractor) populateTSDataIntoS3( ts := response.Times[i] value := response.Values[i] - row := make([]string, 6) - row[0] = ts.UTC().Format(time.RFC3339) - row[1] = thingID - row[2] = thing.Name - row[3] = propertyID - row[4] = propertyName - row[5] = strconv.FormatFloat(value, 'f', 3, 64) - - samples = append(samples, row) + samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, strconv.FormatFloat(value, 'f', 3, 64))) } } @@ -170,6 +170,17 @@ func (a *TsExtractor) populateTSDataIntoS3( return nil } +func composeRow(ts time.Time, thingID string, thingName string, propertyID string, propertyName string, value string) []string { + row := make([]string, 6) + row[0] = ts.UTC().Format(time.RFC3339) + row[1] = thingID + row[2] = thingName + row[3] = propertyID + row[4] = propertyName + row[5] = value + return row +} + func extractPropertyName(thing iotclient.ArduinoThing, propertyID string) string { propertyName := "" for _, prop := range thing.Properties { @@ -180,3 +191,82 @@ func extractPropertyName(thing iotclient.ArduinoThing, propertyID string) string } return propertyName } + +func isStringProperty(ptype string) bool { + return ptype == "CHARSTRING" +} + +func (a *TsExtractor) populateStringTSDataIntoS3( + ctx context.Context, + from time.Time, + to time.Time, + thingID string, + thing iotclient.ArduinoThing, + resolution int, + writer *csv.CsvWriter) error { + + // Filter properties by char type + stringProperties := []string{} + for _, prop := range thing.Properties { + if isStringProperty(prop.Type) { + stringProperties = append(stringProperties, prop.Id) + } + } + + if len(stringProperties) == 0 { + return nil + } + + var batched *iotclient.ArduinoSeriesBatchSampled + var err error + var retry bool + for i := 0; i < 3; i++ { + batched, retry, err = a.iotcl.GetTimeSeriesStringSampling(ctx, stringProperties, from, to, int32(resolution)) + if !retry { + break + } else { + // This is due to a rate limit on the IoT API, we need to wait a bit before retrying + a.logger.Infof("Rate limit reached for thing %s. Waiting 1 second before retrying.\n", thingID) + time.Sleep(1 * time.Second) + } + } + if err != nil { + return err + } + + sampleCount := int64(0) + samples := [][]string{} + for _, response := range batched.Responses { + if response.CountValues == 0 { + continue + } + + propertyID := strings.Replace(response.Query, "property.", "", 1) + a.logger.Debugf("Thing %s - String Property %s - %d values\n", thingID, propertyID, response.CountValues) + sampleCount += response.CountValues + + propertyName := extractPropertyName(thing, propertyID) + + for i := 0; i < len(response.Times); i++ { + + ts := response.Times[i] + value := response.Values[i] + if value == nil { + continue + } + if strValue, ok := value.(string); ok { + samples = append(samples, composeRow(ts, thingID, thing.Name, propertyID, propertyName, strValue)) + } + } + } + + // Write samples to csv ouput file + if len(samples) > 0 { + if err := writer.Write(samples); err != nil { + return err + } + a.logger.Debugf("Thing %s [%s] string properties saved %d values\n", thingID, thing.Name, sampleCount) + } + + return nil +} diff --git a/internal/iot/client.go b/internal/iot/client.go index dbe2563..0720bae 100755 --- a/internal/iot/client.go +++ b/internal/iot/client.go @@ -298,6 +298,50 @@ func (cl *Client) GetTimeSeriesByThing(ctx context.Context, thingID string, from return ts, false, nil } +func (cl *Client) GetTimeSeriesStringSampling(ctx context.Context, properties []string, from, to time.Time, interval int32) (*iotclient.ArduinoSeriesBatchSampled, bool, error) { + if len(properties) == 0 { + return nil, false, fmt.Errorf("no properties provided") + } + + ctx, err := ctxWithToken(ctx, cl.token) + if err != nil { + return nil, false, err + } + + requests := make([]iotclient.BatchQuerySampledRequestMediaV1, 0, len(properties)) + for _, prop := range properties { + if prop == "" { + continue + } + requests = append(requests, iotclient.BatchQuerySampledRequestMediaV1{ + From: &from, + Interval: &interval, + Q: fmt.Sprintf("property.%s", prop), + To: &to, + }) + } + + if len(requests) == 0 { + return nil, false, fmt.Errorf("no valid properties provided") + } + + batchQueryRequestsMediaV1 := iotclient.BatchQuerySampledRequestsMediaV1{ + Requests: requests, + } + + request := cl.api.SeriesV2Api.SeriesV2BatchQuerySampling(ctx) + request = request.BatchQuerySampledRequestsMediaV1(batchQueryRequestsMediaV1) + ts, httpResponse, err := cl.api.SeriesV2Api.SeriesV2BatchQuerySamplingExecute(request) + if err != nil { + err = fmt.Errorf("retrieving time series sampling: %w", errorDetail(err)) + if httpResponse != nil && httpResponse.StatusCode == 429 { // Retry if rate limited + return nil, true, err + } + return nil, false, err + } + return ts, false, nil +} + func (cl *Client) setup(client, secret, organization string) error { baseURL := GetArduinoAPIBaseURL()