From d82a13f11b0b1a2394dfbcf10811d15e4c23ec8e Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 21 Mar 2024 10:59:07 -0700 Subject: [PATCH 1/2] Close notice in client documentation --- awsiot/greengrasscoreipc/__init__.py | 3 ++- awsiot/greengrasscoreipc/client.py | 5 ++++- awsiot/greengrasscoreipc/clientv2.py | 19 ++++--------------- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/awsiot/greengrasscoreipc/__init__.py b/awsiot/greengrasscoreipc/__init__.py index c60e68f8..02a9203b 100644 --- a/awsiot/greengrasscoreipc/__init__.py +++ b/awsiot/greengrasscoreipc/__init__.py @@ -25,7 +25,8 @@ def connect(*, lifecycle_handler: Optional[LifecycleHandler]=None, timeout: float=10.0) -> GreengrassCoreIPCClient: """ - Creates an IPC client and connects to the GreengrassCoreIPC service. + Creates an IPC client and connects to the GreengrassCoreIPC service. When finished with the client, + you must call close() to free the client's native resources. Args: ipc_socket: Path to the Unix domain socket of Greengrass Nucleus, defaults to diff --git a/awsiot/greengrasscoreipc/client.py b/awsiot/greengrasscoreipc/client.py index 7367b6eb..a1cf371e 100644 --- a/awsiot/greengrasscoreipc/client.py +++ b/awsiot/greengrasscoreipc/client.py @@ -1340,7 +1340,10 @@ def close(self): # type: (...) -> concurrent.futures.Future[None] class GreengrassCoreIPCClient(rpc.Client): """ - Client for the GreengrassCoreIPC service. + Client for the GreengrassCoreIPC service. When finished with the client, + you must call close() to free the client's native resources. + + There is a new V2 client which should be preferred. See the GreengrassCoreIPCClientV2 class in the clientv2 subpackage. diff --git a/awsiot/greengrasscoreipc/clientv2.py b/awsiot/greengrasscoreipc/clientv2.py index b8f85220..963fe85f 100644 --- a/awsiot/greengrasscoreipc/clientv2.py +++ b/awsiot/greengrasscoreipc/clientv2.py @@ -14,7 +14,8 @@ class GreengrassCoreIPCClientV2: """ - V2 client for the GreengrassCoreIPC service. + V2 client for the GreengrassCoreIPC service. When finished with the client, + you must call close() to free the client's native resources. Args: client: Connection that this client will use. If you do not provide one, it will be made automatically. @@ -33,7 +34,6 @@ def __init__(self, client: typing.Optional[GreengrassCoreIPCClient] = None, if executor is True: executor = concurrent.futures.ThreadPoolExecutor() self.executor = executor - self.ignore_executor_exceptions = False def close(self, *, executor_wait=True) -> concurrent.futures.Future: """ @@ -50,9 +50,6 @@ def close(self, *, executor_wait=True) -> concurrent.futures.Future: of None if the shutdown was clean and user-initiated. """ fut = self.client.close() - - # events that arrive during the shutdown process will generate executor exceptions, ignore them - self.ignore_executor_exceptions = True if self.executor is not None: self.executor.shutdown(wait=executor_wait) return fut @@ -88,11 +85,7 @@ def __create_stream_handler(real_self, operation, on_stream_event, on_stream_err on_stream_event = real_self.__wrap_error(on_stream_event) def handler(self, event): if real_self.executor is not None: - try: - real_self.executor.submit(on_stream_event, event) - except RuntimeError: - if not real_self.ignore_executor_exceptions: - raise + real_self.executor.submit(on_stream_event, event) else: on_stream_event(event) setattr(stream_handler_type, "on_stream_event", handler) @@ -105,11 +98,7 @@ def handler(self, error): on_stream_closed = real_self.__wrap_error(on_stream_closed) def handler(self): if real_self.executor is not None: - try: - real_self.executor.submit(on_stream_closed) - except RuntimeError: - if real_self.ignore_executor_exceptions: - raise + real_self.executor.submit(on_stream_closed) else: on_stream_closed() setattr(stream_handler_type, "on_stream_closed", handler) From 181f7099fb9aeb26d6bb08792860741ef918054e Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 21 Mar 2024 11:23:40 -0700 Subject: [PATCH 2/2] Regenerate v2 client excecutor exception handling --- awsiot/greengrasscoreipc/clientv2.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/awsiot/greengrasscoreipc/clientv2.py b/awsiot/greengrasscoreipc/clientv2.py index 963fe85f..256a2c94 100644 --- a/awsiot/greengrasscoreipc/clientv2.py +++ b/awsiot/greengrasscoreipc/clientv2.py @@ -34,6 +34,7 @@ def __init__(self, client: typing.Optional[GreengrassCoreIPCClient] = None, if executor is True: executor = concurrent.futures.ThreadPoolExecutor() self.executor = executor + self.ignore_executor_exceptions = False def close(self, *, executor_wait=True) -> concurrent.futures.Future: """ @@ -50,6 +51,9 @@ def close(self, *, executor_wait=True) -> concurrent.futures.Future: of None if the shutdown was clean and user-initiated. """ fut = self.client.close() + + # events that arrive during the shutdown process will generate executor exceptions, ignore them + self.ignore_executor_exceptions = True if self.executor is not None: self.executor.shutdown(wait=executor_wait) return fut @@ -85,7 +89,11 @@ def __create_stream_handler(real_self, operation, on_stream_event, on_stream_err on_stream_event = real_self.__wrap_error(on_stream_event) def handler(self, event): if real_self.executor is not None: - real_self.executor.submit(on_stream_event, event) + try: + real_self.executor.submit(on_stream_event, event) + except RuntimeError: + if not real_self.ignore_executor_exceptions: + raise else: on_stream_event(event) setattr(stream_handler_type, "on_stream_event", handler) @@ -98,7 +106,11 @@ def handler(self, error): on_stream_closed = real_self.__wrap_error(on_stream_closed) def handler(self): if real_self.executor is not None: - real_self.executor.submit(on_stream_closed) + try: + real_self.executor.submit(on_stream_closed) + except RuntimeError: + if real_self.ignore_executor_exceptions: + raise else: on_stream_closed() setattr(stream_handler_type, "on_stream_closed", handler)