# Python 3 has no xrange(); alias it to range so the xrange() call in this
# module works on either major version.
if pycompat.ispy3:
    xrange = range
17 |
17 |
18 class testatomictempfile(unittest.TestCase): |
18 class testatomictempfile(unittest.TestCase): |
def setUp(self):
    """Create a scratch directory and pick the target path used by the tests.

    Both values are bytes: a bytes suffix makes tempfile.mkdtemp() return a
    bytes path, and the file name joined onto it is a bytes literal too.
    The target file itself is not created here.
    """
    self._testdir = tempfile.mkdtemp(b'atomictempfiletest')
    self._filename = os.path.join(self._testdir, b'testfilename')
22 |
22 |
def tearDown(self):
    """Remove the scratch directory created by setUp().

    Errors are ignored so that cleanup never masks the outcome of the test
    that just ran (for example when the directory is already gone).
    """
    shutil.rmtree(self._testdir, ignore_errors=True)
25 |
25 |
def testsimple(self):
    """Write through an atomictempfile and close() it.

    Checks that the target does not exist while the file is open, that a
    '.testfilename-*' temp file exists next to it in the meantime, and that
    after close() the target exists and that temp file is gone.
    """
    pattern = os.path.join(self._testdir, b'.testfilename-*')

    file = atomictempfile(self._filename)
    self.assertFalse(os.path.isfile(self._filename))
    tempfilename = file._tempname
    self.assertIn(tempfilename, glob.glob(pattern))

    file.write(b'argh\n')
    file.close()

    self.assertTrue(os.path.isfile(self._filename))
    self.assertNotIn(tempfilename, glob.glob(pattern))
39 |
39 |
# discard() removes the temp file without making the write permanent
def testdiscard(self):
    """Write through an atomictempfile, then discard() it.

    Checks that the target was never created and that the temp file no
    longer exists in the directory it was created in.
    """
    file = atomictempfile(self._filename)
    (tempdir, basename) = os.path.split(file._tempname)

    file.write(b'yo\n')
    file.discard()

    self.assertFalse(os.path.isfile(self._filename))
    # Look in the temp file's own directory: listing the current working
    # directory would pass trivially, because the temp file is never
    # created there.
    self.assertNotIn(basename, os.listdir(tempdir))
50 |
50 |
51 # if a programmer screws up and passes bad args to atomictempfile, they |
51 # if a programmer screws up and passes bad args to atomictempfile, they |
52 # get a plain ordinary TypeError, not infinite recursion |
52 # get a plain ordinary TypeError, not infinite recursion |
53 def testoops(self): |
53 def testoops(self): |
54 with self.assertRaises(TypeError): |
54 with self.assertRaises(TypeError): |
56 |
56 |
57 # checkambig=True avoids ambiguity of timestamp |
57 # checkambig=True avoids ambiguity of timestamp |
58 def testcheckambig(self): |
58 def testcheckambig(self): |
59 def atomicwrite(checkambig): |
59 def atomicwrite(checkambig): |
60 f = atomictempfile(self._filename, checkambig=checkambig) |
60 f = atomictempfile(self._filename, checkambig=checkambig) |
61 f.write('FOO') |
61 f.write(b'FOO') |
62 f.close() |
62 f.close() |
63 |
63 |
64 # try some times, because reproduction of ambiguity depends on |
64 # try some times, because reproduction of ambiguity depends on |
65 # "filesystem time" |
65 # "filesystem time" |
66 for i in xrange(5): |
66 for i in xrange(5): |
95 pass |
95 pass |
96 |
96 |
def testread(self):
    """Open an existing file through atomictempfile in read mode.

    Checks that read() returns the bytes previously written to the target.
    """
    with open(self._filename, 'wb') as f:
        f.write(b'foobar\n')
    file = atomictempfile(self._filename, mode=b'rb')
    # assertEqual, not assertTrue: assertTrue(x, y) treats y as the failure
    # message and only checks that x is truthy, so it never compares content.
    self.assertEqual(file.read(), b'foobar\n')
    file.discard()
103 |
103 |
def testcontextmanagersuccess(self):
    """When the context closes, the file is closed"""
    # Use the per-test path from setUp() instead of a bare name in the
    # current working directory: the latter is not removed by tearDown(),
    # so a leftover file would make later runs fail.
    with atomictempfile(self._filename) as f:
        self.assertFalse(os.path.isfile(self._filename))
        f.write(b'argh\n')
    self.assertTrue(os.path.isfile(self._filename))
110 |
110 |
def testcontextmanagerfailure(self):
    """On exception, the file is discarded"""
    # assertRaises also fails the test if the ValueError does not propagate
    # out of the atomictempfile context, which a bare try/except would not.
    # The per-test path from setUp() keeps the test independent of whatever
    # is in the current working directory.
    with self.assertRaises(ValueError):
        with atomictempfile(self._filename) as f:
            self.assertFalse(os.path.isfile(self._filename))
            f.write(b'argh\n')
            raise ValueError
    self.assertFalse(os.path.isfile(self._filename))
121 |
121 |
# Script entry point: silenttestrunner is imported only when this file is
# executed directly, and is handed this module's name to run its tests.
if __name__ == '__main__':
    import silenttestrunner
    silenttestrunner.main(__name__)