Skip to content

Commit 3f0bce2

Browse files
committed
mypy: partial debug.py and pytracer.py
1 parent ffc701a commit 3f0bce2

File tree

3 files changed

+70
-42
lines changed

3 files changed

+70
-42
lines changed

coverage/control.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ def __init__( # pylint: disable=too-many-arguments
225225
data_file = None
226226

227227
# This is injectable by tests.
228-
self._debug_file = None
228+
self._debug_file: Optional[IO[str]] = None
229229

230230
self._auto_load = self._auto_save = auto_data
231231
self._data_suffix_specified = data_suffix

coverage/debug.py

+57-31
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
"""Control of and utilities for debugging."""
55

6+
from __future__ import annotations
7+
68
import contextlib
79
import functools
810
import inspect
@@ -15,7 +17,10 @@
1517
import types
1618
import _thread
1719

18-
from typing import Any, Callable, Iterable, Iterator, Tuple
20+
from typing import (
21+
Any, Callable, Generator, IO, Iterable, Iterator, Optional, List, Tuple,
22+
cast,
23+
)
1924

2025
from coverage.misc import isolate_module
2126

@@ -25,7 +30,7 @@
2530
# When debugging, it can be helpful to force some options, especially when
2631
# debugging the configuration mechanisms you usually use to control debugging!
2732
# This is a list of forced debugging options.
28-
FORCED_DEBUG = []
33+
FORCED_DEBUG: List[str] = []
2934
FORCED_DEBUG_FILE = None
3035

3136

@@ -34,7 +39,7 @@ class DebugControl:
3439

3540
show_repr_attr = False # For AutoReprMixin
3641

37-
def __init__(self, options, output):
42+
def __init__(self, options: Iterable[str], output: Optional[IO[str]]) -> None:
3843
"""Configure the options and output file for debugging."""
3944
self.options = list(options) + FORCED_DEBUG
4045
self.suppress_callers = False
@@ -49,17 +54,17 @@ def __init__(self, options, output):
4954
)
5055
self.raw_output = self.output.outfile
5156

52-
def __repr__(self):
57+
def __repr__(self) -> str:
5358
return f"<DebugControl options={self.options!r} raw_output={self.raw_output!r}>"
5459

55-
def should(self, option):
60+
def should(self, option: str) -> bool:
5661
"""Decide whether to output debug information in category `option`."""
5762
if option == "callers" and self.suppress_callers:
5863
return False
5964
return (option in self.options)
6065

6166
@contextlib.contextmanager
62-
def without_callers(self):
67+
def without_callers(self) -> Generator[None, None, None]:
6368
"""A context manager to prevent call stacks from being logged."""
6469
old = self.suppress_callers
6570
self.suppress_callers = True
@@ -68,7 +73,7 @@ def without_callers(self):
6873
finally:
6974
self.suppress_callers = old
7075

71-
def write(self, msg):
76+
def write(self, msg: str) -> None:
7277
"""Write a line of debug output.
7378
7479
`msg` is the line to write. A newline will be appended.
@@ -86,26 +91,26 @@ def write(self, msg):
8691

8792
class DebugControlString(DebugControl):
8893
"""A `DebugControl` that writes to a StringIO, for testing."""
89-
def __init__(self, options):
94+
def __init__(self, options: Iterable[str]) -> None:
9095
super().__init__(options, io.StringIO())
9196

92-
def get_output(self):
97+
def get_output(self) -> str:
9398
"""Get the output text from the `DebugControl`."""
94-
return self.raw_output.getvalue()
99+
return cast(str, self.raw_output.getvalue())
95100

96101

97102
class NoDebugging:
98103
"""A replacement for DebugControl that will never try to do anything."""
99-
def should(self, option): # pylint: disable=unused-argument
104+
def should(self, option: str) -> bool: # pylint: disable=unused-argument
100105
"""Should we write debug messages? Never."""
101106
return False
102107

103-
def write(self, msg):
108+
def write(self, msg: str) -> None:
104109
"""This will never be called."""
105110
raise AssertionError("NoDebugging.write should never be called.")
106111

107112

108-
def info_header(label):
113+
def info_header(label: str) -> str:
109114
"""Make a nice header string."""
110115
return "--{:-<60s}".format(" "+label+" ")
111116

@@ -155,7 +160,7 @@ def write_formatted_info(
155160
write(f" {line}")
156161

157162

158-
def short_stack(limit=None, skip=0):
163+
def short_stack(limit: Optional[int]=None, skip: int=0) -> str:
159164
"""Return a string summarizing the call stack.
160165
161166
The string is multi-line, with one line per stack frame. Each line shows
@@ -177,29 +182,33 @@ def short_stack(limit=None, skip=0):
177182
return "\n".join("%30s : %s:%d" % (t[3], t[1], t[2]) for t in stack)
178183

179184

