1 import collections.abc
2 import contextlib
3 import errno
4 import os
5 import re
6 import stat
7 import sys
8 import time
9 import unittest
10 import warnings
11
12
# Base (pure ASCII) file name used by the tests.
# Jython disallows @ in module names, so it gets a '$' prefix instead.
TESTFN_ASCII = '$test' if os.name == 'java' else '@test'

# Append the process id so that test runs executing in parallel do not share
# a file name; the result is still usable as a module name.
TESTFN_ASCII = f"{TESTFN_ASCII}_{os.getpid()}_tmp"
23
# TESTFN_UNICODE is TESTFN_ASCII extended with a non-ASCII suffix.
TESTFN_UNICODE = f"{TESTFN_ASCII}-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
    # In Mac OS X's VFS API file names are, by definition, canonically
    # decomposed Unicode, encoded using UTF-8. See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    # Store the name in that (NFD) form.
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
32
# TESTFN_UNENCODABLE is a str file name that the filesystem encoding must
# *not* be able to encode in strict mode.  It stays None when no such name
# can be produced.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Combine characters from several scripts to minimize the
        # probability that the whole name is encodable to MBCS (issue #9819)
        TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            # Expected outcome: the candidate really is unencodable.
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem '
                  'encoding (%s). Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
            TESTFN_UNENCODABLE = None
