# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
from io import BytesIO

import mmap
import os
import sys
import zlib
from gitdb.fun import (
    msb_size,
    stream_copy,
    apply_delta_data,
    connect_deltas,
    delta_types
)

from gitdb.util import (
    allocate_memory,
    LazyMixin,
    make_sha,
    write,
    close,
)

from gitdb.const import NULL_BYTE, BYTE_SPACE
from gitdb.utils.encoding import force_bytes
# Use the C implementation of delta application if the optional speedups
# extension is installed; otherwise fall back to the pure-Python version.
has_perf_mod = False
try:
    from gitdb_speedups._perf import apply_delta as c_apply_delta
    has_perf_mod = True
except ImportError:
    pass
__all__ = ('DecompressMemMapReader', 'FDCompressedSha1Writer', 'DeltaApplyReader',
           'Sha1Writer', 'FlexibleSha1Writer', 'ZippedStoreShaWriter',
           'FDStream', 'NullStream')
#{ RO Streams
class DecompressMemMapReader(LazyMixin):

    """Reads data in chunks from a memory map and decompresses it. The client sees
    only the uncompressed data; file-like read calls handle the on-demand
    buffered decompression accordingly.

    A constraint on the total size of bytes is enforced, simulating
    a logical file within a possibly larger physical memory area.

    To read efficiently, you clearly don't want to read individual bytes; instead,
    read at least a few kilobytes at a time.

    **Note:** The chunk-size should be carefully selected, as it will involve quite a bit
    of string copying due to the way zlib is implemented. It's very wasteful,
    hence we try to find a good tradeoff between allocation time and the number
    of times we actually allocate. A custom zlib implementation would help here,
    as it would only need to keep the mmap and decompress it into chunks, that's all."""
    __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close',
                 '_cbr', '_phi')

    max_read_size = 512 * 1024

    def __init__(self, m, close_on_deletion, size=None):
        """Initialize with mmap for stream reading
        :param m: must be content data - use new if you have object data and no size"""
        self._m = m
        self._zip = zlib.decompressobj()
        self._buf = None                        # buffer of decompressed bytes
        self._buflen = 0                        # length of bytes in buffer
        if size is not None:
            self._s = size                      # size of uncompressed data to read in total
        self._br = 0                            # num uncompressed bytes read
        self._cws = 0                           # start byte of compression window
        self._cwe = 0                           # end byte of compression window
        self._cbr = 0                           # number of compressed bytes read
        self._phi = False                       # flag indicating whether we parsed the header info
        self._close = close_on_deletion         # close the memmap on deletion
    def _set_cache_(self, attr):
        assert attr == '_s'
        # This only happens for the size, which acts as a marker to indicate
        # that we still have to parse the header from the stream
        self._parse_header_info()

    def __del__(self):
        self.close()

    def _parse_header_info(self):
        """If this stream contains object data, parse the header info and skip the
        stream to a point where each read will yield object content

        :return: parsed type_string, size"""
        # read enough bytes to hold the full header; 8192 should be plenty
        maxb = 8192
        self._s = maxb
        hdr = self.read(maxb)
        hdrend = hdr.find(NULL_BYTE)
        typ, size = hdr[:hdrend].split(BYTE_SPACE)
        size = int(size)
        self._s = size

        # adjust internal state to match the actual header length that we ignore.
        # The buffer will be depleted first on future reads.
        self._br = 0
        hdrend += 1
        self._buf = BytesIO(hdr[hdrend:])
        self._buflen = len(hdr) - hdrend

        self._phi = True

        return typ, size

    #{ Interface
    @classmethod
    def new(cls, m, close_on_deletion=False):
        """Create a new DecompressMemMapReader instance for acting as a read-only stream.
        This method parses the object header from m and returns the parsed
        type and size, as well as the created stream instance.

        :param m: memory map on which to operate. It must be object data ( header + contents )
        :param close_on_deletion: if True, the memory map will be closed once we are
            being deleted"""
        inst = cls(m, close_on_deletion, 0)
        typ, size = inst._parse_header_info()
        return typ, size, inst
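    # Usage sketch (illustrative only, not part of the class): reading a loose
    # object through a read-only memory map. `path` is a hypothetical
    # placeholder for a zlib-compressed loose-object file.
    #
    #   with open(path, 'rb') as fp:
    #       m = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
    #   typ, size, stream = DecompressMemMapReader.new(m, close_on_deletion=True)
    #   data = stream.read(size)        # decompressed object contents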
    def data(self):
        """:return: random access compatible data we are working on"""
        return self._m

    def close(self):
        """Close our underlying stream of compressed bytes if this was allowed during initialization
        :note: can be called safely multiple times"""
        if self._close:
            if hasattr(self._m, 'close'):
                self._m.close()
            self._close = False
        # END handle resource freeing

    def compressed_bytes_read(self):
        """
        :return: number of compressed bytes read. This includes the bytes it
            took to decompress the header ( if there was one )"""
        # We track the count of consumed compressed bytes in self._cbr, but that
        # count is only final once the zlib stream has been fully processed.
        # If the uncompressed stream was read to the end while compressed bytes
        # (such as the trailing checksum) remain, we keep feeding the decompressor
        # page-sized chunks until it reports leftover (unused) data, which marks
        # the true end of the compressed stream.
        if self._br == self._s and not self._zip.unused_data:
            # manipulate the bytes-read counter to allow our own read method to
            # continue, but keep the window at its current position
            self._br = 0
            if hasattr(self._zip, 'status'):
                while self._zip.status == zlib.Z_OK:
                    self.read(mmap.PAGESIZE)
                # END scrub-loop custom zlib
            else:
                # pass in additional pages until we have unused data
                while not self._zip.unused_data and self._cbr != len(self._m):
                    self.read(mmap.PAGESIZE)
                # END scrub-loop default zlib
            # END handle zlib flavor

            # reset bytes read, as we have not actually produced new output
            self._br = self._s
        # END handle stream scrubbing

        # unused data ends up in the unconsumed tail, which was removed
        # from the count already
        return self._cbr

    #} END interface
    def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
        """Allow resetting the stream to restart reading

        :raise ValueError: if offset and whence are not 0"""
        if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
            raise ValueError("Can only seek to position 0")
        # END handle offset

        self._zip = zlib.decompressobj()
        self._br = self._cws = self._cwe = self._cbr = 0
        if self._phi:
            self._phi = False
            del(self._s)        # trigger header parsing on first access
        # END skip header
    def read(self, size=-1):
        if size < 1:
            size = self._s - self._br
        else:
            size = min(size, self._s - self._br)
        # END clamp size

        if size == 0:
            return b''
        # END handle depletion

        # deplete the buffer first, then just continue using the decompress
        # object, which has its own buffer. We only need our buffer to
        # transparently parse the header from the zlib stream.
        dat = b''
        if self._buf:
            if self._buflen >= size:
                # have enough data
                dat = self._buf.read(size)
                self._buflen -= size
                self._br += size
                return dat
            else:
                dat = self._buf.read()      # ouch, duplicates data
                size -= self._buflen
                self._br += self._buflen

                self._buflen = 0
                self._buf = None
            # END handle buffer len
        # END handle buffer

        # decompress some data.
        # Abstract: zlib needs to operate on chunks of our memory map, which may
        # be large. If we passed in the whole map, zlib would fill its
        # 'unconsumed_tail' attribute with a copy of everything it did not
        # consume, possibly reading our whole map to the end and forcing
        # everything to be read from disk even though just a portion was
        # requested. We work around this by passing only chunks of data, moving
        # a window over the memory map as we decompress, which keeps the tail
        # smaller than our chunk-size.
        tail = self._zip.unconsumed_tail
        if tail:
            # move the window and make it as large as size demands. For code
            # clarity, we just take the chunk from our map again instead of
            # reusing the unconsumed tail. Reusing the tail would save some
            # memory copying, but we might not obtain enough uncompressed bytes
            # from it, so we assume the worst case: the window needs to be as
            # large as the number of uncompressed bytes we want to read.
            self._cws = self._cwe - len(tail)
            self._cwe = self._cws + size
        else:
            cws = self._cws
            self._cws = self._cwe
            self._cwe = cws + size
        # END handle tail

        # if the window is too small, enlarge it so zlib can decompress something
        if self._cwe - self._cws < 8:
            self._cwe = self._cws + 8
        # END adjust winsize

        # take the current window slice from the memory map
        indata = self._m[self._cws:self._cwe]

        # get the actual window end, to be sure we don't use it for computations
        self._cwe = self._cws + len(indata)
        dcompdat = self._zip.decompress(indata, size)
        # update the amount of compressed bytes read.
        # We feed possibly overlapping chunks, which is why the unconsumed tail
        # has to be taken into consideration, as well as the unused data if we
        # hit the end of the stream. Certain zlib runtime versions account for
        # the unused data differently, which we handle here.
        if getattr(zlib, 'ZLIB_RUNTIME_VERSION', zlib.ZLIB_VERSION) in ('1.2.7', '1.2.5') and not sys.platform == 'darwin':
            unused_datalen = len(self._zip.unconsumed_tail)
        else:
            unused_datalen = len(self._zip.unconsumed_tail) + len(self._zip.unused_data)
        # END handle zlib version specific tail accounting

        self._cbr += len(indata) - unused_datalen
        self._br += len(dcompdat)

        if dat:
            dcompdat = dat + dcompdat
        # END prepend our cached data

        # It can happen, depending on the compression, that we get fewer bytes
        # than ordered, as zlib needs the final portion of the data as well.
        # Recursively resolve that.
        # NOTE: dcompdat can be empty even though we still appear to have bytes
        # to read, if we are called by compressed_bytes_read - it manipulates
        # us to empty the stream.
        if dcompdat and (len(dcompdat) - len(dat)) < size and self._br < self._s:
            dcompdat += self.read(size - len(dcompdat))
        # END handle special case
        return dcompdat

class DeltaApplyReader(LazyMixin):

    """A reader which dynamically applies pack deltas to a base object, keeping the
    memory demands to a minimum.

    The size of the final object is only obtainable once all deltas have been
    applied, unless it is retrieved from a pack index.

    The uncompressed delta has the following layout (MSB being a most-significant-bit
    encoded dynamic size):

    * MSB Source Size - the size of the base against which the delta was created
    * MSB Target Size - the size of the resulting data after the delta was applied
    * A list of one-byte commands (cmd), each followed by a specific protocol:

      * cmd & 0x80 - copy delta_data[offset:offset+size]

        * Followed by an encoded offset into the delta data
        * Followed by an encoded size of the chunk to copy

      * cmd & 0x7f - insert

        * insert cmd bytes from the delta buffer into the output stream

      * cmd == 0 - invalid operation ( or error in delta stream )
    """
    __slots__ = (
        "_bstream",
        "_dstreams",
        "_mm_target",
        "_size",
        "_br"
    )
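    # Illustrative sketch of the MSB size decoding described above. The real
    # implementation lives in gitdb.fun.msb_size; the helper below is a made-up
    # name for illustration only. Each byte contributes seven payload bits,
    # least significant group first, and a set high bit means more bytes follow:
    #
    #   def decode_msb_size(data, offset=0):
    #       size, shift = 0, 0
    #       while True:
    #           c = data[offset]
    #           offset += 1
    #           size |= (c & 0x7f) << shift
    #           shift += 7
    #           if not (c & 0x80):
    #               return offset, size
    #
    #   decode_msb_size(b'\x91\x02')    # -> (2, 273), as 0x11 | (0x02 << 7) == 273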
    #{ Configuration
    k_max_memory_move = 250 * 1000 * 1000
    #} END configuration
    def __init__(self, stream_list):
        """Initialize this instance with a list of streams, the first stream being
        the delta to apply on top of all following deltas, the last stream being the
        base object onto which to apply the deltas"""
        assert len(stream_list) > 1, "Need at least one delta and one base stream"

        self._bstream = stream_list[-1]
        self._dstreams = tuple(stream_list[:-1])
        self._br = 0
    def _set_cache_too_slow_without_c(self, attr):
        # The brute-force algorithm is fastest and most direct if there is only
        # one delta; the extra overhead of merging delta chains is not worth it
        # in that case.
        if len(self._dstreams) == 1:
            return self._set_cache_brute_(attr)

        # Aggregate all deltas into one merged delta in reverse order, which
        # can then be applied to the base buffer in a single pass.
        dcl = connect_deltas(self._dstreams)

        # if the merged delta produces no output, the target is empty
        if dcl.rbound() == 0:
            self._size = 0
            self._mm_target = allocate_memory(0)
            return
        # END handle empty list

        self._size = dcl.rbound()
        self._mm_target = allocate_memory(self._size)

        # fill the base buffer from the base stream
        bbuf = allocate_memory(self._bstream.size)
        stream_copy(self._bstream.read, bbuf.write, self._bstream.size, 256 * mmap.PAGESIZE)

        # APPLY CHUNKS
        write = self._mm_target.write
        dcl.apply(bbuf, write)

        self._mm_target.seek(0)
    def _set_cache_brute_(self, attr):
        """If we are here, we apply the actual deltas"""
        # read the delta headers of each stream: they store the source and
        # target sizes the delta was created with
        buffer_info_list = list()
        max_target_size = 0
        for dstream in self._dstreams:
            buf = dstream.read(512)     # read the header information + some delta data
            offset, src_size = msb_size(buf)
            offset, target_size = msb_size(buf, offset)
            buffer_info_list.append((buf[offset:], offset, src_size, target_size))
            max_target_size = max(max_target_size, target_size)
        # END for each delta stream

        # the base stream provides the first source buffer; intermediate
        # targets may be larger than the final one
        base_size = self._bstream.size
        target_size = max_target_size

        # if we have more than 1 delta to apply, we will swap buffers, hence we must
        # assure that all buffers we use are large enough to hold all the results
        if len(self._dstreams) > 1:
            base_size = target_size = max(base_size, max_target_size)
        # END adjust buffer sizes

        # allocate a private memory map big enough to hold the first base
        # buffer - we need random access to it
        bbuf = allocate_memory(base_size)
        stream_copy(self._bstream.read, bbuf.write, base_size, 256 * mmap.PAGESIZE)

        # allocate a scratch buffer large enough for the largest (intermediate)
        # target; if the final target is smaller, we just use a part of it
        tbuf = allocate_memory(target_size)

        # For each delta to apply, buffer the decompressed delta data and
        # process its op-codes, starting with the delta closest to the base.
        final_target_size = None
        for (dbuf, offset, src_size, target_size), dstream in zip(reversed(buffer_info_list), reversed(self._dstreams)):
            # allocate a buffer to hold all delta data - fill in the data for
            # fast access. We do this as we know that we will be dealing with
            # small deltas anyway.
            ddata = allocate_memory(dstream.size - offset)
            ddata.write(dbuf)
            # read the remainder of the delta from the stream; the size we give
            # is larger than necessary, copying simply stops at the stream's end
            stream_copy(dstream.read, ddata.write, dstream.size, 256 * mmap.PAGESIZE)

            # apply the delta, using the C implementation if it was imported
            if 'c_apply_delta' in globals():
                c_apply_delta(bbuf, ddata, tbuf)
            else:
                apply_delta_data(bbuf, src_size, ddata, len(ddata), tbuf.write)
            # END apply delta

            # finally, swap the buffers - the previous target becomes the next
            # base - and rewind them
            bbuf, tbuf = tbuf, bbuf
            bbuf.seek(0)
            tbuf.seek(0)
            final_target_size = target_size
        # END for each delta to apply

        # it is already seeked to 0; constrain it to the actual size.
        # NOTE: at the end of the loop the buffers are swapped, hence our
        # target buffer is bbuf, not tbuf!
        self._mm_target = bbuf
        self._size = final_target_size
    # select the cache implementation: the delta-merging algorithm is only
    # worthwhile when the C speedups are available
    if not has_perf_mod:
        _set_cache_ = _set_cache_brute_
    else:
        _set_cache_ = _set_cache_too_slow_without_c
    def read(self, count=0):
        bl = self._size - self._br      # bytes left
        if count < 1 or count > bl:
            count = bl
        # END handle count boundaries

        data = self._mm_target.read(count)
        self._br += len(data)
        return data
    def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
        """Allow resetting the stream to restart reading

        :raise ValueError: if offset and whence are not 0"""
        if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
            raise ValueError("Can only seek to position 0")
        # END handle offset

        self._br = 0
        self._mm_target.seek(0)

    #{ Interface
    @classmethod
    def new(cls, stream_list):
        """
        Convert the given list of streams into a stream which resolves deltas
        when reading from it.

        :param stream_list: two or more stream objects, the first stream being a delta
            to the object that you want to resolve, followed by N additional delta
            streams. The list's last stream must be a non-delta stream.

        :return: Non-delta OPackStream object whose stream can be used to obtain
            the decompressed resolved data
        :raise ValueError: if the stream list cannot be handled"""
        if len(stream_list) < 2:
            raise ValueError("Need at least two streams")
        # END single object special handling

        if stream_list[-1].type_id in delta_types:
            raise ValueError(
                "Cannot resolve deltas if there is no base object stream, last one was type: %s" % stream_list[-1].type)
        # END check stream
        return cls(stream_list)
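    # Usage sketch (illustrative only): `delta_stream` and `base_stream` stand
    # for pack stream objects providing `read`, `size`, `type_id` and `type`,
    # with the non-delta base stream last in the list.
    #
    #   reader = DeltaApplyReader.new([delta_stream, base_stream])
    #   data = reader.read()            # fully resolved object data
    #   assert len(data) == reader.size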
    #} END interface

    #{ OInfo like Interface
    @property
    def type(self):
        return self._bstream.type

    @property
    def type_id(self):
        return self._bstream.type_id

    @property
    def size(self):
        """:return: number of uncompressed bytes in the stream"""
        return self._size
    #} END oinfo like interface


#} END RO streams


#{ W Streams
class Sha1Writer:

    """Simple stream writer which produces a sha whenever you like, as it digests
    everything it is supposed to write"""
    __slots__ = "sha1"

    def __init__(self):
        self.sha1 = make_sha()
    #{ Stream Interface

    def write(self, data):
        """:raise IOError: If not all bytes could be written
        :param data: byte object
        :return: length of incoming data"""
        self.sha1.update(data)
        return len(data)

    #} END stream interface

    #{ Interface

    def sha(self, as_hex=False):
        """:return: sha so far
        :param as_hex: if True, the sha will be hex-encoded, binary otherwise"""
        if as_hex:
            return self.sha1.hexdigest()
        return self.sha1.digest()

    #} END interface
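# Usage sketch (illustrative only): digest a few chunks, then obtain the sha.
#
#   w = Sha1Writer()
#   w.write(b'hello ')
#   w.write(b'world')
#   assert w.sha(as_hex=True) == make_sha(b'hello world').hexdigest()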
class FlexibleSha1Writer(Sha1Writer):

    """Writer producing a sha1 while passing on the written bytes to the given
    write function"""
    __slots__ = 'writer'

    def __init__(self, writer):
        Sha1Writer.__init__(self)
        self.writer = writer

    def write(self, data):
        Sha1Writer.write(self, data)
        self.writer(data)
class ZippedStoreShaWriter(Sha1Writer):

    """Remembers everything someone writes to it and generates a sha"""
    __slots__ = ('buf', 'zip')

    def __init__(self):
        Sha1Writer.__init__(self)
        self.buf = BytesIO()
        self.zip = zlib.compressobj(zlib.Z_BEST_SPEED)

    def __getattr__(self, attr):
        return getattr(self.buf, attr)

    def write(self, data):
        alen = Sha1Writer.write(self, data)
        self.buf.write(self.zip.compress(data))

        return alen

    def close(self):
        self.buf.write(self.zip.flush())

    def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
        """Seeking currently only supports rewinding the written data.
        Multiple writes are not supported"""
        if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
            raise ValueError("Can only seek to position 0")
        # END handle offset
        self.buf.seek(0)

    def getvalue(self):
        """:return: the complete compressed data written so far, regardless of the
            current stream position"""
        return self.buf.getvalue()
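# Usage sketch (illustrative only): write data, flush the compressor, then
# recover the payload by inflating the buffered bytes.
#
#   w = ZippedStoreShaWriter()
#   w.write(b'payload')
#   w.close()                           # flushes the remaining compressed bytes
#   assert zlib.decompress(w.getvalue()) == b'payload'
#   sha_hex = w.sha(as_hex=True)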
class FDCompressedSha1Writer(Sha1Writer):

    """Digests data written to it, making the sha available, then compresses the
    data and writes it to the file descriptor

    **Note:** operates on raw file descriptors
    **Note:** for this to work, you have to use the close-method of this instance"""
    __slots__ = ("fd", "sha1", "zip")

    # exception raised when writing to the descriptor fails
    exc = IOError("Failed to write all bytes to file descriptor")

    def __init__(self, fd):
        super().__init__()
        self.fd = fd
        self.zip = zlib.compressobj(zlib.Z_BEST_SPEED)

    #{ Stream Interface

    def write(self, data):
        """:raise IOError: If not all bytes could be written
        :return: length of incoming data"""
        self.sha1.update(data)
        cdata = self.zip.compress(data)
        bytes_written = write(self.fd, cdata)

        if bytes_written != len(cdata):
            raise self.exc
        # END handle write failure

        return len(data)

    def close(self):
        remainder = self.zip.flush()
        if write(self.fd, remainder) != len(remainder):
            raise self.exc
        # END handle write failure
        return close(self.fd)

    #} END stream interface
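# Usage sketch (illustrative only): stream compressed, sha1-digested data to a
# file descriptor. `path` is a hypothetical output location; close() must be
# called to flush the compressor and close the descriptor.
#
#   fd = os.open(path, os.O_WRONLY | os.O_CREAT)
#   w = FDCompressedSha1Writer(fd)
#   w.write(b'some data')
#   w.close()
#   sha_hex = w.sha(as_hex=True)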
class FDStream:

    """A simple wrapper providing the most basic functions on a file descriptor
    with the fileobject interface. Cannot use os.fdopen as the resulting stream
    takes ownership"""
    __slots__ = ("_fd", '_pos')

    def __init__(self, fd):
        self._fd = fd
        self._pos = 0

    def write(self, data):
        self._pos += len(data)
        os.write(self._fd, data)

    def read(self, count=0):
        if count == 0:
            # read everything that is left: query the total size from the
            # descriptor itself; os.read stops at the end of the file anyway
            count = os.fstat(self._fd).st_size
        # END handle read everything

        data = os.read(self._fd, count)
        self._pos += len(data)
        return data

    def fileno(self):
        return self._fd

    def tell(self):
        return self._pos

    def close(self):
        close(self._fd)
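# Usage sketch (illustrative only): wrap an already-opened descriptor without
# handing ownership to os.fdopen. `path` is a hypothetical input file.
#
#   fd = os.open(path, os.O_RDONLY)
#   stream = FDStream(fd)
#   head = stream.read(16)              # first 16 bytes
#   stream.close()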
class NullStream:

    """A stream that does nothing but provide a stream interface.
    Use it like /dev/null"""
    __slots__ = tuple()

    def read(self, size=0):
        return b''

    def close(self):
        pass

    def write(self, data):
        return len(data)
#} END W streams