1 # tempfile.py unit tests.
2 import tempfile
3 import errno
4 import io
5 import os
6 import pathlib
7 import sys
8 import re
9 import warnings
10 import contextlib
11 import stat
12 import types
13 import weakref
14 from unittest import mock
15
16 import unittest
17 from test import support
18 from test.support import os_helper
19 from test.support import script_helper
20 from test.support import warnings_helper
21
22
# True when tempfile uses different open flags for text and binary files.
has_textmode = tempfile._text_openflags != tempfile._bin_openflags
# True when this platform's os module provides spawnl().
has_spawnl = hasattr(os, "spawnl")

# TEST_FILES may need to be tweaked for systems depending on the maximum
# number of files that can be opened at one time (see ulimit -n)
TEST_FILES = 48 if sys.platform.startswith("openbsd") else 100
32
33 # This is organized as one test for each chunk of code in tempfile.py,
34 # in order of their appearance in the file. Testing which requires
35 # threads is not done here.
36
37 class ESC[4;38;5;81mTestLowLevelInternals(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
38 def test_infer_return_type_singles(self):
39 self.assertIs(str, tempfile._infer_return_type(''))
40 self.assertIs(bytes, tempfile._infer_return_type(b''))
41 self.assertIs(str, tempfile._infer_return_type(None))
42
43 def test_infer_return_type_multiples(self):
44 self.assertIs(str, tempfile._infer_return_type('', ''))
45 self.assertIs(bytes, tempfile._infer_return_type(b'', b''))
46 with self.assertRaises(TypeError):
47 tempfile._infer_return_type('', b'')
48 with self.assertRaises(TypeError):
49 tempfile._infer_return_type(b'', '')
50
51 def test_infer_return_type_multiples_and_none(self):
52 self.assertIs(str, tempfile._infer_return_type(None, ''))
53 self.assertIs(str, tempfile._infer_return_type('', None))
54 self.assertIs(str, tempfile._infer_return_type(None, None))
55 self.assertIs(bytes, tempfile._infer_return_type(b'', None))
56 self.assertIs(bytes, tempfile._infer_return_type(None, b''))
57 with self.assertRaises(TypeError):
58 tempfile._infer_return_type('', None, b'')
59 with self.assertRaises(TypeError):
60 tempfile._infer_return_type(b'', None, '')
61
62 def test_infer_return_type_pathlib(self):
63 self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/')))
64
65 def test_infer_return_type_pathlike(self):
66 class ESC[4;38;5;81mPath:
67 def __init__(self, path):
68 self.path = path
69
70 def __fspath__(self):
71 return self.path
72
73 self.assertIs(str, tempfile._infer_return_type(Path('/')))
74 self.assertIs(bytes, tempfile._infer_return_type(Path(b'/')))
75 self.assertIs(str, tempfile._infer_return_type('', Path('')))
76 self.assertIs(bytes, tempfile._infer_return_type(b'', Path(b'')))
77 self.assertIs(bytes, tempfile._infer_return_type(None, Path(b'')))
78 self.assertIs(str, tempfile._infer_return_type(None, Path('')))
79
80 with self.assertRaises(TypeError):
81 tempfile._infer_return_type('', Path(b''))
82 with self.assertRaises(TypeError):
83 tempfile._infer_return_type(b'', Path(''))
84
85 # Common functionality.
86
class BaseTestCase(unittest.TestCase):
    # Common fixture and name-validation helper shared by the test classes.

    # The random part of a generated name must be exactly eight characters
    # taken from lowercase letters, digits, '_' and '-'.
    str_check = re.compile(r"^[a-z0-9_-]{8}$")
    b_check = re.compile(br"^[a-z0-9_-]{8}$")

    def setUp(self):
        # Enter warnings_helper.check_warnings() for the duration of the
        # test, then ignore RuntimeWarnings matching "mktemp" that are
        # attributed to this module.
        self.enterContext(warnings_helper.check_warnings())
        warnings.filterwarnings("ignore", category=RuntimeWarning,
                                message="mktemp", module=__name__)

    def nameCheck(self, name, dir, pre, suf):
        """Assert that *name* is in *dir* and reads as pre + random + suf.

        Also checks that the type of *name* (str or bytes) agrees with the
        types of whichever of *dir*, *pre* and *suf* are not None.
        """
        ndir, nbase = os.path.split(name)
        suffix_start = len(nbase) - len(suf)
        npre = nbase[:len(pre)]
        nsuf = nbase[suffix_start:]

        if dir is not None:
            # A str or path-like directory yields a str name; anything else
            # is expected to yield bytes.
            if type(dir) is str or isinstance(dir, os.PathLike):
                expected_type = str
            else:
                expected_type = bytes
            self.assertIs(type(name), expected_type, "unexpected return type")
        for part in (pre, suf):
            if part is not None:
                self.assertIs(type(name),
                              str if type(part) is str else bytes,
                              "unexpected return type")
        if dir is None and pre is None and suf is None:
            self.assertIs(type(name), str, "default return type must be str")

        # check for equality of the absolute paths!
        self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
                         "file %r not in directory %r" % (name, dir))
        self.assertEqual(npre, pre,
                         "file %r does not begin with %r" % (nbase, pre))
        self.assertEqual(nsuf, suf,
                         "file %r does not end with %r" % (nbase, suf))

        # Whatever remains between prefix and suffix is the random part.
        random_part = nbase[len(pre):suffix_start]
        if isinstance(random_part, str):
            check = self.str_check
        else:
            check = self.b_check
        self.assertTrue(check.match(random_part),
                        "random characters %r do not match %r"
                        % (random_part, check.pattern))
132
133
class TestExports(BaseTestCase):
    def test_exports(self):
        # There are no surprising symbols in the tempfile module
        expected = frozenset((
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempprefixb",
            "gettempdir",
            "gettempdirb",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
            "TemporaryDirectory",
        ))

        # Collect every public (non-underscore) module attribute that is
        # not on the expected list.
        unexp = [
            key for key in tempfile.__dict__
            if key[0] != '_' and key not in expected
        ]
        self.assertTrue(not unexp,
                        "unexpected keys: %s" % unexp)
162
163
class TestRandomNameSequence(BaseTestCase):
    """Test the internal iterator object _RandomNameSequence."""

    def setUp(self):
        self.r = tempfile._RandomNameSequence()
        super().setUp()

    def test_get_eight_char_str(self):
        # _RandomNameSequence returns an eight-character string
        s = next(self.r)
        self.nameCheck(s, '', '', '')

    def test_many(self):
        # _RandomNameSequence returns no duplicate strings (stochastic)

        seen = set()
        r = self.r
        for _ in range(TEST_FILES):
            s = next(r)
            self.nameCheck(s, '', '', '')
            self.assertNotIn(s, seen)
            seen.add(s)

    def test_supports_iter(self):
        # _RandomNameSequence supports the iterator protocol
        #
        # This method used to be called supports_iter; without the "test_"
        # prefix unittest never collected it, so the check never ran.

        count = 0
        for s in self.r:
            self.assertIsInstance(s, str)
            count += 1
            if count == 20:
                break
        self.assertEqual(count, 20)

    # Old name kept so that any direct caller keeps working; unittest does
    # not collect it a second time because it lacks the "test_" prefix.
    supports_iter = test_supports_iter

    @support.requires_fork()
    def test_process_awareness(self):
        # ensure that the random source differs between
        # child and parent.
        read_fd, write_fd = os.pipe()
        pid = None
        try:
            pid = os.fork()
            if not pid:
                # child process
                os.close(read_fd)
                os.write(write_fd, next(self.r).encode("ascii"))
                os.close(write_fd)
                # bypass the normal exit handlers- leave those to
                # the parent.
                os._exit(0)

            # parent process
            parent_value = next(self.r)
            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
        finally:
            if pid:
                support.wait_process(pid, exitcode=0)

            # Both pipe ends are still open in the parent at this point.
            os.close(read_fd)
            os.close(write_fd)
        self.assertNotEqual(child_value, parent_value)
224
225
226
class TestCandidateTempdirList(BaseTestCase):
    """Test the internal function _candidate_tempdir_list."""

    def test_nonempty_list(self):
        # _candidate_tempdir_list returns a nonempty list of strings

        candidates = tempfile._candidate_tempdir_list()

        self.assertNotEqual(len(candidates), 0)
        for entry in candidates:
            self.assertIsInstance(entry, str)

    def test_wanted_dirs(self):
        # _candidate_tempdir_list contains the expected directories

        env_names = ('TMPDIR', 'TEMP', 'TMP')

        # Make sure the interesting environment variables are all set.
        with os_helper.EnvironmentVarGuard() as env:
            for envname in env_names:
                if not os.getenv(envname):
                    env[envname] = os.path.abspath(envname)

            candidates = tempfile._candidate_tempdir_list()

            for envname in env_names:
                dirname = os.getenv(envname)
                if not dirname:
                    raise ValueError
                self.assertIn(dirname, candidates)

            # The current directory (or os.curdir when it cannot be
            # determined) must be among the candidates as well.
            try:
                cwd = os.getcwd()
            except (AttributeError, OSError):
                cwd = os.curdir

            self.assertIn(cwd, candidates)

            # Not practical to try to verify the presence of OS-specific
            # paths in this list.
265
266
267 # We test _get_default_tempdir some more by testing gettempdir.
268
class TestGetDefaultTempdir(BaseTestCase):
    """Test _get_default_tempdir()."""

    def test_no_files_left_behind(self):
        def raise_OSError(*args, **kwargs):
            raise OSError()

        # use a private empty directory
        with tempfile.TemporaryDirectory() as our_temp_directory:
            # force _get_default_tempdir() to consider our empty directory
            only_our_directory = lambda: [our_temp_directory]

            with support.swap_attr(tempfile, "_candidate_tempdir_list",
                                   only_our_directory):
                # verify our directory is empty after _get_default_tempdir()
                tempfile._get_default_tempdir()
                self.assertEqual(os.listdir(our_temp_directory), [])

                # test again with failing os.open(), then with failing
                # os.write(); neither may leave a file behind
                for failing_call in ("open", "write"):
                    with support.swap_attr(os, failing_call, raise_OSError):
                        with self.assertRaises(FileNotFoundError):
                            tempfile._get_default_tempdir()
                        self.assertEqual(os.listdir(our_temp_directory), [])
299
300
class TestGetCandidateNames(BaseTestCase):
    """Test the internal function _get_candidate_names."""

    def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        self.assertIsInstance(tempfile._get_candidate_names(),
                              tempfile._RandomNameSequence)

    def test_same_thing(self):
        # _get_candidate_names always returns the same object
        first = tempfile._get_candidate_names()
        second = tempfile._get_candidate_names()

        self.assertIs(first, second)
315
316
317 @contextlib.contextmanager
318 def _inside_empty_temp_dir():
319 dir = tempfile.mkdtemp()
320 try:
321 with support.swap_attr(tempfile, 'tempdir', dir):
322 yield
323 finally:
324 os_helper.rmtree(dir)
325
326
327 def _mock_candidate_names(*names):
328 return support.swap_attr(tempfile,
329 '_get_candidate_names',
330 lambda: iter(names))
331
332
class TestBadTempdir:
    # Mixin of checks for an unusable tempfile.tempdir.  The tests call
    # self.make_temp(), which the concrete test class must provide.

    @unittest.skipIf(
        support.is_emscripten, "Emscripten cannot remove write bits."
    )
    def test_read_only_directory(self):
        write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
        with _inside_empty_temp_dir():
            target = tempfile.tempdir
            original_mode = os.stat(target).st_mode
            os.chmod(target, original_mode & ~write_bits)
            try:
                if os.access(target, os.W_OK):
                    self.skipTest("can't set the directory read-only")
                with self.assertRaises(PermissionError):
                    self.make_temp()
                self.assertEqual(os.listdir(target), [])
            finally:
                # Put the original mode back before the directory is
                # cleaned up on leaving the context.
                os.chmod(target, original_mode)

    def test_nonexisting_directory(self):
        with _inside_empty_temp_dir():
            missing = os.path.join(tempfile.tempdir, 'nonexistent')
            with support.swap_attr(tempfile, 'tempdir', missing):
                with self.assertRaises(FileNotFoundError):
                    self.make_temp()

    def test_non_directory(self):
        with _inside_empty_temp_dir():
            plain_file = os.path.join(tempfile.tempdir, 'file')
            # Create an empty regular file to use as the bogus tempdir.
            with open(plain_file, 'wb'):
                pass
            with support.swap_attr(tempfile, 'tempdir', plain_file):
                with self.assertRaises((NotADirectoryError, FileNotFoundError)):
                    self.make_temp()
366
367
class TestMkstempInner(TestBadTempdir, BaseTestCase):
    """Test the internal function _mkstemp_inner."""

    class mkstemped:
        # Owns one file made by _mkstemp_inner(); __del__ closes the
        # descriptor and unlinks the file.
        _bflags = tempfile._bin_openflags
        _tflags = tempfile._text_openflags
        _close = os.close
        _unlink = os.unlink

        def __init__(self, dir, pre, suf, bin):
            flags = self._bflags if bin else self._tflags
            output_type = tempfile._infer_return_type(dir, pre, suf)
            created = tempfile._mkstemp_inner(dir, pre, suf, flags,
                                              output_type)
            (self.fd, self.name) = created

        def write(self, str):
            os.write(self.fd, str)

        def __del__(self):
            self._close(self.fd)
            self._unlink(self.name)

    def do_create(self, dir=None, pre=None, suf=None, bin=1):
        """Create a mkstemped file, check its name and return the wrapper.

        Arguments left as None are replaced by values of the type that
        tempfile._infer_return_type() selects for the given arguments.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            dir = (tempfile.gettempdir() if output_type is str
                   else tempfile.gettempdirb())
        pre = output_type() if pre is None else pre
        suf = output_type() if suf is None else suf
        file = self.mkstemped(dir, pre, suf, bin)

        self.nameCheck(file.name, dir, pre, suf)
        return file

    def test_basic(self):
        # _mkstemp_inner can create files
        variants = (
            {},
            {"pre": "a"},
            {"suf": "b"},
            {"pre": "a", "suf": "b"},
            {"pre": "aa", "suf": ".txt"},
        )
        for kwargs in variants:
            self.do_create(**kwargs).write(b"blat")

    def test_basic_with_bytes_names(self):
        # _mkstemp_inner can create files when given name parts all
        # specified as bytes.
        dir_b = tempfile.gettempdirb()
        accepted = (
            {"suf": b""},
            {"pre": b"a"},
            {"suf": b"b"},
            {"pre": b"a", "suf": b"b"},
            {"pre": b"aa", "suf": b".txt"},
        )
        for kwargs in accepted:
            self.do_create(dir=dir_b, **kwargs).write(b"blat")
        # Can't mix str & binary types in the args.
        rejected = (
            {"dir": "", "suf": b""},
            {"dir": dir_b, "pre": ""},
            {"dir": dir_b, "pre": b"", "suf": ""},
        )
        for kwargs in rejected:
            with self.assertRaises(TypeError):
                self.do_create(**kwargs).write(b"blat")

    def test_basic_many(self):
        # _mkstemp_inner can create many files (stochastic)
        # The list keeps every wrapper alive until the test returns.
        extant = [self.do_create(pre="aa") for _ in range(TEST_FILES)]

    def test_choose_directory(self):
        # _mkstemp_inner can create files in a user-selected directory
        parent = tempfile.mkdtemp()
        try:
            for where in (parent, pathlib.Path(parent)):
                self.do_create(dir=where).write(b"blat")
        finally:
            support.gc_collect()  # For PyPy or other GCs.
            os.rmdir(parent)

    @os_helper.skip_unless_working_chmod
    def test_file_mode(self):
        # _mkstemp_inner creates files with the proper mode

        file = self.do_create()
        mode = stat.S_IMODE(os.stat(file.name).st_mode)
        if sys.platform == 'win32':
            # There's no distinction among 'user', 'group' and 'world';
            # replicate the 'user' bits.
            expected = (0o600 >> 6) * (1 + 8 + 64)
        else:
            expected = 0o600
        self.assertEqual(mode, expected)

    @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
    @support.requires_subprocess()
    def test_noinherit(self):
        # _mkstemp_inner file handles are not inherited by child processes

        v = "v" if support.verbose else "q"

        file = self.do_create()
        self.assertEqual(os.get_inheritable(file.fd), False)
        fd = "%d" % file.fd

        try:
            me = __file__
        except NameError:
            me = sys.argv[0]

        # We have to exec something, so that FD_CLOEXEC will take
        # effect.  The core of this test is therefore in
        # tf_inherit_check.py, which see.
        here = os.path.dirname(os.path.abspath(me))
        tester = os.path.join(here, "tf_inherit_check.py")

        # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
        # but an arg with embedded spaces should be decorated with double
        # quotes on each end
        decorated = sys.executable
        if sys.platform == 'win32':
            decorated = '"%s"' % sys.executable
            tester = '"%s"' % tester

        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
        self.assertFalse(retval < 0,
                         "child process caught fatal signal %d" % -retval)
        self.assertFalse(retval > 0, "child process reports failure %d"%retval)

    @unittest.skipUnless(has_textmode, "text mode not available")
    def test_textmode(self):
        # _mkstemp_inner can create files in text mode

        # A text file is truncated at the first Ctrl+Z byte
        f = self.do_create(bin=0)
        for chunk in (b"blat\x1a", b"extra\n"):
            f.write(chunk)
        os.lseek(f.fd, 0, os.SEEK_SET)
        self.assertEqual(os.read(f.fd, 20), b"blat")

    def make_temp(self):
        """Call _mkstemp_inner() with the default directory and prefix."""
        return tempfile._mkstemp_inner(tempfile.gettempdir(),
                                       tempfile.gettempprefix(),
                                       '',
                                       tempfile._bin_openflags,
                                       str)

    def test_collision_with_existing_file(self):
        # _mkstemp_inner tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            first_fd, first_name = self.make_temp()
            os.close(first_fd)
            self.assertTrue(first_name.endswith('aaa'))

            second_fd, second_name = self.make_temp()
            os.close(second_fd)
            self.assertTrue(second_name.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # _mkstemp_inner tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            existing = tempfile.mkdtemp()
            self.assertTrue(existing.endswith('aaa'))

            fd, name = self.make_temp()
            os.close(fd)
            self.assertTrue(name.endswith('bbb'))
543
544
class TestGetTempPrefix(BaseTestCase):
    """Test gettempprefix()."""

    def test_sane_template(self):
        # gettempprefix returns a nonempty prefix string
        # (and gettempprefixb a nonempty bytes prefix)
        checks = (
            (tempfile.gettempprefix, str),
            (tempfile.gettempprefixb, bytes),
        )
        for getter, expected_type in checks:
            prefix = getter()

            self.assertIsInstance(prefix, expected_type)
            self.assertGreater(len(prefix), 0)

    def test_usable_template(self):
        # gettempprefix returns a usable prefix string

        # Create a temp directory, avoiding use of the prefix.
        # Then attempt to create a file whose name is
        # prefix + 'xxxxxx.xxx' in that directory.
        filename = tempfile.gettempprefix() + "xxxxxx.xxx"
        parent = tempfile.mkdtemp(prefix="")
        try:
            path = os.path.join(parent, filename)
            os.close(os.open(path, os.O_RDWR | os.O_CREAT))
            os.unlink(path)
        finally:
            os.rmdir(parent)
575
576
class TestGetTempDir(BaseTestCase):
    """Test gettempdir()."""

    def test_directory_exists(self):
        # gettempdir returns a directory which exists

        candidates = (tempfile.gettempdir(), tempfile.gettempdirb())
        for d in candidates:
            absolute_or_curdir = os.path.isabs(d) or d == os.curdir
            self.assertTrue(absolute_or_curdir,
                            "%r is not an absolute path" % d)
            self.assertTrue(os.path.isdir(d),
                            "%r is not a directory" % d)

    def test_directory_writable(self):
        # gettempdir returns a directory writable by the user

        # sneaky: just instantiate a NamedTemporaryFile, which
        # defaults to writing into the directory returned by
        # gettempdir.
        with tempfile.NamedTemporaryFile() as scratch:
            scratch.write(b"blat")

    def test_same_thing(self):
        # gettempdir always returns the same object
        first = tempfile.gettempdir()
        second = tempfile.gettempdir()
        as_bytes = tempfile.gettempdirb()

        self.assertIs(first, second)
        self.assertNotEqual(type(first), type(as_bytes))
        self.assertEqual(first, os.fsdecode(as_bytes))

    def test_case_sensitive(self):
        # gettempdir should not flatten its case
        # even on a case-insensitive file system
        case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
        # Clear tempfile.tempdir so gettempdir() has to work it out again.
        saved_tempdir = tempfile.tempdir
        tempfile.tempdir = None
        try:
            with os_helper.EnvironmentVarGuard() as env:
                # Fake the first env var which is checked as a candidate
                env["TMPDIR"] = case_sensitive_tempdir
                self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
        finally:
            tempfile.tempdir = saved_tempdir
            os_helper.rmdir(case_sensitive_tempdir)
621
622
class TestMkstemp(BaseTestCase):
    """Test mkstemp()."""

    def do_create(self, dir=None, pre=None, suf=None):
        """Create a file with mkstemp(), validate its name, then remove it.

        Arguments left as None are replaced by values of the type that
        tempfile._infer_return_type() selects for the given arguments.
        The descriptor is closed and the file unlinked even when one of
        the checks fails.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            if output_type is str:
                dir = tempfile.gettempdir()
            else:
                dir = tempfile.gettempdirb()
        if pre is None:
            pre = output_type()
        if suf is None:
            suf = output_type()
        (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
        try:
            # Every check runs inside the try block: the directory
            # assertion used to sit before it, so a failure there leaked
            # the open descriptor and left the file on disk.
            (ndir, nbase) = os.path.split(name)
            adir = os.path.abspath(dir)
            self.assertEqual(adir, ndir,
                "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
            self.nameCheck(name, dir, pre, suf)
        finally:
            os.close(fd)
            os.unlink(name)

    def test_basic(self):
        # mkstemp can create files
        self.do_create()
        self.do_create(pre="a")
        self.do_create(suf="b")
        self.do_create(pre="a", suf="b")
        self.do_create(pre="aa", suf=".txt")
        self.do_create(dir=".")

    def test_basic_with_bytes_names(self):
        # mkstemp can create files when given name parts all
        # specified as bytes.
        d = tempfile.gettempdirb()
        self.do_create(dir=d, suf=b"")
        self.do_create(dir=d, pre=b"a")
        self.do_create(dir=d, suf=b"b")
        self.do_create(dir=d, pre=b"a", suf=b"b")
        self.do_create(dir=d, pre=b"aa", suf=b".txt")
        self.do_create(dir=b".")
        # Mixing str and bytes among the arguments is rejected.
        with self.assertRaises(TypeError):
            self.do_create(dir=".", pre=b"aa", suf=b".txt")
        with self.assertRaises(TypeError):
            self.do_create(dir=b".", pre="aa", suf=b".txt")
        with self.assertRaises(TypeError):
            self.do_create(dir=b".", pre=b"aa", suf=".txt")

    def test_choose_directory(self):
        # mkstemp can create files in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            self.do_create(dir=dir)
            self.do_create(dir=pathlib.Path(dir))
        finally:
            os.rmdir(dir)

    def test_for_tempdir_is_bytes_issue40701_api_warts(self):
        def make_and_remove(**kwargs):
            # Create a file with mkstemp(), remove it straight away and
            # return the path that mkstemp() reported.
            fd, path = tempfile.mkstemp(**kwargs)
            os.close(fd)
            os.unlink(path)
            return path

        orig_tempdir = tempfile.tempdir
        self.assertIsInstance(tempfile.tempdir, (str, type(None)))
        try:
            self.assertIsInstance(make_and_remove(), str)
            # With a bytes tempfile.tempdir, only a call without any str
            # argument returns bytes.
            tempfile.tempdir = tempfile.gettempdirb()
            self.assertIsInstance(tempfile.tempdir, bytes)
            self.assertIsInstance(tempfile.gettempdir(), str)
            self.assertIsInstance(tempfile.gettempdirb(), bytes)
            self.assertIsInstance(make_and_remove(), bytes)
            self.assertIsInstance(make_and_remove(suffix='.txt'), str)
            self.assertIsInstance(make_and_remove(prefix='test-temp-'), str)
            self.assertIsInstance(
                make_and_remove(dir=tempfile.gettempdir()), str)
        finally:
            tempfile.tempdir = orig_tempdir
715
716
class TestMkdtemp(TestBadTempdir, BaseTestCase):
    """Test mkdtemp()."""

    def make_temp(self):
        return tempfile.mkdtemp()

    def do_create(self, dir=None, pre=None, suf=None):
        """Create a directory with mkdtemp(), check its name and return it.

        Arguments left as None are replaced by values of the type that
        tempfile._infer_return_type() selects for the given arguments.
        The directory is removed again if the name check fails.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            dir = (tempfile.gettempdir() if output_type is str
                   else tempfile.gettempdirb())
        pre = output_type() if pre is None else pre
        suf = output_type() if suf is None else suf
        name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)

        try:
            self.nameCheck(name, dir, pre, suf)
        except BaseException:
            os.rmdir(name)
            raise
        return name

    def test_basic(self):
        # mkdtemp can create directories
        variants = (
            {},
            {"pre": "a"},
            {"suf": "b"},
            {"pre": "a", "suf": "b"},
            {"pre": "aa", "suf": ".txt"},
        )
        for kwargs in variants:
            os.rmdir(self.do_create(**kwargs))

    def test_basic_with_bytes_names(self):
        # mkdtemp can create directories when given all binary parts
        d = tempfile.gettempdirb()
        accepted = (
            {},
            {"pre": b"a"},
            {"suf": b"b"},
            {"pre": b"a", "suf": b"b"},
            {"pre": b"aa", "suf": b".txt"},
        )
        for kwargs in accepted:
            os.rmdir(self.do_create(dir=d, **kwargs))
        # Mixing str and bytes among the arguments is rejected.
        rejected = (
            {"dir": d, "pre": "aa", "suf": b".txt"},
            {"dir": d, "pre": b"aa", "suf": ".txt"},
            {"dir": "", "pre": b"aa", "suf": b".txt"},
        )
        for kwargs in rejected:
            with self.assertRaises(TypeError):
                os.rmdir(self.do_create(**kwargs))

    def test_basic_many(self):
        # mkdtemp can create many directories (stochastic)
        created = []
        try:
            for _ in range(TEST_FILES):
                created.append(self.do_create(pre="aa"))
        finally:
            # Remove whatever was created, even after a failure part-way.
            for path in created:
                os.rmdir(path)

    def test_choose_directory(self):
        # mkdtemp can create directories in a user-selected directory
        parent = tempfile.mkdtemp()
        try:
            for where in (parent, pathlib.Path(parent)):
                os.rmdir(self.do_create(dir=where))
        finally:
            os.rmdir(parent)

    @os_helper.skip_unless_working_chmod
    def test_mode(self):
        # mkdtemp creates directories with the proper mode

        path = self.do_create()
        try:
            mode = stat.S_IMODE(os.stat(path).st_mode)
            mode &= 0o777 # Mask off sticky bits inherited from /tmp
            if sys.platform == 'win32':
                # There's no distinction among 'user', 'group' and 'world';
                # replicate the 'user' bits.
                expected = (0o700 >> 6) * (1 + 8 + 64)
            else:
                expected = 0o700
            self.assertEqual(mode, expected)
        finally:
            os.rmdir(path)

    def test_collision_with_existing_file(self):
        # mkdtemp tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            existing = tempfile.NamedTemporaryFile(delete=False)
            existing.close()
            self.assertTrue(existing.name.endswith('aaa'))
            created = tempfile.mkdtemp()
            self.assertTrue(created.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # mkdtemp tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            first = tempfile.mkdtemp()
            self.assertTrue(first.endswith('aaa'))
            second = tempfile.mkdtemp()
            self.assertTrue(second.endswith('bbb'))

    def test_for_tempdir_is_bytes_issue40701_api_warts(self):
        def make_and_remove(**kwargs):
            # Create a directory with mkdtemp(), remove it straight away
            # and return the path that mkdtemp() reported.
            path = tempfile.mkdtemp(**kwargs)
            os.rmdir(path)
            return path

        orig_tempdir = tempfile.tempdir
        self.assertIsInstance(tempfile.tempdir, (str, type(None)))
        try:
            self.assertIsInstance(make_and_remove(), str)
            tempfile.tempdir = tempfile.gettempdirb()
            self.assertIsInstance(tempfile.tempdir, bytes)
            self.assertIsInstance(tempfile.gettempdir(), str)
            self.assertIsInstance(tempfile.gettempdirb(), bytes)
            self.assertIsInstance(make_and_remove(), bytes)
            self.assertIsInstance(make_and_remove(suffix='-dir'), str)
            self.assertIsInstance(
                make_and_remove(prefix='test-mkdtemp-'), str)
            self.assertIsInstance(
                make_and_remove(dir=tempfile.gettempdir()), str)
        finally:
            tempfile.tempdir = orig_tempdir
850
851
class TestMktemp(BaseTestCase):
    """Test mktemp()."""

    # For safety, all use of mktemp must occur in a private directory.
    # We must also suppress the RuntimeWarning it generates.
    def setUp(self):
        self.dir = tempfile.mkdtemp()
        super().setUp()

    def tearDown(self):
        if self.dir:
            os.rmdir(self.dir)
            self.dir = None
        super().tearDown()

    class mktemped:
        # Owns a file created at a name chosen by mktemp(); __del__
        # unlinks it.
        _unlink = os.unlink
        _bflags = tempfile._bin_openflags

        def __init__(self, dir, pre, suf):
            self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
            # Create the file.  This will raise an exception if it's
            # mysteriously appeared in the meanwhile.
            fd = os.open(self.name, self._bflags, 0o600)
            os.close(fd)

        def __del__(self):
            self._unlink(self.name)

    def do_create(self, pre="", suf=""):
        """Create a mktemped file in self.dir, check its name, return it."""
        created = self.mktemped(self.dir, pre, suf)

        self.nameCheck(created.name, self.dir, pre, suf)
        return created

    def test_basic(self):
        # mktemp can choose usable file names
        variants = (
            {},
            {"pre": "a"},
            {"suf": "b"},
            {"pre": "a", "suf": "b"},
            {"pre": "aa", "suf": ".txt"},
        )
        for kwargs in variants:
            self.do_create(**kwargs)

    def test_many(self):
        # mktemp can choose many usable file names (stochastic)
        extant = [self.do_create(pre="aa") for _ in range(TEST_FILES)]
        del extant
        support.gc_collect()  # For PyPy or other GCs.
901
902 ## def test_warning(self):
903 ## # mktemp issues a warning when used
904 ## warnings.filterwarnings("error",
905 ## category=RuntimeWarning,
906 ## message="mktemp")
907 ## self.assertRaises(RuntimeWarning,
908 ## tempfile.mktemp, dir=self.dir)
909
910
911 # We test _TemporaryFileWrapper by testing NamedTemporaryFile.
912
913
914 class ESC[4;38;5;81mTestNamedTemporaryFile(ESC[4;38;5;149mBaseTestCase):
915 """Test NamedTemporaryFile()."""
916
917 def do_create(self, dir=None, pre="", suf="", delete=True):
918 if dir is None:
919 dir = tempfile.gettempdir()
920 file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf,
921 delete=delete)
922
923 self.nameCheck(file.name, dir, pre, suf)
924 return file
925
926
927 def test_basic(self):
928 # NamedTemporaryFile can create files
929 self.do_create()
930 self.do_create(pre="a")
931 self.do_create(suf="b")
932 self.do_create(pre="a", suf="b")
933 self.do_create(pre="aa", suf=".txt")
934
935 def test_method_lookup(self):
936 # Issue #18879: Looking up a temporary file method should keep it
937 # alive long enough.
938 f = self.do_create()
939 wr = weakref.ref(f)
940 write = f.write
941 write2 = f.write
942 del f
943 write(b'foo')
944 del write
945 write2(b'bar')
946 del write2
947 if support.check_impl_detail(cpython=True):
948 # No reference cycle was created.
949 self.assertIsNone(wr())
950
951 def test_iter(self):
952 # Issue #23700: getting iterator from a temporary file should keep
953 # it alive as long as it's being iterated over
954 lines = [b'spam\n', b'eggs\n', b'beans\n']
955 def make_file():
956 f = tempfile.NamedTemporaryFile(mode='w+b')
957 f.write(b''.join(lines))
958 f.seek(0)
959 return f
960 for i, l in enumerate(make_file()):
961 self.assertEqual(l, lines[i])
962 self.assertEqual(i, len(lines) - 1)
963
964 def test_creates_named(self):
965 # NamedTemporaryFile creates files with names
966 f = tempfile.NamedTemporaryFile()
967 self.assertTrue(os.path.exists(f.name),
968 "NamedTemporaryFile %s does not exist" % f.name)
969
970 def test_del_on_close(self):
971 # A NamedTemporaryFile is deleted when closed
972 dir = tempfile.mkdtemp()
973 try:
974 with tempfile.NamedTemporaryFile(dir=dir) as f:
975 f.write(b'blat')
976 self.assertEqual(os.listdir(dir), [])
977 self.assertFalse(os.path.exists(f.name),
978 "NamedTemporaryFile %s exists after close" % f.name)
979 finally:
980 os.rmdir(dir)
981
982 def test_dis_del_on_close(self):
983 # Tests that delete-on-close can be disabled
984 dir = tempfile.mkdtemp()
985 tmp = None
986 try:
987 f = tempfile.NamedTemporaryFile(dir=dir, delete=False)
988 tmp = f.name
989 f.write(b'blat')
990 f.close()
991 self.assertTrue(os.path.exists(f.name),
992 "NamedTemporaryFile %s missing after close" % f.name)
993 finally:
994 if tmp is not None:
995 os.unlink(tmp)
996 os.rmdir(dir)
997
998 def test_multiple_close(self):
999 # A NamedTemporaryFile can be closed many times without error
1000 f = tempfile.NamedTemporaryFile()
1001 f.write(b'abc\n')
1002 f.close()
1003 f.close()
1004 f.close()
1005
def test_context_manager(self):
    # The file exists inside the with block and is gone after it.
    with tempfile.NamedTemporaryFile() as tmp:
        self.assertTrue(os.path.exists(tmp.name))
    self.assertFalse(os.path.exists(tmp.name))
    # Entering the context of an already closed file is an error.
    with self.assertRaises(ValueError):
        with tmp:
            pass
1015
def test_bad_mode(self):
    # Invalid modes are rejected and leave no file behind.
    scratch = tempfile.mkdtemp()
    self.addCleanup(os_helper.rmtree, scratch)
    rejected = ((ValueError, 'wr'), (TypeError, 2))
    for expected_error, bad_mode in rejected:
        with self.assertRaises(expected_error):
            tempfile.NamedTemporaryFile(mode=bad_mode, dir=scratch)
    self.assertEqual(os.listdir(scratch), [])
1024
def test_bad_encoding(self):
    # An unknown encoding is rejected and leaves no file behind.
    scratch = tempfile.mkdtemp()
    self.addCleanup(os_helper.rmtree, scratch)
    self.assertRaises(LookupError, tempfile.NamedTemporaryFile,
                      'w', encoding='bad-encoding', dir=scratch)
    self.assertEqual(os.listdir(scratch), [])
1031
def test_unexpected_error(self):
    # If building the wrapper object raises an unexpected exception, the
    # file that was opened must be closed and nothing may be left on disk.
    scratch = tempfile.mkdtemp()
    self.addCleanup(os_helper.rmtree, scratch)
    fake_open = mock.mock_open()
    with mock.patch('tempfile._TemporaryFileWrapper',
                    side_effect=KeyboardInterrupt()):
        with mock.patch('io.open', fake_open):
            with self.assertRaises(KeyboardInterrupt):
                tempfile.NamedTemporaryFile(dir=scratch)
    # mock_open() hands back the same file handle mock on every call.
    fake_open().close.assert_called()
    self.assertEqual(os.listdir(scratch), [])
1042
1043 # How to test the mode and bufsize parameters?
1044
class TestSpooledTemporaryFile(BaseTestCase):
    """Test SpooledTemporaryFile()."""

    def do_create(self, max_size=0, dir=None, pre="", suf=""):
        # Create and return a SpooledTemporaryFile located in *dir* (the
        # default temporary directory when *dir* is None).
        if dir is None:
            dir = tempfile.gettempdir()
        file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir,
                                             prefix=pre, suffix=suf)
        # Close the file when the test ends instead of leaving it to the
        # garbage collector.  Closing more than once is harmless, as the
        # test_multiple_close_* tests check.
        self.addCleanup(file.close)
        return file

    def test_basic(self):
        # SpooledTemporaryFile can create files
        f = self.do_create()
        self.assertFalse(f._rolled)
        f = self.do_create(max_size=100, pre="a", suf=".txt")
        self.assertFalse(f._rolled)

    def test_is_iobase(self):
        # SpooledTemporaryFile should implement io.IOBase
        self.assertIsInstance(self.do_create(), io.IOBase)

    def test_iobase_interface(self):
        # SpooledTemporaryFile should implement the io.IOBase interface.
        # Ensure it has all the required methods and properties.
        iobase_attrs = {
            # From IOBase
            'fileno', 'seek', 'truncate', 'close', 'closed', '__enter__',
            '__exit__', 'flush', 'isatty', '__iter__', '__next__', 'readable',
            'readline', 'readlines', 'seekable', 'tell', 'writable',
            'writelines',
            # From BufferedIOBase (binary mode) and TextIOBase (text mode)
            'detach', 'read', 'read1', 'write', 'readinto', 'readinto1',
            'encoding', 'errors', 'newlines',
        }
        spooledtempfile_attrs = set(dir(tempfile.SpooledTemporaryFile))
        missing_attrs = iobase_attrs - spooledtempfile_attrs
        self.assertFalse(
            missing_attrs,
            'SpooledTemporaryFile missing attributes from IOBase/BufferedIOBase/TextIOBase'
        )

    def test_del_on_close(self):
        # A SpooledTemporaryFile is deleted when closed
        dir = tempfile.mkdtemp()
        try:
            # Leaving the with block closes the file, so a failing
            # assertion cannot leave an open file behind.
            with tempfile.SpooledTemporaryFile(max_size=10, dir=dir) as f:
                self.assertFalse(f._rolled)
                f.write(b'blat ' * 5)
                self.assertTrue(f._rolled)
                filename = f.name
            self.assertEqual(os.listdir(dir), [])
            # The name is only checked on disk when it is not an integer.
            if not isinstance(filename, int):
                self.assertFalse(os.path.exists(filename),
                    "SpooledTemporaryFile %s exists after close" % filename)
        finally:
            os.rmdir(dir)

    def test_del_unrolled_file(self):
        # The unrolled SpooledTemporaryFile should raise a ResourceWarning
        # when deleted since the file was not explicitly closed.
        f = self.do_create(max_size=10)
        f.write(b'foo')
        self.assertEqual(f.name, None)  # Unrolled so no filename/fd
        with self.assertWarns(ResourceWarning):
            f.__del__()

    @unittest.skipIf(
        support.is_emscripten, "Emscripten cannot fstat renamed files."
    )
    def test_del_rolled_file(self):
        # The rolled file should be deleted when the SpooledTemporaryFile
        # object is deleted. This should raise a ResourceWarning since the file
        # was not explicitly closed.
        f = self.do_create(max_size=2)
        f.write(b'foo')
        name = f.name  # This is a fd on posix+cygwin, a filename everywhere else
        self.assertTrue(os.path.exists(name))
        with self.assertWarns(ResourceWarning):
            f.__del__()
        self.assertFalse(
            os.path.exists(name),
            "Rolled SpooledTemporaryFile (name=%s) exists after delete" % name
        )

    def test_rewrite_small(self):
        # A SpooledTemporaryFile can be rewritten several times without
        # rolling over as long as its content stays within max_size
        f = self.do_create(max_size=30)
        self.assertFalse(f._rolled)
        for _ in range(5):
            f.seek(0, 0)
            f.write(b'x' * 20)
        self.assertFalse(f._rolled)

    def test_write_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        f = self.do_create(max_size=30)
        self.assertFalse(f._rolled)
        f.write(b'x' * 20)
        self.assertFalse(f._rolled)
        f.write(b'x' * 10)
        self.assertFalse(f._rolled)
        f.write(b'x')
        self.assertTrue(f._rolled)

    def test_writelines(self):
        # Verify writelines with a SpooledTemporaryFile
        f = self.do_create()
        f.writelines((b'x', b'y', b'z'))
        pos = f.seek(0)
        self.assertEqual(pos, 0)
        buf = f.read()
        self.assertEqual(buf, b'xyz')

    def test_writelines_sequential(self):
        # Data written with writelines() counts towards max_size: the file
        # holds exactly max_size bytes and rolls over on the next write
        f = self.do_create(max_size=35)
        f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
        self.assertFalse(f._rolled)
        f.write(b'x')
        self.assertTrue(f._rolled)

    def test_sparse(self):
        # A SpooledTemporaryFile that is written late in the file will extend
        # when that occurs
        f = self.do_create(max_size=30)
        self.assertFalse(f._rolled)
        pos = f.seek(100, 0)
        self.assertEqual(pos, 100)
        # Seeking past max_size alone does not roll over; writing does.
        self.assertFalse(f._rolled)
        f.write(b'x')
        self.assertTrue(f._rolled)

    def test_fileno(self):
        # A SpooledTemporaryFile should roll over to a real file on fileno()
        f = self.do_create(max_size=30)
        self.assertFalse(f._rolled)
        # assertGreater reports the offending descriptor on failure.
        self.assertGreater(f.fileno(), 0)
        self.assertTrue(f._rolled)

    def test_multiple_close_before_rollover(self):
        # A SpooledTemporaryFile can be closed many times without error
        f = tempfile.SpooledTemporaryFile()
        f.write(b'abc\n')
        self.assertFalse(f._rolled)
        f.close()
        f.close()
        f.close()

    def test_multiple_close_after_rollover(self):
        # A rolled-over SpooledTemporaryFile can be closed many times
        # without error
        f = tempfile.SpooledTemporaryFile(max_size=1)
        f.write(b'abc\n')
        self.assertTrue(f._rolled)
        f.close()
        f.close()
        f.close()

    def test_bound_methods(self):
        # It should be OK to steal a bound method from a SpooledTemporaryFile
        # and use it independently; when the file rolls over, those bound
        # methods should continue to function
        f = self.do_create(max_size=30)
        read = f.read
        write = f.write
        seek = f.seek

        write(b"a" * 35)
        write(b"b" * 35)
        seek(0, 0)
        self.assertEqual(read(70), b'a'*35 + b'b'*35)

    def test_properties(self):
        # Check mode, name and the text-only attributes of a binary file,
        # first before and then after rollover.
        f = tempfile.SpooledTemporaryFile(max_size=10)
        self.addCleanup(f.close)
        f.write(b'x' * 10)
        self.assertFalse(f._rolled)
        self.assertEqual(f.mode, 'w+b')
        self.assertIsNone(f.name)
        with self.assertRaises(AttributeError):
            f.newlines
        with self.assertRaises(AttributeError):
            f.encoding
        with self.assertRaises(AttributeError):
            f.errors

        # One more byte exceeds max_size and forces the rollover.
        f.write(b'x')
        self.assertTrue(f._rolled)
        self.assertEqual(f.mode, 'rb+')
        self.assertIsNotNone(f.name)
        with self.assertRaises(AttributeError):
            f.newlines
        with self.assertRaises(AttributeError):
            f.encoding
        with self.assertRaises(AttributeError):
            f.errors

    def test_text_mode(self):
        # Creating a SpooledTemporaryFile with a text mode should produce
        # a file object reading and writing (Unicode) text strings.
        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                          encoding="utf-8")
        self.addCleanup(f.close)
        f.write("abc\n")
        f.seek(0)
        self.assertEqual(f.read(), "abc\n")
        f.write("def\n")
        f.seek(0)
        self.assertEqual(f.read(), "abc\ndef\n")
        self.assertFalse(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNone(f.name)
        self.assertEqual(f.newlines, os.linesep)
        self.assertEqual(f.encoding, "utf-8")
        self.assertEqual(f.errors, "strict")

        f.write("xyzzy\n")
        f.seek(0)
        self.assertEqual(f.read(), "abc\ndef\nxyzzy\n")
        # Check that Ctrl+Z doesn't truncate the file
        f.write("foo\x1abar\n")
        f.seek(0)
        self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
        self.assertTrue(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNotNone(f.name)
        self.assertEqual(f.newlines, os.linesep)
        self.assertEqual(f.encoding, "utf-8")
        self.assertEqual(f.errors, "strict")

    def test_text_newline_and_encoding(self):
        # The newline, encoding and errors arguments must be honoured both
        # before and after rollover.
        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                          newline='', encoding='utf-8',
                                          errors='ignore')
        self.addCleanup(f.close)
        f.write("\u039B\r\n")
        f.seek(0)
        self.assertEqual(f.read(), "\u039B\r\n")
        self.assertFalse(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNone(f.name)
        self.assertIsNotNone(f.newlines)
        self.assertEqual(f.encoding, "utf-8")
        self.assertEqual(f.errors, "ignore")

        f.write("\u039C" * 10 + "\r\n")
        f.write("\u039D" * 20)
        f.seek(0)
        self.assertEqual(f.read(),
                "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20))
        self.assertTrue(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNotNone(f.name)
        self.assertIsNotNone(f.newlines)
        self.assertEqual(f.encoding, 'utf-8')
        self.assertEqual(f.errors, 'ignore')

    def test_context_manager_before_rollover(self):
        # A SpooledTemporaryFile that never rolls over can be used as a
        # context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as f:
            self.assertFalse(f._rolled)
            self.assertFalse(f.closed)
        self.assertTrue(f.closed)
        def use_closed():
            with f:
                pass
        self.assertRaises(ValueError, use_closed)

    def test_context_manager_during_rollover(self):
        # A SpooledTemporaryFile that rolls over inside the with block can
        # be used as a context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as f:
            self.assertFalse(f._rolled)
            f.write(b'abc\n')
            f.flush()
            self.assertTrue(f._rolled)
            self.assertFalse(f.closed)
        self.assertTrue(f.closed)
        def use_closed():
            with f:
                pass
        self.assertRaises(ValueError, use_closed)

    def test_context_manager_after_rollover(self):
        # A SpooledTemporaryFile that has already rolled over can be used
        # as a context manager
        f = tempfile.SpooledTemporaryFile(max_size=1)
        f.write(b'abc\n')
        f.flush()
        self.assertTrue(f._rolled)
        with f:
            self.assertFalse(f.closed)
        self.assertTrue(f.closed)
        def use_closed():
            with f:
                pass
        self.assertRaises(ValueError, use_closed)

    @unittest.skipIf(
        support.is_emscripten, "Emscripten cannot fstat renamed files."
    )
    def test_truncate_with_size_parameter(self):
        # A SpooledTemporaryFile can be truncated to zero size
        f = tempfile.SpooledTemporaryFile(max_size=10)
        self.addCleanup(f.close)
        f.write(b'abcdefg\n')
        f.seek(0)
        f.truncate()
        self.assertFalse(f._rolled)
        self.assertEqual(f._file.getvalue(), b'')
        # A SpooledTemporaryFile can be truncated to a specific size
        f = tempfile.SpooledTemporaryFile(max_size=10)
        self.addCleanup(f.close)
        f.write(b'abcdefg\n')
        f.truncate(4)
        self.assertFalse(f._rolled)
        self.assertEqual(f._file.getvalue(), b'abcd')
        # A SpooledTemporaryFile rolls over if truncated to large size
        f = tempfile.SpooledTemporaryFile(max_size=10)
        self.addCleanup(f.close)
        f.write(b'abcdefg\n')
        f.truncate(20)
        self.assertTrue(f._rolled)
        self.assertEqual(os.fstat(f.fileno()).st_size, 20)

    def test_class_getitem(self):
        # Subscripting the class must produce a generic alias.
        self.assertIsInstance(tempfile.SpooledTemporaryFile[bytes],
                      types.GenericAlias)
1368
# These tests only apply where TemporaryFile is a distinct implementation
# and not simply an alias of NamedTemporaryFile.
if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:

    class TestTemporaryFile(BaseTestCase):
        """Test TemporaryFile()."""

        def test_basic(self):
            # TemporaryFile can create files
            # No point in testing the name params - the file has no name.
            # The with block closes the file instead of leaking it.
            with tempfile.TemporaryFile():
                pass

        def test_has_no_name(self):
            # TemporaryFile creates files with no names (on this system)
            dir = tempfile.mkdtemp()
            # The with block closes the file on every path, including the
            # successful one.
            with tempfile.TemporaryFile(dir=dir) as f:
                f.write(b'blat')

                # Sneaky: because this file has no name, it should not prevent
                # us from removing the directory it was created in.
                try:
                    os.rmdir(dir)
                except BaseException:
                    # cleanup: close the file first, then retry the removal
                    # before re-raising the original error.
                    f.close()
                    os.rmdir(dir)
                    raise

        def test_multiple_close(self):
            # A TemporaryFile can be closed many times without error
            f = tempfile.TemporaryFile()
            f.write(b'abc\n')
            f.close()
            f.close()
            f.close()

        # How to test the bufsize parameter?  (mode is exercised below.)
        def test_mode_and_encoding(self):
            # Data written through a TemporaryFile opened with the given
            # arguments must be read back unchanged.

            def roundtrip(input, *args, **kwargs):
                with tempfile.TemporaryFile(*args, **kwargs) as fileobj:
                    fileobj.write(input)
                    fileobj.seek(0)
                    self.assertEqual(input, fileobj.read())

            roundtrip(b"1234", "w+b")
            roundtrip("abdc\n", "w+")
            roundtrip("\u039B", "w+", encoding="utf-16")
            roundtrip("foo\r\n", "w+", newline="")

        def test_bad_mode(self):
            # Invalid modes are rejected and leave no file behind.
            dir = tempfile.mkdtemp()
            self.addCleanup(os_helper.rmtree, dir)
            with self.assertRaises(ValueError):
                tempfile.TemporaryFile(mode='wr', dir=dir)
            with self.assertRaises(TypeError):
                tempfile.TemporaryFile(mode=2, dir=dir)
            self.assertEqual(os.listdir(dir), [])

        def test_bad_encoding(self):
            # An unknown encoding is rejected and leaves no file behind.
            dir = tempfile.mkdtemp()
            self.addCleanup(os_helper.rmtree, dir)
            with self.assertRaises(LookupError):
                tempfile.TemporaryFile('w', encoding='bad-encoding', dir=dir)
            self.assertEqual(os.listdir(dir), [])

        def test_unexpected_error(self):
            # An unexpected exception raised by os.unlink() must still lead
            # to os.close() being called, and nothing may be left in *dir*.
            dir = tempfile.mkdtemp()
            self.addCleanup(os_helper.rmtree, dir)
            with mock.patch('tempfile._O_TMPFILE_WORKS', False), \
                 mock.patch('os.unlink') as mock_unlink, \
                 mock.patch('os.open') as mock_open, \
                 mock.patch('os.close') as mock_close:
                mock_unlink.side_effect = KeyboardInterrupt()
                with self.assertRaises(KeyboardInterrupt):
                    tempfile.TemporaryFile(dir=dir)
            mock_close.assert_called()
            self.assertEqual(os.listdir(dir), [])
1445
1446
1447 # Helper for test_del_on_shutdown
class NulledModules:
    """Context manager that sets every global of the given modules to None.

    The namespaces are snapshotted on construction, blanked on entry and
    restored from the snapshots on exit.
    """

    def __init__(self, *modules):
        # refs holds the live module namespaces; contents holds a shallow
        # copy of each one, taken now, to restore from later.
        self.refs = []
        self.contents = []
        for module in modules:
            self.refs.append(module.__dict__)
        for namespace in self.refs:
            self.contents.append(namespace.copy())

    def __enter__(self):
        # Overwrite every existing key in place; no key is added or removed.
        for namespace in self.refs:
            namespace.update(dict.fromkeys(namespace))

    def __exit__(self, *exc_info):
        # Put back exactly what was recorded in __init__.
        for namespace, saved in zip(self.refs, self.contents):
            namespace.clear()
            namespace.update(saved)
1462
1463
class TestTemporaryDirectory(BaseTestCase):
    """Test TemporaryDirectory()."""

    def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1,
                  ignore_cleanup_errors=False):
        # Create a TemporaryDirectory, check its name and populate it.
        # No reference to the result is kept here: some tests depend on
        # the caller holding the only one.
        parent = tempfile.gettempdir() if dir is None else dir
        tmp = tempfile.TemporaryDirectory(
            dir=parent, prefix=pre, suffix=suf,
            ignore_cleanup_errors=ignore_cleanup_errors)
        self.nameCheck(tmp.name, parent, pre, suf)
        self.do_create2(tmp.name, recurse, dirs, files)
        return tmp

    def do_create2(self, path, recurse=1, dirs=1, files=1):
        # Create subdirectories (while *recurse* is non-zero) and some files
        if recurse:
            for dir_index in range(dirs):
                subdir = os.path.join(path, "dir%d" % dir_index)
                os.mkdir(subdir)
                self.do_create2(subdir, recurse - 1, dirs, files)
        for file_index in range(files):
            target = os.path.join(path, "test%d.txt" % file_index)
            with open(target, "wb") as stream:
                stream.write(b"Hello world!")

    def test_mkdtemp_failure(self):
        # Check no additional exception if mkdtemp fails
        # Previously would raise AttributeError instead
        # (noted as part of Issue #10188)
        with tempfile.TemporaryDirectory() as vanished:
            pass
        # *vanished* has been removed again, so it cannot act as a parent.
        with self.assertRaises(FileNotFoundError) as caught:
            tempfile.TemporaryDirectory(dir=vanished)
        self.assertEqual(caught.exception.errno, errno.ENOENT)

    def test_explicit_cleanup(self):
        # A TemporaryDirectory is deleted when cleaned up
        parent = tempfile.mkdtemp()
        try:
            tmpdir = self.do_create(dir=parent)
            self.assertTrue(
                os.path.exists(tmpdir.name),
                "TemporaryDirectory %s does not exist" % tmpdir.name)
            tmpdir.cleanup()
            self.assertFalse(
                os.path.exists(tmpdir.name),
                "TemporaryDirectory %s exists after cleanup" % tmpdir.name)
        finally:
            os.rmdir(parent)

    def test_explict_cleanup_ignore_errors(self):
        """Test that cleanup doesn't return an error when ignoring them."""
        on_windows = sys.platform.startswith("win")
        with tempfile.TemporaryDirectory() as working_dir:
            temp_dir = self.do_create(
                dir=working_dir, ignore_cleanup_errors=True)
            temp_path = pathlib.Path(temp_dir.name)
            self.assertTrue(
                temp_path.exists(),
                f"TemporaryDirectory {temp_path!s} does not exist")
            # Clean up while a file inside the directory is still open.
            with open(temp_path / "a_file.txt", "w+t") as handle:
                handle.write("Hello world!\n")
                temp_dir.cleanup()
            remaining = list(temp_path.glob("*"))
            self.assertEqual(
                len(remaining),
                int(on_windows),
                "Unexpected number of files in "
                f"TemporaryDirectory {temp_path!s}")
            self.assertEqual(
                temp_path.exists(),
                on_windows,
                f"TemporaryDirectory {temp_path!s} existence state unexpected")
            # With the file closed, a second cleanup must remove everything.
            temp_dir.cleanup()
            self.assertFalse(
                temp_path.exists(),
                f"TemporaryDirectory {temp_path!s} exists after cleanup")

    @os_helper.skip_unless_symlink
    def test_cleanup_with_symlink_to_a_directory(self):
        # cleanup() should not follow symlinks to directories (issue #12464)
        outer = self.do_create()
        linked = self.do_create(recurse=0)

        # Symlink outer/foo -> linked
        link_path = os.path.join(outer.name, "foo")
        os.symlink(linked.name, link_path)

        # This call to cleanup() should not follow the "foo" symlink
        outer.cleanup()

        self.assertFalse(
            os.path.exists(outer.name),
            "TemporaryDirectory %s exists after cleanup" % outer.name)
        self.assertTrue(
            os.path.exists(linked.name),
            "Directory pointed to by a symlink was deleted")
        self.assertEqual(
            os.listdir(linked.name), ['test0.txt'],
            "Contents of the directory pointed to by a symlink "
            "were deleted")
        linked.cleanup()

    @support.cpython_only
    def test_del_on_collection(self):
        # A TemporaryDirectory is deleted when garbage collected
        parent = tempfile.mkdtemp()
        try:
            tmpdir = self.do_create(dir=parent)
            path = tmpdir.name
            del tmpdir  # Rely on refcounting to invoke __del__
            self.assertFalse(
                os.path.exists(path),
                "TemporaryDirectory %s exists after __del__" % path)
        finally:
            os.rmdir(parent)

    @support.cpython_only
    def test_del_on_collection_ignore_errors(self):
        """Test that ignoring errors works when TemporaryDirectory is gced."""
        on_windows = sys.platform.startswith("win")
        with tempfile.TemporaryDirectory() as working_dir:
            temp_dir = self.do_create(
                dir=working_dir, ignore_cleanup_errors=True)
            temp_path = pathlib.Path(temp_dir.name)
            self.assertTrue(
                temp_path.exists(),
                f"TemporaryDirectory {temp_path!s} does not exist")
            # Drop the last reference while a file inside is still open.
            with open(temp_path / "a_file.txt", "w+t") as handle:
                handle.write("Hello world!\n")
                del temp_dir
            remaining = list(temp_path.glob("*"))
            self.assertEqual(
                len(remaining),
                int(on_windows),
                "Unexpected number of files in "
                f"TemporaryDirectory {temp_path!s}")
            self.assertEqual(
                temp_path.exists(),
                on_windows,
                f"TemporaryDirectory {temp_path!s} existence state unexpected")

    def test_del_on_shutdown(self):
        # A TemporaryDirectory may be cleaned up during shutdown
        # The child script parks the directory object on one module at a
        # time, so it is still referenced when the interpreter shuts down.
        script = """if True:
            import builtins
            import os
            import shutil
            import sys
            import tempfile
            import warnings

            tmp = tempfile.TemporaryDirectory(dir={dir!r})
            sys.stdout.buffer.write(tmp.name.encode())

            tmp2 = os.path.join(tmp.name, 'test_dir')
            os.mkdir(tmp2)
            with open(os.path.join(tmp2, "test0.txt"), "w") as f:
                f.write("Hello world!")

            {mod}.tmp = tmp

            warnings.filterwarnings("always", category=ResourceWarning)
            """
        holders = ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings')
        with self.do_create() as parent:
            for mod in holders:
                code = script.format(dir=parent, mod=mod)
                _, out, err = script_helper.assert_python_ok("-c", code)
                tmp_name = out.decode().strip()
                self.assertFalse(
                    os.path.exists(tmp_name),
                    "TemporaryDirectory %s exists after cleanup" % tmp_name)
                stderr_text = err.decode('utf-8', 'backslashreplace')
                self.assertNotIn("Exception ", stderr_text)
                self.assertIn("ResourceWarning: Implicitly cleaning up",
                              stderr_text)

    def test_del_on_shutdown_ignore_errors(self):
        """Test ignoring errors works when a tempdir is gc'ed on shutdown."""
        # The child script leaves open_file.txt open inside the directory.
        script = """if True:
            import pathlib
            import sys
            import tempfile
            import warnings

            temp_dir = tempfile.TemporaryDirectory(
                dir={working_dir!r}, ignore_cleanup_errors=True)
            sys.stdout.buffer.write(temp_dir.name.encode())

            temp_dir_2 = pathlib.Path(temp_dir.name) / "test_dir"
            temp_dir_2.mkdir()
            with open(temp_dir_2 / "test0.txt", "w") as test_file:
                test_file.write("Hello world!")
            open_file = open(temp_dir_2 / "open_file.txt", "w")
            open_file.write("Hello world!")

            warnings.filterwarnings("always", category=ResourceWarning)
            """
        on_windows = sys.platform.startswith("win")
        with tempfile.TemporaryDirectory() as working_dir:
            code = script.format(working_dir=working_dir)
            _, out, err = script_helper.assert_python_ok("-c", code)
            temp_path = pathlib.Path(out.decode().strip())
            remaining = list(temp_path.glob("*"))
            self.assertEqual(
                len(remaining),
                int(on_windows),
                "Unexpected number of files in "
                f"TemporaryDirectory {temp_path!s}")
            self.assertEqual(
                temp_path.exists(),
                on_windows,
                f"TemporaryDirectory {temp_path!s} existence state unexpected")
            stderr_text = err.decode('utf-8', 'backslashreplace')
            self.assertNotIn("Exception", stderr_text)
            self.assertNotIn("Error", stderr_text)
            self.assertIn("ResourceWarning: Implicitly cleaning up",
                          stderr_text)

    def test_exit_on_shutdown(self):
        # Issue #22427
        # The child script exits while a generator is still suspended
        # inside a TemporaryDirectory with block.
        script = """if True:
            import sys
            import tempfile
            import warnings

            def generator():
                with tempfile.TemporaryDirectory(dir={dir!r}) as tmp:
                    yield tmp
            g = generator()
            sys.stdout.buffer.write(next(g).encode())

            warnings.filterwarnings("always", category=ResourceWarning)
            """
        with self.do_create() as parent:
            code = script.format(dir=parent)
            _, out, err = script_helper.assert_python_ok("-c", code)
            tmp_name = out.decode().strip()
            self.assertFalse(
                os.path.exists(tmp_name),
                "TemporaryDirectory %s exists after cleanup" % tmp_name)
            stderr_text = err.decode('utf-8', 'backslashreplace')
            self.assertNotIn("Exception ", stderr_text)
            self.assertIn("ResourceWarning: Implicitly cleaning up",
                          stderr_text)

    def test_warnings_on_cleanup(self):
        # ResourceWarning will be triggered by __del__
        with self.do_create() as parent:
            tmpdir = self.do_create(dir=parent, recurse=3)
            path = tmpdir.name

            # Check for the resource warning
            expected_warning = ('Implicitly', ResourceWarning)
            with warnings_helper.check_warnings(expected_warning,
                                                quiet=False):
                warnings.filterwarnings("always", category=ResourceWarning)
                del tmpdir
                support.gc_collect()
            self.assertFalse(
                os.path.exists(path),
                "TemporaryDirectory %s exists after __del__" % path)

    def test_multiple_close(self):
        # Can be cleaned-up many times without error
        tmpdir = self.do_create()
        for _ in range(3):
            tmpdir.cleanup()

    def test_context_manager(self):
        # Can be used as a context manager
        tmpdir = self.do_create()
        with tmpdir as entered_name:
            self.assertTrue(os.path.exists(entered_name))
            self.assertEqual(entered_name, tmpdir.name)
        self.assertFalse(os.path.exists(entered_name))

    def test_modes(self):
        # Cleanup must succeed for every combination of the three
        # permission bits selected by shifting 0..7 left by six.
        for bits in range(8):
            mode = bits << 6
            with self.subTest(mode=format(mode, '03o')):
                tmpdir = self.do_create(recurse=3, dirs=2, files=2)
                with tmpdir:
                    # Change files and directories mode recursively.
                    walker = os.walk(tmpdir.name, topdown=False)
                    for root, _subdirs, filenames in walker:
                        for filename in filenames:
                            os.chmod(os.path.join(root, filename), mode)
                        os.chmod(root, mode)
                    tmpdir.cleanup()
                self.assertFalse(os.path.exists(tmpdir.name))

    @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
    def test_flags(self):
        flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK

        # skip the test if these flags are not supported (ex: FreeBSD 13)
        probe = os_helper.TESTFN
        try:
            open(probe, "w").close()
            try:
                os.chflags(probe, flags)
            except OSError as exc:
                # "OSError: [Errno 45] Operation not supported"
                self.skipTest(f"chflags() doesn't support "
                              f"UF_IMMUTABLE|UF_NOUNLINK: {exc}")
            else:
                # Clear the flags again so the probe file can be removed.
                os.chflags(probe, 0)
        finally:
            os_helper.unlink(probe)

        tmpdir = self.do_create(recurse=3, dirs=2, files=2)
        with tmpdir:
            # Change files and directories flags recursively.
            walker = os.walk(tmpdir.name, topdown=False)
            for root, _subdirs, filenames in walker:
                for filename in filenames:
                    os.chflags(os.path.join(root, filename), flags)
                os.chflags(root, flags)
            tmpdir.cleanup()
        self.assertFalse(os.path.exists(tmpdir.name))
1757
1758
# Run the tests with unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()