python (3.12.0)
1 import collections.abc
2 import contextlib
3 import errno
4 import os
5 import re
6 import stat
7 import string
8 import sys
9 import time
10 import unittest
11 import warnings
12
13
14 # Filename used for testing
15 TESTFN_ASCII = '@test'
16
17 # Disambiguate TESTFN for parallel testing, while letting it remain a valid
18 # module name.
19 TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())
20
21 # TESTFN_UNICODE is a non-ascii filename
22 TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
23 if sys.platform == 'darwin':
24 # In Mac OS X's VFS API file names are, by definition, canonically
25 # decomposed Unicode, encoded using UTF-8. See QA1173:
26 # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
27 import unicodedata
28 TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
29
30 # TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
31 # encoded by the filesystem encoding (in strict mode). It can be None if we
32 # cannot generate such filename.
33 TESTFN_UNENCODABLE = None
34 if os.name == 'nt':
35 # skip win32s (0) or Windows 9x/ME (1)
36 if sys.getwindowsversion().platform >= 2:
37 # Different kinds of characters from various languages to minimize the
38 # probability that the whole name is encodable to MBCS (issue #9819)
39 TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
40 try:
41 TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
42 except UnicodeEncodeError:
43 pass
44 else:
45 print('WARNING: The filename %r CAN be encoded by the filesystem '
46 'encoding (%s). Unicode filename tests may not be effective'
47 % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
48 TESTFN_UNENCODABLE = None
49 # macOS and Emscripten deny unencodable filenames (invalid utf-8)
50 elif sys.platform not in {'darwin', 'emscripten', 'wasi'}:
51 try:
52 # ascii and utf-8 cannot encode the byte 0xff
53 b'\xff'.decode(sys.getfilesystemencoding())
54 except UnicodeDecodeError:
55 # 0xff will be encoded using the surrogate character u+DCFF
56 TESTFN_UNENCODABLE = TESTFN_ASCII \
57 + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
58 else:
59 # File system encoding (eg. ISO-8859-* encodings) can encode
60 # the byte 0xff. Skip some unicode filename tests.
61 pass
62
63 # FS_NONASCII: non-ASCII character encodable by os.fsencode(),
64 # or an empty string if there is no such character.
65 FS_NONASCII = ''
66 for character in (
67 # First try printable and common characters to have a readable filename.
68 # For each character, the encoding list are just example of encodings able
69 # to encode the character (the list is not exhaustive).
70
71 # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
72 '\u00E6',
73 # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
74 '\u0130',
75 # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
76 '\u0141',
77 # U+03C6 (Greek Small Letter Phi): cp1253
78 '\u03C6',
79 # U+041A (Cyrillic Capital Letter Ka): cp1251
80 '\u041A',
81 # U+05D0 (Hebrew Letter Alef): Encodable to cp424
82 '\u05D0',
83 # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
84 '\u060C',
85 # U+062A (Arabic Letter Teh): cp720
86 '\u062A',
87 # U+0E01 (Thai Character Ko Kai): cp874
88 '\u0E01',
89
90 # Then try more "special" characters. "special" because they may be
91 # interpreted or displayed differently depending on the exact locale
92 # encoding and the font.
93
94 # U+00A0 (No-Break Space)
95 '\u00A0',
96 # U+20AC (Euro Sign)
97 '\u20AC',
98 ):
99 try:
100 # If Python is set up to use the legacy 'mbcs' in Windows,
101 # 'replace' error mode is used, and encode() returns b'?'
102 # for characters missing in the ANSI codepage
103 if os.fsdecode(os.fsencode(character)) != character:
104 raise UnicodeError
105 except UnicodeError:
106 pass
107 else:
108 FS_NONASCII = character
109 break
110
111 # Save the initial cwd
112 SAVEDCWD = os.getcwd()
113
114 # TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
115 # decoded from the filesystem encoding (in strict mode). It can be None if we
116 # cannot generate such filename (ex: the latin1 encoding can decode any byte
117 # sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
118 # to the surrogateescape error handler (PEP 383), but not from the filesystem
119 # encoding in strict mode.
120 TESTFN_UNDECODABLE = None
121 for name in (
122 # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
123 # accepts it to create a file or a directory, or don't accept to enter to
124 # such directory (when the bytes name is used). So test b'\xe7' first:
125 # it is not decodable from cp932.
126 b'\xe7w\xf0',
127 # undecodable from ASCII, UTF-8
128 b'\xff',
129 # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
130 # and cp857
131 b'\xae\xd5'
132 # undecodable from UTF-8 (UNIX and Mac OS X)
133 b'\xed\xb2\x80', b'\xed\xb4\x80',
134 # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
135 # cp1253, cp1254, cp1255, cp1257, cp1258
136 b'\x81\x98',
137 ):
138 try:
139 name.decode(sys.getfilesystemencoding())
140 except UnicodeDecodeError:
141 try:
142 name.decode(sys.getfilesystemencoding(),
143 sys.getfilesystemencodeerrors())
144 except UnicodeDecodeError:
145 continue
146 TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
147 break
148
149 if FS_NONASCII:
150 TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII
151 else:
152 TESTFN_NONASCII = None
153 TESTFN = TESTFN_NONASCII or TESTFN_ASCII
154
155
156 def make_bad_fd():
157 """
158 Create an invalid file descriptor by opening and closing a file and return
159 its fd.
160 """
161 file = open(TESTFN, "wb")
162 try:
163 return file.fileno()
164 finally:
165 file.close()
166 unlink(TESTFN)
167
168
169 _can_symlink = None
170
171
172 def can_symlink():
173 global _can_symlink
174 if _can_symlink is not None:
175 return _can_symlink
176 # WASI / wasmtime prevents symlinks with absolute paths, see man
177 # openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute
178 # paths. Skip symlink tests on WASI for now.
179 src = os.path.abspath(TESTFN)
180 symlink_path = src + "can_symlink"
181 try:
182 os.symlink(src, symlink_path)
183 can = True
184 except (OSError, NotImplementedError, AttributeError):
185 can = False
186 else:
187 os.remove(symlink_path)
188 _can_symlink = can
189 return can
190
191
192 def skip_unless_symlink(test):
193 """Skip decorator for tests that require functional symlink"""
194 ok = can_symlink()
195 msg = "Requires functional symlink implementation"
196 return test if ok else unittest.skip(msg)(test)
197
198
199 _can_xattr = None
200
201
202 def can_xattr():
203 import tempfile
204 global _can_xattr
205 if _can_xattr is not None:
206 return _can_xattr
207 if not hasattr(os, "setxattr"):
208 can = False
209 else:
210 import platform
211 tmp_dir = tempfile.mkdtemp()
212 tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
213 try:
214 with open(TESTFN, "wb") as fp:
215 try:
216 # TESTFN & tempfile may use different file systems with
217 # different capabilities
218 os.setxattr(tmp_fp, b"user.test", b"")
219 os.setxattr(tmp_name, b"trusted.foo", b"42")
220 os.setxattr(fp.fileno(), b"user.test", b"")
221 # Kernels < 2.6.39 don't respect setxattr flags.
222 kernel_version = platform.release()
223 m = re.match(r"2.6.(\d{1,2})", kernel_version)
224 can = m is None or int(m.group(1)) >= 39
225 except OSError:
226 can = False
227 finally:
228 unlink(TESTFN)
229 unlink(tmp_name)
230 rmdir(tmp_dir)
231 _can_xattr = can
232 return can
233
234
235 def skip_unless_xattr(test):
236 """Skip decorator for tests that require functional extended attributes"""
237 ok = can_xattr()
238 msg = "no non-broken extended attribute support"
239 return test if ok else unittest.skip(msg)(test)
240
241
242 _can_chmod = None
243
244 def can_chmod():
245 global _can_chmod
246 if _can_chmod is not None:
247 return _can_chmod
248 if not hasattr(os, "chown"):
249 _can_chmod = False
250 return _can_chmod
251 try:
252 with open(TESTFN, "wb") as f:
253 try:
254 os.chmod(TESTFN, 0o777)
255 mode1 = os.stat(TESTFN).st_mode
256 os.chmod(TESTFN, 0o666)
257 mode2 = os.stat(TESTFN).st_mode
258 except OSError as e:
259 can = False
260 else:
261 can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2)
262 finally:
263 unlink(TESTFN)
264 _can_chmod = can
265 return can
266
267
268 def skip_unless_working_chmod(test):
269 """Skip tests that require working os.chmod()
270
271 WASI SDK 15.0 cannot change file mode bits.
272 """
273 ok = can_chmod()
274 msg = "requires working os.chmod()"
275 return test if ok else unittest.skip(msg)(test)
276
277
278 # Check whether the current effective user has the capability to override
279 # DAC (discretionary access control). Typically user root is able to
280 # bypass file read, write, and execute permission checks. The capability
281 # is independent of the effective user. See capabilities(7).
282 _can_dac_override = None
283
284 def can_dac_override():
285 global _can_dac_override
286
287 if not can_chmod():
288 _can_dac_override = False
289 if _can_dac_override is not None:
290 return _can_dac_override
291
292 try:
293 with open(TESTFN, "wb") as f:
294 os.chmod(TESTFN, 0o400)
295 try:
296 with open(TESTFN, "wb"):
297 pass
298 except OSError:
299 _can_dac_override = False
300 else:
301 _can_dac_override = True
302 finally:
303 unlink(TESTFN)
304
305 return _can_dac_override
306
307
308 def skip_if_dac_override(test):
309 ok = not can_dac_override()
310 msg = "incompatible with CAP_DAC_OVERRIDE"
311 return test if ok else unittest.skip(msg)(test)
312
313
314 def skip_unless_dac_override(test):
315 ok = can_dac_override()
316 msg = "requires CAP_DAC_OVERRIDE"
317 return test if ok else unittest.skip(msg)(test)
318
319
320 def unlink(filename):
321 try:
322 _unlink(filename)
323 except (FileNotFoundError, NotADirectoryError):
324 pass
325
326
327 if sys.platform.startswith("win"):
328 def _waitfor(func, pathname, waitall=False):
329 # Perform the operation
330 func(pathname)
331 # Now setup the wait loop
332 if waitall:
333 dirname = pathname
334 else:
335 dirname, name = os.path.split(pathname)
336 dirname = dirname or '.'
337 # Check for `pathname` to be removed from the filesystem.
338 # The exponential backoff of the timeout amounts to a total
339 # of ~1 second after which the deletion is probably an error
340 # anyway.
341 # Testing on an i7@4.3GHz shows that usually only 1 iteration is
342 # required when contention occurs.
343 timeout = 0.001
344 while timeout < 1.0:
345 # Note we are only testing for the existence of the file(s) in
346 # the contents of the directory regardless of any security or
347 # access rights. If we have made it this far, we have sufficient
348 # permissions to do that much using Python's equivalent of the
349 # Windows API FindFirstFile.
350 # Other Windows APIs can fail or give incorrect results when
351 # dealing with files that are pending deletion.
352 L = os.listdir(dirname)
353 if not (L if waitall else name in L):
354 return
355 # Increase the timeout and try again
356 time.sleep(timeout)
357 timeout *= 2
358 warnings.warn('tests may fail, delete still pending for ' + pathname,
359 RuntimeWarning, stacklevel=4)
360
361 def _unlink(filename):
362 _waitfor(os.unlink, filename)
363
364 def _rmdir(dirname):
365 _waitfor(os.rmdir, dirname)
366
367 def _rmtree(path):
368 from test.support import _force_run
369
370 def _rmtree_inner(path):
371 for name in _force_run(path, os.listdir, path):
372 fullname = os.path.join(path, name)
373 try:
374 mode = os.lstat(fullname).st_mode
375 except OSError as exc:
376 print("support.rmtree(): os.lstat(%r) failed with %s"
377 % (fullname, exc),
378 file=sys.__stderr__)
379 mode = 0
380 if stat.S_ISDIR(mode):
381 _waitfor(_rmtree_inner, fullname, waitall=True)
382 _force_run(fullname, os.rmdir, fullname)
383 else:
384 _force_run(fullname, os.unlink, fullname)
385 _waitfor(_rmtree_inner, path, waitall=True)
386 _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
387
388 def _longpath(path):
389 try:
390 import ctypes
391 except ImportError:
392 # No ctypes means we can't expands paths.
393 pass
394 else:
395 buffer = ctypes.create_unicode_buffer(len(path) * 2)
396 length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
397 len(buffer))
398 if length:
399 return buffer[:length]
400 return path
401 else:
402 _unlink = os.unlink
403 _rmdir = os.rmdir
404
405 def _rmtree(path):
406 import shutil
407 try:
408 shutil.rmtree(path)
409 return
410 except OSError:
411 pass
412
413 def _rmtree_inner(path):
414 from test.support import _force_run
415 for name in _force_run(path, os.listdir, path):
416 fullname = os.path.join(path, name)
417 try:
418 mode = os.lstat(fullname).st_mode
419 except OSError:
420 mode = 0
421 if stat.S_ISDIR(mode):
422 _rmtree_inner(fullname)
423 _force_run(path, os.rmdir, fullname)
424 else:
425 _force_run(path, os.unlink, fullname)
426 _rmtree_inner(path)
427 os.rmdir(path)
428
429 def _longpath(path):
430 return path
431
432
433 def rmdir(dirname):
434 try:
435 _rmdir(dirname)
436 except FileNotFoundError:
437 pass
438
439
440 def rmtree(path):
441 try:
442 _rmtree(path)
443 except FileNotFoundError:
444 pass
445
446
447 @contextlib.contextmanager
448 def temp_dir(path=None, quiet=False):
449 """Return a context manager that creates a temporary directory.
450
451 Arguments:
452
453 path: the directory to create temporarily. If omitted or None,
454 defaults to creating a temporary directory using tempfile.mkdtemp.
455
456 quiet: if False (the default), the context manager raises an exception
457 on error. Otherwise, if the path is specified and cannot be
458 created, only a warning is issued.
459
460 """
461 import tempfile
462 dir_created = False
463 if path is None:
464 path = tempfile.mkdtemp()
465 dir_created = True
466 path = os.path.realpath(path)
467 else:
468 try:
469 os.mkdir(path)
470 dir_created = True
471 except OSError as exc:
472 if not quiet:
473 raise
474 warnings.warn(f'tests may fail, unable to create '
475 f'temporary directory {path!r}: {exc}',
476 RuntimeWarning, stacklevel=3)
477 if dir_created:
478 pid = os.getpid()
479 try:
480 yield path
481 finally:
482 # In case the process forks, let only the parent remove the
483 # directory. The child has a different process id. (bpo-30028)
484 if dir_created and pid == os.getpid():
485 rmtree(path)
486
487
488 @contextlib.contextmanager
489 def change_cwd(path, quiet=False):
490 """Return a context manager that changes the current working directory.
491
492 Arguments:
493
494 path: the directory to use as the temporary current working directory.
495
496 quiet: if False (the default), the context manager raises an exception
497 on error. Otherwise, it issues only a warning and keeps the current
498 working directory the same.
499
500 """
501 saved_dir = os.getcwd()
502 try:
503 os.chdir(os.path.realpath(path))
504 except OSError as exc:
505 if not quiet:
506 raise
507 warnings.warn(f'tests may fail, unable to change the current working '
508 f'directory to {path!r}: {exc}',
509 RuntimeWarning, stacklevel=3)
510 try:
511 yield os.getcwd()
512 finally:
513 os.chdir(saved_dir)
514
515
516 @contextlib.contextmanager
517 def temp_cwd(name='tempcwd', quiet=False):
518 """
519 Context manager that temporarily creates and changes the CWD.
520
521 The function temporarily changes the current working directory
522 after creating a temporary directory in the current directory with
523 name *name*. If *name* is None, the temporary directory is
524 created using tempfile.mkdtemp.
525
526 If *quiet* is False (default) and it is not possible to
527 create or change the CWD, an error is raised. If *quiet* is True,
528 only a warning is raised and the original CWD is used.
529
530 """
531 with temp_dir(path=name, quiet=quiet) as temp_path:
532 with change_cwd(temp_path, quiet=quiet) as cwd_dir:
533 yield cwd_dir
534
535
536 def create_empty_file(filename):
537 """Create an empty file. If the file already exists, truncate it."""
538 fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
539 os.close(fd)
540
541
542 @contextlib.contextmanager
543 def open_dir_fd(path):
544 """Open a file descriptor to a directory."""
545 assert os.path.isdir(path)
546 flags = os.O_RDONLY
547 if hasattr(os, "O_DIRECTORY"):
548 flags |= os.O_DIRECTORY
549 dir_fd = os.open(path, flags)
550 try:
551 yield dir_fd
552 finally:
553 os.close(dir_fd)
554
555
556 def fs_is_case_insensitive(directory):
557 """Detects if the file system for the specified directory
558 is case-insensitive."""
559 import tempfile
560 with tempfile.NamedTemporaryFile(dir=directory) as base:
561 base_path = base.name
562 case_path = base_path.upper()
563 if case_path == base_path:
564 case_path = base_path.lower()
565 try:
566 return os.path.samefile(base_path, case_path)
567 except FileNotFoundError:
568 return False
569
570
571 class ESC[4;38;5;81mFakePath:
572 """Simple implementation of the path protocol.
573 """
574 def __init__(self, path):
575 self.path = path
576
577 def __repr__(self):
578 return f'<FakePath {self.path!r}>'
579
580 def __fspath__(self):
581 if (isinstance(self.path, BaseException) or
582 isinstance(self.path, type) and
583 issubclass(self.path, BaseException)):
584 raise self.path
585 else:
586 return self.path
587
588
589 def fd_count():
590 """Count the number of open file descriptors.
591 """
592 if sys.platform.startswith(('linux', 'freebsd', 'emscripten')):
593 try:
594 names = os.listdir("/proc/self/fd")
595 # Subtract one because listdir() internally opens a file
596 # descriptor to list the content of the /proc/self/fd/ directory.
597 return len(names) - 1
598 except FileNotFoundError:
599 pass
600
601 MAXFD = 256
602 if hasattr(os, 'sysconf'):
603 try:
604 MAXFD = os.sysconf("SC_OPEN_MAX")
605 except OSError:
606 pass
607
608 old_modes = None
609 if sys.platform == 'win32':
610 # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
611 # on invalid file descriptor if Python is compiled in debug mode
612 try:
613 import msvcrt
614 msvcrt.CrtSetReportMode
615 except (AttributeError, ImportError):
616 # no msvcrt or a release build
617 pass
618 else:
619 old_modes = {}
620 for report_type in (msvcrt.CRT_WARN,
621 msvcrt.CRT_ERROR,
622 msvcrt.CRT_ASSERT):
623 old_modes[report_type] = msvcrt.CrtSetReportMode(report_type,
624 0)
625
626 try:
627 count = 0
628 for fd in range(MAXFD):
629 try:
630 # Prefer dup() over fstat(). fstat() can require input/output
631 # whereas dup() doesn't.
632 fd2 = os.dup(fd)
633 except OSError as e:
634 if e.errno != errno.EBADF:
635 raise
636 else:
637 os.close(fd2)
638 count += 1
639 finally:
640 if old_modes is not None:
641 for report_type in (msvcrt.CRT_WARN,
642 msvcrt.CRT_ERROR,
643 msvcrt.CRT_ASSERT):
644 msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
645
646 return count
647
648
649 if hasattr(os, "umask"):
650 @contextlib.contextmanager
651 def temp_umask(umask):
652 """Context manager that temporarily sets the process umask."""
653 oldmask = os.umask(umask)
654 try:
655 yield
656 finally:
657 os.umask(oldmask)
658 else:
659 @contextlib.contextmanager
660 def temp_umask(umask):
661 """no-op on platforms without umask()"""
662 yield
663
664
665 class ESC[4;38;5;81mEnvironmentVarGuard(ESC[4;38;5;149mcollectionsESC[4;38;5;149m.ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mMutableMapping):
666
667 """Class to help protect the environment variable properly. Can be used as
668 a context manager."""
669
670 def __init__(self):
671 self._environ = os.environ
672 self._changed = {}
673
674 def __getitem__(self, envvar):
675 return self._environ[envvar]
676
677 def __setitem__(self, envvar, value):
678 # Remember the initial value on the first access
679 if envvar not in self._changed:
680 self._changed[envvar] = self._environ.get(envvar)
681 self._environ[envvar] = value
682
683 def __delitem__(self, envvar):
684 # Remember the initial value on the first access
685 if envvar not in self._changed:
686 self._changed[envvar] = self._environ.get(envvar)
687 if envvar in self._environ:
688 del self._environ[envvar]
689
690 def keys(self):
691 return self._environ.keys()
692
693 def __iter__(self):
694 return iter(self._environ)
695
696 def __len__(self):
697 return len(self._environ)
698
699 def set(self, envvar, value):
700 self[envvar] = value
701
702 def unset(self, envvar):
703 del self[envvar]
704
705 def copy(self):
706 # We do what os.environ.copy() does.
707 return dict(self)
708
709 def __enter__(self):
710 return self
711
712 def __exit__(self, *ignore_exc):
713 for (k, v) in self._changed.items():
714 if v is None:
715 if k in self._environ:
716 del self._environ[k]
717 else:
718 self._environ[k] = v
719 os.environ = self._environ
720
721
722 try:
723 import ctypes
724 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
725
726 ERROR_FILE_NOT_FOUND = 2
727 DDD_REMOVE_DEFINITION = 2
728 DDD_EXACT_MATCH_ON_REMOVE = 4
729 DDD_NO_BROADCAST_SYSTEM = 8
730 except (ImportError, AttributeError):
731 def subst_drive(path):
732 raise unittest.SkipTest('ctypes or kernel32 is not available')
733 else:
734 @contextlib.contextmanager
735 def subst_drive(path):
736 """Temporarily yield a substitute drive for a given path."""
737 for c in reversed(string.ascii_uppercase):
738 drive = f'{c}:'
739 if (not kernel32.QueryDosDeviceW(drive, None, 0) and
740 ctypes.get_last_error() == ERROR_FILE_NOT_FOUND):
741 break
742 else:
743 raise unittest.SkipTest('no available logical drive')
744 if not kernel32.DefineDosDeviceW(
745 DDD_NO_BROADCAST_SYSTEM, drive, path):
746 raise ctypes.WinError(ctypes.get_last_error())
747 try:
748 yield drive
749 finally:
750 if not kernel32.DefineDosDeviceW(
751 DDD_REMOVE_DEFINITION | DDD_EXACT_MATCH_ON_REMOVE,
752 drive, path):
753 raise ctypes.WinError(ctypes.get_last_error())