diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 71fd459191..f78ab11d6f 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -52,20 +52,33 @@ def _find_return_type(func): def iter_resp_lines(resp): - prev = "" - for seg in resp.stream(amt=None, decode_content=False): - if isinstance(seg, bytes): - seg = seg.decode('utf8') - seg = prev + seg - lines = seg.split("\n") - if not seg.endswith("\n"): - prev = lines[-1] - lines = lines[:-1] + buffer = bytearray() + for segment in resp.stream(amt=None, decode_content=False): + + # Append the segment (chunk) to the buffer + # + # Performance note: depending on contents of buffer and the type+value of segment, + # encoding segment into the buffer could be a wasteful step. The approach used here + # simplifies the logic farther down, but in the future it may be reasonable to + # sacrifice readability for performance. + if isinstance(segment, bytes): + buffer.extend(segment) + elif isinstance(segment, str): + buffer.extend(segment.encode("utf-8")) else: - prev = "" - for line in lines: + raise TypeError( + f"Received invalid segment type, {type(segment)}, from stream. Accepts only 'str' or 'bytes'.") + + # Split by newline (safe for utf-8 because multi-byte sequences cannot contain the newline byte) + next_newline = buffer.find(b'\n') + while next_newline != -1: + # Convert bytes to a valid utf-8 string, replacing any invalid utf-8 with the '�' character + line = buffer[:next_newline].decode( + "utf-8", errors="replace") + buffer = buffer[next_newline+1:] if line: yield line + next_newline = buffer.find(b'\n') class Watch(object): diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index 8164e7b5da..c5bc5c378c 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -61,6 +61,9 @@ def test_watch_with_decode(self): if count == 4: w.stop() + # make sure that all three records were consumed by the stream + self.assertEqual(4, count) + fake_api.get_namespaces.assert_called_once_with( _preload_content=False, watch=True) fake_resp.stream.assert_called_once_with( @@ -68,6 +71,123 @@ def test_watch_with_decode(self): fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + def test_watch_with_interspersed_newlines(self): + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.stream = Mock( + return_value=[ + '\n', + '{"type": "ADDED", "object": {"metadata":', + '{"name": "test1","resourceVersion": "1"}}}\n{"type": "ADDED", ', + '"object": {"metadata": {"name": "test2", "resourceVersion": "2"}}}\n', + '\n', + '', + '{"type": "ADDED", "object": {"metadata": {"name": "test3", "resourceVersion": "3"}}}\n', + '\n\n\n', + '\n', + ]) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + w = Watch() + count = 0 + + # Consume all test events from the mock service, stopping when no more data is available. + # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is + # the only way to do so. Without that, the stream will re-read the test data forever. + for e in w.stream(fake_api.get_namespaces, timeout_seconds=1): + count += 1 + self.assertEqual("test%d" % count, e['object'].metadata.name) + self.assertEqual(3, count) + + def test_watch_with_multibyte_utf8(self): + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.stream = Mock( + return_value=[ + # two-byte utf-8 character + '{"type":"MODIFIED","object":{"data":{"utf-8":"© 1"},"metadata":{"name":"test1","resourceVersion":"1"}}}\n', + # same copyright character expressed as bytes + b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xC2\xA9 2"},"metadata":{"name":"test2","resourceVersion":"2"}}}\n' + # same copyright character with bytes split across two stream chunks + b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xC2', + b'\xA9 3"},"metadata":{"n', + # more chunks of the same event, sent as a mix of bytes and strings + 'ame":"test3","resourceVersion":"3"', + '}}}', + b'\n' + ]) + + fake_api = Mock() + fake_api.get_configmaps = Mock(return_value=fake_resp) + fake_api.get_configmaps.__doc__ = ':return: V1ConfigMapList' + + w = Watch() + count = 0 + + # Consume all test events from the mock service, stopping when no more data is available. + # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is + # the only way to do so. Without that, the stream will re-read the test data forever. + for event in w.stream(fake_api.get_configmaps, timeout_seconds=1): + count += 1 + self.assertEqual("MODIFIED", event['type']) + self.assertEqual("test%d" % count, event['object'].metadata.name) + self.assertEqual("© %d" % count, event['object'].data["utf-8"]) + self.assertEqual( + "%d" % count, event['object'].metadata.resource_version) + self.assertEqual("%d" % count, w.resource_version) + self.assertEqual(3, count) + + def test_watch_with_invalid_utf8(self): + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.stream = Mock( + # test 1 uses 1 invalid utf-8 byte + # test 2 uses a sequence of 2 invalid utf-8 bytes + # test 3 uses a sequence of 3 invalid utf-8 bytes + return_value=[ + # utf-8 sequence for 😄 is \xF0\x9F\x98\x84 + # all other sequences below are invalid + # ref: https://www.w3.org/2001/06/utf-8-wrong/UTF-8-test.html + b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xF0\x9F\x98\x84 1","invalid":"\x80 1"},"metadata":{"name":"test1"}}}\n', + b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xF0\x9F\x98\x84 2","invalid":"\xC0\xAF 2"},"metadata":{"name":"test2"}}}\n', + # mix bytes/strings and split byte sequences across chunks + b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xF0\x9F\x98', + b'\x84 ', + b'', + b'3","invalid":"\xE0\x80', + b'\xAF ', + '3"},"metadata":{"n', + 'ame":"test3"', + '}}}', + b'\n' + ]) + + fake_api = Mock() + fake_api.get_configmaps = Mock(return_value=fake_resp) + fake_api.get_configmaps.__doc__ = ':return: V1ConfigMapList' + + w = Watch() + count = 0 + + # Consume all test events from the mock service, stopping when no more data is available. + # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is + # the only way to do so. Without that, the stream will re-read the test data forever. + for event in w.stream(fake_api.get_configmaps, timeout_seconds=1): + count += 1 + self.assertEqual("MODIFIED", event['type']) + self.assertEqual("test%d" % count, event['object'].metadata.name) + self.assertEqual("😄 %d" % count, event['object'].data["utf-8"]) + # expect N replacement characters in test N + self.assertEqual("� %d".replace('�', '�'*count) % + count, event['object'].data["invalid"]) + self.assertEqual(3, count) + def test_watch_for_follow(self): fake_resp = Mock() fake_resp.close = Mock()