5
5
6
6
from __future__ import annotations
7
7
8
+ import logging
8
9
import random
9
10
import string
10
11
from copy import deepcopy
16
17
from cfnlint .helpers import FUNCTION_FOR_EACH
17
18
from cfnlint .template .transforms ._types import TransformResult
18
19
20
+ LOGGER = logging .getLogger ("cfnlint" )
21
+
19
22
# initializing size of string
20
23
_N = 7
21
24
@@ -51,14 +54,14 @@ def language_extension(cfn: Any) -> TransformResult:
51
54
52
55
message = "Error transforming template: {0}"
53
56
if hasattr (e .key , "start_mark" ):
54
- sm_line = e .key .start_mark . line + 1
55
- sm_column = e .key .start_mark . column + 1
57
+ sm_line = e .key .start_mark [ 0 ] + 1
58
+ sm_column = e .key .start_mark [ 1 ] + 1
56
59
else :
57
60
sm_line = 1
58
61
sm_column = 1
59
62
if hasattr (e .key , "end_mark" ):
60
- em_line = e .key .end_mark . line + 1
61
- em_column = e .key .end_mark . column + 1
63
+ em_line = e .key .end_mark [ 0 ] + 1
64
+ em_column = e .key .end_mark [ 1 ] + 1
62
65
else :
63
66
em_line = 1
64
67
em_column = 1
@@ -76,6 +79,7 @@ def language_extension(cfn: Any) -> TransformResult:
76
79
], None
77
80
except Exception as e : # pylint: disable=broad-exception-caught
78
81
# pylint: disable=import-outside-toplevel
82
+ from cfnlint .match import Match # pylint: disable=cyclic-import
79
83
from cfnlint .rules import TransformError # pylint: disable=cyclic-import
80
84
81
85
message = "Error transforming template: {0}"
@@ -141,6 +145,17 @@ def _walk(self, item: Any, params: MutableMapping[str, Any], cfn: Any):
141
145
)
142
146
if only_string :
143
147
return obj [k ][0 ]
148
+ elif k == "Fn::FindInMap" :
149
+ try :
150
+ mapping = _ForEachValueFnFindInMap (get_hash (v ), v )
151
+ map_value = mapping .value (cfn , params )
152
+ if map_value is None :
153
+ del obj [k ]
154
+ continue
155
+ if isinstance (map_value , str ):
156
+ return map_value
157
+ except Exception as e : # pylint: disable=broad-exception-caught
158
+ LOGGER .debug ("Transform and Fn::FindInMap error: %s" , {str (e )})
144
159
elif k == "Ref" :
145
160
if isinstance (v , str ):
146
161
if v in params :
@@ -152,7 +167,11 @@ def _walk(self, item: Any, params: MutableMapping[str, Any], cfn: Any):
152
167
return params [r ]
153
168
obj [k ] = r
154
169
else :
155
- obj [k ] = self ._walk (v , params , cfn )
170
+ sub_value = self ._walk (v , params , cfn )
171
+ if sub_value is None :
172
+ del obj [k ]
173
+ else :
174
+ obj [k ] = self ._walk (v , params , cfn )
156
175
elif isinstance (obj , list ):
157
176
for i , v in enumerate (obj ):
158
177
obj [i ] = self ._walk (v , params , cfn )
@@ -195,45 +214,89 @@ def create(obj: Any) -> _ForEachValue:
195
214
raise _TypeError (f"Unsupported value { obj !r} " , obj )
196
215
197
216
# pylint: disable=unused-argument
198
- def value (self , cfn ):
217
+ def value (
218
+ self , cfn , params : Optional [Mapping [str , Any ]] = None , only_params : bool = False
219
+ ):
199
220
return self ._value
200
221
201
222
@property
202
223
def hash (self ):
203
224
return self ._hash
204
225
205
226
227
+ class _FnFindInMapDefaultValue (_ForEachValue ):
228
+ def __init__ (self , _hash : str , value : Any = None ) -> None :
229
+ super ().__init__ (_hash , value )
230
+ if not isinstance (value , dict ):
231
+ raise _TypeError (
232
+ "Fn::FindInMap parameter must be an object with key 'DefaultValue'" ,
233
+ value ,
234
+ )
235
+ if len (value ) != 1 :
236
+ raise _ValueError (
237
+ "Fn::FindInMap parameter only supports 'DefaultValue'" , value
238
+ )
239
+
240
+ for k , v in value .items ():
241
+ if k != "DefaultValue" :
242
+ raise _ValueError (
243
+ "Fn::FindInMap parameter only supports 'DefaultValue'" , value
244
+ )
245
+ self ._value = _ForEachValue .create (v )
246
+
247
+ def value (
248
+ self , cfn , params : Optional [Mapping [str , Any ]] = None , only_params : bool = False
249
+ ):
250
+ if params is None :
251
+ params = {}
252
+
253
+ return self ._value .value (cfn , params , only_params )
254
+
255
+
206
256
class _ForEachValueFnFindInMap (_ForEachValue ):
207
257
def __init__ (self , _hash : str , obj : Any ) -> None :
208
258
super ().__init__ (_hash )
209
259
if not isinstance (obj , list ):
210
260
raise _TypeError ("Fn::FindInMap should be a list" , obj )
211
261
212
- if len (obj ) != 3 :
213
- raise _ValueError ("Fn::FindInMap requires a list of 3 values" , obj )
262
+ if len (obj ) not in [ 3 , 4 ] :
263
+ raise _ValueError ("Fn::FindInMap requires a list of 3 or 4 values" , obj )
214
264
215
265
self ._map = [
216
266
_ForEachValue .create (obj [0 ]),
217
267
_ForEachValue .create (obj [1 ]),
218
268
_ForEachValue .create (obj [2 ]),
219
269
]
270
+ if len (obj ) == 4 :
271
+ self ._map .append (_FnFindInMapDefaultValue (get_hash (obj [3 ]), obj [3 ]))
220
272
221
273
self ._obj = obj
222
274
223
- def value (self , cfn : Any ) -> Any :
275
+ def value (
276
+ self ,
277
+ cfn : Any ,
278
+ params : Optional [Mapping [str , Any ]] = None ,
279
+ only_params : bool = False ,
280
+ ) -> Any :
281
+ if params is None :
282
+ params = {}
224
283
t_map = deepcopy (self ._map )
225
284
mapping = None
226
285
227
286
try :
228
- mapping = cfn .template .get ("Mappings" , {}).get (t_map [0 ].value (cfn ))
287
+ mapping = cfn .template .get ("Mappings" , {}).get (
288
+ t_map [0 ].value (cfn , params , only_params )
289
+ )
229
290
except Exception : # pylint: disable=broad-exception-caught
230
291
if len (cfn .template .get ("Mappings" , {}).keys ()) == 1 :
231
292
mapping = cfn .template .get ("Mappings" , {}).get (
232
293
list (cfn .template .get ("Mappings" , {}).keys ())[0 ]
233
294
)
234
295
235
296
try :
236
- if mapping is None and isinstance (t_map [1 ].value (cfn ), str ):
297
+ if mapping is None and isinstance (
298
+ t_map [1 ].value (cfn , params , only_params ), str
299
+ ):
237
300
for k , v in cfn .template .get ("Mappings" , {}).items ():
238
301
if isinstance (v , dict ):
239
302
if t_map [1 ].value (cfn ) in v :
@@ -244,12 +307,14 @@ def value(self, cfn: Any) -> Any:
244
307
pass
245
308
246
309
try :
247
- if mapping is None and isinstance (t_map [2 ].value (cfn ), str ):
310
+ if mapping is None and isinstance (
311
+ t_map [2 ].value (cfn , params , only_params ), str
312
+ ):
248
313
for m1 , mv1 in cfn .template .get ("Mappings" , {}).items ():
249
314
if isinstance (mv1 , dict ):
250
315
for k1 , kv1 in mv1 .items ():
251
316
if isinstance (kv1 , dict ):
252
- if t_map [2 ].value (cfn ) in kv1 :
317
+ if t_map [2 ].value (cfn , params , only_params ) in kv1 :
253
318
t_map [1 ] = _ForEachValue .create (k1 )
254
319
t_map [0 ] = _ForEachValue .create (m1 )
255
320
mapping = mv1
@@ -265,17 +330,23 @@ def value(self, cfn: Any) -> Any:
265
330
t_map [2 ].value (cfn )
266
331
for k , v in mapping .items ():
267
332
if isinstance (v , dict ):
268
- if t_map [2 ].value (cfn ) in v :
333
+ if t_map [2 ].value (cfn , params , only_params ) in v :
269
334
t_map [1 ] = _ForEachValue .create (k )
270
335
except _ResolveError :
271
336
pass
272
337
273
338
if mapping :
274
339
try :
275
- return mapping .get (t_map [1 ].value (cfn )).get (t_map [2 ].value (cfn ))
340
+ return mapping .get (t_map [1 ].value (cfn , params , only_params ), {}).get (
341
+ t_map [2 ].value (cfn , params , only_params )
342
+ )
276
343
except _ResolveError as e :
344
+ if len (self ._map ) == 4 :
345
+ return self ._map [3 ].value (cfn , params , only_params )
277
346
raise _ResolveError ("Can't resolve Fn::FindInMap" , self ._obj ) from e
278
347
348
+ if len (self ._map ) == 4 :
349
+ return self ._map [3 ].value (cfn , params , only_params )
279
350
raise _ResolveError ("Can't resolve Fn::FindInMap" , self ._obj )
280
351
281
352
@@ -289,12 +360,25 @@ def __init__(self, _hash: str, obj: Any) -> None:
289
360
self ._obj = obj
290
361
291
362
# pylint: disable=too-many-return-statements
292
- def value (self , cfn : Any ) -> Any :
363
+ def value (
364
+ self ,
365
+ cfn : Any ,
366
+ params : Optional [Mapping [str , Any ]] = None ,
367
+ only_params : bool = False ,
368
+ ) -> Any :
369
+ if params is None :
370
+ params = {}
293
371
v = self ._ref .value (cfn )
294
372
295
373
if not isinstance (v , str ):
296
374
raise _ResolveError ("Can't resolve Fn::Ref" , self ._obj )
297
375
376
+ if v in params :
377
+ return params [v ]
378
+
379
+ if only_params :
380
+ raise _ResolveError ("Can't resolve Fn::Ref" , self ._obj )
381
+
298
382
region = cfn .regions [0 ]
299
383
account_id = "123456789012"
300
384
partition = "aws"
@@ -373,15 +457,15 @@ def values(
373
457
if self ._collection :
374
458
for item in self ._collection :
375
459
try :
376
- yield item .value (cfn )
460
+ yield item .value (cfn , {}, False )
377
461
except _ResolveError :
378
462
v = "" .join (random .choices (string .ascii_letters , k = _N )) # nosec
379
463
collection_cache [item .hash ] = v
380
464
yield v
381
465
return
382
466
if self ._fn :
383
467
try :
384
- values = self ._fn .value (cfn )
468
+ values = self ._fn .value (cfn , {}, False )
385
469
if values :
386
470
if isinstance (values , list ):
387
471
for value in values :
0 commit comments