1
+ #
2
+ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
+ #
4
+ # Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
5
+ # with the License. A copy of the License is located at
6
+ #
7
+ # http://www.apache.org/licenses/LICENSE-2.0
8
+ #
9
+ # or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES
10
+ # OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions
11
+ # and limitations under the License.
12
+ #
13
+
14
+ from opensearchpy import (OpenSearch , AuthorizationException )
15
+
16
+ import boto3
17
+ import logging
18
+ import os
19
+ import uuid
20
+
21
+ from tenacity import (
22
+ retry ,
23
+ retry_if_exception_type ,
24
+ stop_after_attempt ,
25
+ wait_exponential_jitter ,
26
+ )
27
+
28
+ from typing import List , TypedDict
29
+
30
+ from custom_resources .cr_types import CustomResourceRequest , CustomResourceResponse
31
+ from opensearch_index import connect_opensearch
32
+
33
+ LOG_LEVEL = os .getenv ("LOG_LEVEL" , "INFO" )
34
+
35
+ logger = logging .getLogger (__name__ )
36
+ logger .setLevel (LOG_LEVEL )
37
+
38
+
39
+ class VpcEndpointProperties (TypedDict ):
40
+ Endpoint : str
41
+ DomainArn : str
42
+ SubnetIds : List [str ]
43
+ SecurityGroupIds : List [str ]
44
+
45
+ def validate_event (event : CustomResourceRequest [VpcEndpointProperties ]) -> bool :
46
+ if event ["ResourceProperties" ] is None :
47
+ raise ValueError ("ResourceProperties is required" )
48
+ if event ["ResourceProperties" ]["Endpoint" ] is None :
49
+ raise ValueError ("Endpoint is required" )
50
+ if event ["ResourceProperties" ]["DomainArn" ] is None :
51
+ raise ValueError ("DomainArn is required" )
52
+ if event ["ResourceProperties" ]["SubnetIds" ] is None :
53
+ raise ValueError ("SubnetIds is required" )
54
+ if event ["ResourceProperties" ]["SecurityGroupIds" ] is None :
55
+ raise ValueError ("SecurityGroupIds is required" )
56
+
57
+ @retry (
58
+ retry = retry_if_exception_type (AuthorizationException ),
59
+ stop = stop_after_attempt (30 ),
60
+ wait = wait_exponential_jitter (1 , 3 ),
61
+ )
62
+ def handle_create (
63
+ client : OpenSearch ,
64
+ domain_arn : str ,
65
+ subnet_ids : List [str ],
66
+ security_group_ids : List [str ],
67
+ client_token : str
68
+ ):
69
+ try :
70
+ response = client .create_vpc_endpoint (
71
+ DomainArn = domain_arn ,
72
+ VpcOptions = {
73
+ "SubnetIds" : subnet_ids ,
74
+ "SecurityGroupIds" : security_group_ids ,
75
+ },
76
+ ClientToken = client_token ,
77
+ )
78
+ except Exception as e :
79
+ logger .error (f"Error creating VPC endpoint for domain: { domain_arn } " )
80
+ logger .exception (e )
81
+ raise e
82
+ return response ["VpcEndpoint" ]["VpcEndpointId" ]
83
+
84
+ @retry (
85
+ retry = retry_if_exception_type (AuthorizationException ),
86
+ stop = stop_after_attempt (30 ),
87
+ wait = wait_exponential_jitter (1 , 3 ),
88
+ )
89
+ def handle_update (
90
+ client : OpenSearch ,
91
+ vpc_endpoint_id : str ,
92
+ subnet_ids : List [str ],
93
+ security_group_ids : List [str ]
94
+ ):
95
+ try :
96
+ response = client .update_vpc_endpoint (
97
+ VpcEndpointId = vpc_endpoint_id ,
98
+ VpcOptions = {
99
+ "SubnetIds" : subnet_ids ,
100
+ "SecurityGroupIds" : security_group_ids ,
101
+ },
102
+ )
103
+ except Exception as e :
104
+ logger .error (f"Error updating VPC endpoint: { vpc_endpoint_id } " )
105
+ logger .exception (e )
106
+ raise e
107
+ return response ["VpcEndpoint" ]["VpcEndpointId" ]
108
+
109
+ @retry (
110
+ retry = retry_if_exception_type (AuthorizationException ),
111
+ stop = stop_after_attempt (30 ),
112
+ wait = wait_exponential_jitter (1 , 3 ),
113
+ )
114
+ def handle_delete (
115
+ client : OpenSearch ,
116
+ vpc_endpoint_id : str ,
117
+ ):
118
+ try :
119
+ response = client .delete_vpc_endpoint (
120
+ VpcEndpointId = vpc_endpoint_id ,
121
+ )
122
+ except Exception as e :
123
+ logger .error (f"Error deleting VPC endpoint: { vpc_endpoint_id } " )
124
+ logger .exception (e )
125
+ raise e
126
+ return response ["VpcEndpointSummary" ]["VpcEndpointId" ]
127
+
128
+ def on_create (event : CustomResourceRequest [VpcEndpointProperties ]) -> CustomResourceResponse :
129
+ validate_event (event )
130
+ client = connect_opensearch (event ["ResourceProperties" ]["Endpoint" ])
131
+ physical_id = handle_create (client ,
132
+ event ["ResourceProperties" ]["DomainArn" ],
133
+ event ["ResourceProperties" ]["SubnetIds" ],
134
+ event ["ResourceProperties" ]["SecurityGroupIds" ],
135
+ str (uuid .uuid4 ())
136
+ )
137
+ return {"PhysicalResourceId" : physical_id }
138
+
139
+ def on_update (
140
+ event : CustomResourceRequest [VpcEndpointProperties ],
141
+ ) -> CustomResourceResponse :
142
+ validate_event (event )
143
+ client = connect_opensearch (event ["ResourceProperties" ]["Endpoint" ])
144
+ physical_id = handle_update (client ,
145
+ event ["PhysicalResourceId" ],
146
+ event ["ResourceProperties" ]["SubnetIds" ],
147
+ event ["ResourceProperties" ]["SecurityGroupIds" ]
148
+ )
149
+ return {"PhysicalResourceId" : physical_id }
150
+ def on_delete (
151
+ event : CustomResourceRequest [VpcEndpointProperties ],
152
+ ) -> CustomResourceResponse :
153
+ validate_event (event )
154
+ client = connect_opensearch (event ["ResourceProperties" ]["Endpoint" ])
155
+ pyhiscal_id = handle_delete (client , event ["PhysicalResourceId" ])
156
+
157
+ return {"PhysicalResourceId" : pyhiscal_id }
158
+
159
+
160
+ def on_event (event , context ):
161
+ logger .info (f"event: { event } " )
162
+ request_type = event ["RequestType" ]
163
+ if request_type == "Create" :
164
+ return on_create (event , context )
165
+ if request_type == "Update" :
166
+ return on_update (event , context )
167
+ if request_type == "Delete" :
168
+ return on_delete (event , context )
169
+ raise Exception ("Invalid request type: %s" % request_type )
0 commit comments