Skip to content

Commit a22f7db

Browse files
committed
feat(parameters): add multi-thread option
1 parent d238286 commit a22f7db

File tree

1 file changed

+58
-22
lines changed
  • aws_lambda_powertools/utilities/parameters

1 file changed

+58
-22
lines changed

Diff for: aws_lambda_powertools/utilities/parameters/ssm.py

+58-22
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
"""
22
AWS SSM Parameter retrieval and caching utility
33
"""
4-
4+
import concurrent.futures
5+
import functools
6+
from concurrent.futures import Future
57
from typing import TYPE_CHECKING, Any, Dict, Optional, Union, overload
68

79
import boto3
@@ -107,7 +109,7 @@ def get( # type: ignore[override]
107109
transform: Optional[str] = None,
108110
decrypt: bool = False,
109111
force_fetch: bool = False,
110-
**sdk_options
112+
**sdk_options,
111113
) -> Optional[Union[str, dict, bytes]]:
112114
"""
113115
Retrieve a parameter value or return the cached value
@@ -207,7 +209,7 @@ def get_parameter(
207209
decrypt: bool = False,
208210
force_fetch: bool = False,
209211
max_age: int = DEFAULT_MAX_AGE_SECS,
210-
**sdk_options
212+
**sdk_options,
211213
) -> Union[str, dict, bytes]:
212214
"""
213215
Retrieve a parameter value from AWS Systems Manager (SSM) Parameter Store
@@ -276,7 +278,7 @@ def get_parameters(
276278
force_fetch: bool = False,
277279
max_age: int = DEFAULT_MAX_AGE_SECS,
278280
raise_on_transform_error: bool = False,
279-
**sdk_options
281+
**sdk_options,
280282
) -> Union[Dict[str, str], Dict[str, dict], Dict[str, bytes]]:
281283
"""
282284
Retrieve multiple parameter values from AWS Systems Manager (SSM) Parameter Store
@@ -343,7 +345,7 @@ def get_parameters(
343345
transform=transform,
344346
raise_on_transform_error=raise_on_transform_error,
345347
force_fetch=force_fetch,
346-
**sdk_options
348+
**sdk_options,
347349
)
348350

349351

@@ -354,6 +356,7 @@ def get_parameters_by_name(
354356
decrypt: bool = False,
355357
force_fetch: bool = False,
356358
max_age: int = DEFAULT_MAX_AGE_SECS,
359+
parallel: bool = False,
357360
) -> Dict[str, str]:
358361
...
359362

@@ -365,6 +368,7 @@ def get_parameters_by_name(
365368
decrypt: bool = False,
366369
force_fetch: bool = False,
367370
max_age: int = DEFAULT_MAX_AGE_SECS,
371+
parallel: bool = False,
368372
) -> Dict[str, bytes]:
369373
...
370374

@@ -376,6 +380,7 @@ def get_parameters_by_name(
376380
decrypt: bool = False,
377381
force_fetch: bool = False,
378382
max_age: int = DEFAULT_MAX_AGE_SECS,
383+
parallel: bool = False,
379384
) -> Dict[str, Dict[str, Any]]:
380385
...
381386

@@ -387,6 +392,7 @@ def get_parameters_by_name(
387392
decrypt: bool = False,
388393
force_fetch: bool = False,
389394
max_age: int = DEFAULT_MAX_AGE_SECS,
395+
parallel: bool = False,
390396
) -> Union[Dict[str, str], Dict[str, dict]]:
391397
...
392398

@@ -397,6 +403,7 @@ def get_parameters_by_name(
397403
decrypt: bool = False,
398404
force_fetch: bool = False,
399405
max_age: int = DEFAULT_MAX_AGE_SECS,
406+
parallel: bool = True,
400407
) -> Union[Dict[str, str], Dict[str, bytes], Dict[str, dict]]:
401408
"""
402409
Retrieve multiple parameter values by name from AWS Systems Manager (SSM) Parameter Store
@@ -428,24 +435,53 @@ def get_parameters_by_name(
428435
# NOTE: Need a param for hard failure mode on parameter retrieval
429436
# by default, we should return an empty string on failure (ask customer for desired behaviour)
430437

431-
# NOTE: Check costs of threads to assess when it's worth the overhead.
432-
# for threads, assess failure mode to absorb OR raise/cancel futures
438+
# NOTE: Decide whether to leave multi-threaded option or not due to slower results (throttling+fork cost)
433439

434440
ret: Dict[str, Any] = {}
435-
436-
for parameter, options in parameters.items():
437-
if isinstance(options, dict):
438-
transform = options.get("transform") or transform
439-
decrypt = options.get("decrypt") or decrypt
440-
max_age = options.get("max_age") or max_age
441-
force_fetch = options.get("force_fetch") or force_fetch
442-
443-
ret[parameter] = get_parameter(
444-
name=parameter,
445-
transform=transform,
446-
decrypt=decrypt,
447-
max_age=max_age,
448-
force_fetch=force_fetch,
449-
)
441+
future_to_param: Dict[Future, str] = {}
442+
443+
if parallel:
444+
with concurrent.futures.ThreadPoolExecutor(max_workers=len(parameters)) as pool:
445+
for parameter, options in parameters.items():
446+
if isinstance(options, dict):
447+
transform = options.get("transform") or transform
448+
decrypt = options.get("decrypt") or decrypt
449+
max_age = options.get("max_age") or max_age
450+
force_fetch = options.get("force_fetch") or force_fetch
451+
452+
fetch_parameter_callable = functools.partial(
453+
get_parameter,
454+
name=parameter,
455+
transform=transform,
456+
decrypt=decrypt,
457+
max_age=max_age,
458+
force_fetch=force_fetch,
459+
)
460+
461+
future = pool.submit(fetch_parameter_callable)
462+
future_to_param[future] = parameter
463+
464+
for future in concurrent.futures.as_completed(future_to_param):
465+
try:
466+
# "parameter": "future result"
467+
ret[future_to_param[future]] = future.result()
468+
except Exception as exc:
469+
print(f"Uh oh, failed to fetch '{future_to_param[future]}': {exc}")
470+
471+
else:
472+
for parameter, options in parameters.items():
473+
if isinstance(options, dict):
474+
transform = options.get("transform") or transform
475+
decrypt = options.get("decrypt") or decrypt
476+
max_age = options.get("max_age") or max_age
477+
force_fetch = options.get("force_fetch") or force_fetch
478+
479+
ret[parameter] = get_parameter(
480+
name=parameter,
481+
transform=transform,
482+
decrypt=decrypt,
483+
max_age=max_age,
484+
force_fetch=force_fetch,
485+
)
450486

451487
return ret

0 commit comments

Comments
 (0)