| | |
| | |
| | |
| | |
| |
|
# Names exported by a star-import of this module.
__all__ = ["TreeModifier", "Tree"]
| |
|
| | import sys |
| |
|
| | import git.diff as git_diff |
| | from git.util import IterableList, join_path, to_bin_sha |
| |
|
| | from . import util |
| | from .base import IndexObjUnion, IndexObject |
| | from .blob import Blob |
| | from .fun import tree_entries_from_data, tree_to_stream |
| | from .submodule.base import Submodule |
| |
|
| | |
| |
|
| | from typing import ( |
| | Any, |
| | Callable, |
| | Dict, |
| | Iterable, |
| | Iterator, |
| | List, |
| | Tuple, |
| | TYPE_CHECKING, |
| | Type, |
| | Union, |
| | cast, |
| | ) |
| |
|
| | if sys.version_info >= (3, 8): |
| | from typing import Literal |
| | else: |
| | from typing_extensions import Literal |
| |
|
| | from git.types import PathLike |
| |
|
| | if TYPE_CHECKING: |
| | from io import BytesIO |
| |
|
| | from git.repo import Repo |
| |
|
# One tree entry as stored in a tree's cache list: (binsha, mode, name).
TreeCacheTup = Tuple[bytes, int, str]

# Tuple form of a traversal result; it appears in the return type of Tree.traverse().
# NOTE(review): presumably the "edge" shape yielded when traversing with as_edge=True -
# confirm against Traversable._traverse().
TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]]
| |
|
| | |
| |
|
def cmp(a: str, b: str) -> int:
    """Three-way comparison of two strings.

    :return:
        ``-1`` if `a` sorts before `b`, ``1`` if it sorts after `b`, and ``0`` if neither
        is the case.
    """
    if a > b:
        return 1
    if a < b:
        return -1
    return 0
| |
|
| |
|
class TreeModifier:
    """Helper that edits the entry cache of a tree through a list-like interface.

    The modifier keeps a reference to the list it was created with (:attr:`_cache`),
    which really is the cache of a tree, so every change made here shows up in that
    tree. Entries are ``(binsha, mode, name)`` tuples.

    When all changes have been made, call :meth:`set_done` to sort the cache and thereby
    bring it into a serializable state.
    """

    __slots__ = ("_cache",)

    def __init__(self, cache: List[TreeCacheTup]) -> None:
        self._cache = cache

    def _index_by_name(self, name: str) -> int:
        """:return: position of the first entry called `name`, or -1 if there is none"""
        matches = (pos for pos, entry in enumerate(self._cache) if entry[2] == name)
        return next(matches, -1)

    def set_done(self) -> "TreeModifier":
        """Sort the cache; call this after the last modification.

        Calling it more than once is allowed, but every call sorts the cache again.

        :return:
            self
        """
        tree_mode = Tree.tree_id << 12

        def sort_key(entry: TreeCacheTup) -> str:
            # Tree entries are ordered as though their name ended in a slash.
            entry_name = entry[2]
            if entry[1] == tree_mode:
                return entry_name + "/"
            return entry_name

        self._cache.sort(key=sort_key)
        return self

    def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier":
        """Insert an entry into the tree.

        An entry whose name is already present is left untouched when its sha and mode
        equal the given ones. If they differ, the existing entry is overwritten when
        `force` is ``True``; otherwise a :exc:`ValueError` is raised.

        :param sha:
            The 20 or 40 byte sha of the item to add.

        :param mode:
            :class:`int` representing the stat-compatible mode of the item.

        :param name:
            Name of the entry within the tree. It must not contain ``/``.

        :param force:
            If ``True``, the new entry replaces any existing entry of the same name,
            whatever sha and mode that entry has.

        :raise ValueError:
            If `name` contains a slash, if `mode` does not denote a known object type,
            or if a differing entry of that name exists and `force` is not set.

        :return:
            self
        """
        if "/" in name:
            raise ValueError("Name must not contain '/' characters")
        if (mode >> 12) not in Tree._map_id_to_type:
            raise ValueError("Invalid object type according to mode %o" % mode)

        binsha = to_bin_sha(sha)
        position = self._index_by_name(name)
        entry = (binsha, mode, name)

        # Unknown name: simply append.
        if position == -1:
            self._cache.append(entry)
            return self

        # Known name: overwrite on request, otherwise insist on an identical entry.
        if force:
            self._cache[position] = entry
            return self

        existing = self._cache[position]
        if not (existing[0] == binsha and existing[1] == mode):
            raise ValueError("Item %r existed with different properties" % name)
        return self

    def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None:
        """Append an entry without validating it or looking for duplicates.

        The caller is responsible for passing correct values. Only the argument types
        are asserted.

        For more information on the parameters, see :meth:`add`.

        :param binsha:
            20 byte binary sha.
        """
        assert isinstance(binsha, bytes)
        assert isinstance(mode, int)
        assert isinstance(name, str)
        self._cache.append((binsha, mode, name))

    def __delitem__(self, name: str) -> None:
        """Remove the entry called `name`; do nothing if there is no such entry."""
        position = self._index_by_name(name)
        if position < 0:
            return
        del self._cache[position]
