Skip to content

fix(emr): [120069959] support emr horizontal expansion #2904

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/2904.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/tencentcloud_emr_cluster: support horizontal scaling of EMR cluster nodes (scale-out and node termination)
```
5 changes: 5 additions & 0 deletions tencentcloud/services/emr/extension_emr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ const (
EMR_MASTER_WAN_TYPE_NOT_NEED_MASTER_WAN = "NOT_NEED_MASTER_WAN"
)

// F_KEY_FLOW_ID is the FlowParam key name used when a cluster flow is
// identified by its flow ID.
const F_KEY_FLOW_ID = "FlowId"

// F_KEY_TRACE_ID is the FlowParam key name used when a cluster flow is
// identified by its trace ID.
const F_KEY_TRACE_ID = "TraceId"

var EMR_MASTER_WAN_TYPES = []string{EMR_MASTER_WAN_TYPE_NEED_MASTER_WAN, EMR_MASTER_WAN_TYPE_NOT_NEED_MASTER_WAN}

func buildResourceSpecSchema() *schema.Schema {
Expand Down
147 changes: 100 additions & 47 deletions tencentcloud/services/emr/resource_tc_emr_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
innerErr "errors"
"fmt"
"strconv"
"time"

tccommon "github.com/tencentcloudstack/terraform-provider-tencentcloud/tencentcloud/common"
"github.com/tencentcloudstack/terraform-provider-tencentcloud/tencentcloud/internal/helper"
Expand Down Expand Up @@ -103,6 +105,28 @@ func ResourceTencentCloudEmrCluster() *schema.Resource {
},
Description: "Resource specification of EMR instance.",
},
"terminate_node_info": {
Type: schema.TypeList,
Optional: true,
Description: "Terminate nodes. Note: it only works when the number of nodes decreases.",
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"cvm_instance_ids": {
Type: schema.TypeList,
Optional: true,
Elem: &schema.Schema{
Type: schema.TypeString,
},
Description: "Destroy resource list.",
},
"node_flag": {
Type: schema.TypeString,
Optional: true,
Description: "Value range of destruction node type: `MASTER`, `TASK`, `CORE`, `ROUTER`.",
},
},
},
},
"support_ha": {
Type: schema.TypeInt,
Required: true,
Expand Down Expand Up @@ -220,7 +244,7 @@ func resourceTencentCloudEmrClusterUpdate(d *schema.ResourceData, meta interface
logId := tccommon.GetLogId(tccommon.ContextNil)
ctx := context.WithValue(context.TODO(), tccommon.LogIdKey, logId)

immutableFields := []string{"auto_renew", "placement", "placement_info", "display_strategy", "login_settings", "resource_spec.0.master_count", "resource_spec.0.task_count", "resource_spec.0.core_count"}
immutableFields := []string{"auto_renew", "placement", "placement_info", "display_strategy", "login_settings", "extend_fs_field"}
for _, f := range immutableFields {
if d.HasChange(f) {
return fmt.Errorf("cannot update argument `%s`", f)
Expand All @@ -245,63 +269,92 @@ func resourceTencentCloudEmrClusterUpdate(d *schema.ResourceData, meta interface
}
}

hasChange := false
request := emr.NewScaleOutInstanceRequest()
request.TimeUnit = common.StringPtr(timeUnit.(string))
request.TimeSpan = common.Uint64Ptr((uint64)(timeSpan.(int)))
request.PayMode = common.Uint64Ptr((uint64)(payMode.(int)))
request.InstanceId = common.StringPtr(instanceId)

tmpResourceSpec := d.Get("resource_spec").([]interface{})
resourceSpec := tmpResourceSpec[0].(map[string]interface{})

if d.HasChange("resource_spec.0.master_count") {
request.MasterCount = common.Uint64Ptr((uint64)(resourceSpec["master_count"].(int)))
hasChange = true
request := emr.NewScaleOutInstanceRequest()
request.TimeUnit = common.StringPtr(timeUnit.(string))
request.TimeSpan = common.Uint64Ptr((uint64)(timeSpan.(int)))
request.PayMode = common.Uint64Ptr((uint64)(payMode.(int)))
request.InstanceId = common.StringPtr(instanceId)

o, n := d.GetChange("resource_spec.0.master_count")
if o.(int) < n.(int) {
request.MasterCount = common.Uint64Ptr((uint64)(n.(int) - o.(int)))
traceId, err := emrService.ScaleOutInstance(ctx, request)
if err != nil {
return err
}
time.Sleep(5 * time.Second)
conf := tccommon.BuildStateChangeConf([]string{}, []string{"2"}, 10*tccommon.ReadRetryTimeout, time.Second, emrService.FlowStatusRefreshFunc(instanceId, traceId, F_KEY_TRACE_ID, []string{}))
if _, e := conf.WaitForState(); e != nil {
return e
}
}
}
if d.HasChange("resource_spec.0.task_count") {
request.TaskCount = common.Uint64Ptr((uint64)(resourceSpec["task_count"].(int)))
hasChange = true
request := emr.NewScaleOutInstanceRequest()
request.TimeUnit = common.StringPtr(timeUnit.(string))
request.TimeSpan = common.Uint64Ptr((uint64)(timeSpan.(int)))
request.PayMode = common.Uint64Ptr((uint64)(payMode.(int)))
request.InstanceId = common.StringPtr(instanceId)

o, n := d.GetChange("resource_spec.0.task_count")
if o.(int) < n.(int) {
request.TaskCount = common.Uint64Ptr((uint64)(n.(int) - o.(int)))
traceId, err := emrService.ScaleOutInstance(ctx, request)
if err != nil {
return err
}
time.Sleep(5 * time.Second)
conf := tccommon.BuildStateChangeConf([]string{}, []string{"2"}, 10*tccommon.ReadRetryTimeout, time.Second, emrService.FlowStatusRefreshFunc(instanceId, traceId, F_KEY_TRACE_ID, []string{}))
if _, e := conf.WaitForState(); e != nil {
return e
}
}
}
if d.HasChange("resource_spec.0.core_count") {
request.CoreCount = common.Uint64Ptr((uint64)(resourceSpec["core_count"].(int)))
hasChange = true
}
if d.HasChange("extend_fs_field") {
return innerErr.New("extend_fs_field not support update.")
}
if !hasChange {
return nil
}
_, err := emrService.UpdateInstance(ctx, request)
if err != nil {
return err
}
err = resource.Retry(10*tccommon.ReadRetryTimeout, func() *resource.RetryError {
clusters, err := emrService.DescribeInstancesById(ctx, instanceId, DisplayStrategyIsclusterList)

if e, ok := err.(*errors.TencentCloudSDKError); ok {
if e.GetCode() == "InternalError.ClusterNotFound" {
return nil
request := emr.NewScaleOutInstanceRequest()
request.TimeUnit = common.StringPtr(timeUnit.(string))
request.TimeSpan = common.Uint64Ptr((uint64)(timeSpan.(int)))
request.PayMode = common.Uint64Ptr((uint64)(payMode.(int)))
request.InstanceId = common.StringPtr(instanceId)

o, n := d.GetChange("resource_spec.0.core_count")
if o.(int) < n.(int) {
request.CoreCount = common.Uint64Ptr((uint64)(n.(int) - o.(int)))
traceId, err := emrService.ScaleOutInstance(ctx, request)
if err != nil {
return err
}
}

if len(clusters) > 0 {
status := *(clusters[0].Status)
if status != EmrInternetStatusCreated {
return resource.RetryableError(
fmt.Errorf("%v create cluster endpoint status still is %v", instanceId, status))
time.Sleep(5 * time.Second)
conf := tccommon.BuildStateChangeConf([]string{}, []string{"2"}, 10*tccommon.ReadRetryTimeout, time.Second, emrService.FlowStatusRefreshFunc(instanceId, traceId, F_KEY_TRACE_ID, []string{}))
if _, e := conf.WaitForState(); e != nil {
return e
}
}
}

if err != nil {
return resource.RetryableError(err)
if d.HasChange("resource_spec.0.master_count") || d.HasChange("resource_spec.0.task_count") || d.HasChange("resource_spec.0.core_count") {
if v, ok := d.GetOk("terminate_node_info"); ok {
terminateNodeInfos := v.([]interface{})
for _, terminateNodeInfo := range terminateNodeInfos {
terminateNodeInfoMap := terminateNodeInfo.(map[string]interface{})
instanceIds := make([]string, 0)
for _, instanceId := range terminateNodeInfoMap["cvm_instance_ids"].([]interface{}) {
instanceIds = append(instanceIds, instanceId.(string))
}
flowId, err := emrService.TerminateClusterNodes(ctx, instanceIds, instanceId, terminateNodeInfoMap["node_flag"].(string))
if err != nil {
return err
}
time.Sleep(5 * time.Second)
conf := tccommon.BuildStateChangeConf([]string{}, []string{"2"}, 10*tccommon.ReadRetryTimeout, time.Second, emrService.FlowStatusRefreshFunc(instanceId, strconv.FormatInt(flowId, 10), F_KEY_FLOW_ID, []string{}))
if _, e := conf.WaitForState(); e != nil {
return e
}
}
}
return nil
})
if err != nil {
return err
}

return resourceTencentCloudEmrClusterRead(d, meta)
}

Expand Down
79 changes: 78 additions & 1 deletion tencentcloud/services/emr/resource_tc_emr_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,20 @@ func TestAccTencentCloudEmrClusterResource_Basic(t *testing.T) {
resource.TestCheckResourceAttrSet(testEmrClusterResourceKey, "instance_id"),
resource.TestCheckResourceAttr(testEmrClusterResourceKey, "sg_id", tcacctest.DefaultEMRSgId),
resource.TestCheckResourceAttr(testEmrClusterResourceKey, "tags.emr-key", "emr-value"),
resource.TestCheckResourceAttr(testEmrClusterResourceKey, "resource_spec.0.core_count", "2"),
),
},
{
Config: testEmrBasic_AddCoreNode,
Check: resource.ComposeTestCheckFunc(
testAccCheckEmrExists(testEmrClusterResourceKey),
resource.TestCheckResourceAttr(testEmrClusterResourceKey, "resource_spec.0.core_count", "3"),
),
},
{
ResourceName: testEmrClusterResourceKey,
ImportState: true,
ImportStateVerifyIgnore: []string{"display_strategy", "placement", "time_span", "time_unit", "login_settings"},
ImportStateVerifyIgnore: []string{"display_strategy", "placement", "time_span", "time_unit", "login_settings", "terminate_node_info"},
},
},
})
Expand Down Expand Up @@ -327,6 +335,75 @@ resource "tencentcloud_emr_cluster" "emrrrr" {
}
`

