(root)/
Python-3.11.7/
Lib/
test/
support/
os_helper.py
       1  import collections.abc
       2  import contextlib
       3  import errno
       4  import os
       5  import re
       6  import stat
       7  import sys
       8  import time
       9  import unittest
      10  import warnings
      11  
      12  
      13  # Filename used for testing
      14  if os.name == 'java':
      15      # Jython disallows @ in module names
      16      TESTFN_ASCII = '$test'
      17  else:
      18      TESTFN_ASCII = '@test'
      19  
      20  # Disambiguate TESTFN for parallel testing, while letting it remain a valid
      21  # module name.
      22  TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())
      23  
      24  # TESTFN_UNICODE is a non-ascii filename
      25  TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
      26  if sys.platform == 'darwin':
      27      # In Mac OS X's VFS API file names are, by definition, canonically
      28      # decomposed Unicode, encoded using UTF-8. See QA1173:
      29      # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
      30      import unicodedata
      31      TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
      32  
      33  # TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
      34  # encoded by the filesystem encoding (in strict mode). It can be None if we
      35  # cannot generate such filename.
      36  TESTFN_UNENCODABLE = None
      37  if os.name == 'nt':
      38      # skip win32s (0) or Windows 9x/ME (1)
      39      if sys.getwindowsversion().platform >= 2:
      40          # Different kinds of characters from various languages to minimize the
      41          # probability that the whole name is encodable to MBCS (issue #9819)
      42          TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
      43          try:
      44              TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
      45          except UnicodeEncodeError:
      46              pass
      47          else:
      48              print('WARNING: The filename %r CAN be encoded by the filesystem '
      49                    'encoding (%s). Unicode filename tests may not be effective'
      50                    % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
      51              TESTFN_UNENCODABLE = None
      52  # macOS and Emscripten deny unencodable filenames (invalid utf-8)
      53  elif sys.platform not in {'darwin', 'emscripten', 'wasi'}:
      54      try:
      55          # ascii and utf-8 cannot encode the byte 0xff
      56          b'\xff'.decode(sys.getfilesystemencoding())
      57      except UnicodeDecodeError:
      58          # 0xff will be encoded using the surrogate character u+DCFF
      59          TESTFN_UNENCODABLE = TESTFN_ASCII \
      60              + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
      61      else:
      62          # File system encoding (eg. ISO-8859-* encodings) can encode
      63          # the byte 0xff. Skip some unicode filename tests.
      64          pass
      65  
      66  # FS_NONASCII: non-ASCII character encodable by os.fsencode(),
      67  # or an empty string if there is no such character.
      68  FS_NONASCII = ''
      69  for character in (
      70      # First try printable and common characters to have a readable filename.
      71      # For each character, the encoding list are just example of encodings able
      72      # to encode the character (the list is not exhaustive).
      73  
      74      # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
      75      '\u00E6',
      76      # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
      77      '\u0130',
      78      # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
      79      '\u0141',
      80      # U+03C6 (Greek Small Letter Phi): cp1253
      81      '\u03C6',
      82      # U+041A (Cyrillic Capital Letter Ka): cp1251
      83      '\u041A',
      84      # U+05D0 (Hebrew Letter Alef): Encodable to cp424
      85      '\u05D0',
      86      # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
      87      '\u060C',
      88      # U+062A (Arabic Letter Teh): cp720
      89      '\u062A',
      90      # U+0E01 (Thai Character Ko Kai): cp874
      91      '\u0E01',
      92  
      93      # Then try more "special" characters. "special" because they may be
      94      # interpreted or displayed differently depending on the exact locale
      95      # encoding and the font.
      96  
      97      # U+00A0 (No-Break Space)
      98      '\u00A0',
      99      # U+20AC (Euro Sign)
     100      '\u20AC',
     101  ):
     102      try:
     103          # If Python is set up to use the legacy 'mbcs' in Windows,
     104          # 'replace' error mode is used, and encode() returns b'?'
     105          # for characters missing in the ANSI codepage
     106          if os.fsdecode(os.fsencode(character)) != character:
     107              raise UnicodeError
     108      except UnicodeError:
     109          pass
     110      else:
     111          FS_NONASCII = character
     112          break
     113  
     114  # Save the initial cwd
     115  SAVEDCWD = os.getcwd()
     116  
     117  # TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
     118  # decoded from the filesystem encoding (in strict mode). It can be None if we
     119  # cannot generate such filename (ex: the latin1 encoding can decode any byte
     120  # sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
     121  # to the surrogateescape error handler (PEP 383), but not from the filesystem
     122  # encoding in strict mode.
     123  TESTFN_UNDECODABLE = None
     124  for name in (
     125      # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
     126      # accepts it to create a file or a directory, or don't accept to enter to
     127      # such directory (when the bytes name is used). So test b'\xe7' first:
     128      # it is not decodable from cp932.
     129      b'\xe7w\xf0',
     130      # undecodable from ASCII, UTF-8
     131      b'\xff',
     132      # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
     133      # and cp857
     134      b'\xae\xd5'
     135      # undecodable from UTF-8 (UNIX and Mac OS X)
     136      b'\xed\xb2\x80', b'\xed\xb4\x80',
     137      # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
     138      # cp1253, cp1254, cp1255, cp1257, cp1258
     139      b'\x81\x98',
     140  ):
     141      try:
     142          name.decode(sys.getfilesystemencoding())
     143      except UnicodeDecodeError:
     144          TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
     145          break
     146  
     147  if FS_NONASCII:
     148      TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII
     149  else:
     150      TESTFN_NONASCII = None
     151  TESTFN = TESTFN_NONASCII or TESTFN_ASCII
     152  
     153  
     154  def make_bad_fd():
     155      """
     156      Create an invalid file descriptor by opening and closing a file and return
     157      its fd.
     158      """
     159      file = open(TESTFN, "wb")
     160      try:
     161          return file.fileno()
     162      finally:
     163          file.close()
     164          unlink(TESTFN)
     165  
     166  
     167  _can_symlink = None
     168  
     169  
     170  def can_symlink():
     171      global _can_symlink
     172      if _can_symlink is not None:
     173          return _can_symlink
     174      # WASI / wasmtime prevents symlinks with absolute paths, see man
     175      # openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute
     176      # paths. Skip symlink tests on WASI for now.
     177      src = os.path.abspath(TESTFN)
     178      symlink_path = src + "can_symlink"
     179      try:
     180          os.symlink(src, symlink_path)
     181          can = True
     182      except (OSError, NotImplementedError, AttributeError):
     183          can = False
     184      else:
     185          os.remove(symlink_path)
     186      _can_symlink = can
     187      return can
     188  
     189  
     190  def skip_unless_symlink(test):
     191      """Skip decorator for tests that require functional symlink"""
     192      ok = can_symlink()
     193      msg = "Requires functional symlink implementation"
     194      return test if ok else unittest.skip(msg)(test)
     195  
     196  
     197  _can_xattr = None
     198  
     199  
     200  def can_xattr():
     201      import tempfile
     202      global _can_xattr
     203      if _can_xattr is not None:
     204          return _can_xattr
     205      if not hasattr(os, "setxattr"):
     206          can = False
     207      else:
     208          import platform
     209          tmp_dir = tempfile.mkdtemp()
     210          tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
     211          try:
     212              with open(TESTFN, "wb") as fp:
     213                  try:
     214                      # TESTFN & tempfile may use different file systems with
     215                      # different capabilities
     216                      os.setxattr(tmp_fp, b"user.test", b"")
     217                      os.setxattr(tmp_name, b"trusted.foo", b"42")
     218                      os.setxattr(fp.fileno(), b"user.test", b"")
     219                      # Kernels < 2.6.39 don't respect setxattr flags.
     220                      kernel_version = platform.release()
     221                      m = re.match(r"2.6.(\d{1,2})", kernel_version)
     222                      can = m is None or int(m.group(1)) >= 39
     223                  except OSError:
     224                      can = False
     225          finally:
     226              unlink(TESTFN)
     227              unlink(tmp_name)
     228              rmdir(tmp_dir)
     229      _can_xattr = can
     230      return can
     231  
     232  
     233  def skip_unless_xattr(test):
     234      """Skip decorator for tests that require functional extended attributes"""
     235      ok = can_xattr()
     236      msg = "no non-broken extended attribute support"
     237      return test if ok else unittest.skip(msg)(test)
     238  
     239  
     240  _can_chmod = None
     241  
     242  def can_chmod():
     243      global _can_chmod
     244      if _can_chmod is not None:
     245          return _can_chmod
     246      if not hasattr(os, "chown"):
     247          _can_chmod = False
     248          return _can_chmod
     249      try:
     250          with open(TESTFN, "wb") as f:
     251              try:
     252                  os.chmod(TESTFN, 0o777)
     253                  mode1 = os.stat(TESTFN).st_mode
     254                  os.chmod(TESTFN, 0o666)
     255                  mode2 = os.stat(TESTFN).st_mode
     256              except OSError as e:
     257                  can = False
     258              else:
     259                  can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2)
     260      finally:
     261          unlink(TESTFN)
     262      _can_chmod = can
     263      return can
     264  
     265  
     266  def skip_unless_working_chmod(test):
     267      """Skip tests that require working os.chmod()
     268  
     269      WASI SDK 15.0 cannot change file mode bits.
     270      """
     271      ok = can_chmod()
     272      msg = "requires working os.chmod()"
     273      return test if ok else unittest.skip(msg)(test)
     274  
     275  
     276  # Check whether the current effective user has the capability to override
     277  # DAC (discretionary access control). Typically user root is able to
     278  # bypass file read, write, and execute permission checks. The capability
     279  # is independent of the effective user. See capabilities(7).
     280  _can_dac_override = None
     281  
     282  def can_dac_override():
     283      global _can_dac_override
     284  
     285      if not can_chmod():
     286          _can_dac_override = False
     287      if _can_dac_override is not None:
     288          return _can_dac_override
     289  
     290      try:
     291          with open(TESTFN, "wb") as f:
     292              os.chmod(TESTFN, 0o400)
     293              try:
     294                  with open(TESTFN, "wb"):
     295                      pass
     296              except OSError:
     297                  _can_dac_override = False
     298              else:
     299                  _can_dac_override = True
     300      finally:
     301          unlink(TESTFN)
     302  
     303      return _can_dac_override
     304  
     305  
     306  def skip_if_dac_override(test):
     307      ok = not can_dac_override()
     308      msg = "incompatible with CAP_DAC_OVERRIDE"
     309      return test if ok else unittest.skip(msg)(test)
     310  
     311  
     312  def skip_unless_dac_override(test):
     313      ok = can_dac_override()
     314      msg = "requires CAP_DAC_OVERRIDE"
     315      return test if ok else unittest.skip(msg)(test)
     316  
     317  
     318  def unlink(filename):
     319      try:
     320          _unlink(filename)
     321      except (FileNotFoundError, NotADirectoryError):
     322          pass
     323  
     324  
     325  if sys.platform.startswith("win"):
     326      def _waitfor(func, pathname, waitall=False):
     327          # Perform the operation
     328          func(pathname)
     329          # Now setup the wait loop
     330          if waitall:
     331              dirname = pathname
     332          else:
     333              dirname, name = os.path.split(pathname)
     334              dirname = dirname or '.'
     335          # Check for `pathname` to be removed from the filesystem.
     336          # The exponential backoff of the timeout amounts to a total
     337          # of ~1 second after which the deletion is probably an error
     338          # anyway.
     339          # Testing on an i7@4.3GHz shows that usually only 1 iteration is
     340          # required when contention occurs.
     341          timeout = 0.001
     342          while timeout < 1.0:
     343              # Note we are only testing for the existence of the file(s) in
     344              # the contents of the directory regardless of any security or
     345              # access rights.  If we have made it this far, we have sufficient
     346              # permissions to do that much using Python's equivalent of the
     347              # Windows API FindFirstFile.
     348              # Other Windows APIs can fail or give incorrect results when
     349              # dealing with files that are pending deletion.
     350              L = os.listdir(dirname)
     351              if not (L if waitall else name in L):
     352                  return
     353              # Increase the timeout and try again
     354              time.sleep(timeout)
     355              timeout *= 2
     356          warnings.warn('tests may fail, delete still pending for ' + pathname,
     357                        RuntimeWarning, stacklevel=4)
     358  
     359      def _unlink(filename):
     360          _waitfor(os.unlink, filename)
     361  
     362      def _rmdir(dirname):
     363          _waitfor(os.rmdir, dirname)
     364  
     365      def _rmtree(path):
     366          from test.support import _force_run
     367  
     368          def _rmtree_inner(path):
     369              for name in _force_run(path, os.listdir, path):
     370                  fullname = os.path.join(path, name)
     371                  try:
     372                      mode = os.lstat(fullname).st_mode
     373                  except OSError as exc:
     374                      print("support.rmtree(): os.lstat(%r) failed with %s"
     375                            % (fullname, exc),
     376                            file=sys.__stderr__)
     377                      mode = 0
     378                  if stat.S_ISDIR(mode):
     379                      _waitfor(_rmtree_inner, fullname, waitall=True)
     380                      _force_run(fullname, os.rmdir, fullname)
     381                  else:
     382                      _force_run(fullname, os.unlink, fullname)
     383          _waitfor(_rmtree_inner, path, waitall=True)
     384          _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
     385  
     386      def _longpath(path):
     387          try:
     388              import ctypes
     389          except ImportError:
     390              # No ctypes means we can't expands paths.
     391              pass
     392          else:
     393              buffer = ctypes.create_unicode_buffer(len(path) * 2)
     394              length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
     395                                                               len(buffer))
     396              if length:
     397                  return buffer[:length]
     398          return path
     399  else:
     400      _unlink = os.unlink
     401      _rmdir = os.rmdir
     402  
     403      def _rmtree(path):
     404          import shutil
     405          try:
     406              shutil.rmtree(path)
     407              return
     408          except OSError:
     409              pass
     410  
     411          def _rmtree_inner(path):
     412              from test.support import _force_run
     413              for name in _force_run(path, os.listdir, path):
     414                  fullname = os.path.join(path, name)
     415                  try:
     416                      mode = os.lstat(fullname).st_mode
     417                  except OSError:
     418                      mode = 0
     419                  if stat.S_ISDIR(mode):
     420                      _rmtree_inner(fullname)
     421                      _force_run(path, os.rmdir, fullname)
     422                  else:
     423                      _force_run(path, os.unlink, fullname)
     424          _rmtree_inner(path)
     425          os.rmdir(path)
     426  
     427      def _longpath(path):
     428          return path
     429  
     430  
     431  def rmdir(dirname):
     432      try:
     433          _rmdir(dirname)
     434      except FileNotFoundError:
     435          pass
     436  
     437  
     438  def rmtree(path):
     439      try:
     440          _rmtree(path)
     441      except FileNotFoundError:
     442          pass
     443  
     444  
     445  @contextlib.contextmanager
     446  def temp_dir(path=None, quiet=False):
     447      """Return a context manager that creates a temporary directory.
     448  
     449      Arguments:
     450  
     451        path: the directory to create temporarily.  If omitted or None,
     452          defaults to creating a temporary directory using tempfile.mkdtemp.
     453  
     454        quiet: if False (the default), the context manager raises an exception
     455          on error.  Otherwise, if the path is specified and cannot be
     456          created, only a warning is issued.
     457  
     458      """
     459      import tempfile
     460      dir_created = False
     461      if path is None:
     462          path = tempfile.mkdtemp()
     463          dir_created = True
     464          path = os.path.realpath(path)
     465      else:
     466          try:
     467              os.mkdir(path)
     468              dir_created = True
     469          except OSError as exc:
     470              if not quiet:
     471                  raise
     472              warnings.warn(f'tests may fail, unable to create '
     473                            f'temporary directory {path!r}: {exc}',
     474                            RuntimeWarning, stacklevel=3)
     475      if dir_created:
     476          pid = os.getpid()
     477      try:
     478          yield path
     479      finally:
     480          # In case the process forks, let only the parent remove the
     481          # directory. The child has a different process id. (bpo-30028)
     482          if dir_created and pid == os.getpid():
     483              rmtree(path)
     484  
     485  
     486  @contextlib.contextmanager
     487  def change_cwd(path, quiet=False):
     488      """Return a context manager that changes the current working directory.
     489  
     490      Arguments:
     491  
     492        path: the directory to use as the temporary current working directory.
     493  
     494        quiet: if False (the default), the context manager raises an exception
     495          on error.  Otherwise, it issues only a warning and keeps the current
     496          working directory the same.
     497  
     498      """
     499      saved_dir = os.getcwd()
     500      try:
     501          os.chdir(os.path.realpath(path))
     502      except OSError as exc:
     503          if not quiet:
     504              raise
     505          warnings.warn(f'tests may fail, unable to change the current working '
     506                        f'directory to {path!r}: {exc}',
     507                        RuntimeWarning, stacklevel=3)
     508      try:
     509          yield os.getcwd()
     510      finally:
     511          os.chdir(saved_dir)
     512  
     513  
     514  @contextlib.contextmanager
     515  def temp_cwd(name='tempcwd', quiet=False):
     516      """
     517      Context manager that temporarily creates and changes the CWD.
     518  
     519      The function temporarily changes the current working directory
     520      after creating a temporary directory in the current directory with
     521      name *name*.  If *name* is None, the temporary directory is
     522      created using tempfile.mkdtemp.
     523  
     524      If *quiet* is False (default) and it is not possible to
     525      create or change the CWD, an error is raised.  If *quiet* is True,
     526      only a warning is raised and the original CWD is used.
     527  
     528      """
     529      with temp_dir(path=name, quiet=quiet) as temp_path:
     530          with change_cwd(temp_path, quiet=quiet) as cwd_dir:
     531              yield cwd_dir
     532  
     533  
     534  def create_empty_file(filename):
     535      """Create an empty file. If the file already exists, truncate it."""
     536      fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
     537      os.close(fd)
     538  
     539  
     540  @contextlib.contextmanager
     541  def open_dir_fd(path):
     542      """Open a file descriptor to a directory."""
     543      assert os.path.isdir(path)
     544      flags = os.O_RDONLY
     545      if hasattr(os, "O_DIRECTORY"):
     546          flags |= os.O_DIRECTORY
     547      dir_fd = os.open(path, flags)
     548      try:
     549          yield dir_fd
     550      finally:
     551          os.close(dir_fd)
     552  
     553  
     554  def fs_is_case_insensitive(directory):
     555      """Detects if the file system for the specified directory
     556      is case-insensitive."""
     557      import tempfile
     558      with tempfile.NamedTemporaryFile(dir=directory) as base:
     559          base_path = base.name
     560          case_path = base_path.upper()
     561          if case_path == base_path:
     562              case_path = base_path.lower()
     563          try:
     564              return os.path.samefile(base_path, case_path)
     565          except FileNotFoundError:
     566              return False
     567  
     568  
     569  class ESC[4;38;5;81mFakePath:
     570      """Simple implementation of the path protocol.
     571      """
     572      def __init__(self, path):
     573          self.path = path
     574  
     575      def __repr__(self):
     576          return f'<FakePath {self.path!r}>'
     577  
     578      def __fspath__(self):
     579          if (isinstance(self.path, BaseException) or
     580              isinstance(self.path, type) and
     581                  issubclass(self.path, BaseException)):
     582              raise self.path
     583          else:
     584              return self.path
     585  
     586  
     587  def fd_count():
     588      """Count the number of open file descriptors.
     589      """
     590      if sys.platform.startswith(('linux', 'freebsd', 'emscripten')):
     591          try:
     592              names = os.listdir("/proc/self/fd")
     593              # Subtract one because listdir() internally opens a file
     594              # descriptor to list the content of the /proc/self/fd/ directory.
     595              return len(names) - 1
     596          except FileNotFoundError:
     597              pass
     598  
     599      MAXFD = 256
     600      if hasattr(os, 'sysconf'):
     601          try:
     602              MAXFD = os.sysconf("SC_OPEN_MAX")
     603          except OSError:
     604              pass
     605  
     606      old_modes = None
     607      if sys.platform == 'win32':
     608          # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
     609          # on invalid file descriptor if Python is compiled in debug mode
     610          try:
     611              import msvcrt
     612              msvcrt.CrtSetReportMode
     613          except (AttributeError, ImportError):
     614              # no msvcrt or a release build
     615              pass
     616          else:
     617              old_modes = {}
     618              for report_type in (msvcrt.CRT_WARN,
     619                                  msvcrt.CRT_ERROR,
     620                                  msvcrt.CRT_ASSERT):
     621                  old_modes[report_type] = msvcrt.CrtSetReportMode(report_type,
     622                                                                   0)
     623  
     624      try:
     625          count = 0
     626          for fd in range(MAXFD):
     627              try:
     628                  # Prefer dup() over fstat(). fstat() can require input/output
     629                  # whereas dup() doesn't.
     630                  fd2 = os.dup(fd)
     631              except OSError as e:
     632                  if e.errno != errno.EBADF:
     633                      raise
     634              else:
     635                  os.close(fd2)
     636                  count += 1
     637      finally:
     638          if old_modes is not None:
     639              for report_type in (msvcrt.CRT_WARN,
     640                                  msvcrt.CRT_ERROR,
     641                                  msvcrt.CRT_ASSERT):
     642                  msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
     643  
     644      return count
     645  
     646  
     647  if hasattr(os, "umask"):
     648      @contextlib.contextmanager
     649      def temp_umask(umask):
     650          """Context manager that temporarily sets the process umask."""
     651          oldmask = os.umask(umask)
     652          try:
     653              yield
     654          finally:
     655              os.umask(oldmask)
     656  else:
     657      @contextlib.contextmanager
     658      def temp_umask(umask):
     659          """no-op on platforms without umask()"""
     660          yield
     661  
     662  
     663  class ESC[4;38;5;81mEnvironmentVarGuard(ESC[4;38;5;149mcollectionsESC[4;38;5;149m.ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mMutableMapping):
     664  
     665      """Class to help protect the environment variable properly.  Can be used as
     666      a context manager."""
     667  
     668      def __init__(self):
     669          self._environ = os.environ
     670          self._changed = {}
     671  
     672      def __getitem__(self, envvar):
     673          return self._environ[envvar]
     674  
     675      def __setitem__(self, envvar, value):
     676          # Remember the initial value on the first access
     677          if envvar not in self._changed:
     678              self._changed[envvar] = self._environ.get(envvar)
     679          self._environ[envvar] = value
     680  
     681      def __delitem__(self, envvar):
     682          # Remember the initial value on the first access
     683          if envvar not in self._changed:
     684              self._changed[envvar] = self._environ.get(envvar)
     685          if envvar in self._environ:
     686              del self._environ[envvar]
     687  
     688      def keys(self):
     689          return self._environ.keys()
     690  
     691      def __iter__(self):
     692          return iter(self._environ)
     693  
     694      def __len__(self):
     695          return len(self._environ)
     696  
     697      def set(self, envvar, value):
     698          self[envvar] = value
     699  
     700      def unset(self, envvar):
     701          del self[envvar]
     702  
     703      def copy(self):
     704          # We do what os.environ.copy() does.
     705          return dict(self)
     706  
     707      def __enter__(self):
     708          return self
     709  
     710      def __exit__(self, *ignore_exc):
     711          for (k, v) in self._changed.items():
     712              if v is None:
     713                  if k in self._environ:
     714                      del self._environ[k]
     715              else:
     716                  self._environ[k] = v
     717          os.environ = self._environ