diff --git a/.changelog/2904.txt b/.changelog/2904.txt new file mode 100644 index 0000000000..8f51cbc5c6 --- /dev/null +++ b/.changelog/2904.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +resource/tencentcloud_emr_cluster: support emr horizontal expansion +``` \ No newline at end of file diff --git a/tencentcloud/services/emr/extension_emr.go b/tencentcloud/services/emr/extension_emr.go index a3c02d8b5a..9c33e03090 100644 --- a/tencentcloud/services/emr/extension_emr.go +++ b/tencentcloud/services/emr/extension_emr.go @@ -20,6 +20,11 @@ const ( EMR_MASTER_WAN_TYPE_NOT_NEED_MASTER_WAN = "NOT_NEED_MASTER_WAN" ) +const ( + F_KEY_FLOW_ID = "FlowId" + F_KEY_TRACE_ID = "TraceId" +) + var EMR_MASTER_WAN_TYPES = []string{EMR_MASTER_WAN_TYPE_NEED_MASTER_WAN, EMR_MASTER_WAN_TYPE_NOT_NEED_MASTER_WAN} func buildResourceSpecSchema() *schema.Schema { diff --git a/tencentcloud/services/emr/resource_tc_emr_cluster.go b/tencentcloud/services/emr/resource_tc_emr_cluster.go index fd62a9d6dd..8d0a010074 100644 --- a/tencentcloud/services/emr/resource_tc_emr_cluster.go +++ b/tencentcloud/services/emr/resource_tc_emr_cluster.go @@ -4,6 +4,8 @@ import ( "context" innerErr "errors" "fmt" + "strconv" + "time" tccommon "github.com/tencentcloudstack/terraform-provider-tencentcloud/tencentcloud/common" "github.com/tencentcloudstack/terraform-provider-tencentcloud/tencentcloud/internal/helper" @@ -103,6 +105,28 @@ func ResourceTencentCloudEmrCluster() *schema.Resource { }, Description: "Resource specification of EMR instance.", }, + "terminate_node_info": { + Type: schema.TypeList, + Optional: true, + Description: "Terminate nodes. Note: it only works when the number of nodes decreases.", + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "cvm_instance_ids": { + Type: schema.TypeList, + Optional: true, + Elem: &schema.Schema{ + Type: schema.TypeString, + }, + Description: "Destroy resource list.", + }, + "node_flag": { + Type: schema.TypeString, + Optional: true, + Description: "Value range of destruction node type: `MASTER`, `TASK`, `CORE`, `ROUTER`.", + }, + }, + }, + }, "support_ha": { Type: schema.TypeInt, Required: true, @@ -220,7 +244,7 @@ func resourceTencentCloudEmrClusterUpdate(d *schema.ResourceData, meta interface logId := tccommon.GetLogId(tccommon.ContextNil) ctx := context.WithValue(context.TODO(), tccommon.LogIdKey, logId) - immutableFields := []string{"auto_renew", "placement", "placement_info", "display_strategy", "login_settings", "resource_spec.0.master_count", "resource_spec.0.task_count", "resource_spec.0.core_count"} + immutableFields := []string{"auto_renew", "placement", "placement_info", "display_strategy", "login_settings", "extend_fs_field"} for _, f := range immutableFields { if d.HasChange(f) { return fmt.Errorf("cannot update argument `%s`", f) @@ -245,63 +269,92 @@ func resourceTencentCloudEmrClusterUpdate(d *schema.ResourceData, meta interface } } - hasChange := false - request := emr.NewScaleOutInstanceRequest() - request.TimeUnit = common.StringPtr(timeUnit.(string)) - request.TimeSpan = common.Uint64Ptr((uint64)(timeSpan.(int))) - request.PayMode = common.Uint64Ptr((uint64)(payMode.(int))) - request.InstanceId = common.StringPtr(instanceId) - - tmpResourceSpec := d.Get("resource_spec").([]interface{}) - resourceSpec := tmpResourceSpec[0].(map[string]interface{}) - if d.HasChange("resource_spec.0.master_count") { - request.MasterCount = common.Uint64Ptr((uint64)(resourceSpec["master_count"].(int))) - hasChange = true + request := emr.NewScaleOutInstanceRequest() + request.TimeUnit = common.StringPtr(timeUnit.(string)) + request.TimeSpan = common.Uint64Ptr((uint64)(timeSpan.(int))) + request.PayMode = common.Uint64Ptr((uint64)(payMode.(int))) + request.InstanceId = common.StringPtr(instanceId) + + o, n := d.GetChange("resource_spec.0.master_count") + if o.(int) < n.(int) { + request.MasterCount = common.Uint64Ptr((uint64)(n.(int) - o.(int))) + traceId, err := emrService.ScaleOutInstance(ctx, request) + if err != nil { + return err + } + time.Sleep(5 * time.Second) + conf := tccommon.BuildStateChangeConf([]string{}, []string{"2"}, 10*tccommon.ReadRetryTimeout, time.Second, emrService.FlowStatusRefreshFunc(instanceId, traceId, F_KEY_TRACE_ID, []string{})) + if _, e := conf.WaitForState(); e != nil { + return e + } + } } if d.HasChange("resource_spec.0.task_count") { - request.TaskCount = common.Uint64Ptr((uint64)(resourceSpec["task_count"].(int))) - hasChange = true + request := emr.NewScaleOutInstanceRequest() + request.TimeUnit = common.StringPtr(timeUnit.(string)) + request.TimeSpan = common.Uint64Ptr((uint64)(timeSpan.(int))) + request.PayMode = common.Uint64Ptr((uint64)(payMode.(int))) + request.InstanceId = common.StringPtr(instanceId) + + o, n := d.GetChange("resource_spec.0.task_count") + if o.(int) < n.(int) { + request.TaskCount = common.Uint64Ptr((uint64)(n.(int) - o.(int))) + traceId, err := emrService.ScaleOutInstance(ctx, request) + if err != nil { + return err + } + time.Sleep(5 * time.Second) + conf := tccommon.BuildStateChangeConf([]string{}, []string{"2"}, 10*tccommon.ReadRetryTimeout, time.Second, emrService.FlowStatusRefreshFunc(instanceId, traceId, F_KEY_TRACE_ID, []string{})) + if _, e := conf.WaitForState(); e != nil { + return e + } + } } if d.HasChange("resource_spec.0.core_count") { - request.CoreCount = common.Uint64Ptr((uint64)(resourceSpec["core_count"].(int))) - hasChange = true - } - if d.HasChange("extend_fs_field") { - return innerErr.New("extend_fs_field not support update.") - } - if !hasChange { - return nil - } - _, err := emrService.UpdateInstance(ctx, request) - if err != nil { - return err - } - err = resource.Retry(10*tccommon.ReadRetryTimeout, func() *resource.RetryError { - clusters, err := emrService.DescribeInstancesById(ctx, instanceId, DisplayStrategyIsclusterList) - - if e, ok := err.(*errors.TencentCloudSDKError); ok { - if e.GetCode() == "InternalError.ClusterNotFound" { - return nil + request := emr.NewScaleOutInstanceRequest() + request.TimeUnit = common.StringPtr(timeUnit.(string)) + request.TimeSpan = common.Uint64Ptr((uint64)(timeSpan.(int))) + request.PayMode = common.Uint64Ptr((uint64)(payMode.(int))) + request.InstanceId = common.StringPtr(instanceId) + + o, n := d.GetChange("resource_spec.0.core_count") + if o.(int) < n.(int) { + request.CoreCount = common.Uint64Ptr((uint64)(n.(int) - o.(int))) + traceId, err := emrService.ScaleOutInstance(ctx, request) + if err != nil { + return err } - } - - if len(clusters) > 0 { - status := *(clusters[0].Status) - if status != EmrInternetStatusCreated { - return resource.RetryableError( - fmt.Errorf("%v create cluster endpoint status still is %v", instanceId, status)) + time.Sleep(5 * time.Second) + conf := tccommon.BuildStateChangeConf([]string{}, []string{"2"}, 10*tccommon.ReadRetryTimeout, time.Second, emrService.FlowStatusRefreshFunc(instanceId, traceId, F_KEY_TRACE_ID, []string{})) + if _, e := conf.WaitForState(); e != nil { + return e } } + } - if err != nil { - return resource.RetryableError(err) + if d.HasChange("resource_spec.0.master_count") || d.HasChange("resource_spec.0.task_count") || d.HasChange("resource_spec.0.core_count") { + if v, ok := d.GetOk("terminate_node_info"); ok { + terminateNodeInfos := v.([]interface{}) + for _, terminateNodeInfo := range terminateNodeInfos { + terminateNodeInfoMap := terminateNodeInfo.(map[string]interface{}) + instanceIds := make([]string, 0) + for _, instanceId := range terminateNodeInfoMap["cvm_instance_ids"].([]interface{}) { + instanceIds = append(instanceIds, instanceId.(string)) + } + flowId, err := emrService.TerminateClusterNodes(ctx, instanceIds, instanceId, terminateNodeInfoMap["node_flag"].(string)) + if err != nil { + return err + } + time.Sleep(5 * time.Second) + conf := tccommon.BuildStateChangeConf([]string{}, []string{"2"}, 10*tccommon.ReadRetryTimeout, time.Second, emrService.FlowStatusRefreshFunc(instanceId, strconv.FormatInt(flowId, 10), F_KEY_FLOW_ID, []string{})) + if _, e := conf.WaitForState(); e != nil { + return e + } + } } - return nil - }) - if err != nil { - return err } + return resourceTencentCloudEmrClusterRead(d, meta) } diff --git a/tencentcloud/services/emr/resource_tc_emr_cluster_test.go b/tencentcloud/services/emr/resource_tc_emr_cluster_test.go index 13dd9df22e..30b1922dc4 100644 --- a/tencentcloud/services/emr/resource_tc_emr_cluster_test.go +++ b/tencentcloud/services/emr/resource_tc_emr_cluster_test.go @@ -156,12 +156,20 @@ func TestAccTencentCloudEmrClusterResource_Basic(t *testing.T) { resource.TestCheckResourceAttrSet(testEmrClusterResourceKey, "instance_id"), resource.TestCheckResourceAttr(testEmrClusterResourceKey, "sg_id", tcacctest.DefaultEMRSgId), resource.TestCheckResourceAttr(testEmrClusterResourceKey, "tags.emr-key", "emr-value"), + resource.TestCheckResourceAttr(testEmrClusterResourceKey, "resource_spec.0.core_count", "2"), + ), + }, + { + Config: testEmrBasic_AddCoreNode, + Check: resource.ComposeTestCheckFunc( + testAccCheckEmrExists(testEmrClusterResourceKey), + resource.TestCheckResourceAttr(testEmrClusterResourceKey, "resource_spec.0.core_count", "3"), ), }, { ResourceName: testEmrClusterResourceKey, ImportState: true, - ImportStateVerifyIgnore: []string{"display_strategy", "placement", "time_span", "time_unit", "login_settings"}, + ImportStateVerifyIgnore: []string{"display_strategy", "placement", "time_span", "time_unit", "login_settings", "terminate_node_info"}, }, }, }) @@ -327,6 +335,75 @@ resource "tencentcloud_emr_cluster" "emrrrr" { } ` +const testEmrBasic_AddCoreNode = tcacctest.DefaultEMRVariable + ` +data "tencentcloud_instance_types" "cvm4c8m" { + exclude_sold_out=true + cpu_core_count=4 + memory_size=8 + filter { + name = "instance-charge-type" + values = ["POSTPAID_BY_HOUR"] + } + filter { + name = "zone" + values = ["ap-guangzhou-3"] + } +} + +resource "tencentcloud_emr_cluster" "emrrrr" { + product_id=38 + vpc_settings={ + vpc_id=var.vpc_id + subnet_id=var.subnet_id + } + softwares = [ + "hdfs-2.8.5", + "knox-1.6.1", + "openldap-2.4.44", + "yarn-2.8.5", + "zookeeper-3.6.3", + ] + support_ha=0 + instance_name="emr-test-demo" + resource_spec { + master_resource_spec { + mem_size=8192 + cpu=4 + disk_size=100 + disk_type="CLOUD_PREMIUM" + spec="CVM.${data.tencentcloud_instance_types.cvm4c8m.instance_types.0.family}" + storage_type=5 + root_size=50 + } + core_resource_spec { + mem_size=8192 + cpu=4 + disk_size=100 + disk_type="CLOUD_PREMIUM" + spec="CVM.${data.tencentcloud_instance_types.cvm4c8m.instance_types.0.family}" + storage_type=5 + root_size=50 + } + master_count=1 + core_count=3 + } + login_settings={ + password="Tencent@cloud123" + } + time_span=3600 + time_unit="s" + pay_mode=0 + placement_info { + zone="ap-guangzhou-3" + project_id=0 + } + sg_id=var.sg_id + tags = { + emr-key = "emr-value" + } + } +` + const testEmrBasicPrepay = tcacctest.DefaultEMRVariable + ` data "tencentcloud_instance_types" "cvm4c8m" { exclude_sold_out=true diff --git a/tencentcloud/services/emr/service_tencentcloud_emr.go b/tencentcloud/services/emr/service_tencentcloud_emr.go index d4e3d627ee..6aa572ebb4 100644 --- a/tencentcloud/services/emr/service_tencentcloud_emr.go +++ b/tencentcloud/services/emr/service_tencentcloud_emr.go @@ -638,3 +638,96 @@ func (me *EMRService) DescribeLiteHbaseInstancesByFilter(ctx context.Context, pa return } + +func (me *EMRService) TerminateClusterNodes(ctx context.Context, instanceIds []string, instanceId, nodeFlag string) (flowId int64, errRet error) { + logId := tccommon.GetLogId(ctx) + + request := emr.NewTerminateClusterNodesRequest() + request.CvmInstanceIds = helper.Strings(instanceIds) + request.InstanceId = helper.String(instanceId) + request.NodeFlag = helper.String(nodeFlag) + + defer func() { + if errRet != nil { + log.Printf("[CRITAL]%s api[%s] fail, request body [%s], reason[%s]\n", logId, request.GetAction(), request.ToJsonString(), errRet.Error()) + } + }() + + var ( + response *emr.TerminateClusterNodesResponse + innerErr error + ) + err := resource.Retry(tccommon.ReadRetryTimeout, func() *resource.RetryError { + ratelimit.Check(request.GetAction()) + response, innerErr = me.client.UseEmrClient().TerminateClusterNodes(request) + if innerErr != nil { + return tccommon.RetryError(innerErr) + } + return nil + }) + if err != nil { + errRet = err + return + } + + if response.Response != nil && response.Response.FlowId != nil { + flowId = *response.Response.FlowId + return + } + return + +} + +func (me *EMRService) FlowStatusRefreshFunc(instanceId, flowId, flowType string, failStates []string) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + + request := emr.NewDescribeClusterFlowStatusDetailRequest() + request.InstanceId = helper.String(instanceId) + request.FlowParam = &emr.FlowParam{ + FKey: helper.String(flowType), + FValue: helper.String(flowId), + } + + var ( + response *emr.DescribeClusterFlowStatusDetailResponse + innerErr error + ) + err := resource.Retry(tccommon.ReadRetryTimeout, func() *resource.RetryError { + ratelimit.Check(request.GetAction()) + response, innerErr = me.client.UseEmrClient().DescribeClusterFlowStatusDetail(request) + if innerErr != nil { + return tccommon.RetryError(innerErr) + } + return nil + }) + if err != nil { + return nil, "", err + } + + if response.Response == nil || response.Response.FlowTotalStatus == nil { + return nil, "", fmt.Errorf("Not found flow.") + } + return response.Response.FlowTotalStatus, helper.Int64ToStr(*response.Response.FlowTotalStatus), nil + } +} + +func (me *EMRService) ScaleOutInstance(ctx context.Context, request *emr.ScaleOutInstanceRequest) (traceId string, err error) { + logId := tccommon.GetLogId(ctx) + err = resource.Retry(tccommon.ReadRetryTimeout, func() *resource.RetryError { + ratelimit.Check(request.GetAction()) + ratelimit.Check(request.GetAction()) + response, e := me.client.UseEmrClient().ScaleOutInstance(request) + if e != nil { + log.Printf("[CRITAL]%s api[%s] fail, request body [%s], reason[%s]\n", + logId, request.GetAction(), request.ToJsonString(), e.Error()) + return tccommon.RetryError(e) + } + traceId = *response.Response.TraceId + return nil + }) + + if err != nil { + return + } + return +} diff --git a/website/docs/r/emr_cluster.html.markdown b/website/docs/r/emr_cluster.html.markdown index 61a55c11a7..6f52d8a523 100644 --- a/website/docs/r/emr_cluster.html.markdown +++ b/website/docs/r/emr_cluster.html.markdown @@ -137,6 +137,7 @@ The following arguments are supported: * `resource_spec` - (Optional, List) Resource specification of EMR instance. * `sg_id` - (Optional, String, ForceNew) The ID of the security group to which the instance belongs, in the form of sg-xxxxxxxx. * `tags` - (Optional, Map) Tag description list. +* `terminate_node_info` - (Optional, List) Terminate nodes. Note: it only works when the number of nodes decreases. * `time_span` - (Optional, Int) The length of time the instance was purchased. Use with TimeUnit.When TimeUnit is s, the parameter can only be filled in at 3600, representing a metered instance. When TimeUnit is m, the number filled in by this parameter indicates the length of purchase of the monthly instance of the package year, such as 1 for one month of purchase. * `time_unit` - (Optional, String) The unit of time in which the instance was purchased. When PayMode is 0, TimeUnit can only take values of s(second). When PayMode is 1, TimeUnit can only take the value m(month). @@ -157,6 +158,11 @@ The `resource_spec` object supports the following: * `task_count` - (Optional, Int) The number of core node. * `task_resource_spec` - (Optional, List, ForceNew) +The `terminate_node_info` object supports the following: + +* `cvm_instance_ids` - (Optional, List) Destroy resource list. +* `node_flag` - (Optional, String) Value range of destruction node type: `MASTER`, `TASK`, `CORE`, `ROUTER`. + ## Attributes Reference In addition to all arguments above, the following attributes are exported: