Skip to content

Commit 61ca973

Browse files
author
mikatong
committed
support emr horizontal expansion
1 parent a9a3ce5 commit 61ca973

File tree

5 files changed

+282
-48
lines changed

5 files changed

+282
-48
lines changed

tencentcloud/services/emr/extension_emr.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ const (
2020
EMR_MASTER_WAN_TYPE_NOT_NEED_MASTER_WAN = "NOT_NEED_MASTER_WAN"
2121
)
2222

23+
const (
24+
F_KEY_FLOW_ID = "FlowId"
25+
F_KEY_TRACE_ID = "TraceId"
26+
)
27+
2328
var EMR_MASTER_WAN_TYPES = []string{EMR_MASTER_WAN_TYPE_NEED_MASTER_WAN, EMR_MASTER_WAN_TYPE_NOT_NEED_MASTER_WAN}
2429

2530
func buildResourceSpecSchema() *schema.Schema {

tencentcloud/services/emr/resource_tc_emr_cluster.go

Lines changed: 100 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
innerErr "errors"
66
"fmt"
7+
"strconv"
8+
"time"
79

810
tccommon "github.com/tencentcloudstack/terraform-provider-tencentcloud/tencentcloud/common"
911
"github.com/tencentcloudstack/terraform-provider-tencentcloud/tencentcloud/internal/helper"
@@ -103,6 +105,28 @@ func ResourceTencentCloudEmrCluster() *schema.Resource {
103105
},
104106
Description: "Resource specification of EMR instance.",
105107
},
108+
"terminate_node_info": {
109+
Type: schema.TypeList,
110+
Optional: true,
111+
Description: "Terminate nodes. Note: it only works when the number of nodes decreases.",
112+
Elem: &schema.Resource{
113+
Schema: map[string]*schema.Schema{
114+
"cvm_instance_ids": {
115+
Type: schema.TypeList,
116+
Optional: true,
117+
Elem: &schema.Schema{
118+
Type: schema.TypeString,
119+
},
120+
Description: "Destroy resource list.",
121+
},
122+
"node_flag": {
123+
Type: schema.TypeString,
124+
Optional: true,
125+
Description: "Value range of destruction node type: `MASTER`, `TASK`, `CORE`, `ROUTER`.",
126+
},
127+
},
128+
},
129+
},
106130
"support_ha": {
107131
Type: schema.TypeInt,
108132
Required: true,
@@ -220,7 +244,7 @@ func resourceTencentCloudEmrClusterUpdate(d *schema.ResourceData, meta interface
220244
logId := tccommon.GetLogId(tccommon.ContextNil)
221245
ctx := context.WithValue(context.TODO(), tccommon.LogIdKey, logId)
222246

223-
immutableFields := []string{"auto_renew", "placement", "placement_info", "display_strategy", "login_settings", "resource_spec.0.master_count", "resource_spec.0.task_count", "resource_spec.0.core_count"}
247+
immutableFields := []string{"auto_renew", "placement", "placement_info", "display_strategy", "login_settings", "extend_fs_field"}
224248
for _, f := range immutableFields {
225249
if d.HasChange(f) {
226250
return fmt.Errorf("cannot update argument `%s`", f)
@@ -245,63 +269,92 @@ func resourceTencentCloudEmrClusterUpdate(d *schema.ResourceData, meta interface
245269
}
246270
}
247271

248-
hasChange := false
249-
request := emr.NewScaleOutInstanceRequest()
250-
request.TimeUnit = common.StringPtr(timeUnit.(string))
251-
request.TimeSpan = common.Uint64Ptr((uint64)(timeSpan.(int)))
252-
request.PayMode = common.Uint64Ptr((uint64)(payMode.(int)))
253-
request.InstanceId = common.StringPtr(instanceId)
254-
255-
tmpResourceSpec := d.Get("resource_spec").([]interface{})
256-
resourceSpec := tmpResourceSpec[0].(map[string]interface{})
257-
258272
if d.HasChange("resource_spec.0.master_count") {
259-
request.MasterCount = common.Uint64Ptr((uint64)(resourceSpec["master_count"].(int)))
260-
hasChange = true
273+
request := emr.NewScaleOutInstanceRequest()
274+
request.TimeUnit = common.StringPtr(timeUnit.(string))
275+
request.TimeSpan = common.Uint64Ptr((uint64)(timeSpan.(int)))
276+
request.PayMode = common.Uint64Ptr((uint64)(payMode.(int)))
277+
request.InstanceId = common.StringPtr(instanceId)
278+
279+
o, n := d.GetChange("resource_spec.0.master_count")
280+
if o.(int) < n.(int) {
281+
request.MasterCount = common.Uint64Ptr((uint64)(n.(int) - o.(int)))
282+
traceId, err := emrService.ScaleOutInstance(ctx, request)
283+
if err != nil {
284+
return err
285+
}
286+
time.Sleep(5 * time.Second)
287+
conf := tccommon.BuildStateChangeConf([]string{}, []string{"2"}, 10*tccommon.ReadRetryTimeout, time.Second, emrService.FlowStatusRefreshFunc(instanceId, traceId, F_KEY_TRACE_ID, []string{}))
288+
if _, e := conf.WaitForState(); e != nil {
289+
return e
290+
}
291+
}
261292
}
262293
if d.HasChange("resource_spec.0.task_count") {
263-
request.TaskCount = common.Uint64Ptr((uint64)(resourceSpec["task_count"].(int)))
264-
hasChange = true
294+
request := emr.NewScaleOutInstanceRequest()
295+
request.TimeUnit = common.StringPtr(timeUnit.(string))
296+
request.TimeSpan = common.Uint64Ptr((uint64)(timeSpan.(int)))
297+
request.PayMode = common.Uint64Ptr((uint64)(payMode.(int)))
298+
request.InstanceId = common.StringPtr(instanceId)
299+
300+
o, n := d.GetChange("resource_spec.0.task_count")
301+
if o.(int) < n.(int) {
302+
request.TaskCount = common.Uint64Ptr((uint64)(n.(int) - o.(int)))
303+
traceId, err := emrService.ScaleOutInstance(ctx, request)
304+
if err != nil {
305+
return err
306+
}
307+
time.Sleep(5 * time.Second)
308+
conf := tccommon.BuildStateChangeConf([]string{}, []string{"2"}, 10*tccommon.ReadRetryTimeout, time.Second, emrService.FlowStatusRefreshFunc(instanceId, traceId, F_KEY_TRACE_ID, []string{}))
309+
if _, e := conf.WaitForState(); e != nil {
310+
return e
311+
}
312+
}
265313
}
266314
if d.HasChange("resource_spec.0.core_count") {
267-
request.CoreCount = common.Uint64Ptr((uint64)(resourceSpec["core_count"].(int)))
268-
hasChange = true
269-
}
270-
if d.HasChange("extend_fs_field") {
271-
return innerErr.New("extend_fs_field not support update.")
272-
}
273-
if !hasChange {
274-
return nil
275-
}
276-
_, err := emrService.UpdateInstance(ctx, request)
277-
if err != nil {
278-
return err
279-
}
280-
err = resource.Retry(10*tccommon.ReadRetryTimeout, func() *resource.RetryError {
281-
clusters, err := emrService.DescribeInstancesById(ctx, instanceId, DisplayStrategyIsclusterList)
282-
283-
if e, ok := err.(*errors.TencentCloudSDKError); ok {
284-
if e.GetCode() == "InternalError.ClusterNotFound" {
285-
return nil
315+
request := emr.NewScaleOutInstanceRequest()
316+
request.TimeUnit = common.StringPtr(timeUnit.(string))
317+
request.TimeSpan = common.Uint64Ptr((uint64)(timeSpan.(int)))
318+
request.PayMode = common.Uint64Ptr((uint64)(payMode.(int)))
319+
request.InstanceId = common.StringPtr(instanceId)
320+
321+
o, n := d.GetChange("resource_spec.0.core_count")
322+
if o.(int) < n.(int) {
323+
request.CoreCount = common.Uint64Ptr((uint64)(n.(int) - o.(int)))
324+
traceId, err := emrService.ScaleOutInstance(ctx, request)
325+
if err != nil {
326+
return err
286327
}
287-
}
288-
289-
if len(clusters) > 0 {
290-
status := *(clusters[0].Status)
291-
if status != EmrInternetStatusCreated {
292-
return resource.RetryableError(
293-
fmt.Errorf("%v create cluster endpoint status still is %v", instanceId, status))
328+
time.Sleep(5 * time.Second)
329+
conf := tccommon.BuildStateChangeConf([]string{}, []string{"2"}, 10*tccommon.ReadRetryTimeout, time.Second, emrService.FlowStatusRefreshFunc(instanceId, traceId, F_KEY_TRACE_ID, []string{}))
330+
if _, e := conf.WaitForState(); e != nil {
331+
return e
294332
}
295333
}
334+
}
296335

297-
if err != nil {
298-
return resource.RetryableError(err)
336+
if d.HasChange("resource_spec.0.master_count") || d.HasChange("resource_spec.0.task_count") || d.HasChange("resource_spec.0.core_count") {
337+
if v, ok := d.GetOk("terminate_node_info"); ok {
338+
terminateNodeInfos := v.([]interface{})
339+
for _, terminateNodeInfo := range terminateNodeInfos {
340+
terminateNodeInfoMap := terminateNodeInfo.(map[string]interface{})
341+
instanceIds := make([]string, 0)
342+
for _, instanceId := range terminateNodeInfoMap["cvm_instance_ids"].([]interface{}) {
343+
instanceIds = append(instanceIds, instanceId.(string))
344+
}
345+
flowId, err := emrService.TerminateClusterNodes(ctx, instanceIds, instanceId, terminateNodeInfoMap["node_flag"].(string))
346+
if err != nil {
347+
return err
348+
}
349+
time.Sleep(5 * time.Second)
350+
conf := tccommon.BuildStateChangeConf([]string{}, []string{"2"}, 10*tccommon.ReadRetryTimeout, time.Second, emrService.FlowStatusRefreshFunc(instanceId, strconv.FormatInt(flowId, 10), F_KEY_FLOW_ID, []string{}))
351+
if _, e := conf.WaitForState(); e != nil {
352+
return e
353+
}
354+
}
299355
}
300-
return nil
301-
})
302-
if err != nil {
303-
return err
304356
}
357+
305358
return resourceTencentCloudEmrClusterRead(d, meta)
306359
}
307360

tencentcloud/services/emr/resource_tc_emr_cluster_test.go

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,20 @@ func TestAccTencentCloudEmrClusterResource_Basic(t *testing.T) {
156156
resource.TestCheckResourceAttrSet(testEmrClusterResourceKey, "instance_id"),
157157
resource.TestCheckResourceAttr(testEmrClusterResourceKey, "sg_id", tcacctest.DefaultEMRSgId),
158158
resource.TestCheckResourceAttr(testEmrClusterResourceKey, "tags.emr-key", "emr-value"),
159+
resource.TestCheckResourceAttr(testEmrClusterResourceKey, "resource_spec.core_count", "2"),
160+
),
161+
},
162+
{
163+
Config: testEmrBasic_AddCoreNode,
164+
Check: resource.ComposeTestCheckFunc(
165+
testAccCheckEmrExists(testEmrClusterResourceKey),
166+
resource.TestCheckResourceAttr(testEmrClusterResourceKey, "resource_spec.core_count", "3"),
159167
),
160168
},
161169
{
162170
ResourceName: testEmrClusterResourceKey,
163171
ImportState: true,
164-
ImportStateVerifyIgnore: []string{"display_strategy", "placement", "time_span", "time_unit", "login_settings"},
172+
ImportStateVerifyIgnore: []string{"display_strategy", "placement", "time_span", "time_unit", "login_settings", "terminate_node_info"},
165173
},
166174
},
167175
})
@@ -327,6 +335,75 @@ resource "tencentcloud_emr_cluster" "emrrrr" {
327335
}
328336
`
329337

338+
const testEmrBasic_AddCoreNode = tcacctest.DefaultEMRVariable + `
339+
data "tencentcloud_instance_types" "cvm4c8m" {
340+
exclude_sold_out=true
341+
cpu_core_count=4
342+
memory_size=8
343+
filter {
344+
name = "instance-charge-type"
345+
values = ["POSTPAID_BY_HOUR"]
346+
}
347+
filter {
348+
name = "zone"
349+
values = ["ap-guangzhou-3"]
350+
}
351+
}
352+
353+
resource "tencentcloud_emr_cluster" "emrrrr" {
354+
product_id=38
355+
vpc_settings={
356+
vpc_id=var.vpc_id
357+
subnet_id=var.subnet_id
358+
}
359+
softwares = [
360+
"hdfs-2.8.5",
361+
"knox-1.6.1",
362+
"openldap-2.4.44",
363+
"yarn-2.8.5",
364+
"zookeeper-3.6.3",
365+
]
366+
support_ha=0
367+
instance_name="emr-test-demo"
368+
resource_spec {
369+
master_resource_spec {
370+
mem_size=8192
371+
cpu=4
372+
disk_size=100
373+
disk_type="CLOUD_PREMIUM"
374+
spec="CVM.${data.tencentcloud_instance_types.cvm4c8m.instance_types.0.family}"
375+
storage_type=5
376+
root_size=50
377+
}
378+
core_resource_spec {
379+
mem_size=8192
380+
cpu=4
381+
disk_size=100
382+
disk_type="CLOUD_PREMIUM"
383+
spec="CVM.${data.tencentcloud_instance_types.cvm4c8m.instance_types.0.family}"
384+
storage_type=5
385+
root_size=50
386+
}
387+
master_count=1
388+
core_count=3
389+
}
390+
login_settings={
391+
password="Tencent@cloud123"
392+
}
393+
time_span=3600
394+
time_unit="s"
395+
pay_mode=0
396+
placement_info {
397+
zone="ap-guangzhou-3"
398+
project_id=0
399+
}
400+
sg_id=var.sg_id
401+
tags = {
402+
emr-key = "emr-value"
403+
}
404+
}
405+
`
406+
330407
const testEmrBasicPrepay = tcacctest.DefaultEMRVariable + `
331408
data "tencentcloud_instance_types" "cvm4c8m" {
332409
exclude_sold_out=true

tencentcloud/services/emr/service_tencentcloud_emr.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,3 +638,96 @@ func (me *EMRService) DescribeLiteHbaseInstancesByFilter(ctx context.Context, pa
638638

639639
return
640640
}
641+
642+
func (me *EMRService) TerminateClusterNodes(ctx context.Context, instanceIds []string, instanceId, nodeFlag string) (flowId int64, errRet error) {
643+
logId := tccommon.GetLogId(ctx)
644+
645+
request := emr.NewTerminateClusterNodesRequest()
646+
request.CvmInstanceIds = helper.Strings(instanceIds)
647+
request.InstanceId = helper.String(instanceId)
648+
request.NodeFlag = helper.String(nodeFlag)
649+
650+
defer func() {
651+
if errRet != nil {
652+
log.Printf("[CRITAL]%s api[%s] fail, request body [%s], reason[%s]\n", logId, request.GetAction(), request.ToJsonString(), errRet.Error())
653+
}
654+
}()
655+
656+
var (
657+
response *emr.TerminateClusterNodesResponse
658+
innerErr error
659+
)
660+
err := resource.Retry(tccommon.ReadRetryTimeout, func() *resource.RetryError {
661+
ratelimit.Check(request.GetAction())
662+
response, innerErr = me.client.UseEmrClient().TerminateClusterNodes(request)
663+
if innerErr != nil {
664+
return tccommon.RetryError(innerErr)
665+
}
666+
return nil
667+
})
668+
if err != nil {
669+
errRet = err
670+
return
671+
}
672+
673+
if response.Response != nil && response.Response.FlowId != nil {
674+
flowId = *response.Response.FlowId
675+
return
676+
}
677+
return
678+
679+
}
680+
681+
func (me *EMRService) FlowStatusRefreshFunc(instanceId, flowId, flowType string, failStates []string) resource.StateRefreshFunc {
682+
return func() (interface{}, string, error) {
683+
684+
request := emr.NewDescribeClusterFlowStatusDetailRequest()
685+
request.InstanceId = helper.String(instanceId)
686+
request.FlowParam = &emr.FlowParam{
687+
FKey: helper.String(flowType),
688+
FValue: helper.String(flowId),
689+
}
690+
691+
var (
692+
response *emr.DescribeClusterFlowStatusDetailResponse
693+
innerErr error
694+
)
695+
err := resource.Retry(tccommon.ReadRetryTimeout, func() *resource.RetryError {
696+
ratelimit.Check(request.GetAction())
697+
response, innerErr = me.client.UseEmrClient().DescribeClusterFlowStatusDetail(request)
698+
if innerErr != nil {
699+
return tccommon.RetryError(innerErr)
700+
}
701+
return nil
702+
})
703+
if err != nil {
704+
return nil, "", err
705+
}
706+
707+
if response.Response == nil || response.Response.FlowTotalStatus == nil {
708+
return nil, "", fmt.Errorf("Not found flow.")
709+
}
710+
return response.Response.FlowTotalStatus, helper.Int64ToStr(*response.Response.FlowTotalStatus), nil
711+
}
712+
}
713+
714+
func (me *EMRService) ScaleOutInstance(ctx context.Context, request *emr.ScaleOutInstanceRequest) (traceId string, err error) {
715+
logId := tccommon.GetLogId(ctx)
716+
err = resource.Retry(tccommon.ReadRetryTimeout, func() *resource.RetryError {
717+
ratelimit.Check(request.GetAction())
718+
ratelimit.Check(request.GetAction())
719+
response, e := me.client.UseEmrClient().ScaleOutInstance(request)
720+
if e != nil {
721+
log.Printf("[CRITAL]%s api[%s] fail, request body [%s], reason[%s]\n",
722+
logId, request.GetAction(), request.ToJsonString(), e.Error())
723+
return tccommon.RetryError(e)
724+
}
725+
traceId = *response.Response.TraceId
726+
return nil
727+
})
728+
729+
if err != nil {
730+
return
731+
}
732+
return
733+
}

0 commit comments

Comments
 (0)