diff --git a/git/cmd.py b/git/cmd.py index d46ccef31..d8b82352d 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -615,7 +615,7 @@ def _set_cache_(self, attr: str) -> None: # END handle version info @property - def working_dir(self) -> Union[None, str]: + def working_dir(self) -> Union[None, PathLike]: """:return: Git directory we are working on""" return self._working_dir @@ -1187,7 +1187,7 @@ def __get_object_header(self, cmd, ref: AnyStr) -> Tuple[str, str, int]: cmd.stdin.flush() return self._parse_object_header(cmd.stdout.readline()) - def get_object_header(self, ref: AnyStr) -> Tuple[str, str, int]: + def get_object_header(self, ref: str) -> Tuple[str, str, int]: """ Use this method to quickly examine the type and size of the object behind the given ref. @@ -1198,7 +1198,7 @@ def get_object_header(self, ref: AnyStr) -> Tuple[str, str, int]: cmd = self._get_persistent_cmd("cat_file_header", "cat_file", batch_check=True) return self.__get_object_header(cmd, ref) - def get_object_data(self, ref: AnyStr) -> Tuple[str, str, int, bytes]: + def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]: """ As get_object_header, but returns object data as well :return: (hexsha, type_string, size_as_int,data_string) :note: not threadsafe""" @@ -1207,7 +1207,7 @@ def get_object_data(self, ref: AnyStr) -> Tuple[str, str, int, bytes]: del(stream) return (hexsha, typename, size, data) - def stream_object_data(self, ref: AnyStr) -> Tuple[str, str, int, 'Git.CatFileContentStream']: + def stream_object_data(self, ref: str) -> Tuple[str, str, int, 'Git.CatFileContentStream']: """ As get_object_header, but returns the data as a stream :return: (hexsha, type_string, size_as_int, stream) diff --git a/git/db.py b/git/db.py index dc60c5552..47cccda8d 100644 --- a/git/db.py +++ b/git/db.py @@ -12,7 +12,7 @@ # typing------------------------------------------------- -from typing import TYPE_CHECKING, AnyStr +from typing import TYPE_CHECKING from git.types import PathLike if TYPE_CHECKING: @@ -39,18 +39,18 @@ def __init__(self, root_path: PathLike, git: 'Git') -> None: super(GitCmdObjectDB, self).__init__(root_path) self._git = git - def info(self, sha: bytes) -> OInfo: - hexsha, typename, size = self._git.get_object_header(bin_to_hex(sha)) + def info(self, binsha: bytes) -> OInfo: + hexsha, typename, size = self._git.get_object_header(bin_to_hex(binsha)) return OInfo(hex_to_bin(hexsha), typename, size) - def stream(self, sha: bytes) -> OStream: + def stream(self, binsha: bytes) -> OStream: """For now, all lookup is done by git itself""" - hexsha, typename, size, stream = self._git.stream_object_data(bin_to_hex(sha)) + hexsha, typename, size, stream = self._git.stream_object_data(bin_to_hex(binsha)) return OStream(hex_to_bin(hexsha), typename, size, stream) # { Interface - def partial_to_complete_sha_hex(self, partial_hexsha: AnyStr) -> bytes: + def partial_to_complete_sha_hex(self, partial_hexsha: str) -> bytes: """:return: Full binary 20 byte sha from the given partial hexsha :raise AmbiguousObjectName: :raise BadObject: diff --git a/git/diff.py b/git/diff.py index ca673b0ca..a40fc244e 100644 --- a/git/diff.py +++ b/git/diff.py @@ -16,7 +16,7 @@ # typing ------------------------------------------------------------------ from typing import Any, Iterator, List, Match, Optional, Tuple, Type, Union, TYPE_CHECKING -from git.types import TBD, Final, Literal +from git.types import PathLike, TBD, Final, Literal if TYPE_CHECKING: from .objects.tree import Tree @@ -84,7 +84,7 @@ def _process_diff_args(self, args: List[Union[str, 'Diffable', object]]) -> List return args def diff(self, other: Union[Type[Index], Type['Tree'], object, None, str] = Index, - paths: Union[str, List[str], Tuple[str, ...], None] = None, + paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None, create_patch: bool = False, **kwargs: Any) -> 'DiffIndex': """Creates diffs between two items being trees, trees and index or an index and the working tree. It will detect renames automatically. diff --git a/git/exc.py b/git/exc.py index 1e0caf4ed..e8ff784c7 100644 --- a/git/exc.py +++ b/git/exc.py @@ -11,7 +11,7 @@ # typing ---------------------------------------------------- -from typing import List, Optional, Tuple, Union, TYPE_CHECKING +from typing import List, Sequence, Tuple, Union, TYPE_CHECKING from git.types import PathLike if TYPE_CHECKING: @@ -113,7 +113,7 @@ class CheckoutError(GitError): were checked out successfully and hence match the version stored in the index""" - def __init__(self, message: str, failed_files: List[PathLike], valid_files: List[PathLike], + def __init__(self, message: str, failed_files: Sequence[PathLike], valid_files: Sequence[PathLike], failed_reasons: List[str]) -> None: Exception.__init__(self, message) @@ -139,8 +139,11 @@ class HookExecutionError(CommandError): """Thrown if a hook exits with a non-zero exit code. It provides access to the exit code and the string returned via standard output""" - def __init__(self, command: Union[List[str], Tuple[str, ...], str], status: Optional[str], - stderr: Optional[str] = None, stdout: Optional[str] = None) -> None: + def __init__(self, command: Union[List[str], Tuple[str, ...], str], + status: Union[str, int, None, Exception], + stderr: Union[bytes, str, None] = None, + stdout: Union[bytes, str, None] = None) -> None: + super(HookExecutionError, self).__init__(command, status, stderr, stdout) self._msg = "Hook('%s') failed%s" diff --git a/git/index/base.py b/git/index/base.py index 5b3667ace..044240602 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -3,6 +3,7 @@ # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php +from git.refs.reference import Reference import glob from io import BytesIO import os @@ -63,6 +64,23 @@ git_working_dir ) +# typing ----------------------------------------------------------------------------- + +from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List, + Sequence, TYPE_CHECKING, Tuple, Union) + +from git.types import PathLike, TBD + +if TYPE_CHECKING: + from subprocess import Popen + from git.repo import Repo + + +StageType = int +Treeish = Union[Tree, Commit, str, bytes] + +# ------------------------------------------------------------------------------------ + __all__ = ('IndexFile', 'CheckoutError') @@ -93,7 +111,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable): _VERSION = 2 # latest version we support S_IFGITLINK = S_IFGITLINK # a submodule - def __init__(self, repo, file_path=None): + def __init__(self, repo: 'Repo', file_path: PathLike = None) -> None: """Initialize this Index instance, optionally from the given ``file_path``. If no file_path is given, we will be created from the current index file. @@ -102,9 +120,9 @@ def __init__(self, repo, file_path=None): self.repo = repo self.version = self._VERSION self._extension_data = b'' - self._file_path = file_path or self._index_path() + self._file_path = file_path or self._index_path() # type: PathLike - def _set_cache_(self, attr): + def _set_cache_(self, attr: str) -> None: if attr == "entries": # read the current index # try memory map for speed @@ -115,8 +133,8 @@ def _set_cache_(self, attr): ok = True except OSError: # in new repositories, there may be no index, which means we are empty - self.entries = {} - return + self.entries = {} # type: Dict[Tuple[PathLike, StageType], IndexEntry] + return None finally: if not ok: lfd.rollback() @@ -133,15 +151,18 @@ def _set_cache_(self, attr): else: super(IndexFile, self)._set_cache_(attr) - def _index_path(self): - return join_path_native(self.repo.git_dir, "index") + def _index_path(self) -> PathLike: + if self.repo.git_dir: + return join_path_native(self.repo.git_dir, "index") + else: + raise GitCommandError("No git directory given to join index path") @property - def path(self): + def path(self) -> PathLike: """ :return: Path to the index file we are representing """ return self._file_path - def _delete_entries_cache(self): + def _delete_entries_cache(self) -> None: """Safely clear the entries cache so it can be recreated""" try: del(self.entries) @@ -152,18 +173,18 @@ def _delete_entries_cache(self): #{ Serializable Interface - def _deserialize(self, stream): + def _deserialize(self, stream: IO) -> 'IndexFile': """Initialize this instance with index values read from the given stream""" self.version, self.entries, self._extension_data, _conten_sha = read_cache(stream) return self - def _entries_sorted(self): + def _entries_sorted(self) -> List[TBD]: """:return: list of entries, in a sorted fashion, first by path, then by stage""" return sorted(self.entries.values(), key=lambda e: (e.path, e.stage)) - def _serialize(self, stream, ignore_extension_data=False): + def _serialize(self, stream: IO, ignore_extension_data: bool = False) -> 'IndexFile': entries = self._entries_sorted() - extension_data = self._extension_data + extension_data = self._extension_data # type: Union[None, bytes] if ignore_extension_data: extension_data = None write_cache(entries, stream, extension_data) @@ -171,7 +192,7 @@ def _serialize(self, stream, ignore_extension_data=False): #} END serializable interface - def write(self, file_path=None, ignore_extension_data=False): + def write(self, file_path: Union[None, PathLike] = None, ignore_extension_data: bool = False) -> None: """Write the current state to our file path or to the given one :param file_path: @@ -191,7 +212,7 @@ def write(self, file_path=None, ignore_extension_data=False): Alternatively, use IndexFile.write_tree() to handle this case automatically - :return: self""" + :return: self # does it? or returns None?""" # make sure we have our entries read before getting a write lock # else it would be done when streaming. This can happen # if one doesn't change the index, but writes it right away @@ -215,7 +236,7 @@ def write(self, file_path=None, ignore_extension_data=False): @post_clear_cache @default_index - def merge_tree(self, rhs, base=None): + def merge_tree(self, rhs: Treeish, base: Union[None, Treeish] = None) -> 'IndexFile': """Merge the given rhs treeish into the current index, possibly taking a common base treeish into account. @@ -242,7 +263,7 @@ def merge_tree(self, rhs, base=None): # -i : ignore working tree status # --aggressive : handle more merge cases # -m : do an actual merge - args = ["--aggressive", "-i", "-m"] + args = ["--aggressive", "-i", "-m"] # type: List[Union[Treeish, str]] if base is not None: args.append(base) args.append(rhs) @@ -251,7 +272,7 @@ def merge_tree(self, rhs, base=None): return self @classmethod - def new(cls, repo, *tree_sha): + def new(cls, repo: 'Repo', *tree_sha: Union[str, Tree]) -> 'IndexFile': """ Merge the given treeish revisions into a new index which is returned. This method behaves like git-read-tree --aggressive when doing the merge. @@ -264,18 +285,20 @@ def new(cls, repo, *tree_sha): New IndexFile instance. Its path will be undefined. If you intend to write such a merged Index, supply an alternate file_path to its 'write' method.""" - base_entries = aggressive_tree_merge(repo.odb, [to_bin_sha(str(t)) for t in tree_sha]) + tree_sha_bytes = [to_bin_sha(str(t)) for t in tree_sha] # List[bytes] + base_entries = aggressive_tree_merge(repo.odb, tree_sha_bytes) inst = cls(repo) # convert to entries dict - entries = dict(zip(((e.path, e.stage) for e in base_entries), - (IndexEntry.from_base(e) for e in base_entries))) + entries = dict(zip( + ((e.path, e.stage) for e in base_entries), + (IndexEntry.from_base(e) for e in base_entries))) # type: Dict[Tuple[PathLike, int], IndexEntry] inst.entries = entries return inst @classmethod - def from_tree(cls, repo, *treeish, **kwargs): + def from_tree(cls, repo: 'Repo', *treeish: Treeish, **kwargs: Any) -> 'IndexFile': """Merge the given treeish revisions into a new index which is returned. The original index will remain unaltered @@ -312,7 +335,7 @@ def from_tree(cls, repo, *treeish, **kwargs): if len(treeish) == 0 or len(treeish) > 3: raise ValueError("Please specify between 1 and 3 treeish, got %i" % len(treeish)) - arg_list = [] + arg_list = [] # type: List[Union[Treeish, str]] # ignore that working tree and index possibly are out of date if len(treeish) > 1: # drop unmerged entries when reading our index and merging @@ -331,7 +354,8 @@ def from_tree(cls, repo, *treeish, **kwargs): # as it considers existing entries. moving it essentially clears the index. # Unfortunately there is no 'soft' way to do it. # The TemporaryFileSwap assure the original file get put back - index_handler = TemporaryFileSwap(join_path_native(repo.git_dir, 'index')) + if repo.git_dir: + index_handler = TemporaryFileSwap(join_path_native(repo.git_dir, 'index')) try: repo.git.read_tree(*arg_list, **kwargs) index = cls(repo, tmp_index) @@ -346,7 +370,7 @@ def from_tree(cls, repo, *treeish, **kwargs): # UTILITIES @unbare_repo - def _iter_expand_paths(self, paths): + def _iter_expand_paths(self, paths: Sequence[PathLike]) -> Iterator[PathLike]: """Expand the directories in list of paths to the corresponding paths accordingly, Note: git will add items multiple times even if a glob overlapped @@ -354,10 +378,10 @@ def _iter_expand_paths(self, paths): times - we respect that and do not prune""" def raise_exc(e): raise e - r = self.repo.working_tree_dir + r = str(self.repo.working_tree_dir) rs = r + os.sep for path in paths: - abs_path = path + abs_path = str(path) if not osp.isabs(abs_path): abs_path = osp.join(r, path) # END make absolute path @@ -374,7 +398,7 @@ def raise_exc(e): # end check symlink # if the path is not already pointing to an existing file, resolve globs if possible - if not os.path.exists(path) and ('?' in path or '*' in path or '[' in path): + if not os.path.exists(abs_path) and ('?' in abs_path or '*' in abs_path or '[' in abs_path): resolved_paths = glob.glob(abs_path) # not abs_path in resolved_paths: # a glob() resolving to the same path we are feeding it with @@ -396,12 +420,12 @@ def raise_exc(e): # END for each subdirectory except OSError: # was a file or something that could not be iterated - yield path.replace(rs, '') + yield abs_path.replace(rs, '') # END path exception handling # END for each path - def _write_path_to_stdin(self, proc, filepath, item, fmakeexc, fprogress, - read_from_stdout=True): + def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item, fmakeexc, fprogress, + read_from_stdout: bool = True) -> Union[None, str]: """Write path to proc.stdin and make sure it processes the item, including progress. :return: stdout string @@ -417,20 +441,24 @@ def _write_path_to_stdin(self, proc, filepath, item, fmakeexc, fprogress, we will close stdin to break the pipe.""" fprogress(filepath, False, item) - rval = None - try: - proc.stdin.write(("%s\n" % filepath).encode(defenc)) - except IOError as e: - # pipe broke, usually because some error happened - raise fmakeexc() from e - # END write exception handling - proc.stdin.flush() - if read_from_stdout: + rval = None # type: Union[None, str] + + if proc.stdin is not None: + try: + proc.stdin.write(("%s\n" % filepath).encode(defenc)) + except IOError as e: + # pipe broke, usually because some error happened + raise fmakeexc() from e + # END write exception handling + proc.stdin.flush() + + if read_from_stdout and proc.stdout is not None: rval = proc.stdout.readline().strip() fprogress(filepath, True, item) return rval - def iter_blobs(self, predicate=lambda t: True): + def iter_blobs(self, predicate: Callable[[Tuple[StageType, Blob]], bool] = lambda t: True + ) -> Iterator[Tuple[StageType, Blob]]: """ :return: Iterator yielding tuples of Blob objects and stages, tuple(stage, Blob) @@ -446,20 +474,21 @@ def iter_blobs(self, predicate=lambda t: True): yield output # END for each entry - def unmerged_blobs(self): + def unmerged_blobs(self) -> Dict[PathLike, List[Tuple[StageType, Blob]]]: """ :return: - Iterator yielding dict(path : list( tuple( stage, Blob, ...))), being + Dict(path : list( tuple( stage, Blob, ...))), being a dictionary associating a path in the index with a list containing sorted stage/blob pairs + :note: Blobs that have been removed in one side simply do not exist in the given stage. I.e. a file removed on the 'other' branch whose entries are at stage 3 will not have a stage 3 entry. """ is_unmerged_blob = lambda t: t[0] != 0 - path_map = {} + path_map = {} # type: Dict[PathLike, List[Tuple[TBD, Blob]]] for stage, blob in self.iter_blobs(is_unmerged_blob): path_map.setdefault(blob.path, []).append((stage, blob)) # END for each unmerged blob @@ -468,10 +497,10 @@ def unmerged_blobs(self): return path_map @classmethod - def entry_key(cls, *entry): + def entry_key(cls, *entry: Union[BaseIndexEntry, PathLike, StageType]) -> Tuple[PathLike, StageType]: return entry_key(*entry) - def resolve_blobs(self, iter_blobs): + def resolve_blobs(self, iter_blobs: Iterator[Blob]) -> 'IndexFile': """Resolve the blobs given in blob iterator. This will effectively remove the index entries of the respective path at all non-null stages and add the given blob as new stage null blob. @@ -489,7 +518,7 @@ def resolve_blobs(self, iter_blobs): for blob in iter_blobs: stage_null_key = (blob.path, 0) if stage_null_key in self.entries: - raise ValueError("Path %r already exists at stage 0" % blob.path) + raise ValueError("Path %r already exists at stage 0" % str(blob.path)) # END assert blob is not stage 0 already # delete all possible stages @@ -506,7 +535,7 @@ def resolve_blobs(self, iter_blobs): return self - def update(self): + def update(self) -> 'IndexFile': """Reread the contents of our index file, discarding all cached information we might have. @@ -517,7 +546,7 @@ def update(self): # allows to lazily reread on demand return self - def write_tree(self): + def write_tree(self) -> Tree: """Writes this index to a corresponding Tree object into the repository's object database and return it. @@ -542,7 +571,8 @@ def write_tree(self): root_tree._cache = tree_items return root_tree - def _process_diff_args(self, args): + def _process_diff_args(self, args: List[Union[str, diff.Diffable, object]] + ) -> List[Union[str, diff.Diffable, object]]: try: args.pop(args.index(self)) except IndexError: @@ -550,18 +580,19 @@ def _process_diff_args(self, args): # END remove self return args - def _to_relative_path(self, path): + def _to_relative_path(self, path: PathLike) -> PathLike: """:return: Version of path relative to our git directory or raise ValueError if it is not within our git direcotory""" if not osp.isabs(path): return path if self.repo.bare: raise InvalidGitRepositoryError("require non-bare repository") - if not path.startswith(self.repo.working_tree_dir): + if not str(path).startswith(str(self.repo.working_tree_dir)): raise ValueError("Absolute path %r is not in git repository at %r" % (path, self.repo.working_tree_dir)) return os.path.relpath(path, self.repo.working_tree_dir) - def _preprocess_add_items(self, items): + def _preprocess_add_items(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]] + ) -> Tuple[List[PathLike], List[BaseIndexEntry]]: """ Split the items into two lists of path strings and BaseEntries. """ paths = [] entries = [] @@ -581,13 +612,14 @@ def _preprocess_add_items(self, items): # END for each item return paths, entries - def _store_path(self, filepath, fprogress): + def _store_path(self, filepath: PathLike, fprogress: Callable) -> BaseIndexEntry: """Store file at filepath in the database and return the base index entry Needs the git_working_dir decorator active ! This must be assured in the calling code""" st = os.lstat(filepath) # handles non-symlinks as well if S_ISLNK(st.st_mode): # in PY3, readlink is string, but we need bytes. In PY2, it's just OS encoded bytes, we assume UTF-8 - open_stream = lambda: BytesIO(force_bytes(os.readlink(filepath), encoding=defenc)) + open_stream = lambda: BytesIO(force_bytes(os.readlink(filepath), + encoding=defenc)) # type: Callable[[], BinaryIO] else: open_stream = lambda: open(filepath, 'rb') with open_stream() as stream: @@ -599,16 +631,18 @@ def _store_path(self, filepath, fprogress): @unbare_repo @git_working_dir - def _entries_for_paths(self, paths, path_rewriter, fprogress, entries): - entries_added = [] + def _entries_for_paths(self, paths: List[str], path_rewriter: Callable, fprogress: Callable, + entries: List[BaseIndexEntry]) -> List[BaseIndexEntry]: + entries_added = [] # type: List[BaseIndexEntry] if path_rewriter: for path in paths: if osp.isabs(path): abspath = path - gitrelative_path = path[len(self.repo.working_tree_dir) + 1:] + gitrelative_path = path[len(str(self.repo.working_tree_dir)) + 1:] else: gitrelative_path = path - abspath = osp.join(self.repo.working_tree_dir, gitrelative_path) + if self.repo.working_tree_dir: + abspath = osp.join(self.repo.working_tree_dir, gitrelative_path) # end obtain relative and absolute paths blob = Blob(self.repo, Blob.NULL_BIN_SHA, @@ -628,8 +662,9 @@ def _entries_for_paths(self, paths, path_rewriter, fprogress, entries): # END path handling return entries_added - def add(self, items, force=True, fprogress=lambda *args: None, path_rewriter=None, - write=True, write_extension_data=False): + def add(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], force: bool = True, + fprogress: Callable = lambda *args: None, path_rewriter: Callable = None, + write: bool = True, write_extension_data: bool = False) -> List[BaseIndexEntry]: """Add files from the working tree, specific blobs or BaseIndexEntries to the index. @@ -816,7 +851,8 @@ def _items_to_rela_paths(self, items): @post_clear_cache @default_index - def remove(self, items, working_tree=False, **kwargs): + def remove(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], working_tree: bool = False, + **kwargs: Any) -> List[str]: """Remove the given items from the index and optionally from the working tree as well. @@ -867,7 +903,8 @@ def remove(self, items, working_tree=False, **kwargs): @post_clear_cache @default_index - def move(self, items, skip_errors=False, **kwargs): + def move(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, Submodule]], skip_errors: bool = False, + **kwargs: Any) -> List[Tuple[str, str]]: """Rename/move the items, whereas the last item is considered the destination of the move operation. If the destination is a file, the first item ( of two ) must be a file as well. If the destination is a directory, it may be preceded @@ -929,9 +966,9 @@ def move(self, items, skip_errors=False, **kwargs): return out - def commit(self, message, parent_commits=None, head=True, author=None, - committer=None, author_date=None, commit_date=None, - skip_hooks=False): + def commit(self, message: str, parent_commits=None, head: bool = True, author: str = None, + committer: str = None, author_date: str = None, commit_date: str = None, + skip_hooks: bool = False) -> Commit: """Commit the current default index file, creating a commit object. For more information on the arguments, see tree.commit. @@ -955,33 +992,39 @@ def commit(self, message, parent_commits=None, head=True, author=None, run_commit_hook('post-commit', self) return rval - def _write_commit_editmsg(self, message): + def _write_commit_editmsg(self, message: str) -> None: with open(self._commit_editmsg_filepath(), "wb") as commit_editmsg_file: commit_editmsg_file.write(message.encode(defenc)) - def _remove_commit_editmsg(self): + def _remove_commit_editmsg(self) -> None: os.remove(self._commit_editmsg_filepath()) - def _read_commit_editmsg(self): + def _read_commit_editmsg(self) -> str: with open(self._commit_editmsg_filepath(), "rb") as commit_editmsg_file: return commit_editmsg_file.read().decode(defenc) - def _commit_editmsg_filepath(self): + def _commit_editmsg_filepath(self) -> str: return osp.join(self.repo.common_dir, "COMMIT_EDITMSG") - @classmethod - def _flush_stdin_and_wait(cls, proc, ignore_stdout=False): - proc.stdin.flush() - proc.stdin.close() - stdout = '' - if not ignore_stdout: + def _flush_stdin_and_wait(cls, proc: 'Popen[bytes]', ignore_stdout: bool = False) -> bytes: + stdin_IO = proc.stdin + if stdin_IO: + stdin_IO.flush() + stdin_IO.close() + + stdout = b'' + if not ignore_stdout and proc.stdout: stdout = proc.stdout.read() - proc.stdout.close() - proc.wait() + + if proc.stdout: + proc.stdout.close() + proc.wait() return stdout @default_index - def checkout(self, paths=None, force=False, fprogress=lambda *args: None, **kwargs): + def checkout(self, paths: Union[None, Iterable[PathLike]] = None, force: bool = False, + fprogress: Callable = lambda *args: None, **kwargs: Any + ) -> Union[None, Iterator[PathLike], Sequence[PathLike]]: """Checkout the given paths or all files from the version known to the index into the working tree. @@ -1032,12 +1075,15 @@ def checkout(self, paths=None, force=False, fprogress=lambda *args: None, **kwar failed_reasons = [] unknown_lines = [] - def handle_stderr(proc, iter_checked_out_files): - stderr = proc.stderr.read() - if not stderr: - return + def handle_stderr(proc: 'Popen[bytes]', iter_checked_out_files: Iterable[PathLike]) -> None: + + stderr_IO = proc.stderr + if not stderr_IO: + return None # return early if stderr empty + else: + stderr_bytes = stderr_IO.read() # line contents: - stderr = stderr.decode(defenc) + stderr = stderr_bytes.decode(defenc) # git-checkout-index: this already exists endings = (' already exists', ' is not in the cache', ' does not exist at stage', ' is unmerged') for line in stderr.splitlines(): @@ -1101,7 +1147,7 @@ def handle_stderr(proc, iter_checked_out_files): proc = self.repo.git.checkout_index(args, **kwargs) # FIXME: Reading from GIL! make_exc = lambda: GitCommandError(("git-checkout-index",) + tuple(args), 128, proc.stderr.read()) - checked_out_files = [] + checked_out_files = [] # type: List[PathLike] for path in paths: co_path = to_native_path_linux(self._to_relative_path(path)) @@ -1111,11 +1157,11 @@ def handle_stderr(proc, iter_checked_out_files): try: self.entries[(co_path, 0)] except KeyError: - folder = co_path + folder = str(co_path) if not folder.endswith('/'): folder += '/' for entry in self.entries.values(): - if entry.path.startswith(folder): + if str(entry.path).startswith(folder): p = entry.path self._write_path_to_stdin(proc, p, p, make_exc, fprogress, read_from_stdout=False) @@ -1145,7 +1191,9 @@ def handle_stderr(proc, iter_checked_out_files): assert "Should not reach this point" @default_index - def reset(self, commit='HEAD', working_tree=False, paths=None, head=False, **kwargs): + def reset(self, commit: Union[Commit, Reference, str] = 'HEAD', working_tree: bool = False, + paths: Union[None, Iterable[PathLike]] = None, + head: bool = False, **kwargs: Any) -> 'IndexFile': """Reset the index to reflect the tree at the given commit. This will not adjust our HEAD reference as opposed to HEAD.reset by default. @@ -1213,10 +1261,12 @@ def reset(self, commit='HEAD', working_tree=False, paths=None, head=False, **kwa return self @default_index - def diff(self, other=diff.Diffable.Index, paths=None, create_patch=False, **kwargs): + def diff(self, other: Union[diff.Diffable.Index, 'IndexFile.Index', Treeish, None, object] = diff.Diffable.Index, + paths: Union[str, List[PathLike], Tuple[PathLike, ...]] = None, create_patch: bool = False, **kwargs: Any + ) -> diff.DiffIndex: """Diff this index against the working copy or a Tree or Commit object - For a documentation of the parameters and return values, see + For a documentation of the parameters and return values, see, Diffable.diff :note: @@ -1234,7 +1284,7 @@ def diff(self, other=diff.Diffable.Index, paths=None, create_patch=False, **kwar other = self.repo.rev_parse(other) # END object conversion - if isinstance(other, Object): + if isinstance(other, Object): # for Tree or Commit # invert the existing R flag cur_val = kwargs.get('R', False) kwargs['R'] = not cur_val diff --git a/git/index/fun.py b/git/index/fun.py index e92e8e381..f40928c33 100644 --- a/git/index/fun.py +++ b/git/index/fun.py @@ -1,6 +1,7 @@ # Contains standalone functions to accompany the index implementation and make it # more versatile # NOTE: Autodoc hates it if this is a docstring + from io import BytesIO import os from stat import ( @@ -47,6 +48,17 @@ unpack ) +# typing ----------------------------------------------------------------------------- + +from typing import (Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast) + +from git.types import PathLike + +if TYPE_CHECKING: + from .base import IndexFile + +# ------------------------------------------------------------------------------------ + S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule CE_NAMEMASK_INV = ~CE_NAMEMASK @@ -55,12 +67,12 @@ 'stat_mode_to_index_mode', 'S_IFGITLINK', 'run_commit_hook', 'hook_path') -def hook_path(name, git_dir): +def hook_path(name: str, git_dir: PathLike) -> str: """:return: path to the given named hook in the given git repository directory""" return osp.join(git_dir, 'hooks', name) -def run_commit_hook(name, index, *args): +def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None: """Run the commit hook of the given name. Silently ignores hooks that do not exist. :param name: name of hook, like 'pre-commit' :param index: IndexFile instance @@ -68,10 +80,10 @@ def run_commit_hook(name, index, *args): :raises HookExecutionError: """ hp = hook_path(name, index.repo.git_dir) if not os.access(hp, os.X_OK): - return + return None env = os.environ.copy() - env['GIT_INDEX_FILE'] = safe_decode(index.path) + env['GIT_INDEX_FILE'] = safe_decode(str(index.path)) env['GIT_EDITOR'] = ':' try: cmd = subprocess.Popen([hp] + list(args), @@ -84,11 +96,11 @@ def run_commit_hook(name, index, *args): except Exception as ex: raise HookExecutionError(hp, ex) from ex else: - stdout = [] - stderr = [] - handle_process_output(cmd, stdout.append, stderr.append, finalize_process) - stdout = ''.join(stdout) - stderr = ''.join(stderr) + stdout_list = [] # type: List[str] + stderr_list = [] # type: List[str] + handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process) + stdout = ''.join(stdout_list) + stderr = ''.join(stderr_list) if cmd.returncode != 0: stdout = force_text(stdout, defenc) stderr = force_text(stderr, defenc) @@ -106,7 +118,9 @@ def stat_mode_to_index_mode(mode): return S_IFREG | 0o644 | (mode & 0o111) # blobs with or without executable bit -def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1Writer): +def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: IO[bytes], + extension_data: Union[None, bytes] = None, + ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer) -> None: """Write the cache represented by entries to a stream :param entries: **sorted** list of entries @@ -119,10 +133,10 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1 :param extension_data: any kind of data to write as a trailer, it must begin a 4 byte identifier, followed by its size ( 4 bytes )""" # wrap the stream into a compatible writer - stream = ShaStreamCls(stream) + stream_sha = ShaStreamCls(stream) - tell = stream.tell - write = stream.write + tell = stream_sha.tell + write = stream_sha.write # header version = 2 @@ -134,8 +148,8 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1 beginoffset = tell() write(entry[4]) # ctime write(entry[5]) # mtime - path = entry[3] - path = force_bytes(path, encoding=defenc) + path_str = entry[3] # type: str + path = force_bytes(path_str, encoding=defenc) plen = len(path) & CE_NAMEMASK # path length assert plen == len(path), "Path %s too long to fit into index" % entry[3] flags = plen | (entry[2] & CE_NAMEMASK_INV) # clear possible previous values @@ -148,34 +162,38 @@ def write_cache(entries, stream, extension_data=None, ShaStreamCls=IndexFileSHA1 # write previously cached extensions data if extension_data is not None: - stream.write(extension_data) + stream_sha.write(extension_data) # write the sha over the content - stream.write_sha() + stream_sha.write_sha() -def read_header(stream): +def read_header(stream: IO[bytes]) -> Tuple[int, int]: """Return tuple(version_long, num_entries) from the given stream""" type_id = stream.read(4) if type_id != b"DIRC": raise AssertionError("Invalid index file header: %r" % type_id) - version, num_entries = unpack(">LL", stream.read(4 * 2)) + unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2))) + version, num_entries = unpacked # TODO: handle version 3: extended data, see read-cache.c assert version in (1, 2) return version, num_entries -def entry_key(*entry): +def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]: """:return: Key suitable to be used for the index.entries dictionary :param entry: One instance of type BaseIndexEntry or the path and the stage""" if len(entry) == 1: - return (entry[0].path, entry[0].stage) - return tuple(entry) + entry_first = cast(BaseIndexEntry, entry[0]) # type: BaseIndexEntry + return (entry_first.path, entry_first.stage) + else: + entry = cast(Tuple[PathLike, int], tuple(entry)) + return entry # END handle entry -def read_cache(stream): +def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'IndexEntry'], bytes, bytes]: """Read a cache file from the given stream :return: tuple(version, entries_dict, extension_data, content_sha) * version is the integer version number @@ -184,7 +202,7 @@ def read_cache(stream): * content_sha is a 20 byte sha on all cache file contents""" version, num_entries = read_header(stream) count = 0 - entries = {} + entries = {} # type: Dict[Tuple[PathLike, int], 'IndexEntry'] read = stream.read tell = stream.tell @@ -223,7 +241,8 @@ def read_cache(stream): return (version, entries, extension_data, content_sha) -def write_tree_from_cache(entries, odb, sl, si=0): +def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0 + ) -> Tuple[bytes, List[Tuple[str, int, str]]]: """Create a tree from the given sorted list of entries and put the respective trees into the given object database @@ -233,7 +252,7 @@ def write_tree_from_cache(entries, odb, sl, si=0): :param sl: slice indicating the range we should process on the entries list :return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name""" - tree_items = [] + tree_items = [] # type: List[Tuple[Union[bytes, str], int, str]] tree_items_append = tree_items.append ci = sl.start end = sl.stop @@ -272,18 +291,19 @@ def write_tree_from_cache(entries, odb, sl, si=0): # finally create the tree sio = BytesIO() - tree_to_stream(tree_items, sio.write) + tree_to_stream(tree_items, sio.write) # converts bytes of each item[0] to str + tree_items_stringified = cast(List[Tuple[str, int, str]], tree_items) # type: List[Tuple[str, int, str]] sio.seek(0) istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio)) - return (istream.binsha, tree_items) + return (istream.binsha, tree_items_stringified) -def _tree_entry_to_baseindexentry(tree_entry, stage): +def _tree_entry_to_baseindexentry(tree_entry: Tuple[str, int, str], stage: int) -> BaseIndexEntry: return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])) -def aggressive_tree_merge(odb, tree_shas): +def aggressive_tree_merge(odb, tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]: """ :return: list of BaseIndexEntries representing the aggressive merge of the given trees. All valid entries are on stage 0, whereas the conflicting ones are left @@ -292,7 +312,7 @@ def aggressive_tree_merge(odb, tree_shas): :param tree_shas: 1, 2 or 3 trees as identified by their binary 20 byte shas If 1 or two, the entries will effectively correspond to the last given tree If 3 are given, a 3 way merge is performed""" - out = [] + out = [] # type: List[BaseIndexEntry] out_append = out.append # one and two way is the same for us, as we don't have to handle an existing diff --git a/git/index/typ.py b/git/index/typ.py index 2a7dd7990..bb1a03845 100644 --- a/git/index/typ.py +++ b/git/index/typ.py @@ -9,6 +9,17 @@ from git.objects import Blob +# typing ---------------------------------------------------------------------- + +from typing import (List, Sequence, TYPE_CHECKING, Tuple, cast) + +from git.types import PathLike + +if TYPE_CHECKING: + from git.repo import Repo + +# --------------------------------------------------------------------------------- + __all__ = ('BlobFilter', 'BaseIndexEntry', 'IndexEntry') #{ Invariants @@ -31,7 +42,7 @@ class BlobFilter(object): """ __slots__ = 'paths' - def __init__(self, paths): + def __init__(self, paths: Sequence[PathLike]) -> None: """ :param paths: tuple or list of paths which are either pointing to directories or @@ -39,7 +50,7 @@ def __init__(self, paths): """ self.paths = paths - def __call__(self, stage_blob): + def __call__(self, stage_blob: Blob) -> bool: path = stage_blob[1].path for p in self.paths: if path.startswith(p): @@ -57,29 +68,29 @@ class BaseIndexEntry(tuple): expecting a BaseIndexEntry can also handle full IndexEntries even if they use numeric indices for performance reasons. """ - def __str__(self): + def __str__(self) -> str: return "%o %s %i\t%s" % (self.mode, self.hexsha, self.stage, self.path) - def __repr__(self): + def __repr__(self) -> str: return "(%o, %s, %i, %s)" % (self.mode, self.hexsha, self.stage, self.path) @property - def mode(self): + def mode(self) -> int: """ File Mode, compatible to stat module constants """ return self[0] @property - def binsha(self): + def binsha(self) -> bytes: """binary sha of the blob """ return self[1] @property - def hexsha(self): + def hexsha(self) -> str: """hex version of our sha""" return b2a_hex(self[1]).decode('ascii') @property - def stage(self): + def stage(self) -> int: """Stage of the entry, either: * 0 = default stage @@ -92,21 +103,21 @@ def stage(self): return (self[2] & CE_STAGEMASK) >> CE_STAGESHIFT @property - def path(self): + def path(self) -> str: """:return: our path relative to the repository working tree root""" return self[3] @property - def flags(self): + def flags(self) -> List[str]: """:return: flags stored with this entry""" return self[2] @classmethod - def from_blob(cls, blob, stage=0): + def from_blob(cls, blob: Blob, stage: int = 0) -> 'BaseIndexEntry': """:return: Fully equipped BaseIndexEntry at the given stage""" return cls((blob.mode, blob.binsha, stage << CE_STAGESHIFT, blob.path)) - def to_blob(self, repo): + def to_blob(self, repo: 'Repo') -> Blob: """:return: Blob using the information of this index entry""" return Blob(repo, self.binsha, self.mode, self.path) @@ -120,40 +131,40 @@ class IndexEntry(BaseIndexEntry): See the properties for a mapping between names and tuple indices. """ @property - def ctime(self): + def ctime(self) -> Tuple[int, int]: """ :return: Tuple(int_time_seconds_since_epoch, int_nano_seconds) of the file's creation time""" - return unpack(">LL", self[4]) + return cast(Tuple[int, int], unpack(">LL", self[4])) @property - def mtime(self): + def mtime(self) -> Tuple[int, int]: """See ctime property, but returns modification time """ - return unpack(">LL", self[5]) + return cast(Tuple[int, int], unpack(">LL", self[5])) @property - def dev(self): + def dev(self) -> int: """ Device ID """ return self[6] @property - def inode(self): + def inode(self) -> int: """ Inode ID """ return self[7] @property - def uid(self): + def uid(self) -> int: """ User ID """ return self[8] @property - def gid(self): + def gid(self) -> int: """ Group ID """ return self[9] @property - def size(self): + def size(self) -> int: """:return: Uncompressed size of the blob """ return self[10] @@ -169,7 +180,7 @@ def from_base(cls, base): return IndexEntry((base.mode, base.binsha, base.flags, base.path, time, time, 0, 0, 0, 0, 0)) @classmethod - def from_blob(cls, blob, stage=0): + def from_blob(cls, blob: Blob, stage: int = 0) -> 'IndexEntry': """:return: Minimal entry resembling the given blob object""" time = pack(">LL", 0, 0) return IndexEntry((blob.mode, blob.binsha, stage << CE_STAGESHIFT, blob.path, diff --git a/git/index/util.py b/git/index/util.py index 02742a5df..471e9262f 100644 --- a/git/index/util.py +++ b/git/index/util.py @@ -9,6 +9,15 @@ import os.path as osp +# typing ---------------------------------------------------------------------- + +from typing import (Any, Callable) + +from git.types import PathLike + +# --------------------------------------------------------------------------------- + + __all__ = ('TemporaryFileSwap', 'post_clear_cache', 'default_index', 'git_working_dir') #{ Aliases @@ -24,16 +33,16 @@ class TemporaryFileSwap(object): and moving it back on to where on object deletion.""" __slots__ = ("file_path", "tmp_file_path") - def __init__(self, file_path): + def __init__(self, file_path: PathLike) -> None: self.file_path = file_path - self.tmp_file_path = self.file_path + tempfile.mktemp('', '', '') + self.tmp_file_path = str(self.file_path) + tempfile.mktemp('', '', '') # it may be that the source does not exist try: os.rename(self.file_path, self.tmp_file_path) except OSError: pass - def __del__(self): + def __del__(self) -> None: if osp.isfile(self.tmp_file_path): if is_win and osp.exists(self.file_path): os.remove(self.file_path) @@ -43,7 +52,7 @@ def __del__(self): #{ Decorators -def post_clear_cache(func): +def post_clear_cache(func: Callable[..., Any]) -> Callable[..., Any]: """Decorator for functions that alter the index using the git command. This would invalidate our possibly existing entries dictionary which is why it must be deleted to allow it to be lazily reread later. @@ -54,7 +63,7 @@ def post_clear_cache(func): """ @wraps(func) - def post_clear_cache_if_not_raised(self, *args, **kwargs): + def post_clear_cache_if_not_raised(self, *args: Any, **kwargs: Any) -> Any: rval = func(self, *args, **kwargs) self._delete_entries_cache() return rval @@ -63,13 +72,13 @@ def post_clear_cache_if_not_raised(self, *args, **kwargs): return post_clear_cache_if_not_raised -def default_index(func): +def default_index(func: Callable[..., Any]) -> Callable[..., Any]: """Decorator assuring the wrapped method may only run if we are the default repository index. This is as we rely on git commands that operate on that index only. """ @wraps(func) - def check_default_index(self, *args, **kwargs): + def check_default_index(self, *args: Any, **kwargs: Any) -> Any: if self._file_path != self._index_path(): raise AssertionError( "Cannot call %r on indices that do not represent the default git index" % func.__name__) @@ -79,12 +88,12 @@ def check_default_index(self, *args, **kwargs): return check_default_index -def git_working_dir(func): +def git_working_dir(func: Callable[..., Any]) -> Callable[..., None]: """Decorator which changes the current working dir to the one of the git repository in order to assure relative paths are handled correctly""" @wraps(func) - def set_git_working_dir(self, *args, **kwargs): + def set_git_working_dir(self, *args: Any, **kwargs: Any) -> None: cur_wd = os.getcwd() os.chdir(self.repo.working_tree_dir) try: diff --git a/git/refs/log.py b/git/refs/log.py index fcd2c23cf..363c3c5d5 100644 --- a/git/refs/log.py +++ b/git/refs/log.py @@ -97,8 +97,8 @@ def from_line(cls, line): " Got %s" % repr(line)) # END handle first split - oldhexsha = info[:40] - newhexsha = info[41:81] + oldhexsha = info[:40] # type: str + newhexsha = info[41:81] # type: str for hexsha in (oldhexsha, newhexsha): if not cls._re_hexsha_only.match(hexsha): raise ValueError("Invalid hexsha: %r" % (hexsha,)) diff --git a/git/repo/base.py b/git/repo/base.py index ce5f6bd09..e23ebb1ac 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -36,7 +36,7 @@ from git.types import TBD, PathLike, Lit_config_levels from typing import (Any, BinaryIO, Callable, Dict, - Iterator, List, Mapping, Optional, + Iterator, List, Mapping, Optional, Sequence, TextIO, Tuple, Type, Union, NamedTuple, cast, TYPE_CHECKING) @@ -80,8 +80,8 @@ class Repo(object): git = cast('Git', None) # Must exist, or __del__ will fail in case we raise on `__init__()` working_dir = None # type: Optional[PathLike] _working_tree_dir = None # type: Optional[PathLike] - git_dir = None # type: Optional[PathLike] - _common_dir = None # type: Optional[PathLike] + git_dir = "" # type: PathLike + _common_dir = "" # type: PathLike # precompiled regex re_whitespace = re.compile(r'\s+') @@ -208,7 +208,7 @@ def __init__(self, path: Optional[PathLike] = None, odbt: Type[GitCmdObjectDB] = common_dir = open(osp.join(self.git_dir, 'commondir'), 'rt').readlines()[0].strip() self._common_dir = osp.join(self.git_dir, common_dir) except OSError: - self._common_dir = None + self._common_dir = "" # adjust the wd in case we are actually bare - we didn't know that # in the first place @@ -536,7 +536,7 @@ def tree(self, rev: Union['Commit', 'Tree', None] = None) -> 'Tree': return self.head.commit.tree return self.rev_parse(str(rev) + "^{tree}") - def iter_commits(self, rev: Optional[TBD] = None, paths: Union[PathLike, List[PathLike]] = '', + def iter_commits(self, rev: Optional[TBD] = None, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> Iterator[Commit]: """A list of Commit objects representing the history of a given ref/commit diff --git a/git/repo/fun.py b/git/repo/fun.py index 703940819..e96b62e0f 100644 --- a/git/repo/fun.py +++ b/git/repo/fun.py @@ -18,7 +18,7 @@ # Typing ---------------------------------------------------------------------- -from typing import AnyStr, Union, Optional, cast, TYPE_CHECKING +from typing import Union, Optional, cast, TYPE_CHECKING from git.types import PathLike if TYPE_CHECKING: from .base import Repo @@ -103,7 +103,7 @@ def find_submodule_git_dir(d: PathLike) -> Optional[PathLike]: return None -def short_to_long(odb: 'GitCmdObjectDB', hexsha: AnyStr) -> Optional[bytes]: +def short_to_long(odb: 'GitCmdObjectDB', hexsha: str) -> Optional[bytes]: """:return: long hexadecimal sha1 from the given less-than-40 byte hexsha or None if no candidate could be found. :param hexsha: hexsha with less than 40 byte""" diff --git a/git/util.py b/git/util.py index 220901a49..76aaee497 100644 --- a/git/util.py +++ b/git/util.py @@ -24,6 +24,7 @@ from typing import (Any, AnyStr, BinaryIO, Callable, Dict, Generator, IO, Iterator, List, Optional, Pattern, Sequence, Tuple, Union, cast, TYPE_CHECKING, overload) +import pathlib if TYPE_CHECKING: from git.remote import Remote @@ -376,10 +377,13 @@ def expand_path(p: None, expand_vars: bool = ...) -> None: @overload def expand_path(p: PathLike, expand_vars: bool = ...) -> str: + # improve these overloads when 3.5 dropped ... -def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[str]: +def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]: + if isinstance(p, pathlib.Path): + return p.resolve() try: p = osp.expanduser(p) # type: ignore if expand_vars: