|
1 from __future__ import absolute_import |
|
2 |
|
3 import os |
|
4 import silenttestrunner |
|
5 import tempfile |
|
6 import unittest |
|
7 |
|
8 from mercurial import ( |
|
9 lock, |
|
10 scmutil, |
|
11 ) |
|
12 |
|
# file name, relative to each teststate's vfs, used for every lock in this module
testlockname = 'testlock'
|
14 |
|
class teststate(object):
    """Fixture that builds locks and records which callbacks have fired.

    Every instance creates a new temporary directory under the current
    working directory and exposes it through ``self.vfs``.  Locks made
    with makelock() report back to this object through ``releasefn`` and
    ``postreleasefn``, and the ``assert*`` helpers compare the recorded
    state against what a test expects.
    """

    def __init__(self, testcase):
        """Remember ``testcase`` (used for assertEqual) and set up the vfs."""
        self._testcase = testcase
        self._releasecalled = False
        self._postreleasecalled = False
        # NOTE: nothing here removes this directory again
        d = tempfile.mkdtemp(dir=os.getcwd())
        self.vfs = scmutil.vfs(d, audit=False)

    def makelock(self, *args, **kwargs):
        """Return a new lock.lock on ``testlockname`` wired to this state.

        Extra positional and keyword arguments are forwarded to
        ``lock.lock`` after the vfs and the lock name.
        """
        # *args is spelled before releasefn= so the call reads in the
        # order the arguments are actually bound; semantics are unchanged.
        lk = lock.lock(self.vfs, testlockname, *args,
                       releasefn=self.releasefn, **kwargs)
        lk.postrelease.append(self.postreleasefn)
        return lk

    def releasefn(self):
        """Callback handed to the lock as ``releasefn``; records the call."""
        self._releasecalled = True

    def postreleasefn(self):
        """Callback appended to ``postrelease``; records the call."""
        self._postreleasecalled = True

    def assertreleasecalled(self, called):
        """Assert that releasefn has (or has not) been invoked."""
        self._testcase.assertEqual(
            self._releasecalled, called,
            'expected release to be %s but was actually %s' % (
                self._tocalled(called),
                self._tocalled(self._releasecalled),
            ))

    def assertpostreleasecalled(self, called):
        """Assert that postreleasefn has (or has not) been invoked."""
        self._testcase.assertEqual(
            self._postreleasecalled, called,
            'expected postrelease to be %s but was actually %s' % (
                self._tocalled(called),
                self._tocalled(self._postreleasecalled),
            ))

    def assertlockexists(self, exists):
        """Assert that the lock file is (or is not) present in the vfs."""
        actual = self.vfs.lexists(testlockname)
        self._testcase.assertEqual(
            actual, exists,
            'expected lock to %s but actually did %s' % (
                self._toexists(exists),
                self._toexists(actual),
            ))

    def _tocalled(self, called):
        """Render a boolean as 'called' / 'not called' for messages."""
        return 'called' if called else 'not called'

    def _toexists(self, exists):
        """Render a boolean as 'exist' / 'not exist' for messages.

        The bare verb form is required by the sentence it is inserted
        into: "expected lock to <x> but actually did <y>".
        """
        return 'exist' if exists else 'not exist'
|
71 |
|
class testlock(unittest.TestCase):
    """Checks when a lock's callbacks fire and when its file disappears."""

    # The local is called ``lk`` rather than ``lock`` so that it does not
    # shadow the imported ``lock`` module inside the test methods.

    def testlock(self):
        """Releasing a freshly made lock runs both callbacks and removes it."""
        state = teststate(self)
        lk = state.makelock()
        lk.release()
        state.assertreleasecalled(True)
        state.assertpostreleasecalled(True)
        state.assertlockexists(False)

    def testrecursivelock(self):
        """Only the last of two nested releases actually lets go of the lock."""
        state = teststate(self)
        lk = state.makelock()
        lk.lock()
        lk.release() # brings lock refcount down from 2 to 1
        state.assertreleasecalled(False)
        state.assertpostreleasecalled(False)
        state.assertlockexists(True)

        lk.release() # releases the lock
        state.assertreleasecalled(True)
        state.assertpostreleasecalled(True)
        state.assertlockexists(False)

    def testlockfork(self):
        """A release issued under a different pid must leave the lock alone."""
        state = teststate(self)
        lk = state.makelock()
        lk.lock()
        # fake a fork
        lk.pid += 1
        lk.release()
        state.assertreleasecalled(False)
        state.assertpostreleasecalled(False)
        state.assertlockexists(True)

        # release the actual lock
        lk.pid -= 1
        lk.release()
        state.assertreleasecalled(True)
        state.assertpostreleasecalled(True)
        state.assertlockexists(False)
|
112 |
|
if __name__ == '__main__':
    # when executed as a script, pass this module's name to silenttestrunner
    silenttestrunner.main(__name__)