Skip to content

Commit 3b40d9a

Browse files
RobsonSutton and tobio authored
[ISSUE-115] Add Logstash Centralised Pipeline Management (#151)
* Initial changes to add logstash pipelines * begin updating put action * simplify structure and begin testing * further development * tidy up input var types * update docs * update role mapping docs * resolve small typo * update to map format * reformat example * set id field to read-only for role mapping and logstash pipeline * strings shouldn't be capitalised and tidy ilm config * format test * missed tab] * use format method * change pipeline settings approach * update settings arr initialisation approach * format settings similar to index settings approach * add additional settings + resolve comments * Whitespace * Update internal/elasticsearch/logstash/pipeline.go Co-authored-by: Toby Brain <[email protected]> Co-authored-by: Toby Brain <[email protected]>
1 parent f15c10b commit 3b40d9a

File tree

16 files changed

+770
-35
lines changed

16 files changed

+770
-35
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## [Unreleased]
22

3+
### Added
4+
- New resource `elasticstack_elasticsearch_logstash_pipeline` to manage Logstash pipelines ([Centralized Pipeline Management](https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html)) ([#151](https://github.com/elastic/terraform-provider-elasticstack/pull/151))
5+
36
## [0.4.0] - 2022-10-07
47
### Added
58

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
---
2+
subcategory: "Logstash"
3+
layout: ""
4+
page_title: "Elasticstack: elasticstack_elasticsearch_logstash_pipeline Resource"
5+
description: |-
6+
Creates or updates centrally managed logstash pipelines.
7+
---
8+
9+
# Resource: elasticstack_elasticsearch_logstash_pipeline
10+
11+
Creates or updates centrally managed logstash pipelines. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/logstash-apis.html
12+
13+
## Example Usage
14+
15+
```terraform
16+
provider "elasticstack" {
17+
elasticsearch {}
18+
}
19+
20+
resource "elasticstack_elasticsearch_logstash_pipeline" "example" {
21+
pipeline_id = "test_pipeline"
22+
description = "This is an example pipeline"
23+
24+
pipeline = <<-EOF
25+
input{}
26+
filter{}
27+
output{}
28+
EOF
29+
30+
pipeline_metadata = {
31+
"type" = "logstash_pipeline"
32+
"version" = 1
33+
}
34+
35+
pipeline_batch_delay = 50
36+
pipeline_batch_size = 125
37+
pipeline_ecs_compatibility = "disabled"
38+
pipeline_ordered = "auto"
39+
pipeline_plugin_classloaders = false
40+
pipeline_unsafe_shutdown = false
41+
pipeline_workers = 1
42+
queue_checkpoint_acks = 1024
43+
queue_checkpoint_retry = true
44+
queue_checkpoint_writes = 1024
45+
queue_drain = false
46+
queue_max_bytes_number = 1
47+
queue_max_bytes_units = "gb"
48+
queue_max_events = 0
49+
queue_page_capacity = "64mb"
50+
queue_type = "persisted"
51+
}
52+
53+
output "pipeline" {
54+
value = elasticstack_elasticsearch_logstash_pipeline.example.pipeline_id
55+
}
56+
```
57+
58+
<!-- schema generated by tfplugindocs -->
59+
## Schema
60+
61+
### Required
62+
63+
- **pipeline** (String) Configuration for the pipeline.
64+
- **pipeline_id** (String) Identifier for the pipeline.
65+
66+
### Optional
67+
68+
- **description** (String) Description of the pipeline.
69+
- **elasticsearch_connection** (Block List, Max: 1) Used to establish connection to Elasticsearch server. Overrides environment variables if present. (see [below for nested schema](#nestedblock--elasticsearch_connection))
70+
- **pipeline_batch_delay** (Number) Time in milliseconds to wait for each event before sending an undersized batch to pipeline workers.
71+
- **pipeline_batch_size** (Number) The maximum number of events an individual worker thread collects before executing filters and outputs.
72+
- **pipeline_ecs_compatibility** (String) Sets the pipeline default value for ecs_compatibility, a setting that is available to plugins that implement an ECS compatibility mode for use with the Elastic Common Schema.
73+
- **pipeline_metadata** (Map of String) Optional metadata about the pipeline.
74+
- **pipeline_ordered** (String) Set the pipeline event ordering.
75+
- **pipeline_plugin_classloaders** (Boolean) (Beta) Load Java plugins in independent classloaders to isolate their dependencies.
76+
- **pipeline_unsafe_shutdown** (Boolean) Forces Logstash to exit during shutdown even if there are still inflight events in memory.
77+
- **pipeline_workers** (Number) The number of parallel workers used to run the filter and output stages of the pipeline.
78+
- **queue_checkpoint_acks** (Number) The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled.
79+
- **queue_checkpoint_retry** (Boolean) When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried.
80+
- **queue_checkpoint_writes** (Number) The maximum number of written events before forcing a checkpoint when persistent queues are enabled.
81+
- **queue_drain** (Boolean) When enabled, Logstash waits until the persistent queue is drained before shutting down.
82+
- **queue_max_bytes_number** (Number) The total capacity of the queue when persistent queues are enabled.
83+
- **queue_max_bytes_units** (String) Units for the total capacity of the queue when persistent queues are enabled.
84+
- **queue_max_events** (Number) The maximum number of unread events in the queue when persistent queues are enabled.
85+
- **queue_page_capacity** (String) The size of the page data files used when persistent queues are enabled. The queue data consists of append-only data files separated into pages.
86+
- **queue_type** (String) The internal queueing model for event buffering. Options are memory for in-memory queueing, or persisted for disk-based acknowledged queueing.
87+
- **username** (String) User who last updated the pipeline.
88+
89+
### Read-Only
90+
91+
- **id** (String) Internal identifier of the resource
92+
- **last_modified** (String) Date the pipeline was last updated.
93+
94+
<a id="nestedblock--elasticsearch_connection"></a>
95+
### Nested Schema for `elasticsearch_connection`
96+
97+
Optional:
98+
99+
- **api_key** (String, Sensitive) API Key to use for authentication to Elasticsearch
100+
- **ca_data** (String) PEM-encoded custom Certificate Authority certificate
101+
- **ca_file** (String) Path to a custom Certificate Authority certificate
102+
- **endpoints** (List of String, Sensitive) A list of endpoints the Terraform provider will point to. They must include the http(s) schema and port number.
103+
- **insecure** (Boolean) Disable TLS certificate validation
104+
- **password** (String, Sensitive) A password to use for API authentication to Elasticsearch.
105+
- **username** (String) A username to use for API authentication to Elasticsearch.
106+
107+
## Import
108+
109+
Import is supported using the following syntax:
110+
111+
```shell
112+
terraform import elasticstack_elasticsearch_logstash_pipeline.my_pipeline <cluster_uuid>/<pipeline ID>
113+
```

docs/resources/elasticsearch_security_role_mapping.md

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
---
2-
# generated by https://github.com/hashicorp/terraform-plugin-docs
3-
page_title: "elasticstack_elasticsearch_security_role_mapping Resource - terraform-provider-elasticstack"
4-
subcategory: ""
2+
subcategory: "Security"
3+
layout: ""
4+
page_title: "Elasticstack: elasticstack_elasticsearch_security_role_mapping Resource"
55
description: |-
66
Manage role mappings. See, https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-put-role-mapping.html
77
---
88

9-
# elasticstack_elasticsearch_security_role_mapping (Resource)
9+
# Resource: elasticstack_elasticsearch_security_role_mapping
1010

1111
Manage role mappings. See, https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-put-role-mapping.html
1212

@@ -42,22 +42,27 @@ output "role" {
4242
### Required
4343

4444
- **name** (String) The distinct name that identifies the role mapping, used solely as an identifier.
45-
- **roles** (Set of String) A list of role names that are granted to the users that match the role mapping rules.
46-
- **rules** (String) A list of mustache templates that will be evaluated to determine the role names that should granted to the users that match the role mapping rules. This matches fields of users, rules can be grouped into `all` and `any` top level keys.
45+
- **rules** (String) The rules that determine which users should be matched by the mapping. A rule is a logical condition that is expressed by using a JSON DSL.
4746

4847
### Optional
4948

5049
- **elasticsearch_connection** (Block List, Max: 1) Used to establish connection to Elasticsearch server. Overrides environment variables if present. (see [below for nested schema](#nestedblock--elasticsearch_connection))
5150
- **enabled** (Boolean) Mappings that have `enabled` set to `false` are ignored when role mapping is performed.
52-
- **id** (String) The ID of this resource.
5351
- **metadata** (String) Additional metadata that helps define which roles are assigned to each user. Keys beginning with `_` are reserved for system usage.
52+
- **role_templates** (String) A list of mustache templates that will be evaluated to determine the roles names that should granted to the users that match the role mapping rules.
53+
- **roles** (Set of String) A list of role names that are granted to the users that match the role mapping rules.
54+
55+
### Read-Only
56+
57+
- **id** (String) Internal identifier of the resource
5458

5559
<a id="nestedblock--elasticsearch_connection"></a>
5660
### Nested Schema for `elasticsearch_connection`
5761

5862
Optional:
5963

6064
- **api_key** (String, Sensitive) API Key to use for authentication to Elasticsearch
65+
- **ca_data** (String) PEM-encoded custom Certificate Authority certificate
6166
- **ca_file** (String) Path to a custom Certificate Authority certificate
6267
- **endpoints** (List of String, Sensitive) A list of endpoints the Terraform provider will point to. They must include the http(s) schema and port number.
6368
- **insecure** (Boolean) Disable TLS certificate validation
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
terraform import elasticstack_elasticsearch_logstash_pipeline.my_pipeline <cluster_uuid>/<pipeline ID>
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
provider "elasticstack" {
2+
elasticsearch {}
3+
}
4+
5+
resource "elasticstack_elasticsearch_logstash_pipeline" "example" {
6+
pipeline_id = "test_pipeline"
7+
description = "This is an example pipeline"
8+
9+
pipeline = <<-EOF
10+
input{}
11+
filter{}
12+
output{}
13+
EOF
14+
15+
pipeline_metadata = {
16+
"type" = "logstash_pipeline"
17+
"version" = 1
18+
}
19+
20+
pipeline_batch_delay = 50
21+
pipeline_batch_size = 125
22+
pipeline_ecs_compatibility = "disabled"
23+
pipeline_ordered = "auto"
24+
pipeline_plugin_classloaders = false
25+
pipeline_unsafe_shutdown = false
26+
pipeline_workers = 1
27+
queue_checkpoint_acks = 1024
28+
queue_checkpoint_retry = true
29+
queue_checkpoint_writes = 1024
30+
queue_drain = false
31+
queue_max_bytes_number = 1
32+
queue_max_bytes_units = "gb"
33+
queue_max_events = 0
34+
queue_page_capacity = "64mb"
35+
queue_type = "persisted"
36+
}
37+
38+
output "pipeline" {
39+
value = elasticstack_elasticsearch_logstash_pipeline.example.pipeline_id
40+
}

internal/clients/logstash.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package clients
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"net/http"
9+
10+
"github.com/elastic/terraform-provider-elasticstack/internal/models"
11+
"github.com/elastic/terraform-provider-elasticstack/internal/utils"
12+
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
13+
)
14+
15+
func (a *ApiClient) PutLogstashPipeline(ctx context.Context, logstashPipeline *models.LogstashPipeline) diag.Diagnostics {
16+
var diags diag.Diagnostics
17+
logstashPipelineBytes, err := json.Marshal(logstashPipeline)
18+
if err != nil {
19+
return diag.FromErr(err)
20+
}
21+
res, err := a.es.LogstashPutPipeline(logstashPipeline.PipelineID, bytes.NewReader(logstashPipelineBytes), a.es.LogstashPutPipeline.WithContext(ctx))
22+
if err != nil {
23+
return diag.FromErr(err)
24+
}
25+
defer res.Body.Close()
26+
if diags := utils.CheckError(res, "Unable to create or update logstash pipeline"); diags.HasError() {
27+
return diags
28+
}
29+
30+
return diags
31+
}
32+
33+
func (a *ApiClient) GetLogstashPipeline(ctx context.Context, pipelineID string) (*models.LogstashPipeline, diag.Diagnostics) {
34+
var diags diag.Diagnostics
35+
res, err := a.es.LogstashGetPipeline(pipelineID, a.es.LogstashGetPipeline.WithContext(ctx))
36+
if err != nil {
37+
return nil, diag.FromErr(err)
38+
}
39+
defer res.Body.Close()
40+
if res.StatusCode == http.StatusNotFound {
41+
return nil, nil
42+
}
43+
if diags := utils.CheckError(res, "Unable to find logstash pipeline on cluster."); diags.HasError() {
44+
return nil, diags
45+
}
46+
47+
logstashPipeline := make(map[string]models.LogstashPipeline)
48+
if err := json.NewDecoder(res.Body).Decode(&logstashPipeline); err != nil {
49+
return nil, diag.FromErr(err)
50+
}
51+
52+
if logstashPipeline, ok := logstashPipeline[pipelineID]; ok {
53+
logstashPipeline.PipelineID = pipelineID
54+
return &logstashPipeline, diags
55+
}
56+
57+
diags = append(diags, diag.Diagnostic{
58+
Severity: diag.Error,
59+
Summary: "Unable to find logstash pipeline in the cluster",
60+
Detail: fmt.Sprintf(`Unable to find "%s" logstash pipeline in the cluster`, pipelineID),
61+
})
62+
return nil, diags
63+
}
64+
65+
func (a *ApiClient) DeleteLogstashPipeline(ctx context.Context, pipeline_id string) diag.Diagnostics {
66+
var diags diag.Diagnostics
67+
res, err := a.es.LogstashDeletePipeline(pipeline_id, a.es.LogstashDeletePipeline.WithContext(ctx))
68+
69+
if err != nil && res.IsError() {
70+
return diag.FromErr(err)
71+
}
72+
defer res.Body.Close()
73+
if diags := utils.CheckError(res, "Unable to delete logstash pipeline"); diags.HasError() {
74+
return diags
75+
}
76+
return diags
77+
}

internal/elasticsearch/index/ilm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ func expandAction(a []interface{}, settings ...string) (map[string]interface{},
501501
def := make(map[string]interface{})
502502

503503
// can be zero, so we must skip the empty check
504-
settingsToSkip := map[string]struct{}{"number_of_replicas": struct{}{}, "priority": struct{}{}}
504+
settingsToSkip := map[string]struct{}{"number_of_replicas": {}, "priority": {}}
505505

506506
if action := a[0]; action != nil {
507507
for _, setting := range settings {

internal/elasticsearch/index/index.go

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ If specified, this mapping can include: field names, [field data types](https://
521521
// first populate what we can with Read
522522
diags := resourceIndexRead(ctx, d, m)
523523
if diags.HasError() {
524-
return nil, fmt.Errorf("Unable to import requested index")
524+
return nil, fmt.Errorf("unable to import requested index")
525525
}
526526

527527
client, err := clients.NewApiClient(d, m)
@@ -530,12 +530,12 @@ If specified, this mapping can include: field names, [field data types](https://
530530
}
531531
compId, diags := clients.CompositeIdFromStr(d.Id())
532532
if diags.HasError() {
533-
return nil, fmt.Errorf("Failed to parse provided ID")
533+
return nil, fmt.Errorf("failed to parse provided ID")
534534
}
535535
indexName := compId.ResourceId
536536
index, diags := client.GetElasticsearchIndex(ctx, indexName)
537537
if diags.HasError() {
538-
return nil, fmt.Errorf("Failed to get an ES Index")
538+
return nil, fmt.Errorf("failed to get an ES Index")
539539
}
540540

541541
// check the settings and import those as well
@@ -564,7 +564,7 @@ If specified, this mapping can include: field names, [field data types](https://
564564
}
565565
value = v
566566
}
567-
if err := d.Set(convertSettingsKeyToTFFieldKey(key), value); err != nil {
567+
if err := d.Set(utils.ConvertSettingsKeyToTFFieldKey(key), value); err != nil {
568568
return nil, err
569569
}
570570
}
@@ -672,7 +672,7 @@ func resourceIndexCreate(ctx context.Context, d *schema.ResourceData, meta inter
672672
}
673673

674674
index.Settings = map[string]interface{}{}
675-
if settings := expandIndividuallyDefinedIndexSettings(ctx, d, allSettingsKeys); len(settings) > 0 {
675+
if settings := utils.ExpandIndividuallyDefinedSettings(ctx, d, allSettingsKeys); len(settings) > 0 {
676676
index.Settings = settings
677677
}
678678

@@ -783,7 +783,7 @@ func resourceIndexUpdate(ctx context.Context, d *schema.ResourceData, meta inter
783783
// settings
784784
updatedSettings := make(map[string]interface{})
785785
for key := range dynamicsSettingsKeys {
786-
fieldKey := convertSettingsKeyToTFFieldKey(key)
786+
fieldKey := utils.ConvertSettingsKeyToTFFieldKey(key)
787787
if d.HasChange(fieldKey) {
788788
updatedSettings[key] = d.Get(fieldKey)
789789
}
@@ -918,24 +918,3 @@ func resourceIndexDelete(ctx context.Context, d *schema.ResourceData, meta inter
918918
d.SetId("")
919919
return diags
920920
}
921-
922-
func expandIndividuallyDefinedIndexSettings(ctx context.Context, d *schema.ResourceData, settingsKeys map[string]schema.ValueType) map[string]interface{} {
923-
settings := make(map[string]interface{})
924-
for key := range settingsKeys {
925-
tfFieldKey := convertSettingsKeyToTFFieldKey(key)
926-
if raw, ok := d.GetOk(tfFieldKey); ok {
927-
switch field := raw.(type) {
928-
case *schema.Set:
929-
settings[key] = field.List()
930-
default:
931-
settings[key] = raw
932-
}
933-
tflog.Trace(ctx, fmt.Sprintf("expandIndividuallyDefinedIndexSettings: settingsKey:%+v tfFieldKey:%+v value:%+v, %+v", key, tfFieldKey, raw, settings))
934-
}
935-
}
936-
return settings
937-
}
938-
939-
func convertSettingsKeyToTFFieldKey(settingKey string) string {
940-
return strings.Replace(settingKey, ".", "_", -1)
941-
}

0 commit comments

Comments
 (0)