[Thread-safety] Concurrent instantiation of Boto3 clients #1718

Closed
dispyfree opened this issue Nov 14, 2022 · 8 comments · Fixed by #1899
Labels: bug, idempotency

Comments

dispyfree commented Nov 14, 2022

Expected Behaviour

The decorator idempotent_function should be thread-safe. In particular, consumers of the library should not have to worry about threading issues in the underlying library.

In particular, the code snippet provided below should run without errors.

Current Behaviour

AWS Powertools instantiates boto3 client objects concurrently. While boto3 client objects are thread-safe, their instantiation is not.
Hence, we see exceptions like the following:

Traceback (most recent call last):
File "ROOTidempotency_problem\venv\lib\site-packages\aws_lambda_powertools\utilities\idempotency\base.py", line 106, in _process_idempotency
self.persistence_store.save_inprogress(
File "ROOTidempotency_problem\venv\lib\site-packages\aws_lambda_powertools\utilities\idempotency\persistence\base.py", line 370, in save_inprogress
self._put_record(data_record=data_record)
File "ROOTidempotency_problem\venv\lib\site-packages\aws_lambda_powertools\utilities\idempotency\persistence\dynamodb.py", line 218, in _put_record
except self.table.meta.client.exceptions.ConditionalCheckFailedException:
File "ROOTidempotency_problem\venv\lib\site-packages\aws_lambda_powertools\utilities\idempotency\persistence\dynamodb.py", line 108, in table
ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config)
File "ROOTidempotency_problem\venv\lib\site-packages\boto3\session.py", line 446, in resource
client = self.client(
File "ROOTidempotency_problem\venv\lib\site-packages\boto3\session.py", line 299, in client
return self._session.create_client(
File "ROOTidempotency_problem\venv\lib\site-packages\botocore\session.py", line 953, in create_client
endpoint_resolver = self._get_internal_component('endpoint_resolver')
File "ROOTidempotency_problem\venv\lib\site-packages\botocore\session.py", line 812, in _get_internal_component
return self._internal_components.get_component(name)
File "ROOTidempotency_problem\venv\lib\site-packages\botocore\session.py", line 1112, in get_component
del self._deferred[name]
KeyError: 'endpoint_resolver'
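
The final frames of the traceback are consistent with a check-then-delete race on a lazily built component. The sketch below is a toy model of that pattern, not botocore's actual code: `DeferredComponents`, `slow_factory` and `reproduce_race` are hypothetical names, and the barrier only exists to force both threads into the critical window so the failure becomes deterministic.

```python
import threading


class DeferredComponents:
    """Toy registry that builds components lazily, without any locking."""

    def __init__(self):
        self._components = {}
        self._deferred = {}

    def lazy_register(self, name, factory):
        self._deferred[name] = factory

    def get_component(self, name):
        if name in self._deferred:              # check ...
            factory = self._deferred[name]
            self._components[name] = factory()
            del self._deferred[name]            # ... then act: fails if another thread deleted it first
        return self._components[name]


def reproduce_race():
    """Run two threads through get_component and return the KeyErrors they hit."""
    barrier = threading.Barrier(2, timeout=5)

    def slow_factory():
        # Both threads must be inside the factory before either may continue,
        # so both have already passed the "name in self._deferred" check.
        barrier.wait()
        return object()

    registry = DeferredComponents()
    registry.lazy_register("endpoint_resolver", slow_factory)
    errors = []

    def worker():
        try:
            registry.get_component("endpoint_resolver")
        except KeyError as exc:
            errors.append(exc)

    threads = [threading.Thread(target=worker) for _ in range(2)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return errors
```

Calling `reproduce_race()` returns a list with exactly one `KeyError`: the first thread deletes the deferred entry and the second one fails on the same `del`.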

Code snippet

# Two threads, reliably reproducing the issue with 1.31.1 as well as the latest version (2.2.0): 

import functools
import inspect
import threading
import time
from dataclasses import dataclass
from enum import Enum
from botocore.config import Config

from threading import Thread
from typing import Callable

from aws_lambda_powertools.utilities.idempotency import IdempotencyConfig, idempotent_function
from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer as OriginalDynamoDBPersistenceLayer

idempotentTableName: str = 'trading_idempotency'


class DynamoDBPersistenceLayer(OriginalDynamoDBPersistenceLayer):
	_instance = None

	# def __new__(cls, *_, **__):
	# 	if cls._instance is None:
	# 		cls._instance = super(DynamoDBPersistenceLayer, cls).__new__(cls)
	# 	return cls._instance


def mapArgsToKwargs(function: Callable, args: tuple, kwargs: dict) -> tuple[tuple, dict]:
	"""
	assumption: valid invocation signature
	"""
	argsList: list = list(args)
	kwargsNew: dict = kwargs.copy()

	signature: inspect.Signature = inspect.signature(function)
	defaultKwargs: dict = {k: v.default for k, v in signature.parameters.items() if v.default is not inspect.Parameter.empty}
	funcArgs: list = [arg for arg, _ in signature.parameters.items()]

	for funcArg in funcArgs:
		if argsList:
			kwargsNew[funcArg] = argsList.pop(0)

	for defaultKwargKey in defaultKwargs:
		if defaultKwargKey not in kwargsNew:
			kwargsNew[defaultKwargKey] = defaultKwargs[defaultKwargKey]
	return (), kwargsNew


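def retryFunction(func: Callable, args: tuple = (), kwargs: dict = None, maxRetries: int = 5, sleepFactor: float = 1) -> any:
	# avoid sharing one mutable default dict between calls
	kwargs = kwargs or {}
	# fallback so that "raise exception" is always bound, even if maxRetries < 1
	exception: Exception = RuntimeError('retryFunction called with maxRetries < 1')
	retry: int = 0
	while retry < maxRetries:
		# linear back-off: no wait before the first attempt
		time.sleep(retry * sleepFactor)
		try:
			result: any = func(*args, **kwargs)
			return result
		except Exception as e:
			exception = e
			retry += 1
			message: str = f'function "{func.__qualname__}" with args {args} and kwargs {kwargs} failed. retry count at {retry}: '
			print(message + str(e))
	time.sleep(0.1)
	raise exception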


region_config = Config(
	region_name='us-east-1',
)
dynamo: DynamoDBPersistenceLayer = DynamoDBPersistenceLayer(table_name=idempotentTableName, boto_config = region_config)

def idempotentFunction(cacheKey: str, expectedDurationSeconds: int = 5, expiresAfterSeconds: int = 60 * 5):
	def decorator(function: Callable):
		@functools.wraps(function)
		def wrapper(*args, **kwargs):
			argsNew, kwargsNew = mapArgsToKwargs(function, args, kwargs)

			config: IdempotencyConfig = IdempotencyConfig(expires_after_seconds=expiresAfterSeconds)

			dynamo.configure(config=config, function_name=function.__qualname__)
			hashKey: str = dynamo._get_hashed_idempotency_key(kwargsNew[cacheKey].__dict__)
			functionToInvoke: Callable = idempotent_function(function, data_keyword_argument=cacheKey, persistence_store=dynamo, config=config)

			print(f'invoking idempotent function {function.__module__}.{function.__qualname__}, {hashKey} , with data {kwargsNew[cacheKey].__dict__}')
			# returnValue = helper.retryFunction(functionToInvoke, args=argsNew, kwargs=kwargsNew, sleepFactor=expectedDurationSeconds)
			returnValue = functionToInvoke(*argsNew, **kwargsNew)
			return returnValue

		return wrapper

	return decorator


class Side(str, Enum):
	BUY: str = 'buy'
	SELL: str = 'sell'


@dataclass
class TradeOrder:
	isin: str
	quantity: int
	side: Side
	limitPrice: float = None
	stopPrice: float = None

	def __str__(self) -> str:
		return f'isin: {self.isin}, quantity {self.quantity}, type {self.side.value}, optional limit/stop price {self.limitPrice}/{self.stopPrice}'


t1 = TradeOrder(isin='US7140461093', quantity=200, side=Side(Side.BUY))
t2 = TradeOrder(isin='US7140461091', quantity=201, side=Side(Side.BUY))


@idempotentFunction(cacheKey='tradeOrder')
def myFancyFunction(tradeOrder: TradeOrder):
	print(tradeOrder)
	time.sleep(5)
	return 'data'


def invoke(t: TradeOrder):
	myFancyFunction(t)


threads = [
	Thread(target=invoke, args=(t1,)),
	Thread(target=invoke, args=(t2,)),
]
for thread in threads:
	thread.start()

for thread in threads:
	thread.join()

Possible Solution

Instantiating the client once as part of the persistence layer itself, instead of instantiating it when the property is first accessed, largely mitigates the problem because it greatly reduces the chance of collisions. In particular, if the threads themselves are instantiated sequentially, this already mitigates the problem entirely.
This is a patch for DynamoDBPersistenceLayer:

Index: venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py b/venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py
--- a/venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py	
+++ b/venv/Lib/site-packages/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py	(date 1668465118921)
@@ -95,6 +95,7 @@
        self.status_attr = status_attr
        self.data_attr = data_attr
        self.validation_key_attr = validation_key_attr
+        self.ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config)
        super(DynamoDBPersistenceLayer, self).__init__()

    @property
@@ -105,8 +106,8 @@
        """
        if self._table:
            return self._table
-        ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config)
-        self._table = ddb_resource.Table(self.table_name)
+
+        self._table = self.ddb_resource.Table(self.table_name)
        return self._table

    @table.setter

Steps to Reproduce

(see the code snippet above: "minimal breaking example")

AWS Lambda Powertools for Python version

latest

AWS Lambda function runtime

3.9

Packaging format used

PyPI

Debugging logs

No response

@dispyfree added the bug and triage labels Nov 14, 2022
@heitorlessa
Contributor

Hey @dispyfree, thanks a lot for flagging this. Agreed, this is our oversight (not sure why we did it this way). If you have the bandwidth, I'd appreciate a PR on this while I deal with unrelated issues, so I can make it part of this week's release on time (it's fine to make a patch release next week too if we can't).

@heitorlessa added the idempotency label and removed the triage label Nov 15, 2022
@heitorlessa
Contributor

Just remembered why that was done: lazy initialisation to ease testing. If it was an existing session it could've even been thread-safe.
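
Lazy initialisation and a single creation are not mutually exclusive. As a minimal sketch, assuming the table object can be built by a zero-argument callable (`LazyTable` and `table_factory` are hypothetical names, not part of Powertools), a lock with a second check inside it keeps the lazy behaviour while ensuring only one thread ever runs the factory. Note that this only addresses the creation race; it does not make the created object itself safe to share.

```python
import threading


class LazyTable:
    """Builds its table object on first access, at most once across threads."""

    def __init__(self, table_factory):
        # table_factory: hypothetical zero-argument callable that builds the table
        self._table_factory = table_factory
        self._table = None
        self._lock = threading.Lock()

    @property
    def table(self):
        # Fast path: no locking once the table exists.
        if self._table is None:
            with self._lock:
                # Re-check inside the lock: another thread may have
                # created the table while this one was waiting.
                if self._table is None:
                    self._table = self._table_factory()
        return self._table
```

In tests, the factory can be a stub that returns a fake table, so nothing is created until `table` is first read.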

@heitorlessa added the help wanted label Nov 17, 2022
@heitorlessa
Contributor

Had a quick look into this and it needs a deeper investigation. A Boto3 client is thread-safe when using a Session; however, we use the higher-level client called Resource, which is not thread-safe and requires one per thread.

For now:

  • Signalling. I've added the help wanted label, as the most time-consuming part is creating a functional test so we can confidently experiment with non-breaking solutions.
  • Docs. Updating the docs as we speak to call out that calling functions decorated with idempotent_function isn't a thread-safe operation.

This will highly likely be tackled after re:Invent or in January given PTOs, etc. If anyone else reading this has the bandwidth to implement it along with a functional test to certify the behaviour we'll gladly accept any PR.
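
The "one per thread" requirement mentioned above can be sketched with `threading.local`. This is only an illustration under stated assumptions, not the fix that was eventually merged: `PerThreadResource` and `resource_factory` are hypothetical names, and the factory stands in for whatever call builds the resource. Factory calls are serialised with a lock because, as reported in this issue, concurrent instantiation is itself unsafe.

```python
import threading


class PerThreadResource:
    """Hands each thread its own resource, creating it on first use."""

    def __init__(self, resource_factory):
        # resource_factory: hypothetical zero-argument callable that builds a resource
        self._resource_factory = resource_factory
        self._local = threading.local()
        self._create_lock = threading.Lock()

    def get(self):
        resource = getattr(self._local, "resource", None)
        if resource is None:
            # Serialise creation so that no two threads instantiate at once.
            with self._create_lock:
                resource = self._resource_factory()
            # Attributes on threading.local are visible only to the current thread.
            self._local.resource = resource
        return resource
```

Each thread pays the creation cost once; later calls to `get()` from the same thread return the cached object without taking the lock.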

@github-actions bot added the pending-release label Nov 17, 2022
@github-actions
Contributor

This is now released under 2.3.0 version!

@github-actions bot removed the pending-release label Nov 17, 2022
@heitorlessa reopened this Nov 17, 2022
@mploski self-assigned this Jan 10, 2023
@heitorlessa
Contributor

@mploski checking in for an update/ETA when you think you might be able to start working on this. Thanks a lot!

@mploski
Contributor

mploski commented Jan 31, 2023

@heitorlessa I've already started working on this but was occupied by other activities, unfortunately. Plan to open the PR by the end of the week :-)

@github-actions
Contributor

github-actions bot commented Feb 9, 2023

⚠️COMMENT VISIBILITY WARNING⚠️

This issue is now closed. Please be mindful that future comments are hard for our team to see.

If you need more assistance, please either tag a team member or open a new issue that references this one.

If you wish to keep having a conversation with other community members under this issue feel free to do so.

@mploski
Contributor

mploski commented Feb 10, 2023

Hey @dispyfree, the fixes for the issue raised here were merged and will be released soon. Thank you for your contribution.

4 participants