180-
def dump_stack_frames(limit=None, out=None, skip=0):
185+
def dump_stack_frames(
186+
limit: Optional[int]=None,
187+
out: Optional[IO[str]]=None,
188+
skip: int=0
189+
) -> None:
181190
"""Print a summary of the stack to stdout, or someplace else."""
182191
out = out or sys.stdout
183192
out.write(short_stack(limit=limit, skip=skip+1))
184193
out.write("\n")
185194

186195

187-
def clipped_repr(text, numchars=50):
196+
def clipped_repr(text: str, numchars: int=50) -> str:
188197
"""`repr(text)`, but limited to `numchars`."""
189198
r = reprlib.Repr()
190199
r.maxstring = numchars
191200
return r.repr(text)
192201

193202

194-
def short_id(id64):
203+
def short_id(id64: int) -> int:
195204
"""Given a 64-bit id, make a shorter 16-bit one."""
196205
id16 = 0
197206
for offset in range(0, 64, 16):
198207
id16 ^= id64 >> offset
199208
return id16 & 0xFFFF
200209

201210

202-
def add_pid_and_tid(text):
211+
def add_pid_and_tid(text: str) -> str:
203212
"""A filter to add pid and tid to debug messages."""
204213
# Thread ids are useful, but too long. Make a shorter one.
205214
tid = f"{short_id(_thread.get_ident()):04x}"
@@ -211,7 +220,7 @@ class AutoReprMixin:
211220
"""A mixin implementing an automatic __repr__ for debugging."""
212221
auto_repr_ignore = ['auto_repr_ignore', '$coverage.object_id']
213222

214-
def __repr__(self):
223+
def __repr__(self) -> str:
215224
show_attrs = (
216225
(k, v) for k, v in self.__dict__.items()
217226
if getattr(v, "show_repr_attr", True)
@@ -225,7 +234,7 @@ def __repr__(self):
225234
)
226235

227236

228-
def simplify(v): # pragma: debugging
237+
def simplify(v: Any) -> Any: # pragma: debugging
229238
"""Turn things which are nearly dict/list/etc into dict/list/etc."""
230239
if isinstance(v, dict):
231240
return {k:simplify(vv) for k, vv in v.items()}
@@ -237,13 +246,13 @@ def simplify(v): # pragma: debugging
237246
return v
238247

239248

240-
def pp(v): # pragma: debugging
249+
def pp(v: Any) -> None: # pragma: debugging
241250
"""Debug helper to pretty-print data, including SimpleNamespace objects."""
242251
# Might not be needed in 3.9+
243252
pprint.pprint(simplify(v))
244253

245254

246-
def filter_text(text, filters):
255+
def filter_text(text: str, filters: Iterable[Callable[[str], str]]) -> str:
247256
"""Run `text` through a series of filters.
248257
249258
`filters` is a list of functions. Each takes a string and returns a
@@ -266,10 +275,10 @@ def filter_text(text, filters):
266275

267276
class CwdTracker: # pragma: debugging
268277
"""A class to add cwd info to debug messages."""
269-
def __init__(self):
270-
self.cwd = None
278+
def __init__(self) -> None:
279+
self.cwd: Optional[str] = None
271280

272-
def filter(self, text):
281+
def filter(self, text: str) -> str:
273282
"""Add a cwd message for each new cwd."""
274283
cwd = os.getcwd()
275284
if cwd != self.cwd:
@@ -280,7 +289,12 @@ def filter(self, text):
280289

281290
class DebugOutputFile: # pragma: debugging
282291
"""A file-like object that includes pid and cwd information."""
283-
def __init__(self, outfile, show_process, filters):
292+
def __init__(
293+
self,
294+
outfile: Optional[IO[str]],
295+
show_process: bool,
296+
filters: Iterable[Callable[[str], str]],
297+
):
284298
self.outfile = outfile
285299
self.show_process = show_process
286300
self.filters = list(filters)
@@ -296,7 +310,13 @@ def __init__(self, outfile, show_process, filters):
296310
SINGLETON_ATTR = 'the_one_and_is_interim'
297311

298312
@classmethod
299-
def get_one(cls, fileobj=None, show_process=True, filters=(), interim=False):
313+
def get_one(
314+
cls,
315+
fileobj: Optional[IO[str]]=None,
316+
show_process: bool=True,
317+
filters: Iterable[Callable[[str], str]]=(),
318+
interim: bool=False,
319+
) -> DebugOutputFile:
300320
"""Get a DebugOutputFile.
301321
302322
If `fileobj` is provided, then a new DebugOutputFile is made with it.
@@ -339,13 +359,15 @@ def get_one(cls, fileobj=None, show_process=True, filters=(), interim=False):
339359
sys.modules[cls.SYS_MOD_NAME] = singleton_module
340360
return the_one
341361

342-
def write(self, text):
362+
def write(self, text: str) -> None:
343363
"""Just like file.write, but filter through all our filters."""
364+
assert self.outfile is not None
344365
self.outfile.write(filter_text(text, self.filters))
345366
self.outfile.flush()
346367

347-
def flush(self):
368+
def flush(self) -> None:
348369
"""Flush our file."""
370+
assert self.outfile is not None
349371
self.outfile.flush()
350372

