(root)/
Python-3.12.0/
Lib/
test/
support/
os_helper.py
       1  import collections.abc
       2  import contextlib
       3  import errno
       4  import os
       5  import re
       6  import stat
       7  import string
       8  import sys
       9  import time
      10  import unittest
      11  import warnings
      12  
      13  
      14  # Filename used for testing
      15  TESTFN_ASCII = '@test'
      16  
      17  # Disambiguate TESTFN for parallel testing, while letting it remain a valid
      18  # module name.
      19  TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())
      20  
      21  # TESTFN_UNICODE is a non-ascii filename
      22  TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
      23  if sys.platform == 'darwin':
      24      # In Mac OS X's VFS API file names are, by definition, canonically
      25      # decomposed Unicode, encoded using UTF-8. See QA1173:
      26      # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
      27      import unicodedata
      28      TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
      29  
      30  # TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
      31  # encoded by the filesystem encoding (in strict mode). It can be None if we
      32  # cannot generate such filename.
      33  TESTFN_UNENCODABLE = None
      34  if os.name == 'nt':
      35      # skip win32s (0) or Windows 9x/ME (1)
      36      if sys.getwindowsversion().platform >= 2:
      37          # Different kinds of characters from various languages to minimize the
      38          # probability that the whole name is encodable to MBCS (issue #9819)
      39          TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
      40          try:
      41              TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
      42          except UnicodeEncodeError:
      43              pass
      44          else:
      45              print('WARNING: The filename %r CAN be encoded by the filesystem '
      46                    'encoding (%s). Unicode filename tests may not be effective'
      47                    % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
      48              TESTFN_UNENCODABLE = None
      49  # macOS and Emscripten deny unencodable filenames (invalid utf-8)
      50  elif sys.platform not in {'darwin', 'emscripten', 'wasi'}:
      51      try:
      52          # ascii and utf-8 cannot encode the byte 0xff
      53          b'\xff'.decode(sys.getfilesystemencoding())
      54      except UnicodeDecodeError:
      55          # 0xff will be encoded using the surrogate character u+DCFF
      56          TESTFN_UNENCODABLE = TESTFN_ASCII \
      57              + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
      58      else:
      59          # File system encoding (eg. ISO-8859-* encodings) can encode
      60          # the byte 0xff. Skip some unicode filename tests.
      61          pass
      62  
      63  # FS_NONASCII: non-ASCII character encodable by os.fsencode(),
      64  # or an empty string if there is no such character.
      65  FS_NONASCII = ''
      66  for character in (
      67      # First try printable and common characters to have a readable filename.
      68      # For each character, the encoding list are just example of encodings able
      69      # to encode the character (the list is not exhaustive).
      70  
      71      # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
      72      '\u00E6',
      73      # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
      74      '\u0130',
      75      # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
      76      '\u0141',
      77      # U+03C6 (Greek Small Letter Phi): cp1253
      78      '\u03C6',
      79      # U+041A (Cyrillic Capital Letter Ka): cp1251
      80      '\u041A',
      81      # U+05D0 (Hebrew Letter Alef): Encodable to cp424
      82      '\u05D0',
      83      # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
      84      '\u060C',
      85      # U+062A (Arabic Letter Teh): cp720
      86      '\u062A',
      87      # U+0E01 (Thai Character Ko Kai): cp874
      88      '\u0E01',
      89  
      90      # Then try more "special" characters. "special" because they may be
      91      # interpreted or displayed differently depending on the exact locale
      92      # encoding and the font.
      93  
      94      # U+00A0 (No-Break Space)
      95      '\u00A0',
      96      # U+20AC (Euro Sign)
      97      '\u20AC',
      98  ):
      99      try:
     100          # If Python is set up to use the legacy 'mbcs' in Windows,
     101          # 'replace' error mode is used, and encode() returns b'?'
     102          # for characters missing in the ANSI codepage
     103          if os.fsdecode(os.fsencode(character)) != character:
     104              raise UnicodeError
     105      except UnicodeError:
     106          pass
     107      else:
     108          FS_NONASCII = character
     109          break
     110  
     111  # Save the initial cwd
     112  SAVEDCWD = os.getcwd()
     113  
     114  # TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
     115  # decoded from the filesystem encoding (in strict mode). It can be None if we
     116  # cannot generate such filename (ex: the latin1 encoding can decode any byte
     117  # sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
     118  # to the surrogateescape error handler (PEP 383), but not from the filesystem
     119  # encoding in strict mode.
     120  TESTFN_UNDECODABLE = None
     121  for name in (
     122      # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
     123      # accepts it to create a file or a directory, or don't accept to enter to
     124      # such directory (when the bytes name is used). So test b'\xe7' first:
     125      # it is not decodable from cp932.
     126      b'\xe7w\xf0',
     127      # undecodable from ASCII, UTF-8
     128      b'\xff',
     129      # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
     130      # and cp857
     131      b'\xae\xd5'
     132      # undecodable from UTF-8 (UNIX and Mac OS X)
     133      b'\xed\xb2\x80', b'\xed\xb4\x80',
     134      # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
     135      # cp1253, cp1254, cp1255, cp1257, cp1258
     136      b'\x81\x98',
     137  ):
     138      try:
     139          name.decode(sys.getfilesystemencoding())
     140      except UnicodeDecodeError:
     141          try:
     142              name.decode(sys.getfilesystemencoding(),
     143                          sys.getfilesystemencodeerrors())
     144          except UnicodeDecodeError:
     145              continue
     146          TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
     147          break
     148  
     149  if FS_NONASCII:
     150      TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII
     151  else:
     152      TESTFN_NONASCII = None
     153  TESTFN = TESTFN_NONASCII or TESTFN_ASCII
     154  
     155  
     156  def make_bad_fd():
     157      """
     158      Create an invalid file descriptor by opening and closing a file and return
     159      its fd.
     160      """
     161      file = open(TESTFN, "wb")
     162      try:
     163          return file.fileno()
     164      finally:
     165          file.close()
     166          unlink(TESTFN)
     167  
     168  
     169  _can_symlink = None
     170  
     171  
     172  def can_symlink():
     173      global _can_symlink
     174      if _can_symlink is not None:
     175          return _can_symlink
     176      # WASI / wasmtime prevents symlinks with absolute paths, see man
     177      # openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute
     178      # paths. Skip symlink tests on WASI for now.
     179      src = os.path.abspath(TESTFN)
     180      symlink_path = src + "can_symlink"
     181      try:
     182          os.symlink(src, symlink_path)
     183          can = True
     184      except (OSError, NotImplementedError, AttributeError):
     185          can = False
     186      else:
     187          os.remove(symlink_path)
     188      _can_symlink = can
     189      return can
     190  
     191  
     192  def skip_unless_symlink(test):
     193      """Skip decorator for tests that require functional symlink"""
     194      ok = can_symlink()
     195      msg = "Requires functional symlink implementation"
     196      return test if ok else unittest.skip(msg)(test)
     197  
     198  
     199  _can_xattr = None
     200  
     201  
     202  def can_xattr():
     203      import tempfile
     204      global _can_xattr
     205      if _can_xattr is not None:
     206          return _can_xattr
     207      if not hasattr(os, "setxattr"):
     208          can = False
     209      else:
     210          import platform
     211          tmp_dir = tempfile.mkdtemp()
     212          tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
     213          try:
     214              with open(TESTFN, "wb") as fp:
     215                  try:
     216                      # TESTFN & tempfile may use different file systems with
     217                      # different capabilities
     218                      os.setxattr(tmp_fp, b"user.test", b"")
     219                      os.setxattr(tmp_name, b"trusted.foo", b"42")
     220                      os.setxattr(fp.fileno(), b"user.test", b"")
     221                      # Kernels < 2.6.39 don't respect setxattr flags.
     222                      kernel_version = platform.release()
     223                      m = re.match(r"2.6.(\d{1,2})", kernel_version)
     224                      can = m is None or int(m.group(1)) >= 39
     225                  except OSError:
     226                      can = False
     227          finally:
     228              unlink(TESTFN)
     229              unlink(tmp_name)
     230              rmdir(tmp_dir)
     231      _can_xattr = can
     232      return can
     233  
     234  
     235  def skip_unless_xattr(test):
     236      """Skip decorator for tests that require functional extended attributes"""
     237      ok = can_xattr()
     238      msg = "no non-broken extended attribute support"
     239      return test if ok else unittest.skip(msg)(test)
     240  
     241  
     242  _can_chmod = None
     243  
     244  def can_chmod():
     245      global _can_chmod
     246      if _can_chmod is not None:
     247          return _can_chmod
     248      if not hasattr(os, "chown"):
     249          _can_chmod = False
     250          return _can_chmod
     251      try:
     252          with open(TESTFN, "wb") as f:
     253              try:
     254                  os.chmod(TESTFN, 0o777)
     255                  mode1 = os.stat(TESTFN).st_mode
     256                  os.chmod(TESTFN, 0o666)
     257                  mode2 = os.stat(TESTFN).st_mode
     258              except OSError as e:
     259                  can = False
     260              else:
     261                  can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2)
     262      finally:
     263          unlink(TESTFN)
     264      _can_chmod = can
     265      return can
     266  
     267  
     268  def skip_unless_working_chmod(test):
     269      """Skip tests that require working os.chmod()
     270  
     271      WASI SDK 15.0 cannot change file mode bits.
     272      """
     273      ok = can_chmod()
     274      msg = "requires working os.chmod()"
     275      return test if ok else unittest.skip(msg)(test)
     276  
     277  
     278  # Check whether the current effective user has the capability to override
     279  # DAC (discretionary access control). Typically user root is able to
     280  # bypass file read, write, and execute permission checks. The capability
     281  # is independent of the effective user. See capabilities(7).
     282  _can_dac_override = None
     283  
     284  def can_dac_override():
     285      global _can_dac_override
     286  
     287      if not can_chmod():
     288          _can_dac_override = False
     289      if _can_dac_override is not None:
     290          return _can_dac_override
     291  
     292      try:
     293          with open(TESTFN, "wb") as f:
     294              os.chmod(TESTFN, 0o400)
     295              try:
     296                  with open(TESTFN, "wb"):
     297                      pass
     298              except OSError:
     299                  _can_dac_override = False
     300              else:
     301                  _can_dac_override = True
     302      finally:
     303          unlink(TESTFN)
     304  
     305      return _can_dac_override
     306  
     307  
     308  def skip_if_dac_override(test):
     309      ok = not can_dac_override()
     310      msg = "incompatible with CAP_DAC_OVERRIDE"
     311      return test if ok else unittest.skip(msg)(test)
     312  
     313  
     314  def skip_unless_dac_override(test):
     315      ok = can_dac_override()
     316      msg = "requires CAP_DAC_OVERRIDE"
     317      return test if ok else unittest.skip(msg)(test)
     318  
     319  
     320  def unlink(filename):
     321      try:
     322          _unlink(filename)
     323      except (FileNotFoundError, NotADirectoryError):
     324          pass
     325  
     326  
     327  if sys.platform.startswith("win"):
     328      def _waitfor(func, pathname, waitall=False):
     329          # Perform the operation
     330          func(pathname)
     331          # Now setup the wait loop
     332          if waitall:
     333              dirname = pathname
     334          else:
     335              dirname, name = os.path.split(pathname)
     336              dirname = dirname or '.'
     337          # Check for `pathname` to be removed from the filesystem.
     338          # The exponential backoff of the timeout amounts to a total
     339          # of ~1 second after which the deletion is probably an error
     340          # anyway.
     341          # Testing on an i7@4.3GHz shows that usually only 1 iteration is
     342          # required when contention occurs.
     343          timeout = 0.001
     344          while timeout < 1.0:
     345              # Note we are only testing for the existence of the file(s) in
     346              # the contents of the directory regardless of any security or
     347              # access rights.  If we have made it this far, we have sufficient
     348              # permissions to do that much using Python's equivalent of the
     349              # Windows API FindFirstFile.
     350              # Other Windows APIs can fail or give incorrect results when
     351              # dealing with files that are pending deletion.
     352              L = os.listdir(dirname)
     353              if not (L if waitall else name in L):
     354                  return
     355              # Increase the timeout and try again
     356              time.sleep(timeout)
     357              timeout *= 2
     358          warnings.warn('tests may fail, delete still pending for ' + pathname,
     359                        RuntimeWarning, stacklevel=4)
     360  
    def _unlink(filename):
        """Remove *filename* with os.unlink() through _waitfor()."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """Remove *dirname* with os.rmdir() through _waitfor()."""
        _waitfor(os.rmdir, dirname)
     366  
     367      def _rmtree(path):
     368          from test.support import _force_run
     369  
     370          def _rmtree_inner(path):
     371              for name in _force_run(path, os.listdir, path):
     372                  fullname = os.path.join(path, name)
     373                  try:
     374                      mode = os.lstat(fullname).st_mode
     375                  except OSError as exc:
     376                      print("support.rmtree(): os.lstat(%r) failed with %s"
     377                            % (fullname, exc),
     378                            file=sys.__stderr__)
     379                      mode = 0
     380                  if stat.S_ISDIR(mode):
     381                      _waitfor(_rmtree_inner, fullname, waitall=True)
     382                      _force_run(fullname, os.rmdir, fullname)
     383                  else:
     384                      _force_run(fullname, os.unlink, fullname)
     385          _waitfor(_rmtree_inner, path, waitall=True)
     386          _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
     387  
     388      def _longpath(path):
     389          try:
     390              import ctypes
     391          except ImportError:
     392              # No ctypes means we can't expands paths.
     393              pass
     394          else:
     395              buffer = ctypes.create_unicode_buffer(len(path) * 2)
     396              length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
     397                                                               len(buffer))
     398              if length:
     399                  return buffer[:length]
     400          return path
     401  else:
    # In this branch the os functions are used directly, without the
    # _waitfor() wrapper.
    _unlink = os.unlink
    _rmdir = os.rmdir
     404  
     405      def _rmtree(path):
     406          import shutil
     407          try:
     408              shutil.rmtree(path)
     409              return
     410          except OSError:
     411              pass
     412  
     413          def _rmtree_inner(path):
     414              from test.support import _force_run
     415              for name in _force_run(path, os.listdir, path):
     416                  fullname = os.path.join(path, name)
     417                  try:
     418                      mode = os.lstat(fullname).st_mode
     419                  except OSError:
     420                      mode = 0
     421                  if stat.S_ISDIR(mode):
     422                      _rmtree_inner(fullname)
     423                      _force_run(path, os.rmdir, fullname)
     424                  else:
     425                      _force_run(path, os.unlink, fullname)
     426          _rmtree_inner(path)
     427          os.rmdir(path)
     428  
    def _longpath(path):
        """Return *path* unchanged."""
        return path
     431  
     432  
     433  def rmdir(dirname):
     434      try:
     435          _rmdir(dirname)
     436      except FileNotFoundError:
     437          pass
     438  
     439  
     440  def rmtree(path):
     441      try:
     442          _rmtree(path)
     443      except FileNotFoundError:
     444          pass
     445  
     446  
     447  @contextlib.contextmanager
     448  def temp_dir(path=None, quiet=False):
     449      """Return a context manager that creates a temporary directory.
     450  
     451      Arguments:
     452  
     453        path: the directory to create temporarily.  If omitted or None,
     454          defaults to creating a temporary directory using tempfile.mkdtemp.
     455  
     456        quiet: if False (the default), the context manager raises an exception
     457          on error.  Otherwise, if the path is specified and cannot be
     458          created, only a warning is issued.
     459  
     460      """
     461      import tempfile
     462      dir_created = False
     463      if path is None:
     464          path = tempfile.mkdtemp()
     465          dir_created = True
     466          path = os.path.realpath(path)
     467      else:
     468          try:
     469              os.mkdir(path)
     470              dir_created = True
     471          except OSError as exc:
     472              if not quiet:
     473                  raise
     474              warnings.warn(f'tests may fail, unable to create '
     475                            f'temporary directory {path!r}: {exc}',
     476                            RuntimeWarning, stacklevel=3)
     477      if dir_created:
     478          pid = os.getpid()
     479      try:
     480          yield path
     481      finally:
     482          # In case the process forks, let only the parent remove the
     483          # directory. The child has a different process id. (bpo-30028)
     484          if dir_created and pid == os.getpid():
     485              rmtree(path)
     486  
     487  
     488  @contextlib.contextmanager
     489  def change_cwd(path, quiet=False):
     490      """Return a context manager that changes the current working directory.
     491  
     492      Arguments:
     493  
     494        path: the directory to use as the temporary current working directory.
     495  
     496        quiet: if False (the default), the context manager raises an exception
     497          on error.  Otherwise, it issues only a warning and keeps the current
     498          working directory the same.
     499  
     500      """
     501      saved_dir = os.getcwd()
     502      try:
     503          os.chdir(os.path.realpath(path))
     504      except OSError as exc:
     505          if not quiet:
     506              raise
     507          warnings.warn(f'tests may fail, unable to change the current working '
     508                        f'directory to {path!r}: {exc}',
     509                        RuntimeWarning, stacklevel=3)
     510      try:
     511          yield os.getcwd()
     512      finally:
     513          os.chdir(saved_dir)
     514  
     515  
     516  @contextlib.contextmanager
     517  def temp_cwd(name='tempcwd', quiet=False):
     518      """
     519      Context manager that temporarily creates and changes the CWD.
     520  
     521      The function temporarily changes the current working directory
     522      after creating a temporary directory in the current directory with
     523      name *name*.  If *name* is None, the temporary directory is
     524      created using tempfile.mkdtemp.
     525  
     526      If *quiet* is False (default) and it is not possible to
     527      create or change the CWD, an error is raised.  If *quiet* is True,
     528      only a warning is raised and the original CWD is used.
     529  
     530      """
     531      with temp_dir(path=name, quiet=quiet) as temp_path:
     532          with change_cwd(temp_path, quiet=quiet) as cwd_dir:
     533              yield cwd_dir
     534  
     535  
     536  def create_empty_file(filename):
     537      """Create an empty file. If the file already exists, truncate it."""
     538      fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
     539      os.close(fd)
     540  
     541  
     542  @contextlib.contextmanager
     543  def open_dir_fd(path):
     544      """Open a file descriptor to a directory."""
     545      assert os.path.isdir(path)
     546      flags = os.O_RDONLY
     547      if hasattr(os, "O_DIRECTORY"):
     548          flags |= os.O_DIRECTORY
     549      dir_fd = os.open(path, flags)
     550      try:
     551          yield dir_fd
     552      finally:
     553          os.close(dir_fd)
     554  
     555  
     556  def fs_is_case_insensitive(directory):
     557      """Detects if the file system for the specified directory
     558      is case-insensitive."""
     559      import tempfile
     560      with tempfile.NamedTemporaryFile(dir=directory) as base:
     561          base_path = base.name
     562          case_path = base_path.upper()
     563          if case_path == base_path:
     564              case_path = base_path.lower()
     565          try:
     566              return os.path.samefile(base_path, case_path)
     567          except FileNotFoundError:
     568              return False
     569  
     570  
     571  class ESC[4;38;5;81mFakePath:
     572      """Simple implementation of the path protocol.
     573      """
     574      def __init__(self, path):
     575          self.path = path
     576  
     577      def __repr__(self):
     578          return f'<FakePath {self.path!r}>'
     579  
     580      def __fspath__(self):
     581          if (isinstance(self.path, BaseException) or
     582              isinstance(self.path, type) and
     583                  issubclass(self.path, BaseException)):
     584              raise self.path
     585          else:
     586              return self.path
     587  
     588  
     589  def fd_count():
     590      """Count the number of open file descriptors.
     591      """
     592      if sys.platform.startswith(('linux', 'freebsd', 'emscripten')):
     593          try:
     594              names = os.listdir("/proc/self/fd")
     595              # Subtract one because listdir() internally opens a file
     596              # descriptor to list the content of the /proc/self/fd/ directory.
     597              return len(names) - 1
     598          except FileNotFoundError:
     599              pass
     600  
     601      MAXFD = 256
     602      if hasattr(os, 'sysconf'):
     603          try:
     604              MAXFD = os.sysconf("SC_OPEN_MAX")
     605          except OSError:
     606              pass
     607  
     608      old_modes = None
     609      if sys.platform == 'win32':
     610          # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
     611          # on invalid file descriptor if Python is compiled in debug mode
     612          try:
     613              import msvcrt
     614              msvcrt.CrtSetReportMode
     615          except (AttributeError, ImportError):
     616              # no msvcrt or a release build
     617              pass
     618          else:
     619              old_modes = {}
     620              for report_type in (msvcrt.CRT_WARN,
     621                                  msvcrt.CRT_ERROR,
     622                                  msvcrt.CRT_ASSERT):
     623                  old_modes[report_type] = msvcrt.CrtSetReportMode(report_type,
     624                                                                   0)
     625  
     626      try:
     627          count = 0
     628          for fd in range(MAXFD):
     629              try:
     630                  # Prefer dup() over fstat(). fstat() can require input/output
     631                  # whereas dup() doesn't.
     632                  fd2 = os.dup(fd)
     633              except OSError as e:
     634                  if e.errno != errno.EBADF:
     635                      raise
     636              else:
     637                  os.close(fd2)
     638                  count += 1
     639      finally:
     640          if old_modes is not None:
     641              for report_type in (msvcrt.CRT_WARN,
     642                                  msvcrt.CRT_ERROR,
     643                                  msvcrt.CRT_ASSERT):
     644                  msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
     645  
     646      return count
     647  
     648  
     649  if hasattr(os, "umask"):
     650      @contextlib.contextmanager
     651      def temp_umask(umask):
     652          """Context manager that temporarily sets the process umask."""
     653          oldmask = os.umask(umask)
     654          try:
     655              yield
     656          finally:
     657              os.umask(oldmask)
     658  else:
     659      @contextlib.contextmanager
     660      def temp_umask(umask):
     661          """no-op on platforms without umask()"""
     662          yield
     663  
     664  
     665  class ESC[4;38;5;81mEnvironmentVarGuard(ESC[4;38;5;149mcollectionsESC[4;38;5;149m.ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mMutableMapping):
     666  
     667      """Class to help protect the environment variable properly.  Can be used as
     668      a context manager."""
     669  
     670      def __init__(self):
     671          self._environ = os.environ
     672          self._changed = {}
     673  
     674      def __getitem__(self, envvar):
     675          return self._environ[envvar]
     676  
     677      def __setitem__(self, envvar, value):
     678          # Remember the initial value on the first access
     679          if envvar not in self._changed:
     680              self._changed[envvar] = self._environ.get(envvar)
     681          self._environ[envvar] = value
     682  
     683      def __delitem__(self, envvar):
     684          # Remember the initial value on the first access
     685          if envvar not in self._changed:
     686              self._changed[envvar] = self._environ.get(envvar)
     687          if envvar in self._environ:
     688              del self._environ[envvar]
     689  
     690      def keys(self):
     691          return self._environ.keys()
     692  
     693      def __iter__(self):
     694          return iter(self._environ)
     695  
     696      def __len__(self):
     697          return len(self._environ)
     698  
     699      def set(self, envvar, value):
     700          self[envvar] = value
     701  
     702      def unset(self, envvar):
     703          del self[envvar]
     704  
     705      def copy(self):
     706          # We do what os.environ.copy() does.
     707          return dict(self)
     708  
     709      def __enter__(self):
     710          return self
     711  
     712      def __exit__(self, *ignore_exc):
     713          for (k, v) in self._changed.items():
     714              if v is None:
     715                  if k in self._environ:
     716                      del self._environ[k]
     717              else:
     718                  self._environ[k] = v
     719          os.environ = self._environ
     720  
     721  
     722  try:
     723      import ctypes
     724      kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
     725  
     726      ERROR_FILE_NOT_FOUND = 2
     727      DDD_REMOVE_DEFINITION = 2
     728      DDD_EXACT_MATCH_ON_REMOVE = 4
     729      DDD_NO_BROADCAST_SYSTEM = 8
     730  except (ImportError, AttributeError):
     731      def subst_drive(path):
     732          raise unittest.SkipTest('ctypes or kernel32 is not available')
     733  else:
     734      @contextlib.contextmanager
     735      def subst_drive(path):
     736          """Temporarily yield a substitute drive for a given path."""
     737          for c in reversed(string.ascii_uppercase):
     738              drive = f'{c}:'
     739              if (not kernel32.QueryDosDeviceW(drive, None, 0) and
     740                      ctypes.get_last_error() == ERROR_FILE_NOT_FOUND):
     741                  break
     742          else:
     743              raise unittest.SkipTest('no available logical drive')
     744          if not kernel32.DefineDosDeviceW(
     745                  DDD_NO_BROADCAST_SYSTEM, drive, path):
     746              raise ctypes.WinError(ctypes.get_last_error())
     747          try:
     748              yield drive
     749          finally:
     750              if not kernel32.DefineDosDeviceW(
     751                      DDD_REMOVE_DEFINITION | DDD_EXACT_MATCH_ON_REMOVE,
     752                      drive, path):
     753                  raise ctypes.WinError(ctypes.get_last_error())