From ee2704b463cc7592df2be295150824260e83e491 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 11:45:14 +0100 Subject: [PATCH 1/8] Update util.py --- git/objects/util.py | 634 +++++++------------------------------------- 1 file changed, 94 insertions(+), 540 deletions(-) diff --git a/git/objects/util.py b/git/objects/util.py index ef1ae77ba..4f8af5531 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -1,557 +1,111 @@ -# util.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php -"""Module for general utility functions""" - -from abc import abstractmethod -import warnings -from git.util import ( - IterableList, - IterableObj, - Actor -) - -import re -from collections import deque - -from string import digits -import time -import calendar -from datetime import datetime, timedelta, tzinfo - -# typing ------------------------------------------------------------ -from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence, - TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast) - -from git.types import Has_id_attribute, Literal, Protocol, runtime_checkable +"""Module containing index utilities""" +from functools import wraps +import os +import struct +import tempfile + +from git.compat import is_win + +import os.path as osp + + +# typing ---------------------------------------------------------------------- + +from typing import (Any, Callable, TYPE_CHECKING) + +from git.types import PathLike, _T if TYPE_CHECKING: - from io import BytesIO, StringIO - from .commit import Commit - from .blob import Blob - from .tag import TagObject - from .tree import Tree, TraversedTreeTup - from subprocess import Popen - from .submodule.base import Submodule + from git.index import IndexFile +# --------------------------------------------------------------------------------- -class TraverseNT(NamedTuple): - depth: int - item: Union['Traversable', 'Blob'] - src: Union['Traversable', None] +__all__ = ('TemporaryFileSwap', 'post_clear_cache', 'default_index', 'git_working_dir') -T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse() +#{ Aliases +pack = struct.pack +unpack = struct.unpack -TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule - 'TraversedTreeTup'] # for tree.traverse() -# -------------------------------------------------------------------- +#} END aliases -__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date', - 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz', - 'verify_utctz', 'Actor', 'tzoffset', 'utc') +class TemporaryFileSwap(object): -ZERO = timedelta(0) + """Utility class moving a file to a temporary location within the same directory + and moving it back on to where on object deletion.""" + __slots__ = ("file_path", "tmp_file_path") -#{ Functions + def __init__(self, file_path: PathLike) -> None: + self.file_path = file_path + self.tmp_file_path = str(self.file_path) + tempfile.mktemp('', '', '') + # it may be that the source does not exist + try: + os.rename(self.file_path, self.tmp_file_path) + except OSError: + pass + def __del__(self) -> None: + if osp.isfile(self.tmp_file_path): + if is_win and osp.exists(self.file_path): + os.remove(self.file_path) + os.rename(self.tmp_file_path, self.file_path) + # END temp file exists -def mode_str_to_int(modestr: Union[bytes, str]) -> int: - """ - :param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used - :return: - String identifying a mode compatible to the mode methods ids of the - stat module regarding the rwx permissions for user, group and other, - special flags and file system flags, i.e. whether it is a symlink - for example.""" - mode = 0 - for iteration, char in enumerate(reversed(modestr[-6:])): - char = cast(Union[str, int], char) - mode += int(char) << iteration * 3 - # END for each char - return mode - - -def get_object_type_by_name(object_type_name: bytes - ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]: - """ - :return: type suitable to handle the given object type name. - Use the type to create new instances. - - :param object_type_name: Member of TYPES - - :raise ValueError: In case object_type_name is unknown""" - if object_type_name == b"commit": - from . import commit - return commit.Commit - elif object_type_name == b"tag": - from . import tag - return tag.TagObject - elif object_type_name == b"blob": - from . import blob - return blob.Blob - elif object_type_name == b"tree": - from . import tree - return tree.Tree - else: - raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode()) - - -def utctz_to_altz(utctz: str) -> int: - """we convert utctz to the timezone in seconds, it is the format time.altzone - returns. Git stores it as UTC timezone which has the opposite sign as well, - which explains the -1 * ( that was made explicit here ) - :param utctz: git utc timezone string, i.e. +0200""" - return -1 * int(float(utctz) / 100 * 3600) - - -def altz_to_utctz_str(altz: float) -> str: - """As above, but inverses the operation, returning a string that can be used - in commit objects""" - utci = -1 * int((float(altz) / 3600) * 100) - utcs = str(abs(utci)) - utcs = "0" * (4 - len(utcs)) + utcs - prefix = (utci < 0 and '-') or '+' - return prefix + utcs - - -def verify_utctz(offset: str) -> str: - """:raise ValueError: if offset is incorrect - :return: offset""" - fmt_exc = ValueError("Invalid timezone offset format: %s" % offset) - if len(offset) != 5: - raise fmt_exc - if offset[0] not in "+-": - raise fmt_exc - if offset[1] not in digits or\ - offset[2] not in digits or\ - offset[3] not in digits or\ - offset[4] not in digits: - raise fmt_exc - # END for each char - return offset - - -class tzoffset(tzinfo): - - def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None: - self._offset = timedelta(seconds=-secs_west_of_utc) - self._name = name or 'fixed' - - def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]: - return tzoffset, (-self._offset.total_seconds(), self._name) - - def utcoffset(self, dt) -> timedelta: - return self._offset - - def tzname(self, dt) -> str: - return self._name - - def dst(self, dt) -> timedelta: - return ZERO - - -utc = tzoffset(0, 'UTC') - - -def from_timestamp(timestamp, tz_offset: float) -> datetime: - """Converts a timestamp + tz_offset into an aware datetime instance.""" - utc_dt = datetime.fromtimestamp(timestamp, utc) - try: - local_dt = utc_dt.astimezone(tzoffset(tz_offset)) - return local_dt - except ValueError: - return utc_dt - - -def parse_date(string_date: str) -> Tuple[int, int]: - """ - Parse the given date as one of the following - * aware datetime instance - * Git internal format: timestamp offset - * RFC 2822: Thu, 07 Apr 2005 22:13:13 +0200. - * ISO 8601 2005-04-07T22:13:13 - The T can be a space as well +#{ Decorators - :return: Tuple(int(timestamp_UTC), int(offset)), both in seconds since epoch - :raise ValueError: If the format could not be understood - :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. - """ - if isinstance(string_date, datetime) and string_date.tzinfo: - offset = -int(string_date.utcoffset().total_seconds()) - return int(string_date.astimezone(utc).timestamp()), offset - - # git time - try: - if string_date.count(' ') == 1 and string_date.rfind(':') == -1: - timestamp, offset_str = string_date.split() - if timestamp.startswith('@'): - timestamp = timestamp[1:] - timestamp_int = int(timestamp) - return timestamp_int, utctz_to_altz(verify_utctz(offset_str)) - else: - offset_str = "+0000" # local time by default - if string_date[-5] in '-+': - offset_str = verify_utctz(string_date[-5:]) - string_date = string_date[:-6] # skip space as well - # END split timezone info - offset = utctz_to_altz(offset_str) - - # now figure out the date and time portion - split time - date_formats = [] - splitter = -1 - if ',' in string_date: - date_formats.append("%a, %d %b %Y") - splitter = string_date.rfind(' ') - else: - # iso plus additional - date_formats.append("%Y-%m-%d") - date_formats.append("%Y.%m.%d") - date_formats.append("%m/%d/%Y") - date_formats.append("%d.%m.%Y") - - splitter = string_date.rfind('T') - if splitter == -1: - splitter = string_date.rfind(' ') - # END handle 'T' and ' ' - # END handle rfc or iso - - assert splitter > -1 - - # split date and time - time_part = string_date[splitter + 1:] # skip space - date_part = string_date[:splitter] - - # parse time - tstruct = time.strptime(time_part, "%H:%M:%S") - - for fmt in date_formats: - try: - dtstruct = time.strptime(date_part, fmt) - utctime = calendar.timegm((dtstruct.tm_year, dtstruct.tm_mon, dtstruct.tm_mday, - tstruct.tm_hour, tstruct.tm_min, tstruct.tm_sec, - dtstruct.tm_wday, dtstruct.tm_yday, tstruct.tm_isdst)) - return int(utctime), offset - except ValueError: - continue - # END exception handling - # END for each fmt - - # still here ? fail - raise ValueError("no format matched") - # END handle format - except Exception as e: - raise ValueError("Unsupported date format: %s" % string_date) from e - # END handle exceptions - - -# precompiled regex -_re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$') -_re_only_actor = re.compile(r'^.+? (.*)$') - - -def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: - """Parse out the actor (author or committer) info from a line like:: - - author Tom Preston-Werner 1191999972 -0700 - - :return: [Actor, int_seconds_since_epoch, int_timezone_offset]""" - actor, epoch, offset = '', '0', '0' - m = _re_actor_epoch.search(line) - if m: - actor, epoch, offset = m.groups() - else: - m = _re_only_actor.search(line) - actor = m.group(1) if m else line or '' - return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset)) - -#} END functions - - -#{ Classes - -class ProcessStreamAdapter(object): - - """Class wireing all calls to the contained Process instance. - - Use this type to hide the underlying process to provide access only to a specified - stream. The process is usually wrapped into an AutoInterrupt class to kill - it if the instance goes out of scope.""" - __slots__ = ("_proc", "_stream") - - def __init__(self, process: 'Popen', stream_name: str) -> None: - self._proc = process - self._stream: StringIO = getattr(process, stream_name) # guessed type - - def __getattr__(self, attr: str) -> Any: - return getattr(self._stream, attr) - - -@runtime_checkable -class Traversable(Protocol): - - """Simple interface to perform depth-first or breadth-first traversals - into one direction. - Subclasses only need to implement one function. - Instances of the Subclass must be hashable +def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]: + """Decorator for functions that alter the index using the git command. This would + invalidate our possibly existing entries dictionary which is why it must be + deleted to allow it to be lazily reread later. - Defined subclasses = [Commit, Tree, SubModule] + :note: + This decorator will not be required once all functions are implemented + natively which in fact is possible, but probably not feasible performance wise. """ - __slots__ = () - - @classmethod - @abstractmethod - def _get_intermediate_items(cls, item) -> Sequence['Traversable']: - """ - Returns: - Tuple of items connected to the given item. - Must be implemented in subclass - - class Commit:: (cls, Commit) -> Tuple[Commit, ...] - class Submodule:: (cls, Submodule) -> Iterablelist[Submodule] - class Tree:: (cls, Tree) -> Tuple[Tree, ...] - """ - raise NotImplementedError("To be implemented in subclass") - - @abstractmethod - def list_traverse(self, *args: Any, **kwargs: Any) -> Any: - """ """ - warnings.warn("list_traverse() method should only be called from subclasses." - "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" - "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", - DeprecationWarning, - stacklevel=2) - return self._list_traverse(*args, **kwargs) - - def _list_traverse(self, as_edge=False, *args: Any, **kwargs: Any - ) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]: - """ - :return: IterableList with the results of the traversal as produced by - traverse() - Commit -> IterableList['Commit'] - Submodule -> IterableList['Submodule'] - Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] - """ - # Commit and Submodule have id.__attribute__ as IterableObj - # Tree has id.__attribute__ inherited from IndexObject - if isinstance(self, (TraversableIterableObj, Has_id_attribute)): - id = self._id_attribute_ - else: - id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ - # could add _id_attribute_ to Traversable, or make all Traversable also Iterable? - - if not as_edge: - out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) - out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) # type: ignore - return out - # overloads in subclasses (mypy does't allow typing self: subclass) - # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]] - else: - # Raise deprecationwarning, doesn't make sense to use this - out_list: IterableList = IterableList(self.traverse(*args, **kwargs)) - return out_list - - @ abstractmethod - def traverse(self, *args: Any, **kwargs: Any) -> Any: - """ """ - warnings.warn("traverse() method should only be called from subclasses." - "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" - "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", - DeprecationWarning, - stacklevel=2) - return self._traverse(*args, **kwargs) - - def _traverse(self, - predicate: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: True, - prune: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: False, - depth: int = -1, branch_first: bool = True, visit_once: bool = True, - ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator[Union['Traversable', 'Blob']], - Iterator[TraversedTup]]: - """:return: iterator yielding of items found when traversing self - :param predicate: f(i,d) returns False if item i at depth d should not be included in the result - - :param prune: - f(i,d) return True if the search should stop at item i at depth d. - Item i will not be returned. - - :param depth: - define at which level the iteration should not go deeper - if -1, there is no limit - if 0, you would effectively only get self, the root of the iteration - i.e. if 1, you would only get the first level of predecessors/successors - - :param branch_first: - if True, items will be returned branch first, otherwise depth first - - :param visit_once: - if True, items will only be returned once, although they might be encountered - several times. Loops are prevented that way. - - :param ignore_self: - if True, self will be ignored and automatically pruned from - the result. Otherwise it will be the first item to be returned. - If as_edge is True, the source of the first edge is None - - :param as_edge: - if True, return a pair of items, first being the source, second the - destination, i.e. tuple(src, dest) with the edge spanning from - source to destination""" - - """ - Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]] - Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]] - Tree -> Iterator[Union[Blob, Tree, Submodule, - Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]] - - ignore_self=True is_edge=True -> Iterator[item] - ignore_self=True is_edge=False --> Iterator[item] - ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]] - ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]""" - - visited = set() - stack: Deque[TraverseNT] = deque() - stack.append(TraverseNT(0, self, None)) # self is always depth level 0 - - def addToStack(stack: Deque[TraverseNT], - src_item: 'Traversable', - branch_first: bool, - depth: int) -> None: - lst = self._get_intermediate_items(item) - if not lst: # empty list - return None - if branch_first: - stack.extendleft(TraverseNT(depth, i, src_item) for i in lst) - else: - reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1)) - stack.extend(reviter) - # END addToStack local method - - while stack: - d, item, src = stack.pop() # depth of item, item, item_source - - if visit_once and item in visited: - continue - - if visit_once: - visited.add(item) - - rval: Union[TraversedTup, 'Traversable', 'Blob'] - if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item) - rval = (src, item) - else: - rval = item - - if prune(rval, d): - continue - - skipStartItem = ignore_self and (item is self) - if not skipStartItem and predicate(rval, d): - yield rval - - # only continue to next level if this is appropriate ! - nd = d + 1 - if depth > -1 and nd > depth: - continue - - addToStack(stack, item, branch_first, nd) - # END for each item on work stack - - -@ runtime_checkable -class Serializable(Protocol): - - """Defines methods to serialize and deserialize objects from and into a data stream""" - __slots__ = () - - # @abstractmethod - def _serialize(self, stream: 'BytesIO') -> 'Serializable': - """Serialize the data of this object into the given data stream - :note: a serialized object would ``_deserialize`` into the same object - :param stream: a file-like object - :return: self""" - raise NotImplementedError("To be implemented in subclass") - - # @abstractmethod - def _deserialize(self, stream: 'BytesIO') -> 'Serializable': - """Deserialize all information regarding this object from the stream - :param stream: a file-like object - :return: self""" - raise NotImplementedError("To be implemented in subclass") - - -class TraversableIterableObj(IterableObj, Traversable): - __slots__ = () - - TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj] - - def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]: - return super(TraversableIterableObj, self)._list_traverse(* args, **kwargs) - - @ overload # type: ignore - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[True], - as_edge: Literal[False], - ) -> Iterator[T_TIobj]: - ... - - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[False], - as_edge: Literal[True], - ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: - ... - - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], - prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[True], - as_edge: Literal[True], - ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: - ... - - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], - bool] = lambda i, d: True, - prune: Callable[[Union[T_TIobj, TIobj_tuple], int], - bool] = lambda i, d: False, - depth: int = -1, branch_first: bool = True, visit_once: bool = True, - ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator[T_TIobj], - Iterator[Tuple[T_TIobj, T_TIobj]], - Iterator[TIobj_tuple]]: - """For documentation, see util.Traversable._traverse()""" - - """ - # To typecheck instead of using cast. - import itertools - from git.types import TypeGuard - def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]: - for x in inp[1]: - if not isinstance(x, tuple) and len(x) != 2: - if all(isinstance(inner, Commit) for inner in x): - continue - return True - - ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge) - ret_tup = itertools.tee(ret, 2) - assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}" - return ret_tup[0] - """ - return cast(Union[Iterator[T_TIobj], - Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], - super(TraversableIterableObj, self)._traverse( - predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore - )) + + @wraps(func) + def post_clear_cache_if_not_raised(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: + rval = func(self, *args, **kwargs) + self._delete_entries_cache() + return rval + # END wrapper method + + return post_clear_cache_if_not_raised + + +def default_index(func: Callable[..., _T]) -> Callable[..., _T]: + """Decorator assuring the wrapped method may only run if we are the default + repository index. This is as we rely on git commands that operate + on that index only. """ + + @wraps(func) + def check_default_index(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: + if self._file_path != self._index_path(): + raise AssertionError( + "Cannot call %r on indices that do not represent the default git index" % func.__name__) + return func(self, *args, **kwargs) + # END wrapper method + + return check_default_index + + +def git_working_dir(func: Callable[..., _T]) -> Callable[..., _T]: + """Decorator which changes the current working dir to the one of the git + repository in order to assure relative paths are handled correctly""" + + @wraps(func) + def set_git_working_dir(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: + cur_wd = os.getcwd() + os.chdir(str(self.repo.working_tree_dir)) + try: + return func(self, *args, **kwargs) + finally: + os.chdir(cur_wd) + # END handle working dir + # END wrapper + + return set_git_working_dir + +#} END decorators From 9b9bfc2af1be03f5c006c2c79ec2d21e4f66f468 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 11:45:50 +0100 Subject: [PATCH 2/8] Update util.py --- git/objects/util.py | 639 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 545 insertions(+), 94 deletions(-) diff --git a/git/objects/util.py b/git/objects/util.py index 4f8af5531..db7807c26 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -1,111 +1,562 @@ -"""Module containing index utilities""" -from functools import wraps -import os -import struct -import tempfile - -from git.compat import is_win - -import os.path as osp - - -# typing ---------------------------------------------------------------------- - -from typing import (Any, Callable, TYPE_CHECKING) - -from git.types import PathLike, _T +# util.py +# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors +# +# This module is part of GitPython and is released under +# the BSD License: http://www.opensource.org/licenses/bsd-license.php +"""Module for general utility functions""" + +from abc import abstractmethod +import warnings +from git.util import ( + IterableList, + IterableObj, + Actor +) + +import re +from collections import deque + +from string import digits +import time +import calendar +from datetime import datetime, timedelta, tzinfo + +# typing ------------------------------------------------------------ +from typing import (Any, Callable, Deque, Iterator, NamedTuple, overload, Sequence, + TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast) + +from git.types import Has_id_attribute, Literal, Protocol, runtime_checkable if TYPE_CHECKING: - from git.index import IndexFile + from io import BytesIO, StringIO + from .commit import Commit + from .blob import Blob + from .tag import TagObject + from .tree import Tree, TraversedTreeTup + from subprocess import Popen + from .submodule.base import Submodule -# --------------------------------------------------------------------------------- +class TraverseNT(NamedTuple): + depth: int + item: Union['Traversable', 'Blob'] + src: Union['Traversable', None] -__all__ = ('TemporaryFileSwap', 'post_clear_cache', 'default_index', 'git_working_dir') -#{ Aliases -pack = struct.pack -unpack = struct.unpack +T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse() +TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule + 'TraversedTreeTup'] # for tree.traverse() -#} END aliases +# -------------------------------------------------------------------- -class TemporaryFileSwap(object): +__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date', + 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz', + 'verify_utctz', 'Actor', 'tzoffset', 'utc') - """Utility class moving a file to a temporary location within the same directory - and moving it back on to where on object deletion.""" - __slots__ = ("file_path", "tmp_file_path") +ZERO = timedelta(0) - def __init__(self, file_path: PathLike) -> None: - self.file_path = file_path - self.tmp_file_path = str(self.file_path) + tempfile.mktemp('', '', '') - # it may be that the source does not exist - try: - os.rename(self.file_path, self.tmp_file_path) - except OSError: - pass +#{ Functions - def __del__(self) -> None: - if osp.isfile(self.tmp_file_path): - if is_win and osp.exists(self.file_path): - os.remove(self.file_path) - os.rename(self.tmp_file_path, self.file_path) - # END temp file exists - -#{ Decorators - -def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]: - """Decorator for functions that alter the index using the git command. This would - invalidate our possibly existing entries dictionary which is why it must be - deleted to allow it to be lazily reread later. - - :note: - This decorator will not be required once all functions are implemented - natively which in fact is possible, but probably not feasible performance wise. +def mode_str_to_int(modestr: Union[bytes, str]) -> int: """ + :param modestr: string like 755 or 644 or 100644 - only the last 6 chars will be used + :return: + String identifying a mode compatible to the mode methods ids of the + stat module regarding the rwx permissions for user, group and other, + special flags and file system flags, i.e. whether it is a symlink + for example.""" + mode = 0 + for iteration, char in enumerate(reversed(modestr[-6:])): + char = cast(Union[str, int], char) + mode += int(char) << iteration * 3 + # END for each char + return mode + + +def get_object_type_by_name(object_type_name: bytes + ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]: + """ + :return: type suitable to handle the given object type name. + Use the type to create new instances. + + :param object_type_name: Member of TYPES + + :raise ValueError: In case object_type_name is unknown""" + if object_type_name == b"commit": + from . import commit + return commit.Commit + elif object_type_name == b"tag": + from . import tag + return tag.TagObject + elif object_type_name == b"blob": + from . import blob + return blob.Blob + elif object_type_name == b"tree": + from . import tree + return tree.Tree + else: + raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode()) + + +def utctz_to_altz(utctz: str) -> int: + """we convert utctz to the timezone in seconds, it is the format time.altzone + returns. Git stores it as UTC timezone which has the opposite sign as well, + which explains the -1 * ( that was made explicit here ) + :param utctz: git utc timezone string, i.e. +0200""" + return -1 * int(float(utctz) / 100 * 3600) + + +def altz_to_utctz_str(altz: float) -> str: + """As above, but inverses the operation, returning a string that can be used + in commit objects""" + utci = -1 * int((float(altz) / 3600) * 100) + utcs = str(abs(utci)) + utcs = "0" * (4 - len(utcs)) + utcs + prefix = (utci < 0 and '-') or '+' + return prefix + utcs + + +def verify_utctz(offset: str) -> str: + """:raise ValueError: if offset is incorrect + :return: offset""" + fmt_exc = ValueError("Invalid timezone offset format: %s" % offset) + if len(offset) != 5: + raise fmt_exc + if offset[0] not in "+-": + raise fmt_exc + if offset[1] not in digits or\ + offset[2] not in digits or\ + offset[3] not in digits or\ + offset[4] not in digits: + raise fmt_exc + # END for each char + return offset + + +class tzoffset(tzinfo): + + def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None: + self._offset = timedelta(seconds=-secs_west_of_utc) + self._name = name or 'fixed' + + def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]: + return tzoffset, (-self._offset.total_seconds(), self._name) + + def utcoffset(self, dt) -> timedelta: + return self._offset + + def tzname(self, dt) -> str: + return self._name + + def dst(self, dt) -> timedelta: + return ZERO + + +utc = tzoffset(0, 'UTC') + + +def from_timestamp(timestamp, tz_offset: float) -> datetime: + """Converts a timestamp + tz_offset into an aware datetime instance.""" + utc_dt = datetime.fromtimestamp(timestamp, utc) + try: + local_dt = utc_dt.astimezone(tzoffset(tz_offset)) + return local_dt + except ValueError: + return utc_dt + + +def parse_date(string_date: str) -> Tuple[int, int]: + """ + Parse the given date as one of the following - @wraps(func) - def post_clear_cache_if_not_raised(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: - rval = func(self, *args, **kwargs) - self._delete_entries_cache() - return rval - # END wrapper method - - return post_clear_cache_if_not_raised - - -def default_index(func: Callable[..., _T]) -> Callable[..., _T]: - """Decorator assuring the wrapped method may only run if we are the default - repository index. This is as we rely on git commands that operate - on that index only. """ - - @wraps(func) - def check_default_index(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: - if self._file_path != self._index_path(): - raise AssertionError( - "Cannot call %r on indices that do not represent the default git index" % func.__name__) - return func(self, *args, **kwargs) - # END wrapper method - - return check_default_index - - -def git_working_dir(func: Callable[..., _T]) -> Callable[..., _T]: - """Decorator which changes the current working dir to the one of the git - repository in order to assure relative paths are handled correctly""" - - @wraps(func) - def set_git_working_dir(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: - cur_wd = os.getcwd() - os.chdir(str(self.repo.working_tree_dir)) - try: - return func(self, *args, **kwargs) - finally: - os.chdir(cur_wd) - # END handle working dir - # END wrapper + * aware datetime instance + * Git internal format: timestamp offset + * RFC 2822: Thu, 07 Apr 2005 22:13:13 +0200. + * ISO 8601 2005-04-07T22:13:13 + The T can be a space as well - return set_git_working_dir + :return: Tuple(int(timestamp_UTC), int(offset)), both in seconds since epoch + :raise ValueError: If the format could not be understood + :note: Date can also be YYYY.MM.DD, MM/DD/YYYY and DD.MM.YYYY. + """ + if isinstance(string_date, datetime) and string_date.tzinfo: + offset = -int(string_date.utcoffset().total_seconds()) + return int(string_date.astimezone(utc).timestamp()), offset + + # git time + try: + if string_date.count(' ') == 1 and string_date.rfind(':') == -1: + timestamp, offset_str = string_date.split() + if timestamp.startswith('@'): + timestamp = timestamp[1:] + timestamp_int = int(timestamp) + return timestamp_int, utctz_to_altz(verify_utctz(offset_str)) + else: + offset_str = "+0000" # local time by default + if string_date[-5] in '-+': + offset_str = verify_utctz(string_date[-5:]) + string_date = string_date[:-6] # skip space as well + # END split timezone info + offset = utctz_to_altz(offset_str) + + # now figure out the date and time portion - split time + date_formats = [] + splitter = -1 + if ',' in string_date: + date_formats.append("%a, %d %b %Y") + splitter = string_date.rfind(' ') + else: + # iso plus additional + date_formats.append("%Y-%m-%d") + date_formats.append("%Y.%m.%d") + date_formats.append("%m/%d/%Y") + date_formats.append("%d.%m.%Y") + + splitter = string_date.rfind('T') + if splitter == -1: + splitter = string_date.rfind(' ') + # END handle 'T' and ' ' + # END handle rfc or iso + + assert splitter > -1 + + # split date and time + time_part = string_date[splitter + 1:] # skip space + date_part = string_date[:splitter] + + # parse time + tstruct = time.strptime(time_part, "%H:%M:%S") + + for fmt in date_formats: + try: + dtstruct = time.strptime(date_part, fmt) + utctime = calendar.timegm((dtstruct.tm_year, dtstruct.tm_mon, dtstruct.tm_mday, + tstruct.tm_hour, tstruct.tm_min, tstruct.tm_sec, + dtstruct.tm_wday, dtstruct.tm_yday, tstruct.tm_isdst)) + return int(utctime), offset + except ValueError: + continue + # END exception handling + # END for each fmt + + # still here ? fail + raise ValueError("no format matched") + # END handle format + except Exception as e: + raise ValueError("Unsupported date format: %s" % string_date) from e + # END handle exceptions + + +# precompiled regex +_re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$') +_re_only_actor = re.compile(r'^.+? (.*)$') + + +def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: + """Parse out the actor (author or committer) info from a line like:: + + author Tom Preston-Werner 1191999972 -0700 + + :return: [Actor, int_seconds_since_epoch, int_timezone_offset]""" + actor, epoch, offset = '', '0', '0' + m = _re_actor_epoch.search(line) + if m: + actor, epoch, offset = m.groups() + else: + m = _re_only_actor.search(line) + actor = m.group(1) if m else line or '' + return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset)) + +#} END functions + + +#{ Classes + +class ProcessStreamAdapter(object): + + """Class wireing all calls to the contained Process instance. + + Use this type to hide the underlying process to provide access only to a specified + stream. The process is usually wrapped into an AutoInterrupt class to kill + it if the instance goes out of scope.""" + __slots__ = ("_proc", "_stream") + + def __init__(self, process: 'Popen', stream_name: str) -> None: + self._proc = process + self._stream: StringIO = getattr(process, stream_name) # guessed type + + def __getattr__(self, attr: str) -> Any: + return getattr(self._stream, attr) + + +@runtime_checkable +class Traversable(Protocol): + + """Simple interface to perform depth-first or breadth-first traversals + into one direction. + Subclasses only need to implement one function. + Instances of the Subclass must be hashable -#} END decorators + Defined subclasses = [Commit, Tree, SubModule] + """ + __slots__ = () + + @classmethod + @abstractmethod + def _get_intermediate_items(cls, item) -> Sequence['Traversable']: + """ + Returns: + Tuple of items connected to the given item. + Must be implemented in subclass + + class Commit:: (cls, Commit) -> Tuple[Commit, ...] + class Submodule:: (cls, Submodule) -> Iterablelist[Submodule] + class Tree:: (cls, Tree) -> Tuple[Tree, ...] + """ + raise NotImplementedError("To be implemented in subclass") + + @abstractmethod + def list_traverse(self, *args: Any, **kwargs: Any) -> Any: + """ """ + warnings.warn("list_traverse() method should only be called from subclasses." + "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" + "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", + DeprecationWarning, + stacklevel=2) + return self._list_traverse(*args, **kwargs) + + def _list_traverse(self, as_edge=False, *args: Any, **kwargs: Any + ) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]: + """ + :return: IterableList with the results of the traversal as produced by + traverse() + Commit -> IterableList['Commit'] + Submodule -> IterableList['Submodule'] + Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] + """ + # Commit and Submodule have id.__attribute__ as IterableObj + # Tree has id.__attribute__ inherited from IndexObject + if isinstance(self, (TraversableIterableObj, Has_id_attribute)): + id = self._id_attribute_ + else: + id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ + # could add _id_attribute_ to Traversable, or make all Traversable also Iterable? + + if not as_edge: + out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) + out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) # type: ignore + return out + # overloads in subclasses (mypy does't allow typing self: subclass) + # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]] + else: + # Raise deprecationwarning, doesn't make sense to use this + out_list: IterableList = IterableList(self.traverse(*args, **kwargs)) + return out_list + + @ abstractmethod + def traverse(self, *args: Any, **kwargs: Any) -> Any: + """ """ + warnings.warn("traverse() method should only be called from subclasses." + "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" + "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", + DeprecationWarning, + stacklevel=2) + return self._traverse(*args, **kwargs) + + def _traverse(self, + predicate: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: True, + prune: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: False, + depth: int = -1, branch_first: bool = True, visit_once: bool = True, + ignore_self: int = 1, as_edge: bool = False + ) -> Union[Iterator[Union['Traversable', 'Blob']], + Iterator[TraversedTup]]: + """:return: iterator yielding of items found when traversing self + :param predicate: f(i,d) returns False if item i at depth d should not be included in the result + + :param prune: + f(i,d) return True if the search should stop at item i at depth d. + Item i will not be returned. + + :param depth: + define at which level the iteration should not go deeper + if -1, there is no limit + if 0, you would effectively only get self, the root of the iteration + i.e. if 1, you would only get the first level of predecessors/successors + + :param branch_first: + if True, items will be returned branch first, otherwise depth first + + :param visit_once: + if True, items will only be returned once, although they might be encountered + several times. Loops are prevented that way. + + :param ignore_self: + if True, self will be ignored and automatically pruned from + the result. Otherwise it will be the first item to be returned. + If as_edge is True, the source of the first edge is None + + :param as_edge: + if True, return a pair of items, first being the source, second the + destination, i.e. tuple(src, dest) with the edge spanning from + source to destination""" + + """ + Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]] + Submodule -> Iterator[Submodule, Tuple[Submodule, Submodule]] + Tree -> Iterator[Union[Blob, Tree, Submodule, + Tuple[Union[Submodule, Tree], Union[Blob, Tree, Submodule]]] + + ignore_self=True is_edge=True -> Iterator[item] + ignore_self=True is_edge=False --> Iterator[item] + ignore_self=False is_edge=True -> Iterator[item] | Iterator[Tuple[src, item]] + ignore_self=False is_edge=False -> Iterator[Tuple[src, item]]""" + + visited = set() + stack: Deque[TraverseNT] = deque() + stack.append(TraverseNT(0, self, None)) # self is always depth level 0 + + def addToStack(stack: Deque[TraverseNT], + src_item: 'Traversable', + branch_first: bool, + depth: int) -> None: + lst = self._get_intermediate_items(item) + if not lst: # empty list + return None + if branch_first: + stack.extendleft(TraverseNT(depth, i, src_item) for i in lst) + else: + reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1)) + stack.extend(reviter) + # END addToStack local method + + while stack: + d, item, src = stack.pop() # depth of item, item, item_source + + if visit_once and item in visited: + continue + + if visit_once: + visited.add(item) + + rval: Union[TraversedTup, 'Traversable', 'Blob'] + if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item) + rval = (src, item) + else: + rval = item + + if prune(rval, d): + continue + + skipStartItem = ignore_self and (item is self) + if not skipStartItem and predicate(rval, d): + yield rval + + # only continue to next level if this is appropriate ! + nd = d + 1 + if depth > -1 and nd > depth: + continue + + addToStack(stack, item, branch_first, nd) + # END for each item on work stack + + +@ runtime_checkable +class Serializable(Protocol): + + """Defines methods to serialize and deserialize objects from and into a data stream""" + __slots__ = () + + # @abstractmethod + def _serialize(self, stream: 'BytesIO') -> 'Serializable': + """Serialize the data of this object into the given data stream + :note: a serialized object would ``_deserialize`` into the same object + :param stream: a file-like object + :return: self""" + raise NotImplementedError("To be implemented in subclass") + + # @abstractmethod + def _deserialize(self, stream: 'BytesIO') -> 'Serializable': + """Deserialize all information regarding this object from the stream + :param stream: a file-like object + :return: self""" + raise NotImplementedError("To be implemented in subclass") + + +class TraversableIterableObj(IterableObj, Traversable): + __slots__ = () + + TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj] + + def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]: + return super(TraversableIterableObj, self)._list_traverse(* args, **kwargs) + + @ overload # type: ignore + def traverse(self: T_TIobj + ) -> Iterator[T_TIobj]: + ... + + @ overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[False], + ) -> Iterator[T_TIobj]: + ... + + @ overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[False], + as_edge: Literal[True], + ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: + ... + + @ overload + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + depth: int, branch_first: bool, visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[True], + ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: + ... + + def traverse(self: T_TIobj, + predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], + bool] = lambda i, d: True, + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], + bool] = lambda i, d: False, + depth: int = -1, branch_first: bool = True, visit_once: bool = True, + ignore_self: int = 1, as_edge: bool = False + ) -> Union[Iterator[T_TIobj], + Iterator[Tuple[T_TIobj, T_TIobj]], + Iterator[TIobj_tuple]]: + """For documentation, see util.Traversable._traverse()""" + + """ + # To typecheck instead of using cast. + import itertools + from git.types import TypeGuard + def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]: + for x in inp[1]: + if not isinstance(x, tuple) and len(x) != 2: + if all(isinstance(inner, Commit) for inner in x): + continue + return True + + ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge) + ret_tup = itertools.tee(ret, 2) + assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}" + return ret_tup[0] + """ + return cast(Union[Iterator[T_TIobj], + Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], + super(TraversableIterableObj, self)._traverse( + predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore + )) From b76b99184e8f0e16ba66a730846f3d61c72061fe Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:02:02 +0100 Subject: [PATCH 3/8] Update base.py --- git/repo/base.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/git/repo/base.py b/git/repo/base.py index bb8ddf135..355f93999 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -422,14 +422,14 @@ def _to_full_tag_path(path): def create_head(self, path: PathLike, commit: str = 'HEAD', force: bool = False, logmsg: Optional[str] = None - ) -> Head: + ) -> 'Head': """Create a new head within the repository. For more documentation, please see the Head.create method. :return: newly created Head Reference""" return Head.create(self, path, commit, logmsg, force) - def delete_head(self, *heads: 'SymbolicReference', **kwargs: Any) -> None: + def delete_head(self, *heads: 'Head', **kwargs: Any) -> None: """Delete the given heads :param kwargs: Additional keyword arguments to be passed to git-branch""" @@ -788,10 +788,10 @@ def ignored(self, *paths: PathLike) -> List[PathLike]: return proc.replace("\\\\", "\\").replace('"', "").split("\n") @property - def active_branch(self) -> 'SymbolicReference': + def active_branch(self) -> Head: """The name of the currently active branch. - :return: Head to the active branch""" + # reveal_type(self.head.reference) # => Reference return self.head.reference def blame_incremental(self, rev: TBD, file: TBD, **kwargs: Any) -> Optional[Iterator['BlameEntry']]: From d9f140a529901b5e07cda3665494104f23a380be Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:06:52 +0100 Subject: [PATCH 4/8] Update test_refs.py --- test/test_refs.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/test/test_refs.py b/test/test_refs.py index 1315f885f..ab760a6f5 100644 --- a/test/test_refs.py +++ b/test/test_refs.py @@ -125,11 +125,15 @@ def test_heads(self, rwrepo): gp_tracking_branch = rwrepo.create_head('gp_tracking#123') special_name_remote_ref = rwrepo.remotes[0].refs[special_name] # get correct type gp_tracking_branch.set_tracking_branch(special_name_remote_ref) - assert gp_tracking_branch.tracking_branch().path == special_name_remote_ref.path + TBranch = gp_tracking_branch.tracking_branch() + if TBranch is not None: + assert TBranch.path == special_name_remote_ref.path git_tracking_branch = rwrepo.create_head('git_tracking#123') rwrepo.git.branch('-u', special_name_remote_ref.name, git_tracking_branch.name) - assert git_tracking_branch.tracking_branch().name == special_name_remote_ref.name + TBranch = gp_tracking_branch.tracking_branch() + if TBranch is not None: + assert TBranch.name == special_name_remote_ref.name # END for each head # verify REFLOG gets altered @@ -453,7 +457,7 @@ def test_head_reset(self, rw_repo): self.assertRaises(OSError, SymbolicReference.create, rw_repo, symref_path, cur_head.reference.commit) # it works if the new ref points to the same reference - SymbolicReference.create(rw_repo, symref.path, symref.reference).path == symref.path # @NoEffect + assert SymbolicReference.create(rw_repo, symref.path, symref.reference).path == symref.path # @NoEffect SymbolicReference.delete(rw_repo, symref) # would raise if the symref wouldn't have been deletedpbl symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference) From 464848e3c5961a2840533c6de58cb3a5d253711b Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:08:02 +0100 Subject: [PATCH 5/8] Update config.py --- git/config.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/git/config.py b/git/config.py index c4b26ba63..ad02b4373 100644 --- a/git/config.py +++ b/git/config.py @@ -41,12 +41,13 @@ T_ConfigParser = TypeVar('T_ConfigParser', bound='GitConfigParser') -if sys.version_info[:2] < (3, 7): - from collections import OrderedDict - OrderedDict_OMD = OrderedDict +if sys.version_info[:3] < (3, 7, 2): + # typing.Ordereddict not added until py 3.7.2 + from collections import OrderedDict # type: ignore # until 3.6 dropped + OrderedDict_OMD = OrderedDict # type: ignore # until 3.6 dropped else: - from typing import OrderedDict - OrderedDict_OMD = OrderedDict[str, List[_T]] + from typing import OrderedDict # type: ignore # until 3.6 dropped + OrderedDict_OMD = OrderedDict[str, List[_T]] # type: ignore[assignment, misc] # ------------------------------------------------------------- From b833eebece8d0c6cb1c79bc06e8ff9f26b994bb6 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:10:21 +0100 Subject: [PATCH 6/8] Update tag.py --- git/refs/tag.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/git/refs/tag.py b/git/refs/tag.py index 281ce09ad..edfab33d8 100644 --- a/git/refs/tag.py +++ b/git/refs/tag.py @@ -4,13 +4,14 @@ # typing ------------------------------------------------------------------ -from typing import Any, Union, TYPE_CHECKING +from typing import Any, Type, Union, TYPE_CHECKING from git.types import Commit_ish, PathLike if TYPE_CHECKING: from git.repo import Repo from git.objects import Commit from git.objects import TagObject + from git.refs import SymbolicReference # ------------------------------------------------------------------------------ @@ -68,7 +69,8 @@ def object(self) -> Commit_ish: # type: ignore[override] return Reference._get_object(self) @classmethod - def create(cls, repo: 'Repo', path: PathLike, reference: Union[Commit_ish, str] = 'HEAD', + def create(cls: Type['TagReference'], repo: 'Repo', path: PathLike, + reference: Union[str, 'SymbolicReference'] = 'HEAD', logmsg: Union[str, None] = None, force: bool = False, **kwargs: Any) -> 'TagReference': """Create a new tag reference. @@ -78,7 +80,7 @@ def create(cls, repo: 'Repo', path: PathLike, reference: Union[Commit_ish, str] The prefix refs/tags is implied :param ref: - A reference to the object you want to tag. It can be a commit, tree or + A reference to the Object you want to tag. The Object can be a commit, tree or blob. :param logmsg: @@ -98,7 +100,9 @@ def create(cls, repo: 'Repo', path: PathLike, reference: Union[Commit_ish, str] Additional keyword arguments to be passed to git-tag :return: A new TagReference""" - args = (path, reference) + if 'ref' in kwargs and kwargs['ref']: + reference = kwargs['ref'] + if logmsg: kwargs['m'] = logmsg elif 'message' in kwargs and kwargs['message']: @@ -107,11 +111,13 @@ def create(cls, repo: 'Repo', path: PathLike, reference: Union[Commit_ish, str] if force: kwargs['f'] = True + args = (path, reference) + repo.git.tag(*args, **kwargs) return TagReference(repo, "%s/%s" % (cls._common_path_default, path)) @classmethod - def delete(cls, repo: 'Repo', *tags: 'TagReference') -> None: + def delete(cls, repo: 'Repo', *tags: 'TagReference') -> None: # type: ignore[override] """Delete the given existing tag or tags""" repo.git.tag("-d", *tags) From e2d5e0e42a7bb664560133d1c3efeb7b4686f7c7 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:15:02 +0100 Subject: [PATCH 7/8] Update head.py --- git/refs/head.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/git/refs/head.py b/git/refs/head.py index 338efce9f..260bf5e7e 100644 --- a/git/refs/head.py +++ b/git/refs/head.py @@ -1,4 +1,4 @@ -from git.config import SectionConstraint +from git.config import GitConfigParser, SectionConstraint from git.util import join_path from git.exc import GitCommandError @@ -142,7 +142,7 @@ def delete(cls, repo: 'Repo', *heads: 'Head', **kwargs: Any): flag = "-D" repo.git.branch(flag, *heads) - def set_tracking_branch(self, remote_reference: 'RemoteReference') -> 'Head': + def set_tracking_branch(self, remote_reference: Union['RemoteReference', None]) -> 'Head': """ Configure this branch to track the given remote reference. This will alter this branch's configuration accordingly. @@ -203,7 +203,7 @@ def rename(self, new_path: PathLike, force: bool = False) -> 'Head': self.path = "%s/%s" % (self._common_path_default, new_path) return self - def checkout(self, force: bool = False, **kwargs: Any): + def checkout(self, force: bool = False, **kwargs: Any) -> Union['HEAD', 'Head']: """Checkout this head by setting the HEAD to this reference, by updating the index to reflect the tree we point to and by updating the working tree to reflect the latest index. @@ -235,10 +235,11 @@ def checkout(self, force: bool = False, **kwargs: Any): self.repo.git.checkout(self, **kwargs) if self.repo.head.is_detached: return self.repo.head - return self.repo.active_branch + else: + return self.repo.active_branch #{ Configuration - def _config_parser(self, read_only: bool) -> SectionConstraint: + def _config_parser(self, read_only: bool) -> SectionConstraint[GitConfigParser]: if read_only: parser = self.repo.config_reader() else: @@ -247,13 +248,13 @@ def _config_parser(self, read_only: bool) -> SectionConstraint: return SectionConstraint(parser, 'branch "%s"' % self.name) - def config_reader(self) -> SectionConstraint: + def config_reader(self) -> SectionConstraint[GitConfigParser]: """ :return: A configuration parser instance constrained to only read this instance's values""" return self._config_parser(read_only=True) - def config_writer(self) -> SectionConstraint: + def config_writer(self) -> SectionConstraint[GitConfigParser]: """ :return: A configuration writer instance with read-and write access to options of this head""" From 995547aa9b2ca1f1d7795d91a916f83c5d1a96f9 Mon Sep 17 00:00:00 2001 From: Dominic Date: Sat, 31 Jul 2021 13:18:32 +0100 Subject: [PATCH 8/8] Update reference.py --- git/refs/reference.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/git/refs/reference.py b/git/refs/reference.py index 646622816..bc2c6e807 100644 --- a/git/refs/reference.py +++ b/git/refs/reference.py @@ -62,7 +62,9 @@ def __str__(self) -> str: #{ Interface - def set_object(self, object: Commit_ish, logmsg: Union[str, None] = None) -> 'Reference': # @ReservedAssignment + # @ReservedAssignment + def set_object(self, object: Union[Commit_ish, 'SymbolicReference'], logmsg: Union[str, None] = None + ) -> 'SymbolicReference': """Special version which checks if the head-log needs an update as well :return: self""" oldbinsha = None