hgext/remotefilelog/basepack.py
changeset 40495 3a333a582d7b
from __future__ import absolute_import

import collections
import errno
import hashlib
import mmap
import os
import struct
import time

from mercurial.i18n import _
from mercurial import (
    policy,
    pycompat,
    util,
    vfs as vfsmod,
)
from . import shallowutil

osutil = policy.importmod(r'osutil')

# The pack version supported by this implementation. This will need to be
# rev'd whenever the byte format changes. Ex: changing the fanout prefix,
# changing any of the int sizes, changing the delta algorithm, etc.
PACKVERSIONSIZE = 1
INDEXVERSIONSIZE = 2

FANOUTSTART = INDEXVERSIONSIZE

# Constant that indicates a fanout table entry hasn't been filled in. (This
# does not get serialized.)
EMPTYFANOUT = -1

# The fanout prefix is the number of leading bytes of a hash that are used to
# index into the fanout table. Example: a fanout prefix of 1 means we use the
# first byte of a hash to look in the fanout table (which will be 2^8 entries
# long).
SMALLFANOUTPREFIX = 1
LARGEFANOUTPREFIX = 2
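
# For illustration (the example node byte is arbitrary): with the small
# prefix, a node whose first byte is 0xab lands in fanout slot 171, since
# struct.unpack('!B', node[:1])[0] == 171. With the large prefix, the first
# two bytes select one of the 2^16 slots via struct.unpack('!H', node[:2])[0].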

# The number of entries in the index at which point we switch to a large
# fanout. It is chosen to balance the linear scan through a sparse fanout
# against the size of the bisect in the actual index.
# 2^16 / 8 was chosen because it trades off (1 step fanout scan + 5 step
# bisect) with (8 step fanout scan + 1 step bisect)
# 5 step bisect = log(2^16 / 8 / 255)  # fanout
# 8 step fanout scan = 2^16 / (2^16 / 8)  # fanout space divided by entries
SMALLFANOUTCUTOFF = 2**16 // 8

# The amount of time to wait between checking for new packs. This prevents an
# exception when data is moved to a new pack after the process has already
# loaded the pack list.
REFRESHRATE = 0.1

if pycompat.isposix:
    # With glibc 2.7+ the 'e' flag uses O_CLOEXEC when opening.
    # The 'e' flag will be ignored on older versions of glibc.
    PACKOPENMODE = 'rbe'
else:
    PACKOPENMODE = 'rb'

class _cachebackedpacks(object):
    def __init__(self, packs, cachesize):
        self._packs = set(packs)
        self._lrucache = util.lrucachedict(cachesize)
        self._lastpack = None

        # Avoid a cold start of the cache by populating the most recent packs
        # in the cache.
        for i in reversed(range(min(cachesize, len(packs)))):
            self._movetofront(packs[i])

    def _movetofront(self, pack):
        # This effectively makes pack the first entry in the cache.
        self._lrucache[pack] = True

    def _registerlastpackusage(self):
        if self._lastpack is not None:
            self._movetofront(self._lastpack)
            self._lastpack = None

    def add(self, pack):
        self._registerlastpackusage()

        # This method will mostly be called when packs are not in the cache.
        # Therefore, add the pack to the cache.
        self._movetofront(pack)
        self._packs.add(pack)

    def __iter__(self):
        self._registerlastpackusage()

        # Cache iteration is based on LRU.
        for pack in self._lrucache:
            self._lastpack = pack
            yield pack

        cachedpacks = set(self._lrucache)
        # Yield packs not in the cache.
        for pack in self._packs - cachedpacks:
            self._lastpack = pack
            yield pack

        # Data not found in any pack.
        self._lastpack = None
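
# Sketch of how _cachebackedpacks is meant to be used (p1, p2, and `keys` are
# hypothetical): iteration yields cached packs in LRU order first, then the
# remaining packs; the pack that satisfied the previous lookup is moved to
# the front of the cache on the next add() or iteration.
#
#   packs = _cachebackedpacks([p1, p2], cachesize=1)
#   for pack in packs:
#       if not pack.getmissing(keys):
#           break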

class basepackstore(object):
    # Default cache size limit for the pack files.
    DEFAULTCACHESIZE = 100

    def __init__(self, ui, path):
        self.ui = ui
        self.path = path

        # lastrefresh is 0 so we'll immediately check for new packs on the
        # first failure.
        self.lastrefresh = 0

        packs = []
        for filepath, __, __ in self._getavailablepackfilessorted():
            try:
                pack = self.getpack(filepath)
            except Exception as ex:
                # An exception may be thrown if the pack file is corrupted
                # somehow.  Log a warning but keep going in this case, just
                # skipping this pack file.
                #
                # If this is an ENOENT error then don't even bother logging.
                # Someone could have removed the file since we retrieved the
                # list of paths.
                if getattr(ex, 'errno', None) != errno.ENOENT:
                    ui.warn(_('unable to load pack %s: %s\n') % (filepath, ex))
                continue
            packs.append(pack)

        self.packs = _cachebackedpacks(packs, self.DEFAULTCACHESIZE)

    def _getavailablepackfiles(self):
        """For each pack file (an index/data file combo), yields:
          (full path without extension, mtime, size)

        mtime will be the mtime of the index/data file (whichever is newer)
        size is the combined size of index/data file
        """
        indexsuffixlen = len(self.INDEXSUFFIX)
        packsuffixlen = len(self.PACKSUFFIX)

        ids = set()
        sizes = collections.defaultdict(int)
        mtimes = collections.defaultdict(list)
        try:
            for filename, type, stat in osutil.listdir(self.path, stat=True):
                id = None
                if filename[-indexsuffixlen:] == self.INDEXSUFFIX:
                    id = filename[:-indexsuffixlen]
                elif filename[-packsuffixlen:] == self.PACKSUFFIX:
                    id = filename[:-packsuffixlen]

                # Since we expect to have two files corresponding to each ID
                # (the index file and the pack file), we can yield once we see
                # it twice.
                if id:
                    sizes[id] += stat.st_size # Sum both files' sizes together
                    mtimes[id].append(stat.st_mtime)
                    if id in ids:
                        yield (os.path.join(self.path, id), max(mtimes[id]),
                            sizes[id])
                    else:
                        ids.add(id)
        except OSError as ex:
            if ex.errno != errno.ENOENT:
                raise
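
    # For illustration: if self.path holds both <id> + INDEXSUFFIX and
    # <id> + PACKSUFFIX (e.g. foo.histidx and foo.histpack in the history
    # store), this yields a single (self.path/foo, newest mtime, combined
    # size) tuple once the second of the two files is seen.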

    def _getavailablepackfilessorted(self):
        """Like `_getavailablepackfiles`, but also sorts the files by mtime,
        yielding newest files first.

        This is desirable, since newer packfiles are more likely to contain
        the data we want.
        """
        files = []
        for path, mtime, size in self._getavailablepackfiles():
            files.append((mtime, size, path))
        files = sorted(files, reverse=True)
        for mtime, size, path in files:
            yield path, mtime, size

    def gettotalsizeandcount(self):
        """Returns the total disk size (in bytes) of all the pack files in
        this store, and the count of pack files.

        (This might be smaller than the total size of the ``self.path``
        directory, since this only considers fully-written pack files, and not
        temporary files or other detritus in the directory.)
        """
        totalsize = 0
        count = 0
        for __, __, size in self._getavailablepackfiles():
            totalsize += size
            count += 1
        return totalsize, count

    def getmetrics(self):
        """Returns metrics on the state of this store."""
        size, count = self.gettotalsizeandcount()
        return {
            'numpacks': count,
            'totalpacksize': size,
        }

    def getpack(self, path):
        raise NotImplementedError()

    def getmissing(self, keys):
        missing = keys
        for pack in self.packs:
            missing = pack.getmissing(missing)

            # Ensures better performance of the cache by keeping the most
            # recently accessed pack at the beginning in subsequent iterations.
            if not missing:
                return missing

        if missing:
            for pack in self.refresh():
                missing = pack.getmissing(missing)

        return missing
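
    # Example usage (a sketch; `store` is any concrete subclass and `keys`
    # are the (name, node) pairs used by the remotefilelog stores):
    #
    #   missing = store.getmissing(keys)
    #
    # Keys found in an existing pack are filtered out; if any remain, the
    # store refreshes once to pick up packs created in the meantime.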

    def markledger(self, ledger, options=None):
        for pack in self.packs:
            pack.markledger(ledger)

    def markforrefresh(self):
        """Tells the store that there may be new pack files, so the next time
        it has a lookup miss it should check for new files."""
        self.lastrefresh = 0

    def refresh(self):
        """Checks for any new packs on disk, adds them to the main pack list,
        and returns a list of just the new packs."""
        now = time.time()

        # If we experience a lot of misses (like in the case of getmissing() on
        # new objects), let's only actually check disk for new stuff every once
        # in a while. Generally this code path should only ever matter when a
        # repack is going on in the background, and it should be pretty rare
        # for that to happen twice in quick succession.
        newpacks = []
        if now > self.lastrefresh + REFRESHRATE:
            self.lastrefresh = now
            previous = set(p.path for p in self.packs)
            for filepath, __, __ in self._getavailablepackfilessorted():
                if filepath not in previous:
                    newpack = self.getpack(filepath)
                    newpacks.append(newpack)
                    self.packs.add(newpack)

        return newpacks

class versionmixin(object):
    # Mix-in for classes with multiple supported versions
    VERSION = None
    SUPPORTED_VERSIONS = [0]

    def _checkversion(self, version):
        if version in self.SUPPORTED_VERSIONS:
            if self.VERSION is None:
                # only affect this instance
                self.VERSION = version
            elif self.VERSION != version:
                raise RuntimeError('inconsistent version: %s' % version)
        else:
            raise RuntimeError('unsupported version: %s' % version)

class basepack(versionmixin):
    # The maximum amount we should read via mmap before remapping so the old
    # pages can be released (100MB)
    MAXPAGEDIN = 100 * 1024**2

    SUPPORTED_VERSIONS = [0]

    def __init__(self, path):
        self.path = path
        self.packpath = path + self.PACKSUFFIX
        self.indexpath = path + self.INDEXSUFFIX

        self.indexsize = os.stat(self.indexpath).st_size
        self.datasize = os.stat(self.packpath).st_size

        self._index = None
        self._data = None
        self.freememory() # initialize the mmap

        version = struct.unpack('!B', self._data[:PACKVERSIONSIZE])[0]
        self._checkversion(version)

        version, config = struct.unpack('!BB', self._index[:INDEXVERSIONSIZE])
        self._checkversion(version)

        if 0b10000000 & config:
            self.params = indexparams(LARGEFANOUTPREFIX, version)
        else:
            self.params = indexparams(SMALLFANOUTPREFIX, version)

    @util.propertycache
    def _fanouttable(self):
        params = self.params
        rawfanout = self._index[FANOUTSTART:FANOUTSTART + params.fanoutsize]
        fanouttable = []
        for i in pycompat.xrange(0, params.fanoutcount):
            loc = i * 4
            fanoutentry = struct.unpack('!I', rawfanout[loc:loc + 4])[0]
            fanouttable.append(fanoutentry)
        return fanouttable

    @util.propertycache
    def _indexend(self):
        if self.VERSION == 0:
            return self.indexsize
        else:
            nodecount = struct.unpack_from('!Q', self._index,
                                           self.params.indexstart - 8)[0]
            return self.params.indexstart + nodecount * self.INDEXENTRYLENGTH

    def freememory(self):
        """Unmap and remap the memory to free it up after known expensive
        operations. Return True if self._data and self._index were reloaded.
        """
        if self._index:
            if self._pagedin < self.MAXPAGEDIN:
                return False

            self._index.close()
            self._data.close()

        # TODO: use an opener/vfs to access these paths
        with open(self.indexpath, PACKOPENMODE) as indexfp:
            # memory-map the file; size 0 means the whole file
            self._index = mmap.mmap(indexfp.fileno(), 0,
                                    access=mmap.ACCESS_READ)
        with open(self.packpath, PACKOPENMODE) as datafp:
            self._data = mmap.mmap(datafp.fileno(), 0, access=mmap.ACCESS_READ)

        self._pagedin = 0
        return True
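
    # Sketch of the intended calling pattern (the consumer loop is
    # hypothetical):
    #
    #   for entry in pack.iterentries():
    #       process(entry)
    #       pack.freememory()  # only remaps once MAXPAGEDIN bytes are paged in
    #
    # Subclasses are expected to bump self._pagedin as they read from the
    # mmaps; until it reaches MAXPAGEDIN, freememory() returns False and does
    # nothing.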

    def getmissing(self, keys):
        raise NotImplementedError()

    def markledger(self, ledger, options=None):
        raise NotImplementedError()

    def cleanup(self, ledger):
        raise NotImplementedError()

    def __iter__(self):
        raise NotImplementedError()

    def iterentries(self):
        raise NotImplementedError()

class mutablebasepack(versionmixin):

    def __init__(self, ui, packdir, version=0):
        self._checkversion(version)

        opener = vfsmod.vfs(packdir)
        opener.createmode = 0o444
        self.opener = opener

        self.entries = {}

        shallowutil.mkstickygroupdir(ui, packdir)
        self.packfp, self.packpath = opener.mkstemp(
            suffix=self.PACKSUFFIX + '-tmp')
        self.idxfp, self.idxpath = opener.mkstemp(
            suffix=self.INDEXSUFFIX + '-tmp')
        self.packfp = os.fdopen(self.packfp, 'w+')
        self.idxfp = os.fdopen(self.idxfp, 'w+')
        self.sha = hashlib.sha1()
        self._closed = False

        # The opener provides no way of doing permission fixup on files created
        # via mkstemp, so we must fix it ourselves. We can probably fix this
        # upstream in vfs.mkstemp so we don't need to use the private method.
        opener._fixfilemode(opener.join(self.packpath))
        opener._fixfilemode(opener.join(self.idxpath))

        # Write header
        # TODO: make it extensible (ex: allow specifying compression algorithm,
        # a flexible key/value header, delta algorithm, fanout size, etc)
        versionbuf = struct.pack('!B', self.VERSION) # unsigned 1 byte int
        self.writeraw(versionbuf)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.close()
        else:
            self.abort()

    def abort(self):
        # Unclean exit
        self._cleantemppacks()

    def writeraw(self, data):
        self.packfp.write(data)
        self.sha.update(data)

    def close(self, ledger=None):
        if self._closed:
            return

        try:
            sha = self.sha.hexdigest()
            self.packfp.close()
            self.writeindex()

            if len(self.entries) == 0:
                # Empty pack
                self._cleantemppacks()
                self._closed = True
                return None

            self.opener.rename(self.packpath, sha + self.PACKSUFFIX)
            try:
                self.opener.rename(self.idxpath, sha + self.INDEXSUFFIX)
            except Exception as ex:
                try:
                    self.opener.unlink(sha + self.PACKSUFFIX)
                except Exception:
                    pass
                # Throw exception 'ex' explicitly since a normal 'raise' would
                # potentially throw an exception from the unlink cleanup.
                raise ex
        except Exception:
            # Clean up temp packs in all exception cases
            self._cleantemppacks()
            raise

        self._closed = True
        result = self.opener.join(sha)
        if ledger:
            ledger.addcreated(result)
        return result

    def _cleantemppacks(self):
        try:
            self.opener.unlink(self.packpath)
        except Exception:
            pass
        try:
            self.opener.unlink(self.idxpath)
        except Exception:
            pass

    def writeindex(self):
        rawindex = ''

        largefanout = len(self.entries) > SMALLFANOUTCUTOFF
        if largefanout:
            params = indexparams(LARGEFANOUTPREFIX, self.VERSION)
        else:
            params = indexparams(SMALLFANOUTPREFIX, self.VERSION)

        fanouttable = [EMPTYFANOUT] * params.fanoutcount

        # Precompute the location of each entry
        locations = {}
        count = 0
        for node in sorted(self.entries.iterkeys()):
            location = count * self.INDEXENTRYLENGTH
            locations[node] = location
            count += 1

            # Must use [0] on the unpack result since it's always a tuple.
            fanoutkey = struct.unpack(params.fanoutstruct,
                                      node[:params.fanoutprefix])[0]
            if fanouttable[fanoutkey] == EMPTYFANOUT:
                fanouttable[fanoutkey] = location

        rawfanouttable = ''
        last = 0
        for offset in fanouttable:
            offset = offset if offset != EMPTYFANOUT else last
            last = offset
            rawfanouttable += struct.pack('!I', offset)
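
        # Worked example (illustrative): with a fanoutcount of 4 and entries
        # only in slots 0 and 2, [0, EMPTYFANOUT, 8, EMPTYFANOUT] is written
        # out as [0, 0, 8, 8], so a lookup that lands in an empty slot starts
        # scanning from the nearest preceding real offset.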

        rawentrieslength = struct.pack('!Q', len(self.entries))

        # The index offset is its location in the file, i.e. after the 2-byte
        # header and the fanout table.
        rawindex = self.createindex(locations, 2 + len(rawfanouttable))

        self._writeheader(params)
        self.idxfp.write(rawfanouttable)
        if self.VERSION == 1:
            self.idxfp.write(rawentrieslength)
        self.idxfp.write(rawindex)
        self.idxfp.close()

    def createindex(self, nodelocations, indexoffset):
        raise NotImplementedError()

    def _writeheader(self, indexparams):
        # Index header
        #    <version: 1 byte>
        #    <large fanout: 1 bit> # 1 means 2^16, 0 means 2^8
        #    <unused: 7 bit> # future use (compression, delta format, etc)
        config = 0
        if indexparams.fanoutprefix == LARGEFANOUTPREFIX:
            config = 0b10000000
        self.idxfp.write(struct.pack('!BB', self.VERSION, config))
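
    # For example (illustrative): a version-1 index using the large fanout
    # writes the header bytes 0x01 0x80, since
    # struct.pack('!BB', 1, 0b10000000) == b'\x01\x80'.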

class indexparams(object):
    __slots__ = ('fanoutprefix', 'fanoutstruct', 'fanoutcount', 'fanoutsize',
                 'indexstart')

    def __init__(self, prefixsize, version):
        self.fanoutprefix = prefixsize

        # The struct pack format for fanout table location (i.e. the format
        # that converts the node prefix into an integer location in the fanout
        # table).
        if prefixsize == SMALLFANOUTPREFIX:
            self.fanoutstruct = '!B'
        elif prefixsize == LARGEFANOUTPREFIX:
            self.fanoutstruct = '!H'
        else:
            raise ValueError("invalid fanout prefix size: %s" % prefixsize)

        # The number of fanout table entries
        self.fanoutcount = 2**(prefixsize * 8)

        # The total bytes used by the fanout table
        self.fanoutsize = self.fanoutcount * 4

        self.indexstart = FANOUTSTART + self.fanoutsize
        if version == 1:
            # Skip the index length
            self.indexstart += 8
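
# Illustrative numbers (computed from the definitions above):
# indexparams(LARGEFANOUTPREFIX, 1) gives fanoutcount == 2**16,
# fanoutsize == 2**16 * 4 == 262144 bytes, and
# indexstart == 2 + 262144 + 8 == 262154, since a version-1 index stores an
# 8-byte entry count between the fanout table and the index entries.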