13
13
TypeVar ,
14
14
)
15
15
16
- from libtmux .exc import LibTmuxException
17
- from libtmux .test .retry import WaitTimeout , retry_until
16
+ from libtmux .exc import LibTmuxException , WaitTimeout
17
+ from libtmux .test .retry import retry_until
18
18
19
19
if t .TYPE_CHECKING :
20
20
from libtmux .pane import Pane
@@ -62,7 +62,7 @@ def __init__(self, pane: Pane, timeout: float = 2.0) -> None:
62
62
def _check_content (
63
63
self ,
64
64
predicate : t .Callable [[str ], bool ],
65
- result : WaitResult ,
65
+ result : WaitResult [ str ] ,
66
66
) -> bool :
67
67
"""Check pane content against predicate.
68
68
@@ -88,7 +88,8 @@ def _check_content(
88
88
if predicate (content ):
89
89
result .value = content
90
90
return True
91
- return False
91
+ else :
92
+ return False
92
93
except Exception as e :
93
94
error = WaiterContentError ("Error capturing pane content" )
94
95
error .__cause__ = e
@@ -100,16 +101,16 @@ def wait_for_content(
100
101
timeout_seconds : float | None = None ,
101
102
interval_seconds : float | None = None ,
102
103
error_message : str | None = None ,
103
- ) -> WaitResult :
104
+ ) -> WaitResult [ str ] :
104
105
"""Wait for content in the pane to match a predicate."""
105
- result = WaitResult (success = False , value = None , error = None )
106
+ result = WaitResult [ str ] (success = False , value = None , error = None )
106
107
try :
107
108
# Give the shell a moment to be ready
108
109
time .sleep (0.1 )
109
110
success = retry_until (
110
111
lambda : self ._check_content (predicate , result ),
111
112
seconds = timeout_seconds or self .timeout ,
112
- interval = interval_seconds ,
113
+ interval = interval_seconds or 0.05 ,
113
114
raises = True ,
114
115
)
115
116
result .success = success
@@ -124,11 +125,7 @@ def wait_for_content(
124
125
result .error = e
125
126
result .success = False
126
127
except Exception as e :
127
- if isinstance (e , (WaiterTimeoutError , WaiterContentError )):
128
- result .error = e
129
- else :
130
- result .error = WaiterContentError ("Error capturing pane content" )
131
- result .error .__cause__ = e
128
+ result .error = WaiterTimeoutError (error_message or str (e ))
132
129
result .success = False
133
130
return result
134
131
@@ -137,7 +134,7 @@ def wait_for_prompt(
137
134
prompt : str ,
138
135
timeout_seconds : float | None = None ,
139
136
error_message : str | None = None ,
140
- ) -> WaitResult :
137
+ ) -> WaitResult [ str ] :
141
138
"""Wait for a specific prompt to appear in the pane."""
142
139
return self .wait_for_content (
143
140
lambda content : prompt in content and len (content .strip ()) > 0 ,
@@ -149,11 +146,15 @@ def wait_for_text(
149
146
self ,
150
147
text : str ,
151
148
timeout_seconds : float | None = None ,
149
+ interval_seconds : float | None = None ,
152
150
error_message : str | None = None ,
153
- ) -> WaitResult :
154
- """Wait for specific text to appear in the pane."""
151
+ ) -> WaitResult [str ]:
152
+ """Wait for text to appear in the pane."""
153
+ if error_message is None :
154
+ error_message = f"Text '{ text } ' not found in pane"
155
155
return self .wait_for_content (
156
156
lambda content : text in content ,
157
157
timeout_seconds = timeout_seconds ,
158
- error_message = error_message or f"Text '{ text } ' not found in pane" ,
158
+ interval_seconds = interval_seconds ,
159
+ error_message = error_message ,
159
160
)
0 commit comments