# macOS and Emscripten deny unencodable filenames (invalid utf-8)
elif sys.platform not in {'darwin', 'emscripten', 'wasi'}:
    try:
        # ascii and utf-8 cannot decode the byte 0xff
        b'\xff'.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        # With surrogateescape, 0xff becomes the lone surrogate U+DCFF,
        # which a strict encoder rejects.
        TESTFN_UNENCODABLE = TESTFN_ASCII + (
            b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape'))
    # Otherwise the filesystem encoding (e.g. the ISO-8859-* family) can
    # handle the byte 0xff, so TESTFN_UNENCODABLE stays None and some
    # unicode filename tests are skipped.
65
# FS_NONASCII: the first candidate non-ASCII character that survives an
# os.fsencode() / os.fsdecode() round trip, or an empty string if none does.
FS_NONASCII = ''
for character in (
    # Printable, common characters come first so that the resulting file
    # name stays readable.  The encodings named next to each character are
    # only examples of encodings able to encode it (not an exhaustive list).

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # "Special" characters come last: they may be interpreted or displayed
    # differently depending on the exact locale encoding and the font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        # Compare the round-tripped value instead of relying on an
        # exception alone: if Python is set up to use the legacy 'mbcs' in
        # Windows, 'replace' error mode is used and encode() returns b'?'
        # for characters missing in the ANSI codepage.
        if os.fsdecode(os.fsencode(character)) == character:
            FS_NONASCII = character
            break
    except UnicodeError:
        # Not representable in the filesystem encoding; try the next one.
        pass
113
# Working directory that was current when this module was imported.
SAVEDCWD = os.getcwd()
116
# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
#
# The candidates are tried in order; the first one that fails to decode is
# appended to the encoded TESTFN_ASCII.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it to create a file or a directory, or don't accept to enter to
    # such directory (when the bytes name is used). So test b'\xe7' first:
    # it is not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    # (The trailing comma matters: without it this literal is implicitly
    # concatenated with the next one into a single candidate.)
    b'\xae\xd5',
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
        break
146
# TESTFN_NONASCII is TESTFN_ASCII plus the encodable non-ASCII character, or
# None when no such character was found.
TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII if FS_NONASCII else None
# Default test file name: the non-ASCII variant when one is available.
TESTFN = TESTFN_NONASCII or TESTFN_ASCII
152
153
def make_bad_fd():
    """Return a file descriptor number that is no longer valid.

    TESTFN is opened for writing, its descriptor number is recorded, and the
    file is then closed and removed, so the returned number refers to a
    closed descriptor.
    """
    stream = open(TESTFN, "wb")
    try:
        stale_fd = stream.fileno()
    finally:
        # Close and delete even if fileno() failed.
        stream.close()
        unlink(TESTFN)
    return stale_fd
165
166
# Cached result of can_symlink(); None until the first probe has run.
_can_symlink = None


def can_symlink():
    """Return True if os.symlink() can create a link next to TESTFN.

    The probe runs once; later calls return the cached answer.
    """
    global _can_symlink
    if _can_symlink is None:
        # The probe deliberately uses an absolute path.  WASI / wasmtime
        # prevents symlinks with absolute paths (see man openat2(2)
        # RESOLVE_BENEATH) and almost all symlink tests use absolute paths,
        # so symlink tests are skipped on WASI for now.
        target = os.path.abspath(TESTFN)
        link = target + "can_symlink"
        try:
            os.symlink(target, link)
        except (OSError, NotImplementedError, AttributeError):
            _can_symlink = False
        else:
            # Remove the probe link before recording success.
            os.remove(link)
            _can_symlink = True
    return _can_symlink
188
189
def skip_unless_symlink(test):
    """Skip decorator for tests that require functional symlink"""
    if can_symlink():
        return test
    return unittest.skip("Requires functional symlink implementation")(test)
195
196
# Cached result of can_xattr(); None until the first probe has run.
_can_xattr = None


def can_xattr():
    """Return True if extended attributes can be set and behave correctly.

    Attributes are set on a file created through tempfile as well as on
    TESTFN.  The result is False when os.setxattr is missing, when any of
    the setxattr() calls raises OSError, or when platform.release() reports
    a 2.6.x kernel older than 2.6.39.  The answer is cached after the first
    call.
    """
    import tempfile
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        import platform
        tmp_dir = tempfile.mkdtemp()
        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(tmp_name, b"trusted.foo", b"42")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    # Dots are escaped so that only a literal "2.6." prefix
                    # is recognised as a 2.6 kernel.
                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            try:
                # mkstemp() returns an OS-level descriptor owned by the
                # caller; close it so the probe does not leak it.
                os.close(tmp_fp)
            finally:
                unlink(TESTFN)
                unlink(tmp_name)
                rmdir(tmp_dir)
    _can_xattr = can
    return can
231
232
def skip_unless_xattr(test):
    """Skip decorator for tests that require functional extended attributes"""
    if can_xattr():
        return test
    return unittest.skip("no non-broken extended attribute support")(test)
238
239
# Cached result of can_chmod(); None until the first probe has run.
_can_chmod = None

def can_chmod():
    """Return True if os.chmod() really changes the mode bits of a file.

    TESTFN is created, chmod'ed to two different modes, and the permission
    bits reported by os.stat() are compared.  The result is False when
    os.chmod is unavailable, when chmod()/stat() raise OSError, or when both
    modes read back identically.  The answer is cached after the first call.
    """
    global _can_chmod
    if _can_chmod is not None:
        return _can_chmod
    # Guard on the function that is actually exercised below.
    if not hasattr(os, "chmod"):
        _can_chmod = False
        return _can_chmod
    try:
        with open(TESTFN, "wb"):
            try:
                os.chmod(TESTFN, 0o777)
                mode1 = os.stat(TESTFN).st_mode
                os.chmod(TESTFN, 0o666)
                mode2 = os.stat(TESTFN).st_mode
            except OSError:
                can = False
            else:
                # chmod only "works" if the two requests are observable.
                can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2)
    finally:
        unlink(TESTFN)
    _can_chmod = can
    return can
264
265
def skip_unless_working_chmod(test):
    """Skip tests that require working os.chmod()

    WASI SDK 15.0 cannot change file mode bits.
    """
    if can_chmod():
        return test
    return unittest.skip("requires working os.chmod()")(test)
274
275
# Check whether the current effective user has the capability to override
# DAC (discretionary access control). Typically user root is able to
# bypass file read, write, and execute permission checks. The capability
# is independent of the effective user. See capabilities(7).
#
# Cached result of can_dac_override(); None until the first probe has run.
_can_dac_override = None

def can_dac_override():
    """Return True if a file chmod'ed to 0o400 can still be opened for writing.

    The probe needs a working chmod; without one the answer is False.  The
    result is cached in _can_dac_override.
    """
    global _can_dac_override

    if not can_chmod():
        _can_dac_override = False
    if _can_dac_override is not None:
        return _can_dac_override

    try:
        with open(TESTFN, "wb"):
            # Make the file read-only for its owner ...
            os.chmod(TESTFN, 0o400)
            try:
                # ... and see whether writing is permitted regardless.
                with open(TESTFN, "wb"):
                    pass
            except OSError:
                _can_dac_override = False
            else:
                _can_dac_override = True
    finally:
        unlink(TESTFN)

    return _can_dac_override
304
305
def skip_if_dac_override(test):
    """Skip decorator for tests that cannot run when can_dac_override() is true."""
    if can_dac_override():
        return unittest.skip("incompatible with CAP_DAC_OVERRIDE")(test)
    return test
310
311
def skip_unless_dac_override(test):
    """Skip decorator for tests that only make sense when can_dac_override() is true."""
    if can_dac_override():
        return test
    return unittest.skip("requires CAP_DAC_OVERRIDE")(test)
316
317
def unlink(filename):
    """Remove *filename*, ignoring the case where it does not exist.

    FileNotFoundError and NotADirectoryError are suppressed; any other
    error propagates.
    """
    with contextlib.suppress(FileNotFoundError, NotADirectoryError):
        _unlink(filename)
323
324
if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then wait until the removal is visible.

        With waitall=True, *pathname* is a directory and the wait lasts
        until its listing is empty.  Otherwise the wait lasts until the last
        component of *pathname* is gone from its parent's listing.  A
        RuntimeWarning is issued if the entry is still present once the
        backoff is exhausted.
        """
        # Perform the operation
        func(pathname)
        # Work out which directory listing has to be watched
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            if not dirname:
                dirname = '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        delay = 0.001
        while delay < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            entries = os.listdir(dirname)
            still_pending = entries if waitall else name in entries
            if not still_pending:
                return
            # Back off and look again
            time.sleep(delay)
            delay *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """Delete a file and wait for the deletion to show up."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """Remove a directory and wait for the removal to show up."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Recursively delete *path*, waiting for each level to empty out."""
        from test.support import _force_run

        def _rmtree_inner(path):
            # Delete every entry below `path`, but not `path` itself.
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError as exc:
                    print("support.rmtree(): os.lstat(%r) failed with %s"
                          % (fullname, exc),
                          file=sys.__stderr__)
                    # Mode 0 is not a directory, so the entry is unlinked.
                    mode = 0
                if not stat.S_ISDIR(mode):
                    _force_run(fullname, os.unlink, fullname)
                else:
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    _force_run(fullname, os.rmdir, fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)

    def _longpath(path):
        """Return the long form of *path* via GetLongPathNameW.

        *path* is returned unchanged when ctypes is unavailable or the call
        reports a length of zero.
        """
        try:
            import ctypes
        except ImportError:
            # No ctypes means we can't expands paths.
            return path
        buffer = ctypes.create_unicode_buffer(len(path) * 2)
        length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
                                                         len(buffer))
        if length:
            return buffer[:length]
        return path