| |
|
| | |
| |
|
| |
|
class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
    R"""Tree objects represent an ordered list of :class:`~git.objects.blob.Blob`\s and
    other :class:`Tree`\s.

    See :manpage:`gitglossary(7)` on "tree object":
    https://git-scm.com/docs/gitglossary#def_tree_object

    Subscripting is supported, as with a list or dict:

    * Access a specific blob using the ``tree["filename"]`` notation.
    * You may likewise access by index, like ``blob = tree[0]``.

    Iteration, ``len()``, ``in`` and ``reversed()`` work on the direct entries of the
    tree, which are kept in :attr:`_cache` as ``(binsha, mode, name)`` tuples.
    """

    type: Literal["tree"] = "tree"

    __slots__ = ("_cache",)

    # Integer IDs of the entry kinds. An entry's ID is its mode shifted right by 12 bits.
    commit_id = 0o16
    blob_id = 0o10
    symlink_id = 0o12
    tree_id = 0o04

    # Maps an entry ID to the class instantiated for such an entry. The mapping for
    # tree_id cannot be given here, as Tree is not defined yet inside its own body.
    _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
        commit_id: Submodule,
        blob_id: Blob,
        symlink_id: Blob,
    }

    def __init__(
        self,
        repo: "Repo",
        binsha: bytes,
        mode: int = tree_id << 12,
        path: Union[PathLike, None] = None,
    ):
        """Initialize a tree; `mode` defaults to the mode of a tree entry."""
        super().__init__(repo, binsha, mode, path)

    @classmethod
    def _get_intermediate_items(
        cls,
        index_object: IndexObjUnion,
    ) -> Union[Tuple["Tree", ...], Tuple[()]]:
        """:return: the direct entries of `index_object` if it is a tree, else ``()``"""
        if index_object.type == "tree":
            return tuple(index_object._iter_convert_to_object(index_object._cache))
        return ()

    def _set_cache_(self, attr: str) -> None:
        """Populate the attribute named `attr`.

        ``_cache`` is filled by reading this tree's data from the object database and
        parsing it into entries. Any other attribute is delegated to the base class.
        """
        if attr == "_cache":
            # Set the data when we need it.
            ostream = self.repo.odb.stream(self.binsha)
            self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read())
        else:
            super()._set_cache_(attr)

    def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]:
        """Iterable yields tuples of (binsha, mode, name), which will be converted to
        the respective object representation.

        :raise TypeError:
            If the mode of an entry does not map to a known object type.
        """
        for binsha, mode, name in iterable:
            path = join_path(self.path, name)
            try:
                yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path)
            except KeyError as e:
                raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e

    def join(self, file: str) -> IndexObjUnion:
        """Find the named object in this tree's contents.

        `file` may contain slashes, in which case each path component is looked up in
        the tree found for the previous one.

        :return:
            :class:`~git.objects.blob.Blob`, :class:`Tree`, or
            :class:`~git.objects.submodule.base.Submodule`

        :raise KeyError:
            If the given file or tree does not exist in this tree.
        """
        msg = "Blob or Tree named %r not found"
        if "/" in file:
            tree = self
            item = self
            tokens = file.split("/")
            for i, token in enumerate(tokens):
                item = tree[token]
                if item.type == "tree":
                    tree = item
                else:
                    # Anything that is not a tree must be the last path component.
                    if i != len(tokens) - 1:
                        raise KeyError(msg % file)
                    return item

            if item == self:
                raise KeyError(msg % file)
            return item
        else:
            for info in self._cache:
                if info[2] == file:
                    return self._map_id_to_type[info[1] >> 12](
                        self.repo, info[0], info[1], join_path(self.path, info[2])
                    )

            raise KeyError(msg % file)

    def __truediv__(self, file: str) -> IndexObjUnion:
        """The ``/`` operator is another syntax for joining.

        See :meth:`join` for details.
        """
        return self.join(file)

    @property
    def trees(self) -> List["Tree"]:
        """:return: list(Tree, ...) List of trees directly below this tree"""
        return [i for i in self if i.type == "tree"]

    @property
    def blobs(self) -> List[Blob]:
        """:return: list(Blob, ...) List of blobs directly below this tree"""
        return [i for i in self if i.type == "blob"]

    @property
    def cache(self) -> TreeModifier:
        """
        :return:
            An object allowing modification of the internal cache. This can be used to
            change the tree's contents. When done, make sure you call
            :meth:`~TreeModifier.set_done` on the tree modifier, or serialization
            behaviour will be incorrect.

        :note:
            See :class:`TreeModifier` for more information on how to alter the cache.
        """
        return TreeModifier(self._cache)

    def traverse(
        self,
        predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
        prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = False,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]:
        """For documentation, see
        `Traversable._traverse() <git.objects.util.Traversable._traverse>`.

        Trees are set to ``visit_once = False`` to gain more performance in the
        traversal.

        All arguments, including `as_edge`, are handed on to the base implementation.
        """
        return cast(
            Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
            super()._traverse(
                predicate,
                prune,
                depth,
                branch_first,
                visit_once,
                ignore_self,
                # Forward as_edge as well, otherwise the caller's choice is ignored.
                as_edge,
            ),
        )

    def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
        """
        :return:
            :class:`~git.util.IterableList` with the results of the traversal as
            produced by :meth:`traverse`

            Tree -> IterableList[Union[Submodule, Tree, Blob]]
        """
        return super()._list_traverse(*args, **kwargs)

    def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
        """:return: list of the objects for the cache entries ``i`` up to, excluding, ``j``

        :note:
            Python 3 does not call this method for ``tree[i:j]``; it has to be invoked
            explicitly.
        """
        return list(self._iter_convert_to_object(self._cache[i:j]))

    def __iter__(self) -> Iterator[IndexObjUnion]:
        """:return: iterator over the objects directly contained in this tree"""
        return self._iter_convert_to_object(self._cache)

    def __len__(self) -> int:
        """:return: number of entries directly contained in this tree"""
        return len(self._cache)

    def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
        """Look up an entry by position (``int``) or by name or path (``str``).

        :raise TypeError:
            If `item` is neither an ``int`` nor a ``str``. This includes slices.

        :raise KeyError:
            If `item` is a ``str`` that cannot be found, see :meth:`join`.
        """
        if isinstance(item, int):
            info = self._cache[item]
            return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))

        if isinstance(item, str):
            # Names and paths share one code path.
            return self.join(item)

        raise TypeError("Invalid index type: %r" % item)

    def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
        """Check whether `item` is a direct entry of this tree.

        Index objects are matched by their binary sha. Anything else is treated as a
        path and compared to the full path of each entry.
        """
        if isinstance(item, IndexObject):
            return any(item.binsha == info[0] for info in self._cache)

        # Treat item as a path that includes the path of this tree.
        path = self.path
        return any(item == join_path(path, info[2]) for info in self._cache)

    def __reversed__(self) -> Iterator[IndexObjUnion]:
        """:return: iterator over the objects of this tree in reverse entry order"""
        # reversed() cannot be applied to the generator produced by
        # _iter_convert_to_object(), so reverse the cache list and convert lazily.
        return self._iter_convert_to_object(reversed(self._cache))

    def _serialize(self, stream: "BytesIO") -> "Tree":
        """Serialize this tree into the stream. Assumes sorted tree data.

        :note:
            We will assume our tree data to be in a sorted state. If this is not the
            case, serialization will not generate a correct tree representation as these
            are assumed to be sorted by algorithms.

        :return:
            self
        """
        tree_to_stream(self._cache, stream.write)
        return self

    def _deserialize(self, stream: "BytesIO") -> "Tree":
        """Replace the cache with the entries parsed from the data read from `stream`.

        :return:
            self
        """
        self._cache = tree_entries_from_data(stream.read())
        return self
| |
|
| |
|
| | |
| |
|
| | |
# Complete the ID-to-type mapping: Tree can only be registered under its own ID here,
# after the class statement has finished and the name Tree is bound.
Tree._map_id_to_type[Tree.tree_id] = Tree
| |
|