-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Changes made in wacth.py to print Empty newlines that are skipped when watching pod logs. #2372
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
d1adc8a
b2f9755
3cee537
14a2554
f0a73c8
1268769
1e093d0
f4d0842
d451d2f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,12 +14,18 @@ | |
|
||
import unittest | ||
|
||
import os | ||
|
||
import time | ||
|
||
from unittest.mock import Mock, call | ||
|
||
from kubernetes import client | ||
from kubernetes import client,config | ||
|
||
from .watch import Watch | ||
|
||
from kubernetes.client import ApiException | ||
|
||
|
||
class WatchTests(unittest.TestCase): | ||
def setUp(self): | ||
|
@@ -99,6 +105,9 @@ def test_watch_with_interspersed_newlines(self): | |
# Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is | ||
# the only way to do so. Without that, the stream will re-read the test data forever. | ||
for e in w.stream(fake_api.get_namespaces, timeout_seconds=1): | ||
# Here added a statement for exception for empty lines. | ||
if e is None: | ||
continue | ||
count += 1 | ||
self.assertEqual("test%d" % count, e['object'].metadata.name) | ||
self.assertEqual(3, count) | ||
|
@@ -488,7 +497,82 @@ def test_watch_with_error_event_and_timeout_param(self): | |
amt=None, decode_content=False) | ||
fake_resp.close.assert_called_once() | ||
fake_resp.release_conn.assert_called_once() | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
cls.api = Mock() | ||
cls.namespace = "default" | ||
|
||
def test_pod_log_empty_lines(self): | ||
pod_name = "demo-bug" | ||
# Manifest with busybax to keep pod engaged for sometiem | ||
pod_manifest = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where is pod_manifest used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I used minikube before replacing it with mock function. After checking locally forgot to delete it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just deleted pod_manifest from watch_test.py. |
||
"apiVersion": "v1", | ||
"kind": "Pod", | ||
"metadata": {"name": pod_name}, | ||
"spec": { | ||
"containers": [{ | ||
"image": "busybox", | ||
"name": "my-container", | ||
"command": ["sh", "-c", "while true; do echo Hello from Docker ; sleep 10; done"] | ||
}] | ||
}, | ||
} | ||
|
||
try: | ||
self.api.create_namespaced_pod = Mock() | ||
self.api.read_namespaced_pod = Mock() | ||
self.api.delete_namespaced_pod = Mock() | ||
self.api.read_namespaced_pod_log = Mock() | ||
|
||
#pod creating step | ||
self.api.create_namespaced_pod.return_value = None | ||
|
||
#Checking pod status | ||
mock_pod = Mock() | ||
mock_pod.status.phase = "Running" | ||
self.api.read_namespaced_pod.return_value = mock_pod | ||
|
||
# Printing at pod output | ||
self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"]) | ||
|
||
# Wait for the pod to reach 'Running' | ||
timeout = 60 | ||
start_time = time.time() | ||
while time.time() - start_time < timeout: | ||
pod = self.api.read_namespaced_pod(name=pod_name, namespace=self.namespace) | ||
if pod.status.phase == "Running": | ||
break | ||
time.sleep(2) | ||
else: | ||
self.fail("Pod did not reach 'Running' state within timeout") | ||
|
||
# Reading and streaming logs using Watch (mocked) | ||
w = Watch() | ||
log_output = [] | ||
#Mock logs used for this test | ||
w.stream = Mock(return_value=[ | ||
"Hello from Docker", | ||
"\n", # Empty line | ||
"Another log line", | ||
"\n", # Another empty line | ||
"Final log" | ||
]) | ||
for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True): | ||
log_output.append(event) | ||
print(event) | ||
|
||
# Print outputs | ||
print(f"Captured logs: {log_output}") | ||
# self.assertTrue(any("Hello from Docker" in line for line in log_output)) | ||
self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could you add an assertion that ensures the log_output has exactly what is expected, the whole log? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed assertive statement to make sure entire output logs are verified. |
||
|
||
except ApiException as e: | ||
self.fail(f"Kubernetes API exception: {e}") | ||
finally: | ||
#checking pod is calling for delete | ||
self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace) | ||
self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace) | ||
|
||
if __name__ == '__main__': | ||
unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: sometiem ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will adjust