else:
    # No waiting is done here: the os functions are used directly.
    _unlink = os.unlink
    _rmdir = os.rmdir

    def _rmtree(path):
        """Recursively delete *path*.

        shutil.rmtree() is tried first; if it raises OSError the tree is
        removed entry by entry through test.support._force_run.
        """
        import shutil
        try:
            shutil.rmtree(path)
        except OSError:
            pass
        else:
            return

        def _rmtree_inner(path):
            # Delete every entry below `path`, but not `path` itself.
            from test.support import _force_run
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError:
                    # Mode 0 is not a directory, so the entry is unlinked.
                    mode = 0
                if not stat.S_ISDIR(mode):
                    _force_run(path, os.unlink, fullname)
                else:
                    _rmtree_inner(fullname)
                    _force_run(path, os.rmdir, fullname)
        _rmtree_inner(path)
        os.rmdir(path)

    def _longpath(path):
        """Return *path* unchanged."""
        return path
429
430
def rmdir(dirname):
    """Remove the directory *dirname*; a missing directory is not an error."""
    with contextlib.suppress(FileNotFoundError):
        _rmdir(dirname)
436
437
def rmtree(path):
    """Recursively remove *path*; a missing path is not an error."""
    with contextlib.suppress(FileNotFoundError):
        _rmtree(path)