// testEmrBasic_AddCoreNode is the acceptance-test configuration for the
// "emrrrr" EMR cluster with core_count set to 3 and master_count set to 1.
// It is applied as a follow-up step so the test can assert that
// resource_spec.0.core_count becomes "3".
const testEmrBasic_AddCoreNode = tcacctest.DefaultEMRVariable + `
data "tencentcloud_instance_types" "cvm4c8m" {
exclude_sold_out=true
cpu_core_count=4
memory_size=8
filter {
name = "instance-charge-type"
values = ["POSTPAID_BY_HOUR"]
}
filter {
name = "zone"
values = ["ap-guangzhou-3"]
}
}

resource "tencentcloud_emr_cluster" "emrrrr" {
product_id=38
vpc_settings={
vpc_id=var.vpc_id
subnet_id=var.subnet_id
}
softwares = [
"hdfs-2.8.5",
"knox-1.6.1",
"openldap-2.4.44",
"yarn-2.8.5",
"zookeeper-3.6.3",
]
support_ha=0
instance_name="emr-test-demo"
resource_spec {
master_resource_spec {
mem_size=8192
cpu=4
disk_size=100
disk_type="CLOUD_PREMIUM"
spec="CVM.${data.tencentcloud_instance_types.cvm4c8m.instance_types.0.family}"
storage_type=5
root_size=50
}
core_resource_spec {
mem_size=8192
cpu=4
disk_size=100
disk_type="CLOUD_PREMIUM"
spec="CVM.${data.tencentcloud_instance_types.cvm4c8m.instance_types.0.family}"
storage_type=5
root_size=50
}
master_count=1
core_count=3
}
login_settings={
password="Tencent@cloud123"
}
time_span=3600
time_unit="s"
pay_mode=0
placement_info {
zone="ap-guangzhou-3"
project_id=0
}
sg_id=var.sg_id
tags = {
emr-key = "emr-value"
}
}
`

