Skip to content

Changes made in wacth.py to print Empty newlines that are skipped when watching pod logs. #2372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 36 additions & 19 deletions kubernetes/base/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,19 @@ def iter_resp_lines(resp):

# Split by newline (safe for utf-8 because multi-byte sequences cannot contain the newline byte)
next_newline = buffer.find(b'\n')
last_was_empty = False # Set empty-line flag
while next_newline != -1:
# Convert bytes to a valid utf-8 string, replacing any invalid utf-8 with the '�' character
line = buffer[:next_newline].decode(
"utf-8", errors="replace")
buffer = buffer[next_newline+1:]
if line:
yield line
last_was_empty = False # Reset empty-line flag
else:
if not last_was_empty:
yield '' # Only print one empty line
last_was_empty = True # Mark that we handled an empty line
next_newline = buffer.find(b'\n')


Expand Down Expand Up @@ -107,24 +113,29 @@ def get_watch_argument_name(self, func):
return 'watch'

def unmarshal_event(self, data, return_type):
js = json.loads(data)
js['raw_object'] = js['object']
# BOOKMARK event is treated the same as ERROR for a quick fix of
# decoding exception
# TODO: make use of the resource_version in BOOKMARK event for more
# efficient WATCH
if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK':
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
# For custom objects that we don't have model defined, json
# deserialization results in dictionary
elif (isinstance(js['object'], dict) and 'metadata' in js['object']
and 'resourceVersion' in js['object']['metadata']):
self.resource_version = js['object']['metadata'][
'resourceVersion']
return js
if not data or data.isspace():
return None
try:
js = json.loads(data)
js['raw_object'] = js['object']
# BOOKMARK event is treated the same as ERROR for a quick fix of
# decoding exception
# TODO: make use of the resource_version in BOOKMARK event for more
# efficient WATCH
if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK':
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
# For custom objects that we don't have model defined, json
# deserialization results in dictionary
elif (isinstance(js['object'], dict) and 'metadata' in js['object']
and 'resourceVersion' in js['object']['metadata']):
self.resource_version = js['object']['metadata'][
'resourceVersion']
return js
except json.JSONDecodeError:
return None

def stream(self, func, *args, **kwargs):
"""Watch an API resource and stream the result back via a generator.
Expand Down Expand Up @@ -175,6 +186,7 @@ def stream(self, func, *args, **kwargs):
while True:
resp = func(*args, **kwargs)
try:
last_was_empty = False # Set empty line false
for line in iter_resp_lines(resp):
# unmarshal when we are receiving events from watch,
# return raw string when we are streaming log
Expand All @@ -198,7 +210,12 @@ def stream(self, func, *args, **kwargs):
retry_after_410 = False
yield event
else:
yield line
if line:
yield line # Normal non-empty line
last_was_empty = False
elif not last_was_empty:
yield '' # Only yield one empty line
last_was_empty = True
if self._stop:
break
finally:
Expand Down
84 changes: 82 additions & 2 deletions kubernetes/base/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@

import unittest

import os

import time

from unittest.mock import Mock, call

from kubernetes import client
from kubernetes import client,config

from .watch import Watch

from kubernetes.client import ApiException


class WatchTests(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -99,6 +105,9 @@ def test_watch_with_interspersed_newlines(self):
# Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
# the only way to do so. Without that, the stream will re-read the test data forever.
for e in w.stream(fake_api.get_namespaces, timeout_seconds=1):
# Here added a statement for exception for empty lines.
if e is None:
continue
count += 1
self.assertEqual("test%d" % count, e['object'].metadata.name)
self.assertEqual(3, count)
Expand Down Expand Up @@ -488,7 +497,78 @@ def test_watch_with_error_event_and_timeout_param(self):
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()


@classmethod
def setUpClass(cls):
cls.api = Mock()
cls.namespace = "default"

def test_pod_log_empty_lines(self):
pod_name = "demo-bug"

try:
self.api.create_namespaced_pod = Mock()
self.api.read_namespaced_pod = Mock()
self.api.delete_namespaced_pod = Mock()
self.api.read_namespaced_pod_log = Mock()

#pod creating step
self.api.create_namespaced_pod.return_value = None

#Checking pod status
mock_pod = Mock()
mock_pod.status.phase = "Running"
self.api.read_namespaced_pod.return_value = mock_pod

# Printing at pod output
self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"])

# Wait for the pod to reach 'Running'
timeout = 60
start_time = time.time()
while time.time() - start_time < timeout:
pod = self.api.read_namespaced_pod(name=pod_name, namespace=self.namespace)
if pod.status.phase == "Running":
break
time.sleep(2)
else:
self.fail("Pod did not reach 'Running' state within timeout")

# Reading and streaming logs using Watch (mocked)
w = Watch()
log_output = []
#Mock logs used for this test
w.stream = Mock(return_value=[
"Hello from Docker",
"", # Empty line
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add another empty line here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added few empty lines to the test case.

"Another log line",
"", # Another empty line
"Final log"
])
for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True):
log_output.append(event)
print(event)

# Print outputs
print(f"Captured logs: {log_output}")
# self.assertTrue(any("Hello from Docker" in line for line in log_output))
# self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs")
expected_log = [
"Hello from Docker",
"",
"Another log line",
"",
"Final log"
]

self.assertEqual(log_output, expected_log, "Captured logs do not match expected logs")

except ApiException as e:
self.fail(f"Kubernetes API exception: {e}")
finally:
#checking pod is calling for delete
self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace)
self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace)

if __name__ == '__main__':
unittest.main()