diff -r 02fa567f8a3c -r 3e47d1ec9da5 mercurial/util.py --- a/mercurial/util.py Sat Mar 30 13:13:10 2019 -0700 +++ b/mercurial/util.py Wed Mar 27 16:45:14 2019 +0100 @@ -16,7 +16,6 @@ from __future__ import absolute_import, print_function import abc -import bz2 import collections import contextlib import errno @@ -34,7 +33,6 @@ import time import traceback import warnings -import zlib from .thirdparty import ( attr, @@ -50,6 +48,7 @@ urllibcompat, ) from .utils import ( + compression, procutil, stringutil, ) @@ -127,6 +126,11 @@ unlink = platform.unlink username = platform.username +# small compat layer +compengines = compression.compengines +SERVERROLE = compression.SERVERROLE +CLIENTROLE = compression.CLIENTROLE + try: recvfds = osutil.recvfds except AttributeError: @@ -3206,714 +3210,6 @@ yield path[:pos] pos = path.rfind('/', 0, pos) -# compression code - -SERVERROLE = 'server' -CLIENTROLE = 'client' - -compewireprotosupport = collections.namedtuple(r'compenginewireprotosupport', - (r'name', r'serverpriority', - r'clientpriority')) - -class compressormanager(object): - """Holds registrations of various compression engines. - - This class essentially abstracts the differences between compression - engines to allow new compression formats to be added easily, possibly from - extensions. - - Compressors are registered against the global instance by calling its - ``register()`` method. - """ - def __init__(self): - self._engines = {} - # Bundle spec human name to engine name. - self._bundlenames = {} - # Internal bundle identifier to engine name. - self._bundletypes = {} - # Revlog header to engine name. - self._revlogheaders = {} - # Wire proto identifier to engine name. - self._wiretypes = {} - - def __getitem__(self, key): - return self._engines[key] - - def __contains__(self, key): - return key in self._engines - - def __iter__(self): - return iter(self._engines.keys()) - - def register(self, engine): - """Register a compression engine with the manager. - - The argument must be a ``compressionengine`` instance. - """ - if not isinstance(engine, compressionengine): - raise ValueError(_('argument must be a compressionengine')) - - name = engine.name() - - if name in self._engines: - raise error.Abort(_('compression engine %s already registered') % - name) - - bundleinfo = engine.bundletype() - if bundleinfo: - bundlename, bundletype = bundleinfo - - if bundlename in self._bundlenames: - raise error.Abort(_('bundle name %s already registered') % - bundlename) - if bundletype in self._bundletypes: - raise error.Abort(_('bundle type %s already registered by %s') % - (bundletype, self._bundletypes[bundletype])) - - # No external facing name declared. - if bundlename: - self._bundlenames[bundlename] = name - - self._bundletypes[bundletype] = name - - wiresupport = engine.wireprotosupport() - if wiresupport: - wiretype = wiresupport.name - if wiretype in self._wiretypes: - raise error.Abort(_('wire protocol compression %s already ' - 'registered by %s') % - (wiretype, self._wiretypes[wiretype])) - - self._wiretypes[wiretype] = name - - revlogheader = engine.revlogheader() - if revlogheader and revlogheader in self._revlogheaders: - raise error.Abort(_('revlog header %s already registered by %s') % - (revlogheader, self._revlogheaders[revlogheader])) - - if revlogheader: - self._revlogheaders[revlogheader] = name - - self._engines[name] = engine - - @property - def supportedbundlenames(self): - return set(self._bundlenames.keys()) - - @property - def supportedbundletypes(self): - return set(self._bundletypes.keys()) - - def forbundlename(self, bundlename): - """Obtain a compression engine registered to a bundle name. - - Will raise KeyError if the bundle type isn't registered. - - Will abort if the engine is known but not available. - """ - engine = self._engines[self._bundlenames[bundlename]] - if not engine.available(): - raise error.Abort(_('compression engine %s could not be loaded') % - engine.name()) - return engine - - def forbundletype(self, bundletype): - """Obtain a compression engine registered to a bundle type. - - Will raise KeyError if the bundle type isn't registered. - - Will abort if the engine is known but not available. - """ - engine = self._engines[self._bundletypes[bundletype]] - if not engine.available(): - raise error.Abort(_('compression engine %s could not be loaded') % - engine.name()) - return engine - - def supportedwireengines(self, role, onlyavailable=True): - """Obtain compression engines that support the wire protocol. - - Returns a list of engines in prioritized order, most desired first. - - If ``onlyavailable`` is set, filter out engines that can't be - loaded. - """ - assert role in (SERVERROLE, CLIENTROLE) - - attr = 'serverpriority' if role == SERVERROLE else 'clientpriority' - - engines = [self._engines[e] for e in self._wiretypes.values()] - if onlyavailable: - engines = [e for e in engines if e.available()] - - def getkey(e): - # Sort first by priority, highest first. In case of tie, sort - # alphabetically. This is arbitrary, but ensures output is - # stable. - w = e.wireprotosupport() - return -1 * getattr(w, attr), w.name - - return list(sorted(engines, key=getkey)) - - def forwiretype(self, wiretype): - engine = self._engines[self._wiretypes[wiretype]] - if not engine.available(): - raise error.Abort(_('compression engine %s could not be loaded') % - engine.name()) - return engine - - def forrevlogheader(self, header): - """Obtain a compression engine registered to a revlog header. - - Will raise KeyError if the revlog header value isn't registered. - """ - return self._engines[self._revlogheaders[header]] - -compengines = compressormanager() - -class compressionengine(object): - """Base class for compression engines. - - Compression engines must implement the interface defined by this class. - """ - def name(self): - """Returns the name of the compression engine. - - This is the key the engine is registered under. - - This method must be implemented. - """ - raise NotImplementedError() - - def available(self): - """Whether the compression engine is available. - - The intent of this method is to allow optional compression engines - that may not be available in all installations (such as engines relying - on C extensions that may not be present). - """ - return True - - def bundletype(self): - """Describes bundle identifiers for this engine. - - If this compression engine isn't supported for bundles, returns None. - - If this engine can be used for bundles, returns a 2-tuple of strings of - the user-facing "bundle spec" compression name and an internal - identifier used to denote the compression format within bundles. To - exclude the name from external usage, set the first element to ``None``. - - If bundle compression is supported, the class must also implement - ``compressstream`` and `decompressorreader``. - - The docstring of this method is used in the help system to tell users - about this engine. - """ - return None - - def wireprotosupport(self): - """Declare support for this compression format on the wire protocol. - - If this compression engine isn't supported for compressing wire - protocol payloads, returns None. - - Otherwise, returns ``compenginewireprotosupport`` with the following - fields: - - * String format identifier - * Integer priority for the server - * Integer priority for the client - - The integer priorities are used to order the advertisement of format - support by server and client. The highest integer is advertised - first. Integers with non-positive values aren't advertised. - - The priority values are somewhat arbitrary and only used for default - ordering. The relative order can be changed via config options. - - If wire protocol compression is supported, the class must also implement - ``compressstream`` and ``decompressorreader``. - """ - return None - - def revlogheader(self): - """Header added to revlog chunks that identifies this engine. - - If this engine can be used to compress revlogs, this method should - return the bytes used to identify chunks compressed with this engine. - Else, the method should return ``None`` to indicate it does not - participate in revlog compression. - """ - return None - - def compressstream(self, it, opts=None): - """Compress an iterator of chunks. - - The method receives an iterator (ideally a generator) of chunks of - bytes to be compressed. It returns an iterator (ideally a generator) - of bytes of chunks representing the compressed output. - - Optionally accepts an argument defining how to perform compression. - Each engine treats this argument differently. - """ - raise NotImplementedError() - - def decompressorreader(self, fh): - """Perform decompression on a file object. - - Argument is an object with a ``read(size)`` method that returns - compressed data. Return value is an object with a ``read(size)`` that - returns uncompressed data. - """ - raise NotImplementedError() - - def revlogcompressor(self, opts=None): - """Obtain an object that can be used to compress revlog entries. - - The object has a ``compress(data)`` method that compresses binary - data. This method returns compressed binary data or ``None`` if - the data could not be compressed (too small, not compressible, etc). - The returned data should have a header uniquely identifying this - compression format so decompression can be routed to this engine. - This header should be identified by the ``revlogheader()`` return - value. - - The object has a ``decompress(data)`` method that decompresses - data. The method will only be called if ``data`` begins with - ``revlogheader()``. The method should return the raw, uncompressed - data or raise a ``StorageError``. - - The object is reusable but is not thread safe. - """ - raise NotImplementedError() - -class _CompressedStreamReader(object): - def __init__(self, fh): - if safehasattr(fh, 'unbufferedread'): - self._reader = fh.unbufferedread - else: - self._reader = fh.read - self._pending = [] - self._pos = 0 - self._eof = False - - def _decompress(self, chunk): - raise NotImplementedError() - - def read(self, l): - buf = [] - while True: - while self._pending: - if len(self._pending[0]) > l + self._pos: - newbuf = self._pending[0] - buf.append(newbuf[self._pos:self._pos + l]) - self._pos += l - return ''.join(buf) - - newbuf = self._pending.pop(0) - if self._pos: - buf.append(newbuf[self._pos:]) - l -= len(newbuf) - self._pos - else: - buf.append(newbuf) - l -= len(newbuf) - self._pos = 0 - - if self._eof: - return ''.join(buf) - chunk = self._reader(65536) - self._decompress(chunk) - if not chunk and not self._pending and not self._eof: - # No progress and no new data, bail out - return ''.join(buf) - -class _GzipCompressedStreamReader(_CompressedStreamReader): - def __init__(self, fh): - super(_GzipCompressedStreamReader, self).__init__(fh) - self._decompobj = zlib.decompressobj() - def _decompress(self, chunk): - newbuf = self._decompobj.decompress(chunk) - if newbuf: - self._pending.append(newbuf) - d = self._decompobj.copy() - try: - d.decompress('x') - d.flush() - if d.unused_data == 'x': - self._eof = True - except zlib.error: - pass - -class _BZ2CompressedStreamReader(_CompressedStreamReader): - def __init__(self, fh): - super(_BZ2CompressedStreamReader, self).__init__(fh) - self._decompobj = bz2.BZ2Decompressor() - def _decompress(self, chunk): - newbuf = self._decompobj.decompress(chunk) - if newbuf: - self._pending.append(newbuf) - try: - while True: - newbuf = self._decompobj.decompress('') - if newbuf: - self._pending.append(newbuf) - else: - break - except EOFError: - self._eof = True - -class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader): - def __init__(self, fh): - super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh) - newbuf = self._decompobj.decompress('BZ') - if newbuf: - self._pending.append(newbuf) - -class _ZstdCompressedStreamReader(_CompressedStreamReader): - def __init__(self, fh, zstd): - super(_ZstdCompressedStreamReader, self).__init__(fh) - self._zstd = zstd - self._decompobj = zstd.ZstdDecompressor().decompressobj() - def _decompress(self, chunk): - newbuf = self._decompobj.decompress(chunk) - if newbuf: - self._pending.append(newbuf) - try: - while True: - newbuf = self._decompobj.decompress('') - if newbuf: - self._pending.append(newbuf) - else: - break - except self._zstd.ZstdError: - self._eof = True - -class _zlibengine(compressionengine): - def name(self): - return 'zlib' - - def bundletype(self): - """zlib compression using the DEFLATE algorithm. - - All Mercurial clients should support this format. The compression - algorithm strikes a reasonable balance between compression ratio - and size. - """ - return 'gzip', 'GZ' - - def wireprotosupport(self): - return compewireprotosupport('zlib', 20, 20) - - def revlogheader(self): - return 'x' - - def compressstream(self, it, opts=None): - opts = opts or {} - - z = zlib.compressobj(opts.get('level', -1)) - for chunk in it: - data = z.compress(chunk) - # Not all calls to compress emit data. It is cheaper to inspect - # here than to feed empty chunks through generator. - if data: - yield data - - yield z.flush() - - def decompressorreader(self, fh): - return _GzipCompressedStreamReader(fh) - - class zlibrevlogcompressor(object): - def compress(self, data): - insize = len(data) - # Caller handles empty input case. - assert insize > 0 - - if insize < 44: - return None - - elif insize <= 1000000: - compressed = zlib.compress(data) - if len(compressed) < insize: - return compressed - return None - - # zlib makes an internal copy of the input buffer, doubling - # memory usage for large inputs. So do streaming compression - # on large inputs. - else: - z = zlib.compressobj() - parts = [] - pos = 0 - while pos < insize: - pos2 = pos + 2**20 - parts.append(z.compress(data[pos:pos2])) - pos = pos2 - parts.append(z.flush()) - - if sum(map(len, parts)) < insize: - return ''.join(parts) - return None - - def decompress(self, data): - try: - return zlib.decompress(data) - except zlib.error as e: - raise error.StorageError(_('revlog decompress error: %s') % - stringutil.forcebytestr(e)) - - def revlogcompressor(self, opts=None): - return self.zlibrevlogcompressor() - -compengines.register(_zlibengine()) - -class _bz2engine(compressionengine): - def name(self): - return 'bz2' - - def bundletype(self): - """An algorithm that produces smaller bundles than ``gzip``. - - All Mercurial clients should support this format. - - This engine will likely produce smaller bundles than ``gzip`` but - will be significantly slower, both during compression and - decompression. - - If available, the ``zstd`` engine can yield similar or better - compression at much higher speeds. - """ - return 'bzip2', 'BZ' - - # We declare a protocol name but don't advertise by default because - # it is slow. - def wireprotosupport(self): - return compewireprotosupport('bzip2', 0, 0) - - def compressstream(self, it, opts=None): - opts = opts or {} - z = bz2.BZ2Compressor(opts.get('level', 9)) - for chunk in it: - data = z.compress(chunk) - if data: - yield data - - yield z.flush() - - def decompressorreader(self, fh): - return _BZ2CompressedStreamReader(fh) - -compengines.register(_bz2engine()) - -class _truncatedbz2engine(compressionengine): - def name(self): - return 'bz2truncated' - - def bundletype(self): - return None, '_truncatedBZ' - - # We don't implement compressstream because it is hackily handled elsewhere. - - def decompressorreader(self, fh): - return _TruncatedBZ2CompressedStreamReader(fh) - -compengines.register(_truncatedbz2engine()) - -class _noopengine(compressionengine): - def name(self): - return 'none' - - def bundletype(self): - """No compression is performed. - - Use this compression engine to explicitly disable compression. - """ - return 'none', 'UN' - - # Clients always support uncompressed payloads. Servers don't because - # unless you are on a fast network, uncompressed payloads can easily - # saturate your network pipe. - def wireprotosupport(self): - return compewireprotosupport('none', 0, 10) - - # We don't implement revlogheader because it is handled specially - # in the revlog class. - - def compressstream(self, it, opts=None): - return it - - def decompressorreader(self, fh): - return fh - - class nooprevlogcompressor(object): - def compress(self, data): - return None - - def revlogcompressor(self, opts=None): - return self.nooprevlogcompressor() - -compengines.register(_noopengine()) - -class _zstdengine(compressionengine): - def name(self): - return 'zstd' - - @propertycache - def _module(self): - # Not all installs have the zstd module available. So defer importing - # until first access. - try: - from . import zstd - # Force delayed import. - zstd.__version__ - return zstd - except ImportError: - return None - - def available(self): - return bool(self._module) - - def bundletype(self): - """A modern compression algorithm that is fast and highly flexible. - - Only supported by Mercurial 4.1 and newer clients. - - With the default settings, zstd compression is both faster and yields - better compression than ``gzip``. It also frequently yields better - compression than ``bzip2`` while operating at much higher speeds. - - If this engine is available and backwards compatibility is not a - concern, it is likely the best available engine. - """ - return 'zstd', 'ZS' - - def wireprotosupport(self): - return compewireprotosupport('zstd', 50, 50) - - def revlogheader(self): - return '\x28' - - def compressstream(self, it, opts=None): - opts = opts or {} - # zstd level 3 is almost always significantly faster than zlib - # while providing no worse compression. It strikes a good balance - # between speed and compression. - level = opts.get('level', 3) - - zstd = self._module - z = zstd.ZstdCompressor(level=level).compressobj() - for chunk in it: - data = z.compress(chunk) - if data: - yield data - - yield z.flush() - - def decompressorreader(self, fh): - return _ZstdCompressedStreamReader(fh, self._module) - - class zstdrevlogcompressor(object): - def __init__(self, zstd, level=3): - # TODO consider omitting frame magic to save 4 bytes. - # This writes content sizes into the frame header. That is - # extra storage. But it allows a correct size memory allocation - # to hold the result. - self._cctx = zstd.ZstdCompressor(level=level) - self._dctx = zstd.ZstdDecompressor() - self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE - self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE - - def compress(self, data): - insize = len(data) - # Caller handles empty input case. - assert insize > 0 - - if insize < 50: - return None - - elif insize <= 1000000: - compressed = self._cctx.compress(data) - if len(compressed) < insize: - return compressed - return None - else: - z = self._cctx.compressobj() - chunks = [] - pos = 0 - while pos < insize: - pos2 = pos + self._compinsize - chunk = z.compress(data[pos:pos2]) - if chunk: - chunks.append(chunk) - pos = pos2 - chunks.append(z.flush()) - - if sum(map(len, chunks)) < insize: - return ''.join(chunks) - return None - - def decompress(self, data): - insize = len(data) - - try: - # This was measured to be faster than other streaming - # decompressors. - dobj = self._dctx.decompressobj() - chunks = [] - pos = 0 - while pos < insize: - pos2 = pos + self._decompinsize - chunk = dobj.decompress(data[pos:pos2]) - if chunk: - chunks.append(chunk) - pos = pos2 - # Frame should be exhausted, so no finish() API. - - return ''.join(chunks) - except Exception as e: - raise error.StorageError(_('revlog decompress error: %s') % - stringutil.forcebytestr(e)) - - def revlogcompressor(self, opts=None): - opts = opts or {} - return self.zstdrevlogcompressor(self._module, - level=opts.get('level', 3)) - -compengines.register(_zstdengine()) - -def bundlecompressiontopics(): - """Obtains a list of available bundle compressions for use in help.""" - # help.makeitemsdocs() expects a dict of names to items with a .__doc__. - items = {} - - # We need to format the docstring. So use a dummy object/type to hold it - # rather than mutating the original. - class docobject(object): - pass - - for name in compengines: - engine = compengines[name] - - if not engine.available(): - continue - - bt = engine.bundletype() - if not bt or not bt[0]: - continue - - doc = b'``%s``\n %s' % (bt[0], pycompat.getdoc(engine.bundletype)) - - value = docobject() - value.__doc__ = pycompat.sysstr(doc) - value._origdoc = engine.bundletype.__doc__ - value._origfunc = engine.bundletype - - items[bt[0]] = value - - return items - -i18nfunctions = bundlecompressiontopics().values() # convenient shortcut dst = debugstacktrace