1 from __future__ import absolute_import |
|
2 |
|
3 import silenttestrunner |
|
4 import unittest |
|
5 |
|
6 from mercurial import util |
|
7 |
|
class contextmanager(object):
    """Context manager that reports its enter/exit events to a callback.

    ``trace`` is called with an ``('enter', name)`` tuple from
    ``__enter__`` and with an ``('exit', name)`` tuple from ``__exit__``.
    """

    def __init__(self, name, trace):
        self.trace = trace
        self.name = name
        # Neither event has happened yet.
        self.entered = self.exited = False

    def __enter__(self):
        self.entered = True
        event = ('enter', self.name)
        self.trace(event)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Keep the exception triple we were handed so it can be inspected.
        self.exited = (exc_type, exc_val, exc_tb)
        event = ('exit', self.name)
        self.trace(event)

    def __repr__(self):
        template = '<ctx %r>'
        return template % self.name
|
26 |
|
class ctxerror(Exception):
    """Exception raised on purpose by the failing context managers."""
|
29 |
|
class raise_on_enter(contextmanager):
    """Context manager whose ``__enter__`` traces a 'raise' event and fails."""

    def __enter__(self):
        event = ('raise', self.name)
        self.trace(event)
        raise ctxerror(self.name)
|
34 |
|
class raise_on_exit(contextmanager):
    """Context manager whose ``__exit__`` traces a 'raise' event and fails."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        event = ('raise', self.name)
        self.trace(event)
        raise ctxerror(self.name)
|
39 |
|
def ctxmgr(name, trace):
    """Return a zero-argument factory building ``contextmanager(name, trace)``.

    A new ``contextmanager`` is created each time the factory is called.
    """
    def factory():
        return contextmanager(name, trace)
    return factory
|
42 |
|
43 class test_ctxmanager(unittest.TestCase): |
|
44 def test_basics(self): |
|
45 trace = [] |
|
46 addtrace = trace.append |
|
47 with util.ctxmanager(ctxmgr('a', addtrace), ctxmgr('b', addtrace)) as c: |
|
48 a, b = c.enter() |
|
49 c.atexit(addtrace, ('atexit', 'x')) |
|
50 c.atexit(addtrace, ('atexit', 'y')) |
|
51 self.assertEqual(trace, [('enter', 'a'), ('enter', 'b'), |
|
52 ('atexit', 'y'), ('atexit', 'x'), |
|
53 ('exit', 'b'), ('exit', 'a')]) |
|
54 |
|
55 def test_raise_on_enter(self): |
|
56 trace = [] |
|
57 addtrace = trace.append |
|
58 def go(): |
|
59 with util.ctxmanager(ctxmgr('a', addtrace), |
|
60 lambda: raise_on_enter('b', addtrace)) as c: |
|
61 c.enter() |
|
62 addtrace('unreachable') |
|
63 self.assertRaises(ctxerror, go) |
|
64 self.assertEqual(trace, [('enter', 'a'), ('raise', 'b'), ('exit', 'a')]) |
|
65 |
|
66 def test_raise_on_exit(self): |
|
67 trace = [] |
|
68 addtrace = trace.append |
|
69 def go(): |
|
70 with util.ctxmanager(ctxmgr('a', addtrace), |
|
71 lambda: raise_on_exit('b', addtrace)) as c: |
|
72 c.enter() |
|
73 addtrace('running') |
|
74 self.assertRaises(ctxerror, go) |
|
75 self.assertEqual(trace, [('enter', 'a'), ('enter', 'b'), 'running', |
|
76 ('raise', 'b'), ('exit', 'a')]) |
|
77 |
|
# Run the tests through silenttestrunner when executed as a script.
if __name__ == '__main__':
    silenttestrunner.main(__name__)
|