Added types to Index submodule #1244

Merged: 14 commits, May 18, 2021
Changes from 3 commits
120 changes: 71 additions & 49 deletions git/index/base.py
@@ -63,6 +63,19 @@
git_working_dir
)

# typing -----------------------------------------------------------------------------

from typing import Any, Callable, Dict, IO, Iterator, List, Sequence, TYPE_CHECKING, Tuple, Union

from git.types import PathLike, TBD

if TYPE_CHECKING:
from subprocess import Popen
from git.repo import Repo

StageType = int
Treeish = Union[Tree, Commit, bytes]


__all__ = ('IndexFile', 'CheckoutError')
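
The `if TYPE_CHECKING:` guard added above makes the `Repo` import visible to static checkers without executing it at runtime, which avoids a circular import between `git.index.base` and `git.repo`. A minimal sketch of the pattern (the helper function is hypothetical; the imports are the real GitPython modules):

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated by mypy only; skipped at runtime, so no circular import.
    from git.repo import Repo

def head_commit_sha(repo: 'Repo') -> str:
    # 'Repo' is a string (forward reference); Python never needs the class
    # object when the function is defined, only the type checker does.
    return repo.head.commit.hexsha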

@@ -93,7 +106,7 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
_VERSION = 2 # latest version we support
S_IFGITLINK = S_IFGITLINK # a submodule

def __init__(self, repo, file_path=None):
def __init__(self, repo: 'Repo', file_path: PathLike = None) -> None:
"""Initialize this Index instance, optionally from the given ``file_path``.
If no file_path is given, we will be created from the current index file.

@@ -102,9 +115,9 @@ def __init__(self, repo, file_path=None):
self.repo = repo
self.version = self._VERSION
self._extension_data = b''
self._file_path = file_path or self._index_path()
self._file_path = file_path or self._index_path() # type: PathLike

def _set_cache_(self, attr):
def _set_cache_(self, attr: str) -> None:
if attr == "entries":
# read the current index
# try memory map for speed
@@ -115,8 +128,8 @@ def _set_cache_(self, attr):
ok = True
except OSError:
# in new repositories, there may be no index, which means we are empty
self.entries = {}
return
self.entries = {} # type: Dict[Tuple[PathLike, StageType], IndexEntry]
return None
finally:
if not ok:
lfd.rollback()
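
The `# type:` comment on the `self.entries = {}` assignment above is PEP 484's comment syntax for annotating variables, presumably used here to stay compatible with Python versions before 3.6; PEP 526 inline annotations are the equivalent modern form. A small sketch with illustrative names:

from typing import Dict, Tuple

# PEP 484 type comment, as used throughout this PR:
entries = {}  # type: Dict[Tuple[str, int], str]

# Equivalent PEP 526 variable annotation (Python 3.6+):
entries_annotated: Dict[Tuple[str, int], str] = {}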
@@ -133,15 +146,18 @@ def _set_cache_(self, attr):
else:
super(IndexFile, self)._set_cache_(attr)

def _index_path(self):
return join_path_native(self.repo.git_dir, "index")
def _index_path(self) -> PathLike:
if self.repo.git_dir:
return join_path_native(self.repo.git_dir, "index")
else:
raise GitCommandError("No git directory given to join index path")

@property
def path(self):
def path(self) -> PathLike:
""" :return: Path to the index file we are representing """
return self._file_path

def _delete_entries_cache(self):
def _delete_entries_cache(self) -> None:
"""Safely clear the entries cache so it can be recreated"""
try:
del(self.entries)
@@ -152,26 +168,26 @@ def _delete_entries_cache(self):

#{ Serializable Interface

def _deserialize(self, stream):
def _deserialize(self, stream: IO) -> 'IndexFile':
"""Initialize this instance with index values read from the given stream"""
self.version, self.entries, self._extension_data, _conten_sha = read_cache(stream)
return self

def _entries_sorted(self):
def _entries_sorted(self) -> List[TBD]:
""":return: list of entries, in a sorted fashion, first by path, then by stage"""
return sorted(self.entries.values(), key=lambda e: (e.path, e.stage))

def _serialize(self, stream, ignore_extension_data=False):
def _serialize(self, stream: IO, ignore_extension_data: bool = False) -> 'IndexFile':
entries = self._entries_sorted()
extension_data = self._extension_data
extension_data = self._extension_data # type: Union[None, bytes]
if ignore_extension_data:
extension_data = None
write_cache(entries, stream, extension_data)
return self

#} END serializable interface

def write(self, file_path=None, ignore_extension_data=False):
def write(self, file_path: Union[None, PathLike] = None, ignore_extension_data: bool = False) -> None:
"""Write the current state to our file path or to the given one

:param file_path:
@@ -191,7 +207,7 @@ def write(self, file_path=None, ignore_extension_data=False):
Alternatively, use IndexFile.write_tree() to handle this case
automatically

:return: self"""
:return: self # does it? or returns None?"""
# make sure we have our entries read before getting a write lock
# else it would be done when streaming. This can happen
# if one doesn't change the index, but writes it right away
@@ -215,7 +231,7 @@

@post_clear_cache
@default_index
def merge_tree(self, rhs, base=None):
def merge_tree(self, rhs: Treeish, base: Union[None, Treeish] = None) -> 'IndexFile':
"""Merge the given rhs treeish into the current index, possibly taking
a common base treeish into account.

@@ -242,7 +258,7 @@ def merge_tree(self, rhs, base=None):
# -i : ignore working tree status
# --aggressive : handle more merge cases
# -m : do an actual merge
args = ["--aggressive", "-i", "-m"]
args = ["--aggressive", "-i", "-m"] # type: List[Union[Treeish, str]]
if base is not None:
args.append(base)
args.append(rhs)
@@ -251,7 +267,7 @@ def merge_tree(self, rhs, base=None):
return self

@classmethod
def new(cls, repo, *tree_sha):
def new(cls, repo: 'Repo', *tree_sha: bytes) -> 'IndexFile':
""" Merge the given treeish revisions into a new index which is returned.
This method behaves like git-read-tree --aggressive when doing the merge.

@@ -275,7 +291,7 @@ def new(cls, repo, *tree_sha):
return inst

@classmethod
def from_tree(cls, repo, *treeish, **kwargs):
def from_tree(cls, repo: 'Repo', *treeish: Treeish, **kwargs: Any) -> 'IndexFile':
"""Merge the given treeish revisions into a new index which is returned.
The original index will remain unaltered

@@ -312,7 +328,7 @@ def from_tree(cls, repo, *treeish, **kwargs):
if len(treeish) == 0 or len(treeish) > 3:
raise ValueError("Please specify between 1 and 3 treeish, got %i" % len(treeish))

arg_list = []
arg_list = [] # type: List[Union[Treeish, str]]
# ignore that working tree and index possibly are out of date
if len(treeish) > 1:
# drop unmerged entries when reading our index and merging
@@ -331,7 +347,8 @@ def from_tree(cls, repo, *treeish, **kwargs):
# as it considers existing entries. moving it essentially clears the index.
# Unfortunately there is no 'soft' way to do it.
# The TemporaryFileSwap assure the original file get put back
index_handler = TemporaryFileSwap(join_path_native(repo.git_dir, 'index'))
if repo.git_dir:
index_handler = TemporaryFileSwap(join_path_native(repo.git_dir, 'index'))
try:
repo.git.read_tree(*arg_list, **kwargs)
index = cls(repo, tmp_index)
@@ -346,18 +363,18 @@ def from_tree(cls, repo, *treeish, **kwargs):

# UTILITIES
@unbare_repo
def _iter_expand_paths(self, paths):
def _iter_expand_paths(self, paths: Sequence[PathLike]) -> Iterator[PathLike]:
"""Expand the directories in list of paths to the corresponding paths accordingly,

Note: git will add items multiple times even if a glob overlapped
with manually specified paths or if paths where specified multiple
times - we respect that and do not prune"""
def raise_exc(e):
raise e
r = self.repo.working_tree_dir
r = str(self.repo.working_tree_dir)
rs = r + os.sep
for path in paths:
abs_path = path
abs_path = str(path)
if not osp.isabs(abs_path):
abs_path = osp.join(r, path)
# END make absolute path
@@ -374,7 +391,7 @@ def raise_exc(e):
# end check symlink

# if the path is not already pointing to an existing file, resolve globs if possible
if not os.path.exists(path) and ('?' in path or '*' in path or '[' in path):
if not os.path.exists(abs_path) and ('?' in abs_path or '*' in abs_path or '[' in abs_path):
resolved_paths = glob.glob(abs_path)
# not abs_path in resolved_paths:
# a glob() resolving to the same path we are feeding it with
@@ -396,12 +413,12 @@ def raise_exc(e):
# END for each subdirectory
except OSError:
# was a file or something that could not be iterated
yield path.replace(rs, '')
yield abs_path.replace(rs, '')
# END path exception handling
# END for each path

def _write_path_to_stdin(self, proc, filepath, item, fmakeexc, fprogress,
read_from_stdout=True):
def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item, fmakeexc, fprogress,
read_from_stdout: bool = True) -> Union[None, str]:
"""Write path to proc.stdin and make sure it processes the item, including progress.

:return: stdout string
@@ -417,20 +434,24 @@ def _write_path_to_stdin(self, proc, filepath, item, fmakeexc, fprogress,
we will close stdin to break the pipe."""

fprogress(filepath, False, item)
rval = None
try:
proc.stdin.write(("%s\n" % filepath).encode(defenc))
except IOError as e:
# pipe broke, usually because some error happened
raise fmakeexc() from e
# END write exception handling
proc.stdin.flush()
if read_from_stdout:
rval = None # type: Union[None, str]

if proc.stdin is not None:
try:
proc.stdin.write(("%s\n" % filepath).encode(defenc))
except IOError as e:
# pipe broke, usually because some error happened
raise fmakeexc() from e
# END write exception handling
proc.stdin.flush()

if read_from_stdout and proc.stdout is not None:
rval = proc.stdout.readline().strip()
fprogress(filepath, True, item)
return rval
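
The new `proc.stdin is not None` and `proc.stdout is not None` guards reflect that typeshed declares `Popen.stdin` and `Popen.stdout` as optional: they are `None` unless the corresponding pipe was requested. A self-contained sketch of the same pattern (the `cat` command is only an example):

import subprocess

proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
if proc.stdin is not None:  # narrows Optional[IO[bytes]] for mypy
    proc.stdin.write(b'hello\n')
    proc.stdin.flush()
    proc.stdin.close()
if proc.stdout is not None:
    print(proc.stdout.readline().strip())  # b'hello'
proc.wait()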

def iter_blobs(self, predicate=lambda t: True):
def iter_blobs(self, predicate: Callable[[Tuple[StageType, Blob]], bool] = lambda t: True
) -> Iterator[Tuple[StageType, Blob]]:
"""
:return: Iterator yielding tuples of Blob objects and stages, tuple(stage, Blob)

@@ -446,20 +467,21 @@ def iter_blobs(self, predicate=lambda t: True):
yield output
# END for each entry

def unmerged_blobs(self):
def unmerged_blobs(self) -> Dict[PathLike, List[Tuple[StageType, Blob]]]:
"""
:return:
Iterator yielding dict(path : list( tuple( stage, Blob, ...))), being
a dictionary associating a path in the index with a list containing
sorted stage/blob pairs
##### Does it return iterator? or just the Dict?

Member:
It really does look like it returns a dict, and the type system probably agrees. In these cases it should be fine to fix the documentation, which rightfully is confused about types in a codebase that thus far didn't have any statically declared types.

Contributor (author):
OK, I changed the docstring then.

:note:
Blobs that have been removed in one side simply do not exist in the
given stage. I.e. a file removed on the 'other' branch whose entries
are at stage 3 will not have a stage 3 entry.
"""
is_unmerged_blob = lambda t: t[0] != 0
path_map = {}
path_map = {} # type: Dict[PathLike, List[Tuple[TBD, Blob]]]
for stage, blob in self.iter_blobs(is_unmerged_blob):
path_map.setdefault(blob.path, []).append((stage, blob))
# END for each unmerged blob
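
Per the review exchange above, `unmerged_blobs()` returns a plain dict rather than an iterator, matching the new annotation. A hypothetical usage sketch (the repository path and the presence of merge conflicts are assumptions):

from git import Repo

repo = Repo('/path/to/repo')  # hypothetical repo with an unresolved merge
for path, stage_blob_pairs in repo.index.unmerged_blobs().items():
    for stage, blob in stage_blob_pairs:
        # stage 1: common ancestor, 2: 'ours', 3: 'theirs'
        print(path, stage, blob.hexsha)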
@@ -468,10 +490,10 @@ def unmerged_blobs(self):
return path_map

@classmethod
def entry_key(cls, *entry):
def entry_key(cls, *entry: Union[Tuple[BaseIndexEntry], Tuple[PathLike, StageType]]) -> Tuple[PathLike, StageType]:
return entry_key(*entry)

def resolve_blobs(self, iter_blobs):
def resolve_blobs(self, iter_blobs: Iterator[Blob]) -> 'IndexFile':
"""Resolve the blobs given in blob iterator. This will effectively remove the
index entries of the respective path at all non-null stages and add the given
blob as new stage null blob.
@@ -489,9 +511,9 @@ def resolve_blobs(self, iter_blobs):
for blob in iter_blobs:
stage_null_key = (blob.path, 0)
if stage_null_key in self.entries:
raise ValueError("Path %r already exists at stage 0" % blob.path)
raise ValueError("Path %r already exists at stage 0" % str(blob.path))
# END assert blob is not stage 0 already

# delete all possible stages
for stage in (1, 2, 3):
try:
@@ -506,7 +528,7 @@ def resolve_blobs(self, iter_blobs):

return self

def update(self):
def update(self) -> 'IndexFile':
"""Reread the contents of our index file, discarding all cached information
we might have.

@@ -517,7 +539,7 @@ def update(self):
# allows to lazily reread on demand
return self

def write_tree(self):
def write_tree(self) -> Tree:
"""Writes this index to a corresponding Tree object into the repository's
object database and return it.

@@ -542,22 +564,22 @@ def write_tree(self):
root_tree._cache = tree_items
return root_tree

def _process_diff_args(self, args):
def _process_diff_args(self, args: Any) -> List[Any]:
try:
args.pop(args.index(self))
except IndexError:
pass
# END remove self
return args

def _to_relative_path(self, path):
def _to_relative_path(self, path: PathLike) -> PathLike:
""":return: Version of path relative to our git directory or raise ValueError
if it is not within our git direcotory"""
if not osp.isabs(path):
return path
if self.repo.bare:
raise InvalidGitRepositoryError("require non-bare repository")
if not path.startswith(self.repo.working_tree_dir):
if not str(path).startswith(str(self.repo.working_tree_dir)):
raise ValueError("Absolute path %r is not in git repository at %r" % (path, self.repo.working_tree_dir))
return os.path.relpath(path, self.repo.working_tree_dir)

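Several hunks above wrap values in `str(...)` before calling string methods such as `startswith`; `PathLike` values (e.g. `pathlib.Path`) don't provide those methods, so they must be coerced first. A minimal sketch with a hypothetical helper (`os.fspath` is an alternative to `str` for this):

import os
from pathlib import Path

def is_under(path: 'os.PathLike[str]', root: str) -> bool:
    # Coerce the PathLike to str before using str-only methods.
    return os.fspath(path).startswith(root)

print(is_under(Path('/repo/a/b.txt'), '/repo'))  # True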
15 changes: 10 additions & 5 deletions git/index/fun.py
@@ -1,6 +1,7 @@
# Contains standalone functions to accompany the index implementation and make it
# more versatile
# NOTE: Autodoc hates it if this is a docstring
from git.types import PathLike
from io import BytesIO
import os
from stat import (
@@ -12,6 +13,7 @@
S_IFREG,
)
import subprocess
from typing import List, Tuple, cast

from git.cmd import PROC_CREATIONFLAGS, handle_process_output
from git.compat import (
@@ -166,12 +168,15 @@ def read_header(stream):
return version, num_entries


def entry_key(*entry):
def entry_key(*entry) -> Tuple[PathLike, int]:
""":return: Key suitable to be used for the index.entries dictionary
:param entry: One instance of type BaseIndexEntry or the path and the stage"""
if len(entry) == 1:
return (entry[0].path, entry[0].stage)
return tuple(entry)
entry_first = cast(BaseIndexEntry, entry[0]) # type: BaseIndexEntry
return (entry_first.path, entry_first.stage)
else:
entry = cast(Tuple[PathLike, int], tuple(entry))
return entry
# END handle entry
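
The `cast` calls added above only inform the type checker; they perform no runtime conversion or validation. A minimal sketch with illustrative names:

from typing import Tuple, cast

def entry_key_sketch(*entry) -> Tuple[str, int]:
    # cast() is a runtime no-op: it returns its argument unchanged and only
    # tells the checker which type to assume for it.
    return cast(Tuple[str, int], tuple(entry))

print(entry_key_sketch('git/index/base.py', 0))  # ('git/index/base.py', 0)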


@@ -283,7 +288,7 @@ def _tree_entry_to_baseindexentry(tree_entry, stage):
return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))


def aggressive_tree_merge(odb, tree_shas):
def aggressive_tree_merge(odb, tree_shas) -> List[BaseIndexEntry]:
"""
:return: list of BaseIndexEntries representing the aggressive merge of the given
trees. All valid entries are on stage 0, whereas the conflicting ones are left
@@ -292,7 +297,7 @@ def aggressive_tree_merge(odb, tree_shas):
:param tree_shas: 1, 2 or 3 trees as identified by their binary 20 byte shas
If 1 or two, the entries will effectively correspond to the last given tree
If 3 are given, a 3 way merge is performed"""
out = []
out = [] # type: List[BaseIndexEntry]
out_append = out.append

# one and two way is the same for us, as we don't have to handle an existing