| | |
| | |
| | |
| | |
| |
|
| | __all__ = ["Commit"] |
| |
|
| | from collections import defaultdict |
| | import datetime |
| | from io import BytesIO |
| | import logging |
| | import os |
| | import re |
| | from subprocess import Popen, PIPE |
| | import sys |
| | from time import altzone, daylight, localtime, time, timezone |
| | import warnings |
| |
|
| | from gitdb import IStream |
| |
|
| | from git.cmd import Git |
| | from git.diff import Diffable |
| | from git.util import Actor, Stats, finalize_process, hex_to_bin |
| |
|
| | from . import base |
| | from .tree import Tree |
| | from .util import ( |
| | Serializable, |
| | TraversableIterableObj, |
| | altz_to_utctz_str, |
| | from_timestamp, |
| | parse_actor_and_date, |
| | parse_date, |
| | ) |
| |
|
| | |
| |
|
| | from typing import ( |
| | Any, |
| | Dict, |
| | IO, |
| | Iterator, |
| | List, |
| | Sequence, |
| | Tuple, |
| | TYPE_CHECKING, |
| | Union, |
| | cast, |
| | ) |
| |
|
| | if sys.version_info >= (3, 8): |
| | from typing import Literal |
| | else: |
| | from typing_extensions import Literal |
| |
|
| | from git.types import PathLike |
| |
|
| | if TYPE_CHECKING: |
| | from git.refs import SymbolicReference |
| | from git.repo import Repo |
| |
|
| | |
| |
|
| | _logger = logging.getLogger(__name__) |
| |
|
| |
|
| | class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): |
| | """Wraps a git commit object. |
| | |
| | See :manpage:`gitglossary(7)` on "commit object": |
| | https://git-scm.com/docs/gitglossary#def_commit_object |
| | |
| | :note: |
| | This class will act lazily on some of its attributes and will query the value on |
| | demand only if it involves calling the git binary. |
| | """ |
| |
|
| | |
| | |
| | env_author_date = "GIT_AUTHOR_DATE" |
| | env_committer_date = "GIT_COMMITTER_DATE" |
| |
|
| | |
| | conf_encoding = "i18n.commitencoding" |
| |
|
| | |
| | default_encoding = "UTF-8" |
| |
|
| | type: Literal["commit"] = "commit" |
| |
|
| | __slots__ = ( |
| | "tree", |
| | "author", |
| | "authored_date", |
| | "author_tz_offset", |
| | "committer", |
| | "committed_date", |
| | "committer_tz_offset", |
| | "message", |
| | "parents", |
| | "encoding", |
| | "gpgsig", |
| | ) |
| |
|
| | _id_attribute_ = "hexsha" |
| |
|
| | parents: Sequence["Commit"] |
| |
|
| | def __init__( |
| | self, |
| | repo: "Repo", |
| | binsha: bytes, |
| | tree: Union[Tree, None] = None, |
| | author: Union[Actor, None] = None, |
| | authored_date: Union[int, None] = None, |
| | author_tz_offset: Union[None, float] = None, |
| | committer: Union[Actor, None] = None, |
| | committed_date: Union[int, None] = None, |
| | committer_tz_offset: Union[None, float] = None, |
| | message: Union[str, bytes, None] = None, |
| | parents: Union[Sequence["Commit"], None] = None, |
| | encoding: Union[str, None] = None, |
| | gpgsig: Union[str, None] = None, |
| | ) -> None: |
| | """Instantiate a new :class:`Commit`. All keyword arguments taking ``None`` as |
| | default will be implicitly set on first query. |
| | |
| | :param binsha: |
| | 20 byte sha1. |
| | |
| | :param tree: |
| | A :class:`~git.objects.tree.Tree` object. |
| | |
| | :param author: |
| | The author :class:`~git.util.Actor` object. |
| | |
| | :param authored_date: int_seconds_since_epoch |
| | The authored DateTime - use :func:`time.gmtime` to convert it into a |
| | different format. |
| | |
| | :param author_tz_offset: int_seconds_west_of_utc |
| | The timezone that the `authored_date` is in. |
| | |
| | :param committer: |
| | The committer string, as an :class:`~git.util.Actor` object. |
| | |
| | :param committed_date: int_seconds_since_epoch |
| | The committed DateTime - use :func:`time.gmtime` to convert it into a |
| | different format. |
| | |
| | :param committer_tz_offset: int_seconds_west_of_utc |
| | The timezone that the `committed_date` is in. |
| | |
| | :param message: string |
| | The commit message. |
| | |
| | :param encoding: string |
| | Encoding of the message, defaults to UTF-8. |
| | |
| | :param parents: |
| | List or tuple of :class:`Commit` objects which are our parent(s) in the |
| | commit dependency graph. |
| | |
| | :return: |
| | :class:`Commit` |
| | |
| | :note: |
| | Timezone information is in the same format and in the same sign as what |
| | :func:`time.altzone` returns. The sign is inverted compared to git's UTC |
| | timezone. |
| | """ |
| | super().__init__(repo, binsha) |
| | self.binsha = binsha |
| | if tree is not None: |
| | assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree) |
| | if tree is not None: |
| | self.tree = tree |
| | if author is not None: |
| | self.author = author |
| | if authored_date is not None: |
| | self.authored_date = authored_date |
| | if author_tz_offset is not None: |
| | self.author_tz_offset = author_tz_offset |
| | if committer is not None: |
| | self.committer = committer |
| | if committed_date is not None: |
| | self.committed_date = committed_date |
| | if committer_tz_offset is not None: |
| | self.committer_tz_offset = committer_tz_offset |
| | if message is not None: |
| | self.message = message |
| | if parents is not None: |
| | self.parents = parents |
| | if encoding is not None: |
| | self.encoding = encoding |
| | if gpgsig is not None: |
| | self.gpgsig = gpgsig |
| |
|
| | @classmethod |
| | def _get_intermediate_items(cls, commit: "Commit") -> Tuple["Commit", ...]: |
| | return tuple(commit.parents) |
| |
|
| | @classmethod |
| | def _calculate_sha_(cls, repo: "Repo", commit: "Commit") -> bytes: |
| | """Calculate the sha of a commit. |
| | |
| | :param repo: |
| | :class:`~git.repo.base.Repo` object the commit should be part of. |
| | |
| | :param commit: |
| | :class:`Commit` object for which to generate the sha. |
| | """ |
| |
|
| | stream = BytesIO() |
| | commit._serialize(stream) |
| | streamlen = stream.tell() |
| | stream.seek(0) |
| |
|
| | istream = repo.odb.store(IStream(cls.type, streamlen, stream)) |
| | return istream.binsha |
| |
|
| | def replace(self, **kwargs: Any) -> "Commit": |
| | """Create new commit object from an existing commit object. |
| | |
| | Any values provided as keyword arguments will replace the corresponding |
| | attribute in the new object. |
| | """ |
| |
|
| | attrs = {k: getattr(self, k) for k in self.__slots__} |
| |
|
| | for attrname in kwargs: |
| | if attrname not in self.__slots__: |
| | raise ValueError("invalid attribute name") |
| |
|
| | attrs.update(kwargs) |
| | new_commit = self.__class__(self.repo, self.NULL_BIN_SHA, **attrs) |
| | new_commit.binsha = self._calculate_sha_(self.repo, new_commit) |
| |
|
| | return new_commit |
| |
|
| | def _set_cache_(self, attr: str) -> None: |
| | if attr in Commit.__slots__: |
| | |
| | _binsha, _typename, self.size, stream = self.repo.odb.stream(self.binsha) |
| | self._deserialize(BytesIO(stream.read())) |
| | else: |
| | super()._set_cache_(attr) |
| | |
| |
|
| | @property |
| | def authored_datetime(self) -> datetime.datetime: |
| | return from_timestamp(self.authored_date, self.author_tz_offset) |
| |
|
| | @property |
| | def committed_datetime(self) -> datetime.datetime: |
| | return from_timestamp(self.committed_date, self.committer_tz_offset) |
| |
|
| | @property |
| | def summary(self) -> Union[str, bytes]: |
| | """:return: First line of the commit message""" |
| | if isinstance(self.message, str): |
| | return self.message.split("\n", 1)[0] |
| | else: |
| | return self.message.split(b"\n", 1)[0] |
| |
|
| | def count(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> int: |
| | """Count the number of commits reachable from this commit. |
| | |
| | :param paths: |
| | An optional path or a list of paths restricting the return value to commits |
| | actually containing the paths. |
| | |
| | :param kwargs: |
| | Additional options to be passed to :manpage:`git-rev-list(1)`. They must not |
| | alter the output style of the command, or parsing will yield incorrect |
| | results. |
| | |
| | :return: |
| | An int defining the number of reachable commits |
| | """ |
| | |
| | |
| | if paths: |
| | return len(self.repo.git.rev_list(self.hexsha, "--", paths, **kwargs).splitlines()) |
| | return len(self.repo.git.rev_list(self.hexsha, **kwargs).splitlines()) |
| |
|
| | @property |
| | def name_rev(self) -> str: |
| | """ |
| | :return: |
| | String describing the commits hex sha based on the closest |
| | `~git.refs.reference.Reference`. |
| | |
| | :note: |
| | Mostly useful for UI purposes. |
| | """ |
| | return self.repo.git.name_rev(self) |
| |
|
| | @classmethod |
| | def iter_items( |
| | cls, |
| | repo: "Repo", |
| | rev: Union[str, "Commit", "SymbolicReference"], |
| | paths: Union[PathLike, Sequence[PathLike]] = "", |
| | **kwargs: Any, |
| | ) -> Iterator["Commit"]: |
| | R"""Find all commits matching the given criteria. |
| | |
| | :param repo: |
| | The :class:`~git.repo.base.Repo`. |
| | |
| | :param rev: |
| | Revision specifier. See :manpage:`git-rev-parse(1)` for viable options. |
| | |
| | :param paths: |
| | An optional path or list of paths. If set only :class:`Commit`\s that |
| | include the path or paths will be considered. |
| | |
| | :param kwargs: |
| | Optional keyword arguments to :manpage:`git-rev-list(1)` where: |
| | |
| | * ``max_count`` is the maximum number of commits to fetch. |
| | * ``skip`` is the number of commits to skip. |
| | * ``since`` selects all commits since some date, e.g. ``"1970-01-01"``. |
| | |
| | :return: |
| | Iterator yielding :class:`Commit` items. |
| | """ |
| | if "pretty" in kwargs: |
| | raise ValueError("--pretty cannot be used as parsing expects single sha's only") |
| | |
| |
|
| | |
| | |
| |
|
| | args_list: List[PathLike] = ["--"] |
| |
|
| | if paths: |
| | paths_tup: Tuple[PathLike, ...] |
| | if isinstance(paths, (str, os.PathLike)): |
| | paths_tup = (paths,) |
| | else: |
| | paths_tup = tuple(paths) |
| |
|
| | args_list.extend(paths_tup) |
| | |
| |
|
| | proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs) |
| | return cls._iter_from_process_or_stream(repo, proc) |
| |
|
| | def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> Iterator["Commit"]: |
| | R"""Iterate _all_ parents of this commit. |
| | |
| | :param paths: |
| | Optional path or list of paths limiting the :class:`Commit`\s to those that |
| | contain at least one of the paths. |
| | |
| | :param kwargs: |
| | All arguments allowed by :manpage:`git-rev-list(1)`. |
| | |
| | :return: |
| | Iterator yielding :class:`Commit` objects which are parents of ``self`` |
| | """ |
| | |
| | skip = kwargs.get("skip", 1) |
| | if skip == 0: |
| | skip = 1 |
| | kwargs["skip"] = skip |
| |
|
| | return self.iter_items(self.repo, self, paths, **kwargs) |
| |
|
| | @property |
| | def stats(self) -> Stats: |
| | """Create a git stat from changes between this commit and its first parent |
| | or from all changes done if this is the very first commit. |
| | |
| | :return: |
| | :class:`Stats` |
| | """ |
| |
|
| | def process_lines(lines: List[str]) -> str: |
| | text = "" |
| | for file_info, line in zip(lines, lines[len(lines) // 2 :]): |
| | change_type = file_info.split("\t")[0][-1] |
| | (insertions, deletions, filename) = line.split("\t") |
| | text += "%s\t%s\t%s\t%s\n" % (change_type, insertions, deletions, filename) |
| | return text |
| |
|
| | if not self.parents: |
| | lines = self.repo.git.diff_tree( |
| | self.hexsha, "--", numstat=True, no_renames=True, root=True, raw=True |
| | ).splitlines()[1:] |
| | text = process_lines(lines) |
| | else: |
| | lines = self.repo.git.diff( |
| | self.parents[0].hexsha, self.hexsha, "--", numstat=True, no_renames=True, raw=True |
| | ).splitlines() |
| | text = process_lines(lines) |
| | return Stats._list_from_string(self.repo, text) |
| |
|
| | @property |
| | def trailers(self) -> Dict[str, str]: |
| | """Deprecated. Get the trailers of the message as a dictionary. |
| | |
| | :note: |
| | This property is deprecated, please use either :attr:`trailers_list` or |
| | :attr:`trailers_dict`. |
| | |
| | :return: |
| | Dictionary containing whitespace stripped trailer information. |
| | Only contains the latest instance of each trailer key. |
| | """ |
| | warnings.warn( |
| | "Commit.trailers is deprecated, use Commit.trailers_list or Commit.trailers_dict instead", |
| | DeprecationWarning, |
| | stacklevel=2, |
| | ) |
| | return {k: v[0] for k, v in self.trailers_dict.items()} |
| |
|
| | @property |
| | def trailers_list(self) -> List[Tuple[str, str]]: |
| | """Get the trailers of the message as a list. |
| | |
| | Git messages can contain trailer information that are similar to :rfc:`822` |
| | e-mail headers. See :manpage:`git-interpret-trailers(1)`. |
| | |
| | This function calls ``git interpret-trailers --parse`` onto the message to |
| | extract the trailer information, returns the raw trailer data as a list. |
| | |
| | Valid message with trailer:: |
| | |
| | Subject line |
| | |
| | some body information |
| | |
| | another information |
| | |
| | key1: value1.1 |
| | key1: value1.2 |
| | key2 : value 2 with inner spaces |
| | |
| | Returned list will look like this:: |
| | |
| | [ |
| | ("key1", "value1.1"), |
| | ("key1", "value1.2"), |
| | ("key2", "value 2 with inner spaces"), |
| | ] |
| | |
| | :return: |
| | List containing key-value tuples of whitespace stripped trailer information. |
| | """ |
| | cmd = ["git", "interpret-trailers", "--parse"] |
| | proc: Git.AutoInterrupt = self.repo.git.execute( |
| | cmd, |
| | as_process=True, |
| | istream=PIPE, |
| | ) |
| | trailer: str = proc.communicate(str(self.message).encode())[0].decode("utf8") |
| | trailer = trailer.strip() |
| |
|
| | if not trailer: |
| | return [] |
| |
|
| | trailer_list = [] |
| | for t in trailer.split("\n"): |
| | key, val = t.split(":", 1) |
| | trailer_list.append((key.strip(), val.strip())) |
| |
|
| | return trailer_list |
| |
|
| | @property |
| | def trailers_dict(self) -> Dict[str, List[str]]: |
| | """Get the trailers of the message as a dictionary. |
| | |
| | Git messages can contain trailer information that are similar to :rfc:`822` |
| | e-mail headers. See :manpage:`git-interpret-trailers(1)`. |
| | |
| | This function calls ``git interpret-trailers --parse`` onto the message to |
| | extract the trailer information. The key value pairs are stripped of leading and |
| | trailing whitespaces before they get saved into a dictionary. |
| | |
| | Valid message with trailer:: |
| | |
| | Subject line |
| | |
| | some body information |
| | |
| | another information |
| | |
| | key1: value1.1 |
| | key1: value1.2 |
| | key2 : value 2 with inner spaces |
| | |
| | Returned dictionary will look like this:: |
| | |
| | { |
| | "key1": ["value1.1", "value1.2"], |
| | "key2": ["value 2 with inner spaces"], |
| | } |
| | |
| | |
| | :return: |
| | Dictionary containing whitespace stripped trailer information, mapping |
| | trailer keys to a list of their corresponding values. |
| | """ |
| | d = defaultdict(list) |
| | for key, val in self.trailers_list: |
| | d[key].append(val) |
| | return dict(d) |
| |
|
| | @classmethod |
| | def _iter_from_process_or_stream(cls, repo: "Repo", proc_or_stream: Union[Popen, IO]) -> Iterator["Commit"]: |
| | """Parse out commit information into a list of :class:`Commit` objects. |
| | |
| | We expect one line per commit, and parse the actual commit information directly |
| | from our lighting fast object database. |
| | |
| | :param proc: |
| | :manpage:`git-rev-list(1)` process instance - one sha per line. |
| | |
| | :return: |
| | Iterator supplying :class:`Commit` objects |
| | """ |
| |
|
| | |
| | |
| |
|
| | |
| | |
| |
|
| | if hasattr(proc_or_stream, "wait"): |
| | proc_or_stream = cast(Popen, proc_or_stream) |
| | if proc_or_stream.stdout is not None: |
| | stream = proc_or_stream.stdout |
| | elif hasattr(proc_or_stream, "readline"): |
| | proc_or_stream = cast(IO, proc_or_stream) |
| | stream = proc_or_stream |
| |
|
| | readline = stream.readline |
| | while True: |
| | line = readline() |
| | if not line: |
| | break |
| | hexsha = line.strip() |
| | if len(hexsha) > 40: |
| | |
| | hexsha, _ = line.split(None, 1) |
| | |
| |
|
| | assert len(hexsha) == 40, "Invalid line: %s" % hexsha |
| | yield cls(repo, hex_to_bin(hexsha)) |
| | |
| |
|
| | |
| | |
| | if hasattr(proc_or_stream, "wait"): |
| | proc_or_stream = cast(Popen, proc_or_stream) |
| | finalize_process(proc_or_stream) |
| |
|
| | @classmethod |
| | def create_from_tree( |
| | cls, |
| | repo: "Repo", |
| | tree: Union[Tree, str], |
| | message: str, |
| | parent_commits: Union[None, List["Commit"]] = None, |
| | head: bool = False, |
| | author: Union[None, Actor] = None, |
| | committer: Union[None, Actor] = None, |
| | author_date: Union[None, str, datetime.datetime] = None, |
| | commit_date: Union[None, str, datetime.datetime] = None, |
| | ) -> "Commit": |
| | """Commit the given tree, creating a :class:`Commit` object. |
| | |
| | :param repo: |
| | :class:`~git.repo.base.Repo` object the commit should be part of. |
| | |
| | :param tree: |
| | :class:`~git.objects.tree.Tree` object or hex or bin sha. |
| | The tree of the new commit. |
| | |
| | :param message: |
| | Commit message. It may be an empty string if no message is provided. It will |
| | be converted to a string, in any case. |
| | |
| | :param parent_commits: |
| | Optional :class:`Commit` objects to use as parents for the new commit. If |
| | empty list, the commit will have no parents at all and become a root commit. |
| | If ``None``, the current head commit will be the parent of the new commit |
| | object. |
| | |
| | :param head: |
| | If ``True``, the HEAD will be advanced to the new commit automatically. |
| | Otherwise the HEAD will remain pointing on the previous commit. This could |
| | lead to undesired results when diffing files. |
| | |
| | :param author: |
| | The name of the author, optional. |
| | If unset, the repository configuration is used to obtain this value. |
| | |
| | :param committer: |
| | The name of the committer, optional. |
| | If unset, the repository configuration is used to obtain this value. |
| | |
| | :param author_date: |
| | The timestamp for the author field. |
| | |
| | :param commit_date: |
| | The timestamp for the committer field. |
| | |
| | :return: |
| | :class:`Commit` object representing the new commit. |
| | |
| | :note: |
| | Additional information about the committer and author are taken from the |
| | environment or from the git configuration. See :manpage:`git-commit-tree(1)` |
| | for more information. |
| | """ |
| | if parent_commits is None: |
| | try: |
| | parent_commits = [repo.head.commit] |
| | except ValueError: |
| | |
| | parent_commits = [] |
| | |
| | else: |
| | for p in parent_commits: |
| | if not isinstance(p, cls): |
| | raise ValueError(f"Parent commit '{p!r}' must be of type {cls}") |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | cr = repo.config_reader() |
| | env = os.environ |
| |
|
| | committer = committer or Actor.committer(cr) |
| | author = author or Actor.author(cr) |
| |
|
| | |
| | unix_time = int(time()) |
| | is_dst = daylight and localtime().tm_isdst > 0 |
| | offset = altzone if is_dst else timezone |
| |
|
| | author_date_str = env.get(cls.env_author_date, "") |
| | if author_date: |
| | author_time, author_offset = parse_date(author_date) |
| | elif author_date_str: |
| | author_time, author_offset = parse_date(author_date_str) |
| | else: |
| | author_time, author_offset = unix_time, offset |
| | |
| |
|
| | committer_date_str = env.get(cls.env_committer_date, "") |
| | if commit_date: |
| | committer_time, committer_offset = parse_date(commit_date) |
| | elif committer_date_str: |
| | committer_time, committer_offset = parse_date(committer_date_str) |
| | else: |
| | committer_time, committer_offset = unix_time, offset |
| | |
| |
|
| | |
| | enc_section, enc_option = cls.conf_encoding.split(".") |
| | conf_encoding = cr.get_value(enc_section, enc_option, cls.default_encoding) |
| | if not isinstance(conf_encoding, str): |
| | raise TypeError("conf_encoding could not be coerced to str") |
| |
|
| | |
| | |
| | if isinstance(tree, str): |
| | tree = repo.tree(tree) |
| | |
| |
|
| | |
| | new_commit = cls( |
| | repo, |
| | cls.NULL_BIN_SHA, |
| | tree, |
| | author, |
| | author_time, |
| | author_offset, |
| | committer, |
| | committer_time, |
| | committer_offset, |
| | message, |
| | parent_commits, |
| | conf_encoding, |
| | ) |
| |
|
| | new_commit.binsha = cls._calculate_sha_(repo, new_commit) |
| |
|
| | if head: |
| | |
| | |
| | import git.refs |
| |
|
| | try: |
| | repo.head.set_commit(new_commit, logmsg=message) |
| | except ValueError: |
| | |
| | |
| | master = git.refs.Head.create( |
| | repo, |
| | repo.head.ref, |
| | new_commit, |
| | logmsg="commit (initial): %s" % message, |
| | ) |
| | repo.head.set_reference(master, logmsg="commit: Switching to %s" % master) |
| | |
| | |
| |
|
| | return new_commit |
| |
|
| | |
| |
|
| | def _serialize(self, stream: BytesIO) -> "Commit": |
| | write = stream.write |
| | write(("tree %s\n" % self.tree).encode("ascii")) |
| | for p in self.parents: |
| | write(("parent %s\n" % p).encode("ascii")) |
| |
|
| | a = self.author |
| | aname = a.name |
| | c = self.committer |
| | fmt = "%s %s <%s> %s %s\n" |
| | write( |
| | ( |
| | fmt |
| | % ( |
| | "author", |
| | aname, |
| | a.email, |
| | self.authored_date, |
| | altz_to_utctz_str(self.author_tz_offset), |
| | ) |
| | ).encode(self.encoding) |
| | ) |
| |
|
| | |
| | aname = c.name |
| | write( |
| | ( |
| | fmt |
| | % ( |
| | "committer", |
| | aname, |
| | c.email, |
| | self.committed_date, |
| | altz_to_utctz_str(self.committer_tz_offset), |
| | ) |
| | ).encode(self.encoding) |
| | ) |
| |
|
| | if self.encoding != self.default_encoding: |
| | write(("encoding %s\n" % self.encoding).encode("ascii")) |
| |
|
| | try: |
| | if self.__getattribute__("gpgsig"): |
| | write(b"gpgsig") |
| | for sigline in self.gpgsig.rstrip("\n").split("\n"): |
| | write((" " + sigline + "\n").encode("ascii")) |
| | except AttributeError: |
| | pass |
| |
|
| | write(b"\n") |
| |
|
| | |
| | if isinstance(self.message, str): |
| | write(self.message.encode(self.encoding)) |
| | else: |
| | write(self.message) |
| | |
| | return self |
| |
|
| | def _deserialize(self, stream: BytesIO) -> "Commit": |
| | readline = stream.readline |
| | self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, "") |
| |
|
| | self.parents = [] |
| | next_line = None |
| | while True: |
| | parent_line = readline() |
| | if not parent_line.startswith(b"parent"): |
| | next_line = parent_line |
| | break |
| | |
| | self.parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii")))) |
| | |
| | self.parents = tuple(self.parents) |
| |
|
| | |
| | |
| | author_line = next_line |
| | committer_line = readline() |
| |
|
| | |
| | next_line = readline() |
| | while next_line.startswith(b"mergetag "): |
| | next_line = readline() |
| | while next_line.startswith(b" "): |
| | next_line = readline() |
| | |
| |
|
| | |
| | |
| | self.encoding = self.default_encoding |
| | self.gpgsig = "" |
| |
|
| | |
| | enc = next_line |
| | buf = enc.strip() |
| | while buf: |
| | if buf[0:10] == b"encoding ": |
| | self.encoding = buf[buf.find(b" ") + 1 :].decode(self.encoding, "ignore") |
| | elif buf[0:7] == b"gpgsig ": |
| | sig = buf[buf.find(b" ") + 1 :] + b"\n" |
| | is_next_header = False |
| | while True: |
| | sigbuf = readline() |
| | if not sigbuf: |
| | break |
| | if sigbuf[0:1] != b" ": |
| | buf = sigbuf.strip() |
| | is_next_header = True |
| | break |
| | sig += sigbuf[1:] |
| | |
| | self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, "ignore") |
| | if is_next_header: |
| | continue |
| | buf = readline().strip() |
| |
|
| | |
| | try: |
| | ( |
| | self.author, |
| | self.authored_date, |
| | self.author_tz_offset, |
| | ) = parse_actor_and_date(author_line.decode(self.encoding, "replace")) |
| | except UnicodeDecodeError: |
| | _logger.error( |
| | "Failed to decode author line '%s' using encoding %s", |
| | author_line, |
| | self.encoding, |
| | exc_info=True, |
| | ) |
| |
|
| | try: |
| | ( |
| | self.committer, |
| | self.committed_date, |
| | self.committer_tz_offset, |
| | ) = parse_actor_and_date(committer_line.decode(self.encoding, "replace")) |
| | except UnicodeDecodeError: |
| | _logger.error( |
| | "Failed to decode committer line '%s' using encoding %s", |
| | committer_line, |
| | self.encoding, |
| | exc_info=True, |
| | ) |
| | |
| |
|
| | |
| | |
| | self.message = stream.read() |
| | try: |
| | self.message = self.message.decode(self.encoding, "replace") |
| | except UnicodeDecodeError: |
| | _logger.error( |
| | "Failed to decode message '%s' using encoding %s", |
| | self.message, |
| | self.encoding, |
| | exc_info=True, |
| | ) |
| | |
| |
|
| | return self |
| |
|
| | |
| |
|
| | @property |
| | def co_authors(self) -> List[Actor]: |
| | """Search the commit message for any co-authors of this commit. |
| | |
| | Details on co-authors: |
| | https://github.blog/2018-01-29-commit-together-with-co-authors/ |
| | |
| | :return: |
| | List of co-authors for this commit (as :class:`~git.util.Actor` objects). |
| | """ |
| | co_authors = [] |
| |
|
| | if self.message: |
| | results = re.findall( |
| | r"^Co-authored-by: (.*) <(.*?)>$", |
| | self.message, |
| | re.MULTILINE, |
| | ) |
| | for author in results: |
| | co_authors.append(Actor(*author)) |
| |
|
| | return co_authors |
| |
|