| |
| |
| |
| |
| """Test for object db""" |
|
|
| from gitdb.test.lib import ( |
| TestBase, |
| DummyStream, |
| make_bytes, |
| make_object, |
| fixture_path |
| ) |
|
|
| from gitdb import ( |
| DecompressMemMapReader, |
| FDCompressedSha1Writer, |
| LooseObjectDB, |
| Sha1Writer, |
| MemoryDB, |
| IStream, |
| ) |
| from gitdb.util import hex_to_bin |
|
|
| import zlib |
| from gitdb.typ import ( |
| str_blob_type |
| ) |
|
|
| import tempfile |
| import os |
| from io import BytesIO |
|
|
|
|
| class TestStream(TestBase): |
|
|
| """Test stream classes""" |
|
|
| data_sizes = (15, 10000, 1000 * 1024 + 512) |
|
|
| def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None): |
| """Make stream tests - the orig_stream is seekable, allowing it to be |
| rewound and reused |
| :param cdata: the data we expect to read from stream, the contents |
| :param rewind_stream: function called to rewind the stream to make it ready |
| for reuse""" |
| ns = 10 |
| assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata)) |
|
|
| |
| ss = len(cdata) // ns |
| for i in range(ns): |
| data = stream.read(ss) |
| chunk = cdata[i * ss:(i + 1) * ss] |
| assert data == chunk |
| |
| rest = stream.read() |
| if rest: |
| assert rest == cdata[-len(rest):] |
| |
|
|
| if isinstance(stream, DecompressMemMapReader): |
| assert len(stream.data()) == stream.compressed_bytes_read() |
| |
|
|
| rewind_stream(stream) |
|
|
| |
| rdata = stream.read() |
| assert rdata == cdata |
|
|
| if isinstance(stream, DecompressMemMapReader): |
| assert len(stream.data()) == stream.compressed_bytes_read() |
| |
|
|
| def test_decompress_reader(self): |
| for close_on_deletion in range(2): |
| for with_size in range(2): |
| for ds in self.data_sizes: |
| cdata = make_bytes(ds, randomize=False) |
|
|
| |
| |
|
|
| |
| if with_size: |
| |
| zdata = zlib.compress(make_object(str_blob_type, cdata)) |
| typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion) |
| assert size == len(cdata) |
| assert typ == str_blob_type |
|
|
| |
| test_reader = DecompressMemMapReader(zdata, close_on_deletion=False) |
| assert test_reader._s == len(cdata) |
| else: |
| |
| zdata = zlib.compress(cdata) |
| reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata)) |
| assert reader._s == len(cdata) |
| |
|
|
| self._assert_stream_reader(reader, cdata, lambda r: r.seek(0)) |
|
|
| |
| dummy = DummyStream() |
| reader._m = dummy |
|
|
| assert not dummy.closed |
| del(reader) |
| assert dummy.closed == close_on_deletion |
| |
| |
| |
|
|
| def test_sha_writer(self): |
| writer = Sha1Writer() |
| assert 2 == writer.write(b"hi") |
| assert len(writer.sha(as_hex=1)) == 40 |
| assert len(writer.sha(as_hex=0)) == 20 |
|
|
| |
| prev_sha = writer.sha() |
| writer.write(b"hi again") |
| assert writer.sha() != prev_sha |
|
|
| def test_compressed_writer(self): |
| for ds in self.data_sizes: |
| fd, path = tempfile.mkstemp() |
| ostream = FDCompressedSha1Writer(fd) |
| data = make_bytes(ds, randomize=False) |
|
|
| |
| assert len(data) == ostream.write(data) |
| ostream.close() |
|
|
| |
| self.assertRaises(OSError, os.close, fd) |
|
|
| |
| fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0)) |
| written_data = os.read(fd, os.path.getsize(path)) |
| assert len(written_data) == os.path.getsize(path) |
| os.close(fd) |
| assert written_data == zlib.compress(data, 1) |
|
|
| os.remove(path) |
| |
|
|
| def test_decompress_reader_special_case(self): |
| odb = LooseObjectDB(fixture_path('objects')) |
| mdb = MemoryDB() |
| for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911', |
| b'7bb839852ed5e3a069966281bb08d50012fb309b',): |
| ostream = odb.stream(hex_to_bin(sha)) |
|
|
| |
| data = ostream.read() |
| assert len(data) == ostream.size |
|
|
| |
| dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data))) |
| assert dump.hexsha == sha |
| |
|
|