diff --git a/awsiot/greengrasscoreipc/client.py b/awsiot/greengrasscoreipc/client.py index 3bb5cf49..c14c3470 100644 --- a/awsiot/greengrasscoreipc/client.py +++ b/awsiot/greengrasscoreipc/client.py @@ -1109,7 +1109,7 @@ class GreengrassCoreIPCClient(rpc.Client): """ Client for the GreengrassCoreIPC service. There is a new V2 client available for testing in developer preview. - See the GreengrassCoreIPCClientV2 class. + See the GreengrassCoreIPCClientV2 class in the clientv2 subpackage. Args: connection: Connection that this client will use. diff --git a/awsiot/greengrasscoreipc/clientv2.py b/awsiot/greengrasscoreipc/clientv2.py index 970e262f..7fa9418f 100644 --- a/awsiot/greengrasscoreipc/clientv2.py +++ b/awsiot/greengrasscoreipc/clientv2.py @@ -34,7 +34,7 @@ def __init__(self, client: typing.Optional[GreengrassCoreIPCClient] = None, import awsiot.greengrasscoreipc client = awsiot.greengrasscoreipc.connect() self.client = client - if executor == True: + if executor is True: executor = concurrent.futures.ThreadPoolExecutor() self.executor = executor @@ -67,9 +67,25 @@ def callback(*args, **kwargs): future1.add_done_callback(callback) return future2 + @staticmethod + def __handle_error(): + import sys + import traceback + traceback.print_exc(file=sys.stderr) + + def __wrap_error(self, func): + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + self.__handle_error() + raise e + return wrapper + def __create_stream_handler(real_self, operation, on_stream_event, on_stream_error, on_stream_closed): stream_handler_type = type(operation + 'Handler', (getattr(client, operation + "StreamHandler"),), {}) if on_stream_event is not None: + on_stream_event = real_self.__wrap_error(on_stream_event) def handler(self, event): if real_self.executor is not None: real_self.executor.submit(on_stream_event, event) @@ -77,10 +93,12 @@ def handler(self, event): on_stream_event(event) setattr(stream_handler_type, "on_stream_event", handler) if on_stream_error is not None: + on_stream_error = real_self.__wrap_error(on_stream_error) def handler(self, error): return on_stream_error(error) setattr(stream_handler_type, "on_stream_error", handler) if on_stream_closed is not None: + on_stream_closed = real_self.__wrap_error(on_stream_closed) def handler(self): if real_self.executor is not None: real_self.executor.submit(on_stream_closed) diff --git a/test/test_ggv2.py b/test/test_ggv2.py index 4d251a6d..ab2195de 100644 --- a/test/test_ggv2.py +++ b/test/test_ggv2.py @@ -2,6 +2,8 @@ import threading from unittest import TestCase from unittest.mock import patch +import io +import contextlib from awsiot.greengrasscoreipc.client import SubscribeToTopicStreamHandler from awsiot.greengrasscoreipc.model import CreateLocalDeploymentResponse, SubscribeToTopicResponse, \ @@ -83,3 +85,15 @@ def on_stream_event(self, event): self.assertEqual("xyz".encode("utf-8"), subscription_fut.result(TIMEOUT).binary_message.message) self.assertEqual(threading.get_ident(), thread_id_fut.result(TIMEOUT)) + + # Verify we nicely print errors in user-provided handler methods + def on_stream_event(r): + raise ValueError("Broken!") + + c.subscribe_to_topic(topic="abc", on_stream_event=on_stream_event) + sub_handler = mock_client.new_subscribe_to_topic.call_args[0][0] + f = io.StringIO() + with contextlib.redirect_stderr(f): + self.assertRaises(ValueError, lambda: sub_handler.on_stream_event( + SubscriptionResponseMessage(binary_message=BinaryMessage(message="xyz")))) + self.assertIn("ValueError: Broken!", f.getvalue())