351373

@@ -388,7 +410,11 @@ def _wrapper(*args, **kwargs):
388410
CALLS = itertools.count()
389411
OBJ_ID_ATTR = "$coverage.object_id"
390412

391-
def show_calls(show_args=True, show_stack=False, show_return=False): # pragma: debugging
413+
def show_calls(
414+
show_args: bool=True,
415+
show_stack: bool=False,
416+
show_return: bool=False,
417+
) -> Callable[..., Any]: # pragma: debugging
392418
"""A method decorator to debug-log each call to the function."""
393419
def _decorator(func):
394420
@functools.wraps(func)
@@ -422,7 +448,7 @@ def _wrapper(self, *args, **kwargs):
422448
return _decorator
423449

424450

425-
def _clean_stack_line(s): # pragma: debugging
451+
def _clean_stack_line(s: str) -> str: # pragma: debugging
426452
"""Simplify some paths in a stack trace, for compactness."""
427453
s = s.strip()
428454
s = s.replace(os.path.dirname(__file__) + '/', '')

coverage/pytracer.py

+12-10
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import sys
99

1010
from types import FrameType
11-
from typing import Any, Callable, Dict, Mapping, Optional
11+
from typing import Any, Callable, Dict, Optional
1212

1313
from coverage import env
1414
from coverage.types import TFileDisposition, TTraceData, TTraceFn, TTracer, TWarnFn
@@ -51,7 +51,7 @@ def __init__(self) -> None:
5151
self.data: TTraceData
5252
self.trace_arcs = False
5353
self.should_trace: Callable[[str, FrameType], TFileDisposition]
54-
self.should_trace_cache: Mapping[str, Optional[TFileDisposition]]
54+
self.should_trace_cache: Dict[str, Optional[TFileDisposition]]
5555
self.should_start_context: Optional[Callable[[FrameType], Optional[str]]] = None
5656
self.switch_context: Optional[Callable[[Optional[str]], None]] = None
5757
self.warn: TWarnFn
@@ -61,8 +61,8 @@ def __init__(self) -> None:
6161

6262
self.cur_file_data = None
6363
self.last_line = 0 # int, but uninitialized.
64-
self.cur_file_name = None
65-
self.context = None
64+
self.cur_file_name: Optional[str] = None
65+
self.context: Optional[str] = None
6666
self.started_context = False
6767

6868
self.data_stack = []
@@ -84,7 +84,7 @@ def __repr__(self) -> str:
8484
files = len(self.data)
8585
return f"<PyTracer at 0x{me:x}: {points} data points in {files} files>"
8686

87-
def log(self, marker, *args) -> None:
87+
def log(self, marker: str, *args: Any) -> None:
8888
"""For hard-core logging of what this tracer is doing."""
8989
with open("/tmp/debug_trace.txt", "a") as f:
9090
f.write("{} {}[{}]".format(
@@ -93,21 +93,21 @@ def log(self, marker, *args) -> None:
9393
len(self.data_stack),
9494
))
9595
if 0: # if you want thread ids..
96-
f.write(".{:x}.{:x}".format(
96+
f.write(".{:x}.{:x}".format( # type: ignore[unreachable]
9797
self.thread.ident,
9898
self.threading.current_thread().ident,
9999
))
100100
f.write(" {}".format(" ".join(map(str, args))))
101101
if 0: # if you want callers..
102-
f.write(" | ")
102+
f.write(" | ") # type: ignore[unreachable]
103103
stack = " / ".join(
104104
(fname or "???").rpartition("/")[-1]
105105
for _, fname, _, _ in self.data_stack
106106
)
107107
f.write(stack)
108108
f.write("\n")
109109

110-
def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn:
110+
def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> Optional[TTraceFn]:
111111
"""The trace function passed to sys.settrace."""
112112

113113
if THIS_FILE in frame.f_code.co_filename:
@@ -119,8 +119,8 @@ def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn:
119119
# The PyTrace.stop() method has been called, possibly by another
120120
# thread, let's deactivate ourselves now.
121121
if 0:
122-
self.log("---\nX", frame.f_code.co_filename, frame.f_lineno)
123-
f = frame
122+
f = frame # type: ignore[unreachable]
123+
self.log("---\nX", f.f_code.co_filename, f.f_lineno)
124124
while f:
125125
self.log(">", f.f_code.co_filename, f.f_lineno, f.f_code.co_name, f.f_trace)
126126
f = f.f_back
@@ -140,6 +140,7 @@ def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn:
140140
if context_maybe is not None:
141141
self.context = context_maybe
142142
started_context = True
143+
assert self.switch_context is not None
143144
self.switch_context(self.context)
144145
else:
145146
started_context = False
@@ -175,6 +176,7 @@ def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn:
175176
self.cur_file_data = None
176177
if disp.trace:
177178
tracename = disp.source_filename
179+
assert tracename is not None
178180
if tracename not in self.data:
179181
self.data[tracename] = set()
180182
self.cur_file_data = self.data[tracename]

0 commit comments

Comments
 (0)