python (3.12.0)
1 # tempfile.py unit tests.
2 import tempfile
3 import errno
4 import io
5 import os
6 import pathlib
7 import sys
8 import re
9 import warnings
10 import contextlib
11 import stat
12 import types
13 import weakref
14 import gc
15 import shutil
16 from unittest import mock
17
18 import unittest
19 from test import support
20 from test.support import os_helper
21 from test.support import script_helper
22 from test.support import warnings_helper
23
24
# True when the platform's text-mode open flags differ from the binary ones.
has_textmode = tempfile._bin_openflags != tempfile._text_openflags
# True when os.spawnl exists on this platform.
has_spawnl = hasattr(os, 'spawnl')

# TEST_FILES may need to be tweaked for systems depending on the maximum
# number of files that can be opened at one time (see ulimit -n); OpenBSD
# gets a smaller value than everything else.
TEST_FILES = 48 if sys.platform.startswith('openbsd') else 100
34
35 # This is organized as one test for each chunk of code in tempfile.py,
36 # in order of their appearance in the file. Testing which requires
37 # threads is not done here.
38
39 class ESC[4;38;5;81mTestLowLevelInternals(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
40 def test_infer_return_type_singles(self):
41 self.assertIs(str, tempfile._infer_return_type(''))
42 self.assertIs(bytes, tempfile._infer_return_type(b''))
43 self.assertIs(str, tempfile._infer_return_type(None))
44
45 def test_infer_return_type_multiples(self):
46 self.assertIs(str, tempfile._infer_return_type('', ''))
47 self.assertIs(bytes, tempfile._infer_return_type(b'', b''))
48 with self.assertRaises(TypeError):
49 tempfile._infer_return_type('', b'')
50 with self.assertRaises(TypeError):
51 tempfile._infer_return_type(b'', '')
52
53 def test_infer_return_type_multiples_and_none(self):
54 self.assertIs(str, tempfile._infer_return_type(None, ''))
55 self.assertIs(str, tempfile._infer_return_type('', None))
56 self.assertIs(str, tempfile._infer_return_type(None, None))
57 self.assertIs(bytes, tempfile._infer_return_type(b'', None))
58 self.assertIs(bytes, tempfile._infer_return_type(None, b''))
59 with self.assertRaises(TypeError):
60 tempfile._infer_return_type('', None, b'')
61 with self.assertRaises(TypeError):
62 tempfile._infer_return_type(b'', None, '')
63
64 def test_infer_return_type_pathlib(self):
65 self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/')))
66
67 def test_infer_return_type_pathlike(self):
68 class ESC[4;38;5;81mPath:
69 def __init__(self, path):
70 self.path = path
71
72 def __fspath__(self):
73 return self.path
74
75 self.assertIs(str, tempfile._infer_return_type(Path('/')))
76 self.assertIs(bytes, tempfile._infer_return_type(Path(b'/')))
77 self.assertIs(str, tempfile._infer_return_type('', Path('')))
78 self.assertIs(bytes, tempfile._infer_return_type(b'', Path(b'')))
79 self.assertIs(bytes, tempfile._infer_return_type(None, Path(b'')))
80 self.assertIs(str, tempfile._infer_return_type(None, Path('')))
81
82 with self.assertRaises(TypeError):
83 tempfile._infer_return_type('', Path(b''))
84 with self.assertRaises(TypeError):
85 tempfile._infer_return_type(b'', Path(''))
86
87 # Common functionality.
88
class BaseTestCase(unittest.TestCase):
    """Common fixture and name-validation helper for the tempfile tests."""

    # Patterns the random part of a generated name must match (str / bytes).
    str_check = re.compile(r"^[a-z0-9_-]{8}$")
    b_check = re.compile(br"^[a-z0-9_-]{8}$")

    def setUp(self):
        # Enter warnings_helper.check_warnings() for the duration of the
        # test, then ignore the RuntimeWarning about mktemp in this module.
        self.enterContext(warnings_helper.check_warnings())
        warnings.filterwarnings("ignore", category=RuntimeWarning,
                                message="mktemp", module=__name__)

    def nameCheck(self, name, dir, pre, suf):
        """Assert that *name* is a well-formed temporary name.

        The name must have the type implied by *dir*, *pre* and *suf*, live
        in directory *dir*, start with *pre*, end with *suf*, and have a
        random middle part matching str_check / b_check.
        """
        directory, basename = os.path.split(name)
        head = basename[:len(pre)]
        tail = basename[len(basename)-len(suf):]

        name_type = type(name)
        if dir is not None:
            if type(dir) is str or isinstance(dir, os.PathLike):
                dir_type = str
            else:
                dir_type = bytes
            self.assertIs(name_type, dir_type, "unexpected return type")
        if pre is not None:
            pre_type = str if type(pre) is str else bytes
            self.assertIs(name_type, pre_type, "unexpected return type")
        if suf is not None:
            suf_type = str if type(suf) is str else bytes
            self.assertIs(name_type, suf_type, "unexpected return type")
        if (dir, pre, suf) == (None, None, None):
            self.assertIs(name_type, str, "default return type must be str")

        # check for equality of the absolute paths!
        self.assertEqual(os.path.abspath(directory), os.path.abspath(dir),
                         "file %r not in directory %r" % (name, dir))
        self.assertEqual(head, pre,
                         "file %r does not begin with %r" % (basename, pre))
        self.assertEqual(tail, suf,
                         "file %r does not end with %r" % (basename, suf))

        # What remains between prefix and suffix is the random part.
        random_part = basename[len(pre):len(basename)-len(suf)]
        if isinstance(random_part, str):
            check = self.str_check
        else:
            check = self.b_check
        self.assertTrue(check.match(random_part),
                        "random characters %r do not match %r"
                        % (random_part, check.pattern))
133 % (nbase, check.pattern))
134
135
class TestExports(BaseTestCase):
    """Check the set of public names exposed by the tempfile module."""

    def test_exports(self):
        # There are no surprising symbols in the tempfile module
        expected = {
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempprefixb",
            "gettempdir",
            "gettempdirb",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
            "TemporaryDirectory",
        }

        # Every non-underscore module attribute must be in the list above.
        unexp = [key for key in tempfile.__dict__
                 if key[0] != '_' and key not in expected]
        self.assertTrue(len(unexp) == 0,
                        "unexpected keys: %s" % unexp)
163 "unexpected keys: %s" % unexp)
164
165
class TestRandomNameSequence(BaseTestCase):
    """Test the internal iterator object _RandomNameSequence."""

    def setUp(self):
        # Each test gets its own sequence object.
        self.r = tempfile._RandomNameSequence()
        super().setUp()

    def test_get_eight_char_str(self):
        # _RandomNameSequence returns an eight-character string
        s = next(self.r)
        self.nameCheck(s, '', '', '')

    def test_many(self):
        # _RandomNameSequence returns no duplicate strings (stochastic)
        seen = set()
        r = self.r
        for _ in range(TEST_FILES):
            s = next(r)
            self.nameCheck(s, '', '', '')
            self.assertNotIn(s, seen)
            seen.add(s)

    def test_supports_iter(self):
        # _RandomNameSequence supports the iterator protocol.
        # (This method used to be named supports_iter, so the test loader
        # never collected it, and it made no assertion.)
        count = 0
        for _ in self.r:
            count += 1
            if count == 20:
                break
        self.assertEqual(count, 20)

    @support.requires_fork()
    def test_process_awareness(self):
        # ensure that the random source differs between
        # child and parent.
        read_fd, write_fd = os.pipe()
        pid = None
        try:
            pid = os.fork()
            if not pid:
                # child process
                os.close(read_fd)
                os.write(write_fd, next(self.r).encode("ascii"))
                os.close(write_fd)
                # bypass the normal exit handlers- leave those to
                # the parent.
                os._exit(0)

            # parent process
            parent_value = next(self.r)
            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
        finally:
            if pid:
                support.wait_process(pid, exitcode=0)

            os.close(read_fd)
            os.close(write_fd)
        self.assertNotEqual(child_value, parent_value)
225 self.assertNotEqual(child_value, parent_value)
226
227
228
class TestCandidateTempdirList(BaseTestCase):
    """Test the internal function _candidate_tempdir_list."""

    def test_nonempty_list(self):
        # _candidate_tempdir_list returns a nonempty list of strings
        candidates = tempfile._candidate_tempdir_list()

        self.assertFalse(len(candidates) == 0)
        for entry in candidates:
            self.assertIsInstance(entry, str)

    def test_wanted_dirs(self):
        # _candidate_tempdir_list contains the expected directories
        env_names = ('TMPDIR', 'TEMP', 'TMP')

        # Make sure the interesting environment variables are all set.
        with os_helper.EnvironmentVarGuard() as env:
            for env_name in env_names:
                if not os.getenv(env_name):
                    env[env_name] = os.path.abspath(env_name)

            candidates = tempfile._candidate_tempdir_list()

            for env_name in env_names:
                value = os.getenv(env_name)
                if not value:
                    raise ValueError
                self.assertIn(value, candidates)

            # The current directory must be a candidate as well.
            try:
                here = os.getcwd()
            except (AttributeError, OSError):
                here = os.curdir

            self.assertIn(here, candidates)

            # Not practical to try to verify the presence of OS-specific
            # paths in this list.
266 # paths in this list.
267
268
269 # We test _get_default_tempdir some more by testing gettempdir.
270
class TestGetDefaultTempdir(BaseTestCase):
    """Test _get_default_tempdir()."""

    def test_no_files_left_behind(self):
        # _get_default_tempdir() must leave the probed directory empty, both
        # when it succeeds and when os.open() or os.write() fail.

        # use a private empty directory
        with tempfile.TemporaryDirectory() as our_temp_directory:
            # force _get_default_tempdir() to consider our empty directory
            def our_candidate_list():
                return [our_temp_directory]

            with support.swap_attr(tempfile, "_candidate_tempdir_list",
                                   our_candidate_list):
                # verify our directory is empty after _get_default_tempdir()
                tempfile._get_default_tempdir()
                self.assertEqual(os.listdir(our_temp_directory), [])

                # Replacement for os.open / os.write that always fails.
                def raise_OSError(*args, **kwargs):
                    raise OSError()

                with support.swap_attr(os, "open", raise_OSError):
                    # test again with failing os.open()
                    with self.assertRaises(FileNotFoundError):
                        tempfile._get_default_tempdir()
                    self.assertEqual(os.listdir(our_temp_directory), [])

                with support.swap_attr(os, "write", raise_OSError):
                    # test again with failing os.write()
                    with self.assertRaises(FileNotFoundError):
                        tempfile._get_default_tempdir()
                    self.assertEqual(os.listdir(our_temp_directory), [])
300 self.assertEqual(os.listdir(our_temp_directory), [])
301
302
class TestGetCandidateNames(BaseTestCase):
    """Test the internal function _get_candidate_names."""

    def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        obj = tempfile._get_candidate_names()
        self.assertIsInstance(obj, tempfile._RandomNameSequence)

    def test_same_thing(self):
        # _get_candidate_names always returns the same object
        first = tempfile._get_candidate_names()
        second = tempfile._get_candidate_names()

        # assertIs reports both objects on failure, unlike assertTrue(a is b).
        self.assertIs(first, second)
316 self.assertTrue(a is b)
317
318
@contextlib.contextmanager
def _inside_empty_temp_dir():
    """Run the with-body with tempfile.tempdir set to a new directory.

    The directory is created with mkdtemp() and removed with
    os_helper.rmtree() when the block exits, whether or not it raised.
    """
    scratch_dir = tempfile.mkdtemp()
    try:
        with support.swap_attr(tempfile, 'tempdir', scratch_dir):
            yield
    finally:
        os_helper.rmtree(scratch_dir)
326 os_helper.rmtree(dir)
327
328
def _mock_candidate_names(*names):
    """Swap tempfile._get_candidate_names for one that iterates over *names*.

    Returns whatever support.swap_attr() returns; callers in this file use
    it in a with statement.
    """
    def fake_get_candidate_names():
        return iter(names)

    return support.swap_attr(tempfile,
                             '_get_candidate_names',
                             fake_get_candidate_names)
332 lambda: iter(names))
333
334
class TestBadTempdir:
    """Mixin with checks for unusable temporary directories.

    The concrete test class supplies make_temp(), which is expected to
    create a temporary object inside tempfile.tempdir.
    """

    @unittest.skipIf(
        support.is_emscripten, "Emscripten cannot remove write bits."
    )
    def test_read_only_directory(self):
        with _inside_empty_temp_dir():
            target = tempfile.tempdir
            original_mode = os.stat(target).st_mode
            write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
            os.chmod(target, original_mode & ~write_bits)
            try:
                # Skip when clearing the write bits had no effect.
                if os.access(target, os.W_OK):
                    self.skipTest("can't set the directory read-only")
                with self.assertRaises(PermissionError):
                    self.make_temp()
                self.assertEqual(os.listdir(target), [])
            finally:
                # Restore the mode so the directory can be removed again.
                os.chmod(target, original_mode)

    def test_nonexisting_directory(self):
        with _inside_empty_temp_dir():
            missing = os.path.join(tempfile.tempdir, 'nonexistent')
            with support.swap_attr(tempfile, 'tempdir', missing):
                with self.assertRaises(FileNotFoundError):
                    self.make_temp()

    def test_non_directory(self):
        with _inside_empty_temp_dir():
            plain_file = os.path.join(tempfile.tempdir, 'file')
            # Create an empty regular file where a directory is expected.
            with open(plain_file, 'wb'):
                pass
            with support.swap_attr(tempfile, 'tempdir', plain_file):
                with self.assertRaises((NotADirectoryError, FileNotFoundError)):
                    self.make_temp()
