 # and limitations under the License.
 #
 
+import logging
+import os
+import time
+from typing import Sequence, TypedDict
+
+import boto3
+from custom_resources.cr_types import CustomResourceRequest, CustomResourceResponse
 from opensearchpy import (
+    AuthorizationException,
+    AWSV4SignerAuth,
     OpenSearch,
     RequestsHttpConnection,
-    AWSV4SignerAuth,
-    AuthorizationException,
 )
-import boto3
-import logging
-import os
-import time
 from tenacity import (
     retry,
     retry_if_exception_type,
     stop_after_attempt,
     wait_exponential_jitter,
 )
 
-from typing import TypedDict, Sequence
-
-from custom_resources.cr_types import CustomResourceRequest, CustomResourceResponse
-
 LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
 
 logger = logging.getLogger(__name__)
@@ -44,12 +43,19 @@ class MetadataManagementField(TypedDict):
     Filterable: bool
 
 
+class AnalyzerProperties(TypedDict):
+    CharacterFilters: Sequence[str]
+    Tokenizer: str
+    TokenFilters: Sequence[str]
+
+
 class VectorIndexProperties(TypedDict):
     Endpoint: str
     IndexName: str
     VectorField: str
     Dimensions: int | str
     MetadataManagement: Sequence[MetadataManagementField]
+    Analyzer: AnalyzerProperties | None
 
 
 def validate_event(event: CustomResourceRequest[VectorIndexProperties]) -> bool:
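For context, a ResourceProperties payload matching the updated VectorIndexProperties shape could look like the sketch below. Every value is hypothetical; html_strip, standard, and lowercase are stock OpenSearch analysis components chosen only for illustration, and the MetadataManagement entries are elided because only the Filterable field of MetadataManagementField is visible in this diff.

```python
# Hypothetical payload -- values are illustrative, not taken from this change.
example_properties: VectorIndexProperties = {
    "Endpoint": "vpc-example-domain.us-east-1.es.amazonaws.com",
    "IndexName": "documents",
    "VectorField": "embedding",
    "Dimensions": 1536,
    "MetadataManagement": [],  # entries elided; only Filterable is shown in this diff
    "Analyzer": {
        "CharacterFilters": ["html_strip"],
        "Tokenizer": "standard",
        "TokenFilters": ["lowercase"],
    },
}
```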
@@ -70,6 +76,14 @@ def validate_event(event: CustomResourceRequest[VectorIndexProperties]) -> bool:
             raise ValueError("MetadataManagement is required")
         if event["RequestType"] == "Update" and event["PhysicalResourceId"] is None:
             raise ValueError("PhysicalResourceId is required")
+        if event["ResourceProperties"].get("Analyzer") is not None:
+            analyzer = event["ResourceProperties"]["Analyzer"]
+            if analyzer["CharacterFilters"] is None:
+                raise ValueError("CharacterFilters is required")
+            if analyzer["Tokenizer"] is None:
+                raise ValueError("Tokenizer is required")
+            if analyzer["TokenFilters"] is None:
+                raise ValueError("TokenFilters is required")
     elif event["RequestType"] == "Delete":
         if event["PhysicalResourceId"] is None:
             raise ValueError("PhysicalResourceId is required")
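The new checks only fire when an Analyzer block is present, and they compare each member against None, so all three keys must exist (a missing key would surface as a KeyError instead). A sketch of the failure path, reusing example_properties from above and assuming the rest of the event satisfies the earlier, unchanged checks:

```python
# Sketch only: the earlier validate_event checks (Endpoint, IndexName, ...) are
# assumed to pass for this event; RequestType comes from CloudFormation.
event = {
    "RequestType": "Create",
    "ResourceProperties": {
        **example_properties,
        "Analyzer": {
            "CharacterFilters": ["html_strip"],
            "Tokenizer": None,  # explicit None trips the new check
            "TokenFilters": ["lowercase"],
        },
    },
}

try:
    validate_event(event)  # type: ignore[arg-type]
except ValueError as err:
    print(err)  # "Tokenizer is required"
```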
@@ -139,18 +153,39 @@ def create_mapping(
     return mapping
 
 
-def create_index(client: OpenSearch, index_name: str, mapping: dict[str, str]) -> None:
+def create_setting(analyzer: AnalyzerProperties | None) -> dict:
+    setting = {
+        "index": {
+            "number_of_shards": "2",
+            "knn.algo_param": {"ef_search": "512"},
+            "knn": "true",
+        },
+    }
+    if analyzer:
+        setting["analysis"] = {
+            "analyzer": {
+                "custom_analyzer": {
+                    "type": "custom",
+                    "tokenizer": analyzer["Tokenizer"],
+                    "char_filter": analyzer["CharacterFilters"],
+                    "filter": analyzer["TokenFilters"],
+                }
+            }
+        }
+
+    return setting
+
+
+def create_index(
+    client: OpenSearch, index_name: str, mapping: dict[str, str], setting: dict[str, str]
+) -> None:
     logger.debug(f"creating index {index_name}")
+    logger.debug(f"setting: {setting}")
+    logger.debug(f"mapping: {mapping}")
     client.indices.create(
         index_name,
         body={
-            "settings": {
-                "index": {
-                    "number_of_shards": "2",
-                    "knn.algo_param": {"ef_search": "512"},
-                    "knn": "true",
-                }
-            },
+            "settings": setting,
             "mappings": mapping,
         },
         params={"wait_for_active_shards": "all"},
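Traced directly from the function body above: with the hypothetical analyzer used earlier, create_setting adds an analysis block alongside the existing knn index settings, and with no analyzer it returns the knn settings unchanged.

```python
analyzer: AnalyzerProperties = {
    "CharacterFilters": ["html_strip"],
    "Tokenizer": "standard",
    "TokenFilters": ["lowercase"],
}

assert create_setting(analyzer) == {
    "index": {
        "number_of_shards": "2",
        "knn.algo_param": {"ef_search": "512"},
        "knn": "true",
    },
    "analysis": {
        "analyzer": {
            "custom_analyzer": {
                "type": "custom",
                "tokenizer": "standard",
                "char_filter": ["html_strip"],
                "filter": ["lowercase"],
            }
        }
    },
}

# Without an analyzer the settings keep only the existing knn index block.
assert create_setting(None) == {
    "index": {
        "number_of_shards": "2",
        "knn.algo_param": {"ef_search": "512"},
        "knn": "true",
    },
}
```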
@@ -171,13 +206,15 @@ def handle_create(
     vector_field: str,
     dimensions: int,
     metadata_management: Sequence[MetadataManagementField],
+    analyzer: AnalyzerProperties | None,
 ):
     if client.indices.exists(index_name):
         raise ValueError(f"Index {index_name} already exists")
 
     try:
         mapping = create_mapping(vector_field, dimensions, metadata_management)
-        create_index(client, index_name, mapping)
+        setting = create_setting(analyzer)
+        create_index(client, index_name, mapping, setting)
     except Exception as e:
         logger.error(f"Error creating index {index_name}")
         logger.exception(e)
@@ -211,6 +248,7 @@ def on_create(
         event["ResourceProperties"]["VectorField"],
         int(event["ResourceProperties"]["Dimensions"]),
         event["ResourceProperties"]["MetadataManagement"],
+        event["ResourceProperties"].get("Analyzer", None),
     )
     return {"PhysicalResourceId": physical_id}
 
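The client construction is outside this diff, but the imports (boto3, AWSV4SignerAuth, RequestsHttpConnection) point at the usual SigV4 pattern from opensearch-py. A minimal sketch of exercising the new create_index signature that way; the region, endpoint, service name, index name, and field values are all assumptions, not values taken from this change:

```python
# Sketch only: every concrete value below is hypothetical.
region = os.environ.get("AWS_REGION", "us-east-1")
endpoint = "vpc-example-domain.us-east-1.es.amazonaws.com"

credentials = boto3.Session().get_credentials()
# Use "aoss" instead of "es" when signing for OpenSearch Serverless.
auth = AWSV4SignerAuth(credentials, region, "es")

client = OpenSearch(
    hosts=[{"host": endpoint, "port": 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

mapping = create_mapping("embedding", 1536, [])
setting = create_setting(None)  # or create_setting(analyzer) to attach custom_analyzer
create_index(client, "documents", mapping, setting)
```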