| |
| |
| |
| |
| """Utilities used in ODB testing""" |
| from gitdb import OStream |
|
|
| import sys |
| import random |
| from array import array |
|
|
| from io import BytesIO |
|
|
| import glob |
| import unittest |
| import tempfile |
| import shutil |
| import os |
| import gc |
| import logging |
| from functools import wraps |
|
|
|
|
| |
|
|
| class TestBase(unittest.TestCase): |
| """Base class for all tests |
| |
| TestCase providing access to readonly repositories using the following member variables. |
| |
| * gitrepopath |
| |
| * read-only base path of the git source repository, i.e. .../git/.git |
| """ |
|
|
| |
| k_env_git_repo = "GITDB_TEST_GIT_REPO_BASE" |
| |
|
|
| @classmethod |
| def setUpClass(cls): |
| try: |
| super().setUpClass() |
| except AttributeError: |
| pass |
|
|
| cls.gitrepopath = os.environ.get(cls.k_env_git_repo) |
| if not cls.gitrepopath: |
| logging.info( |
| "You can set the %s environment variable to a .git repository of your choice - defaulting to the gitdb repository", cls.k_env_git_repo) |
| ospd = os.path.dirname |
| cls.gitrepopath = os.path.join(ospd(ospd(ospd(__file__))), '.git') |
| |
| assert cls.gitrepopath.endswith('.git') |
|
|
|
|
| |
|
|
| |
|
|
| def with_rw_directory(func): |
| """Create a temporary directory which can be written to, remove it if the |
| test succeeds, but leave it otherwise to aid additional debugging""" |
|
|
| def wrapper(self): |
| path = tempfile.mktemp(prefix=func.__name__) |
| os.mkdir(path) |
| keep = False |
| try: |
| try: |
| return func(self, path) |
| except Exception: |
| sys.stderr.write(f"Test {type(self).__name__}.{func.__name__} failed, output is at {path!r}\n") |
| keep = True |
| raise |
| finally: |
| |
| |
| |
| |
| if not keep: |
| gc.collect() |
| shutil.rmtree(path) |
| |
| |
|
|
| wrapper.__name__ = func.__name__ |
| return wrapper |
|
|
|
|
| def with_packs_rw(func): |
| """Function that provides a path into which the packs for testing should be |
| copied. Will pass on the path to the actual function afterwards""" |
|
|
| def wrapper(self, path): |
| src_pack_glob = fixture_path('packs/*') |
| copy_files_globbed(src_pack_glob, path, hard_link_ok=True) |
| return func(self, path) |
| |
|
|
| wrapper.__name__ = func.__name__ |
| return wrapper |
|
|
| |
|
|
| |
|
|
|
|
| def fixture_path(relapath=''): |
| """:return: absolute path into the fixture directory |
| :param relapath: relative path into the fixtures directory, or '' |
| to obtain the fixture directory itself""" |
| return os.path.join(os.path.dirname(__file__), 'fixtures', relapath) |
|
|
|
|
| def copy_files_globbed(source_glob, target_dir, hard_link_ok=False): |
| """Copy all files found according to the given source glob into the target directory |
| :param hard_link_ok: if True, hard links will be created if possible. Otherwise |
| the files will be copied""" |
| for src_file in glob.glob(source_glob): |
| if hard_link_ok and hasattr(os, 'link'): |
| target = os.path.join(target_dir, os.path.basename(src_file)) |
| try: |
| os.link(src_file, target) |
| except OSError: |
| shutil.copy(src_file, target_dir) |
| |
| else: |
| shutil.copy(src_file, target_dir) |
| |
| |
|
|
|
|
| def make_bytes(size_in_bytes, randomize=False): |
| """:return: string with given size in bytes |
| :param randomize: try to produce a very random stream""" |
| actual_size = size_in_bytes // 4 |
| producer = range(actual_size) |
| if randomize: |
| producer = list(producer) |
| random.shuffle(producer) |
| |
| a = array('i', producer) |
| return a.tobytes() |
|
|
|
|
| def make_object(type, data): |
| """:return: bytes resembling an uncompressed object""" |
| odata = "blob %i\0" % len(data) |
| return odata.encode("ascii") + data |
|
|
|
|
| def make_memory_file(size_in_bytes, randomize=False): |
| """:return: tuple(size_of_stream, stream) |
| :param randomize: try to produce a very random stream""" |
| d = make_bytes(size_in_bytes, randomize) |
| return len(d), BytesIO(d) |
|
|
| |
|
|
| |
|
|
|
|
| class DummyStream: |
|
|
| def __init__(self): |
| self.was_read = False |
| self.bytes = 0 |
| self.closed = False |
|
|
| def read(self, size): |
| self.was_read = True |
| self.bytes = size |
|
|
| def close(self): |
| self.closed = True |
|
|
| def _assert(self): |
| assert self.was_read |
|
|
|
|
| class DeriveTest(OStream): |
|
|
| def __init__(self, sha, type, size, stream, *args, **kwargs): |
| self.myarg = kwargs.pop('myarg') |
| self.args = args |
|
|
| def _assert(self): |
| assert self.args |
| assert self.myarg |
|
|
| |
|
|