Skip to content

Changes made in watch.py to print empty lines that are skipped when watching pod logs. #2372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 36 additions & 19 deletions kubernetes/base/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,19 @@ def iter_resp_lines(resp):

# Split by newline (safe for utf-8 because multi-byte sequences cannot contain the newline byte)
next_newline = buffer.find(b'\n')
last_was_empty = False # Set empty-line flag
while next_newline != -1:
# Convert bytes to a valid utf-8 string, replacing any invalid utf-8 with the '�' character
line = buffer[:next_newline].decode(
"utf-8", errors="replace")
buffer = buffer[next_newline+1:]
if line:
yield line
last_was_empty = False # Reset empty-line flag
else:
if not last_was_empty:
yield '' # Only print one empty line
last_was_empty = True # Mark that we handled an empty line
next_newline = buffer.find(b'\n')


Expand Down Expand Up @@ -107,24 +113,29 @@ def get_watch_argument_name(self, func):
return 'watch'

def unmarshal_event(self, data, return_type):
js = json.loads(data)
js['raw_object'] = js['object']
# BOOKMARK event is treated the same as ERROR for a quick fix of
# decoding exception
# TODO: make use of the resource_version in BOOKMARK event for more
# efficient WATCH
if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK':
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
# For custom objects that we don't have model defined, json
# deserialization results in dictionary
elif (isinstance(js['object'], dict) and 'metadata' in js['object']
and 'resourceVersion' in js['object']['metadata']):
self.resource_version = js['object']['metadata'][
'resourceVersion']
return js
if not data or data.isspace():
return None
try:
js = json.loads(data)
js['raw_object'] = js['object']
# BOOKMARK event is treated the same as ERROR for a quick fix of
# decoding exception
# TODO: make use of the resource_version in BOOKMARK event for more
# efficient WATCH
if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK':
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
# For custom objects that we don't have model defined, json
# deserialization results in dictionary
elif (isinstance(js['object'], dict) and 'metadata' in js['object']
and 'resourceVersion' in js['object']['metadata']):
self.resource_version = js['object']['metadata'][
'resourceVersion']
return js
except json.JSONDecodeError:
return None

def stream(self, func, *args, **kwargs):
"""Watch an API resource and stream the result back via a generator.
Expand Down Expand Up @@ -175,6 +186,7 @@ def stream(self, func, *args, **kwargs):
while True:
resp = func(*args, **kwargs)
try:
last_was_empty = False # Set empty line false
for line in iter_resp_lines(resp):
# unmarshal when we are receiving events from watch,
# return raw string when we are streaming log
Expand All @@ -198,7 +210,12 @@ def stream(self, func, *args, **kwargs):
retry_after_410 = False
yield event
else:
yield line
if line:
yield line # Normal non-empty line
last_was_empty = False
elif not last_was_empty:
yield '' # Only yield one empty line
last_was_empty = True
if self._stop:
break
finally:
Expand Down
86 changes: 85 additions & 1 deletion kubernetes/base/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@

import unittest

import os

import time

from unittest.mock import Mock, call

from kubernetes import client
from kubernetes import client,config

from .watch import Watch

from kubernetes.client import ApiException


class WatchTests(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -99,6 +105,9 @@ def test_watch_with_interspersed_newlines(self):
# Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
# the only way to do so. Without that, the stream will re-read the test data forever.
for e in w.stream(fake_api.get_namespaces, timeout_seconds=1):
# Here added a statement for exception for empty lines.
if e is None:
continue
count += 1
self.assertEqual("test%d" % count, e['object'].metadata.name)
self.assertEqual(3, count)
Expand Down Expand Up @@ -488,7 +497,82 @@ def test_watch_with_error_event_and_timeout_param(self):
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

@classmethod
def setUpClass(cls):
cls.api = Mock()
cls.namespace = "default"

def test_pod_log_empty_lines(self):
pod_name = "demo-bug"
# Manifest with busybax to keep pod engaged for sometiem
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: sometiem ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will adjust

pod_manifest = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is pod_manifest used?

Copy link
Contributor Author

@p172913 p172913 Mar 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used minikube before replacing it with a mock function. After checking locally, I forgot to delete it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just deleted pod_manifest from watch_test.py.

"apiVersion": "v1",
"kind": "Pod",
"metadata": {"name": pod_name},
"spec": {
"containers": [{
"image": "busybox",
"name": "my-container",
"command": ["sh", "-c", "while true; do echo Hello from Docker ; sleep 10; done"]
}]
},
}

try:
self.api.create_namespaced_pod = Mock()
self.api.read_namespaced_pod = Mock()
self.api.delete_namespaced_pod = Mock()
self.api.read_namespaced_pod_log = Mock()

#pod creating step
self.api.create_namespaced_pod.return_value = None

#Checking pod status
mock_pod = Mock()
mock_pod.status.phase = "Running"
self.api.read_namespaced_pod.return_value = mock_pod

# Printing at pod output
self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"])

# Wait for the pod to reach 'Running'
timeout = 60
start_time = time.time()
while time.time() - start_time < timeout:
pod = self.api.read_namespaced_pod(name=pod_name, namespace=self.namespace)
if pod.status.phase == "Running":
break
time.sleep(2)
else:
self.fail("Pod did not reach 'Running' state within timeout")

# Reading and streaming logs using Watch (mocked)
w = Watch()
log_output = []
#Mock logs used for this test
w.stream = Mock(return_value=[
"Hello from Docker",
"\n", # Empty line
"Another log line",
"\n", # Another empty line
"Final log"
])
for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True):
log_output.append(event)
print(event)

# Print outputs
print(f"Captured logs: {log_output}")
# self.assertTrue(any("Hello from Docker" in line for line in log_output))
self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add an assertion that ensures the log_output has exactly what is expected, the whole log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the assertion statement to make sure the entire log output is verified.


except ApiException as e:
self.fail(f"Kubernetes API exception: {e}")
finally:
#checking pod is calling for delete
self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace)
self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace)

if __name__ == '__main__':
unittest.main()