const testEmrBasicPrepay = tcacctest.DefaultEMRVariable + `
data "tencentcloud_instance_types" "cvm4c8m" {
exclude_sold_out=true
Expand Down
93 changes: 93 additions & 0 deletions tencentcloud/services/emr/service_tencentcloud_emr.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,3 +638,96 @@ func (me *EMRService) DescribeLiteHbaseInstancesByFilter(ctx context.Context, pa

return
}

// TerminateClusterNodes sends a TerminateClusterNodes request for the given
// EMR cluster and returns the flow ID reported by the API.
//
// Parameters:
//   - instanceIds: CVM instance IDs sent as CvmInstanceIds.
//   - instanceId:  EMR cluster ID sent as InstanceId.
//   - nodeFlag:    node type sent as NodeFlag.
//
// The call runs inside resource.Retry bounded by tccommon.ReadRetryTimeout;
// API errors are passed through tccommon.RetryError. An error is also
// returned when the API answers without a FlowId, because callers poll the
// flow by that ID and a zero value would point at a non-existent flow.
// Every failure is logged once by the deferred handler.
func (me *EMRService) TerminateClusterNodes(ctx context.Context, instanceIds []string, instanceId, nodeFlag string) (flowId int64, errRet error) {
	logId := tccommon.GetLogId(ctx)

	request := emr.NewTerminateClusterNodesRequest()
	request.CvmInstanceIds = helper.Strings(instanceIds)
	request.InstanceId = helper.String(instanceId)
	request.NodeFlag = helper.String(nodeFlag)

	defer func() {
		if errRet != nil {
			log.Printf("[CRITAL]%s api[%s] fail, request body [%s], reason[%s]\n", logId, request.GetAction(), request.ToJsonString(), errRet.Error())
		}
	}()

	var response *emr.TerminateClusterNodesResponse
	errRet = resource.Retry(tccommon.ReadRetryTimeout, func() *resource.RetryError {
		ratelimit.Check(request.GetAction())
		result, e := me.client.UseEmrClient().TerminateClusterNodes(request)
		if e != nil {
			return tccommon.RetryError(e)
		}
		response = result
		return nil
	})
	if errRet != nil {
		return 0, errRet
	}

	// Without a FlowId there is nothing the caller can wait on; fail loudly
	// instead of handing back the zero value.
	if response == nil || response.Response == nil || response.Response.FlowId == nil {
		errRet = fmt.Errorf("api[%s] returned no FlowId", request.GetAction())
		return 0, errRet
	}

	return *response.Response.FlowId, nil
}

// FlowStatusRefreshFunc builds a resource.StateRefreshFunc that reports the
// total status of an EMR cluster flow.
//
// Parameters:
//   - instanceId: EMR cluster ID sent as InstanceId.
//   - flowId:     value identifying the flow, sent as FlowParam.FValue.
//   - flowType:   key naming the kind of identifier, sent as FlowParam.FKey
//     (e.g. F_KEY_FLOW_ID or F_KEY_TRACE_ID).
//   - failStates: status strings that should abort the wait immediately; an
//     empty or nil slice disables the check.
//
// Each invocation calls DescribeClusterFlowStatusDetail inside
// resource.Retry bounded by tccommon.ReadRetryTimeout. The returned state is
// FlowTotalStatus rendered as a decimal string. An error is returned when
// the request fails, when the response carries no FlowTotalStatus, or when
// the status is one of failStates.
func (me *EMRService) FlowStatusRefreshFunc(instanceId, flowId, flowType string, failStates []string) resource.StateRefreshFunc {
	return func() (interface{}, string, error) {
		request := emr.NewDescribeClusterFlowStatusDetailRequest()
		request.InstanceId = helper.String(instanceId)
		request.FlowParam = &emr.FlowParam{
			FKey:   helper.String(flowType),
			FValue: helper.String(flowId),
		}

		var response *emr.DescribeClusterFlowStatusDetailResponse
		err := resource.Retry(tccommon.ReadRetryTimeout, func() *resource.RetryError {
			ratelimit.Check(request.GetAction())
			result, e := me.client.UseEmrClient().DescribeClusterFlowStatusDetail(request)
			if e != nil {
				return tccommon.RetryError(e)
			}
			response = result
			return nil
		})
		if err != nil {
			return nil, "", err
		}

		if response == nil || response.Response == nil || response.Response.FlowTotalStatus == nil {
			return nil, "", fmt.Errorf("Not found flow.")
		}

		status := helper.Int64ToStr(*response.Response.FlowTotalStatus)
		// Stop waiting as soon as the flow reaches a state the caller has
		// declared terminal-and-failed, rather than polling until timeout.
		for _, failed := range failStates {
			if status == failed {
				return response.Response.FlowTotalStatus, status, fmt.Errorf("flow %s[%s] of instance %s entered failed status %s", flowType, flowId, instanceId, status)
			}
		}
		return response.Response.FlowTotalStatus, status, nil
	}
}

// ScaleOutInstance submits a prepared ScaleOutInstance request and returns
// the trace ID reported by the API.
//
// The call runs inside resource.Retry bounded by tccommon.ReadRetryTimeout;
// each failed attempt is logged and passed through tccommon.RetryError.
// ratelimit.Check is invoked once per attempt. An error is returned when the
// request ultimately fails or when the response carries no TraceId, since
// callers use the trace ID to poll the scale-out flow.
func (me *EMRService) ScaleOutInstance(ctx context.Context, request *emr.ScaleOutInstanceRequest) (traceId string, err error) {
	logId := tccommon.GetLogId(ctx)

	var response *emr.ScaleOutInstanceResponse
	err = resource.Retry(tccommon.ReadRetryTimeout, func() *resource.RetryError {
		ratelimit.Check(request.GetAction())
		result, e := me.client.UseEmrClient().ScaleOutInstance(request)
		if e != nil {
			log.Printf("[CRITAL]%s api[%s] fail, request body [%s], reason[%s]\n",
				logId, request.GetAction(), request.ToJsonString(), e.Error())
			return tccommon.RetryError(e)
		}
		response = result
		return nil
	})
	if err != nil {
		return "", err
	}

	// Guard the pointer chain: dereferencing a missing TraceId would panic
	// inside the provider process.
	if response == nil || response.Response == nil || response.Response.TraceId == nil {
		return "", fmt.Errorf("api[%s] returned no TraceId", request.GetAction())
	}

	return *response.Response.TraceId, nil
}
Loading
Loading