443
444
@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Return a context manager that creates a temporary directory.

    Arguments:

      path: the directory to create temporarily.  If omitted or None,
        a fresh directory is made with tempfile.mkdtemp and its real path
        is yielded.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, if the path is specified and cannot be
        created, only a warning is issued.

    The directory is removed on exit only if this context manager created it.
    """
    import tempfile
    created = False
    if path is not None:
        try:
            os.mkdir(path)
        except OSError as exc:
            if not quiet:
                raise
            warnings.warn(f'tests may fail, unable to create '
                          f'temporary directory {path!r}: {exc}',
                          RuntimeWarning, stacklevel=3)
        else:
            created = True
    else:
        path = tempfile.mkdtemp()
        created = True
        path = os.path.realpath(path)
    if created:
        # Remember which process owns the directory.
        owner_pid = os.getpid()
    try:
        yield path
    finally:
        # In case the process forks, let only the parent remove the
        # directory. The child has a different process id. (bpo-30028)
        if created and owner_pid == os.getpid():
            rmtree(path)
484
485
@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Return a context manager that changes the current working directory.

    Arguments:

      path: the directory to use as the temporary current working directory.
        Its real path is what gets passed to os.chdir().

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, it issues only a warning and keeps the current
        working directory the same.

    The value yielded is os.getcwd() after the change attempt; the previous
    working directory is restored on exit.
    """
    previous_cwd = os.getcwd()
    try:
        os.chdir(os.path.realpath(path))
    except OSError as exc:
        if not quiet:
            raise
        warnings.warn(f'tests may fail, unable to change the current working '
                      f'directory to {path!r}: {exc}',
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(previous_cwd)
512
513
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that temporarily creates and changes the CWD.

    A temporary directory called *name* is created in the current directory
    and becomes the working directory for the duration of the block.  If
    *name* is None, the temporary directory is created using
    tempfile.mkdtemp.

    If *quiet* is False (default) and it is not possible to
    create or change the CWD, an error is raised.  If *quiet* is True,
    only a warning is raised and the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as temp_path, \
            change_cwd(temp_path, quiet=quiet) as cwd_dir:
        yield cwd_dir
532
533
def create_empty_file(filename):
    """Create an empty file. If the file already exists, truncate it."""
    open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    # Opening with O_CREAT | O_TRUNC does all the work; just close again.
    os.close(os.open(filename, open_flags))
538
539
@contextlib.contextmanager
def open_dir_fd(path):
    """Open a file descriptor to a directory.

    The descriptor is yielded to the with-block and closed on exit.
    """
    assert os.path.isdir(path)
    # Add O_DIRECTORY only on platforms that define it.
    open_flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0)
    dir_fd = os.open(path, open_flags)
    try:
        yield dir_fd
    finally:
        os.close(dir_fd)
552
553
def fs_is_case_insensitive(directory):
    """Detects if the file system for the specified directory
    is case-insensitive.

    A temporary file is created in *directory*; the file system is reported
    as case-insensitive when a case-swapped spelling of its path refers to
    the same file.
    """
    import tempfile
    with tempfile.NamedTemporaryFile(dir=directory) as probe:
        actual_path = probe.name
        # Prefer the upper-cased spelling; fall back to lower case when the
        # path is already all upper case.
        swapped_path = actual_path.upper()
        if swapped_path == actual_path:
            swapped_path = actual_path.lower()
        try:
            return os.path.samefile(actual_path, swapped_path)
        except FileNotFoundError:
            # The swapped spelling does not exist: names are case-sensitive.
            return False
