diff --git a/git/cmd.py b/git/cmd.py index 7df855817..dd887a18b 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -831,7 +831,7 @@ def execute(self, except cmd_not_found_exception as err: raise GitCommandNotFound(redacted_command, err) from err else: - proc = cast(Popen, proc) + # replace with a typeguard for Popen[bytes]? proc.stdout = cast(BinaryIO, proc.stdout) proc.stderr = cast(BinaryIO, proc.stderr) diff --git a/git/config.py b/git/config.py index 5c5ceea80..2c863f938 100644 --- a/git/config.py +++ b/git/config.py @@ -31,12 +31,14 @@ # typing------------------------------------------------------- -from typing import Any, Callable, IO, List, Dict, Sequence, TYPE_CHECKING, Tuple, Union, cast, overload +from typing import (Any, Callable, IO, List, Dict, Sequence, + TYPE_CHECKING, Tuple, Union, cast, overload) -from git.types import Literal, Lit_config_levels, PathLike, TBD +from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, TBD, assert_never, is_config_level if TYPE_CHECKING: from git.repo.base import Repo + from io import BytesIO # ------------------------------------------------------------- @@ -48,8 +50,10 @@ # invariants # represents the configuration level of a configuration file -CONFIG_LEVELS = ("system", "user", "global", "repository" - ) # type: Tuple[Literal['system'], Literal['user'], Literal['global'], Literal['repository']] + + +CONFIG_LEVELS: ConfigLevels_Tup = ("system", "user", "global", "repository") + # Section pattern to detect conditional includes. # https://git-scm.com/docs/git-config#_conditional_includes @@ -229,8 +233,9 @@ def get_config_path(config_level: Lit_config_levels) -> str: return osp.normpath(osp.expanduser("~/.gitconfig")) elif config_level == "repository": raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path") - - raise ValueError("Invalid configuration level: %r" % config_level) + else: + # Should not reach here. Will raise ValueError if does. Static typing will warn missing elifs + assert_never(config_level, ValueError(f"Invalid configuration level: {config_level!r}")) class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, object)): # type: ignore ## mypy does not understand dynamic class creation # noqa: E501 @@ -271,7 +276,7 @@ class GitConfigParser(with_metaclass(MetaParserBuilder, cp.RawConfigParser, obje # list of RawConfigParser methods able to change the instance _mutating_methods_ = ("add_section", "remove_section", "remove_option", "set") - def __init__(self, file_or_files: Union[None, PathLike, IO, Sequence[Union[PathLike, IO]]] = None, + def __init__(self, file_or_files: Union[None, PathLike, 'BytesIO', Sequence[Union[PathLike, 'BytesIO']]] = None, read_only: bool = True, merge_includes: bool = True, config_level: Union[Lit_config_levels, None] = None, repo: Union['Repo', None] = None) -> None: @@ -300,13 +305,13 @@ def __init__(self, file_or_files: Union[None, PathLike, IO, Sequence[Union[PathL self._proxies = self._dict() if file_or_files is not None: - self._file_or_files = file_or_files # type: Union[PathLike, IO, Sequence[Union[PathLike, IO]]] + self._file_or_files: Union[PathLike, 'BytesIO', Sequence[Union[PathLike, 'BytesIO']]] = file_or_files else: if config_level is None: if read_only: - self._file_or_files = [get_config_path(f) # type: ignore - for f in CONFIG_LEVELS # Can type f properly when 3.5 dropped - if f != 'repository'] + self._file_or_files = [get_config_path(f) + for f in CONFIG_LEVELS + if is_config_level(f) and f != 'repository'] else: raise ValueError("No configuration level or configuration files specified") else: @@ -323,15 +328,13 @@ def __init__(self, file_or_files: Union[None, PathLike, IO, Sequence[Union[PathL def _acquire_lock(self) -> None: if not self._read_only: if not self._lock: - if isinstance(self._file_or_files, (tuple, list)): - raise ValueError( - "Write-ConfigParsers can operate on a single file only, multiple files have been passed") - # END single file check - if isinstance(self._file_or_files, (str, os.PathLike)): file_or_files = self._file_or_files + elif isinstance(self._file_or_files, (tuple, list, Sequence)): + raise ValueError( + "Write-ConfigParsers can operate on a single file only, multiple files have been passed") else: - file_or_files = cast(IO, self._file_or_files).name + file_or_files = self._file_or_files.name # END get filename from handle/stream # initialize lock base - we want to write @@ -649,7 +652,7 @@ def write(self) -> None: a file lock""" self._assure_writable("write") if not self._dirty: - return + return None if isinstance(self._file_or_files, (list, tuple)): raise AssertionError("Cannot write back if there is not exactly a single file to write to, have %i files" @@ -665,7 +668,7 @@ def write(self) -> None: fp = self._file_or_files # we have a physical file on disk, so get a lock - is_file_lock = isinstance(fp, (str, IOBase)) # can't use Pathlike until 3.5 dropped + is_file_lock = isinstance(fp, (str, os.PathLike, IOBase)) # can't use Pathlike until 3.5 dropped if is_file_lock and self._lock is not None: # else raise Error? self._lock._obtain_lock() @@ -674,7 +677,7 @@ def write(self) -> None: with open(fp, "wb") as fp_open: self._write(fp_open) else: - fp = cast(IO, fp) + fp = cast('BytesIO', fp) fp.seek(0) # make sure we do not overwrite into an existing file if hasattr(fp, 'truncate'): diff --git a/git/diff.py b/git/diff.py index 346a2ca7b..51dac3909 100644 --- a/git/diff.py +++ b/git/diff.py @@ -15,19 +15,26 @@ # typing ------------------------------------------------------------------ -from typing import Any, Iterator, List, Match, Optional, Tuple, Type, Union, TYPE_CHECKING -from git.types import PathLike, TBD, Literal +from typing import Any, Iterator, List, Match, Optional, Tuple, Type, TypeVar, Union, TYPE_CHECKING +from git.types import PathLike, TBD, Literal, TypeGuard if TYPE_CHECKING: from .objects.tree import Tree + from .objects import Commit from git.repo.base import Repo - + from git.objects.base import IndexObject from subprocess import Popen -Lit_change_type = Literal['A', 'D', 'M', 'R', 'T'] +Lit_change_type = Literal['A', 'D', 'C', 'M', 'R', 'T', 'U'] + + +def is_change_type(inp: str) -> TypeGuard[Lit_change_type]: + # return True + return inp in ['A', 'D', 'C', 'M', 'R', 'T', 'U'] # ------------------------------------------------------------------------ + __all__ = ('Diffable', 'DiffIndex', 'Diff', 'NULL_TREE') # Special object to compare against the empty tree in diffs @@ -75,7 +82,8 @@ class Diffable(object): class Index(object): pass - def _process_diff_args(self, args: List[Union[str, 'Diffable', object]]) -> List[Union[str, 'Diffable', object]]: + def _process_diff_args(self, args: List[Union[str, 'Diffable', Type['Diffable.Index'], object]] + ) -> List[Union[str, 'Diffable', Type['Diffable.Index'], object]]: """ :return: possibly altered version of the given args list. @@ -83,7 +91,7 @@ def _process_diff_args(self, args: List[Union[str, 'Diffable', object]]) -> List Subclasses can use it to alter the behaviour of the superclass""" return args - def diff(self, other: Union[Type[Index], Type['Tree'], object, None, str] = Index, + def diff(self, other: Union[Type['Index'], 'Tree', 'Commit', None, str, object] = Index, paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None, create_patch: bool = False, **kwargs: Any) -> 'DiffIndex': """Creates diffs between two items being trees, trees and index or an @@ -116,7 +124,7 @@ def diff(self, other: Union[Type[Index], Type['Tree'], object, None, str] = Inde :note: On a bare repository, 'other' needs to be provided as Index or as as Tree/Commit, or a git command error will occur""" - args = [] # type: List[Union[str, Diffable, object]] + args: List[Union[PathLike, Diffable, Type['Diffable.Index'], object]] = [] args.append("--abbrev=40") # we need full shas args.append("--full-index") # get full index paths, not only filenames @@ -134,8 +142,8 @@ def diff(self, other: Union[Type[Index], Type['Tree'], object, None, str] = Inde if paths is not None and not isinstance(paths, (tuple, list)): paths = [paths] - if hasattr(self, 'repo'): # else raise Error? - self.repo = self.repo # type: 'Repo' + if hasattr(self, 'Has_Repo'): + self.repo: Repo = self.repo diff_cmd = self.repo.git.diff if other is self.Index: @@ -169,7 +177,10 @@ def diff(self, other: Union[Type[Index], Type['Tree'], object, None, str] = Inde return index -class DiffIndex(list): +T_Diff = TypeVar('T_Diff', bound='Diff') + + +class DiffIndex(List[T_Diff]): """Implements an Index for diffs, allowing a list of Diffs to be queried by the diff properties. @@ -183,7 +194,7 @@ class DiffIndex(list): # T = Changed in the type change_type = ("A", "C", "D", "R", "M", "T") - def iter_change_type(self, change_type: Lit_change_type) -> Iterator['Diff']: + def iter_change_type(self, change_type: Lit_change_type) -> Iterator[T_Diff]: """ :return: iterator yielding Diff instances that match the given change_type @@ -200,19 +211,19 @@ def iter_change_type(self, change_type: Lit_change_type) -> Iterator['Diff']: if change_type not in self.change_type: raise ValueError("Invalid change type: %s" % change_type) - for diff in self: # type: 'Diff' - if diff.change_type == change_type: - yield diff - elif change_type == "A" and diff.new_file: - yield diff - elif change_type == "D" and diff.deleted_file: - yield diff - elif change_type == "C" and diff.copied_file: - yield diff - elif change_type == "R" and diff.renamed: - yield diff - elif change_type == "M" and diff.a_blob and diff.b_blob and diff.a_blob != diff.b_blob: - yield diff + for diffidx in self: + if diffidx.change_type == change_type: + yield diffidx + elif change_type == "A" and diffidx.new_file: + yield diffidx + elif change_type == "D" and diffidx.deleted_file: + yield diffidx + elif change_type == "C" and diffidx.copied_file: + yield diffidx + elif change_type == "R" and diffidx.renamed: + yield diffidx + elif change_type == "M" and diffidx.a_blob and diffidx.b_blob and diffidx.a_blob != diffidx.b_blob: + yield diffidx # END for each diff @@ -281,7 +292,7 @@ def __init__(self, repo: 'Repo', a_mode: Union[bytes, str, None], b_mode: Union[bytes, str, None], new_file: bool, deleted_file: bool, copied_file: bool, raw_rename_from: Optional[bytes], raw_rename_to: Optional[bytes], - diff: Union[str, bytes, None], change_type: Optional[str], score: Optional[int]) -> None: + diff: Union[str, bytes, None], change_type: Optional[Lit_change_type], score: Optional[int]) -> None: assert a_rawpath is None or isinstance(a_rawpath, bytes) assert b_rawpath is None or isinstance(b_rawpath, bytes) @@ -300,19 +311,21 @@ def __init__(self, repo: 'Repo', repo = submodule.module() break + self.a_blob: Union['IndexObject', None] if a_blob_id is None or a_blob_id == self.NULL_HEX_SHA: self.a_blob = None else: self.a_blob = Blob(repo, hex_to_bin(a_blob_id), mode=self.a_mode, path=self.a_path) + self.b_blob: Union['IndexObject', None] if b_blob_id is None or b_blob_id == self.NULL_HEX_SHA: self.b_blob = None else: self.b_blob = Blob(repo, hex_to_bin(b_blob_id), mode=self.b_mode, path=self.b_path) - self.new_file = new_file - self.deleted_file = deleted_file - self.copied_file = copied_file + self.new_file: bool = new_file + self.deleted_file: bool = deleted_file + self.copied_file: bool = copied_file # be clear and use None instead of empty strings assert raw_rename_from is None or isinstance(raw_rename_from, bytes) @@ -321,7 +334,7 @@ def __init__(self, repo: 'Repo', self.raw_rename_to = raw_rename_to or None self.diff = diff - self.change_type = change_type + self.change_type: Union[Lit_change_type, None] = change_type self.score = score def __eq__(self, other: object) -> bool: @@ -386,36 +399,36 @@ def __str__(self) -> str: # end return res - @property + @ property def a_path(self) -> Optional[str]: return self.a_rawpath.decode(defenc, 'replace') if self.a_rawpath else None - @property + @ property def b_path(self) -> Optional[str]: return self.b_rawpath.decode(defenc, 'replace') if self.b_rawpath else None - @property + @ property def rename_from(self) -> Optional[str]: return self.raw_rename_from.decode(defenc, 'replace') if self.raw_rename_from else None - @property + @ property def rename_to(self) -> Optional[str]: return self.raw_rename_to.decode(defenc, 'replace') if self.raw_rename_to else None - @property + @ property def renamed(self) -> bool: """:returns: True if the blob of our diff has been renamed :note: This property is deprecated, please use ``renamed_file`` instead. """ return self.renamed_file - @property + @ property def renamed_file(self) -> bool: """:returns: True if the blob of our diff has been renamed """ return self.rename_from != self.rename_to - @classmethod + @ classmethod def _pick_best_path(cls, path_match: bytes, rename_match: bytes, path_fallback_match: bytes) -> Optional[bytes]: if path_match: return decode_path(path_match) @@ -428,7 +441,7 @@ def _pick_best_path(cls, path_match: bytes, rename_match: bytes, path_fallback_m return None - @classmethod + @ classmethod def _index_from_patch_format(cls, repo: 'Repo', proc: TBD) -> DiffIndex: """Create a new DiffIndex from the given text which must be in patch format :param repo: is the repository we are operating on - it is required @@ -441,7 +454,7 @@ def _index_from_patch_format(cls, repo: 'Repo', proc: TBD) -> DiffIndex: # for now, we have to bake the stream text = b''.join(text_list) - index = DiffIndex() + index: 'DiffIndex' = DiffIndex() previous_header = None header = None a_path, b_path = None, None # for mypy @@ -491,19 +504,21 @@ def _index_from_patch_format(cls, repo: 'Repo', proc: TBD) -> DiffIndex: return index - @staticmethod + @ staticmethod def _handle_diff_line(lines_bytes: bytes, repo: 'Repo', index: DiffIndex) -> None: lines = lines_bytes.decode(defenc) for line in lines.split(':')[1:]: meta, _, path = line.partition('\x00') path = path.rstrip('\x00') - a_blob_id, b_blob_id = None, None # Type: Optional[str] + a_blob_id: Optional[str] + b_blob_id: Optional[str] old_mode, new_mode, a_blob_id, b_blob_id, _change_type = meta.split(None, 4) # Change type can be R100 # R: status letter # 100: score (in case of copy and rename) - change_type = _change_type[0] + assert is_change_type(_change_type[0]), f"Unexpected value for change_type received: {_change_type[0]}" + change_type: Lit_change_type = _change_type[0] score_str = ''.join(_change_type[1:]) score = int(score_str) if score_str.isdigit() else None path = path.strip() @@ -543,14 +558,14 @@ def _handle_diff_line(lines_bytes: bytes, repo: 'Repo', index: DiffIndex) -> Non '', change_type, score) index.append(diff) - @classmethod + @ classmethod def _index_from_raw_format(cls, repo: 'Repo', proc: 'Popen') -> 'DiffIndex': """Create a new DiffIndex from the given stream which must be in raw format. :return: git.DiffIndex""" # handles # :100644 100644 687099101... 37c5e30c8... M .gitignore - index = DiffIndex() + index: 'DiffIndex' = DiffIndex() handle_process_output(proc, lambda byt: cls._handle_diff_line(byt, repo, index), None, finalize_process, decode_streams=False) diff --git a/git/index/base.py b/git/index/base.py index f4ffba7b9..3aa06e381 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -18,6 +18,7 @@ from git.exc import ( GitCommandError, CheckoutError, + GitError, InvalidGitRepositoryError ) from git.objects import ( @@ -40,7 +41,7 @@ from gitdb.base import IStream from gitdb.db import MemoryDB -import git.diff as diff +import git.diff as git_diff import os.path as osp from .fun import ( @@ -66,10 +67,10 @@ # typing ----------------------------------------------------------------------------- -from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List, - Sequence, TYPE_CHECKING, Tuple, Union) +from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List, NoReturn, + Sequence, TYPE_CHECKING, Tuple, Type, Union) -from git.types import PathLike, TBD +from git.types import Commit_ish, PathLike, TBD if TYPE_CHECKING: from subprocess import Popen @@ -87,7 +88,7 @@ __all__ = ('IndexFile', 'CheckoutError') -class IndexFile(LazyMixin, diff.Diffable, Serializable): +class IndexFile(LazyMixin, git_diff.Diffable, Serializable): """ Implements an Index that can be manipulated using a native implementation in @@ -113,7 +114,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): _VERSION = 2 # latest version we support S_IFGITLINK = S_IFGITLINK # a submodule - def __init__(self, repo: 'Repo', file_path: PathLike = None) -> None: + def __init__(self, repo: 'Repo', file_path: Union[PathLike, None] = None) -> None: """Initialize this Index instance, optionally from the given ``file_path``. If no file_path is given, we will be created from the current index file. @@ -372,13 +373,13 @@ def from_tree(cls, repo: 'Repo', *treeish: Treeish, **kwargs: Any) -> 'IndexFile # UTILITIES @unbare_repo - def _iter_expand_paths(self, paths: Sequence[PathLike]) -> Iterator[PathLike]: + def _iter_expand_paths(self: 'IndexFile', paths: Sequence[PathLike]) -> Iterator[PathLike]: """Expand the directories in list of paths to the corresponding paths accordingly, Note: git will add items multiple times even if a glob overlapped with manually specified paths or if paths where specified multiple times - we respect that and do not prune""" - def raise_exc(e): + def raise_exc(e: Exception) -> NoReturn: raise e r = str(self.repo.working_tree_dir) rs = r + os.sep @@ -410,7 +411,7 @@ def raise_exc(e): # whose name contains wildcard characters. if abs_path not in resolved_paths: for f in self._iter_expand_paths(glob.glob(abs_path)): - yield f.replace(rs, '') + yield str(f).replace(rs, '') continue # END glob handling try: @@ -426,7 +427,8 @@ def raise_exc(e): # END path exception handling # END for each path - def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item, fmakeexc, fprogress, + def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item: TBD, fmakeexc: Callable[..., GitError], + fprogress: Callable[[PathLike, bool, TBD], None], read_from_stdout: bool = True) -> Union[None, str]: """Write path to proc.stdin and make sure it processes the item, including progress. @@ -498,7 +500,7 @@ def unmerged_blobs(self) -> Dict[PathLike, List[Tuple[StageType, Blob]]]: line.sort() return path_map - @classmethod + @ classmethod def entry_key(cls, *entry: Union[BaseIndexEntry, PathLike, StageType]) -> Tuple[PathLike, StageType]: return entry_key(*entry) @@ -570,11 +572,12 @@ def write_tree(self) -> Tree: # note: additional deserialization could be saved if write_tree_from_cache # would return sorted tree entries root_tree = Tree(self.repo, binsha, path='') - root_tree._cache = tree_items # type: ignore + root_tree._cache = tree_items # type: ignore # should this be encoded to [bytes, int, str]? return root_tree - def _process_diff_args(self, args: List[Union[str, diff.Diffable, object]] - ) -> List[Union[str, diff.Diffable, object]]: + def _process_diff_args(self, # type: ignore[override] + args: List[Union[str, 'git_diff.Diffable', Type['git_diff.Diffable.Index']]] + ) -> List[Union[str, 'git_diff.Diffable', Type['git_diff.Diffable.Index']]]: try: args.pop(args.index(self)) except IndexError: @@ -593,7 +596,7 @@ def _to_relative_path(self, path: PathLike) -> PathLike: raise ValueError("Absolute path %r is not in git repository at %r" % (path, self.repo.working_tree_dir)) return os.path.relpath(path, self.repo.working_tree_dir) - def _preprocess_add_items(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]] + def _preprocess_add_items(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']] ) -> Tuple[List[PathLike], List[BaseIndexEntry]]: """ Split the items into two lists of path strings and BaseEntries. """ paths = [] @@ -631,11 +634,11 @@ def _store_path(self, filepath: PathLike, fprogress: Callable) -> BaseIndexEntry return BaseIndexEntry((stat_mode_to_index_mode(st.st_mode), istream.binsha, 0, to_native_path_linux(filepath))) - @unbare_repo - @git_working_dir + @ unbare_repo + @ git_working_dir def _entries_for_paths(self, paths: List[str], path_rewriter: Callable, fprogress: Callable, entries: List[BaseIndexEntry]) -> List[BaseIndexEntry]: - entries_added = [] # type: List[BaseIndexEntry] + entries_added: List[BaseIndexEntry] = [] if path_rewriter: for path in paths: if osp.isabs(path): @@ -664,8 +667,8 @@ def _entries_for_paths(self, paths: List[str], path_rewriter: Callable, fprogres # END path handling return entries_added - def add(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], force: bool = True, - fprogress: Callable = lambda *args: None, path_rewriter: Callable = None, + def add(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']], force: bool = True, + fprogress: Callable = lambda *args: None, path_rewriter: Union[Callable[..., PathLike], None] = None, write: bool = True, write_extension_data: bool = False) -> List[BaseIndexEntry]: """Add files from the working tree, specific blobs or BaseIndexEntries to the index. @@ -769,7 +772,7 @@ def add(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], # automatically # paths can be git-added, for everything else we use git-update-index paths, entries = self._preprocess_add_items(items) - entries_added = [] + entries_added: List[BaseIndexEntry] = [] # This code needs a working tree, therefore we try not to run it unless required. # That way, we are OK on a bare repository as well. # If there are no paths, the rewriter has nothing to do either @@ -788,8 +791,8 @@ def add(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], # create objects if required, otherwise go with the existing shas null_entries_indices = [i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA] if null_entries_indices: - @git_working_dir - def handle_null_entries(self): + @ git_working_dir + def handle_null_entries(self: 'IndexFile') -> None: for ei in null_entries_indices: null_entry = entries[ei] new_entry = self._store_path(null_entry.path, fprogress) @@ -833,12 +836,13 @@ def handle_null_entries(self): return entries_added - def _items_to_rela_paths(self, items): + def _items_to_rela_paths(self, items: Union[PathLike, Sequence[Union[PathLike, BaseIndexEntry, Blob, Submodule]]] + ) -> List[PathLike]: """Returns a list of repo-relative paths from the given items which may be absolute or relative paths, entries or blobs""" paths = [] # if string put in list - if isinstance(items, str): + if isinstance(items, (str, os.PathLike)): items = [items] for item in items: @@ -851,9 +855,9 @@ def _items_to_rela_paths(self, items): # END for each item return paths - @post_clear_cache - @default_index - def remove(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], working_tree: bool = False, + @ post_clear_cache + @ default_index + def remove(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']], working_tree: bool = False, **kwargs: Any) -> List[str]: """Remove the given items from the index and optionally from the working tree as well. @@ -903,9 +907,9 @@ def remove(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule # rm 'path' return [p[4:-1] for p in removed_paths] - @post_clear_cache - @default_index - def move(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], skip_errors: bool = False, + @ post_clear_cache + @ default_index + def move(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']], skip_errors: bool = False, **kwargs: Any) -> List[Tuple[str, str]]: """Rename/move the items, whereas the last item is considered the destination of the move operation. If the destination is a file, the first item ( of two ) @@ -968,8 +972,14 @@ def move(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]] return out - def commit(self, message: str, parent_commits=None, head: bool = True, author: Union[None, 'Actor'] = None, - committer: Union[None, 'Actor'] = None, author_date: str = None, commit_date: str = None, + def commit(self, + message: str, + parent_commits: Union[Commit_ish, None] = None, + head: bool = True, + author: Union[None, 'Actor'] = None, + committer: Union[None, 'Actor'] = None, + author_date: Union[str, None] = None, + commit_date: Union[str, None] = None, skip_hooks: bool = False) -> Commit: """Commit the current default index file, creating a commit object. For more information on the arguments, see tree.commit. @@ -1023,7 +1033,7 @@ def _flush_stdin_and_wait(cls, proc: 'Popen[bytes]', ignore_stdout: bool = False proc.wait() return stdout - @default_index + @ default_index def checkout(self, paths: Union[None, Iterable[PathLike]] = None, force: bool = False, fprogress: Callable = lambda *args: None, **kwargs: Any ) -> Union[None, Iterator[PathLike], Sequence[PathLike]]: @@ -1192,7 +1202,7 @@ def handle_stderr(proc: 'Popen[bytes]', iter_checked_out_files: Iterable[PathLik # END paths handling assert "Should not reach this point" - @default_index + @ default_index def reset(self, commit: Union[Commit, 'Reference', str] = 'HEAD', working_tree: bool = False, paths: Union[None, Iterable[PathLike]] = None, head: bool = False, **kwargs: Any) -> 'IndexFile': @@ -1262,10 +1272,12 @@ def reset(self, commit: Union[Commit, 'Reference', str] = 'HEAD', working_tree: return self - @default_index - def diff(self, other: Union[diff.Diffable.Index, 'IndexFile.Index', Treeish, None, object] = diff.Diffable.Index, - paths: Union[str, List[PathLike], Tuple[PathLike, ...]] = None, create_patch: bool = False, **kwargs: Any - ) -> diff.DiffIndex: + # @ default_index, breaks typing for some reason, copied into function + def diff(self, # type: ignore[override] + other: Union[Type['git_diff.Diffable.Index'], 'Tree', 'Commit', str, None] = git_diff.Diffable.Index, + paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None, + create_patch: bool = False, **kwargs: Any + ) -> git_diff.DiffIndex: """Diff this index against the working copy or a Tree or Commit object For a documentation of the parameters and return values, see, @@ -1275,9 +1287,14 @@ def diff(self, other: Union[diff.Diffable.Index, 'IndexFile.Index', Treeish, Non Will only work with indices that represent the default git index as they have not been initialized with a stream. """ + + # only run if we are the default repository index + if self._file_path != self._index_path(): + raise AssertionError( + "Cannot call %r on indices that do not represent the default git index" % self.diff()) # index against index is always empty if other is self.Index: - return diff.DiffIndex() + return git_diff.DiffIndex() # index against anything but None is a reverse diff with the respective # item. Handle existing -R flags properly. Transform strings to the object diff --git a/git/index/fun.py b/git/index/fun.py index ffd109b1f..e5e566a05 100644 --- a/git/index/fun.py +++ b/git/index/fun.py @@ -57,6 +57,8 @@ if TYPE_CHECKING: from .base import IndexFile + from git.objects.tree import TreeCacheTup + # from git.objects.fun import EntryTupOrNone # ------------------------------------------------------------------------------------ @@ -186,16 +188,15 @@ def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, i """:return: Key suitable to be used for the index.entries dictionary :param entry: One instance of type BaseIndexEntry or the path and the stage""" - def is_entry_tuple(entry: Tuple) -> TypeGuard[Tuple[PathLike, int]]: - return isinstance(entry, tuple) and len(entry) == 2 - + def is_entry_key_tup(entry_key: Tuple) -> TypeGuard[Tuple[PathLike, int]]: + return isinstance(entry_key, tuple) and len(entry_key) == 2 + if len(entry) == 1: entry_first = entry[0] assert isinstance(entry_first, BaseIndexEntry) return (entry_first.path, entry_first.stage) else: - # entry = tuple(entry) - assert is_entry_tuple(entry) + assert is_entry_key_tup(entry) return entry # END handle entry @@ -249,7 +250,7 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 - ) -> Tuple[bytes, List[Tuple[str, int, str]]]: + ) -> Tuple[bytes, List['TreeCacheTup']]: """Create a tree from the given sorted list of entries and put the respective trees into the given object database @@ -259,8 +260,8 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 :param sl: slice indicating the range we should process on the entries list :return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name""" - tree_items = [] # type: List[Tuple[Union[bytes, str], int, str]] - tree_items_append = tree_items.append + tree_items: List['TreeCacheTup'] = [] + ci = sl.start end = sl.stop while ci < end: @@ -272,7 +273,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 rbound = entry.path.find('/', si) if rbound == -1: # its not a tree - tree_items_append((entry.binsha, entry.mode, entry.path[si:])) + tree_items.append((entry.binsha, entry.mode, entry.path[si:])) else: # find common base range base = entry.path[si:rbound] @@ -289,7 +290,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 # enter recursion # ci - 1 as we want to count our current item as well sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1) - tree_items_append((sha, S_IFDIR, base)) + tree_items.append((sha, S_IFDIR, base)) # skip ahead ci = xi @@ -298,15 +299,14 @@ def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 # finally create the tree sio = BytesIO() - tree_to_stream(tree_items, sio.write) # converts bytes of each item[0] to str - tree_items_stringified = cast(List[Tuple[str, int, str]], tree_items) + tree_to_stream(tree_items, sio.write) # writes to stream as bytes, but doesnt change tree_items sio.seek(0) istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio)) - return (istream.binsha, tree_items_stringified) + return (istream.binsha, tree_items) -def _tree_entry_to_baseindexentry(tree_entry: Tuple[str, int, str], stage: int) -> BaseIndexEntry: +def _tree_entry_to_baseindexentry(tree_entry: 'TreeCacheTup', stage: int) -> BaseIndexEntry: return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])) @@ -319,14 +319,13 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr :param tree_shas: 1, 2 or 3 trees as identified by their binary 20 byte shas If 1 or two, the entries will effectively correspond to the last given tree If 3 are given, a 3 way merge is performed""" - out = [] # type: List[BaseIndexEntry] - out_append = out.append + out: List[BaseIndexEntry] = [] # one and two way is the same for us, as we don't have to handle an existing # index, instrea if len(tree_shas) in (1, 2): for entry in traverse_tree_recursive(odb, tree_shas[-1], ''): - out_append(_tree_entry_to_baseindexentry(entry, 0)) + out.append(_tree_entry_to_baseindexentry(entry, 0)) # END for each entry return out # END handle single tree @@ -347,23 +346,23 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr if(base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or \ (base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]): # changed by both - out_append(_tree_entry_to_baseindexentry(base, 1)) - out_append(_tree_entry_to_baseindexentry(ours, 2)) - out_append(_tree_entry_to_baseindexentry(theirs, 3)) + out.append(_tree_entry_to_baseindexentry(base, 1)) + out.append(_tree_entry_to_baseindexentry(ours, 2)) + out.append(_tree_entry_to_baseindexentry(theirs, 3)) elif base[0] != ours[0] or base[1] != ours[1]: # only we changed it - out_append(_tree_entry_to_baseindexentry(ours, 0)) + out.append(_tree_entry_to_baseindexentry(ours, 0)) else: # either nobody changed it, or they did. In either # case, use theirs - out_append(_tree_entry_to_baseindexentry(theirs, 0)) + out.append(_tree_entry_to_baseindexentry(theirs, 0)) # END handle modification else: if ours[0] != base[0] or ours[1] != base[1]: # they deleted it, we changed it, conflict - out_append(_tree_entry_to_baseindexentry(base, 1)) - out_append(_tree_entry_to_baseindexentry(ours, 2)) + out.append(_tree_entry_to_baseindexentry(base, 1)) + out.append(_tree_entry_to_baseindexentry(ours, 2)) # else: # we didn't change it, ignore # pass @@ -376,8 +375,8 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr else: if theirs[0] != base[0] or theirs[1] != base[1]: # deleted in ours, changed theirs, conflict - out_append(_tree_entry_to_baseindexentry(base, 1)) - out_append(_tree_entry_to_baseindexentry(theirs, 3)) + out.append(_tree_entry_to_baseindexentry(base, 1)) + out.append(_tree_entry_to_baseindexentry(theirs, 3)) # END theirs changed # else: # theirs didn't change @@ -388,18 +387,19 @@ def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntr # all three can't be None if ours is None: # added in their branch - out_append(_tree_entry_to_baseindexentry(theirs, 0)) + assert theirs is not None + out.append(_tree_entry_to_baseindexentry(theirs, 0)) elif theirs is None: # added in our branch - out_append(_tree_entry_to_baseindexentry(ours, 0)) + out.append(_tree_entry_to_baseindexentry(ours, 0)) else: # both have it, except for the base, see whether it changed if ours[0] != theirs[0] or ours[1] != theirs[1]: - out_append(_tree_entry_to_baseindexentry(ours, 2)) - out_append(_tree_entry_to_baseindexentry(theirs, 3)) + out.append(_tree_entry_to_baseindexentry(ours, 2)) + out.append(_tree_entry_to_baseindexentry(theirs, 3)) else: # it was added the same in both - out_append(_tree_entry_to_baseindexentry(ours, 0)) + out.append(_tree_entry_to_baseindexentry(ours, 0)) # END handle two items # END handle heads # END handle base exists diff --git a/git/index/util.py b/git/index/util.py index 471e9262f..4f8af5531 100644 --- a/git/index/util.py +++ b/git/index/util.py @@ -11,9 +11,12 @@ # typing ---------------------------------------------------------------------- -from typing import (Any, Callable) +from typing import (Any, Callable, TYPE_CHECKING) -from git.types import PathLike +from git.types import PathLike, _T + +if TYPE_CHECKING: + from git.index import IndexFile # --------------------------------------------------------------------------------- @@ -52,7 +55,7 @@ def __del__(self) -> None: #{ Decorators -def post_clear_cache(func: Callable[..., Any]) -> Callable[..., Any]: +def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]: """Decorator for functions that alter the index using the git command. This would invalidate our possibly existing entries dictionary which is why it must be deleted to allow it to be lazily reread later. @@ -63,7 +66,7 @@ def post_clear_cache(func: Callable[..., Any]) -> Callable[..., Any]: """ @wraps(func) - def post_clear_cache_if_not_raised(self, *args: Any, **kwargs: Any) -> Any: + def post_clear_cache_if_not_raised(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: rval = func(self, *args, **kwargs) self._delete_entries_cache() return rval @@ -72,13 +75,13 @@ def post_clear_cache_if_not_raised(self, *args: Any, **kwargs: Any) -> Any: return post_clear_cache_if_not_raised -def default_index(func: Callable[..., Any]) -> Callable[..., Any]: +def default_index(func: Callable[..., _T]) -> Callable[..., _T]: """Decorator assuring the wrapped method may only run if we are the default repository index. This is as we rely on git commands that operate on that index only. """ @wraps(func) - def check_default_index(self, *args: Any, **kwargs: Any) -> Any: + def check_default_index(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: if self._file_path != self._index_path(): raise AssertionError( "Cannot call %r on indices that do not represent the default git index" % func.__name__) @@ -88,14 +91,14 @@ def check_default_index(self, *args: Any, **kwargs: Any) -> Any: return check_default_index -def git_working_dir(func: Callable[..., Any]) -> Callable[..., None]: +def git_working_dir(func: Callable[..., _T]) -> Callable[..., _T]: """Decorator which changes the current working dir to the one of the git repository in order to assure relative paths are handled correctly""" @wraps(func) - def set_git_working_dir(self, *args: Any, **kwargs: Any) -> None: + def set_git_working_dir(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: cur_wd = os.getcwd() - os.chdir(self.repo.working_tree_dir) + os.chdir(str(self.repo.working_tree_dir)) try: return func(self, *args, **kwargs) finally: diff --git a/git/objects/commit.py b/git/objects/commit.py index 81978ae8a..65a87591e 100644 --- a/git/objects/commit.py +++ b/git/objects/commit.py @@ -80,7 +80,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): "message", "parents", "encoding", "gpgsig") _id_attribute_ = "hexsha" - def __init__(self, repo: 'Repo', binsha: bytes, tree: 'Tree' = None, + def __init__(self, repo: 'Repo', binsha: bytes, tree: Union['Tree', None] = None, author: Union[Actor, None] = None, authored_date: Union[int, None] = None, author_tz_offset: Union[None, float] = None, diff --git a/git/objects/fun.py b/git/objects/fun.py index 339a53b8c..fc2ea1e7e 100644 --- a/git/objects/fun.py +++ b/git/objects/fun.py @@ -1,6 +1,7 @@ """Module with functions which are supposed to be as fast as possible""" from stat import S_ISDIR + from git.compat import ( safe_decode, defenc @@ -8,8 +9,14 @@ # typing ---------------------------------------------- -from typing import List, Tuple +from typing import Callable, List, MutableSequence, Sequence, Tuple, TYPE_CHECKING, Union, overload + +if TYPE_CHECKING: + from _typeshed import ReadableBuffer + from git import GitCmdObjectDB +EntryTup = Tuple[bytes, int, str] # same as TreeCacheTup in tree.py +EntryTupOrNone = Union[EntryTup, None] # --------------------------------------------------- @@ -18,7 +25,7 @@ 'traverse_tree_recursive') -def tree_to_stream(entries, write): +def tree_to_stream(entries: Sequence[EntryTup], write: Callable[['ReadableBuffer'], Union[int, None]]) -> None: """Write the give list of entries into a stream using its write method :param entries: **sorted** list of tuples with (binsha, mode, name) :param write: write method which takes a data string""" @@ -42,12 +49,14 @@ def tree_to_stream(entries, write): # According to my tests, this is exactly what git does, that is it just # takes the input literally, which appears to be utf8 on linux. if isinstance(name, str): - name = name.encode(defenc) - write(b''.join((mode_str, b' ', name, b'\0', binsha))) + name_bytes = name.encode(defenc) + else: + name_bytes = name + write(b''.join((mode_str, b' ', name_bytes, b'\0', binsha))) # END for each item -def tree_entries_from_data(data: bytes) -> List[Tuple[bytes, int, str]]: +def tree_entries_from_data(data: bytes) -> List[EntryTup]: """Reads the binary representation of a tree and returns tuples of Tree items :param data: data block with tree data (as bytes) :return: list(tuple(binsha, mode, tree_relative_path), ...)""" @@ -93,11 +102,13 @@ def tree_entries_from_data(data: bytes) -> List[Tuple[bytes, int, str]]: return out -def _find_by_name(tree_data, name, is_dir, start_at): +def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int + ) -> EntryTupOrNone: """return data entry matching the given name and tree mode or None. Before the item is returned, the respective data item is set None in the tree_data list to mark it done""" + try: item = tree_data[start_at] if item and item[2] == name and S_ISDIR(item[1]) == is_dir: @@ -115,16 +126,27 @@ def _find_by_name(tree_data, name, is_dir, start_at): return None -def _to_full_path(item, path_prefix): +@ overload +def _to_full_path(item: None, path_prefix: str) -> None: + ... + + +@ overload +def _to_full_path(item: EntryTup, path_prefix: str) -> EntryTup: + ... + + +def _to_full_path(item: EntryTupOrNone, path_prefix: str) -> EntryTupOrNone: """Rebuild entry with given path prefix""" if not item: return item return (item[0], item[1], path_prefix + item[2]) -def traverse_trees_recursive(odb, tree_shas, path_prefix): +def traverse_trees_recursive(odb: 'GitCmdObjectDB', tree_shas: Sequence[Union[bytes, None]], + path_prefix: str) -> List[Tuple[EntryTupOrNone, ...]]: """ - :return: list with entries according to the given binary tree-shas. + :return: list of list with entries according to the given binary tree-shas. The result is encoded in a list of n tuple|None per blob/commit, (n == len(tree_shas)), where * [0] == 20 byte sha @@ -137,28 +159,31 @@ def traverse_trees_recursive(odb, tree_shas, path_prefix): :param path_prefix: a prefix to be added to the returned paths on this level, set it '' for the first iteration :note: The ordering of the returned items will be partially lost""" - trees_data = [] + trees_data: List[List[EntryTupOrNone]] = [] + nt = len(tree_shas) for tree_sha in tree_shas: if tree_sha is None: - data = [] + data: List[EntryTupOrNone] = [] else: - data = tree_entries_from_data(odb.stream(tree_sha).read()) + # make new list for typing as list invariant + data = [x for x in tree_entries_from_data(odb.stream(tree_sha).read())] # END handle muted trees trees_data.append(data) # END for each sha to get data for - out = [] - out_append = out.append + out: List[Tuple[EntryTupOrNone, ...]] = [] # find all matching entries and recursively process them together if the match # is a tree. If the match is a non-tree item, put it into the result. # Processed items will be set None for ti, tree_data in enumerate(trees_data): + for ii, item in enumerate(tree_data): if not item: continue # END skip already done items + entries: List[EntryTupOrNone] entries = [None for _ in range(nt)] entries[ti] = item _sha, mode, name = item @@ -170,16 +195,16 @@ def traverse_trees_recursive(odb, tree_shas, path_prefix): for tio in range(ti + 1, ti + nt): tio = tio % nt entries[tio] = _find_by_name(trees_data[tio], name, is_dir, ii) - # END for each other item data + # END for each other item data # if we are a directory, enter recursion if is_dir: out.extend(traverse_trees_recursive( odb, [((ei and ei[0]) or None) for ei in entries], path_prefix + name + '/')) else: - out_append(tuple(_to_full_path(e, path_prefix) for e in entries)) - # END handle recursion + out.append(tuple(_to_full_path(e, path_prefix) for e in entries)) + # END handle recursion # finally mark it done tree_data[ii] = None # END for each item @@ -190,7 +215,7 @@ def traverse_trees_recursive(odb, tree_shas, path_prefix): return out -def traverse_tree_recursive(odb, tree_sha, path_prefix): +def traverse_tree_recursive(odb: 'GitCmdObjectDB', tree_sha: bytes, path_prefix: str) -> List[EntryTup]: """ :return: list of entries of the tree pointed to by the binary tree_sha. An entry has the following format: diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py index c95b66f2e..b485dbf6b 100644 --- a/git/objects/submodule/base.py +++ b/git/objects/submodule/base.py @@ -49,12 +49,13 @@ # typing ---------------------------------------------------------------------- -from typing import Dict, TYPE_CHECKING +from typing import Callable, Dict, Mapping, Sequence, TYPE_CHECKING, cast from typing import Any, Iterator, Union -from git.types import Commit_ish, PathLike +from git.types import Commit_ish, PathLike, TBD if TYPE_CHECKING: + from git.index import IndexFile from git.repo import Repo @@ -114,7 +115,7 @@ def __init__(self, repo: 'Repo', binsha: bytes, path: Union[PathLike, None] = None, name: Union[str, None] = None, parent_commit: Union[Commit_ish, None] = None, - url: str = None, + url: Union[str, None] = None, branch_path: Union[PathLike, None] = None ) -> None: """Initialize this instance with its attributes. We only document the ones @@ -131,14 +132,14 @@ def __init__(self, repo: 'Repo', binsha: bytes, if url is not None: self._url = url if branch_path is not None: - assert isinstance(branch_path, str) + # assert isinstance(branch_path, str) self._branch_path = branch_path if name is not None: self._name = name def _set_cache_(self, attr: str) -> None: if attr in ('path', '_url', '_branch_path'): - reader = self.config_reader() + reader: SectionConstraint = self.config_reader() # default submodule values try: self.path = reader.get('path') @@ -226,7 +227,7 @@ def _config_parser(cls, repo: 'Repo', return SubmoduleConfigParser(fp_module, read_only=read_only) - def _clear_cache(self): + def _clear_cache(self) -> None: # clear the possibly changed values for name in self._cache_attrs: try: @@ -246,7 +247,7 @@ def _sio_modules(cls, parent_commit: Commit_ish) -> BytesIO: def _config_parser_constrained(self, read_only: bool) -> SectionConstraint: """:return: Config Parser constrained to our submodule in read or write mode""" try: - pc = self.parent_commit + pc: Union['Commit_ish', None] = self.parent_commit except ValueError: pc = None # end handle empty parent repository @@ -255,10 +256,12 @@ def _config_parser_constrained(self, read_only: bool) -> SectionConstraint: return SectionConstraint(parser, sm_section(self.name)) @classmethod - def _module_abspath(cls, parent_repo, path, name): + def _module_abspath(cls, parent_repo: 'Repo', path: PathLike, name: str) -> PathLike: if cls._need_gitfile_submodules(parent_repo.git): return osp.join(parent_repo.git_dir, 'modules', name) - return osp.join(parent_repo.working_tree_dir, path) + if parent_repo.working_tree_dir: + return osp.join(parent_repo.working_tree_dir, path) + raise NotADirectoryError() # end @classmethod @@ -286,7 +289,7 @@ def _clone_repo(cls, repo, url, path, name, **kwargs): return clone @classmethod - def _to_relative_path(cls, parent_repo, path): + def _to_relative_path(cls, parent_repo: 'Repo', path: PathLike) -> PathLike: """:return: a path guaranteed to be relative to the given parent - repository :raise ValueError: if path is not contained in the parent repository's working tree""" path = to_native_path_linux(path) @@ -294,7 +297,7 @@ def _to_relative_path(cls, parent_repo, path): path = path[:-1] # END handle trailing slash - if osp.isabs(path): + if osp.isabs(path) and parent_repo.working_tree_dir: working_tree_linux = to_native_path_linux(parent_repo.working_tree_dir) if not path.startswith(working_tree_linux): raise ValueError("Submodule checkout path '%s' needs to be within the parents repository at '%s'" @@ -308,7 +311,7 @@ def _to_relative_path(cls, parent_repo, path): return path @classmethod - def _write_git_file_and_module_config(cls, working_tree_dir, module_abspath): + def _write_git_file_and_module_config(cls, working_tree_dir: PathLike, module_abspath: PathLike) -> None: """Writes a .git file containing a(preferably) relative path to the actual git module repository. It is an error if the module_abspath cannot be made into a relative path, relative to the working_tree_dir :note: will overwrite existing files ! @@ -335,7 +338,8 @@ def _write_git_file_and_module_config(cls, working_tree_dir, module_abspath): @classmethod def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = None, - branch=None, no_checkout: bool = False, depth=None, env=None, clone_multi_options=None + branch: Union[str, None] = None, no_checkout: bool = False, depth: Union[int, None] = None, + env: Union[Mapping[str, str], None] = None, clone_multi_options: Union[Sequence[TBD], None] = None ) -> 'Submodule': """Add a new submodule to the given repository. This will alter the index as well as the .gitmodules file, but will not create a new commit. @@ -391,7 +395,7 @@ def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = No if sm.exists(): # reretrieve submodule from tree try: - sm = repo.head.commit.tree[path] + sm = repo.head.commit.tree[str(path)] sm._name = name return sm except KeyError: @@ -414,12 +418,14 @@ def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = No # END check url # END verify urls match - mrepo = None + mrepo: Union[Repo, None] = None + if url is None: if not has_module: raise ValueError("A URL was not given and a repository did not exist at %s" % path) # END check url mrepo = sm.module() + # assert isinstance(mrepo, git.Repo) urls = [r.url for r in mrepo.remotes] if not urls: raise ValueError("Didn't find any remote url in repository at %s" % sm.abspath) @@ -427,7 +433,7 @@ def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = No url = urls[0] else: # clone new repo - kwargs: Dict[str, Union[bool, int]] = {'n': no_checkout} + kwargs: Dict[str, Union[bool, int, Sequence[TBD]]] = {'n': no_checkout} if not branch_is_default: kwargs['b'] = br.name # END setup checkout-branch @@ -451,6 +457,8 @@ def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = No # otherwise there is a '-' character in front of the submodule listing # a38efa84daef914e4de58d1905a500d8d14aaf45 mymodule (v0.9.0-1-ga38efa8) # -a38efa84daef914e4de58d1905a500d8d14aaf45 submodules/intermediate/one + writer: Union[GitConfigParser, SectionConstraint] + with sm.repo.config_writer() as writer: writer.set_value(sm_section(name), 'url', url) @@ -467,13 +475,15 @@ def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = No sm._branch_path = br.path # we deliberately assume that our head matches our index ! - sm.binsha = mrepo.head.commit.binsha + sm.binsha = mrepo.head.commit.binsha # type: ignore index.add([sm], write=True) return sm - def update(self, recursive=False, init=True, to_latest_revision=False, progress=None, dry_run=False, - force=False, keep_going=False, env=None, clone_multi_options=None): + def update(self, recursive: bool = False, init: bool = True, to_latest_revision: bool = False, + progress: Union['UpdateProgress', None] = None, dry_run: bool = False, + force: bool = False, keep_going: bool = False, env: Union[Mapping[str, str], None] = None, + clone_multi_options: Union[Sequence[TBD], None] = None): """Update the repository of this submodule to point to the checkout we point at with the binsha of this instance. @@ -580,6 +590,7 @@ def update(self, recursive=False, init=True, to_latest_revision=False, progress= if not dry_run: # see whether we have a valid branch to checkout try: + mrepo = cast('Repo', mrepo) # find a remote which has our branch - we try to be flexible remote_branch = find_first_remote_branch(mrepo.remotes, self.branch_name) local_branch = mkhead(mrepo, self.branch_path) @@ -640,7 +651,7 @@ def update(self, recursive=False, init=True, to_latest_revision=False, progress= may_reset = True if mrepo.head.commit.binsha != self.NULL_BIN_SHA: base_commit = mrepo.merge_base(mrepo.head.commit, hexsha) - if len(base_commit) == 0 or base_commit[0].hexsha == hexsha: + if len(base_commit) == 0 or (base_commit[0] is not None and base_commit[0].hexsha == hexsha): if force: msg = "Will force checkout or reset on local branch that is possibly in the future of" msg += "the commit it will be checked out to, effectively 'forgetting' new commits" @@ -807,7 +818,8 @@ def move(self, module_path, configuration=True, module=True): return self @unbare_repo - def remove(self, module=True, force=False, configuration=True, dry_run=False): + def remove(self, module: bool = True, force: bool = False, + configuration: bool = True, dry_run: bool = False) -> 'Submodule': """Remove this submodule from the repository. This will remove our entry from the .gitmodules file and the entry in the .git / config file. @@ -861,7 +873,7 @@ def remove(self, module=True, force=False, configuration=True, dry_run=False): # TODO: If we run into permission problems, we have a highly inconsistent # state. Delete the .git folders last, start with the submodules first mp = self.abspath - method = None + method: Union[None, Callable[[PathLike], None]] = None if osp.islink(mp): method = os.remove elif osp.isdir(mp): @@ -914,7 +926,7 @@ def remove(self, module=True, force=False, configuration=True, dry_run=False): import gc gc.collect() try: - rmtree(wtd) + rmtree(str(wtd)) except Exception as ex: if HIDE_WINDOWS_KNOWN_ERRORS: raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex @@ -928,7 +940,7 @@ def remove(self, module=True, force=False, configuration=True, dry_run=False): rmtree(git_dir) except Exception as ex: if HIDE_WINDOWS_KNOWN_ERRORS: - raise SkipTest("FIXME: fails with: PermissionError\n %s", ex) from ex + raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex else: raise # end handle separate bare repository @@ -952,6 +964,8 @@ def remove(self, module=True, force=False, configuration=True, dry_run=False): # now git config - need the config intact, otherwise we can't query # information anymore + writer: Union[GitConfigParser, SectionConstraint] + with self.repo.config_writer() as writer: writer.remove_section(sm_section(self.name)) @@ -961,7 +975,7 @@ def remove(self, module=True, force=False, configuration=True, dry_run=False): return self - def set_parent_commit(self, commit: Union[Commit_ish, None], check=True): + def set_parent_commit(self, commit: Union[Commit_ish, None], check: bool = True) -> 'Submodule': """Set this instance to use the given commit whose tree is supposed to contain the .gitmodules blob. @@ -1009,7 +1023,7 @@ def set_parent_commit(self, commit: Union[Commit_ish, None], check=True): return self @unbare_repo - def config_writer(self, index=None, write=True): + def config_writer(self, index: Union['IndexFile', None] = None, write: bool = True) -> SectionConstraint: """:return: a config writer instance allowing you to read and write the data belonging to this submodule into the .gitmodules file. @@ -1030,7 +1044,7 @@ def config_writer(self, index=None, write=True): return writer @unbare_repo - def rename(self, new_name): + def rename(self, new_name: str) -> 'Submodule': """Rename this submodule :note: This method takes care of renaming the submodule in various places, such as @@ -1065,13 +1079,14 @@ def rename(self, new_name): destination_module_abspath = self._module_abspath(self.repo, self.path, new_name) source_dir = mod.git_dir # Let's be sure the submodule name is not so obviously tied to a directory - if destination_module_abspath.startswith(mod.git_dir): + if str(destination_module_abspath).startswith(str(mod.git_dir)): tmp_dir = self._module_abspath(self.repo, self.path, str(uuid.uuid4())) os.renames(source_dir, tmp_dir) source_dir = tmp_dir # end handle self-containment os.renames(source_dir, destination_module_abspath) - self._write_git_file_and_module_config(mod.working_tree_dir, destination_module_abspath) + if mod.working_tree_dir: + self._write_git_file_and_module_config(mod.working_tree_dir, destination_module_abspath) # end move separate git repository return self @@ -1081,7 +1096,7 @@ def rename(self, new_name): #{ Query Interface @unbare_repo - def module(self): + def module(self) -> 'Repo': """:return: Repo instance initialized from the repository at our submodule path :raise InvalidGitRepositoryError: if a repository was not available. This could also mean that it was not yet initialized""" @@ -1098,7 +1113,7 @@ def module(self): raise InvalidGitRepositoryError("Repository at %r was not yet checked out" % module_checkout_abspath) # END handle exceptions - def module_exists(self): + def module_exists(self) -> bool: """:return: True if our module exists and is a valid git repository. See module() method""" try: self.module() @@ -1107,7 +1122,7 @@ def module_exists(self): return False # END handle exception - def exists(self): + def exists(self) -> bool: """ :return: True if the submodule exists, False otherwise. Please note that a submodule may exist ( in the .gitmodules file) even though its module @@ -1148,26 +1163,26 @@ def branch(self): return mkhead(self.module(), self._branch_path) @property - def branch_path(self): + def branch_path(self) -> PathLike: """ :return: full(relative) path as string to the branch we would checkout from the remote and track""" return self._branch_path @property - def branch_name(self): + def branch_name(self) -> str: """:return: the name of the branch, which is the shortest possible branch name""" # use an instance method, for this we create a temporary Head instance # which uses a repository that is available at least ( it makes no difference ) return git.Head(self.repo, self._branch_path).name @property - def url(self): + def url(self) -> str: """:return: The url to the repository which our module - repository refers to""" return self._url @property - def parent_commit(self): + def parent_commit(self) -> 'Commit_ish': """:return: Commit instance with the tree containing the .gitmodules file :note: will always point to the current head's commit if it was not set explicitly""" if self._parent_commit is None: @@ -1175,7 +1190,7 @@ def parent_commit(self): return self._parent_commit @property - def name(self): + def name(self) -> str: """:return: The name of this submodule. It is used to identify it within the .gitmodules file. :note: by default, the name is the path at which to find the submodule, but diff --git a/git/objects/submodule/root.py b/git/objects/submodule/root.py index 0af487100..bcac5419a 100644 --- a/git/objects/submodule/root.py +++ b/git/objects/submodule/root.py @@ -10,6 +10,18 @@ import logging +# typing ------------------------------------------------------------------- + +from typing import TYPE_CHECKING, Union + +from git.types import Commit_ish + +if TYPE_CHECKING: + from git.repo import Repo + from git.util import IterableList + +# ---------------------------------------------------------------------------- + __all__ = ["RootModule", "RootUpdateProgress"] log = logging.getLogger('git.objects.submodule.root') @@ -42,7 +54,7 @@ class RootModule(Submodule): k_root_name = '__ROOT__' - def __init__(self, repo): + def __init__(self, repo: 'Repo'): # repo, binsha, mode=None, path=None, name = None, parent_commit=None, url=None, ref=None) super(RootModule, self).__init__( repo, @@ -55,15 +67,17 @@ def __init__(self, repo): branch_path=git.Head.to_full_path(self.k_head_default) ) - def _clear_cache(self): + def _clear_cache(self) -> None: """May not do anything""" pass #{ Interface - def update(self, previous_commit=None, recursive=True, force_remove=False, init=True, - to_latest_revision=False, progress=None, dry_run=False, force_reset=False, - keep_going=False): + def update(self, previous_commit: Union[Commit_ish, None] = None, # type: ignore[override] + recursive: bool = True, force_remove: bool = False, init: bool = True, + to_latest_revision: bool = False, progress: Union[None, 'RootUpdateProgress'] = None, + dry_run: bool = False, force_reset: bool = False, keep_going: bool = False + ) -> 'RootModule': """Update the submodules of this repository to the current HEAD commit. This method behaves smartly by determining changes of the path of a submodules repository, next to changes to the to-be-checked-out commit or the branch to be @@ -128,8 +142,8 @@ def update(self, previous_commit=None, recursive=True, force_remove=False, init= previous_commit = repo.commit(previous_commit) # obtain commit object # END handle previous commit - psms = self.list_items(repo, parent_commit=previous_commit) - sms = self.list_items(repo) + psms: 'IterableList[Submodule]' = self.list_items(repo, parent_commit=previous_commit) + sms: 'IterableList[Submodule]' = self.list_items(repo) spsms = set(psms) ssms = set(sms) @@ -162,8 +176,8 @@ def update(self, previous_commit=None, recursive=True, force_remove=False, init= csms = (spsms & ssms) len_csms = len(csms) for i, csm in enumerate(csms): - psm = psms[csm.name] - sm = sms[csm.name] + psm: 'Submodule' = psms[csm.name] + sm: 'Submodule' = sms[csm.name] # PATH CHANGES ############## @@ -343,7 +357,7 @@ def update(self, previous_commit=None, recursive=True, force_remove=False, init= return self - def module(self): + def module(self) -> 'Repo': """:return: the actual repository containing the submodules""" return self.repo #} END interface diff --git a/git/objects/submodule/util.py b/git/objects/submodule/util.py index 5290000be..a776af889 100644 --- a/git/objects/submodule/util.py +++ b/git/objects/submodule/util.py @@ -5,11 +5,20 @@ import weakref -from typing import Any, TYPE_CHECKING, Union +# typing ----------------------------------------------------------------------- + +from typing import Any, Sequence, TYPE_CHECKING, Union + +from git.types import PathLike if TYPE_CHECKING: from .base import Submodule from weakref import ReferenceType + from git.repo import Repo + from git.refs import Head + from git import Remote + from git.refs import RemoteReference + __all__ = ('sm_section', 'sm_name', 'mkhead', 'find_first_remote_branch', 'SubmoduleConfigParser') @@ -17,23 +26,23 @@ #{ Utilities -def sm_section(name): +def sm_section(name: str) -> str: """:return: section title used in .gitmodules configuration file""" - return 'submodule "%s"' % name + return f'submodule "{name}"' -def sm_name(section): +def sm_name(section: str) -> str: """:return: name of the submodule as parsed from the section name""" section = section.strip() return section[11:-1] -def mkhead(repo, path): +def mkhead(repo: 'Repo', path: PathLike) -> 'Head': """:return: New branch/head instance""" return git.Head(repo, git.Head.to_full_path(path)) -def find_first_remote_branch(remotes, branch_name): +def find_first_remote_branch(remotes: Sequence['Remote'], branch_name: str) -> 'RemoteReference': """Find the remote branch matching the name of the given branch or raise InvalidGitRepositoryError""" for remote in remotes: try: @@ -92,7 +101,7 @@ def flush_to_index(self) -> None: #{ Overridden Methods def write(self) -> None: - rval = super(SubmoduleConfigParser, self).write() + rval: None = super(SubmoduleConfigParser, self).write() self.flush_to_index() return rval # END overridden methods diff --git a/git/objects/tree.py b/git/objects/tree.py index 2e8d8a794..a9656c1d3 100644 --- a/git/objects/tree.py +++ b/git/objects/tree.py @@ -4,8 +4,8 @@ # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php -from git.util import join_path -import git.diff as diff +from git.util import IterableList, join_path +import git.diff as git_diff from git.util import to_bin_sha from . import util @@ -21,8 +21,8 @@ # typing ------------------------------------------------- -from typing import (Callable, Dict, Generic, Iterable, Iterator, List, - Tuple, Type, TypeVar, Union, cast, TYPE_CHECKING) +from typing import (Any, Callable, Dict, Iterable, Iterator, List, + Tuple, Type, Union, cast, TYPE_CHECKING) from git.types import PathLike, TypeGuard @@ -30,10 +30,15 @@ from git.repo import Repo from io import BytesIO -T_Tree_cache = TypeVar('T_Tree_cache', bound=Tuple[bytes, int, str]) +TreeCacheTup = Tuple[bytes, int, str] + TraversedTreeTup = Union[Tuple[Union['Tree', None], IndexObjUnion, Tuple['Submodule', 'Submodule']]] + +def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[TreeCacheTup]: + return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance([inp], str) + #-------------------------------------------------------- @@ -42,9 +47,9 @@ __all__ = ("TreeModifier", "Tree") -def git_cmp(t1: T_Tree_cache, t2: T_Tree_cache) -> int: +def git_cmp(t1: TreeCacheTup, t2: TreeCacheTup) -> int: a, b = t1[2], t2[2] - assert isinstance(a, str) and isinstance(b, str) # Need as mypy 9.0 cannot unpack TypeVar properly + # assert isinstance(a, str) and isinstance(b, str) len_a, len_b = len(a), len(b) min_len = min(len_a, len_b) min_cmp = cmp(a[:min_len], b[:min_len]) @@ -55,8 +60,8 @@ def git_cmp(t1: T_Tree_cache, t2: T_Tree_cache) -> int: return len_a - len_b -def merge_sort(a: List[T_Tree_cache], - cmp: Callable[[T_Tree_cache, T_Tree_cache], int]) -> None: +def merge_sort(a: List[TreeCacheTup], + cmp: Callable[[TreeCacheTup, TreeCacheTup], int]) -> None: if len(a) < 2: return None @@ -91,7 +96,7 @@ def merge_sort(a: List[T_Tree_cache], k = k + 1 -class TreeModifier(Generic[T_Tree_cache], object): +class TreeModifier(object): """A utility class providing methods to alter the underlying cache in a list-like fashion. @@ -99,7 +104,7 @@ class TreeModifier(Generic[T_Tree_cache], object): the cache of a tree, will be sorted. Assuring it will be in a serializable state""" __slots__ = '_cache' - def __init__(self, cache: List[T_Tree_cache]) -> None: + def __init__(self, cache: List[TreeCacheTup]) -> None: self._cache = cache def _index_by_name(self, name: str) -> int: @@ -141,11 +146,8 @@ def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> 'TreeMod sha = to_bin_sha(sha) index = self._index_by_name(name) - def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[T_Tree_cache]: - return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance([inp], str) - item = (sha, mode, name) - assert is_tree_cache(item) + # assert is_tree_cache(item) if index == -1: self._cache.append(item) @@ -167,7 +169,7 @@ def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None: For more information on the parameters, see ``add`` :param binsha: 20 byte binary sha""" assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str) - tree_cache = cast(T_Tree_cache, (binsha, mode, name)) + tree_cache = (binsha, mode, name) self._cache.append(tree_cache) @@ -180,7 +182,7 @@ def __delitem__(self, name: str) -> None: #} END mutators -class Tree(IndexObject, diff.Diffable, util.Traversable, util.Serializable): +class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): """Tree objects represent an ordered list of Blobs and other Trees. @@ -216,7 +218,6 @@ def __init__(self, repo: 'Repo', binsha: bytes, mode: int = tree_id << 12, path: def _get_intermediate_items(cls, index_object: 'Tree', ) -> Union[Tuple['Tree', ...], Tuple[()]]: if index_object.type == "tree": - index_object = cast('Tree', index_object) return tuple(index_object._iter_convert_to_object(index_object._cache)) return () @@ -224,12 +225,12 @@ def _set_cache_(self, attr: str) -> None: if attr == "_cache": # Set the data when we need it ostream = self.repo.odb.stream(self.binsha) - self._cache: List[Tuple[bytes, int, str]] = tree_entries_from_data(ostream.read()) + self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read()) else: super(Tree, self)._set_cache_(attr) # END handle attribute - def _iter_convert_to_object(self, iterable: Iterable[Tuple[bytes, int, str]] + def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup] ) -> Iterator[IndexObjUnion]: """Iterable yields tuples of (binsha, mode, name), which will be converted to the respective object representation""" @@ -324,6 +325,14 @@ def traverse(self, # type: ignore # overrides super() super(Tree, self).traverse(predicate, prune, depth, # type: ignore branch_first, visit_once, ignore_self)) + def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]: + """ + :return: IterableList with the results of the traversal as produced by + traverse() + Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] + """ + return super(Tree, self).list_traverse(* args, **kwargs) + # List protocol def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]: diff --git a/git/objects/util.py b/git/objects/util.py index 0b449b7bb..fbe3d9def 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -23,7 +23,7 @@ from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence, TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast) -from git.types import Literal, TypeGuard +from git.types import Has_id_attribute, Literal if TYPE_CHECKING: from io import BytesIO, StringIO @@ -32,8 +32,15 @@ from .tag import TagObject from .tree import Tree, TraversedTreeTup from subprocess import Popen + from .submodule.base import Submodule + + +class TraverseNT(NamedTuple): + depth: int + item: Union['Traversable', 'Blob'] + src: Union['Traversable', None] + - T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse() TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule @@ -306,20 +313,29 @@ class Tree:: (cls, Tree) -> Tuple[Tree, ...] """ raise NotImplementedError("To be implemented in subclass") - def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList['TraversableIterableObj']: + def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]: """ :return: IterableList with the results of the traversal as produced by traverse() - List objects must be IterableObj and Traversable e.g. Commit, Submodule""" - - def is_TraversableIterableObj(inp: 'Traversable') -> TypeGuard['TraversableIterableObj']: - # return isinstance(self, TraversableIterableObj) - # Can it be anythin else? - return isinstance(self, Traversable) - - assert is_TraversableIterableObj(self), f"{type(self)}" - out: IterableList['TraversableIterableObj'] = IterableList(self._id_attribute_) - out.extend(self.traverse(*args, **kwargs)) + Commit -> IterableList['Commit'] + Submodule -> IterableList['Submodule'] + Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] + """ + # Commit and Submodule have id.__attribute__ as IterableObj + # Tree has id.__attribute__ inherited from IndexObject + if isinstance(self, (TraversableIterableObj, Has_id_attribute)): + id = self._id_attribute_ + else: + id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ + # could add _id_attribute_ to Traversable, or make all Traversable also Iterable? + + out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) + # overloads in subclasses (mypy does't allow typing self: subclass) + # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]] + + # NOTE: if is_edge=True, self.traverse returns a Tuple, so should be prevented or flattened? + kwargs['as_edge'] = False + out.extend(self.traverse(*args, **kwargs)) # type: ignore return out def traverse(self, @@ -364,15 +380,11 @@ def traverse(self, Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]] Tree -> Iterator[Union[Blob, Tree, Submodule, Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]] - + ignore_self=True is_edge=True -> Iterator[item] ignore_self=True is_edge=False --> Iterator[item] ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]] ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]""" - class TraverseNT(NamedTuple): - depth: int - item: Union['Traversable', 'Blob'] - src: Union['Traversable', None] visited = set() stack = deque() # type: Deque[TraverseNT] @@ -447,7 +459,10 @@ class TraversableIterableObj(Traversable, IterableObj): TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj] - @overload # type: ignore + def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]: # type: ignore[override] + return super(TraversableIterableObj, self).list_traverse(* args, **kwargs) + + @ overload # type: ignore def traverse(self: T_TIobj, predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], @@ -457,7 +472,7 @@ def traverse(self: T_TIobj, ) -> Iterator[T_TIobj]: ... - @overload + @ overload def traverse(self: T_TIobj, predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], @@ -467,7 +482,7 @@ def traverse(self: T_TIobj, ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: ... - @overload + @ overload def traverse(self: T_TIobj, predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], diff --git a/git/refs/head.py b/git/refs/head.py index c698004dc..97c8e6a1f 100644 --- a/git/refs/head.py +++ b/git/refs/head.py @@ -5,9 +5,13 @@ from .symbolic import SymbolicReference from .reference import Reference -from typing import Union +from typing import Union, TYPE_CHECKING + from git.types import Commit_ish +if TYPE_CHECKING: + from git.repo import Repo + __all__ = ["HEAD", "Head"] @@ -25,12 +29,13 @@ class HEAD(SymbolicReference): _ORIG_HEAD_NAME = 'ORIG_HEAD' __slots__ = () - def __init__(self, repo, path=_HEAD_NAME): + def __init__(self, repo: 'Repo', path=_HEAD_NAME): if path != self._HEAD_NAME: raise ValueError("HEAD instance must point to %r, got %r" % (self._HEAD_NAME, path)) super(HEAD, self).__init__(repo, path) + self.commit: 'Commit_ish' - def orig_head(self): + def orig_head(self) -> 'SymbolicReference': """ :return: SymbolicReference pointing at the ORIG_HEAD, which is maintained to contain the previous value of HEAD""" diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index ca0691d92..f0bd9316f 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -1,3 +1,4 @@ +from git.types import PathLike import os from git.compat import defenc @@ -408,7 +409,7 @@ def log_entry(self, index): return RefLog.entry_at(RefLog.path(self), index) @classmethod - def to_full_path(cls, path): + def to_full_path(cls, path) -> PathLike: """ :return: string with a full repository-relative path which can be used to initialize a Reference instance, for instance by using ``Reference.from_path``""" diff --git a/git/remote.py b/git/remote.py index 0ef54ea7e..f59b3245b 100644 --- a/git/remote.py +++ b/git/remote.py @@ -42,6 +42,7 @@ if TYPE_CHECKING: from git.repo.base import Repo + from git.objects.submodule.base import UpdateProgress # from git.objects.commit import Commit # from git.objects import Blob, Tree, TagObject @@ -64,7 +65,9 @@ def is_flagKeyLiteral(inp: str) -> TypeGuard[flagKeyLiteral]: #{ Utilities -def add_progress(kwargs: Any, git: Git, progress: Union[Callable[..., Any], None]) -> Any: +def add_progress(kwargs: Any, git: Git, + progress: Union[RemoteProgress, 'UpdateProgress', Callable[..., RemoteProgress], None] + ) -> Any: """Add the --progress flag to the given kwargs dict if supported by the git command. If the actual progress in the given progress instance is not given, we do not request any progress @@ -446,8 +449,9 @@ def __init__(self, repo: 'Repo', name: str) -> None: :param repo: The repository we are a remote of :param name: the name of the remote, i.e. 'origin'""" - self.repo = repo # type: 'Repo' + self.repo = repo self.name = name + self.url: str def __getattr__(self, attr: str) -> Any: """Allows to call this instance like @@ -466,7 +470,7 @@ def __getattr__(self, attr: str) -> Any: def _config_section_name(self) -> str: return 'remote "%s"' % self.name - def _set_cache_(self, attr: str) -> Any: + def _set_cache_(self, attr: str) -> None: if attr == "_config_reader": # NOTE: This is cached as __getattr__ is overridden to return remote config values implicitly, such as # in print(r.pushurl) @@ -555,7 +559,7 @@ def delete_url(self, url: str, **kwargs: Any) -> 'Remote': """ return self.set_url(url, delete=True) - @property + @ property def urls(self) -> Iterator[str]: """:return: Iterator yielding all configured URL targets on a remote as strings""" try: @@ -588,7 +592,7 @@ def urls(self) -> Iterator[str]: else: raise ex - @property + @ property def refs(self) -> IterableList[RemoteReference]: """ :return: @@ -599,7 +603,7 @@ def refs(self) -> IterableList[RemoteReference]: out_refs.extend(RemoteReference.list_items(self.repo, remote=self.name)) return out_refs - @property + @ property def stale_refs(self) -> IterableList[Reference]: """ :return: @@ -633,7 +637,7 @@ def stale_refs(self) -> IterableList[Reference]: # END for each line return out_refs - @classmethod + @ classmethod def create(cls, repo: 'Repo', name: str, url: str, **kwargs: Any) -> 'Remote': """Create a new remote to the given repository :param repo: Repository instance that is to receive the new remote @@ -650,7 +654,7 @@ def create(cls, repo: 'Repo', name: str, url: str, **kwargs: Any) -> 'Remote': # add is an alias add = create - @classmethod + @ classmethod def remove(cls, repo: 'Repo', name: str) -> str: """Remove the remote with the given name :return: the passed remote name to remove @@ -794,7 +798,7 @@ def _assert_refspec(self) -> None: config.release() def fetch(self, refspec: Union[str, List[str], None] = None, - progress: Union[Callable[..., Any], None] = None, + progress: Union[RemoteProgress, None, 'UpdateProgress'] = None, verbose: bool = True, **kwargs: Any) -> IterableList[FetchInfo]: """Fetch the latest changes for this remote @@ -841,7 +845,7 @@ def fetch(self, refspec: Union[str, List[str], None] = None, return res def pull(self, refspec: Union[str, List[str], None] = None, - progress: Union[Callable[..., Any], None] = None, + progress: Union[RemoteProgress, 'UpdateProgress', None] = None, **kwargs: Any) -> IterableList[FetchInfo]: """Pull changes from the given branch, being the same as a fetch followed by a merge of branch with your local branch. @@ -862,7 +866,7 @@ def pull(self, refspec: Union[str, List[str], None] = None, return res def push(self, refspec: Union[str, List[str], None] = None, - progress: Union[Callable[..., Any], None] = None, + progress: Union[RemoteProgress, 'UpdateProgress', Callable[..., RemoteProgress], None] = None, **kwargs: Any) -> IterableList[PushInfo]: """Push changes from source branch in refspec to target branch in refspec. diff --git a/git/repo/base.py b/git/repo/base.py index d77b19c13..3214b528e 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -36,16 +36,20 @@ # typing ------------------------------------------------------ -from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish +from git.types import TBD, PathLike, Lit_config_levels, Commit_ish, Tree_ish, is_config_level from typing import (Any, BinaryIO, Callable, Dict, Iterator, List, Mapping, Optional, Sequence, TextIO, Tuple, Type, Union, NamedTuple, cast, TYPE_CHECKING) -if TYPE_CHECKING: # only needed for types +from git.types import ConfigLevels_Tup + +if TYPE_CHECKING: from git.util import IterableList from git.refs.symbolic import SymbolicReference from git.objects import Tree + from git.objects.submodule.base import UpdateProgress + from git.remote import RemoteProgress # ----------------------------------------------------------- @@ -55,12 +59,11 @@ __all__ = ('Repo',) -BlameEntry = NamedTuple('BlameEntry', [ - ('commit', Dict[str, TBD]), - ('linenos', range), - ('orig_path', Optional[str]), - ('orig_linenos', range)] -) +class BlameEntry(NamedTuple): + commit: Dict[str, 'Commit'] + linenos: range + orig_path: Optional[str] + orig_linenos: range class Repo(object): @@ -95,7 +98,7 @@ class Repo(object): # invariants # represents the configuration level of a configuration file - config_level = ("system", "user", "global", "repository") # type: Tuple[Lit_config_levels, ...] + config_level: ConfigLevels_Tup = ("system", "user", "global", "repository") # Subclass configuration # Subclasses may easily bring in their own custom types by placing a constructor or type here @@ -495,7 +498,7 @@ def config_reader(self, config_level: Optional[Lit_config_levels] = None) -> Git unknown, instead the global path will be used.""" files = None if config_level is None: - files = [self._get_config_path(f) for f in self.config_level] + files = [self._get_config_path(f) for f in self.config_level if is_config_level(f)] else: files = [self._get_config_path(config_level)] return GitConfigParser(files, read_only=True, repo=self) @@ -574,7 +577,7 @@ def iter_commits(self, rev: Optional[TBD] = None, paths: Union[PathLike, Sequenc return Commit.iter_items(self, rev, paths, **kwargs) def merge_base(self, *rev: TBD, **kwargs: Any - ) -> List[Union['SymbolicReference', Commit_ish, None]]: + ) -> List[Union[Commit_ish, None]]: """Find the closest common ancestor for the given revision (e.g. Commits, Tags, References, etc) :param rev: At least two revs to find the common ancestor for. @@ -587,7 +590,7 @@ def merge_base(self, *rev: TBD, **kwargs: Any raise ValueError("Please specify at least two revs, got only %i" % len(rev)) # end handle input - res = [] # type: List[Union['SymbolicReference', Commit_ish, None]] + res = [] # type: List[Union[Commit_ish, None]] try: lines = self.git.merge_base(*rev, **kwargs).splitlines() # List[str] except GitCommandError as err: @@ -620,7 +623,7 @@ def is_ancestor(self, ancestor_rev: 'Commit', rev: 'Commit') -> bool: raise return True - def is_valid_object(self, sha: str, object_type: str = None) -> bool: + def is_valid_object(self, sha: str, object_type: Union[str, None] = None) -> bool: try: complete_sha = self.odb.partial_to_complete_sha_hex(sha) object_info = self.odb.info(complete_sha) @@ -801,7 +804,7 @@ def blame_incremental(self, rev: TBD, file: TBD, **kwargs: Any) -> Optional[Iter should get a continuous range spanning all line numbers in the file. """ data = self.git.blame(rev, '--', file, p=True, incremental=True, stdout_as_string=False, **kwargs) - commits = {} # type: Dict[str, TBD] + commits: Dict[str, Commit] = {} stream = (line for line in data.split(b'\n') if line) while True: @@ -973,7 +976,7 @@ def blame(self, rev: TBD, file: TBD, incremental: bool = False, **kwargs: Any return blames @classmethod - def init(cls, path: PathLike = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB, + def init(cls, path: Union[PathLike, None] = None, mkdir: bool = True, odbt: Type[GitCmdObjectDB] = GitCmdObjectDB, expand_vars: bool = True, **kwargs: Any) -> 'Repo': """Initialize a git repository at the given path if specified @@ -1013,7 +1016,8 @@ def init(cls, path: PathLike = None, mkdir: bool = True, odbt: Type[GitCmdObject @classmethod def _clone(cls, git: 'Git', url: PathLike, path: PathLike, odb_default_type: Type[GitCmdObjectDB], - progress: Optional[Callable], multi_options: Optional[List[str]] = None, **kwargs: Any + progress: Union['RemoteProgress', 'UpdateProgress', Callable[..., 'RemoteProgress'], None] = None, + multi_options: Optional[List[str]] = None, **kwargs: Any ) -> 'Repo': odbt = kwargs.pop('odbt', odb_default_type) diff --git a/git/types.py b/git/types.py index fb63f46e7..9181e0406 100644 --- a/git/types.py +++ b/git/types.py @@ -4,13 +4,16 @@ import os import sys -from typing import Dict, Union, Any, TYPE_CHECKING +from typing import (Callable, Dict, NoReturn, Sequence, Tuple, Union, Any, Iterator, # noqa: F401 + NamedTuple, TYPE_CHECKING, TypeVar) # noqa: F401 +if TYPE_CHECKING: + from git.repo import Repo if sys.version_info[:2] >= (3, 8): - from typing import Final, Literal, SupportsIndex, TypedDict, Protocol # noqa: F401 + from typing import Final, Literal, SupportsIndex, TypedDict, Protocol, runtime_checkable # noqa: F401 else: - from typing_extensions import Final, Literal, SupportsIndex, TypedDict, Protocol # noqa: F401 + from typing_extensions import Final, Literal, SupportsIndex, TypedDict, Protocol, runtime_checkable # noqa: F401 if sys.version_info[:2] >= (3, 10): from typing import TypeGuard # noqa: F401 @@ -29,13 +32,44 @@ # from git.refs import SymbolicReference TBD = Any +_T = TypeVar('_T') Tree_ish = Union['Commit', 'Tree'] Commit_ish = Union['Commit', 'TagObject', 'Blob', 'Tree'] +# Config_levels --------------------------------------------------------- + Lit_config_levels = Literal['system', 'global', 'user', 'repository'] +def is_config_level(inp: str) -> TypeGuard[Lit_config_levels]: + # return inp in get_args(Lit_config_level) # only py >= 3.8 + return inp in ("system", "user", "global", "repository") + + +ConfigLevels_Tup = Tuple[Literal['system'], Literal['user'], Literal['global'], Literal['repository']] + +#----------------------------------------------------------------------------------- + + +def assert_never(inp: NoReturn, raise_error: bool = True, exc: Union[Exception, None] = None) -> None: + """For use in exhaustive checking of literal or Enum in if/else chain. + Should only be reached if all memebers not handled OR attempt to pass non-members through chain. + + If all members handled, type is Empty. Otherwise, will cause mypy error. + If non-members given, should cause mypy error at variable creation. + + If raise_error is True, will also raise AssertionError or the Exception passed to exc. + """ + if raise_error: + if exc is None: + raise ValueError(f"An unhandled Literal ({inp}) in an if/else chain was found") + else: + raise exc + else: + pass + + class Files_TD(TypedDict): insertions: int deletions: int @@ -52,3 +86,13 @@ class Total_TD(TypedDict): class HSH_TD(TypedDict): total: Total_TD files: Dict[PathLike, Files_TD] + + +@runtime_checkable +class Has_Repo(Protocol): + repo: 'Repo' + + +@runtime_checkable +class Has_id_attribute(Protocol): + _id_attribute_: str diff --git a/git/util.py b/git/util.py index abc82bd35..571e261e1 100644 --- a/git/util.py +++ b/git/util.py @@ -36,11 +36,14 @@ from git.remote import Remote from git.repo.base import Repo from git.config import GitConfigParser, SectionConstraint + # from git.objects.base import IndexObject -from .types import (Literal, Protocol, SupportsIndex, # because behind py version guards - PathLike, HSH_TD, Total_TD, Files_TD) # aliases -T_IterableObj = TypeVar('T_IterableObj', bound='IterableObj', covariant=True) +from .types import (Literal, SupportsIndex, # because behind py version guards + PathLike, HSH_TD, Total_TD, Files_TD, # aliases + Has_id_attribute) + +T_IterableObj = TypeVar('T_IterableObj', bound=Union['IterableObj', 'Has_id_attribute'], covariant=True) # So IterableList[Head] is subtype of IterableList[IterableObj] # --------------------------------------------------------------------- @@ -80,15 +83,17 @@ HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get('HIDE_WINDOWS_KNOWN_ERRORS', True) HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get('HIDE_WINDOWS_FREEZE_ERRORS', True) -#{ Utility Methods +# { Utility Methods + +T = TypeVar('T') -def unbare_repo(func: Callable) -> Callable: +def unbare_repo(func: Callable[..., T]) -> Callable[..., T]: """Methods with this decorator raise InvalidGitRepositoryError if they encounter a bare repository""" @wraps(func) - def wrapper(self: 'Remote', *args: Any, **kwargs: Any) -> Callable: + def wrapper(self: 'Remote', *args: Any, **kwargs: Any) -> T: if self.repo.bare: raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__) # END bare method @@ -243,7 +248,7 @@ def is_exec(fpath: str) -> bool: def _cygexpath(drive: Optional[str], path: str) -> str: if osp.isabs(path) and not drive: - ## Invoked from `cygpath()` directly with `D:Apps\123`? + # Invoked from `cygpath()` directly with `D:Apps\123`? # It's an error, leave it alone just slashes) p = path # convert to str if AnyPath given else: @@ -261,8 +266,8 @@ def _cygexpath(drive: Optional[str], path: str) -> str: _cygpath_parsers = ( - ## See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx - ## and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths + # See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx + # and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths (re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"), (lambda server, share, rest_path: '//%s/%s/%s' % (server, share, rest_path.replace('\\', '/'))), False @@ -293,7 +298,7 @@ def _cygexpath(drive: Optional[str], path: str) -> str: def cygpath(path: str) -> str: """Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment.""" path = str(path) # ensure is str and not AnyPath. - #Fix to use Paths when 3.5 dropped. or to be just str if only for urls? + # Fix to use Paths when 3.5 dropped. or to be just str if only for urls? if not path.startswith(('/cygdrive', '//')): for regex, parser, recurse in _cygpath_parsers: match = regex.match(path) @@ -353,7 +358,7 @@ def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool: res = py_where(git_executable) git_dir = osp.dirname(res[0]) if res else "" - ## Just a name given, not a real path. + # Just a name given, not a real path. uname_cmd = osp.join(git_dir, 'uname') process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True) @@ -374,7 +379,7 @@ def get_user_id() -> str: def finalize_process(proc: subprocess.Popen, **kwargs: Any) -> None: """Wait for the process (clone, fetch, pull or push) and handle its errors accordingly""" - ## TODO: No close proc-streams?? + # TODO: No close proc-streams?? proc.wait(**kwargs) @@ -428,9 +433,9 @@ def remove_password_if_present(cmdline): return new_cmdline -#} END utilities +# } END utilities -#{ Classes +# { Classes class RemoteProgress(object): @@ -980,7 +985,7 @@ def __contains__(self, attr: object) -> bool: return False # END handle membership - def __getattr__(self, attr: str) -> Any: + def __getattr__(self, attr: str) -> T_IterableObj: attr = self._prefix + attr for item in self: if getattr(item, self._id_attr) == attr: @@ -988,7 +993,7 @@ def __getattr__(self, attr: str) -> Any: # END for each item return list.__getattribute__(self, attr) - def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> Any: + def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> 'T_IterableObj': # type: ignore assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str" @@ -1003,7 +1008,7 @@ def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> Any: raise IndexError("No item found with id %r" % (self._prefix + index)) from e # END handle getattr - def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> Any: + def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None: assert isinstance(index, (int, str)), "Index of IterableList should be an int or str" @@ -1066,7 +1071,7 @@ def iter_items(cls, repo: 'Repo', *args: Any, **kwargs: Any): raise NotImplementedError("To be implemented by Subclass") -class IterableObj(Protocol): +class IterableObj(): """Defines an interface for iterable items which is to assure a uniform way to retrieve and iterate items within the git repository @@ -1097,7 +1102,7 @@ def iter_items(cls, repo: 'Repo', *args: Any, **kwargs: Any :return: iterator yielding Items""" raise NotImplementedError("To be implemented by Subclass") -#} END classes +# } END classes class NullHandler(logging.Handler): diff --git a/mypy.ini b/mypy.ini index 8f86a6af7..67397d40f 100644 --- a/mypy.ini +++ b/mypy.ini @@ -3,6 +3,11 @@ # TODO: enable when we've fully annotated everything # disallow_untyped_defs = True +no_implicit_optional = True +warn_redundant_casts = True +# warn_unused_ignores = True +# warn_unreachable = True +pretty = True # TODO: remove when 'gitdb' is fully annotated [mypy-gitdb.*]