diff --git a/CHANGELOG.md b/CHANGELOG.md
index ee10cc58c..d1f9d7b9a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@
}
```
- Add support for managing Kibana spaces ([#272](https://github.com/elastic/terraform-provider-elasticstack/pull/272))
+- Add support for managing Elasticsearch transforms ([#284](https://github.com/elastic/terraform-provider-elasticstack/pull/284))
### Fixed
- Respect `ignore_unavailable` and `include_global_state` values when configuring SLM policies ([#224](https://github.com/elastic/terraform-provider-elasticstack/pull/224))
diff --git a/docs/resources/elasticsearch_transform.md b/docs/resources/elasticsearch_transform.md
new file mode 100644
index 000000000..e54449ca2
--- /dev/null
+++ b/docs/resources/elasticsearch_transform.md
@@ -0,0 +1,170 @@
+---
+subcategory: "Transform"
+layout: ""
+page_title: "Elasticstack: elasticstack_elasticsearch_transform Resource"
+description: |-
+ Manages transforms. Transforms enable you to convert existing Elasticsearch indices into summarized indices.
+---
+
+# Resource: elasticstack_elasticsearch_transform
+
+Creates, updates, starts and stops a transform. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/transforms.html
+
+**NOTE:** Some transform settings require a minimum Elasticsearch version. Such settings will be ignored when applied to versions below the required one (a warning will be issued in the logs).
+
+## Example Usage
+
+```terraform
+resource "elasticstack_elasticsearch_transform" "transform_with_pivot" {
+ name = "transform-pivot"
+ description = "A meaningful description"
+
+ source {
+ indices = ["names_or_patterns_for_input_index"]
+ }
+
+ destination {
+ index = "destination_index_for_transform"
+ }
+
+ pivot = jsonencode({
+ "group_by" : {
+ "customer_id" : {
+ "terms" : {
+ "field" : "customer_id",
+ "missing_bucket" : true
+ }
+ }
+ },
+ "aggregations" : {
+ "max_price" : {
+ "max" : {
+ "field" : "taxful_total_price"
+ }
+ }
+ }
+ })
+
+ frequency = "5m"
+
+ retention_policy {
+ time {
+ field = "order_date"
+ max_age = "30d"
+ }
+ }
+
+ sync {
+ time {
+ field = "order_date"
+ delay = "10s"
+ }
+ }
+
+ max_page_search_size = 2000
+
+ enabled = false
+ defer_validation = false
+}
+```
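+
+The `latest` method is configured the same way. A minimal `latest` definition (as exercised by the acceptance tests in this PR; the field names are illustrative) looks like:
+
+```terraform
+latest = jsonencode({
+  "unique_key" : ["customer_id"],
+  "sort" : "order_date"
+})
+```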
+
+
+## Schema
+
+### Required
+
+- `destination` (Block List, Min: 1, Max: 1) The destination for the transform. (see [below for nested schema](#nestedblock--destination))
+- `name` (String) Name of the transform you wish to create.
+- `source` (Block List, Min: 1, Max: 1) The source of the data for the transform. (see [below for nested schema](#nestedblock--source))
+
+### Optional
+
+- `align_checkpoints` (Boolean) Specifies whether the transform checkpoint ranges should be optimized for performance.
+- `dates_as_epoch_millis` (Boolean) Defines whether dates in the output should be written as ISO formatted strings (default) or as millis since epoch.
+- `deduce_mappings` (Boolean) Specifies whether the transform should deduce the destination index mappings from the transform config.
+- `defer_validation` (Boolean) When true, deferrable validations are not run upon creation, but rather when the transform is started. This behavior may be desired if the source index does not exist until after the transform is created. Default is `false`.
+- `description` (String) Free text description of the transform.
+- `docs_per_second` (Number) Specifies a limit on the number of input documents per second. Default (unset) value disables throttling.
+- `enabled` (Boolean) Controls whether the transform should be started or stopped. Default is `false` (stopped).
+- `frequency` (String) The interval between checks for changes in the source indices when the transform is running continuously. Defaults to `1m`.
+- `latest` (String) The latest method transforms the data by finding the latest document for each unique key. JSON definition expected. Either 'pivot' or 'latest' must be present.
+- `max_page_search_size` (Number) Defines the initial page size to use for the composite aggregation for each checkpoint. Default is 500.
+- `metadata` (String) Defines optional transform metadata.
+- `num_failure_retries` (Number) Defines the number of retries on a recoverable failure before the transform task is marked as failed. The default value is the cluster-level setting `num_transform_failure_retries`.
+- `pivot` (String) The pivot method transforms the data by aggregating and grouping it. JSON definition expected. Either 'pivot' or 'latest' must be present.
+- `retention_policy` (Block List, Max: 1) Defines a retention policy for the transform. (see [below for nested schema](#nestedblock--retention_policy))
+- `sync` (Block List, Max: 1) Defines the properties transforms require to run continuously. (see [below for nested schema](#nestedblock--sync))
+- `timeout` (String) Period to wait for a response from Elasticsearch when performing any management operation. If no response is received before the timeout expires, the operation fails and returns an error. Defaults to `30s`.
+- `unattended` (Boolean) In unattended mode, the transform retries indefinitely in case of an error, which means the transform never fails.
+
+### Read-Only
+
+- `id` (String) Internal identifier of the resource
+
+
+### Nested Schema for `destination`
+
+Required:
+
+- `index` (String) The destination index for the transform.
+
+Optional:
+
+- `pipeline` (String) The unique identifier for an ingest pipeline.
+
+
+
+### Nested Schema for `source`
+
+Required:
+
+- `indices` (List of String) The source indices for the transform.
+
+Optional:
+
+- `query` (String) A query clause that retrieves a subset of data from the source index.
+- `runtime_mappings` (String) Definitions of search-time runtime fields that can be used by the transform.
+
+
+
+### Nested Schema for `retention_policy`
+
+Required:
+
+- `time` (Block List, Min: 1, Max: 1) Specifies that the transform uses a time field to set the retention policy. This is currently the only supported option. (see [below for nested schema](#nestedblock--retention_policy--time))
+
+
+### Nested Schema for `retention_policy.time`
+
+Required:
+
+- `field` (String) The date field that is used to calculate the age of the document.
+- `max_age` (String) Specifies the maximum age of a document in the destination index.
+
+
+
+
+### Nested Schema for `sync`
+
+Required:
+
+- `time` (Block List, Min: 1, Max: 1) Specifies that the transform uses a time field to synchronize the source and destination indices. This is currently the only supported option. (see [below for nested schema](#nestedblock--sync--time))
+
+
+### Nested Schema for `sync.time`
+
+Required:
+
+- `field` (String) The date field that is used to identify new documents in the source.
+
+Optional:
+
+- `delay` (String) The time delay between the current time and the latest input data time. The default value is `60s`.
+
+## Import
+
+Import is supported using the following syntax:
+
+```shell
+terraform import elasticstack_elasticsearch_transform.my_new_transform /
+```
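+
+The import ID is the provider's composite ID: the cluster UUID and the transform name separated by a slash, e.g. `<cluster_uuid>/<transform_name>` (the resource read parses it with `CompositeIdFromStr`, as seen in this PR's resource code).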
diff --git a/examples/resources/elasticstack_elasticsearch_transform/import.sh b/examples/resources/elasticstack_elasticsearch_transform/import.sh
new file mode 100644
index 000000000..01a277ec7
--- /dev/null
+++ b/examples/resources/elasticstack_elasticsearch_transform/import.sh
@@ -0,0 +1 @@
+terraform import elasticstack_elasticsearch_transform.my_new_transform /
diff --git a/examples/resources/elasticstack_elasticsearch_transform/resource.tf b/examples/resources/elasticstack_elasticsearch_transform/resource.tf
new file mode 100644
index 000000000..4baab47ad
--- /dev/null
+++ b/examples/resources/elasticstack_elasticsearch_transform/resource.tf
@@ -0,0 +1,51 @@
+resource "elasticstack_elasticsearch_transform" "transform_with_pivot" {
+ name = "transform-pivot"
+ description = "A meaningful description"
+
+ source {
+ indices = ["names_or_patterns_for_input_index"]
+ }
+
+ destination {
+ index = "destination_index_for_transform"
+ }
+
+ pivot = jsonencode({
+ "group_by" : {
+ "customer_id" : {
+ "terms" : {
+ "field" : "customer_id",
+ "missing_bucket" : true
+ }
+ }
+ },
+ "aggregations" : {
+ "max_price" : {
+ "max" : {
+ "field" : "taxful_total_price"
+ }
+ }
+ }
+ })
+
+ frequency = "5m"
+
+ retention_policy {
+ time {
+ field = "order_date"
+ max_age = "30d"
+ }
+ }
+
+ sync {
+ time {
+ field = "order_date"
+ delay = "10s"
+ }
+ }
+
+ max_page_search_size = 2000
+
+ enabled = false
+ defer_validation = false
+}
diff --git a/internal/clients/elasticsearch/transform.go b/internal/clients/elasticsearch/transform.go
new file mode 100644
index 000000000..f50417e6b
--- /dev/null
+++ b/internal/clients/elasticsearch/transform.go
@@ -0,0 +1,320 @@
+package elasticsearch
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "time"
+
+ "github.com/elastic/go-elasticsearch/v7"
+ "github.com/elastic/go-elasticsearch/v7/esapi"
+ "github.com/elastic/terraform-provider-elasticstack/internal/clients"
+ "github.com/elastic/terraform-provider-elasticstack/internal/models"
+ "github.com/elastic/terraform-provider-elasticstack/internal/utils"
+ "github.com/hashicorp/go-version"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/diag"
+)
+
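+// transformFeatureMinSupportedVersion is the minimum Elasticsearch version that supports transforms.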
+var transformFeatureMinSupportedVersion = version.Must(version.NewVersion("7.2.0"))
+
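+// apiOperationTimeoutParamMinSupportedVersion is the minimum Elasticsearch version that accepts the timeout parameter on transform API calls.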
+var apiOperationTimeoutParamMinSupportedVersion = version.Must(version.NewVersion("7.17.0"))
+
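+// PutTransform creates the transform and, if params.Enabled is true, starts it afterwards.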
+func PutTransform(ctx context.Context, apiClient *clients.ApiClient, transform *models.Transform, params *models.PutTransformParams) diag.Diagnostics {
+
+ var diags diag.Diagnostics
+ transformBytes, err := json.Marshal(transform)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ esClient, err := apiClient.GetESClient()
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ serverVersion, diags := apiClient.ServerVersion(ctx)
+ if diags.HasError() {
+ return diags
+ }
+
+ if serverVersion.LessThan(transformFeatureMinSupportedVersion) {
+ diags = append(diags, diag.Diagnostic{
+ Severity: diag.Error,
+ Summary: "Transforms not supported",
+ Detail: fmt.Sprintf(`Transform feature requires a minimum Elasticsearch version of "%s"`, transformFeatureMinSupportedVersion),
+ })
+ return diags
+ }
+
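+	// only send the timeout parameter to servers that support it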
+ withTimeout := serverVersion.GreaterThanOrEqual(apiOperationTimeoutParamMinSupportedVersion)
+
+ putOptions := []func(*esapi.TransformPutTransformRequest){
+ esClient.TransformPutTransform.WithContext(ctx),
+ esClient.TransformPutTransform.WithDeferValidation(params.DeferValidation),
+ }
+
+ if withTimeout {
+ putOptions = append(putOptions, esClient.TransformPutTransform.WithTimeout(params.Timeout))
+ }
+
+ res, err := esClient.TransformPutTransform(bytes.NewReader(transformBytes), transform.Name, putOptions...)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ defer res.Body.Close()
+ if diags := utils.CheckError(res, fmt.Sprintf("Unable to create transform: %s", transform.Name)); diags.HasError() {
+ return diags
+ }
+
+ if params.Enabled {
+
+ var timeout time.Duration
+ if withTimeout {
+ timeout = params.Timeout
+ } else {
+ timeout = 0
+ }
+
+ if diags := startTransform(ctx, esClient, transform.Name, timeout); diags.HasError() {
+ return diags
+ }
+ }
+
+ return diags
+}
+
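+// GetTransform reads the transform definition; it returns nil without diagnostics when the transform does not exist.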
+func GetTransform(ctx context.Context, apiClient *clients.ApiClient, name *string) (*models.Transform, diag.Diagnostics) {
+
+ var diags diag.Diagnostics
+ esClient, err := apiClient.GetESClient()
+ if err != nil {
+ return nil, diag.FromErr(err)
+ }
+
+ req := esClient.TransformGetTransform.WithTransformID(*name)
+ res, err := esClient.TransformGetTransform(req, esClient.TransformGetTransform.WithContext(ctx))
+ if err != nil {
+ return nil, diag.FromErr(err)
+ }
+
+ defer res.Body.Close()
+ if res.StatusCode == http.StatusNotFound {
+ return nil, nil
+ }
+ if diags := utils.CheckError(res, fmt.Sprintf("Unable to get requested transform: %s", *name)); diags.HasError() {
+ return nil, diags
+ }
+
+ var transformsResponse models.GetTransformResponse
+ if err := json.NewDecoder(res.Body).Decode(&transformsResponse); err != nil {
+ return nil, diag.FromErr(err)
+ }
+
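+	// the response may contain several transforms; pick the one matching the requested name exactly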
+ var foundTransform *models.Transform = nil
+ for _, t := range transformsResponse.Transforms {
+ if t.Id == *name {
+ foundTransform = &t
+ break
+ }
+ }
+
+ if foundTransform == nil {
+ diags = append(diags, diag.Diagnostic{
+ Severity: diag.Error,
+ Summary: "Unable to find the transform in the cluster",
+ Detail: fmt.Sprintf(`Unable to find "%s" transform in the cluster`, *name),
+ })
+
+ return nil, diags
+ }
+
+ foundTransform.Name = *name
+ return foundTransform, diags
+}
+
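+// GetTransformStats reads the runtime statistics (notably the state) of the named transform.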
+func GetTransformStats(ctx context.Context, apiClient *clients.ApiClient, name *string) (*models.TransformStats, diag.Diagnostics) {
+ var diags diag.Diagnostics
+ esClient, err := apiClient.GetESClient()
+ if err != nil {
+ return nil, diag.FromErr(err)
+ }
+
+ getStatsOptions := []func(*esapi.TransformGetTransformStatsRequest){
+ esClient.TransformGetTransformStats.WithContext(ctx),
+ }
+
+ statsRes, err := esClient.TransformGetTransformStats(*name, getStatsOptions...)
+ if err != nil {
+ return nil, diag.FromErr(err)
+ }
+
+ defer statsRes.Body.Close()
+ if diags := utils.CheckError(statsRes, fmt.Sprintf("Unable to get transform stats: %s", *name)); diags.HasError() {
+ return nil, diags
+ }
+
+ var transformsStatsResponse models.GetTransformStatsResponse
+ if err := json.NewDecoder(statsRes.Body).Decode(&transformsStatsResponse); err != nil {
+ return nil, diag.FromErr(err)
+ }
+
+ var foundTransformStats *models.TransformStats = nil
+ for _, ts := range transformsStatsResponse.TransformStats {
+ if ts.Id == *name {
+ foundTransformStats = &ts
+ break
+ }
+ }
+
+ if foundTransformStats == nil {
+ diags = append(diags, diag.Diagnostic{
+ Severity: diag.Error,
+ Summary: "Unable to find the transform stats in the cluster",
+ Detail: fmt.Sprintf(`Unable to find "%s" transform stats in the cluster`, *name),
+ })
+ return nil, diags
+ }
+
+ return foundTransformStats, diags
+}
+
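+// UpdateTransform updates the transform definition and, when params.ApplyEnabled is set, starts or stops it to match params.Enabled.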
+func UpdateTransform(ctx context.Context, apiClient *clients.ApiClient, transform *models.Transform, params *models.UpdateTransformParams) diag.Diagnostics {
+
+ var diags diag.Diagnostics
+ transformBytes, err := json.Marshal(transform)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ esClient, err := apiClient.GetESClient()
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ serverVersion, diags := apiClient.ServerVersion(ctx)
+ if diags.HasError() {
+ return diags
+ }
+
+ if serverVersion.LessThan(transformFeatureMinSupportedVersion) {
+ diags = append(diags, diag.Diagnostic{
+ Severity: diag.Error,
+ Summary: "Transforms not supported",
+ Detail: fmt.Sprintf(`Transform feature requires a minimum Elasticsearch version of "%s"`, transformFeatureMinSupportedVersion),
+ })
+ return diags
+ }
+
+ withTimeout := serverVersion.GreaterThanOrEqual(apiOperationTimeoutParamMinSupportedVersion)
+
+ updateOptions := []func(*esapi.TransformUpdateTransformRequest){
+ esClient.TransformUpdateTransform.WithContext(ctx),
+ esClient.TransformUpdateTransform.WithDeferValidation(params.DeferValidation),
+ }
+
+ if withTimeout {
+ updateOptions = append(updateOptions, esClient.TransformUpdateTransform.WithTimeout(params.Timeout))
+ }
+
+ res, err := esClient.TransformUpdateTransform(bytes.NewReader(transformBytes), transform.Name, updateOptions...)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ defer res.Body.Close()
+ if diags := utils.CheckError(res, fmt.Sprintf("Unable to update transform: %s", transform.Name)); diags.HasError() {
+ return diags
+ }
+
+ var timeout time.Duration
+ if withTimeout {
+ timeout = params.Timeout
+ } else {
+ timeout = 0
+ }
+
+ if params.ApplyEnabled {
+ if params.Enabled {
+ if diags := startTransform(ctx, esClient, transform.Name, timeout); diags.HasError() {
+ return diags
+ }
+ } else {
+ if diags := stopTransform(ctx, esClient, transform.Name, timeout); diags.HasError() {
+ return diags
+ }
+ }
+ }
+
+ return diags
+}
+
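+// DeleteTransform deletes the named transform, using force to remove it even while it is running.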
+func DeleteTransform(ctx context.Context, apiClient *clients.ApiClient, name *string) diag.Diagnostics {
+
+ var diags diag.Diagnostics
+ esClient, err := apiClient.GetESClient()
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ res, err := esClient.TransformDeleteTransform(*name, esClient.TransformDeleteTransform.WithForce(true), esClient.TransformDeleteTransform.WithContext(ctx))
+ if err != nil {
+ return diag.FromErr(err)
+ }
+ defer res.Body.Close()
+ if diags := utils.CheckError(res, fmt.Sprintf("Unable to delete transform: %s", *name)); diags.HasError() {
+ return diags
+ }
+
+ return diags
+}
+
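+// startTransform starts the named transform; a positive timeout bounds the API call.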
+func startTransform(ctx context.Context, esClient *elasticsearch.Client, transformName string, timeout time.Duration) diag.Diagnostics {
+ var diags diag.Diagnostics
+
+ startOptions := []func(*esapi.TransformStartTransformRequest){
+ esClient.TransformStartTransform.WithContext(ctx),
+ }
+
+ if timeout > 0 {
+ startOptions = append(startOptions, esClient.TransformStartTransform.WithTimeout(timeout))
+ }
+
+ startRes, err := esClient.TransformStartTransform(transformName, startOptions...)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ defer startRes.Body.Close()
+ if diags := utils.CheckError(startRes, fmt.Sprintf("Unable to start transform: %s", transformName)); diags.HasError() {
+ return diags
+ }
+
+ return diags
+}
+
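+// stopTransform stops the named transform; a positive timeout bounds the API call.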
+func stopTransform(ctx context.Context, esClient *elasticsearch.Client, transformName string, timeout time.Duration) diag.Diagnostics {
+ var diags diag.Diagnostics
+
+ stopOptions := []func(*esapi.TransformStopTransformRequest){
+ esClient.TransformStopTransform.WithContext(ctx),
+ }
+
+ if timeout > 0 {
+ stopOptions = append(stopOptions, esClient.TransformStopTransform.WithTimeout(timeout))
+ }
+
+	stopRes, err := esClient.TransformStopTransform(transformName, stopOptions...)
+	if err != nil {
+		return diag.FromErr(err)
+	}
+
+	defer stopRes.Body.Close()
+	if diags := utils.CheckError(stopRes, fmt.Sprintf("Unable to stop transform: %s", transformName)); diags.HasError() {
+ return diags
+ }
+
+ return diags
+}
diff --git a/internal/elasticsearch/transform/transform.go b/internal/elasticsearch/transform/transform.go
new file mode 100644
index 000000000..ff376c97c
--- /dev/null
+++ b/internal/elasticsearch/transform/transform.go
@@ -0,0 +1,838 @@
+package transform
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "regexp"
+ "strings"
+ "time"
+
+ "github.com/elastic/terraform-provider-elasticstack/internal/clients"
+ "github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch"
+ "github.com/elastic/terraform-provider-elasticstack/internal/models"
+ "github.com/elastic/terraform-provider-elasticstack/internal/utils"
+ "github.com/hashicorp/go-version"
+ "github.com/hashicorp/terraform-plugin-log/tflog"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/diag"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
+)
+
+var settingsRequiredVersions map[string]*version.Version
+
+func init() {
+ settingsRequiredVersions = make(map[string]*version.Version)
+
+ // capabilities
+ settingsRequiredVersions["destination.pipeline"] = version.Must(version.NewVersion("7.3.0"))
+ settingsRequiredVersions["frequency"] = version.Must(version.NewVersion("7.3.0"))
+ settingsRequiredVersions["latest"] = version.Must(version.NewVersion("7.11.0"))
+ settingsRequiredVersions["retention_policy"] = version.Must(version.NewVersion("7.12.0"))
+ settingsRequiredVersions["source.runtime_mappings"] = version.Must(version.NewVersion("7.12.0"))
+ settingsRequiredVersions["metadata"] = version.Must(version.NewVersion("7.16.0"))
+
+ // settings
+ settingsRequiredVersions["docs_per_second"] = version.Must(version.NewVersion("7.8.0"))
+ settingsRequiredVersions["max_page_search_size"] = version.Must(version.NewVersion("7.8.0"))
+ settingsRequiredVersions["dates_as_epoch_millis"] = version.Must(version.NewVersion("7.11.0"))
+ settingsRequiredVersions["align_checkpoints"] = version.Must(version.NewVersion("7.15.0"))
+ settingsRequiredVersions["deduce_mappings"] = version.Must(version.NewVersion("8.1.0"))
+ settingsRequiredVersions["num_failure_retries"] = version.Must(version.NewVersion("8.4.0"))
+ settingsRequiredVersions["unattended"] = version.Must(version.NewVersion("8.5.0"))
+}
+
+func ResourceTransform() *schema.Resource {
+ transformSchema := map[string]*schema.Schema{
+ "id": {
+ Description: "Internal identifier of the resource",
+ Type: schema.TypeString,
+ Computed: true,
+ },
+ "name": {
+ Description: "Name of the transform you wish to create.",
+ Type: schema.TypeString,
+ Required: true,
+ ForceNew: true,
+ ValidateFunc: validation.All(
+ validation.StringLenBetween(1, 64),
+ validation.StringMatch(regexp.MustCompile(`^[a-z0-9_-]+$`), "must contain only lower case alphanumeric characters, hyphens, and underscores"),
+ validation.StringMatch(regexp.MustCompile(`^[a-z0-9].*[a-z0-9]$`), "must start and end with a lowercase alphanumeric character"),
+ ),
+ },
+ "description": {
+ Description: "Free text description of the transform.",
+ Type: schema.TypeString,
+ Optional: true,
+ },
+ "source": {
+ Description: "The source of the data for the transform.",
+ Type: schema.TypeList,
+ Required: true,
+ MaxItems: 1,
+ Elem: &schema.Resource{
+ Schema: map[string]*schema.Schema{
+ "indices": {
+ Description: "The source indices for the transform.",
+ Type: schema.TypeList,
+ Required: true,
+ Elem: &schema.Schema{
+ Type: schema.TypeString,
+ },
+ },
+ "query": {
+ Description: "A query clause that retrieves a subset of data from the source index.",
+ Type: schema.TypeString,
+ Optional: true,
+ Default: `{"match_all":{}}`,
+ DiffSuppressFunc: utils.DiffJsonSuppress,
+ ValidateFunc: validation.StringIsJSON,
+ },
+ "runtime_mappings": {
+ Description: "Definitions of search-time runtime fields that can be used by the transform.",
+ Type: schema.TypeString,
+ Optional: true,
+ DiffSuppressFunc: utils.DiffJsonSuppress,
+ ValidateFunc: validation.StringIsJSON,
+ },
+ },
+ },
+ },
+ "destination": {
+ Description: "The destination for the transform.",
+ Type: schema.TypeList,
+ Required: true,
+ MaxItems: 1,
+ Elem: &schema.Resource{
+ Schema: map[string]*schema.Schema{
+ "index": {
+ Description: "The destination index for the transform.",
+ Type: schema.TypeString,
+ Required: true,
+ ValidateFunc: validation.All(
+ validation.StringLenBetween(1, 255),
+ validation.StringNotInSlice([]string{".", ".."}, true),
+ validation.StringMatch(regexp.MustCompile(`^[^-_+]`), "cannot start with -, _, +"),
+ validation.StringMatch(regexp.MustCompile(`^[a-z0-9!$%&'()+.;=@[\]^{}~_-]+$`), "must contain lower case alphanumeric characters and selected punctuation, see: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#indices-create-api-path-params"),
+ ),
+ },
+ "pipeline": {
+ Description: "The unique identifier for an ingest pipeline.",
+ Type: schema.TypeString,
+ Optional: true,
+ },
+ },
+ },
+ },
+ "pivot": {
+ Description: "The pivot method transforms the data by aggregating and grouping it. JSON definition expected. Either 'pivot' or 'latest' must be present.",
+ Type: schema.TypeString,
+ Optional: true,
+ ExactlyOneOf: []string{"pivot", "latest"},
+ DiffSuppressFunc: utils.DiffJsonSuppress,
+ ValidateFunc: validation.StringIsJSON,
+ ForceNew: true,
+ },
+ "latest": {
+ Description: "The latest method transforms the data by finding the latest document for each unique key. JSON definition expected. Either 'pivot' or 'latest' must be present.",
+ Type: schema.TypeString,
+ Optional: true,
+ ExactlyOneOf: []string{"pivot", "latest"},
+ DiffSuppressFunc: utils.DiffJsonSuppress,
+ ValidateFunc: validation.StringIsJSON,
+ ForceNew: true,
+ },
+ "frequency": {
+ Type: schema.TypeString,
+ Description: "The interval between checks for changes in the source indices when the transform is running continuously. Defaults to `1m`.",
+ Optional: true,
+ Default: "1m",
+ ValidateFunc: utils.StringIsElasticDuration,
+ },
+ "metadata": {
+ Description: "Defines optional transform metadata.",
+ Type: schema.TypeString,
+ Optional: true,
+ ValidateFunc: validation.StringIsJSON,
+ DiffSuppressFunc: utils.DiffJsonSuppress,
+ },
+ "retention_policy": {
+ Description: "Defines a retention policy for the transform.",
+ Type: schema.TypeList,
+ Optional: true,
+ MaxItems: 1,
+ Elem: &schema.Resource{
+ Schema: map[string]*schema.Schema{
+ "time": {
+ Description: "Specifies that the transform uses a time field to set the retention policy. This is currently the only supported option.",
+ Type: schema.TypeList,
+ Required: true,
+ MaxItems: 1,
+ Elem: &schema.Resource{
+ Schema: map[string]*schema.Schema{
+ "field": {
+ Description: "The date field that is used to calculate the age of the document.",
+ Type: schema.TypeString,
+ Required: true,
+ ValidateFunc: validation.StringIsNotWhiteSpace,
+ },
+ "max_age": {
+ Description: "Specifies the maximum age of a document in the destination index.",
+ Type: schema.TypeString,
+ Required: true,
+ ValidateFunc: utils.StringIsElasticDuration,
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ "sync": {
+ Description: "Defines the properties transforms require to run continuously.",
+ Type: schema.TypeList,
+ Optional: true,
+ MaxItems: 1,
+ Elem: &schema.Resource{
+ Schema: map[string]*schema.Schema{
+ "time": {
+ Description: "Specifies that the transform uses a time field to synchronize the source and destination indices. This is currently the only supported option.",
+ Type: schema.TypeList,
+ Required: true,
+ MaxItems: 1,
+ Elem: &schema.Resource{
+ Schema: map[string]*schema.Schema{
+ "field": {
+ Description: "The date field that is used to identify new documents in the source.",
+ Type: schema.TypeString,
+ Required: true,
+ ValidateFunc: validation.StringIsNotWhiteSpace,
+ },
+ "delay": {
+									Description: "The time delay between the current time and the latest input data time. The default value is `60s`.",
+ Type: schema.TypeString,
+ Optional: true,
+ Default: "60s",
+ ValidateFunc: utils.StringIsElasticDuration,
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ "align_checkpoints": {
+ Description: "Specifies whether the transform checkpoint ranges should be optimized for performance.",
+ Type: schema.TypeBool,
+ Optional: true,
+ },
+ "dates_as_epoch_millis": {
+			Description: "Defines whether dates in the output should be written as ISO formatted strings (default) or as millis since epoch.",
+ Type: schema.TypeBool,
+ Optional: true,
+ },
+ "deduce_mappings": {
+ Description: "Specifies whether the transform should deduce the destination index mappings from the transform config.",
+ Type: schema.TypeBool,
+ Optional: true,
+ },
+ "docs_per_second": {
+ Description: "Specifies a limit on the number of input documents per second. Default (unset) value disables throttling.",
+ Type: schema.TypeFloat,
+ Optional: true,
+ ValidateFunc: validation.FloatAtLeast(0),
+ },
+ "max_page_search_size": {
+ Description: "Defines the initial page size to use for the composite aggregation for each checkpoint. Default is 500.",
+ Type: schema.TypeInt,
+ Optional: true,
+ ValidateFunc: validation.IntBetween(10, 65536),
+ },
+ "num_failure_retries": {
+			Description: "Defines the number of retries on a recoverable failure before the transform task is marked as failed. The default value is the cluster-level setting `num_transform_failure_retries`.",
+ Type: schema.TypeInt,
+ Optional: true,
+ ValidateFunc: validation.IntBetween(-1, 100),
+ },
+ "unattended": {
+			Description: "In unattended mode, the transform retries indefinitely in case of an error, which means the transform never fails.",
+ Type: schema.TypeBool,
+ Optional: true,
+ },
+ "defer_validation": {
+ Type: schema.TypeBool,
+			Description: "When true, deferrable validations are not run upon creation, but rather when the transform is started. This behavior may be desired if the source index does not exist until after the transform is created. Default is `false`.",
+ Optional: true,
+ Default: false,
+ },
+ "timeout": {
+ Type: schema.TypeString,
+			Description: "Period to wait for a response from Elasticsearch when performing any management operation. If no response is received before the timeout expires, the operation fails and returns an error. Defaults to `30s`.",
+ Optional: true,
+ Default: "30s",
+ ValidateFunc: utils.StringIsDuration,
+ },
+ "enabled": {
+ Type: schema.TypeBool,
+			Description: "Controls whether the transform should be started or stopped. Default is `false` (stopped).",
+ Optional: true,
+ Default: false,
+ },
+ }
+
+ return &schema.Resource{
+ Schema: transformSchema,
+ Description: "Manages Elasticsearch transforms. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/transforms.html",
+
+ CreateContext: resourceTransformCreate,
+ ReadContext: resourceTransformRead,
+ UpdateContext: resourceTransformUpdate,
+ DeleteContext: resourceTransformDelete,
+
+ Importer: &schema.ResourceImporter{
+ StateContext: schema.ImportStatePassthroughContext,
+ },
+ }
+}
+
+func resourceTransformCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
+
+ client, diags := clients.NewApiClient(d, meta)
+ if diags.HasError() {
+ return diags
+ }
+
+ transformName := d.Get("name").(string)
+ id, diags := client.ID(ctx, transformName)
+ if diags.HasError() {
+ return diags
+ }
+
+ serverVersion, diags := client.ServerVersion(ctx)
+ if diags.HasError() {
+ return diags
+ }
+
+ transform, err := getTransformFromResourceData(ctx, d, transformName, serverVersion)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ timeout, err := time.ParseDuration(d.Get("timeout").(string))
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ params := models.PutTransformParams{
+ DeferValidation: d.Get("defer_validation").(bool),
+ Enabled: d.Get("enabled").(bool),
+ Timeout: timeout,
+ }
+
+ if diags := elasticsearch.PutTransform(ctx, client, transform, ¶ms); diags.HasError() {
+ return diags
+ }
+
+ d.SetId(id.String())
+ return resourceTransformRead(ctx, d, meta)
+}
+
+func resourceTransformRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
+
+ client, diags := clients.NewApiClient(d, meta)
+ if diags.HasError() {
+ return diags
+ }
+ compId, diags := clients.CompositeIdFromStr(d.Id())
+ if diags.HasError() {
+ return diags
+ }
+
+ transformName := compId.ResourceId
+ if err := d.Set("name", transformName); err != nil {
+ return diag.FromErr(err)
+ }
+
+ // actual resource state is established from two sources: the transform definition (model) and the transform stats
+ // 1. read transform definition
+ transform, diags := elasticsearch.GetTransform(ctx, client, &transformName)
+ if transform == nil && diags == nil {
+ tflog.Warn(ctx, fmt.Sprintf(`Transform "%s" not found, removing from state`, compId.ResourceId))
+ d.SetId("")
+ return diags
+ }
+ if diags.HasError() {
+ return diags
+ }
+
+ if err := updateResourceDataFromModel(ctx, d, transform); err != nil {
+ return diag.FromErr(err)
+ }
+
+ // 2. read transform stats
+ transformStats, diags := elasticsearch.GetTransformStats(ctx, client, &transformName)
+ if diags.HasError() {
+ return diags
+ }
+
+ if err := updateResourceDataFromStats(ctx, d, transformStats); err != nil {
+ return diag.FromErr(err)
+ }
+
+ return diags
+}
+
+func resourceTransformUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
+
+ client, diags := clients.NewApiClient(d, meta)
+ if diags.HasError() {
+ return diags
+ }
+
+ transformName := d.Get("name").(string)
+ _, diags = client.ID(ctx, transformName)
+ if diags.HasError() {
+ return diags
+ }
+
+ serverVersion, diags := client.ServerVersion(ctx)
+ if diags.HasError() {
+ return diags
+ }
+
+ updatedTransform, err := getTransformFromResourceData(ctx, d, transformName, serverVersion)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ // pivot and latest cannot be updated; sending them to the API for an update operation would result in an error
+ updatedTransform.Pivot = nil
+ updatedTransform.Latest = nil
+
+ timeout, err := time.ParseDuration(d.Get("timeout").(string))
+ if err != nil {
+ return diag.FromErr(err)
+ }
+
+ params := models.UpdateTransformParams{
+ DeferValidation: d.Get("defer_validation").(bool),
+ Timeout: timeout,
+ Enabled: d.Get("enabled").(bool),
+ ApplyEnabled: d.HasChange("enabled"),
+ }
+
+ if diags := elasticsearch.UpdateTransform(ctx, client, updatedTransform, ¶ms); diags.HasError() {
+ return diags
+ }
+
+ return resourceTransformRead(ctx, d, meta)
+}
+
+func resourceTransformDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
+
+ client, diags := clients.NewApiClient(d, meta)
+ if diags.HasError() {
+ return diags
+ }
+
+ id := d.Id()
+ compId, diags := clients.CompositeIdFromStr(id)
+ if diags.HasError() {
+ return diags
+ }
+
+ if diags := elasticsearch.DeleteTransform(ctx, client, &compId.ResourceId); diags.HasError() {
+ return diags
+ }
+
+ return diags
+}
+
+func getTransformFromResourceData(ctx context.Context, d *schema.ResourceData, name string, serverVersion *version.Version) (*models.Transform, error) {
+
+ var transform models.Transform
+ transform.Name = name
+
+ if v, ok := d.GetOk("description"); ok {
+ transform.Description = v.(string)
+ }
+
+ if v, ok := d.GetOk("source"); ok {
+ definedSource := v.([]interface{})[0].(map[string]interface{})
+
+ transform.Source = new(models.TransformSource)
+ indices := make([]string, 0)
+ for _, i := range definedSource["indices"].([]interface{}) {
+ indices = append(indices, i.(string))
+ }
+ transform.Source.Indices = indices
+
+ if v, ok := definedSource["query"]; ok && len(v.(string)) > 0 {
+ var query interface{}
+ if err := json.NewDecoder(strings.NewReader(v.(string))).Decode(&query); err != nil {
+ return nil, err
+ }
+ transform.Source.Query = query
+ }
+
+ if v, ok := definedSource["runtime_mappings"]; ok && len(v.(string)) > 0 && isSettingAllowed(ctx, "source.runtime_mappings", serverVersion) {
+ var runtimeMappings interface{}
+ if err := json.NewDecoder(strings.NewReader(v.(string))).Decode(&runtimeMappings); err != nil {
+ return nil, err
+ }
+ transform.Source.RuntimeMappings = runtimeMappings
+ }
+ }
+
+ if v, ok := d.GetOk("destination"); ok {
+
+ definedDestination := v.([]interface{})[0].(map[string]interface{})
+
+ transform.Destination = &models.TransformDestination{
+ Index: definedDestination["index"].(string),
+ }
+
+ if pipeline, ok := definedDestination["pipeline"]; ok && isSettingAllowed(ctx, "destination.pipeline", serverVersion) {
+ transform.Destination.Pipeline = pipeline.(string)
+ }
+ }
+
+ if v, ok := d.GetOk("pivot"); ok {
+ var pivot interface{}
+ if err := json.NewDecoder(strings.NewReader(v.(string))).Decode(&pivot); err != nil {
+ return nil, err
+ }
+ transform.Pivot = pivot
+ }
+
+ if v, ok := d.GetOk("latest"); ok && isSettingAllowed(ctx, "latest", serverVersion) {
+ var latest interface{}
+ if err := json.NewDecoder(strings.NewReader(v.(string))).Decode(&latest); err != nil {
+ return nil, err
+ }
+ transform.Latest = latest
+ }
+
+ if v, ok := d.GetOk("frequency"); ok && isSettingAllowed(ctx, "frequency", serverVersion) {
+ transform.Frequency = v.(string)
+ }
+
+ if v, ok := d.GetOk("metadata"); ok && isSettingAllowed(ctx, "metadata", serverVersion) {
+ var metadata map[string]interface{}
+ if err := json.NewDecoder(strings.NewReader(v.(string))).Decode(&metadata); err != nil {
+ return nil, err
+ }
+ transform.Meta = metadata
+ }
+
+ if v, ok := d.GetOk("retention_policy"); ok && isSettingAllowed(ctx, "retention_policy", serverVersion) {
+ definedRetentionPolicy := v.([]interface{})[0].(map[string]interface{})
+
+ if v, ok := definedRetentionPolicy["time"]; ok {
+ retentionTime := models.TransformRetentionPolicyTime{}
+ var definedRetentionTime = v.([]interface{})[0].(map[string]interface{})
+ if f, ok := definedRetentionTime["field"]; ok {
+ retentionTime.Field = f.(string)
+ }
+ if ma, ok := definedRetentionTime["max_age"]; ok {
+ retentionTime.MaxAge = ma.(string)
+ }
+ transform.RetentionPolicy = &models.TransformRetentionPolicy{
+ Time: retentionTime,
+ }
+ }
+ }
+
+ if v, ok := d.GetOk("sync"); ok {
+ definedSync := v.([]interface{})[0].(map[string]interface{})
+
+ if v, ok := definedSync["time"]; ok {
+ syncTime := models.TransformSyncTime{}
+ var definedSyncTime = v.([]interface{})[0].(map[string]interface{})
+ if f, ok := definedSyncTime["field"]; ok {
+ syncTime.Field = f.(string)
+ }
+ if d, ok := definedSyncTime["delay"]; ok {
+ syncTime.Delay = d.(string)
+ }
+ transform.Sync = &models.TransformSync{
+ Time: syncTime,
+ }
+ }
+ }
+
+ // settings
+ settings := models.TransformSettings{}
+ setSettings := false
+
+ if v, ok := d.GetOk("align_checkpoints"); ok && isSettingAllowed(ctx, "align_checkpoints", serverVersion) {
+ setSettings = true
+ ac := v.(bool)
+ settings.AlignCheckpoints = &ac
+ }
+ if v, ok := d.GetOk("dates_as_epoch_millis"); ok && isSettingAllowed(ctx, "dates_as_epoch_millis", serverVersion) {
+ setSettings = true
+ dem := v.(bool)
+ settings.DatesAsEpochMillis = &dem
+ }
+ if v, ok := d.GetOk("deduce_mappings"); ok && isSettingAllowed(ctx, "deduce_mappings", serverVersion) {
+ setSettings = true
+ dm := v.(bool)
+ settings.DeduceMappings = &dm
+ }
+ if v, ok := d.GetOk("docs_per_second"); ok && isSettingAllowed(ctx, "docs_per_second", serverVersion) {
+ setSettings = true
+ dps := v.(float64)
+ settings.DocsPerSecond = &dps
+ }
+ if v, ok := d.GetOk("max_page_search_size"); ok && isSettingAllowed(ctx, "max_page_search_size", serverVersion) {
+ setSettings = true
+ mpss := v.(int)
+ settings.MaxPageSearchSize = &mpss
+ }
+ if v, ok := d.GetOk("num_failure_retries"); ok && isSettingAllowed(ctx, "num_failure_retries", serverVersion) {
+ setSettings = true
+ nfr := v.(int)
+ settings.NumFailureRetries = &nfr
+ }
+ if v, ok := d.GetOk("unattended"); ok && isSettingAllowed(ctx, "unattended", serverVersion) {
+ setSettings = true
+ u := v.(bool)
+ settings.Unattended = &u
+ }
+
+ if setSettings {
+ transform.Settings = &settings
+ }
+
+ return &transform, nil
+}
+
+func updateResourceDataFromModel(ctx context.Context, d *schema.ResourceData, transform *models.Transform) error {
+
+ // transform.Description
+ if err := d.Set("description", transform.Description); err != nil {
+ return err
+ }
+
+ // transform.Source
+ if err := d.Set("source", flattenSource(transform.Source)); err != nil {
+ return err
+ }
+
+ // transform.Destination
+ if err := d.Set("destination", flattenDestination(transform.Destination)); err != nil {
+ return err
+ }
+
+ // transform.Pivot
+ if transform.Pivot != nil {
+ pivot, err := json.Marshal(transform.Pivot)
+ if err != nil {
+ return err
+ }
+ if err := d.Set("pivot", string(pivot)); err != nil {
+ return err
+ }
+ }
+
+ // transform.Latest
+ if transform.Latest != nil {
+ latest, err := json.Marshal(transform.Latest)
+ if err != nil {
+ return err
+ }
+ if err := d.Set("latest", string(latest)); err != nil {
+ return err
+ }
+ }
+
+ // transform.Frequency
+ if err := d.Set("frequency", transform.Frequency); err != nil {
+ return err
+ }
+
+ // transform.Sync
+ if err := d.Set("sync", flattenSync(transform.Sync)); err != nil {
+ return err
+ }
+
+ // transform.RetentionPolicy
+ if err := d.Set("retention_policy", flattenRetentionPolicy(transform.RetentionPolicy)); err != nil {
+ return err
+ }
+
+ // transform.Settings
+ if transform.Settings != nil && transform.Settings.AlignCheckpoints != nil {
+ if err := d.Set("align_checkpoints", *(transform.Settings.AlignCheckpoints)); err != nil {
+ return err
+ }
+ }
+
+ if transform.Settings != nil && transform.Settings.DatesAsEpochMillis != nil {
+ if err := d.Set("dates_as_epoch_millis", *(transform.Settings.DatesAsEpochMillis)); err != nil {
+ return err
+ }
+ }
+
+ if transform.Settings != nil && transform.Settings.DeduceMappings != nil {
+ if err := d.Set("deduce_mappings", *(transform.Settings.DeduceMappings)); err != nil {
+ return err
+ }
+ }
+
+ if transform.Settings != nil && transform.Settings.DocsPerSecond != nil {
+ if err := d.Set("docs_per_second", *(transform.Settings.DocsPerSecond)); err != nil {
+ return err
+ }
+ }
+
+ if transform.Settings != nil && transform.Settings.MaxPageSearchSize != nil {
+ if err := d.Set("max_page_search_size", *(transform.Settings.MaxPageSearchSize)); err != nil {
+ return err
+ }
+ }
+
+ if transform.Settings != nil && transform.Settings.NumFailureRetries != nil {
+ if err := d.Set("num_failure_retries", *(transform.Settings.NumFailureRetries)); err != nil {
+ return err
+ }
+ }
+
+ if transform.Settings != nil && transform.Settings.Unattended != nil {
+ if err := d.Set("unattended", *(transform.Settings.Unattended)); err != nil {
+ return err
+ }
+ }
+
+ // transform.Meta
+ if transform.Meta == nil {
+ if err := d.Set("metadata", nil); err != nil {
+ return err
+ }
+ } else {
+ meta, err := json.Marshal(transform.Meta)
+ if err != nil {
+ return err
+ }
+
+ if err := d.Set("metadata", string(meta)); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func updateResourceDataFromStats(ctx context.Context, d *schema.ResourceData, transformStats *models.TransformStats) error {
+
+ // transform.Enabled
+ if err := d.Set("enabled", transformStats.IsStarted()); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func flattenSource(source *models.TransformSource) []interface{} {
+ if source == nil {
+ return []interface{}{}
+ }
+
+ s := make(map[string]interface{})
+
+ if source.Indices != nil {
+ s["indices"] = source.Indices
+ }
+
+ if source.Query != nil {
+ query, err := json.Marshal(source.Query)
+ if err != nil {
+ return []interface{}{}
+ }
+ if len(query) > 0 {
+ s["query"] = string(query)
+ }
+ }
+
+ if source.RuntimeMappings != nil {
+ rm, err := json.Marshal(source.RuntimeMappings)
+ if err != nil {
+ return []interface{}{}
+ }
+ if len(rm) > 0 {
+ s["runtime_mappings"] = string(rm)
+ }
+ }
+
+ return []interface{}{s}
+}
+
+func flattenDestination(dest *models.TransformDestination) []interface{} {
+ if dest == nil {
+ return []interface{}{}
+ }
+
+ d := make(map[string]interface{})
+
+ d["index"] = dest.Index
+
+ if dest.Pipeline != "" {
+ d["pipeline"] = dest.Pipeline
+ }
+
+ return []interface{}{d}
+}
+
+func flattenSync(sync *models.TransformSync) []interface{} {
+ if sync == nil {
+ return nil
+ }
+
+ time := make(map[string]interface{})
+
+ if sync.Time.Delay != "" {
+ time["delay"] = sync.Time.Delay
+ }
+
+ if sync.Time.Field != "" {
+ time["field"] = sync.Time.Field
+ }
+
+ s := make(map[string]interface{})
+ s["time"] = []interface{}{time}
+
+ return []interface{}{s}
+}
+
+func flattenRetentionPolicy(retention *models.TransformRetentionPolicy) []interface{} {
+ if retention == nil {
+ return []interface{}{}
+ }
+
+ time := make(map[string]interface{})
+
+ if retention.Time.MaxAge != "" {
+ time["max_age"] = retention.Time.MaxAge
+ }
+
+ if retention.Time.Field != "" {
+ time["field"] = retention.Time.Field
+ }
+
+ r := make(map[string]interface{})
+ r["time"] = []interface{}{time}
+
+ return []interface{}{r}
+}
+
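+// isSettingAllowed reports whether the named setting is supported by the target server version; unsupported settings are skipped with a warning in the logs.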
+func isSettingAllowed(ctx context.Context, settingName string, serverVersion *version.Version) bool {
+ if minVersion, ok := settingsRequiredVersions[settingName]; ok {
+ if serverVersion.LessThan(minVersion) {
+ tflog.Warn(ctx, fmt.Sprintf("Setting [%s] not allowed for Elasticsearch server version %v; min required is %v", settingName, *serverVersion, *minVersion))
+ return false
+ }
+ }
+
+ return true
+}
diff --git a/internal/elasticsearch/transform/transform_test.go b/internal/elasticsearch/transform/transform_test.go
new file mode 100644
index 000000000..0de736470
--- /dev/null
+++ b/internal/elasticsearch/transform/transform_test.go
@@ -0,0 +1,387 @@
+package transform_test
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/elastic/terraform-provider-elasticstack/internal/acctest"
+ "github.com/elastic/terraform-provider-elasticstack/internal/clients"
+ sdkacctest "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
+)
+
+func TestAccResourceTransformWithPivot(t *testing.T) {
+
+ transformNamePivot := sdkacctest.RandStringFromCharSet(18, sdkacctest.CharSetAlphaNum)
+ resource.Test(t, resource.TestCase{
+ PreCheck: func() { acctest.PreCheck(t) },
+ CheckDestroy: checkResourceTransformDestroy,
+ ProtoV5ProviderFactories: acctest.Providers,
+ Steps: []resource.TestStep{
+ {
+ Config: testAccResourceTransformWithPivotCreate(transformNamePivot),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "name", transformNamePivot),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "description", "test description"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "source.0.indices.0", "source_index_for_transform"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "destination.0.index", "dest_index_for_transform"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "frequency", "5m"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "max_page_search_size", "2000"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "enabled", "false"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "sync.0.time.0.field", "order_date"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "sync.0.time.0.delay", "20s"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "defer_validation", "true"),
+ resource.TestCheckNoResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "latest"),
+ ),
+ },
+ {
+ Config: testAccResourceTransformWithPivotUpdate(transformNamePivot),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "name", transformNamePivot),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "description", "yet another test description"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "source.0.indices.0", "source_index_for_transform"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "source.0.indices.1", "additional_index"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "destination.0.index", "dest_index_for_transform_v2"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "frequency", "10m"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "max_page_search_size", "2000"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "enabled", "true"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "retention_policy.0.time.0.field", "order_date"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "retention_policy.0.time.0.max_age", "7d"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "defer_validation", "true"),
+ resource.TestCheckNoResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "latest"),
+ ),
+ },
+ },
+ })
+}
+
+func TestAccResourceTransformWithLatest(t *testing.T) {
+
+ transformNameLatest := sdkacctest.RandStringFromCharSet(20, sdkacctest.CharSetAlphaNum)
+ resource.Test(t, resource.TestCase{
+ PreCheck: func() { acctest.PreCheck(t) },
+ CheckDestroy: checkResourceTransformDestroy,
+ ProtoV5ProviderFactories: acctest.Providers,
+ Steps: []resource.TestStep{
+ {
+ Config: testAccResourceTransformWithLatestCreate(transformNameLatest),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_latest", "name", transformNameLatest),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_latest", "description", "test description (latest)"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_latest", "source.0.indices.0", "source_index_for_transform"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_latest", "destination.0.index", "dest_index_for_transform"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_latest", "frequency", "2m"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_latest", "defer_validation", "true"),
+ resource.TestCheckNoResourceAttr("elasticstack_elasticsearch_transform.test_latest", "pivot"),
+ ),
+ },
+ },
+ })
+}
+
+func TestAccResourceTransformNoDefer(t *testing.T) {
+
+ transformName := sdkacctest.RandStringFromCharSet(18, sdkacctest.CharSetAlphaNum)
+ indexName := sdkacctest.RandStringFromCharSet(22, sdkacctest.CharSetAlphaNum)
+ resource.Test(t, resource.TestCase{
+ PreCheck: func() { acctest.PreCheck(t) },
+ CheckDestroy: checkResourceTransformDestroy,
+ ProtoV5ProviderFactories: acctest.Providers,
+ Steps: []resource.TestStep{
+ {
+ Config: testAccResourceTransformNoDeferCreate(transformName, indexName),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "name", transformName),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "description", "test description"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "source.0.indices.0", indexName),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "destination.0.index", "dest_index_for_transform"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "frequency", "5m"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_transform.test_pivot", "defer_validation", "false"),
+ ),
+ },
+ },
+ })
+}
+
+// create a transform referencing a non-existing source index;
+// because validations are deferred, this should pass
+func testAccResourceTransformWithPivotCreate(name string) string {
+ return fmt.Sprintf(`
+provider "elasticstack" {
+ elasticsearch {}
+}
+
+resource "elasticstack_elasticsearch_transform" "test_pivot" {
+ name = "%s"
+ description = "test description"
+
+ source {
+ indices = ["source_index_for_transform"]
+ }
+
+ destination {
+ index = "dest_index_for_transform"
+ }
+
+ pivot = jsonencode({
+ "group_by": {
+ "customer_id": {
+ "terms": {
+ "field": "customer_id",
+ "missing_bucket": true
+ }
+ }
+ },
+ "aggregations": {
+ "max_price": {
+ "max": {
+ "field": "taxful_total_price"
+ }
+ }
+ }
+ })
+
+ sync {
+ time {
+ field = "order_date"
+ delay = "20s"
+ }
+ }
+
+ max_page_search_size = 2000
+ frequency = "5m"
+ enabled = false
+
+ defer_validation = true
+ timeout = "1m"
+}
+ `, name)
+}
+
+// update the existing transform, add another source index and start it (enabled = true)
+// validations are now unavoidable (at start), so make sure to create the indices _before_ updating the transform
+// the tf script below uses an implicit dependency, but `depends_on` is also an option
+func testAccResourceTransformWithPivotUpdate(name string) string {
+ return fmt.Sprintf(`
+provider "elasticstack" {
+ elasticsearch {}
+}
+
+resource "elasticstack_elasticsearch_index" "test_source_index_1" {
+ name = "source_index_for_transform"
+
+ alias {
+ name = "test_alias_1"
+ }
+
+ mappings = jsonencode({
+ properties = {
+ field1 = { type = "text" }
+ }
+ })
+
+ deletion_protection = false
+ wait_for_active_shards = "all"
+ master_timeout = "1m"
+ timeout = "1m"
+}
+
+resource "elasticstack_elasticsearch_index" "test_source_index_2" {
+ name = "additional_index"
+
+ alias {
+ name = "test_alias_2"
+ }
+
+ mappings = jsonencode({
+ properties = {
+ field1 = { type = "text" }
+ }
+ })
+
+ deletion_protection = false
+ wait_for_active_shards = "all"
+ master_timeout = "1m"
+ timeout = "1m"
+}
+
+resource "elasticstack_elasticsearch_transform" "test_pivot" {
+ name = "%s"
+ description = "yet another test description"
+
+ source {
+ indices = [
+ elasticstack_elasticsearch_index.test_source_index_1.name,
+ elasticstack_elasticsearch_index.test_source_index_2.name
+ ]
+ }
+
+ destination {
+ index = "dest_index_for_transform_v2"
+ }
+
+ pivot = jsonencode({
+ "group_by": {
+ "customer_id": {
+ "terms": {
+ "field": "customer_id",
+ "missing_bucket": true
+ }
+ }
+ },
+ "aggregations": {
+ "max_price": {
+ "max": {
+ "field": "taxful_total_price"
+ }
+ }
+ }
+ })
+
+ sync {
+ time {
+ field = "order_date"
+ delay = "20s"
+ }
+ }
+
+ retention_policy {
+ time {
+ field = "order_date"
+ max_age = "7d"
+ }
+ }
+
+ max_page_search_size = 2000
+ frequency = "10m"
+ enabled = true
+
+ defer_validation = true
+ timeout = "1m"
+}
+ `, name)
+}
+
+func testAccResourceTransformWithLatestCreate(name string) string {
+ return fmt.Sprintf(`
+provider "elasticstack" {
+ elasticsearch {}
+}
+
+resource "elasticstack_elasticsearch_transform" "test_latest" {
+ name = "%s"
+ description = "test description (latest)"
+
+ source {
+ indices = ["source_index_for_transform"]
+ }
+
+ destination {
+ index = "dest_index_for_transform"
+ }
+
+ latest = jsonencode({
+ "unique_key": ["customer_id"],
+ "sort": "order_date"
+ })
+ frequency = "2m"
+ enabled = false
+
+ defer_validation = true
+ timeout = "1m"
+}
+ `, name)
+}
+
+func testAccResourceTransformNoDeferCreate(transformName, indexName string) string {
+ return fmt.Sprintf(`
+provider "elasticstack" {
+ elasticsearch {}
+}
+
+resource "elasticstack_elasticsearch_index" "test_index" {
+ name = "%s"
+
+ alias {
+ name = "test_alias_1"
+ }
+
+ mappings = jsonencode({
+ properties = {
+ field1 = { type = "text" }
+ }
+ })
+
+ deletion_protection = false
+ wait_for_active_shards = "all"
+ master_timeout = "1m"
+ timeout = "1m"
+}
+
+resource "elasticstack_elasticsearch_transform" "test_pivot" {
+ name = "%s"
+ description = "test description"
+
+ source {
+ indices = [elasticstack_elasticsearch_index.test_index.name]
+ }
+
+ destination {
+ index = "dest_index_for_transform"
+ }
+
+ pivot = jsonencode({
+ "group_by": {
+ "customer_id": {
+ "terms": {
+ "field": "customer_id",
+ "missing_bucket": true
+ }
+ }
+ },
+ "aggregations": {
+ "max_price": {
+ "max": {
+ "field": "taxful_total_price"
+ }
+ }
+ }
+ })
+ frequency = "5m"
+ enabled = false
+
+ defer_validation = false
+ timeout = "1m"
+}
+ `, indexName, transformName)
+}
+
+func checkResourceTransformDestroy(s *terraform.State) error {
+ client, err := clients.NewAcceptanceTestingClient()
+ if err != nil {
+ return err
+ }
+
+ for _, rs := range s.RootModule().Resources {
+ if rs.Type != "elasticstack_elasticsearch_transform" {
+ continue
+ }
+ compId, _ := clients.CompositeIdFromStr(rs.Primary.ID)
+
+ esClient, err := client.GetESClient()
+ if err != nil {
+ return err
+ }
+ req := esClient.TransformGetTransform.WithTransformID(compId.ResourceId)
+ res, err := esClient.TransformGetTransform(req)
+ if err != nil {
+ return err
+ }
+
+ if res.StatusCode != 404 {
+ return fmt.Errorf("Transform (%s) still exists", compId.ResourceId)
+ }
+ }
+ return nil
+}
diff --git a/internal/models/transform.go b/internal/models/transform.go
new file mode 100644
index 000000000..2973219e8
--- /dev/null
+++ b/internal/models/transform.go
@@ -0,0 +1,92 @@
+package models
+
+import (
+ "encoding/json"
+ "time"
+)
+
+type Transform struct {
+ Id string `json:"id,omitempty"`
+ Name string `json:"-"`
+ Description string `json:"description,omitempty"`
+ Source *TransformSource `json:"source"`
+ Destination *TransformDestination `json:"dest"`
+ Pivot interface{} `json:"pivot,omitempty"`
+ Latest interface{} `json:"latest,omitempty"`
+ Frequency string `json:"frequency,omitempty"`
+ RetentionPolicy *TransformRetentionPolicy `json:"retention_policy,omitempty"`
+ Sync *TransformSync `json:"sync,omitempty"`
+ Meta interface{} `json:"_meta,omitempty"`
+ Settings *TransformSettings `json:"settings,omitempty"`
+}
+
+type TransformSource struct {
+ Indices []string `json:"index"`
+ Query interface{} `json:"query,omitempty"`
+ RuntimeMappings interface{} `json:"runtime_mappings,omitempty"`
+}
+
+type TransformDestination struct {
+ Index string `json:"index"`
+ Pipeline string `json:"pipeline,omitempty"`
+}
+
+type TransformRetentionPolicy struct {
+ Time TransformRetentionPolicyTime `json:"time"`
+}
+
+type TransformRetentionPolicyTime struct {
+ Field string `json:"field"`
+ MaxAge string `json:"max_age"`
+}
+
+type TransformSync struct {
+ Time TransformSyncTime `json:"time"`
+}
+
+type TransformSyncTime struct {
+ Field string `json:"field"`
+ Delay string `json:"delay,omitempty"`
+}
+
+type TransformSettings struct {
+ AlignCheckpoints *bool `json:"align_checkpoints,omitempty"`
+ DatesAsEpochMillis *bool `json:"dates_as_epoch_millis,omitempty"`
+ DeduceMappings *bool `json:"deduce_mappings,omitempty"`
+ DocsPerSecond *float64 `json:"docs_per_second,omitempty"`
+ MaxPageSearchSize *int `json:"max_page_search_size,omitempty"`
+ NumFailureRetries *int `json:"num_failure_retries,omitempty"`
+ Unattended *bool `json:"unattended,omitempty"`
+}
+
+type PutTransformParams struct {
+ DeferValidation bool
+ Timeout time.Duration
+ Enabled bool
+}
+
+type UpdateTransformParams struct {
+ DeferValidation bool
+ Timeout time.Duration
+ Enabled bool
+ ApplyEnabled bool
+}
+
+type GetTransformResponse struct {
+ Count json.Number `json:"count"`
+ Transforms []Transform `json:"transforms"`
+}
+
+type TransformStats struct {
+ Id string `json:"id"`
+ State string `json:"state"`
+}
+
+type GetTransformStatsResponse struct {
+ Count json.Number `json:"count"`
+ TransformStats []TransformStats `json:"transforms"`
+}
+
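+// IsStarted reports whether the transform task is running, i.e. in the "started" or "indexing" state.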
+func (ts *TransformStats) IsStarted() bool {
+ return ts.State == "started" || ts.State == "indexing"
+}
diff --git a/internal/utils/validation.go b/internal/utils/validation.go
index 38e90b6d8..04a367482 100644
--- a/internal/utils/validation.go
+++ b/internal/utils/validation.go
@@ -2,6 +2,7 @@ package utils
import (
"fmt"
+ "regexp"
"time"
)
@@ -18,3 +19,24 @@ func StringIsDuration(i interface{}, k string) (warnings []string, errors []erro
return nil, nil
}
+
+// StringIsElasticDuration is a SchemaValidateFunc which tests to make sure the supplied string is a valid duration using Elastic time units:
+// d, h, m, s, ms, micros, nanos. (see https://www.elastic.co/guide/en/elasticsearch/reference/current/api-conventions.html#time-units)
+func StringIsElasticDuration(i interface{}, k string) (warnings []string, errors []error) {
+ v, ok := i.(string)
+ if !ok {
+ return nil, []error{fmt.Errorf("expected type of %s to be string", k)}
+ }
+
+ if v == "" {
+ return nil, []error{fmt.Errorf("%q contains an invalid duration: [empty]", k)}
+ }
+
+ r := regexp.MustCompile(`^[0-9]+(?:\.[0-9]+)?(?:d|h|m|s|ms|micros|nanos)$`)
+
+ if !r.MatchString(v) {
+ return nil, []error{fmt.Errorf("%q contains an invalid duration: not conforming to Elastic time-units format", k)}
+ }
+
+ return nil, nil
+}
diff --git a/internal/utils/validation_test.go b/internal/utils/validation_test.go
index 3a8b8c1d0..ef06bed5a 100644
--- a/internal/utils/validation_test.go
+++ b/internal/utils/validation_test.go
@@ -46,3 +46,50 @@ func TestStringIsDuration(t *testing.T) {
})
}
}
+
+func TestStringIsElasticDuration(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ name string
+ i interface{}
+ k string
+ wantWarnings []string
+ wantErrors []error
+ }{
+ {
+ name: "valid Elastic duration string",
+ i: "30d",
+ k: "delay",
+ },
+ {
+ name: "invalid Elastic duration unit",
+ i: "12w",
+ k: "delay",
+ wantErrors: []error{errors.New(`"delay" contains an invalid duration: not conforming to Elastic time-units format`)},
+ },
+ {
+ name: "invalid Elastic duration value",
+ i: ".12s",
+ k: "delay",
+ wantErrors: []error{errors.New(`"delay" contains an invalid duration: not conforming to Elastic time-units format`)},
+ },
+ {
+ name: "invalid data type",
+ i: 30,
+ k: "delay",
+ wantErrors: []error{errors.New("expected type of delay to be string")},
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ gotWarnings, gotErrors := StringIsElasticDuration(tt.i, tt.k)
+ if !reflect.DeepEqual(gotWarnings, tt.wantWarnings) {
+ t.Errorf("StringIsElasticDuration() gotWarnings = %v, want %v", gotWarnings, tt.wantWarnings)
+ }
+ if !reflect.DeepEqual(gotErrors, tt.wantErrors) {
+ t.Errorf("StringIsElasticDuration() gotErrors = %v, want %v", gotErrors, tt.wantErrors)
+ }
+ })
+ }
+}
diff --git a/provider/provider.go b/provider/provider.go
index ea0e7dd49..08b8aa64f 100644
--- a/provider/provider.go
+++ b/provider/provider.go
@@ -7,6 +7,7 @@ import (
"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ingest"
"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/logstash"
"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/security"
+ "github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/transform"
"github.com/elastic/terraform-provider-elasticstack/internal/kibana"
providerSchema "github.com/elastic/terraform-provider-elasticstack/internal/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
@@ -87,6 +88,7 @@ func New(version string) *schema.Provider {
"elasticstack_elasticsearch_snapshot_lifecycle": cluster.ResourceSlm(),
"elasticstack_elasticsearch_snapshot_repository": cluster.ResourceSnapshotRepository(),
"elasticstack_elasticsearch_script": cluster.ResourceScript(),
+ "elasticstack_elasticsearch_transform": transform.ResourceTransform(),
"elasticstack_kibana_space": kibana.ResourceSpace(),
},
diff --git a/templates/resources/elasticsearch_transform.md.tmpl b/templates/resources/elasticsearch_transform.md.tmpl
new file mode 100644
index 000000000..6c997cef8
--- /dev/null
+++ b/templates/resources/elasticsearch_transform.md.tmpl
@@ -0,0 +1,25 @@
+---
+subcategory: "Transform"
+layout: ""
+page_title: "Elasticstack: elasticstack_elasticsearch_transform Resource"
+description: |-
+ Manages transforms. Transforms enable you to convert existing Elasticsearch indices into summarized indices.
+---
+
+# Resource: elasticstack_elasticsearch_transform
+
+Creates, updates, starts and stops a transform. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/transforms.html
+
+**NOTE:** Some transform settings require a minimum Elasticsearch version. Such settings will be ignored when applied to versions below the required one (a warning will be issued in the logs).
+
+## Example Usage
+
+{{ tffile "examples/resources/elasticstack_elasticsearch_transform/resource.tf" }}
+
+{{ .SchemaMarkdown | trimspace }}
+
+## Import
+
+Import is supported using the following syntax:
+
+{{ codefile "shell" "examples/resources/elasticstack_elasticsearch_transform/import.sh" }}