Skip to content

Populate property last value if not present in data extraction #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 36 additions & 23 deletions app/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,74 +28,87 @@ import (
"github.com/sirupsen/logrus"
)

func StartExporter(
ctx context.Context,
logger *logrus.Entry,
key, secret, orgid string,
tagsF *string,
resolution, timeWindowMinutes int,
destinationS3Bucket string,
aggregationStat string,
compress, enableAlignTimeWindow bool) error {
type samplesExporter struct {
iotClient *iot.Client
logger *logrus.Entry
tagsF *string
compress bool
enableAlignTimeWindow bool
}

// Init client
func New(key, secret, orgid string, tagsF *string, compress, enableAlignTimeWindow bool, logger *logrus.Entry) (*samplesExporter, error) {
iotcl, err := iot.NewClient(key, secret, orgid)
if err != nil {
return err
return nil, err
}

if tagsF != nil {
logger.Infoln("Filtering things linked to configured account using tags: ", *tagsF)
return &samplesExporter{
iotClient: iotcl,
logger: logger,
tagsF: tagsF,
compress: compress,
enableAlignTimeWindow: enableAlignTimeWindow,
}, nil
}

func (s *samplesExporter) StartExporter(
ctx context.Context,
resolution, timeWindowMinutes int,
destinationS3Bucket string,
aggregationStat string) error {

if s.tagsF != nil {
s.logger.Infoln("Filtering things linked to configured account using tags: ", *s.tagsF)
} else {
logger.Infoln("Importing all things linked to configured account")
s.logger.Infoln("Importing all things linked to configured account")
}

things, err := iotcl.ThingList(ctx, nil, nil, true, utils.ParseTags(tagsF))
things, err := s.iotClient.ThingList(ctx, nil, nil, true, utils.ParseTags(s.tagsF))
if err != nil {
return err
}
thingsMap := make(map[string]iotclient.ArduinoThing, len(things))
for _, thing := range things {
logger.Infoln(" Thing: ", thing.Id, thing.Name)
s.logger.Infoln(" Thing: ", thing.Id, thing.Name)
thingsMap[thing.Id] = thing
}

// Extract data points from thing and push to S3
tsextractorClient := tsextractor.New(iotcl, logger)
tsextractorClient := tsextractor.New(s.iotClient, s.logger)

// Open s3 output writer
s3cl, err := s3.NewS3Client(destinationS3Bucket)
if err != nil {
return err
}

if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat, enableAlignTimeWindow); err != nil {
if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat, s.enableAlignTimeWindow); err != nil {
if writer != nil {
writer.Close()
defer writer.Delete()
}
logger.Error("Error aligning time series samples: ", err)
s.logger.Error("Error aligning time series samples: ", err)
return err
} else {
writer.Close()
defer writer.Delete()

fileToUpload := writer.GetFilePath()
destinationKeyFormat := "%s/%s.csv"
if compress {
logger.Infof("Compressing file: %s\n", fileToUpload)
if s.compress {
s.logger.Infof("Compressing file: %s\n", fileToUpload)
compressedFile, err := utils.GzipFileCompression(fileToUpload)
if err != nil {
return err
}
fileToUpload = compressedFile
logger.Infof("Generated compressed file: %s\n", fileToUpload)
s.logger.Infof("Generated compressed file: %s\n", fileToUpload)
destinationKeyFormat = "%s/%s.csv.gz"
defer func(f string) { os.Remove(f) }(fileToUpload)
}

destinationKey := fmt.Sprintf(destinationKeyFormat, from.Format("2006-01-02"), from.Format("2006-01-02-15-04"))
logger.Infof("Uploading file %s to bucket %s/%s\n", fileToUpload, s3cl.DestinationBucket(), destinationKey)
s.logger.Infof("Uploading file %s to bucket %s/%s\n", fileToUpload, s3cl.DestinationBucket(), destinationKey)
if err := s3cl.WriteFile(ctx, destinationKey, fileToUpload); err != nil {
return err
}
Expand Down
Loading
Loading