Skip to content

Commit b9d0a87

Browse files
committed
misc corrections
1 parent db04052 commit b9d0a87

File tree

4 files changed

+153
-27
lines changed

4 files changed

+153
-27
lines changed

internal/clients/elasticsearch/index.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -548,11 +548,13 @@ func DeleteIngestPipeline(ctx context.Context, apiClient *clients.ApiClient, nam
548548
func PutTransform(ctx context.Context, apiClient *clients.ApiClient, transform *models.Transform, params *models.PutTransformParams) diag.Diagnostics {
549549
fmt.Println("entering PutTransform")
550550
var diags diag.Diagnostics
551-
pipelineBytes, err := json.Marshal(transform)
551+
transformBytes, err := json.Marshal(transform)
552552
if err != nil {
553553
return diag.FromErr(err)
554554
}
555555

556+
fmt.Printf("%s\n", transformBytes)
557+
556558
esClient, err := apiClient.GetESClient()
557559
if err != nil {
558560
return diag.FromErr(err)
@@ -564,7 +566,7 @@ func PutTransform(ctx context.Context, apiClient *clients.ApiClient, transform *
564566
esClient.TransformPutTransform.WithTimeout(params.Timeout),
565567
}
566568

567-
res, err := esClient.TransformPutTransform(bytes.NewReader(pipelineBytes), transform.Name, opts...)
569+
res, err := esClient.TransformPutTransform(bytes.NewReader(transformBytes), transform.Name, opts...)
568570
if err != nil {
569571
return diag.FromErr(err)
570572
}
@@ -615,11 +617,13 @@ func GetTransform(ctx context.Context, apiClient *clients.ApiClient, name *strin
615617
func UpdateTransform(ctx context.Context, apiClient *clients.ApiClient, transform *models.Transform, params *models.UpdateTransformParams) diag.Diagnostics {
616618
fmt.Println("entering UpdateTransform")
617619
var diags diag.Diagnostics
618-
pipelineBytes, err := json.Marshal(transform)
620+
transformBytes, err := json.Marshal(transform)
619621
if err != nil {
620622
return diag.FromErr(err)
621623
}
622624

625+
fmt.Printf("%s\n", transformBytes)
626+
623627
esClient, err := apiClient.GetESClient()
624628
if err != nil {
625629
return diag.FromErr(err)
@@ -631,7 +635,7 @@ func UpdateTransform(ctx context.Context, apiClient *clients.ApiClient, transfor
631635
esClient.TransformUpdateTransform.WithTimeout(params.Timeout),
632636
}
633637

634-
res, err := esClient.TransformUpdateTransform(bytes.NewReader(pipelineBytes), transform.Name, opts...)
638+
res, err := esClient.TransformUpdateTransform(bytes.NewReader(transformBytes), transform.Name, opts...)
635639
if err != nil {
636640
return diag.FromErr(err)
637641
}

internal/elasticsearch/transform/transform.go

Lines changed: 83 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func ResourceTransform() *schema.Resource {
100100
Description: "The pivot method transforms the data by aggregating and grouping it.",
101101
Type: schema.TypeString,
102102
Optional: true,
103-
AtLeastOneOf: []string{"pivot", "latest"},
103+
ExactlyOneOf: []string{"pivot", "latest"},
104104
DiffSuppressFunc: utils.DiffJsonSuppress,
105105
ValidateFunc: validation.StringIsJSON,
106106
ForceNew: true,
@@ -109,7 +109,7 @@ func ResourceTransform() *schema.Resource {
109109
Description: "The latest method transforms the data by finding the latest document for each unique key.",
110110
Type: schema.TypeString,
111111
Optional: true,
112-
AtLeastOneOf: []string{"pivot", "latest"},
112+
ExactlyOneOf: []string{"pivot", "latest"},
113113
DiffSuppressFunc: utils.DiffJsonSuppress,
114114
ValidateFunc: validation.StringIsJSON,
115115
ForceNew: true,
@@ -119,7 +119,7 @@ func ResourceTransform() *schema.Resource {
119119
Description: "The interval between checks for changes in the source indices when the transform is running continuously. Defaults to `1m`.",
120120
Optional: true,
121121
Default: "1m",
122-
ValidateFunc: utils.StringIsDuration,
122+
ValidateFunc: utils.StringIsElasticDuration,
123123
},
124124
"metadata": {
125125
Description: "Defines optional transform metadata.",
@@ -151,7 +151,7 @@ func ResourceTransform() *schema.Resource {
151151
Description: "Specifies the maximum age of a document in the destination index.",
152152
Type: schema.TypeString,
153153
Required: true,
154-
ValidateFunc: utils.StringIsDuration,
154+
ValidateFunc: utils.StringIsElasticDuration,
155155
},
156156
},
157157
},
@@ -183,7 +183,7 @@ func ResourceTransform() *schema.Resource {
183183
Type: schema.TypeString,
184184
Optional: true,
185185
Default: "60s",
186-
ValidateFunc: utils.StringIsDuration,
186+
ValidateFunc: utils.StringIsElasticDuration,
187187
},
188188
},
189189
},
@@ -405,13 +405,12 @@ func getTransformFromResourceData(ctx context.Context, d *schema.ResourceData, n
405405
if v, ok := d.GetOk("source"); ok {
406406
definedSource := v.([]interface{})[0].(map[string]interface{})
407407

408+
transform.Source = new(models.TransformSource)
408409
indices := make([]string, 0)
409410
for _, i := range definedSource["indices"].([]interface{}) {
410411
indices = append(indices, i.(string))
411412
}
412-
transform.Source = models.TransformSource{
413-
Indices: indices,
414-
}
413+
transform.Source.Indices = indices
415414

416415
if v, ok := definedSource["query"]; ok && len(v.(string)) > 0 {
417416
var query interface{}
@@ -431,12 +430,13 @@ func getTransformFromResourceData(ctx context.Context, d *schema.ResourceData, n
431430
}
432431

433432
if v, ok := d.GetOk("destination"); ok {
433+
434434
definedDestination := v.([]interface{})[0].(map[string]interface{})
435-
transform.Destination = models.TransformDestination{
436-
Index: definedDestination["index"].(string),
437-
}
435+
transform.Destination = new(models.TransformDestination)
438436

439-
if pipeline, ok := definedDestination["pipeline"]; ok {
437+
transform.Destination.Index = definedDestination["index"].(string)
438+
439+
if pipeline, ok := definedDestination["pipeline"]; ok && len(pipeline.(string)) > 0 {
440440
transform.Destination.Pipeline = pipeline.(string)
441441
}
442442
}
@@ -457,6 +457,10 @@ func getTransformFromResourceData(ctx context.Context, d *schema.ResourceData, n
457457
transform.Latest = latest
458458
}
459459

460+
if v, ok := d.GetOk("frequency"); ok {
461+
transform.Frequency = v.(string)
462+
}
463+
460464
if v, ok := d.GetOk("metadata"); ok {
461465
metadata := make(map[string]interface{})
462466
if err := json.NewDecoder(strings.NewReader(v.(string))).Decode(&metadata); err != nil {
@@ -465,5 +469,72 @@ func getTransformFromResourceData(ctx context.Context, d *schema.ResourceData, n
465469
transform.Meta = metadata
466470
}
467471

472+
if v, ok := d.GetOk("retention_policy"); ok && v != nil {
473+
definedRetentionPolicy := v.([]interface{})[0].(map[string]interface{})
474+
retentionTime := models.TransformRetentionPolicyTime{}
475+
if v, ok := definedRetentionPolicy["time"]; ok {
476+
var definedRetentionTime = v.([]interface{})[0].(map[string]interface{})
477+
if f, ok := definedRetentionTime["field"]; ok {
478+
retentionTime.Field = f.(string)
479+
}
480+
if ma, ok := definedRetentionTime["max_age"]; ok {
481+
retentionTime.MaxAge = ma.(string)
482+
}
483+
transform.RetentionPolicy = new(models.TransformRetentionPolicy)
484+
transform.RetentionPolicy.Time = retentionTime
485+
}
486+
}
487+
488+
if v, ok := d.GetOk("sync"); ok {
489+
definedRetentionPolicy := v.([]interface{})[0].(map[string]interface{})
490+
syncTime := models.TransformSyncTime{}
491+
if v, ok := definedRetentionPolicy["time"]; ok {
492+
var definedRetentionTime = v.([]interface{})[0].(map[string]interface{})
493+
if f, ok := definedRetentionTime["field"]; ok {
494+
syncTime.Field = f.(string)
495+
}
496+
if d, ok := definedRetentionTime["delay"]; ok {
497+
syncTime.Delay = d.(string)
498+
}
499+
transform.Sync = new(models.TransformSync)
500+
transform.Sync.Time = syncTime
501+
}
502+
}
503+
504+
if v, ok := d.GetOk("settings"); ok {
505+
definedSettings := v.([]interface{})[0].(map[string]interface{})
506+
settings := models.TransformSettings{}
507+
if v, ok := definedSettings["align_checkpoints"]; ok {
508+
settings.AlignCheckpoints = new(bool)
509+
*settings.AlignCheckpoints = v.(bool)
510+
}
511+
if v, ok := definedSettings["dates_as_epoch_millis"]; ok {
512+
settings.DatesAsEpochMillis = new(bool)
513+
*settings.DatesAsEpochMillis = v.(bool)
514+
}
515+
if v, ok := definedSettings["deduce_mappings"]; ok {
516+
settings.DeduceMappings = new(bool)
517+
*settings.DeduceMappings = v.(bool)
518+
}
519+
if v, ok := definedSettings["docs_per_second"]; ok {
520+
settings.DocsPerSecond = new(float64)
521+
*settings.DocsPerSecond = v.(float64)
522+
}
523+
if v, ok := definedSettings["max_page_search_size"]; ok {
524+
settings.MaxPageSearchSize = new(int)
525+
*settings.MaxPageSearchSize = v.(int)
526+
}
527+
if v, ok := definedSettings["num_failure_retries"]; ok {
528+
settings.NumFailureRetries = new(int)
529+
*settings.NumFailureRetries = v.(int)
530+
}
531+
if v, ok := definedSettings["unattended"]; ok {
532+
settings.Unattended = new(bool)
533+
*settings.Unattended = v.(bool)
534+
}
535+
536+
transform.Settings = &settings
537+
}
538+
468539
return &transform, nil
469540
}

internal/models/transform.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,18 @@ import (
66
)
77

88
type Transform struct {
9-
Id string `json:"id,omitempty"`
10-
Name string `json:"-"`
11-
Description string `json:"description,omitempty"`
12-
Source TransformSource `json:"source"`
13-
Destination TransformDestination `json:"dest"`
14-
Pivot interface{} `json:"pivot,omitempty"`
15-
Latest interface{} `json:"latest,omitempty"`
16-
Frequency string `json:"frequency,omitempty"`
17-
RetentionPolicy TransformRetentionPolicy `json:"retention_policy,omitempty"`
18-
Sync TransformSync `json:"sync,omitempty"`
19-
Meta map[string]interface{} `json:"_meta,omitempty"`
9+
Id string `json:"id,omitempty"`
10+
Name string `json:"-"`
11+
Description string `json:"description,omitempty"`
12+
Source *TransformSource `json:"source"`
13+
Destination *TransformDestination `json:"dest"`
14+
Pivot interface{} `json:"pivot,omitempty"`
15+
Latest interface{} `json:"latest,omitempty"`
16+
Frequency string `json:"frequency,omitempty"`
17+
RetentionPolicy *TransformRetentionPolicy `json:"retention_policy,omitempty"`
18+
Sync *TransformSync `json:"sync,omitempty"`
19+
Meta map[string]interface{} `json:"_meta,omitempty"`
20+
Settings *TransformSettings `json:"settings,omitempty"`
2021
}
2122

2223
type TransformSource struct {

internal/utils/validation.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,53 @@ func StringIsDuration(i interface{}, k string) (warnings []string, errors []erro
1818

1919
return nil, nil
2020
}
21+
22+
// StringIsElasticDuration is a SchemaValidateFunc which tests to make sure the supplied string is valid duration using Elastic time units:
23+
// d, h, m, s, ms, micros, nanos. (see https://www.elastic.co/guide/en/elasticsearch/reference/current/api-conventions.html#time-units)
24+
func StringIsElasticDuration(i interface{}, k string) (warnings []string, errors []error) {
25+
v, ok := i.(string)
26+
if !ok {
27+
return nil, []error{fmt.Errorf("expected type of %s to be string", k)}
28+
}
29+
30+
if v == "" {
31+
return nil, []error{fmt.Errorf("%q contains an invalid duration: [empty]", k)}
32+
}
33+
34+
firstPartCount := 0
35+
for v != "" {
36+
// first part must contain only characters in range [0-9] and .
37+
if ('0' <= v[0] && v[0] <= '9') || v[0] == '.' {
38+
v = v[1:]
39+
firstPartCount++
40+
continue
41+
}
42+
43+
if firstPartCount == 0 {
44+
return nil, []error{fmt.Errorf("%q contains an invalid duration: should start with a numeric value", k)}
45+
}
46+
47+
if !isValidElasticTimeUnit(v) {
48+
return nil, []error{fmt.Errorf("%q contains an invalid duration: unrecognized time unit [%s]", k, v)}
49+
}
50+
51+
break
52+
}
53+
54+
return nil, nil
55+
}
56+
57+
func isValidElasticTimeUnit(timeUnit string) bool {
58+
switch timeUnit {
59+
case
60+
"d",
61+
"h",
62+
"m",
63+
"s",
64+
"ms",
65+
"micros",
66+
"nanos":
67+
return true
68+
}
69+
return false
70+
}

0 commit comments

Comments
 (0)