| | |
| | |
| | |
| | |
| |
|
| | import sys |
| |
|
| | __all__ = [ |
| | "stream_copy", |
| | "join_path", |
| | "to_native_path_linux", |
| | "join_path_native", |
| | "Stats", |
| | "IndexFileSHA1Writer", |
| | "IterableObj", |
| | "IterableList", |
| | "BlockingLockFile", |
| | "LockFile", |
| | "Actor", |
| | "get_user_id", |
| | "assure_directory_exists", |
| | "RemoteProgress", |
| | "CallableRemoteProgress", |
| | "rmtree", |
| | "unbare_repo", |
| | "HIDE_WINDOWS_KNOWN_ERRORS", |
| | ] |
| |
|
| | if sys.platform == "win32": |
| | __all__.append("to_native_path_windows") |
| |
|
| | from abc import abstractmethod |
| | import contextlib |
| | from functools import wraps |
| | import getpass |
| | import logging |
| | import os |
| | import os.path as osp |
| | import pathlib |
| | import platform |
| | import re |
| | import shutil |
| | import stat |
| | import subprocess |
| | import time |
| | from urllib.parse import urlsplit, urlunsplit |
| | import warnings |
| |
|
| | |
| | |
| | |
| | |
| | |
| | from gitdb.util import ( |
| | LazyMixin, |
| | LockedFD, |
| | bin_to_hex, |
| | file_contents_ro, |
| | file_contents_ro_filepath, |
| | hex_to_bin, |
| | make_sha, |
| | to_bin_sha, |
| | to_hex_sha, |
| | ) |
| |
|
| | |
| |
|
| | from typing import ( |
| | Any, |
| | AnyStr, |
| | BinaryIO, |
| | Callable, |
| | Dict, |
| | Generator, |
| | IO, |
| | Iterator, |
| | List, |
| | Optional, |
| | Pattern, |
| | Sequence, |
| | Tuple, |
| | TYPE_CHECKING, |
| | TypeVar, |
| | Union, |
| | cast, |
| | overload, |
| | ) |
| |
|
| | if TYPE_CHECKING: |
| | from git.cmd import Git |
| | from git.config import GitConfigParser, SectionConstraint |
| | from git.remote import Remote |
| | from git.repo.base import Repo |
| |
|
| | from git.types import ( |
| | Files_TD, |
| | Has_id_attribute, |
| | HSH_TD, |
| | Literal, |
| | PathLike, |
| | Protocol, |
| | SupportsIndex, |
| | Total_TD, |
| | runtime_checkable, |
| | ) |
| |
|
| | |
| |
|
| | T_IterableObj = TypeVar("T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True) |
| | |
| |
|
| | _logger = logging.getLogger(__name__) |
| |
|
| |
|
| | def _read_env_flag(name: str, default: bool) -> bool: |
| | """Read a boolean flag from an environment variable. |
| | |
| | :return: |
| | The flag, or the `default` value if absent or ambiguous. |
| | """ |
| | try: |
| | value = os.environ[name] |
| | except KeyError: |
| | return default |
| |
|
| | _logger.warning( |
| | "The %s environment variable is deprecated. Its effect has never been documented and changes without warning.", |
| | name, |
| | ) |
| |
|
| | adjusted_value = value.strip().lower() |
| |
|
| | if adjusted_value in {"", "0", "false", "no"}: |
| | return False |
| | if adjusted_value in {"1", "true", "yes"}: |
| | return True |
| | _logger.warning("%s has unrecognized value %r, treating as %r.", name, value, default) |
| | return default |
| |
|
| |
|
| | def _read_win_env_flag(name: str, default: bool) -> bool: |
| | """Read a boolean flag from an environment variable on Windows. |
| | |
| | :return: |
| | On Windows, the flag, or the `default` value if absent or ambiguous. |
| | On all other operating systems, ``False``. |
| | |
| | :note: |
| | This only accesses the environment on Windows. |
| | """ |
| | return sys.platform == "win32" and _read_env_flag(name, default) |
| |
|
| |
|
| | |
| | |
| | |
| | HIDE_WINDOWS_KNOWN_ERRORS = _read_win_env_flag("HIDE_WINDOWS_KNOWN_ERRORS", True) |
| | HIDE_WINDOWS_FREEZE_ERRORS = _read_win_env_flag("HIDE_WINDOWS_FREEZE_ERRORS", True) |
| |
|
| | |
| |
|
| | T = TypeVar("T") |
| |
|
| |
|
| | def unbare_repo(func: Callable[..., T]) -> Callable[..., T]: |
| | """Methods with this decorator raise :exc:`~git.exc.InvalidGitRepositoryError` if |
| | they encounter a bare repository.""" |
| |
|
| | from .exc import InvalidGitRepositoryError |
| |
|
| | @wraps(func) |
| | def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T: |
| | if self.repo.bare: |
| | raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__) |
| | |
| | return func(self, *args, **kwargs) |
| |
|
| | |
| |
|
| | return wrapper |
| |
|
| |
|
| | @contextlib.contextmanager |
| | def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]: |
| | """Context manager to temporarily change directory. |
| | |
| | This is similar to :func:`contextlib.chdir` introduced in Python 3.11, but the |
| | context manager object returned by a single call to this function is not reentrant. |
| | """ |
| | old_dir = os.getcwd() |
| | os.chdir(new_dir) |
| | try: |
| | yield new_dir |
| | finally: |
| | os.chdir(old_dir) |
| |
|
| |
|
| | @contextlib.contextmanager |
| | def patch_env(name: str, value: str) -> Generator[None, None, None]: |
| | """Context manager to temporarily patch an environment variable.""" |
| | old_value = os.getenv(name) |
| | os.environ[name] = value |
| | try: |
| | yield |
| | finally: |
| | if old_value is None: |
| | del os.environ[name] |
| | else: |
| | os.environ[name] = old_value |
| |
|
| |
|
| | def rmtree(path: PathLike) -> None: |
| | """Remove the given directory tree recursively. |
| | |
| | :note: |
| | We use :func:`shutil.rmtree` but adjust its behaviour to see whether files that |
| | couldn't be deleted are read-only. Windows will not remove them in that case. |
| | """ |
| |
|
| | def handler(function: Callable, path: PathLike, _excinfo: Any) -> None: |
| | """Callback for :func:`shutil.rmtree`. |
| | |
| | This works as either a ``onexc`` or ``onerror`` style callback. |
| | """ |
| | |
| | os.chmod(path, stat.S_IWUSR) |
| |
|
| | try: |
| | function(path) |
| | except PermissionError as ex: |
| | if HIDE_WINDOWS_KNOWN_ERRORS: |
| | from unittest import SkipTest |
| |
|
| | raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex |
| | raise |
| |
|
| | if sys.platform != "win32": |
| | shutil.rmtree(path) |
| | elif sys.version_info >= (3, 12): |
| | shutil.rmtree(path, onexc=handler) |
| | else: |
| | shutil.rmtree(path, onerror=handler) |
| |
|
| |
|
| | def rmfile(path: PathLike) -> None: |
| | """Ensure file deleted also on *Windows* where read-only files need special |
| | treatment.""" |
| | if osp.isfile(path): |
| | if sys.platform == "win32": |
| | os.chmod(path, 0o777) |
| | os.remove(path) |
| |
|
| |
|
| | def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int: |
| | """Copy all data from the `source` stream into the `destination` stream in chunks |
| | of size `chunk_size`. |
| | |
| | :return: |
| | Number of bytes written |
| | """ |
| | br = 0 |
| | while True: |
| | chunk = source.read(chunk_size) |
| | destination.write(chunk) |
| | br += len(chunk) |
| | if len(chunk) < chunk_size: |
| | break |
| | |
| | return br |
| |
|
| |
|
| | def join_path(a: PathLike, *p: PathLike) -> PathLike: |
| | R"""Join path tokens together similar to osp.join, but always use ``/`` instead of |
| | possibly ``\`` on Windows.""" |
| | path = str(a) |
| | for b in p: |
| | b = str(b) |
| | if not b: |
| | continue |
| | if b.startswith("/"): |
| | path += b[1:] |
| | elif path == "" or path.endswith("/"): |
| | path += b |
| | else: |
| | path += "/" + b |
| | |
| | return path |
| |
|
| |
|
| | if sys.platform == "win32": |
| |
|
| | def to_native_path_windows(path: PathLike) -> PathLike: |
| | path = str(path) |
| | return path.replace("/", "\\") |
| |
|
| | def to_native_path_linux(path: PathLike) -> str: |
| | path = str(path) |
| | return path.replace("\\", "/") |
| |
|
| | to_native_path = to_native_path_windows |
| | else: |
| | |
| | def to_native_path_linux(path: PathLike) -> str: |
| | return str(path) |
| |
|
| | to_native_path = to_native_path_linux |
| |
|
| |
|
| | def join_path_native(a: PathLike, *p: PathLike) -> PathLike: |
| | R"""Like :func:`join_path`, but makes sure an OS native path is returned. |
| | |
| | This is only needed to play it safe on Windows and to ensure nice paths that only |
| | use ``\``. |
| | """ |
| | return to_native_path(join_path(a, *p)) |
| |
|
| |
|
| | def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool: |
| | """Make sure that the directory pointed to by path exists. |
| | |
| | :param is_file: |
| | If ``True``, `path` is assumed to be a file and handled correctly. |
| | Otherwise it must be a directory. |
| | |
| | :return: |
| | ``True`` if the directory was created, ``False`` if it already existed. |
| | """ |
| | if is_file: |
| | path = osp.dirname(path) |
| | |
| | if not osp.isdir(path): |
| | os.makedirs(path, exist_ok=True) |
| | return True |
| | return False |
| |
|
| |
|
| | def _get_exe_extensions() -> Sequence[str]: |
| | PATHEXT = os.environ.get("PATHEXT", None) |
| | if PATHEXT: |
| | return tuple(p.upper() for p in PATHEXT.split(os.pathsep)) |
| | elif sys.platform == "win32": |
| | return (".BAT", ".COM", ".EXE") |
| | else: |
| | return () |
| |
|
| |
|
| | def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: |
| | """Perform a path search to assist :func:`is_cygwin_git`. |
| | |
| | This is not robust for general use. It is an implementation detail of |
| | :func:`is_cygwin_git`. When a search following all shell rules is needed, |
| | :func:`shutil.which` can be used instead. |
| | |
| | :note: |
| | Neither this function nor :func:`shutil.which` will predict the effect of an |
| | executable search on a native Windows system due to a :class:`subprocess.Popen` |
| | call without ``shell=True``, because shell and non-shell executable search on |
| | Windows differ considerably. |
| | """ |
| | |
| | winprog_exts = _get_exe_extensions() |
| |
|
| | def is_exec(fpath: str) -> bool: |
| | return ( |
| | osp.isfile(fpath) |
| | and os.access(fpath, os.X_OK) |
| | and ( |
| | sys.platform != "win32" or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts) |
| | ) |
| | ) |
| |
|
| | progs = [] |
| | if not path: |
| | path = os.environ["PATH"] |
| | for folder in str(path).split(os.pathsep): |
| | folder = folder.strip('"') |
| | if folder: |
| | exe_path = osp.join(folder, program) |
| | for f in [exe_path] + ["%s%s" % (exe_path, e) for e in winprog_exts]: |
| | if is_exec(f): |
| | progs.append(f) |
| | return progs |
| |
|
| |
|
| | def _cygexpath(drive: Optional[str], path: str) -> str: |
| | if osp.isabs(path) and not drive: |
| | |
| | |
| | p = path |
| | else: |
| | p = path and osp.normpath(osp.expandvars(osp.expanduser(path))) |
| | if osp.isabs(p): |
| | if drive: |
| | |
| | p = path |
| | else: |
| | p = cygpath(p) |
| | elif drive: |
| | p = "/proc/cygdrive/%s/%s" % (drive.lower(), p) |
| | p_str = str(p) |
| | return p_str.replace("\\", "/") |
| |
|
| |
|
| | _cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = ( |
| | |
| | |
| | ( |
| | re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"), |
| | (lambda server, share, rest_path: "//%s/%s/%s" % (server, share, rest_path.replace("\\", "/"))), |
| | False, |
| | ), |
| | (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False), |
| | (re.compile(r"(\w):[/\\](.*)"), (_cygexpath), False), |
| | (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), True), |
| | (re.compile(r"(\w{2,}:.*)"), (lambda url: url), False), |
| | ) |
| |
|
| |
|
| | def cygpath(path: str) -> str: |
| | """Use :meth:`git.cmd.Git.polish_url` instead, that works on any environment.""" |
| | path = str(path) |
| | |
| | if not path.startswith(("/cygdrive", "//", "/proc/cygdrive")): |
| | for regex, parser, recurse in _cygpath_parsers: |
| | match = regex.match(path) |
| | if match: |
| | path = parser(*match.groups()) |
| | if recurse: |
| | path = cygpath(path) |
| | break |
| | else: |
| | path = _cygexpath(None, path) |
| |
|
| | return path |
| |
|
| |
|
| | _decygpath_regex = re.compile(r"(?:/proc)?/cygdrive/(\w)(/.*)?") |
| |
|
| |
|
| | def decygpath(path: PathLike) -> str: |
| | path = str(path) |
| | m = _decygpath_regex.match(path) |
| | if m: |
| | drive, rest_path = m.groups() |
| | path = "%s:%s" % (drive.upper(), rest_path or "") |
| |
|
| | return path.replace("/", "\\") |
| |
|
| |
|
| | |
| | |
| | _is_cygwin_cache: Dict[str, Optional[bool]] = {} |
| |
|
| |
|
| | def _is_cygwin_git(git_executable: str) -> bool: |
| | is_cygwin = _is_cygwin_cache.get(git_executable) |
| | if is_cygwin is None: |
| | is_cygwin = False |
| | try: |
| | git_dir = osp.dirname(git_executable) |
| | if not git_dir: |
| | res = py_where(git_executable) |
| | git_dir = osp.dirname(res[0]) if res else "" |
| |
|
| | |
| | uname_cmd = osp.join(git_dir, "uname") |
| | process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True) |
| | uname_out, _ = process.communicate() |
| | |
| | is_cygwin = "CYGWIN" in uname_out |
| | except Exception as ex: |
| | _logger.debug("Failed checking if running in CYGWIN due to: %r", ex) |
| | _is_cygwin_cache[git_executable] = is_cygwin |
| |
|
| | return is_cygwin |
| |
|
| |
|
| | @overload |
| | def is_cygwin_git(git_executable: None) -> Literal[False]: ... |
| |
|
| |
|
| | @overload |
| | def is_cygwin_git(git_executable: PathLike) -> bool: ... |
| |
|
| |
|
| | def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool: |
| | if sys.platform == "win32": |
| | return False |
| | elif git_executable is None: |
| | return False |
| | else: |
| | return _is_cygwin_git(str(git_executable)) |
| |
|
| |
|
| | def get_user_id() -> str: |
| | """:return: String identifying the currently active system user as ``name@node``""" |
| | return "%s@%s" % (getpass.getuser(), platform.node()) |
| |
|
| |
|
| | def finalize_process(proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any) -> None: |
| | """Wait for the process (clone, fetch, pull or push) and handle its errors |
| | accordingly.""" |
| | |
| | proc.wait(**kwargs) |
| |
|
| |
|
| | @overload |
| | def expand_path(p: None, expand_vars: bool = ...) -> None: ... |
| |
|
| |
|
| | @overload |
| | def expand_path(p: PathLike, expand_vars: bool = ...) -> str: |
| | |
| | ... |
| |
|
| |
|
| | def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]: |
| | if isinstance(p, pathlib.Path): |
| | return p.resolve() |
| | try: |
| | p = osp.expanduser(p) |
| | if expand_vars: |
| | p = osp.expandvars(p) |
| | return osp.normpath(osp.abspath(p)) |
| | except Exception: |
| | return None |
| |
|
| |
|
| | def remove_password_if_present(cmdline: Sequence[str]) -> List[str]: |
| | """Parse any command line argument and if one of the elements is an URL with a |
| | username and/or password, replace them by stars (in-place). |
| | |
| | If nothing is found, this just returns the command line as-is. |
| | |
| | This should be used for every log line that print a command line, as well as |
| | exception messages. |
| | """ |
| | new_cmdline = [] |
| | for index, to_parse in enumerate(cmdline): |
| | new_cmdline.append(to_parse) |
| | try: |
| | url = urlsplit(to_parse) |
| | |
| | if url.password is None and url.username is None: |
| | continue |
| |
|
| | if url.password is not None: |
| | url = url._replace(netloc=url.netloc.replace(url.password, "*****")) |
| | if url.username is not None: |
| | url = url._replace(netloc=url.netloc.replace(url.username, "*****")) |
| | new_cmdline[index] = urlunsplit(url) |
| | except ValueError: |
| | |
| | continue |
| | return new_cmdline |
| |
|
| |
|
| | |
| |
|
| | |
| |
|
| |
|
| | class RemoteProgress: |
| | """Handler providing an interface to parse progress information emitted by |
| | :manpage:`git-push(1)` and :manpage:`git-fetch(1)` and to dispatch callbacks |
| | allowing subclasses to react to the progress.""" |
| |
|
| | _num_op_codes: int = 9 |
| | ( |
| | BEGIN, |
| | END, |
| | COUNTING, |
| | COMPRESSING, |
| | WRITING, |
| | RECEIVING, |
| | RESOLVING, |
| | FINDING_SOURCES, |
| | CHECKING_OUT, |
| | ) = [1 << x for x in range(_num_op_codes)] |
| | STAGE_MASK = BEGIN | END |
| | OP_MASK = ~STAGE_MASK |
| |
|
| | DONE_TOKEN = "done." |
| | TOKEN_SEPARATOR = ", " |
| |
|
| | __slots__ = ( |
| | "_cur_line", |
| | "_seen_ops", |
| | "error_lines", |
| | "other_lines", |
| | ) |
| | re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)") |
| | re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)") |
| |
|
| | def __init__(self) -> None: |
| | self._seen_ops: List[int] = [] |
| | self._cur_line: Optional[str] = None |
| | self.error_lines: List[str] = [] |
| | self.other_lines: List[str] = [] |
| |
|
| | def _parse_progress_line(self, line: AnyStr) -> None: |
| | """Parse progress information from the given line as retrieved by |
| | :manpage:`git-push(1)` or :manpage:`git-fetch(1)`. |
| | |
| | - Lines that do not contain progress info are stored in :attr:`other_lines`. |
| | - Lines that seem to contain an error (i.e. start with ``error:`` or ``fatal:``) |
| | are stored in :attr:`error_lines`. |
| | """ |
| | |
| | |
| | |
| | |
| | |
| | if isinstance(line, bytes): |
| | line_str = line.decode("utf-8") |
| | else: |
| | line_str = line |
| | self._cur_line = line_str |
| |
|
| | if self._cur_line.startswith(("error:", "fatal:")): |
| | self.error_lines.append(self._cur_line) |
| | return |
| |
|
| | cur_count, max_count = None, None |
| | match = self.re_op_relative.match(line_str) |
| | if match is None: |
| | match = self.re_op_absolute.match(line_str) |
| |
|
| | if not match: |
| | self.line_dropped(line_str) |
| | self.other_lines.append(line_str) |
| | return |
| | |
| |
|
| | op_code = 0 |
| | _remote, op_name, _percent, cur_count, max_count, message = match.groups() |
| |
|
| | |
| | if op_name == "Counting objects": |
| | op_code |= self.COUNTING |
| | elif op_name == "Compressing objects": |
| | op_code |= self.COMPRESSING |
| | elif op_name == "Writing objects": |
| | op_code |= self.WRITING |
| | elif op_name == "Receiving objects": |
| | op_code |= self.RECEIVING |
| | elif op_name == "Resolving deltas": |
| | op_code |= self.RESOLVING |
| | elif op_name == "Finding sources": |
| | op_code |= self.FINDING_SOURCES |
| | elif op_name == "Checking out files": |
| | op_code |= self.CHECKING_OUT |
| | else: |
| | |
| | |
| | |
| | |
| | |
| | |
| | self.line_dropped(line_str) |
| | |
| | |
| | return |
| | |
| |
|
| | |
| | if op_code not in self._seen_ops: |
| | self._seen_ops.append(op_code) |
| | op_code |= self.BEGIN |
| | |
| |
|
| | if message is None: |
| | message = "" |
| | |
| |
|
| | message = message.strip() |
| | if message.endswith(self.DONE_TOKEN): |
| | op_code |= self.END |
| | message = message[: -len(self.DONE_TOKEN)] |
| | |
| | message = message.strip(self.TOKEN_SEPARATOR) |
| |
|
| | self.update( |
| | op_code, |
| | cur_count and float(cur_count), |
| | max_count and float(max_count), |
| | message, |
| | ) |
| |
|
| | def new_message_handler(self) -> Callable[[str], None]: |
| | """ |
| | :return: |
| | A progress handler suitable for :func:`~git.cmd.handle_process_output`, |
| | passing lines on to this progress handler in a suitable format. |
| | """ |
| |
|
| | def handler(line: AnyStr) -> None: |
| | return self._parse_progress_line(line.rstrip()) |
| |
|
| | |
| |
|
| | return handler |
| |
|
| | def line_dropped(self, line: str) -> None: |
| | """Called whenever a line could not be understood and was therefore dropped.""" |
| | pass |
| |
|
| | def update( |
| | self, |
| | op_code: int, |
| | cur_count: Union[str, float], |
| | max_count: Union[str, float, None] = None, |
| | message: str = "", |
| | ) -> None: |
| | """Called whenever the progress changes. |
| | |
| | :param op_code: |
| | Integer allowing to be compared against Operation IDs and stage IDs. |
| | |
| | Stage IDs are :const:`BEGIN` and :const:`END`. :const:`BEGIN` will only be |
| | set once for each Operation ID as well as :const:`END`. It may be that |
| | :const:`BEGIN` and :const:`END` are set at once in case only one progress |
| | message was emitted due to the speed of the operation. Between |
| | :const:`BEGIN` and :const:`END`, none of these flags will be set. |
| | |
| | Operation IDs are all held within the :const:`OP_MASK`. Only one Operation |
| | ID will be active per call. |
| | |
| | :param cur_count: |
| | Current absolute count of items. |
| | |
| | :param max_count: |
| | The maximum count of items we expect. It may be ``None`` in case there is no |
| | maximum number of items or if it is (yet) unknown. |
| | |
| | :param message: |
| | In case of the :const:`WRITING` operation, it contains the amount of bytes |
| | transferred. It may possibly be used for other purposes as well. |
| | |
| | :note: |
| | You may read the contents of the current line in |
| | :attr:`self._cur_line <_cur_line>`. |
| | """ |
| | pass |
| |
|
| |
|
| | class CallableRemoteProgress(RemoteProgress): |
| | """A :class:`RemoteProgress` implementation forwarding updates to any callable. |
| | |
| | :note: |
| | Like direct instances of :class:`RemoteProgress`, instances of this |
| | :class:`CallableRemoteProgress` class are not themselves directly callable. |
| | Rather, instances of this class wrap a callable and forward to it. This should |
| | therefore not be confused with :class:`git.types.CallableProgress`. |
| | """ |
| |
|
| | __slots__ = ("_callable",) |
| |
|
| | def __init__(self, fn: Callable) -> None: |
| | self._callable = fn |
| | super().__init__() |
| |
|
| | def update(self, *args: Any, **kwargs: Any) -> None: |
| | self._callable(*args, **kwargs) |
| |
|
| |
|
| | class Actor: |
| | """Actors hold information about a person acting on the repository. They can be |
| | committers and authors or anything with a name and an email as mentioned in the git |
| | log entries.""" |
| |
|
| | |
| | name_only_regex = re.compile(r"<(.*)>") |
| | name_email_regex = re.compile(r"(.*) <(.*?)>") |
| |
|
| | |
| | |
| | env_author_name = "GIT_AUTHOR_NAME" |
| | env_author_email = "GIT_AUTHOR_EMAIL" |
| | env_committer_name = "GIT_COMMITTER_NAME" |
| | env_committer_email = "GIT_COMMITTER_EMAIL" |
| |
|
| | |
| | conf_name = "name" |
| | conf_email = "email" |
| |
|
| | __slots__ = ("name", "email") |
| |
|
| | def __init__(self, name: Optional[str], email: Optional[str]) -> None: |
| | self.name = name |
| | self.email = email |
| |
|
| | def __eq__(self, other: Any) -> bool: |
| | return self.name == other.name and self.email == other.email |
| |
|
| | def __ne__(self, other: Any) -> bool: |
| | return not (self == other) |
| |
|
| | def __hash__(self) -> int: |
| | return hash((self.name, self.email)) |
| |
|
| | def __str__(self) -> str: |
| | return self.name if self.name else "" |
| |
|
| | def __repr__(self) -> str: |
| | return '<git.Actor "%s <%s>">' % (self.name, self.email) |
| |
|
| | @classmethod |
| | def _from_string(cls, string: str) -> "Actor": |
| | """Create an :class:`Actor` from a string. |
| | |
| | :param string: |
| | The string, which is expected to be in regular git format:: |
| | |
| | John Doe <jdoe@example.com> |
| | |
| | :return: |
| | :class:`Actor` |
| | """ |
| | m = cls.name_email_regex.search(string) |
| | if m: |
| | name, email = m.groups() |
| | return Actor(name, email) |
| | else: |
| | m = cls.name_only_regex.search(string) |
| | if m: |
| | return Actor(m.group(1), None) |
| | |
| | return Actor(string, None) |
| | |
| | |
| |
|
| | @classmethod |
| | def _main_actor( |
| | cls, |
| | env_name: str, |
| | env_email: str, |
| | config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None, |
| | ) -> "Actor": |
| | actor = Actor("", "") |
| | user_id = None |
| |
|
| | def default_email() -> str: |
| | nonlocal user_id |
| | if not user_id: |
| | user_id = get_user_id() |
| | return user_id |
| |
|
| | def default_name() -> str: |
| | return default_email().split("@")[0] |
| |
|
| | for attr, evar, cvar, default in ( |
| | ("name", env_name, cls.conf_name, default_name), |
| | ("email", env_email, cls.conf_email, default_email), |
| | ): |
| | try: |
| | val = os.environ[evar] |
| | setattr(actor, attr, val) |
| | except KeyError: |
| | if config_reader is not None: |
| | try: |
| | val = config_reader.get("user", cvar) |
| | except Exception: |
| | val = default() |
| | setattr(actor, attr, val) |
| | |
| | if not getattr(actor, attr): |
| | setattr(actor, attr, default()) |
| | |
| | |
| | return actor |
| |
|
| | @classmethod |
| | def committer(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": |
| | """ |
| | :return: |
| | :class:`Actor` instance corresponding to the configured committer. It |
| | behaves similar to the git implementation, such that the environment will |
| | override configuration values of `config_reader`. If no value is set at all, |
| | it will be generated. |
| | |
| | :param config_reader: |
| | ConfigReader to use to retrieve the values from in case they are not set in |
| | the environment. |
| | """ |
| | return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader) |
| |
|
| | @classmethod |
| | def author(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": |
| | """Same as :meth:`committer`, but defines the main author. It may be specified |
| | in the environment, but defaults to the committer.""" |
| | return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader) |
| |
|
| |
|
| | class Stats: |
| | """Represents stat information as presented by git at the end of a merge. It is |
| | created from the output of a diff operation. |
| | |
| | Example:: |
| | |
| | c = Commit( sha1 ) |
| | s = c.stats |
| | s.total # full-stat-dict |
| | s.files # dict( filepath : stat-dict ) |
| | |
| | ``stat-dict`` |
| | |
| | A dictionary with the following keys and values:: |
| | |
| | deletions = number of deleted lines as int |
| | insertions = number of inserted lines as int |
| | lines = total number of lines changed as int, or deletions + insertions |
| | change_type = type of change as str, A|C|D|M|R|T|U|X|B |
| | |
| | ``full-stat-dict`` |
| | |
| | In addition to the items in the stat-dict, it features additional information:: |
| | |
| | files = number of changed files as int |
| | """ |
| |
|
| | __slots__ = ("total", "files") |
| |
|
| | def __init__(self, total: Total_TD, files: Dict[PathLike, Files_TD]) -> None: |
| | self.total = total |
| | self.files = files |
| |
|
| | @classmethod |
| | def _list_from_string(cls, repo: "Repo", text: str) -> "Stats": |
| | """Create a :class:`Stats` object from output retrieved by |
| | :manpage:`git-diff(1)`. |
| | |
| | :return: |
| | :class:`git.Stats` |
| | """ |
| |
|
| | hsh: HSH_TD = { |
| | "total": {"insertions": 0, "deletions": 0, "lines": 0, "files": 0}, |
| | "files": {}, |
| | } |
| | for line in text.splitlines(): |
| | (change_type, raw_insertions, raw_deletions, filename) = line.split("\t") |
| | insertions = raw_insertions != "-" and int(raw_insertions) or 0 |
| | deletions = raw_deletions != "-" and int(raw_deletions) or 0 |
| | hsh["total"]["insertions"] += insertions |
| | hsh["total"]["deletions"] += deletions |
| | hsh["total"]["lines"] += insertions + deletions |
| | hsh["total"]["files"] += 1 |
| | files_dict: Files_TD = { |
| | "insertions": insertions, |
| | "deletions": deletions, |
| | "lines": insertions + deletions, |
| | "change_type": change_type, |
| | } |
| | hsh["files"][filename.strip()] = files_dict |
| | return Stats(hsh["total"], hsh["files"]) |
| |
|
| |
|
| | class IndexFileSHA1Writer: |
| | """Wrapper around a file-like object that remembers the SHA1 of the data written to |
| | it. It will write a sha when the stream is closed or if asked for explicitly using |
| | :meth:`write_sha`. |
| | |
| | Only useful to the index file. |
| | |
| | :note: |
| | Based on the dulwich project. |
| | """ |
| |
|
| | __slots__ = ("f", "sha1") |
| |
|
| | def __init__(self, f: IO) -> None: |
| | self.f = f |
| | self.sha1 = make_sha(b"") |
| |
|
| | def write(self, data: AnyStr) -> int: |
| | self.sha1.update(data) |
| | return self.f.write(data) |
| |
|
| | def write_sha(self) -> bytes: |
| | sha = self.sha1.digest() |
| | self.f.write(sha) |
| | return sha |
| |
|
| | def close(self) -> bytes: |
| | sha = self.write_sha() |
| | self.f.close() |
| | return sha |
| |
|
| | def tell(self) -> int: |
| | return self.f.tell() |
| |
|
| |
|
| | class LockFile: |
| | """Provides methods to obtain, check for, and release a file based lock which |
| | should be used to handle concurrent access to the same file. |
| | |
| | As we are a utility class to be derived from, we only use protected methods. |
| | |
| | Locks will automatically be released on destruction. |
| | """ |
| |
|
| | __slots__ = ("_file_path", "_owns_lock") |
| |
|
| | def __init__(self, file_path: PathLike) -> None: |
| | self._file_path = file_path |
| | self._owns_lock = False |
| |
|
| | def __del__(self) -> None: |
| | self._release_lock() |
| |
|
| | def _lock_file_path(self) -> str: |
| | """:return: Path to lockfile""" |
| | return "%s.lock" % (self._file_path) |
| |
|
| | def _has_lock(self) -> bool: |
| | """ |
| | :return: |
| | True if we have a lock and if the lockfile still exists |
| | |
| | :raise AssertionError: |
| | If our lock-file does not exist. |
| | """ |
| | return self._owns_lock |
| |
|
| | def _obtain_lock_or_raise(self) -> None: |
| | """Create a lock file as flag for other instances, mark our instance as |
| | lock-holder. |
| | |
| | :raise IOError: |
| | If a lock was already present or a lock file could not be written. |
| | """ |
| | if self._has_lock(): |
| | return |
| | lock_file = self._lock_file_path() |
| | if osp.isfile(lock_file): |
| | raise IOError( |
| | "Lock for file %r did already exist, delete %r in case the lock is illegal" |
| | % (self._file_path, lock_file) |
| | ) |
| |
|
| | try: |
| | with open(lock_file, mode="w"): |
| | pass |
| | except OSError as e: |
| | raise IOError(str(e)) from e |
| |
|
| | self._owns_lock = True |
| |
|
| | def _obtain_lock(self) -> None: |
| | """The default implementation will raise if a lock cannot be obtained. |
| | |
| | Subclasses may override this method to provide a different implementation. |
| | """ |
| | return self._obtain_lock_or_raise() |
| |
|
| | def _release_lock(self) -> None: |
| | """Release our lock if we have one.""" |
| | if not self._has_lock(): |
| | return |
| |
|
| | |
| | |
| | lfp = self._lock_file_path() |
| | try: |
| | rmfile(lfp) |
| | except OSError: |
| | pass |
| | self._owns_lock = False |
| |
|
| |
|
| | class BlockingLockFile(LockFile): |
| | """The lock file will block until a lock could be obtained, or fail after a |
| | specified timeout. |
| | |
| | :note: |
| | If the directory containing the lock was removed, an exception will be raised |
| | during the blocking period, preventing hangs as the lock can never be obtained. |
| | """ |
| |
|
| | __slots__ = ("_check_interval", "_max_block_time") |
| |
|
| | def __init__( |
| | self, |
| | file_path: PathLike, |
| | check_interval_s: float = 0.3, |
| | max_block_time_s: int = sys.maxsize, |
| | ) -> None: |
| | """Configure the instance. |
| | |
| | :param check_interval_s: |
| | Period of time to sleep until the lock is checked the next time. |
| | By default, it waits a nearly unlimited time. |
| | |
| | :param max_block_time_s: |
| | Maximum amount of seconds we may lock. |
| | """ |
| | super().__init__(file_path) |
| | self._check_interval = check_interval_s |
| | self._max_block_time = max_block_time_s |
| |
|
| | def _obtain_lock(self) -> None: |
| | """This method blocks until it obtained the lock, or raises :exc:`IOError` if it |
| | ran out of time or if the parent directory was not available anymore. |
| | |
| | If this method returns, you are guaranteed to own the lock. |
| | """ |
| | starttime = time.time() |
| | maxtime = starttime + float(self._max_block_time) |
| | while True: |
| | try: |
| | super()._obtain_lock() |
| | except IOError as e: |
| | |
| | |
| | curtime = time.time() |
| | if not osp.isdir(osp.dirname(self._lock_file_path())): |
| | msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % ( |
| | self._lock_file_path(), |
| | curtime - starttime, |
| | ) |
| | raise IOError(msg) from e |
| | |
| |
|
| | if curtime >= maxtime: |
| | msg = "Waited %g seconds for lock at %r" % ( |
| | maxtime - starttime, |
| | self._lock_file_path(), |
| | ) |
| | raise IOError(msg) from e |
| | |
| | time.sleep(self._check_interval) |
| | else: |
| | break |
| | |
| |
|
| |
|
| | class IterableList(List[T_IterableObj]): |
| | """List of iterable objects allowing to query an object by id or by named index:: |
| | |
| | heads = repo.heads |
| | heads.master |
| | heads['master'] |
| | heads[0] |
| | |
| | Iterable parent objects: |
| | |
| | * :class:`Commit <git.objects.Commit>` |
| | * :class:`Submodule <git.objects.submodule.base.Submodule>` |
| | * :class:`Reference <git.refs.reference.Reference>` |
| | * :class:`FetchInfo <git.remote.FetchInfo>` |
| | * :class:`PushInfo <git.remote.PushInfo>` |
| | |
| | Iterable via inheritance: |
| | |
| | * :class:`Head <git.refs.head.Head>` |
| | * :class:`TagReference <git.refs.tag.TagReference>` |
| | * :class:`RemoteReference <git.refs.remote.RemoteReference>` |
| | |
| | This requires an ``id_attribute`` name to be set which will be queried from its |
| | contained items to have a means for comparison. |
| | |
| | A prefix can be specified which is to be used in case the id returned by the items |
| | always contains a prefix that does not matter to the user, so it can be left out. |
| | """ |
| |
|
| | __slots__ = ("_id_attr", "_prefix") |
| |
|
| | def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[T_IterableObj]": |
| | return super().__new__(cls) |
| |
|
| | def __init__(self, id_attr: str, prefix: str = "") -> None: |
| | self._id_attr = id_attr |
| | self._prefix = prefix |
| |
|
| | def __contains__(self, attr: object) -> bool: |
| | |
| | try: |
| | rval = list.__contains__(self, attr) |
| | if rval: |
| | return rval |
| | except (AttributeError, TypeError): |
| | pass |
| | |
| |
|
| | |
| | try: |
| | getattr(self, cast(str, attr)) |
| | return True |
| | except (AttributeError, TypeError): |
| | return False |
| | |
| |
|
| | def __getattr__(self, attr: str) -> T_IterableObj: |
| | attr = self._prefix + attr |
| | for item in self: |
| | if getattr(item, self._id_attr) == attr: |
| | return item |
| | |
| | return list.__getattribute__(self, attr) |
| |
|
| | def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: |
| | assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str" |
| |
|
| | if isinstance(index, int): |
| | return list.__getitem__(self, index) |
| | elif isinstance(index, slice): |
| | raise ValueError("Index should be an int or str") |
| | else: |
| | try: |
| | return getattr(self, index) |
| | except AttributeError as e: |
| | raise IndexError("No item found with id %r" % (self._prefix + index)) from e |
| | |
| |
|
| | def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None: |
| | assert isinstance(index, (int, str)), "Index of IterableList should be an int or str" |
| |
|
| | delindex = cast(int, index) |
| | if not isinstance(index, int): |
| | delindex = -1 |
| | name = self._prefix + index |
| | for i, item in enumerate(self): |
| | if getattr(item, self._id_attr) == name: |
| | delindex = i |
| | break |
| | |
| | |
| | if delindex == -1: |
| | raise IndexError("Item with name %s not found" % name) |
| | |
| | |
| | list.__delitem__(self, delindex) |
| |
|
| |
|
| | @runtime_checkable |
| | class IterableObj(Protocol): |
| | """Defines an interface for iterable items, so there is a uniform way to retrieve |
| | and iterate items within the git repository. |
| | |
| | Subclasses: |
| | |
| | * :class:`Submodule <git.objects.submodule.base.Submodule>` |
| | * :class:`Commit <git.objects.Commit>` |
| | * :class:`Reference <git.refs.reference.Reference>` |
| | * :class:`PushInfo <git.remote.PushInfo>` |
| | * :class:`FetchInfo <git.remote.FetchInfo>` |
| | * :class:`Remote <git.remote.Remote>` |
| | """ |
| |
|
| | __slots__ = () |
| |
|
| | _id_attribute_: str |
| |
|
| | @classmethod |
| | @abstractmethod |
| | def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator[T_IterableObj]: |
| | |
| | """Find (all) items of this type. |
| | |
| | Subclasses can specify `args` and `kwargs` differently, and may use them for |
| | filtering. However, when the method is called with no additional positional or |
| | keyword arguments, subclasses are obliged to to yield all items. |
| | |
| | :return: |
| | Iterator yielding Items |
| | """ |
| | raise NotImplementedError("To be implemented by Subclass") |
| |
|
| | @classmethod |
| | def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]: |
| | """Find (all) items of this type and collect them into a list. |
| | |
| | For more information about the arguments, see :meth:`iter_items`. |
| | |
| | :note: |
| | Favor the :meth:`iter_items` method as it will avoid eagerly collecting all |
| | items. When there are many items, that can slow performance and increase |
| | memory usage. |
| | |
| | :return: |
| | list(Item,...) list of item instances |
| | """ |
| | out_list: IterableList = IterableList(cls._id_attribute_) |
| | out_list.extend(cls.iter_items(repo, *args, **kwargs)) |
| | return out_list |
| |
|
| |
|
| | class IterableClassWatcher(type): |
| | """Metaclass that issues :exc:`DeprecationWarning` when :class:`git.util.Iterable` |
| | is subclassed.""" |
| |
|
| | def __init__(cls, name: str, bases: Tuple, clsdict: Dict) -> None: |
| | for base in bases: |
| | if type(base) is IterableClassWatcher: |
| | warnings.warn( |
| | f"GitPython Iterable subclassed by {name}." |
| | " Iterable is deprecated due to naming clash since v3.1.18" |
| | " and will be removed in 4.0.0." |
| | " Use IterableObj instead.", |
| | DeprecationWarning, |
| | stacklevel=2, |
| | ) |
| |
|
| |
|
| | class Iterable(metaclass=IterableClassWatcher): |
| | """Deprecated, use :class:`IterableObj` instead. |
| | |
| | Defines an interface for iterable items, so there is a uniform way to retrieve |
| | and iterate items within the git repository. |
| | """ |
| |
|
| | __slots__ = () |
| |
|
| | _id_attribute_ = "attribute that most suitably identifies your instance" |
| |
|
| | @classmethod |
| | def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: |
| | """Deprecated, use :class:`IterableObj` instead. |
| | |
| | Find (all) items of this type. |
| | |
| | See :meth:`IterableObj.iter_items` for details on usage. |
| | |
| | :return: |
| | Iterator yielding Items |
| | """ |
| | raise NotImplementedError("To be implemented by Subclass") |
| |
|
| | @classmethod |
| | def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: |
| | """Deprecated, use :class:`IterableObj` instead. |
| | |
| | Find (all) items of this type and collect them into a list. |
| | |
| | See :meth:`IterableObj.list_items` for details on usage. |
| | |
| | :return: |
| | list(Item,...) list of item instances |
| | """ |
| | out_list: Any = IterableList(cls._id_attribute_) |
| | out_list.extend(cls.iter_items(repo, *args, **kwargs)) |
| | return out_list |
| |
|
| |
|
| | |
| |
|