567
568
class FakePath:
    """Simple implementation of the path protocol.

    The wrapped value is returned from __fspath__() unless it is an
    exception instance or exception class, in which case it is raised.
    """
    def __init__(self, path):
        self.path = path

    def __repr__(self):
        return f'<FakePath {self.path!r}>'

    def __fspath__(self):
        value = self.path
        # An exception instance is raised as-is.
        if isinstance(value, BaseException):
            raise value
        # So is an exception class.
        if isinstance(value, type) and issubclass(value, BaseException):
            raise value
        return value
585
586
def fd_count():
    """Count the number of open file descriptors.

    On Linux, FreeBSD and Emscripten the entries of /proc/self/fd are
    counted when that directory exists.  Otherwise every descriptor number
    below SC_OPEN_MAX (256 when unavailable) is probed with os.dup().
    """
    if sys.platform.startswith(('linux', 'freebsd', 'emscripten')):
        try:
            entries = os.listdir("/proc/self/fd")
        except FileNotFoundError:
            pass
        else:
            # Subtract one because listdir() internally opens a file
            # descriptor to list the content of the /proc/self/fd/ directory.
            return len(entries) - 1

    MAXFD = 256
    if hasattr(os, 'sysconf'):
        try:
            MAXFD = os.sysconf("SC_OPEN_MAX")
        except OSError:
            pass

    old_modes = None
    if sys.platform == 'win32':
        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
        # on invalid file descriptor if Python is compiled in debug mode
        try:
            import msvcrt
            msvcrt.CrtSetReportMode
        except (AttributeError, ImportError):
            # no msvcrt or a release build
            pass
        else:
            report_types = (msvcrt.CRT_WARN,
                            msvcrt.CRT_ERROR,
                            msvcrt.CRT_ASSERT)
            # Silence each report type, remembering its previous mode.
            old_modes = {
                report_type: msvcrt.CrtSetReportMode(report_type, 0)
                for report_type in report_types
            }

    try:
        count = 0
        for fd in range(MAXFD):
            try:
                # Prefer dup() over fstat(). fstat() can require input/output
                # whereas dup() doesn't.
                duplicate = os.dup(fd)
            except OSError as exc:
                # EBADF just means "not open"; anything else is unexpected.
                if exc.errno != errno.EBADF:
                    raise
            else:
                os.close(duplicate)
                count += 1
    finally:
        if old_modes is not None:
            # Put the CRT report modes back the way they were.
            for report_type, previous_mode in old_modes.items():
                msvcrt.CrtSetReportMode(report_type, previous_mode)

    return count
645
646
if not hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """no-op on platforms without umask()"""
        yield
else:
    @contextlib.contextmanager
    def temp_umask(umask):
        """Context manager that temporarily sets the process umask."""
        previous_mask = os.umask(umask)
        try:
            yield
        finally:
            # Restore the mask that was active before entering the block.
            os.umask(previous_mask)
661
662
class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Class to help protect the environment variable properly.  Can be used as
    a context manager.

    Reads and writes go straight to os.environ.  The value each variable had
    before its first modification through the guard is remembered and put
    back when the context is left.
    """

    def __init__(self):
        self._environ = os.environ
        # Maps a variable name to its value before the first change
        # (None when the variable was not set).
        self._changed = {}

    def _remember(self, envvar):
        # Remember the initial value on the first access only
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        self._remember(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        self._remember(envvar)
        # Deleting a variable that is not set is silently accepted.
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def copy(self):
        # We do what os.environ.copy() does.
        return dict(self)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for name, initial in self._changed.items():
            if initial is not None:
                self._environ[name] = initial
            elif name in self._environ:
                # The variable did not exist before: remove it again.
                del self._environ[name]
        # Rebind os.environ to the mapping captured in __init__.
        os.environ = self._environ