tests/test-lock.py
changeset 26289 c4b667a7a51d
child 26321 db4c192cb9b3
equal deleted inserted replaced
26288:2239626369f5 26289:c4b667a7a51d
       
     1 from __future__ import absolute_import
       
     2 
       
     3 import os
       
     4 import silenttestrunner
       
     5 import tempfile
       
     6 import unittest
       
     7 
       
     8 from mercurial import (
       
     9     lock,
       
    10     scmutil,
       
    11 )
       
    12 
       
# Name handed to lock.lock() by teststate.makelock() and looked up with
# vfs.lexists() in teststate.assertlockexists().
testlockname = 'testlock'
       
    14 
       
    15 class teststate(object):
       
    16     def __init__(self, testcase):
       
    17         self._testcase = testcase
       
    18         self._releasecalled = False
       
    19         self._postreleasecalled = False
       
    20         d = tempfile.mkdtemp(dir=os.getcwd())
       
    21         self.vfs = scmutil.vfs(d, audit=False)
       
    22 
       
    23     def makelock(self, *args, **kwargs):
       
    24         l = lock.lock(self.vfs, testlockname, releasefn=self.releasefn, *args,
       
    25                       **kwargs)
       
    26         l.postrelease.append(self.postreleasefn)
       
    27         return l
       
    28 
       
    29     def releasefn(self):
       
    30         self._releasecalled = True
       
    31 
       
    32     def postreleasefn(self):
       
    33         self._postreleasecalled = True
       
    34 
       
    35     def assertreleasecalled(self, called):
       
    36         self._testcase.assertEqual(
       
    37             self._releasecalled, called,
       
    38             'expected release to be %s but was actually %s' % (
       
    39                 self._tocalled(called),
       
    40                 self._tocalled(self._releasecalled),
       
    41             ))
       
    42 
       
    43     def assertpostreleasecalled(self, called):
       
    44         self._testcase.assertEqual(
       
    45             self._postreleasecalled, called,
       
    46             'expected postrelease to be %s but was actually %s' % (
       
    47                 self._tocalled(called),
       
    48                 self._tocalled(self._postreleasecalled),
       
    49             ))
       
    50 
       
    51     def assertlockexists(self, exists):
       
    52         actual = self.vfs.lexists(testlockname)
       
    53         self._testcase.assertEqual(
       
    54             actual, exists,
       
    55             'expected lock to %s but actually did %s' % (
       
    56                 self._toexists(exists),
       
    57                 self._toexists(actual),
       
    58             ))
       
    59 
       
    60     def _tocalled(self, called):
       
    61         if called:
       
    62             return 'called'
       
    63         else:
       
    64             return 'not called'
       
    65 
       
    66     def _toexists(self, exists):
       
    67         if exists:
       
    68             return 'exists'
       
    69         else:
       
    70             return 'not exists'
       
    71 
       
    72 class testlock(unittest.TestCase):
       
    73     def testlock(self):
       
    74         state = teststate(self)
       
    75         lock = state.makelock()
       
    76         lock.release()
       
    77         state.assertreleasecalled(True)
       
    78         state.assertpostreleasecalled(True)
       
    79         state.assertlockexists(False)
       
    80 
       
    81     def testrecursivelock(self):
       
    82         state = teststate(self)
       
    83         lock = state.makelock()
       
    84         lock.lock()
       
    85         lock.release() # brings lock refcount down from 2 to 1
       
    86         state.assertreleasecalled(False)
       
    87         state.assertpostreleasecalled(False)
       
    88         state.assertlockexists(True)
       
    89 
       
    90         lock.release() # releases the lock
       
    91         state.assertreleasecalled(True)
       
    92         state.assertpostreleasecalled(True)
       
    93         state.assertlockexists(False)
       
    94 
       
    95     def testlockfork(self):
       
    96         state = teststate(self)
       
    97         lock = state.makelock()
       
    98         lock.lock()
       
    99         # fake a fork
       
   100         lock.pid += 1
       
   101         lock.release()
       
   102         state.assertreleasecalled(False)
       
   103         state.assertpostreleasecalled(False)
       
   104         state.assertlockexists(True)
       
   105 
       
   106         # release the actual lock
       
   107         lock.pid -= 1
       
   108         lock.release()
       
   109         state.assertreleasecalled(True)
       
   110         state.assertpostreleasecalled(True)
       
   111         state.assertlockexists(False)
       
   112 
       
# When executed as a script, hand this module's name to silenttestrunner.main().
if __name__ == '__main__':
    silenttestrunner.main(__name__)