Skip to content

Commit 57890a1

Browse files
authored
GGV2 client: add exception logging so that errors are always visible, even when using executors (#277)
1 parent 2b9ead0 commit 57890a1

File tree

3 files changed

+34
-2
lines changed

3 files changed

+34
-2
lines changed

awsiot/greengrasscoreipc/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1109,7 +1109,7 @@ class GreengrassCoreIPCClient(rpc.Client):
11091109
"""
11101110
Client for the GreengrassCoreIPC service.
11111111
There is a new V2 client available for testing in developer preview.
1112-
See the GreengrassCoreIPCClientV2 class.
1112+
See the GreengrassCoreIPCClientV2 class in the clientv2 subpackage.
11131113
11141114
Args:
11151115
connection: Connection that this client will use.

awsiot/greengrasscoreipc/clientv2.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def __init__(self, client: typing.Optional[GreengrassCoreIPCClient] = None,
3434
import awsiot.greengrasscoreipc
3535
client = awsiot.greengrasscoreipc.connect()
3636
self.client = client
37-
if executor == True:
37+
if executor is True:
3838
executor = concurrent.futures.ThreadPoolExecutor()
3939
self.executor = executor
4040

@@ -67,20 +67,38 @@ def callback(*args, **kwargs):
6767
future1.add_done_callback(callback)
6868
return future2
6969

70+
@staticmethod
71+
def __handle_error():
72+
import sys
73+
import traceback
74+
traceback.print_exc(file=sys.stderr)
75+
76+
def __wrap_error(self, func):
77+
def wrapper(*args, **kwargs):
78+
try:
79+
return func(*args, **kwargs)
80+
except Exception as e:
81+
self.__handle_error()
82+
raise e
83+
return wrapper
84+
7085
def __create_stream_handler(real_self, operation, on_stream_event, on_stream_error, on_stream_closed):
7186
stream_handler_type = type(operation + 'Handler', (getattr(client, operation + "StreamHandler"),), {})
7287
if on_stream_event is not None:
88+
on_stream_event = real_self.__wrap_error(on_stream_event)
7389
def handler(self, event):
7490
if real_self.executor is not None:
7591
real_self.executor.submit(on_stream_event, event)
7692
else:
7793
on_stream_event(event)
7894
setattr(stream_handler_type, "on_stream_event", handler)
7995
if on_stream_error is not None:
96+
on_stream_error = real_self.__wrap_error(on_stream_error)
8097
def handler(self, error):
8198
return on_stream_error(error)
8299
setattr(stream_handler_type, "on_stream_error", handler)
83100
if on_stream_closed is not None:
101+
on_stream_closed = real_self.__wrap_error(on_stream_closed)
84102
def handler(self):
85103
if real_self.executor is not None:
86104
real_self.executor.submit(on_stream_closed)

test/test_ggv2.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import threading
33
from unittest import TestCase
44
from unittest.mock import patch
5+
import io
6+
import contextlib
57

68
from awsiot.greengrasscoreipc.client import SubscribeToTopicStreamHandler
79
from awsiot.greengrasscoreipc.model import CreateLocalDeploymentResponse, SubscribeToTopicResponse, \
@@ -83,3 +85,15 @@ def on_stream_event(self, event):
8385

8486
self.assertEqual("xyz".encode("utf-8"), subscription_fut.result(TIMEOUT).binary_message.message)
8587
self.assertEqual(threading.get_ident(), thread_id_fut.result(TIMEOUT))
88+
89+
# Verify we nicely print errors in user-provided handler methods
90+
def on_stream_event(r):
91+
raise ValueError("Broken!")
92+
93+
c.subscribe_to_topic(topic="abc", on_stream_event=on_stream_event)
94+
sub_handler = mock_client.new_subscribe_to_topic.call_args[0][0]
95+
f = io.StringIO()
96+
with contextlib.redirect_stderr(f):
97+
self.assertRaises(ValueError, lambda: sub_handler.on_stream_event(
98+
SubscriptionResponseMessage(binary_message=BinaryMessage(message="xyz"))))
99+
self.assertIn("ValueError: Broken!", f.getvalue())

0 commit comments

Comments
 (0)