367 self.make_temp()
368
369
class TestMkstempInner(TestBadTempdir, BaseTestCase):
    """Test the internal function _mkstemp_inner."""

    class mkstemped:
        # Wrapper around one file made by _mkstemp_inner; the descriptor is
        # closed and the file unlinked when the wrapper is finalized.

        _bflags = tempfile._bin_openflags
        _tflags = tempfile._text_openflags
        # os.close / os.unlink are kept as class attributes and called via
        # self in __del__.
        # NOTE(review): presumably so they stay reachable during interpreter
        # shutdown -- confirm.
        _close = os.close
        _unlink = os.unlink

        def __init__(self, dir, pre, suf, bin):
            # A true *bin* selects the binary open flags, otherwise text.
            if bin: flags = self._bflags
            else: flags = self._tflags

            output_type = tempfile._infer_return_type(dir, pre, suf)
            (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type)

        def write(self, str):
            # Write the bytes *str* straight to the file descriptor.
            os.write(self.fd, str)

        def __del__(self):
            self._close(self.fd)
            self._unlink(self.name)

    def do_create(self, dir=None, pre=None, suf=None, bin=1):
        # Create a mkstemped file, validate its name and return the wrapper.
        # Arguments left as None are replaced with defaults of the inferred
        # return type (str or bytes).
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            if output_type is str:
                dir = tempfile.gettempdir()
            else:
                dir = tempfile.gettempdirb()
        if pre is None:
            pre = output_type()
        if suf is None:
            suf = output_type()
        file = self.mkstemped(dir, pre, suf, bin)

        self.nameCheck(file.name, dir, pre, suf)
        return file

    def test_basic(self):
        # _mkstemp_inner can create files
        self.do_create().write(b"blat")
        self.do_create(pre="a").write(b"blat")
        self.do_create(suf="b").write(b"blat")
        self.do_create(pre="a", suf="b").write(b"blat")
        self.do_create(pre="aa", suf=".txt").write(b"blat")

    def test_basic_with_bytes_names(self):
        # _mkstemp_inner can create files when given name parts all
        # specified as bytes.
        dir_b = tempfile.gettempdirb()
        self.do_create(dir=dir_b, suf=b"").write(b"blat")
        self.do_create(dir=dir_b, pre=b"a").write(b"blat")
        self.do_create(dir=dir_b, suf=b"b").write(b"blat")
        self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat")
        self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat")
        # Can't mix str & binary types in the args.
        with self.assertRaises(TypeError):
            self.do_create(dir="", suf=b"").write(b"blat")
        with self.assertRaises(TypeError):
            self.do_create(dir=dir_b, pre="").write(b"blat")
        with self.assertRaises(TypeError):
            self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat")

    def test_basic_many(self):
        # _mkstemp_inner can create many files (stochastic)
        # Each slot is overwritten with a wrapper, so all TEST_FILES files
        # stay referenced until the list goes away.
        extant = list(range(TEST_FILES))
        for i in extant:
            extant[i] = self.do_create(pre="aa")

    def test_choose_directory(self):
        # _mkstemp_inner can create files in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            self.do_create(dir=dir).write(b"blat")
            self.do_create(dir=pathlib.Path(dir)).write(b"blat")
        finally:
            # The wrappers unlink their files in __del__; os.rmdir() below
            # needs the directory to be empty.
            support.gc_collect() # For PyPy or other GCs.
            os.rmdir(dir)

    @os_helper.skip_unless_working_chmod
    def test_file_mode(self):
        # _mkstemp_inner creates files with the proper mode

        file = self.do_create()
        mode = stat.S_IMODE(os.stat(file.name).st_mode)
        expected = 0o600
        if sys.platform == 'win32':
            # There's no distinction among 'user', 'group' and 'world';
            # replicate the 'user' bits.
            user = expected >> 6
            expected = user * (1 + 8 + 64)
        self.assertEqual(mode, expected)

    @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
    @support.requires_subprocess()
    def test_noinherit(self):
        # _mkstemp_inner file handles are not inherited by child processes

        # Verbosity argument passed to the helper script.
        if support.verbose:
            v="v"
        else:
            v="q"

        file = self.do_create()
        self.assertEqual(os.get_inheritable(file.fd), False)
        fd = "%d" % file.fd

        try:
            me = __file__
        except NameError:
            me = sys.argv[0]

        # We have to exec something, so that FD_CLOEXEC will take
        # effect.  The core of this test is therefore in
        # tf_inherit_check.py, which see.
        tester = os.path.join(os.path.dirname(os.path.abspath(me)),
                              "tf_inherit_check.py")

        # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
        # but an arg with embedded spaces should be decorated with double
        # quotes on each end
        if sys.platform == 'win32':
            decorated = '"%s"' % sys.executable
            tester = '"%s"' % tester
        else:
            decorated = sys.executable

        # A negative return value is reported as a signal, a positive one
        # as a failure reported by the child.
        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
        self.assertFalse(retval < 0,
                    "child process caught fatal signal %d" % -retval)
        self.assertFalse(retval > 0, "child process reports failure %d"%retval)

    @unittest.skipUnless(has_textmode, "text mode not available")
    def test_textmode(self):
        # _mkstemp_inner can create files in text mode

        # A text file is truncated at the first Ctrl+Z byte
        f = self.do_create(bin=0)
        f.write(b"blat\x1a")
        f.write(b"extra\n")
        os.lseek(f.fd, 0, os.SEEK_SET)
        self.assertEqual(os.read(f.fd, 20), b"blat")

    def make_temp(self):
        # Hook used by the TestBadTempdir tests: create a file in the
        # current default directory and return the (fd, name) pair.
        return tempfile._mkstemp_inner(tempfile.gettempdir(),
                                       tempfile.gettempprefix(),
                                       '',
                                       tempfile._bin_openflags,
                                       str)

    def test_collision_with_existing_file(self):
        # _mkstemp_inner tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            (fd1, name1) = self.make_temp()
            os.close(fd1)
            self.assertTrue(name1.endswith('aaa'))

            (fd2, name2) = self.make_temp()
            os.close(fd2)
            self.assertTrue(name2.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # _mkstemp_inner tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            dir = tempfile.mkdtemp()
            self.assertTrue(dir.endswith('aaa'))

            (fd, name) = self.make_temp()
            os.close(fd)
            self.assertTrue(name.endswith('bbb'))
544 self.assertTrue(name.endswith('bbb'))
545
546
class TestGetTempPrefix(BaseTestCase):
    """Test gettempprefix()."""

    def test_sane_template(self):
        # gettempprefix returns a nonempty prefix string
        # (and gettempprefixb a nonempty bytes one)
        str_prefix = tempfile.gettempprefix()

        self.assertIsInstance(str_prefix, str)
        self.assertGreater(len(str_prefix), 0)

        bytes_prefix = tempfile.gettempprefixb()

        self.assertIsInstance(bytes_prefix, bytes)
        self.assertGreater(len(bytes_prefix), 0)

    def test_usable_template(self):
        # gettempprefix returns a usable prefix string

        # Create a temp directory, avoiding use of the prefix.
        # Then attempt to create a file whose name is
        # prefix + 'xxxxxx.xxx' in that directory.
        file_name = tempfile.gettempprefix() + "xxxxxx.xxx"
        scratch_dir = tempfile.mkdtemp(prefix="")
        try:
            full_path = os.path.join(scratch_dir, file_name)
            os.close(os.open(full_path, os.O_RDWR | os.O_CREAT))
            os.unlink(full_path)
        finally:
            os.rmdir(scratch_dir)
576 os.rmdir(d)
577
578
class TestGetTempDir(BaseTestCase):
    """Test gettempdir()."""

    def test_directory_exists(self):
        # gettempdir returns a directory which exists
        # (checked for both the str and the bytes variant)
        candidates = (tempfile.gettempdir(), tempfile.gettempdirb())
        for path in candidates:
            is_acceptable = os.path.isabs(path) or path == os.curdir
            self.assertTrue(is_acceptable,
                            "%r is not an absolute path" % path)
            self.assertTrue(os.path.isdir(path),
                            "%r is not a directory" % path)

    def test_directory_writable(self):
        # gettempdir returns a directory writable by the user

        # sneaky: just instantiate a NamedTemporaryFile, which
        # defaults to writing into the directory returned by
        # gettempdir.
        with tempfile.NamedTemporaryFile() as scratch:
            scratch.write(b"blat")

    def test_same_thing(self):
        # gettempdir always returns the same object
        first = tempfile.gettempdir()
        second = tempfile.gettempdir()
        as_bytes = tempfile.gettempdirb()

        self.assertTrue(first is second)
        self.assertNotEqual(type(first), type(as_bytes))
        self.assertEqual(first, os.fsdecode(as_bytes))

    def test_case_sensitive(self):
        # gettempdir should not flatten its case
        # even on a case-insensitive file system
        mixed_case_dir = tempfile.mkdtemp("-Temp")
        # Clear the module-level tempdir (restored in the finally block)
        # before calling gettempdir() again.
        saved_tempdir = tempfile.tempdir
        tempfile.tempdir = None
        try:
            with os_helper.EnvironmentVarGuard() as env:
                # Fake the first env var which is checked as a candidate
                env["TMPDIR"] = mixed_case_dir
                self.assertEqual(tempfile.gettempdir(), mixed_case_dir)
        finally:
            tempfile.tempdir = saved_tempdir
            os_helper.rmdir(mixed_case_dir)
622 os_helper.rmdir(case_sensitive_tempdir)
623
624
class TestMkstemp(BaseTestCase):
    """Test mkstemp()."""

    def do_create(self, dir=None, pre=None, suf=None):
        """Create a file with mkstemp(), validate its name, then remove it.

        Arguments left as None are replaced with defaults of the inferred
        return type (str or bytes) before mkstemp() is called.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            if output_type is str:
                dir = tempfile.gettempdir()
            else:
                dir = tempfile.gettempdirb()
        if pre is None:
            pre = output_type()
        if suf is None:
            suf = output_type()
        (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
        # All checks run inside the try block so that the descriptor is
        # closed and the file removed even when an assertion fails.
        try:
            (ndir, nbase) = os.path.split(name)
            adir = os.path.abspath(dir)
            self.assertEqual(adir, ndir,
                "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
            self.nameCheck(name, dir, pre, suf)
        finally:
            os.close(fd)
            os.unlink(name)

    def _mkstemp_path(self, **kwargs):
        # Create a file with mkstemp(**kwargs), remove it, return its path.
        fd, path = tempfile.mkstemp(**kwargs)
        os.close(fd)
        os.unlink(path)
        return path

    def test_basic(self):
        # mkstemp can create files
        self.do_create()
        self.do_create(pre="a")
        self.do_create(suf="b")
        self.do_create(pre="a", suf="b")
        self.do_create(pre="aa", suf=".txt")
        self.do_create(dir=".")

    def test_basic_with_bytes_names(self):
        # mkstemp can create files when given name parts all
        # specified as bytes.
        d = tempfile.gettempdirb()
        self.do_create(dir=d, suf=b"")
        self.do_create(dir=d, pre=b"a")
        self.do_create(dir=d, suf=b"b")
        self.do_create(dir=d, pre=b"a", suf=b"b")
        self.do_create(dir=d, pre=b"aa", suf=b".txt")
        self.do_create(dir=b".")
        # Mixing str and bytes arguments is rejected.
        with self.assertRaises(TypeError):
            self.do_create(dir=".", pre=b"aa", suf=b".txt")
        with self.assertRaises(TypeError):
            self.do_create(dir=b".", pre="aa", suf=b".txt")
        with self.assertRaises(TypeError):
            self.do_create(dir=b".", pre=b"aa", suf=".txt")

    def test_choose_directory(self):
        # mkstemp can create files in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            self.do_create(dir=dir)
            self.do_create(dir=pathlib.Path(dir))
        finally:
            os.rmdir(dir)

    def test_for_tempdir_is_bytes_issue40701_api_warts(self):
        # With tempfile.tempdir set to bytes, mkstemp() returns bytes only
        # when no str argument is supplied.
        orig_tempdir = tempfile.tempdir
        self.assertIsInstance(tempfile.tempdir, (str, type(None)))
        try:
            self.assertIsInstance(self._mkstemp_path(), str)
            tempfile.tempdir = tempfile.gettempdirb()
            self.assertIsInstance(tempfile.tempdir, bytes)
            self.assertIsInstance(tempfile.gettempdir(), str)
            self.assertIsInstance(tempfile.gettempdirb(), bytes)
            self.assertIsInstance(self._mkstemp_path(), bytes)
            self.assertIsInstance(self._mkstemp_path(suffix='.txt'), str)
            self.assertIsInstance(
                self._mkstemp_path(prefix='test-temp-'), str)
            self.assertIsInstance(
                self._mkstemp_path(dir=tempfile.gettempdir()), str)
        finally:
            tempfile.tempdir = orig_tempdir
716 tempfile.tempdir = orig_tempdir
717
718
class TestMkdtemp(TestBadTempdir, BaseTestCase):
    """Test mkdtemp()."""

    def make_temp(self):
        # Hook used by the TestBadTempdir tests.
        return tempfile.mkdtemp()

    def do_create(self, dir=None, pre=None, suf=None):
        """Create a directory with mkdtemp(), validate and return its name.

        Arguments left as None are replaced with defaults of the inferred
        return type (str or bytes).  The caller removes the directory; it
        is removed here only when the name check fails.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            if output_type is str:
                dir = tempfile.gettempdir()
            else:
                dir = tempfile.gettempdirb()
        if pre is None:
            pre = output_type()
        if suf is None:
            suf = output_type()
        name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)

        try:
            self.nameCheck(name, dir, pre, suf)
        except BaseException:
            # Clean up on any failure, then let the error propagate.
            os.rmdir(name)
            raise
        return name

    def _mkdtemp_path(self, **kwargs):
        # Create a directory with mkdtemp(**kwargs), remove it, return its
        # path.
        path = tempfile.mkdtemp(**kwargs)
        os.rmdir(path)
        return path

    def test_basic(self):
        # mkdtemp can create directories
        os.rmdir(self.do_create())
        os.rmdir(self.do_create(pre="a"))
        os.rmdir(self.do_create(suf="b"))
        os.rmdir(self.do_create(pre="a", suf="b"))
        os.rmdir(self.do_create(pre="aa", suf=".txt"))

    def test_basic_with_bytes_names(self):
        # mkdtemp can create directories when given all binary parts
        d = tempfile.gettempdirb()
        os.rmdir(self.do_create(dir=d))
        os.rmdir(self.do_create(dir=d, pre=b"a"))
        os.rmdir(self.do_create(dir=d, suf=b"b"))
        os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b"))
        os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt"))
        # Mixing str and bytes arguments is rejected.
        with self.assertRaises(TypeError):
            os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt"))
        with self.assertRaises(TypeError):
            os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt"))
        with self.assertRaises(TypeError):
            os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt"))

    def test_basic_many(self):
        # mkdtemp can create many directories (stochastic)
        created = []
        try:
            for _ in range(TEST_FILES):
                created.append(self.do_create(pre="aa"))
        finally:
            # Remove whatever was created, even after a failure.
            for path in created:
                os.rmdir(path)

    def test_choose_directory(self):
        # mkdtemp can create directories in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            os.rmdir(self.do_create(dir=dir))
            os.rmdir(self.do_create(dir=pathlib.Path(dir)))
        finally:
            os.rmdir(dir)

    @os_helper.skip_unless_working_chmod
    def test_mode(self):
        # mkdtemp creates directories with the proper mode

        dir = self.do_create()
        try:
            mode = stat.S_IMODE(os.stat(dir).st_mode)
            mode &= 0o777 # Mask off sticky bits inherited from /tmp
            expected = 0o700
            if sys.platform == 'win32':
                # There's no distinction among 'user', 'group' and 'world';
                # replicate the 'user' bits.
                user = expected >> 6
                expected = user * (1 + 8 + 64)
            self.assertEqual(mode, expected)
        finally:
            os.rmdir(dir)

    def test_collision_with_existing_file(self):
        # mkdtemp tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            file = tempfile.NamedTemporaryFile(delete=False)
            file.close()
            self.assertTrue(file.name.endswith('aaa'))
            dir = tempfile.mkdtemp()
            self.assertTrue(dir.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # mkdtemp tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            dir1 = tempfile.mkdtemp()
            self.assertTrue(dir1.endswith('aaa'))
            dir2 = tempfile.mkdtemp()
            self.assertTrue(dir2.endswith('bbb'))

    def test_for_tempdir_is_bytes_issue40701_api_warts(self):
        # With tempfile.tempdir set to bytes, mkdtemp() returns bytes only
        # when no str argument is supplied.
        orig_tempdir = tempfile.tempdir
        self.assertIsInstance(tempfile.tempdir, (str, type(None)))
        try:
            self.assertIsInstance(self._mkdtemp_path(), str)
            tempfile.tempdir = tempfile.gettempdirb()
            self.assertIsInstance(tempfile.tempdir, bytes)
            self.assertIsInstance(tempfile.gettempdir(), str)
            self.assertIsInstance(tempfile.gettempdirb(), bytes)
            self.assertIsInstance(self._mkdtemp_path(), bytes)
            self.assertIsInstance(self._mkdtemp_path(suffix='-dir'), str)
            self.assertIsInstance(
                self._mkdtemp_path(prefix='test-mkdtemp-'), str)
            self.assertIsInstance(
                self._mkdtemp_path(dir=tempfile.gettempdir()), str)
        finally:
            tempfile.tempdir = orig_tempdir

    def test_path_is_absolute(self):
        # Test that the path returned by mkdtemp with a relative `dir`
        # argument is absolute
        # mkdtemp() is called outside the try block: if it fails there is
        # nothing to remove, and the cleanup must not mask the real error
        # with a NameError on an unbound `path`.
        path = tempfile.mkdtemp(dir=".")
        try:
            self.assertTrue(os.path.isabs(path))
        finally:
            os.rmdir(path)
860 os.rmdir(path)
861
862
class TestMktemp(BaseTestCase):
    """Test mktemp()."""

    # For safety, all use of mktemp must occur in a private directory.
    # We must also suppress the RuntimeWarning it generates.
    def setUp(self):
        self.dir = tempfile.mkdtemp()
        super().setUp()

    def tearDown(self):
        # Remove the private directory (if any) before the base teardown.
        if self.dir:
            os.rmdir(self.dir)
            self.dir = None
        super().tearDown()

    class mktemped:
        # Wrapper around one file named by mktemp(); the file is unlinked
        # when the wrapper is finalized.
        _unlink = os.unlink
        _bflags = tempfile._bin_openflags

        def __init__(self, dir, pre, suf):
            self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
            # Create the file.  This will raise an exception if it's
            # mysteriously appeared in the meanwhile.
            descriptor = os.open(self.name, self._bflags, 0o600)
            os.close(descriptor)

        def __del__(self):
            self._unlink(self.name)

    def do_create(self, pre="", suf=""):
        # Create a mktemped file in the private directory, validate its
        # name and return the wrapper.
        created = self.mktemped(self.dir, pre, suf)

        self.nameCheck(created.name, self.dir, pre, suf)
        return created

    def test_basic(self):
        # mktemp can choose usable file names
        self.do_create()
        self.do_create(pre="a")
        self.do_create(suf="b")
        self.do_create(pre="a", suf="b")
        self.do_create(pre="aa", suf=".txt")

    def test_many(self):
        # mktemp can choose many usable file names (stochastic)
        # All wrappers are kept referenced until the list is deleted.
        wrappers = [self.do_create(pre="aa") for _ in range(TEST_FILES)]
        del wrappers
        support.gc_collect()  # For PyPy or other GCs.
911 support.gc_collect() # For PyPy or other GCs.
912
913 ## def test_warning(self):
914 ## # mktemp issues a warning when used
915 ## warnings.filterwarnings("error",
916 ## category=RuntimeWarning,
917 ## message="mktemp")
918 ## self.assertRaises(RuntimeWarning,
919 ## tempfile.mktemp, dir=self.dir)
920
921
922 # We test _TemporaryFileWrapper by testing NamedTemporaryFile.
923
924
925 class ESC[4;38;5;81mTestNamedTemporaryFile(ESC[4;38;5;149mBaseTestCase):
926 """Test NamedTemporaryFile()."""
927
def do_create(self, dir=None, pre="", suf="", delete=True):
    """Create a NamedTemporaryFile, validate its name and return it.

    When *dir* is None the directory from tempfile.gettempdir() is used.
    """
    target_dir = tempfile.gettempdir() if dir is None else dir
    handle = tempfile.NamedTemporaryFile(dir=target_dir, prefix=pre,
                                         suffix=suf, delete=delete)

    self.nameCheck(handle.name, target_dir, pre, suf)
    return handle
935 return file
936
937
def test_basic(self):
    # NamedTemporaryFile can create files
    variants = (
        {},
        {"pre": "a"},
        {"suf": "b"},
        {"pre": "a", "suf": "b"},
        {"pre": "aa", "suf": ".txt"},
    )
    for kwargs in variants:
        self.do_create(**kwargs)
944 self.do_create(pre="aa", suf=".txt")
945
def test_method_lookup(self):
    # Issue #18879: Looking up a temporary file method should keep it
    # alive long enough.
    f = self.do_create()
    wr = weakref.ref(f)
    # Two separate lookups of the same method.
    write = f.write
    write2 = f.write
    # Drop the only direct reference to the file; only the looked-up
    # methods remain, and they must still be usable.
    del f
    write(b'foo')
    del write
    write2(b'bar')
    del write2
    if support.check_impl_detail(cpython=True):
        # No reference cycle was created.
        self.assertIsNone(wr())
960 self.assertIsNone(wr())
961
962 def test_iter(self):
963 # Issue #23700: getting iterator from a temporary file should keep
964 # it alive as long as it's being iterated over
965 lines = [b'spam\n', b'eggs\n', b'beans\n']
966 def make_file():
967 f = tempfile.NamedTemporaryFile(mode='w+b')
968 f.write(b''.join(lines))
969 f.seek(0)
970 return f
971 for i, l in enumerate(make_file()):
972 self.assertEqual(l, lines[i])
973 self.assertEqual(i, len(lines) - 1)
974
975 def test_creates_named(self):
976 # NamedTemporaryFile creates files with names
977 f = tempfile.NamedTemporaryFile()
978 self.assertTrue(os.path.exists(f.name),
979 "NamedTemporaryFile %s does not exist" % f.name)
980
981 def test_del_on_close(self):
982 # A NamedTemporaryFile is deleted when closed
983 dir = tempfile.mkdtemp()
984 try:
985 with tempfile.NamedTemporaryFile(dir=dir) as f:
986 f.write(b'blat')
987 self.assertEqual(os.listdir(dir), [])
988 self.assertFalse(os.path.exists(f.name),
989 "NamedTemporaryFile %s exists after close" % f.name)
990 finally:
991 os.rmdir(dir)
992
993 def test_dis_del_on_close(self):
994 # Tests that delete-on-close can be disabled
995 dir = tempfile.mkdtemp()
996 tmp = None
997 try:
998 f = tempfile.NamedTemporaryFile(dir=dir, delete=False)
999 tmp = f.name
1000 f.write(b'blat')
1001 f.close()
1002 self.assertTrue(os.path.exists(f.name),
1003 "NamedTemporaryFile %s missing after close" % f.name)
1004 finally:
1005 if tmp is not None:
1006 os.unlink(tmp)
1007 os.rmdir(dir)
1008
def test_multiple_close(self):
    # Closing a NamedTemporaryFile repeatedly must not raise.
    tmp = tempfile.NamedTemporaryFile()
    tmp.write(b'abc\n')
    for _ in range(3):
        tmp.close()
1016
def test_context_manager(self):
    # The file exists inside the with-block and is gone once the block
    # has been left.
    with tempfile.NamedTemporaryFile() as tmp:
        self.assertTrue(os.path.exists(tmp.name))
    self.assertFalse(os.path.exists(tmp.name))
    # Entering the already closed file again must be rejected.
    with self.assertRaises(ValueError):
        with tmp:
            pass
1026
def test_context_man_not_del_on_close_if_delete_on_close_false(self):
    # Issue gh-58451: tempfile.NamedTemporaryFile is not particularly useful
    # on Windows
    # A NamedTemporaryFile is NOT deleted when closed if
    # delete_on_close=False, but is deleted on context manager exit
    tmp_dir = tempfile.mkdtemp()
    # rmtree copes with a file that was wrongly left behind, so a failing
    # check below neither leaks the directory nor adds an unrelated
    # "directory not empty" error on top of the recorded failure.
    self.addCleanup(os_helper.rmtree, tmp_dir)
    with tempfile.NamedTemporaryFile(dir=tmp_dir,
                                     delete=True,
                                     delete_on_close=False) as f:
        f.write(b'blat')
        f_name = f.name
        f.close()
        with self.subTest("file survives close()"):
            # Testing that file is not deleted on close
            self.assertTrue(os.path.exists(f_name),
                    f"NamedTemporaryFile {f_name!r} is incorrectly "
                    f"deleted on closure when delete_on_close=False")

    with self.subTest("file removed on context manager exit"):
        # Testing that file is deleted on context manager exit
        self.assertFalse(os.path.exists(f_name),
                         f"NamedTemporaryFile {f_name!r} exists "
                         f"after context manager exit")
1054
def test_context_man_ok_to_delete_manually(self):
    # With delete=True and delete_on_close=False, removing the file by
    # hand inside the with-block must not make leaving the block fail.
    parent = tempfile.mkdtemp()
    try:
        tmp = tempfile.NamedTemporaryFile(dir=parent,
                                          delete=True,
                                          delete_on_close=False)
        with tmp:
            tmp.write(b'blat')
            tmp.close()
            # The file is gone before the context manager gets a chance
            # to delete it on exit.
            os.unlink(tmp.name)
    finally:
        os.rmdir(parent)
1069
def test_context_man_not_del_if_delete_false(self):
    # A NamedTemporaryFile is not deleted if delete = False
    tmp_dir = tempfile.mkdtemp()
    f_name = None
    try:
        # Test that delete_on_close=True has no effect if delete=False.
        with tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False,
                                         delete_on_close=True) as f:
            # Record the name first so the cleanup below can find the
            # file even if the write fails.
            f_name = f.name
            f.write(b'blat')
        # The message describes the failure case: the file is gone
        # although it should have survived.
        self.assertTrue(os.path.exists(f_name),
                    f"NamedTemporaryFile {f_name!r} missing after close")
    finally:
        # os_helper.unlink() tolerates a missing file, so a failed
        # assertion above is reported as such instead of being replaced
        # by a FileNotFoundError raised during cleanup.
        if f_name is not None:
            os_helper.unlink(f_name)
        os.rmdir(tmp_dir)
1085
def test_del_by_finalizer(self):
    # A NamedTemporaryFile is deleted when finalized in the case of
    # delete=True, delete_on_close=False, and no with-statement is used.
    def my_func(dir):
        f = tempfile.NamedTemporaryFile(dir=dir, delete=True,
                                        delete_on_close=False)
        tmp_name = f.name
        f.write(b'blat')
        # Testing extreme case, where the file is not explicitly closed
        # f.close()
        return tmp_name

    tmp_dir = tempfile.mkdtemp()
    try:
        tmp_name = my_func(tmp_dir)
        # Collect *after* the file object has become unreachable; a
        # collection before the file even exists cannot finalize it on
        # implementations without reference counting.
        support.gc_collect()
        self.assertFalse(os.path.exists(tmp_name),
                    f"NamedTemporaryFile {tmp_name!r} "
                    f"exists after finalizer ")
    finally:
        os.rmdir(tmp_dir)
1107
def test_correct_finalizer_work_if_already_deleted(self):
    # There should be no error in the case of delete=True,
    # delete_on_close=False, no with-statement is used, and the file is
    # deleted manually.
    def my_func(dir) -> str:
        f = tempfile.NamedTemporaryFile(dir=dir, delete=True,
                                        delete_on_close=False)
        tmp_name = f.name
        f.write(b'blat')
        f.close()
        os.unlink(tmp_name)
        return tmp_name

    tmp_dir = tempfile.mkdtemp()
    self.addCleanup(os_helper.rmtree, tmp_dir)
    # Exceptions raised inside a finalizer are not propagated to the
    # caller; capture them through the unraisable hook so the test can
    # actually detect a misbehaving finalizer.
    with support.catch_unraisable_exception() as cm:
        # The helper has to be called for the scenario to happen at all.
        tmp_name = my_func(tmp_dir)
        # Make sure that the garbage collector has finalized the file
        # object.
        support.gc_collect()
        self.assertIsNone(cm.unraisable)
    self.assertFalse(os.path.exists(tmp_name),
                     f"NamedTemporaryFile {tmp_name!r} exists after "
                     f"manual deletion")
1122
def test_bad_mode(self):
    # Invalid mode arguments are rejected and must not leave a file
    # behind in the target directory.
    scratch = tempfile.mkdtemp()
    self.addCleanup(os_helper.rmtree, scratch)
    self.assertRaises(ValueError, tempfile.NamedTemporaryFile,
                      mode='wr', dir=scratch)
    self.assertRaises(TypeError, tempfile.NamedTemporaryFile,
                      mode=2, dir=scratch)
    leftovers = os.listdir(scratch)
    self.assertEqual(leftovers, [])
1131
def test_bad_encoding(self):
    # An unknown encoding is rejected and must not leave a file behind
    # in the target directory.
    scratch = tempfile.mkdtemp()
    self.addCleanup(os_helper.rmtree, scratch)
    self.assertRaises(LookupError, tempfile.NamedTemporaryFile,
                      'w', encoding='bad-encoding', dir=scratch)
    leftovers = os.listdir(scratch)
    self.assertEqual(leftovers, [])
1138
def test_unexpected_error(self):
    # An exception raised while the wrapper object is being constructed
    # must still result in the opened file being closed and in no file
    # being left in the directory.
    dir = tempfile.mkdtemp()
    self.addCleanup(os_helper.rmtree, dir)
    # _TemporaryFileWrapper is replaced by a mock that raises
    # KeyboardInterrupt when called; io.open is replaced by mock_open()
    # so that close() calls on the opened file can be observed.
    with mock.patch('tempfile._TemporaryFileWrapper') as mock_ntf, \
         mock.patch('io.open', mock.mock_open()) as mock_open:
        mock_ntf.side_effect = KeyboardInterrupt()
        with self.assertRaises(KeyboardInterrupt):
            tempfile.NamedTemporaryFile(dir=dir)
    # The file handle produced by the mocked io.open must have been closed.
    mock_open().close.assert_called()
    self.assertEqual(os.listdir(dir), [])
1149
1150 # How to test the mode and bufsize parameters?
1151
class TestSpooledTemporaryFile(BaseTestCase):
    """Test SpooledTemporaryFile()."""

    def do_create(self, max_size=0, dir=None, pre="", suf=""):
        # Create a SpooledTemporaryFile with the given parameters; `dir`
        # defaults to the system temporary directory.
        if dir is None:
            dir = tempfile.gettempdir()
        file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir,
                                             prefix=pre, suffix=suf)
        # Close the file when the test ends instead of leaving an open
        # file behind.  Closing more than once is harmless (see the
        # test_multiple_close_* tests), so tests may still close it or
        # call __del__() themselves.
        self.addCleanup(file.close)
        return file

    def test_basic(self):
        # SpooledTemporaryFile can create files
        f = self.do_create()
        self.assertFalse(f._rolled)
        f = self.do_create(max_size=100, pre="a", suf=".txt")
        self.assertFalse(f._rolled)

    def test_is_iobase(self):
        # SpooledTemporaryFile should implement io.IOBase
        self.assertIsInstance(self.do_create(), io.IOBase)

    def test_iobase_interface(self):
        # SpooledTemporaryFile should implement the io.IOBase interface.
        # Ensure it has all the required methods and properties.
        iobase_attrs = {
            # From IOBase
            'fileno', 'seek', 'truncate', 'close', 'closed', '__enter__',
            '__exit__', 'flush', 'isatty', '__iter__', '__next__', 'readable',
            'readline', 'readlines', 'seekable', 'tell', 'writable',
            'writelines',
            # From BufferedIOBase (binary mode) and TextIOBase (text mode)
            'detach', 'read', 'read1', 'write', 'readinto', 'readinto1',
            'encoding', 'errors', 'newlines',
        }
        spooledtempfile_attrs = set(dir(tempfile.SpooledTemporaryFile))
        missing_attrs = iobase_attrs - spooledtempfile_attrs
        self.assertFalse(
            missing_attrs,
            'SpooledTemporaryFile missing attributes from '
            'IOBase/BufferedIOBase/TextIOBase'
        )

    def test_del_on_close(self):
        # A SpooledTemporaryFile is deleted when closed
        dir = tempfile.mkdtemp()
        try:
            f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
            self.assertFalse(f._rolled)
            f.write(b'blat ' * 5)
            self.assertTrue(f._rolled)
            filename = f.name
            f.close()
            self.assertEqual(os.listdir(dir), [])
            # The name is only checked as a path when it is not an int.
            if not isinstance(filename, int):
                self.assertFalse(os.path.exists(filename),
                    "SpooledTemporaryFile %s exists after close" % filename)
        finally:
            os.rmdir(dir)

    def test_del_unrolled_file(self):
        # The unrolled SpooledTemporaryFile should raise a ResourceWarning
        # when deleted since the file was not explicitly closed.
        f = self.do_create(max_size=10)
        f.write(b'foo')
        self.assertEqual(f.name, None)  # Unrolled so no filename/fd
        with self.assertWarns(ResourceWarning):
            f.__del__()

    @unittest.skipIf(
        support.is_emscripten, "Emscripten cannot fstat renamed files."
    )
    def test_del_rolled_file(self):
        # The rolled file should be deleted when the SpooledTemporaryFile
        # object is deleted. This should raise a ResourceWarning since the file
        # was not explicitly closed.
        f = self.do_create(max_size=2)
        f.write(b'foo')
        name = f.name  # This is a fd on posix+cygwin, a filename everywhere else
        self.assertTrue(os.path.exists(name))
        with self.assertWarns(ResourceWarning):
            f.__del__()
        self.assertFalse(
            os.path.exists(name),
            "Rolled SpooledTemporaryFile (name=%s) exists after delete" % name
        )

    def test_rewrite_small(self):
        # A SpooledTemporaryFile can be rewritten multiple times within
        # max_size without rolling over
        f = self.do_create(max_size=30)
        self.assertFalse(f._rolled)
        for i in range(5):
            f.seek(0, 0)
            f.write(b'x' * 20)
        self.assertFalse(f._rolled)

    def test_write_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        f = self.do_create(max_size=30)
        self.assertFalse(f._rolled)
        f.write(b'x' * 20)
        self.assertFalse(f._rolled)
        f.write(b'x' * 10)
        self.assertFalse(f._rolled)
        f.write(b'x')
        self.assertTrue(f._rolled)

    def test_writelines(self):
        # Verify writelines with a SpooledTemporaryFile
        f = self.do_create()
        f.writelines((b'x', b'y', b'z'))
        pos = f.seek(0)
        self.assertEqual(pos, 0)
        buf = f.read()
        self.assertEqual(buf, b'xyz')

    def test_writelines_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        f = self.do_create(max_size=35)
        f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
        self.assertFalse(f._rolled)
        f.write(b'x')
        self.assertTrue(f._rolled)

    def test_sparse(self):
        # A SpooledTemporaryFile that is written late in the file will extend
        # when that occurs
        f = self.do_create(max_size=30)
        self.assertFalse(f._rolled)
        pos = f.seek(100, 0)
        self.assertEqual(pos, 100)
        # Seeking alone does not roll over; the write that follows does.
        self.assertFalse(f._rolled)
        f.write(b'x')
        self.assertTrue(f._rolled)

    def test_fileno(self):
        # A SpooledTemporaryFile should roll over to a real file on fileno()
        f = self.do_create(max_size=30)
        self.assertFalse(f._rolled)
        # assertGreater reports the offending value on failure.
        self.assertGreater(f.fileno(), 0)
        self.assertTrue(f._rolled)

    def test_multiple_close_before_rollover(self):
        # A SpooledTemporaryFile can be closed many times without error
        f = tempfile.SpooledTemporaryFile()
        f.write(b'abc\n')
        self.assertFalse(f._rolled)
        f.close()
        f.close()
        f.close()

    def test_multiple_close_after_rollover(self):
        # A rolled-over SpooledTemporaryFile can be closed many times
        # without error
        f = tempfile.SpooledTemporaryFile(max_size=1)
        f.write(b'abc\n')
        self.assertTrue(f._rolled)
        f.close()
        f.close()
        f.close()

    def test_bound_methods(self):
        # It should be OK to steal a bound method from a SpooledTemporaryFile
        # and use it independently; when the file rolls over, those bound
        # methods should continue to function
        f = self.do_create(max_size=30)
        read = f.read
        write = f.write
        seek = f.seek

        write(b"a" * 35)
        write(b"b" * 35)
        seek(0, 0)
        self.assertEqual(read(70), b'a'*35 + b'b'*35)

    def test_properties(self):
        # Check mode/name and the text-only attributes of a binary file,
        # first before and then after the rollover.
        f = tempfile.SpooledTemporaryFile(max_size=10)
        self.addCleanup(f.close)
        f.write(b'x' * 10)
        self.assertFalse(f._rolled)
        self.assertEqual(f.mode, 'w+b')
        self.assertIsNone(f.name)
        with self.assertRaises(AttributeError):
            f.newlines
        with self.assertRaises(AttributeError):
            f.encoding
        with self.assertRaises(AttributeError):
            f.errors

        f.write(b'x')
        self.assertTrue(f._rolled)
        self.assertEqual(f.mode, 'rb+')
        self.assertIsNotNone(f.name)
        with self.assertRaises(AttributeError):
            f.newlines
        with self.assertRaises(AttributeError):
            f.encoding
        with self.assertRaises(AttributeError):
            f.errors

    def test_text_mode(self):
        # Creating a SpooledTemporaryFile with a text mode should produce
        # a file object reading and writing (Unicode) text strings.
        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                          encoding="utf-8")
        self.addCleanup(f.close)
        f.write("abc\n")
        f.seek(0)
        self.assertEqual(f.read(), "abc\n")
        f.write("def\n")
        f.seek(0)
        self.assertEqual(f.read(), "abc\ndef\n")
        self.assertFalse(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNone(f.name)
        self.assertEqual(f.newlines, os.linesep)
        self.assertEqual(f.encoding, "utf-8")
        self.assertEqual(f.errors, "strict")

        f.write("xyzzy\n")
        f.seek(0)
        self.assertEqual(f.read(), "abc\ndef\nxyzzy\n")
        # Check that Ctrl+Z doesn't truncate the file
        f.write("foo\x1abar\n")
        f.seek(0)
        self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
        self.assertTrue(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNotNone(f.name)
        self.assertEqual(f.newlines, os.linesep)
        self.assertEqual(f.encoding, "utf-8")
        self.assertEqual(f.errors, "strict")

    def test_text_newline_and_encoding(self):
        # The newline, encoding and errors settings must be reported the
        # same way before and after the rollover.
        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                          newline='', encoding='utf-8',
                                          errors='ignore')
        self.addCleanup(f.close)
        f.write("\u039B\r\n")
        f.seek(0)
        self.assertEqual(f.read(), "\u039B\r\n")
        self.assertFalse(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNone(f.name)
        self.assertIsNotNone(f.newlines)
        self.assertEqual(f.encoding, "utf-8")
        self.assertEqual(f.errors, "ignore")

        f.write("\u039C" * 10 + "\r\n")
        f.write("\u039D" * 20)
        f.seek(0)
        self.assertEqual(f.read(),
                "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20))
        self.assertTrue(f._rolled)
        self.assertEqual(f.mode, 'w+')
        self.assertIsNotNone(f.name)
        self.assertIsNotNone(f.newlines)
        self.assertEqual(f.encoding, 'utf-8')
        self.assertEqual(f.errors, 'ignore')

    def test_context_manager_before_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as f:
            self.assertFalse(f._rolled)
            self.assertFalse(f.closed)
        self.assertTrue(f.closed)
        def use_closed():
            with f:
                pass
        self.assertRaises(ValueError, use_closed)

    def test_context_manager_during_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager when the
        # rollover happens inside the with-block
        with tempfile.SpooledTemporaryFile(max_size=1) as f:
            self.assertFalse(f._rolled)
            f.write(b'abc\n')
            f.flush()
            self.assertTrue(f._rolled)
            self.assertFalse(f.closed)
        self.assertTrue(f.closed)
        def use_closed():
            with f:
                pass
        self.assertRaises(ValueError, use_closed)

    def test_context_manager_after_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager after it
        # has already rolled over
        f = tempfile.SpooledTemporaryFile(max_size=1)
        f.write(b'abc\n')
        f.flush()
        self.assertTrue(f._rolled)
        with f:
            self.assertFalse(f.closed)
        self.assertTrue(f.closed)
        def use_closed():
            with f:
                pass
        self.assertRaises(ValueError, use_closed)

    @unittest.skipIf(
        support.is_emscripten, "Emscripten cannot fstat renamed files."
    )
    def test_truncate_with_size_parameter(self):
        # A SpooledTemporaryFile can be truncated to zero size
        f = tempfile.SpooledTemporaryFile(max_size=10)
        self.addCleanup(f.close)
        f.write(b'abcdefg\n')
        f.seek(0)
        f.truncate()
        self.assertFalse(f._rolled)
        self.assertEqual(f._file.getvalue(), b'')
        # A SpooledTemporaryFile can be truncated to a specific size
        f = tempfile.SpooledTemporaryFile(max_size=10)
        self.addCleanup(f.close)
        f.write(b'abcdefg\n')
        f.truncate(4)
        self.assertFalse(f._rolled)
        self.assertEqual(f._file.getvalue(), b'abcd')
        # A SpooledTemporaryFile rolls over if truncated to large size
        f = tempfile.SpooledTemporaryFile(max_size=10)
        self.addCleanup(f.close)
        f.write(b'abcdefg\n')
        f.truncate(20)
        self.assertTrue(f._rolled)
        self.assertEqual(os.fstat(f.fileno()).st_size, 20)

    def test_class_getitem(self):
        # Subscripting the class yields a generic alias.
        self.assertIsInstance(tempfile.SpooledTemporaryFile[bytes],
                      types.GenericAlias)
1476
# The class is only defined when TemporaryFile is a separate object from
# NamedTemporaryFile; otherwise it is skipped entirely.
if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:

    class TestTemporaryFile(BaseTestCase):
        """Test TemporaryFile()."""

        def test_basic(self):
            # TemporaryFile can create files
            # No point in testing the name params - the file has no name.
            # Close right away so the test does not leave an open file
            # behind.
            tempfile.TemporaryFile().close()

        def test_has_no_name(self):
            # TemporaryFile creates files with no names (on this system)
            dir = tempfile.mkdtemp()
            f = tempfile.TemporaryFile(dir=dir)
            # Closing is deferred to the end of the test: the directory
            # has to be removed while the file is still open.  A second
            # close() is harmless (see test_multiple_close).
            self.addCleanup(f.close)
            f.write(b'blat')

            # Sneaky: because this file has no name, it should not prevent
            # us from removing the directory it was created in.
            try:
                os.rmdir(dir)
            except OSError:
                # cleanup
                f.close()
                os.rmdir(dir)
                raise

        def test_multiple_close(self):
            # A TemporaryFile can be closed many times without error
            f = tempfile.TemporaryFile()
            f.write(b'abc\n')
            f.close()
            f.close()
            f.close()

        # How to test the mode and bufsize parameters?
        def test_mode_and_encoding(self):
            # Data written with a given mode/encoding/newline setting must
            # be read back unchanged.

            def roundtrip(data, *args, **kwargs):
                with tempfile.TemporaryFile(*args, **kwargs) as fileobj:
                    fileobj.write(data)
                    fileobj.seek(0)
                    self.assertEqual(data, fileobj.read())

            roundtrip(b"1234", "w+b")
            roundtrip("abdc\n", "w+")
            roundtrip("\u039B", "w+", encoding="utf-16")
            roundtrip("foo\r\n", "w+", newline="")

        def test_bad_mode(self):
            # Invalid modes are rejected without leaving a file behind.
            dir = tempfile.mkdtemp()
            self.addCleanup(os_helper.rmtree, dir)
            with self.assertRaises(ValueError):
                tempfile.TemporaryFile(mode='wr', dir=dir)
            with self.assertRaises(TypeError):
                tempfile.TemporaryFile(mode=2, dir=dir)
            self.assertEqual(os.listdir(dir), [])

        def test_bad_encoding(self):
            # An unknown encoding is rejected without leaving a file behind.
            dir = tempfile.mkdtemp()
            self.addCleanup(os_helper.rmtree, dir)
            with self.assertRaises(LookupError):
                tempfile.TemporaryFile('w', encoding='bad-encoding', dir=dir)
            self.assertEqual(os.listdir(dir), [])

        def test_unexpected_error(self):
            # If os.unlink raises while the file is being set up, the
            # descriptor must still be closed and the directory left
            # empty.
            dir = tempfile.mkdtemp()
            self.addCleanup(os_helper.rmtree, dir)
            # os.open is patched as well (its mock is not inspected), so
            # no real file is opened while os.unlink and os.close are
            # mocked.
            with mock.patch('tempfile._O_TMPFILE_WORKS', False), \
                 mock.patch('os.unlink') as mock_unlink, \
                 mock.patch('os.open'), \
                 mock.patch('os.close') as mock_close:
                mock_unlink.side_effect = KeyboardInterrupt()
                with self.assertRaises(KeyboardInterrupt):
                    tempfile.TemporaryFile(dir=dir)
            mock_close.assert_called()
            self.assertEqual(os.listdir(dir), [])
1553
1554
1555 # Helper for test_del_on_shutdown
class NulledModules:
    # Context manager that temporarily replaces every value in the
    # namespaces of the given modules with None.  The namespaces are
    # snapshotted when the object is constructed; leaving the context
    # restores exactly that snapshot.

    def __init__(self, *modules):
        namespaces = []
        for module in modules:
            namespaces.append(module.__dict__)
        # `refs` are the live namespaces, `contents` their saved copies.
        self.refs = namespaces
        self.contents = [namespace.copy() for namespace in namespaces]

    def __enter__(self):
        for namespace in self.refs:
            # Keep every key but blank out its value.
            namespace.update(dict.fromkeys(namespace))

    def __exit__(self, *exc_info):
        for index, namespace in enumerate(self.refs):
            saved = self.contents[index]
            namespace.clear()
            namespace.update(saved)
1570
1571
class TestTemporaryDirectory(BaseTestCase):
    """Test TemporaryDirectory()."""

    def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1,
                  ignore_cleanup_errors=False):
        # Create a TemporaryDirectory (in the system temporary directory
        # unless `dir` is given), check its name and populate it through
        # do_create2().
        if dir is None:
            dir = tempfile.gettempdir()
        tmp = tempfile.TemporaryDirectory(
            dir=dir, prefix=pre, suffix=suf,
            ignore_cleanup_errors=ignore_cleanup_errors)
        self.nameCheck(tmp.name, dir, pre, suf)
        self.do_create2(tmp.name, recurse, dirs, files)
        return tmp

    def do_create2(self, path, recurse=1, dirs=1, files=1):
        # Create subdirectories and some files
        # `recurse` is the remaining nesting depth; each level gets `dirs`
        # subdirectories and `files` small files.
        if recurse:
            for i in range(dirs):
                name = os.path.join(path, "dir%d" % i)
                os.mkdir(name)
                self.do_create2(name, recurse-1, dirs, files)
        for i in range(files):
            with open(os.path.join(path, "test%d.txt" % i), "wb") as f:
                f.write(b"Hello world!")

    def test_mkdtemp_failure(self):
        # Check no additional exception if mkdtemp fails
        # Previously would raise AttributeError instead
        # (noted as part of Issue #10188)
        with tempfile.TemporaryDirectory() as nonexistent:
            pass
        # The directory has been removed again, so it cannot serve as a
        # parent any more.
        with self.assertRaises(FileNotFoundError) as cm:
            tempfile.TemporaryDirectory(dir=nonexistent)
        self.assertEqual(cm.exception.errno, errno.ENOENT)

    def test_explicit_cleanup(self):
        # A TemporaryDirectory is deleted when cleaned up
        dir = tempfile.mkdtemp()
        try:
            d = self.do_create(dir=dir)
            self.assertTrue(os.path.exists(d.name),
                            "TemporaryDirectory %s does not exist" % d.name)
            d.cleanup()
            self.assertFalse(os.path.exists(d.name),
                        "TemporaryDirectory %s exists after cleanup" % d.name)
        finally:
            os.rmdir(dir)

    def test_explicit_cleanup_ignore_errors(self):
        """Test that cleanup doesn't return an error when ignoring them."""
        with tempfile.TemporaryDirectory() as working_dir:
            temp_dir = self.do_create(
                dir=working_dir, ignore_cleanup_errors=True)
            temp_path = pathlib.Path(temp_dir.name)
            self.assertTrue(temp_path.exists(),
                            f"TemporaryDirectory {temp_path!s} does not exist")
            # First cleanup runs while a file inside the directory is
            # still open; the expected outcome differs on Windows.
            with open(temp_path / "a_file.txt", "w+t") as open_file:
                open_file.write("Hello world!\n")
                temp_dir.cleanup()
            self.assertEqual(len(list(temp_path.glob("*"))),
                             int(sys.platform.startswith("win")),
                             "Unexpected number of files in "
                             f"TemporaryDirectory {temp_path!s}")
            self.assertEqual(
                temp_path.exists(),
                sys.platform.startswith("win"),
                f"TemporaryDirectory {temp_path!s} existence state unexpected")
            # With the file closed, a second cleanup removes everything.
            temp_dir.cleanup()
            self.assertFalse(
                temp_path.exists(),
                f"TemporaryDirectory {temp_path!s} exists after cleanup")

    @os_helper.skip_unless_symlink
    def test_cleanup_with_symlink_to_a_directory(self):
        # cleanup() should not follow symlinks to directories (issue #12464)
        d1 = self.do_create()
        # cleanup() may be called repeatedly (see test_multiple_close), so
        # registering it here only matters when the test aborts before
        # reaching its own cleanup calls.
        self.addCleanup(d1.cleanup)
        d2 = self.do_create(recurse=0)
        self.addCleanup(d2.cleanup)

        # Symlink d1/foo -> d2
        os.symlink(d2.name, os.path.join(d1.name, "foo"))

        # This call to cleanup() should not follow the "foo" symlink
        d1.cleanup()

        self.assertFalse(os.path.exists(d1.name),
                         "TemporaryDirectory %s exists after cleanup" % d1.name)
        self.assertTrue(os.path.exists(d2.name),
                        "Directory pointed to by a symlink was deleted")
        self.assertEqual(os.listdir(d2.name), ['test0.txt'],
                         "Contents of the directory pointed to by a symlink "
                         "were deleted")
        d2.cleanup()

    @support.cpython_only
    def test_del_on_collection(self):
        # A TemporaryDirectory is deleted when garbage collected
        dir = tempfile.mkdtemp()
        try:
            d = self.do_create(dir=dir)
            name = d.name
            del d # Rely on refcounting to invoke __del__
            self.assertFalse(os.path.exists(name),
                        "TemporaryDirectory %s exists after __del__" % name)
        finally:
            os.rmdir(dir)

    @support.cpython_only
    def test_del_on_collection_ignore_errors(self):
        """Test that ignoring errors works when TemporaryDirectory is gced."""
        with tempfile.TemporaryDirectory() as working_dir:
            temp_dir = self.do_create(
                dir=working_dir, ignore_cleanup_errors=True)
            temp_path = pathlib.Path(temp_dir.name)
            self.assertTrue(temp_path.exists(),
                            f"TemporaryDirectory {temp_path!s} does not exist")
            # The object is dropped while a file inside the directory is
            # still open; the expected outcome differs on Windows.
            with open(temp_path / "a_file.txt", "w+t") as open_file:
                open_file.write("Hello world!\n")
                del temp_dir
            self.assertEqual(len(list(temp_path.glob("*"))),
                             int(sys.platform.startswith("win")),
                             "Unexpected number of files in "
                             f"TemporaryDirectory {temp_path!s}")
            self.assertEqual(
                temp_path.exists(),
                sys.platform.startswith("win"),
                f"TemporaryDirectory {temp_path!s} existence state unexpected")

    def test_del_on_shutdown(self):
        # A TemporaryDirectory may be cleaned up during shutdown
        with self.do_create() as dir:
            for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'):
                # Report which module attribute held the directory when
                # an iteration fails.
                with self.subTest(mod=mod):
                    code = """if True:
                        import builtins
                        import os
                        import shutil
                        import sys
                        import tempfile
                        import warnings

                        tmp = tempfile.TemporaryDirectory(dir={dir!r})
                        sys.stdout.buffer.write(tmp.name.encode())

                        tmp2 = os.path.join(tmp.name, 'test_dir')
                        os.mkdir(tmp2)
                        with open(os.path.join(tmp2, "test0.txt"), "w") as f:
                            f.write("Hello world!")

                        {mod}.tmp = tmp

                        warnings.filterwarnings("always", category=ResourceWarning)
                        """.format(dir=dir, mod=mod)
                    rc, out, err = script_helper.assert_python_ok("-c", code)
                    tmp_name = out.decode().strip()
                    self.assertFalse(os.path.exists(tmp_name),
                                "TemporaryDirectory %s exists after cleanup" % tmp_name)
                    err = err.decode('utf-8', 'backslashreplace')
                    self.assertNotIn("Exception ", err)
                    self.assertIn("ResourceWarning: Implicitly cleaning up", err)

    def test_del_on_shutdown_ignore_errors(self):
        """Test ignoring errors works when a tempdir is gc'ed on shutdown."""
        with tempfile.TemporaryDirectory() as working_dir:
            # The child script never closes open_file, so the directory
            # still contains an open file when the interpreter shuts down.
            code = """if True:
                import pathlib
                import sys
                import tempfile
                import warnings

                temp_dir = tempfile.TemporaryDirectory(
                    dir={working_dir!r}, ignore_cleanup_errors=True)
                sys.stdout.buffer.write(temp_dir.name.encode())

                temp_dir_2 = pathlib.Path(temp_dir.name) / "test_dir"
                temp_dir_2.mkdir()
                with open(temp_dir_2 / "test0.txt", "w") as test_file:
                    test_file.write("Hello world!")
                open_file = open(temp_dir_2 / "open_file.txt", "w")
                open_file.write("Hello world!")

                warnings.filterwarnings("always", category=ResourceWarning)
                """.format(working_dir=working_dir)
            __, out, err = script_helper.assert_python_ok("-c", code)
            temp_path = pathlib.Path(out.decode().strip())
            self.assertEqual(len(list(temp_path.glob("*"))),
                             int(sys.platform.startswith("win")),
                             "Unexpected number of files in "
                             f"TemporaryDirectory {temp_path!s}")
            self.assertEqual(
                temp_path.exists(),
                sys.platform.startswith("win"),
                f"TemporaryDirectory {temp_path!s} existence state unexpected")
            err = err.decode('utf-8', 'backslashreplace')
            self.assertNotIn("Exception", err)
            self.assertNotIn("Error", err)
            self.assertIn("ResourceWarning: Implicitly cleaning up", err)

    def test_exit_on_shutdown(self):
        # Issue #22427
        # The child script leaves a generator suspended inside a
        # TemporaryDirectory with-block when the interpreter exits.
        with self.do_create() as dir:
            code = """if True:
                import sys
                import tempfile
                import warnings

                def generator():
                    with tempfile.TemporaryDirectory(dir={dir!r}) as tmp:
                        yield tmp
                g = generator()
                sys.stdout.buffer.write(next(g).encode())

                warnings.filterwarnings("always", category=ResourceWarning)
                """.format(dir=dir)
            rc, out, err = script_helper.assert_python_ok("-c", code)
            tmp_name = out.decode().strip()
            self.assertFalse(os.path.exists(tmp_name),
                        "TemporaryDirectory %s exists after cleanup" % tmp_name)
            err = err.decode('utf-8', 'backslashreplace')
            self.assertNotIn("Exception ", err)
            self.assertIn("ResourceWarning: Implicitly cleaning up", err)

    def test_warnings_on_cleanup(self):
        # ResourceWarning will be triggered by __del__
        with self.do_create() as dir:
            d = self.do_create(dir=dir, recurse=3)
            name = d.name

            # Check for the resource warning
            with warnings_helper.check_warnings(('Implicitly',
                                                 ResourceWarning),
                                                quiet=False):
                warnings.filterwarnings("always", category=ResourceWarning)
                del d
                support.gc_collect()
            self.assertFalse(os.path.exists(name),
                        "TemporaryDirectory %s exists after __del__" % name)

    def test_multiple_close(self):
        # Can be cleaned-up many times without error
        d = self.do_create()
        d.cleanup()
        d.cleanup()
        d.cleanup()

    def test_context_manager(self):
        # Can be used as a context manager
        d = self.do_create()
        with d as name:
            self.assertTrue(os.path.exists(name))
            self.assertEqual(name, d.name)
        self.assertFalse(os.path.exists(name))

    def test_modes(self):
        # Cleanup must succeed for every combination of the three owner
        # permission bits on the contained files and directories.
        for mode in range(8):
            mode <<= 6
            with self.subTest(mode=format(mode, '03o')):
                d = self.do_create(recurse=3, dirs=2, files=2)
                with d:
                    # Change files and directories mode recursively.
                    for root, dirs, files in os.walk(d.name, topdown=False):
                        for name in files:
                            os.chmod(os.path.join(root, name), mode)
                        os.chmod(root, mode)
                    d.cleanup()
                self.assertFalse(os.path.exists(d.name))

    @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
    def test_flags(self):
        # Cleanup must succeed although files and directories carry the
        # UF_IMMUTABLE and UF_NOUNLINK flags.
        flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK

        # skip the test if these flags are not supported (ex: FreeBSD 13)
        filename = os_helper.TESTFN
        try:
            open(filename, "w").close()
            try:
                os.chflags(filename, flags)
            except OSError as exc:
                # "OSError: [Errno 45] Operation not supported"
                self.skipTest(f"chflags() doesn't support "
                              f"UF_IMMUTABLE|UF_NOUNLINK: {exc}")
            else:
                # Clear the flags again so the probe file can be removed.
                os.chflags(filename, 0)
        finally:
            os_helper.unlink(filename)

        d = self.do_create(recurse=3, dirs=2, files=2)
        with d:
            # Change files and directories flags recursively.
            for root, dirs, files in os.walk(d.name, topdown=False):
                for name in files:
                    os.chflags(os.path.join(root, name), flags)
                os.chflags(root, flags)
            d.cleanup()
        self.assertFalse(os.path.exists(d.name))

    def test_delete_false(self):
        # With delete=False the directory survives the with-block and has
        # to be removed by the test itself.
        with tempfile.TemporaryDirectory(delete=False) as working_dir:
            pass
        self.assertTrue(os.path.exists(working_dir))
        shutil.rmtree(working_dir)
1871
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()