1 """Temporary files.
2
3 This module provides generic, low- and high-level interfaces for
4 creating temporary files and directories. All of the interfaces
5 provided by this module can be used without fear of race conditions
6 except for 'mktemp'. 'mktemp' is subject to race conditions and
7 should not be used; it is provided for backward compatibility only.
8
9 The default path names are returned as str. If you supply bytes as
10 input, all return values will be in bytes. Ex:
11
12 >>> tempfile.mkstemp()
13 (4, '/tmp/tmptpu9nin8')
14 >>> tempfile.mkdtemp(suffix=b'')
15 b'/tmp/tmppbi8f0hy'
16
17 This module also provides some data items to the user:
18
19 TMP_MAX - maximum number of names that will be tried before
20 giving up.
21 tempdir - If this is set to a string before the first use of
22 any routine from this module, it will be considered as
23 another candidate location to store temporary files.
24 """
25
26 __all__ = [
27 "NamedTemporaryFile", "TemporaryFile", # high level safe interfaces
28 "SpooledTemporaryFile", "TemporaryDirectory",
29 "mkstemp", "mkdtemp", # low level safe interfaces
30 "mktemp", # deprecated unsafe interface
31 "TMP_MAX", "gettempprefix", # constants
32 "tempdir", "gettempdir",
33 "gettempprefixb", "gettempdirb",
34 ]
35
36
37 # Imports.
38
39 import functools as _functools
40 import warnings as _warnings
41 import io as _io
42 import os as _os
43 import shutil as _shutil
44 import errno as _errno
45 from random import Random as _Random
46 import sys as _sys
47 import types as _types
48 import weakref as _weakref
49 import _thread
50 _allocate_lock = _thread.allocate_lock
51
52 _text_openflags = _os.O_RDWR | _os.O_CREAT | _os.O_EXCL
53 if hasattr(_os, 'O_NOFOLLOW'):
54 _text_openflags |= _os.O_NOFOLLOW
55
56 _bin_openflags = _text_openflags
57 if hasattr(_os, 'O_BINARY'):
58 _bin_openflags |= _os.O_BINARY
59
60 if hasattr(_os, 'TMP_MAX'):
61 TMP_MAX = _os.TMP_MAX
62 else:
63 TMP_MAX = 10000
64
65 # This variable _was_ unused for legacy reasons, see issue 10354.
66 # But as of 3.5 we actually use it at runtime so changing it would
67 # have a possibly desirable side effect... But we do not want to support
68 # that as an API. It is undocumented on purpose. Do not depend on this.
69 template = "tmp"
70
71 # Internal routines.
72
73 _once_lock = _allocate_lock()
74
75
76 def _exists(fn):
77 try:
78 _os.lstat(fn)
79 except OSError:
80 return False
81 else:
82 return True
83
84
85 def _infer_return_type(*args):
86 """Look at the type of all args and divine their implied return type."""
87 return_type = None
88 for arg in args:
89 if arg is None:
90 continue
91
92 if isinstance(arg, _os.PathLike):
93 arg = _os.fspath(arg)
94
95 if isinstance(arg, bytes):
96 if return_type is str:
97 raise TypeError("Can't mix bytes and non-bytes in "
98 "path components.")
99 return_type = bytes
100 else:
101 if return_type is bytes:
102 raise TypeError("Can't mix bytes and non-bytes in "
103 "path components.")
104 return_type = str
105 if return_type is None:
106 if tempdir is None or isinstance(tempdir, str):
107 return str # tempfile APIs return a str by default.
108 else:
109 # we could check for bytes but it'll fail later on anyway
110 return bytes
111 return return_type
112
113
114 def _sanitize_params(prefix, suffix, dir):
115 """Common parameter processing for most APIs in this module."""
116 output_type = _infer_return_type(prefix, suffix, dir)
117 if suffix is None:
118 suffix = output_type()
119 if prefix is None:
120 if output_type is str:
121 prefix = template
122 else:
123 prefix = _os.fsencode(template)
124 if dir is None:
125 if output_type is str:
126 dir = gettempdir()
127 else:
128 dir = gettempdirb()
129 return prefix, suffix, dir, output_type
130
131
132 class ESC[4;38;5;81m_RandomNameSequence:
133 """An instance of _RandomNameSequence generates an endless
134 sequence of unpredictable strings which can safely be incorporated
135 into file names. Each string is eight characters long. Multiple
136 threads can safely use the same instance at the same time.
137
138 _RandomNameSequence is an iterator."""
139
140 characters = "abcdefghijklmnopqrstuvwxyz0123456789_"
141
142 @property
143 def rng(self):
144 cur_pid = _os.getpid()
145 if cur_pid != getattr(self, '_rng_pid', None):
146 self._rng = _Random()
147 self._rng_pid = cur_pid
148 return self._rng
149
150 def __iter__(self):
151 return self
152
153 def __next__(self):
154 return ''.join(self.rng.choices(self.characters, k=8))
155
156 def _candidate_tempdir_list():
157 """Generate a list of candidate temporary directories which
158 _get_default_tempdir will try."""
159
160 dirlist = []
161
162 # First, try the environment.
163 for envname in 'TMPDIR', 'TEMP', 'TMP':
164 dirname = _os.getenv(envname)
165 if dirname: dirlist.append(dirname)
166
167 # Failing that, try OS-specific locations.
168 if _os.name == 'nt':
169 dirlist.extend([ _os.path.expanduser(r'~\AppData\Local\Temp'),
170 _os.path.expandvars(r'%SYSTEMROOT%\Temp'),
171 r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
172 else:
173 dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])
174
175 # As a last resort, the current directory.
176 try:
177 dirlist.append(_os.getcwd())
178 except (AttributeError, OSError):
179 dirlist.append(_os.curdir)
180
181 return dirlist
182
183 def _get_default_tempdir():
184 """Calculate the default directory to use for temporary files.
185 This routine should be called exactly once.
186
187 We determine whether or not a candidate temp dir is usable by
188 trying to create and write to a file in that directory. If this
189 is successful, the test file is deleted. To prevent denial of
190 service, the name of the test file must be randomized."""
191
192 namer = _RandomNameSequence()
193 dirlist = _candidate_tempdir_list()
194
195 for dir in dirlist:
196 if dir != _os.curdir:
197 dir = _os.path.abspath(dir)
198 # Try only a few names per directory.
199 for seq in range(100):
200 name = next(namer)
201 filename = _os.path.join(dir, name)
202 try:
203 fd = _os.open(filename, _bin_openflags, 0o600)
204 try:
205 try:
206 _os.write(fd, b'blat')
207 finally:
208 _os.close(fd)
209 finally:
210 _os.unlink(filename)
211 return dir
212 except FileExistsError:
213 pass
214 except PermissionError:
215 # This exception is thrown when a directory with the chosen name
216 # already exists on windows.
217 if (_os.name == 'nt' and _os.path.isdir(dir) and
218 _os.access(dir, _os.W_OK)):
219 continue
220 break # no point trying more names in this directory
221 except OSError:
222 break # no point trying more names in this directory
223 raise FileNotFoundError(_errno.ENOENT,
224 "No usable temporary directory found in %s" %
225 dirlist)
226
227 _name_sequence = None
228
229 def _get_candidate_names():
230 """Common setup sequence for all user-callable interfaces."""
231
232 global _name_sequence
233 if _name_sequence is None:
234 _once_lock.acquire()
235 try:
236 if _name_sequence is None:
237 _name_sequence = _RandomNameSequence()
238 finally:
239 _once_lock.release()
240 return _name_sequence
241
242
243 def _mkstemp_inner(dir, pre, suf, flags, output_type):
244 """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile."""
245
246 dir = _os.path.abspath(dir)
247 names = _get_candidate_names()
248 if output_type is bytes:
249 names = map(_os.fsencode, names)
250
251 for seq in range(TMP_MAX):
252 name = next(names)
253 file = _os.path.join(dir, pre + name + suf)
254 _sys.audit("tempfile.mkstemp", file)
255 try:
256 fd = _os.open(file, flags, 0o600)
257 except FileExistsError:
258 continue # try again
259 except PermissionError:
260 # This exception is thrown when a directory with the chosen name
261 # already exists on windows.
262 if (_os.name == 'nt' and _os.path.isdir(dir) and
263 _os.access(dir, _os.W_OK)):
264 continue
265 else:
266 raise
267 return fd, file
268
269 raise FileExistsError(_errno.EEXIST,
270 "No usable temporary file name found")
271
272
273 # User visible interfaces.
274
def gettempprefix():
    """Return the module's default name prefix ('template') as a str."""
    prefix = _os.fsdecode(template)
    return prefix
278
def gettempprefixb():
    """Return the module's default name prefix ('template') as bytes."""
    prefix = _os.fsencode(template)
    return prefix
282
283 tempdir = None
284
285 def _gettempdir():
286 """Private accessor for tempfile.tempdir."""
287 global tempdir
288 if tempdir is None:
289 _once_lock.acquire()
290 try:
291 if tempdir is None:
292 tempdir = _get_default_tempdir()
293 finally:
294 _once_lock.release()
295 return tempdir
296
297 def gettempdir():
298 """Returns tempfile.tempdir as str."""
299 return _os.fsdecode(_gettempdir())
300
301 def gettempdirb():
302 """Returns tempfile.tempdir as bytes."""
303 return _os.fsencode(_gettempdir())
304
def mkstemp(suffix=None, prefix=None, dir=None, text=False):
    """Create a unique temporary file and return the pair (fd, name).

    'fd' is the file descriptor returned by os.open and 'name' is the
    file's path name.

    'suffix' -- the file name ends with it; no suffix when None.
    'prefix' -- the file name begins with it; a default prefix is used
                when None.
    'dir'    -- directory in which the file is created; a default
                directory is used when None.
    'text'   -- when true the file is opened in text mode, otherwise
                (the default) in binary mode.

    If any of 'suffix', 'prefix' and 'dir' are not None, they must be the
    same type.  If they are bytes, the returned name will be bytes; str
    otherwise.

    The file is readable and writable only by the creating user ID.
    If the operating system uses permission bits to indicate whether a
    file is executable, the file is executable by no one.  The file
    descriptor is not inherited by children of this process.

    Caller is responsible for deleting the file when done with it.
    """
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
    flags = _text_openflags if text else _bin_openflags
    return _mkstemp_inner(dir, prefix, suffix, flags, output_type)
342
343
def mkdtemp(suffix=None, prefix=None, dir=None):
    """Create a unique temporary directory and return its absolute path.

    Arguments are as for mkstemp, except that the 'text' argument is
    not accepted.

    The directory is created with mode 0o700, i.e. readable, writable,
    and searchable only by the creating user.  FileExistsError is raised
    if TMP_MAX attempts all collide with existing names.

    Caller is responsible for deleting the directory when done with it.
    """
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    candidates = _get_candidate_names()
    if output_type is bytes:
        candidates = map(_os.fsencode, candidates)

    for _ in range(TMP_MAX):
        path = _os.path.join(dir, prefix + next(candidates) + suffix)
        _sys.audit("tempfile.mkdtemp", path)
        try:
            _os.mkdir(path, 0o700)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            if (_os.name == 'nt' and _os.path.isdir(dir) and
                    _os.access(dir, _os.W_OK)):
                continue
            raise
        else:
            return _os.path.abspath(path)

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary directory name found")
383
def mktemp(suffix="", prefix=template, dir=None):
    """Return a file name that did not exist when it was checked.

    The file is not created.

    Arguments are similar to mkstemp, except that the 'text' argument is
    not accepted, and suffix=None, prefix=None and bytes file names are not
    supported.

    THIS FUNCTION IS UNSAFE AND SHOULD NOT BE USED.  Nothing reserves the
    returned name: between this call and the moment you create the file,
    someone else may have created it first.
    """
    if dir is None:
        dir = gettempdir()

    candidates = _get_candidate_names()
    for _ in range(TMP_MAX):
        path = _os.path.join(dir, prefix + next(candidates) + suffix)
        if _exists(path):
            continue
        return path

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary filename found")
414
415
416 class ESC[4;38;5;81m_TemporaryFileCloser:
417 """A separate object allowing proper closing of a temporary file's
418 underlying file object, without adding a __del__ method to the
419 temporary file."""
420
421 cleanup_called = False
422 close_called = False
423
424 def __init__(self, file, name, delete=True, delete_on_close=True):
425 self.file = file
426 self.name = name
427 self.delete = delete
428 self.delete_on_close = delete_on_close
429
430 def cleanup(self, windows=(_os.name == 'nt'), unlink=_os.unlink):
431 if not self.cleanup_called:
432 self.cleanup_called = True
433 try:
434 if not self.close_called:
435 self.close_called = True
436 self.file.close()
437 finally:
438 # Windows provides delete-on-close as a primitive, in which
439 # case the file was deleted by self.file.close().
440 if self.delete and not (windows and self.delete_on_close):
441 try:
442 unlink(self.name)
443 except FileNotFoundError:
444 pass
445
446 def close(self):
447 if not self.close_called:
448 self.close_called = True
449 try:
450 self.file.close()
451 finally:
452 if self.delete and self.delete_on_close:
453 self.cleanup()
454
455 def __del__(self):
456 self.cleanup()
457
458
459 class ESC[4;38;5;81m_TemporaryFileWrapper:
460 """Temporary file wrapper
461
462 This class provides a wrapper around files opened for
463 temporary use. In particular, it seeks to automatically
464 remove the file when it is no longer needed.
465 """
466
467 def __init__(self, file, name, delete=True, delete_on_close=True):
468 self.file = file
469 self.name = name
470 self._closer = _TemporaryFileCloser(file, name, delete,
471 delete_on_close)
472
473 def __getattr__(self, name):
474 # Attribute lookups are delegated to the underlying file
475 # and cached for non-numeric results
476 # (i.e. methods are cached, closed and friends are not)
477 file = self.__dict__['file']
478 a = getattr(file, name)
479 if hasattr(a, '__call__'):
480 func = a
481 @_functools.wraps(func)
482 def func_wrapper(*args, **kwargs):
483 return func(*args, **kwargs)
484 # Avoid closing the file as long as the wrapper is alive,
485 # see issue #18879.
486 func_wrapper._closer = self._closer
487 a = func_wrapper
488 if not isinstance(a, int):
489 setattr(self, name, a)
490 return a
491
492 # The underlying __enter__ method returns the wrong object
493 # (self.file) so override it to return the wrapper
494 def __enter__(self):
495 self.file.__enter__()
496 return self
497
498 # Need to trap __exit__ as well to ensure the file gets
499 # deleted when used in a with statement
500 def __exit__(self, exc, value, tb):
501 result = self.file.__exit__(exc, value, tb)
502 self._closer.cleanup()
503 return result
504
505 def close(self):
506 """
507 Close the temporary file, possibly deleting it.
508 """
509 self._closer.close()
510
511 # iter() doesn't use __getattr__ to find the __iter__ method
512 def __iter__(self):
513 # Don't return iter(self.file), but yield from it to avoid closing
514 # file as long as it's being used as iterator (see issue #23700). We
515 # can't use 'yield from' here because iter(file) returns the file
516 # object itself, which has a close method, and thus the file would get
517 # closed when the generator is finalized, due to PEP380 semantics.
518 for line in self.file:
519 yield line
520
521 def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
522 newline=None, suffix=None, prefix=None,
523 dir=None, delete=True, *, errors=None,
524 delete_on_close=True):
525 """Create and return a temporary file.
526 Arguments:
527 'prefix', 'suffix', 'dir' -- as for mkstemp.
528 'mode' -- the mode argument to io.open (default "w+b").
529 'buffering' -- the buffer size argument to io.open (default -1).
530 'encoding' -- the encoding argument to io.open (default None)
531 'newline' -- the newline argument to io.open (default None)
532 'delete' -- whether the file is automatically deleted (default True).
533 'delete_on_close' -- if 'delete', whether the file is deleted on close
534 (default True) or otherwise either on context manager exit
535 (if context manager was used) or on object finalization. .
536 'errors' -- the errors argument to io.open (default None)
537 The file is created as mkstemp() would do it.
538
539 Returns an object with a file-like interface; the name of the file
540 is accessible as its 'name' attribute. The file will be automatically
541 deleted when it is closed unless the 'delete' argument is set to False.
542
543 On POSIX, NamedTemporaryFiles cannot be automatically deleted if
544 the creating process is terminated abruptly with a SIGKILL signal.
545 Windows can delete the file even in this case.
546 """
547
548 prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
549
550 flags = _bin_openflags
551
552 # Setting O_TEMPORARY in the flags causes the OS to delete
553 # the file when it is closed. This is only supported by Windows.
554 if _os.name == 'nt' and delete and delete_on_close:
555 flags |= _os.O_TEMPORARY
556
557 if "b" not in mode:
558 encoding = _io.text_encoding(encoding)
559
560 name = None
561 def opener(*args):
562 nonlocal name
563 fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
564 return fd
565 try:
566 file = _io.open(dir, mode, buffering=buffering,
567 newline=newline, encoding=encoding, errors=errors,
568 opener=opener)
569 try:
570 raw = getattr(file, 'buffer', file)
571 raw = getattr(raw, 'raw', raw)
572 raw.name = name
573 return _TemporaryFileWrapper(file, name, delete, delete_on_close)
574 except:
575 file.close()
576 raise
577 except:
578 if name is not None and not (
579 _os.name == 'nt' and delete and delete_on_close):
580 _os.unlink(name)
581 raise
582
583 if _os.name != 'posix' or _sys.platform == 'cygwin':
584 # On non-POSIX and Cygwin systems, assume that we cannot unlink a file
585 # while it is open.
586 TemporaryFile = NamedTemporaryFile
587
588 else:
589 # Is the O_TMPFILE flag available and does it work?
590 # The flag is set to False if os.open(dir, os.O_TMPFILE) raises an
591 # IsADirectoryError exception
592 _O_TMPFILE_WORKS = hasattr(_os, 'O_TMPFILE')
593
594 def TemporaryFile(mode='w+b', buffering=-1, encoding=None,
595 newline=None, suffix=None, prefix=None,
596 dir=None, *, errors=None):
597 """Create and return a temporary file.
598 Arguments:
599 'prefix', 'suffix', 'dir' -- as for mkstemp.
600 'mode' -- the mode argument to io.open (default "w+b").
601 'buffering' -- the buffer size argument to io.open (default -1).
602 'encoding' -- the encoding argument to io.open (default None)
603 'newline' -- the newline argument to io.open (default None)
604 'errors' -- the errors argument to io.open (default None)
605 The file is created as mkstemp() would do it.
606
607 Returns an object with a file-like interface. The file has no
608 name, and will cease to exist when it is closed.
609 """
610 global _O_TMPFILE_WORKS
611
612 if "b" not in mode:
613 encoding = _io.text_encoding(encoding)
614
615 prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
616
617 flags = _bin_openflags
618 if _O_TMPFILE_WORKS:
619 fd = None
620 def opener(*args):
621 nonlocal fd
622 flags2 = (flags | _os.O_TMPFILE) & ~_os.O_CREAT
623 fd = _os.open(dir, flags2, 0o600)
624 return fd
625 try:
626 file = _io.open(dir, mode, buffering=buffering,
627 newline=newline, encoding=encoding,
628 errors=errors, opener=opener)
629 raw = getattr(file, 'buffer', file)
630 raw = getattr(raw, 'raw', raw)
631 raw.name = fd
632 return file
633 except IsADirectoryError:
634 # Linux kernel older than 3.11 ignores the O_TMPFILE flag:
635 # O_TMPFILE is read as O_DIRECTORY. Trying to open a directory
636 # with O_RDWR|O_DIRECTORY fails with IsADirectoryError, a
637 # directory cannot be open to write. Set flag to False to not
638 # try again.
639 _O_TMPFILE_WORKS = False
640 except OSError:
641 # The filesystem of the directory does not support O_TMPFILE.
642 # For example, OSError(95, 'Operation not supported').
643 #
644 # On Linux kernel older than 3.11, trying to open a regular
645 # file (or a symbolic link to a regular file) with O_TMPFILE
646 # fails with NotADirectoryError, because O_TMPFILE is read as
647 # O_DIRECTORY.
648 pass
649 # Fallback to _mkstemp_inner().
650
651 fd = None
652 def opener(*args):
653 nonlocal fd
654 fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
655 try:
656 _os.unlink(name)
657 except BaseException as e:
658 _os.close(fd)
659 raise
660 return fd
661 file = _io.open(dir, mode, buffering=buffering,
662 newline=newline, encoding=encoding, errors=errors,
663 opener=opener)
664 raw = getattr(file, 'buffer', file)
665 raw = getattr(raw, 'raw', raw)
666 raw.name = fd
667 return file
668
669 class ESC[4;38;5;81mSpooledTemporaryFile(ESC[4;38;5;149m_ioESC[4;38;5;149m.ESC[4;38;5;149mIOBase):
670 """Temporary file wrapper, specialized to switch from BytesIO
671 or StringIO to a real file when it exceeds a certain size or
672 when a fileno is needed.
673 """
674 _rolled = False
675
676 def __init__(self, max_size=0, mode='w+b', buffering=-1,
677 encoding=None, newline=None,
678 suffix=None, prefix=None, dir=None, *, errors=None):
679 if 'b' in mode:
680 self._file = _io.BytesIO()
681 else:
682 encoding = _io.text_encoding(encoding)
683 self._file = _io.TextIOWrapper(_io.BytesIO(),
684 encoding=encoding, errors=errors,
685 newline=newline)
686 self._max_size = max_size
687 self._rolled = False
688 self._TemporaryFileArgs = {'mode': mode, 'buffering': buffering,
689 'suffix': suffix, 'prefix': prefix,
690 'encoding': encoding, 'newline': newline,
691 'dir': dir, 'errors': errors}
692
693 __class_getitem__ = classmethod(_types.GenericAlias)
694
695 def _check(self, file):
696 if self._rolled: return
697 max_size = self._max_size
698 if max_size and file.tell() > max_size:
699 self.rollover()
700
701 def rollover(self):
702 if self._rolled: return
703 file = self._file
704 newfile = self._file = TemporaryFile(**self._TemporaryFileArgs)
705 del self._TemporaryFileArgs
706
707 pos = file.tell()
708 if hasattr(newfile, 'buffer'):
709 newfile.buffer.write(file.detach().getvalue())
710 else:
711 newfile.write(file.getvalue())
712 newfile.seek(pos, 0)
713
714 self._rolled = True
715
716 # The method caching trick from NamedTemporaryFile
717 # won't work here, because _file may change from a
718 # BytesIO/StringIO instance to a real file. So we list
719 # all the methods directly.
720
721 # Context management protocol
722 def __enter__(self):
723 if self._file.closed:
724 raise ValueError("Cannot enter context with closed file")
725 return self
726
727 def __exit__(self, exc, value, tb):
728 self._file.close()
729
730 # file protocol
731 def __iter__(self):
732 return self._file.__iter__()
733
734 def __del__(self):
735 if not self.closed:
736 _warnings.warn(
737 "Unclosed file {!r}".format(self),
738 ResourceWarning,
739 stacklevel=2,
740 source=self
741 )
742 self.close()
743
744 def close(self):
745 self._file.close()
746
747 @property
748 def closed(self):
749 return self._file.closed
750
751 @property
752 def encoding(self):
753 return self._file.encoding
754
755 @property
756 def errors(self):
757 return self._file.errors
758
759 def fileno(self):
760 self.rollover()
761 return self._file.fileno()
762
763 def flush(self):
764 self._file.flush()
765
766 def isatty(self):
767 return self._file.isatty()
768
769 @property
770 def mode(self):
771 try:
772 return self._file.mode
773 except AttributeError:
774 return self._TemporaryFileArgs['mode']
775
776 @property
777 def name(self):
778 try:
779 return self._file.name
780 except AttributeError:
781 return None
782
783 @property
784 def newlines(self):
785 return self._file.newlines
786
787 def readable(self):
788 return self._file.readable()
789
790 def read(self, *args):
791 return self._file.read(*args)
792
793 def read1(self, *args):
794 return self._file.read1(*args)
795
796 def readinto(self, b):
797 return self._file.readinto(b)
798
799 def readinto1(self, b):
800 return self._file.readinto1(b)
801
802 def readline(self, *args):
803 return self._file.readline(*args)
804
805 def readlines(self, *args):
806 return self._file.readlines(*args)
807
808 def seekable(self):
809 return self._file.seekable()
810
811 def seek(self, *args):
812 return self._file.seek(*args)
813
814 def tell(self):
815 return self._file.tell()
816
817 def truncate(self, size=None):
818 if size is None:
819 return self._file.truncate()
820 else:
821 if size > self._max_size:
822 self.rollover()
823 return self._file.truncate(size)
824
825 def writable(self):
826 return self._file.writable()
827
828 def write(self, s):
829 file = self._file
830 rv = file.write(s)
831 self._check(file)
832 return rv
833
834 def writelines(self, iterable):
835 file = self._file
836 rv = file.writelines(iterable)
837 self._check(file)
838 return rv
839
840 def detach(self):
841 return self._file.detach()
842
843
844 class ESC[4;38;5;81mTemporaryDirectory:
845 """Create and return a temporary directory. This has the same
846 behavior as mkdtemp but can be used as a context manager. For
847 example:
848
849 with TemporaryDirectory() as tmpdir:
850 ...
851
852 Upon exiting the context, the directory and everything contained
853 in it are removed (unless delete=False is passed or an exception
854 is raised during cleanup and ignore_cleanup_errors is not True).
855
856 Optional Arguments:
857 suffix - A str suffix for the directory name. (see mkdtemp)
858 prefix - A str prefix for the directory name. (see mkdtemp)
859 dir - A directory to create this temp dir in. (see mkdtemp)
860 ignore_cleanup_errors - False; ignore exceptions during cleanup?
861 delete - True; whether the directory is automatically deleted.
862 """
863
864 def __init__(self, suffix=None, prefix=None, dir=None,
865 ignore_cleanup_errors=False, *, delete=True):
866 self.name = mkdtemp(suffix, prefix, dir)
867 self._ignore_cleanup_errors = ignore_cleanup_errors
868 self._delete = delete
869 self._finalizer = _weakref.finalize(
870 self, self._cleanup, self.name,
871 warn_message="Implicitly cleaning up {!r}".format(self),
872 ignore_errors=self._ignore_cleanup_errors, delete=self._delete)
873
874 @classmethod
875 def _rmtree(cls, name, ignore_errors=False):
876 def onexc(func, path, exc):
877 if isinstance(exc, PermissionError):
878 def resetperms(path):
879 try:
880 _os.chflags(path, 0)
881 except AttributeError:
882 pass
883 _os.chmod(path, 0o700)
884
885 try:
886 if path != name:
887 resetperms(_os.path.dirname(path))
888 resetperms(path)
889
890 try:
891 _os.unlink(path)
892 # PermissionError is raised on FreeBSD for directories
893 except (IsADirectoryError, PermissionError):
894 cls._rmtree(path, ignore_errors=ignore_errors)
895 except FileNotFoundError:
896 pass
897 elif isinstance(exc, FileNotFoundError):
898 pass
899 else:
900 if not ignore_errors:
901 raise
902
903 _shutil.rmtree(name, onexc=onexc)
904
905 @classmethod
906 def _cleanup(cls, name, warn_message, ignore_errors=False, delete=True):
907 if delete:
908 cls._rmtree(name, ignore_errors=ignore_errors)
909 _warnings.warn(warn_message, ResourceWarning)
910
911 def __repr__(self):
912 return "<{} {!r}>".format(self.__class__.__name__, self.name)
913
914 def __enter__(self):
915 return self.name
916
917 def __exit__(self, exc, value, tb):
918 if self._delete:
919 self.cleanup()
920
921 def cleanup(self):
922 if self._finalizer.detach() or _os.path.exists(self.name):
923 self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors)
924
925 __class_getitem__ = classmethod(_types.GenericAlias)