1 """Utility functions for copying and archiving files and directory trees.
2
3 XXX The functions here don't copy the resource fork or other metadata on Mac.
4
5 """
6
7 import os
8 import sys
9 import stat
10 import fnmatch
11 import collections
12 import errno
13 import warnings
14
# Probe for the optional compression modules.  Each module is imported only
# to find out whether it is available; the name is deleted again right away
# so that it does not stay bound in this module's namespace.  The resulting
# booleans are consulted when choosing a tar compression format.
try:
    import zlib
    del zlib
    _ZLIB_SUPPORTED = True
except ImportError:
    _ZLIB_SUPPORTED = False

try:
    import bz2
    del bz2
    _BZ2_SUPPORTED = True
except ImportError:
    _BZ2_SUPPORTED = False

try:
    import lzma
    del lzma
    _LZMA_SUPPORTED = True
except ImportError:
    _LZMA_SUPPORTED = False
35
# Platform detection.  ``posix`` and ``nt`` are bound to the matching
# low-level module when it applies and are left as None otherwise, so that
# later code can test them for truthiness.
_WINDOWS = os.name == 'nt'
posix = nt = None
if os.name == 'posix':
    import posix
elif _WINDOWS:
    import nt

# ``_winapi`` is bound to None off Windows so that hasattr() probes on it
# simply return False instead of raising NameError.
if sys.platform == 'win32':
    import _winapi
else:
    _winapi = None

# Default chunk size for buffered copies: 1 MiB on Windows, 64 KiB elsewhere.
COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
# This should never be removed, see rationale in:
# https://bugs.python.org/issue43743#msg393429
# (The flag is also cleared at runtime by _fastcopy_sendfile() when
# sendfile() turns out not to support regular files.)
_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile")  # macOS

# CMD defaults in Windows 10
_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"

# Public names exported by ``from <module> import *``.
__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
           "copytree", "move", "rmtree", "Error", "SpecialFileError",
           "ExecError", "make_archive", "get_archive_formats",
           "register_archive_format", "unregister_archive_format",
           "get_unpack_formats", "register_unpack_format",
           "unregister_unpack_format", "unpack_archive",
           "ignore_patterns", "chown", "which", "get_terminal_size",
           "SameFileError"]
           # disk_usage is added later, if available on the platform
66
class Error(OSError):
    """Error raised by the file operations in this module.

    copytree() raises it with a list of (src, dst, reason) tuples, and
    move() raises it with a message string.
    """
69
class SameFileError(Error):
    """Raised when source and destination are the same file."""
72
class SpecialFileError(OSError):
    """Raised when trying to do a kind of operation (e.g. copying) which is
    not supported on a special file (e.g. a named pipe)."""
76
class ExecError(OSError):
    """Raised when a command could not be executed."""
79
class ReadError(OSError):
    """Raised when an archive cannot be read."""
82
class RegistryError(Exception):
    """Raised when a registry operation with the archiving
    and unpacking registries fails."""
86
class _GiveupOnFastCopy(Exception):
    """Raised as a signal to fall back on a plain read()/write() file copy
    when one of the fast-copy functions cannot do the job.
    """
91
def _fastcopy_fcopyfile(fsrc, fdst, flags):
    """Copy file content or metadata with the fcopyfile(3) syscall (macOS).

    *fsrc* and *fdst* are open file objects and *flags* is handed unchanged
    to posix._fcopyfile().

    Raises _GiveupOnFastCopy when a file descriptor cannot be obtained or
    the syscall fails with EINVAL/ENOTSUP; any other OSError is re-raised
    with filename/filename2 filled in.
    """
    try:
        src_fd, dst_fd = fsrc.fileno(), fdst.fileno()
    except Exception as exc:
        raise _GiveupOnFastCopy(exc)  # not a regular file

    try:
        posix._fcopyfile(src_fd, dst_fd, flags)
    except OSError as exc:
        # Attach both paths so the error message is informative.
        exc.filename = fsrc.name
        exc.filename2 = fdst.name
        if exc.errno in {errno.EINVAL, errno.ENOTSUP}:
            raise _GiveupOnFastCopy(exc)
        raise exc from None
111
def _fastcopy_sendfile(fsrc, fdst):
    """Copy data between two regular-file descriptors with sendfile(2).

    This should work on Linux >= 2.6.33 only.

    Raises _GiveupOnFastCopy when the caller should fall back to a plain
    read()/write() copy; other OSErrors are re-raised with filename and
    filename2 filled in.
    """
    # Note: copyfileobj() is left alone in order to not introduce any
    # unexpected breakage. Possible risks by using zero-copy calls
    # in copyfileobj() are:
    # - fdst cannot be open in "a"(ppend) mode
    # - fsrc and fdst may be open in "t"(ext) mode
    # - fsrc may be a BufferedReader (which hides unread data in a buffer),
    #   GzipFile (which decompresses data), HTTPResponse (which decodes
    #   chunks).
    # - possibly others (e.g. encrypted fs/partition?)
    global _USE_CP_SENDFILE
    try:
        src_fd = fsrc.fileno()
        dst_fd = fdst.fileno()
    except Exception as exc:
        raise _GiveupOnFastCopy(exc)  # not a regular file

    # Hopefully the whole file will be copied in a single call.
    # sendfile() is called in a loop until EOF is reached (0 return),
    # so a chunk size smaller or bigger than the actual file size
    # should not make any difference, also in case the file content
    # changes while being copied.
    try:
        chunk = max(os.fstat(src_fd).st_size, 2 ** 23)  # min 8MiB
    except OSError:
        chunk = 2 ** 27  # 128MiB
    # On 32-bit architectures truncate to 1GiB to avoid OverflowError,
    # see bpo-38319.
    if sys.maxsize < 2 ** 32:
        chunk = min(chunk, 2 ** 30)

    position = 0
    while True:
        try:
            written = os.sendfile(dst_fd, src_fd, position, chunk)
        except OSError as exc:
            # ...in order to have a more informative exception.
            exc.filename = fsrc.name
            exc.filename2 = fdst.name

            if exc.errno == errno.ENOTSOCK:
                # sendfile() on this platform (probably Linux < 2.6.33)
                # does not support copies between regular files (only
                # sockets).  Disable the fast path for later calls too.
                _USE_CP_SENDFILE = False
                raise _GiveupOnFastCopy(exc)

            if exc.errno == errno.ENOSPC:  # filesystem is full
                raise exc from None

            # Give up on first call and if no data was copied.
            if position == 0 and os.lseek(dst_fd, 0, os.SEEK_CUR) == 0:
                raise _GiveupOnFastCopy(exc)

            raise exc

        if written == 0:
            break  # EOF
        position += written
175
def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
    """Copy *fsrc* to *fdst* through one reusable buffer of *length* bytes.

    readinto()/memoryview() based variant of copyfileobj().  *fsrc* must
    support the readinto() method and both files must be open in binary
    mode.  Copying stops at the first empty or short read.
    """
    # Localize variable access to minimize overhead.
    read_into = fsrc.readinto
    write = fdst.write
    with memoryview(bytearray(length)) as buffer:
        while (count := read_into(buffer)):
            if count < length:
                # Short read: write only the filled prefix, then stop.
                with buffer[:count] as filled:
                    write(filled)
                break
            write(buffer)
195
def copyfileobj(fsrc, fdst, length=0):
    """Copy data from file-like object *fsrc* to file-like object *fdst*.

    Data is moved in chunks of *length*; a false *length* (the default)
    selects COPY_BUFSIZE.  Copying stops at the first empty read.
    """
    chunk_size = length or COPY_BUFSIZE
    # Localize variable access to minimize overhead.
    read = fsrc.read
    write = fdst.write
    while True:
        chunk = read(chunk_size)
        if not chunk:
            break
        write(chunk)
205
def _samefile(src, dst):
    """Return whether *src* and *dst* refer to the same file.

    *src* may be an os.DirEntry.  Any OSError raised while comparing is
    treated as "not the same file".
    """
    # Macintosh, Unix.
    if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'):
        try:
            src_stat = src.stat()
            return os.path.samestat(src_stat, os.stat(dst))
        except OSError:
            return False

    if hasattr(os.path, 'samefile'):
        try:
            return os.path.samefile(src, dst)
        except OSError:
            return False

    # All other platforms: check for same pathname.
    src_key = os.path.normcase(os.path.abspath(src))
    dst_key = os.path.normcase(os.path.abspath(dst))
    return src_key == dst_key
223
def _stat(fn):
    """Return the stat result for *fn*, which is a path or an os.DirEntry."""
    if isinstance(fn, os.DirEntry):
        return fn.stat()
    return os.stat(fn)
226
def _islink(fn):
    """Return whether *fn* (a path or an os.DirEntry) is a symbolic link."""
    if isinstance(fn, os.DirEntry):
        return fn.is_symlink()
    return os.path.islink(fn)
229
def copyfile(src, dst, *, follow_symlinks=True):
    """Copy data from src to dst in the most efficient way possible.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    Returns dst.

    Raises SameFileError if src and dst are the same file, and
    SpecialFileError if either of them is a named pipe.  If opening dst
    fails with IsADirectoryError and dst does not exist, FileNotFoundError
    is raised instead.

    """
    sys.audit("shutil.copyfile", src, dst)

    if _samefile(src, dst):
        raise SameFileError("{!r} and {!r} are the same file".format(src, dst))

    # Only consulted on Windows, where it sizes the readinto() buffer below.
    file_size = 0
    for i, fn in enumerate([src, dst]):
        try:
            st = _stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                fn = fn.path if isinstance(fn, os.DirEntry) else fn
                raise SpecialFileError("`%s` is a named pipe" % fn)
            if _WINDOWS and i == 0:
                file_size = st.st_size

    if not follow_symlinks and _islink(src):
        os.symlink(os.readlink(src), dst)
    else:
        with open(src, 'rb') as fsrc:
            try:
                with open(dst, 'wb') as fdst:
                    # Each platform fast path returns on success; when it
                    # gives up, control falls through to copyfileobj().
                    # macOS
                    if _HAS_FCOPYFILE:
                        try:
                            _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
                            return dst
                        except _GiveupOnFastCopy:
                            pass
                    # Linux
                    elif _USE_CP_SENDFILE:
                        try:
                            _fastcopy_sendfile(fsrc, fdst)
                            return dst
                        except _GiveupOnFastCopy:
                            pass
                    # Windows, see:
                    # https://github.com/python/cpython/pull/7160#discussion_r195405230
                    elif _WINDOWS and file_size > 0:
                        _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
                        return dst

                    # Generic fallback: plain read()/write() loop.
                    copyfileobj(fsrc, fdst)

            # Issue 43219, raise a less confusing exception
            except IsADirectoryError as e:
                if not os.path.exists(dst):
                    raise FileNotFoundError(f'Directory does not exist: {dst}') from e
                else:
                    raise

    return dst
293
def copymode(src, dst, *, follow_symlinks=True):
    """Copy the permission bits of src onto dst.

    Symlinks are left unfollowed only when follow_symlinks is false and
    both `src` and `dst` are symlinks.  In that case `lchmod` is required;
    where it isn't available (e.g. Linux) this function does nothing.

    """
    sys.audit("shutil.copymode", src, dst)

    operate_on_links = (not follow_symlinks and _islink(src)
                        and os.path.islink(dst))
    if operate_on_links:
        if not hasattr(os, 'lchmod'):
            return
        read_stat, apply_mode = os.lstat, os.lchmod
    else:
        read_stat, apply_mode = _stat, os.chmod

    source_stat = read_stat(src)
    apply_mode(dst, stat.S_IMODE(source_stat.st_mode))
314
if hasattr(os, 'listxattr'):
    def _copyxattr(src, dst, *, follow_symlinks=True):
        """Copy extended filesystem attributes from `src` to `dst`.

        Attributes already present on `dst` are overwritten.  When
        `follow_symlinks` is false, symlinks are operated on directly.
        Errors in a fixed set of errno values (see below) are ignored.

        """
        try:
            attr_names = os.listxattr(src, follow_symlinks=follow_symlinks)
        except OSError as exc:
            # Listing failed in a way that is treated as "nothing to copy".
            if exc.errno in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
                return
            raise

        for attr in attr_names:
            try:
                data = os.getxattr(src, attr, follow_symlinks=follow_symlinks)
                os.setxattr(dst, attr, data, follow_symlinks=follow_symlinks)
            except OSError as exc:
                # Skip attributes that cannot be read or written here.
                if exc.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
                                     errno.EINVAL, errno.EACCES):
                    raise
else:
    def _copyxattr(*args, **kwargs):
        """No-op stand-in used when os.listxattr is unavailable."""
        pass
342
def copystat(src, dst, *, follow_symlinks=True):
    """Copy file metadata

    Copy the permission bits, last access time, last modification time, and
    flags from `src` to `dst`. On Linux, copystat() also copies the "extended
    attributes" where possible. The file contents, owner, and group are
    unaffected. `src` and `dst` are path-like objects or path names given as
    strings.

    If the optional flag `follow_symlinks` is not set, symlinks aren't
    followed if and only if both `src` and `dst` are symlinks.
    """
    sys.audit("shutil.copystat", src, dst)

    # Stand-in for an os function that is missing or unusable here; it
    # accepts the same keyword arguments and does nothing.
    def _nop(*args, ns=None, follow_symlinks=None):
        pass

    # Follow symlinks unless the caller asked not to AND both ends are
    # symlinks.
    follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))
    if follow:
        # use the real function if it exists
        def lookup(name):
            return getattr(os, name, _nop)
    else:
        # use the real function only if it exists
        # *and* it supports follow_symlinks
        def lookup(name):
            fn = getattr(os, name, _nop)
            if fn in os.supports_follow_symlinks:
                return fn
            return _nop

    if isinstance(src, os.DirEntry):
        st = src.stat(follow_symlinks=follow)
    else:
        st = lookup("stat")(src, follow_symlinks=follow)
    mode = stat.S_IMODE(st.st_mode)
    lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
        follow_symlinks=follow)
    # We must copy extended attributes before the file is (potentially)
    # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
    _copyxattr(src, dst, follow_symlinks=follow)
    try:
        lookup("chmod")(dst, mode, follow_symlinks=follow)
    except NotImplementedError:
        # The guarded call is chmod().
        # NOTE(review): presumably this is raised when follow_symlinks is
        # false and the platform offers no way to change the mode of a
        # symlink itself (e.g. no lchmod(), and fchmodat() without
        # AT_SYMLINK_NOFOLLOW support) -- confirm against os.chmod.
        # In that case we're out of options--we simply cannot chmod the
        # symlink. give up, suppress the error.
        # (which is what shutil always did in this circumstance.)
        pass
    if hasattr(st, 'st_flags'):
        try:
            lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
        except OSError as why:
            # Ignore "operation not supported" errors, whichever of the two
            # errno names this platform defines; re-raise everything else.
            for err in 'EOPNOTSUPP', 'ENOTSUP':
                if hasattr(errno, err) and why.errno == getattr(errno, err):
                    break
            else:
                raise
408
def copy(src, dst, *, follow_symlinks=True):
    """Copy data and mode bits ("cp src dst"). Return the file's destination.

    When `dst` is a directory, the file is copied into it under the base
    name of `src`.

    With follow_symlinks false, symlinks are not followed, which
    resembles GNU's "cp -P src dst".

    A SameFileError is raised when source and destination are the same
    file.

    """
    target = dst
    if os.path.isdir(target):
        target = os.path.join(target, os.path.basename(src))
    copyfile(src, target, follow_symlinks=follow_symlinks)
    copymode(src, target, follow_symlinks=follow_symlinks)
    return target
426
def copy2(src, dst, *, follow_symlinks=True):
    """Copy data and metadata. Return the file's destination.

    Metadata is copied with copystat(); see that function for details.

    When `dst` is a directory, the file is copied into it under the base
    name of `src`.

    With follow_symlinks false, symlinks are not followed, which
    resembles GNU's "cp -P src dst".
    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))

    if hasattr(_winapi, "CopyFile2"):
        decoded_src = os.fsdecode(src)
        decoded_dst = os.fsdecode(dst)
        copy_flags = _winapi.COPY_FILE_ALLOW_DECRYPTED_DESTINATION  # for compat
        if not follow_symlinks:
            copy_flags |= _winapi.COPY_FILE_COPY_SYMLINK
        try:
            _winapi.CopyFile2(decoded_src, decoded_dst, copy_flags)
        except OSError as exc:
            # Two failures fall back on the old code below:
            # * privilege not held while copying a symlink (likely a
            #   symlink we aren't allowed to create), and
            # * access denied (possibly a hidden or readonly file we
            #   can't overwrite).
            use_fallback = (
                (exc.winerror == _winapi.ERROR_PRIVILEGE_NOT_HELD
                 and not follow_symlinks)
                or exc.winerror == _winapi.ERROR_ACCESS_DENIED
            )
            if not use_fallback:
                raise
        else:
            return dst

    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copystat(src, dst, follow_symlinks=follow_symlinks)
    return dst
466
def ignore_patterns(*patterns):
    """Build a callable suitable for copytree()'s ignore parameter.

    `patterns` are glob-style patterns; the returned callable takes
    (path, names) and returns the set of names matching any of them."""
    def _ignore_patterns(path, names):
        matched = set()
        for pattern in patterns:
            matched.update(fnmatch.filter(names, pattern))
        return matched
    return _ignore_patterns
478
def _copytree(entries, src, dst, symlinks, ignore, copy_function,
              ignore_dangling_symlinks, dirs_exist_ok=False):
    """Copy the already-scanned directory `src` into `dst`.

    `entries` is the list of os.DirEntry objects found in `src`.  The other
    parameters have the same meaning as for copytree(), which calls this
    function once per directory.

    Errors on individual entries are collected instead of aborting the
    copy; if any were collected, an Error carrying the list of
    (srcname, dstname, reason) tuples is raised at the end.  Returns `dst`.
    """
    if ignore is not None:
        ignored_names = ignore(os.fspath(src), [x.name for x in entries])
    else:
        ignored_names = set()

    os.makedirs(dst, exist_ok=dirs_exist_ok)
    errors = []
    # The stock copy functions accept DirEntry objects; a custom
    # copy_function is handed plain path names instead.
    use_srcentry = copy_function is copy2 or copy_function is copy

    for srcentry in entries:
        if srcentry.name in ignored_names:
            continue
        srcname = os.path.join(src, srcentry.name)
        dstname = os.path.join(dst, srcentry.name)
        srcobj = srcentry if use_srcentry else srcname
        try:
            is_symlink = srcentry.is_symlink()
            if is_symlink and os.name == 'nt':
                # Special check for directory junctions, which appear as
                # symlinks but we want to recurse.
                lstat = srcentry.stat(follow_symlinks=False)
                if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT:
                    is_symlink = False
            if is_symlink:
                linkto = os.readlink(srcname)
                if symlinks:
                    # We can't just leave it to `copy_function` because legacy
                    # code with a custom `copy_function` may rely on copytree
                    # doing the right thing.
                    os.symlink(linkto, dstname)
                    copystat(srcobj, dstname, follow_symlinks=not symlinks)
                else:
                    # ignore dangling symlink if the flag is on.
                    # The link itself is tested rather than its raw target:
                    # a relative target must be resolved against the link's
                    # directory, not the current working directory.
                    if ignore_dangling_symlinks and not os.path.exists(srcname):
                        continue
                    # otherwise let the copy occur. copy2 will raise an error
                    if srcentry.is_dir():
                        copytree(srcobj, dstname, symlinks, ignore,
                                 copy_function, ignore_dangling_symlinks,
                                 dirs_exist_ok)
                    else:
                        copy_function(srcobj, dstname)
            elif srcentry.is_dir():
                copytree(srcobj, dstname, symlinks, ignore, copy_function,
                         ignore_dangling_symlinks, dirs_exist_ok)
            else:
                # Will raise a SpecialFileError for unsupported file types
                copy_function(srcobj, dstname)
        # catch the Error from the recursive copytree so that we can
        # continue with other files
        except Error as err:
            errors.extend(err.args[0])
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        # Copying file access times may fail on Windows
        if getattr(why, 'winerror', None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
    return dst
544
def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
             ignore_dangling_symlinks=False, dirs_exist_ok=False):
    """Recursively copy a directory tree and return the destination directory.

    Failures are collected while copying and reported together: if any
    occurred, an Error is raised with a list of reasons.

    symlinks: when true, symbolic links in the source tree are recreated
    as symbolic links in the destination tree; when false, the contents
    of the files they point to are copied.  If the file pointed to by a
    symlink doesn't exist, an exception is added to the list of errors
    raised in the Error exception at the end of the copy process.

    ignore_dangling_symlinks: set to true to silence that exception.
    Notice that this has no effect on platforms that don't support
    os.symlink.

    ignore: an optional callable.  If given, it is called with the `src`
    parameter, which is the directory being visited by copytree(), and
    `names`, which is the list of `src` contents, as returned by
    os.listdir():

        callable(src, names) -> ignored_names

    Since copytree() is called recursively, the callable is called once
    for each directory that is copied.  It returns a list of names
    relative to the `src` directory that should not be copied.

    copy_function: an optional callable used to copy each file.  It is
    called with the source path and the destination path as arguments.
    copy2() is the default, but any function that supports the same
    signature (like copy()) can be used.

    dirs_exist_ok: when false (the default) and `dst` already exists, a
    `FileExistsError` is raised.  When true, the copying operation
    continues if it encounters existing directories, and files within the
    `dst` tree are overwritten by corresponding files from the `src` tree.
    """
    sys.audit("shutil.copytree", src, dst)
    # Take a snapshot of the directory so the scandir handle is closed
    # before any copying starts.
    with os.scandir(src) as scanner:
        snapshot = [*scanner]
    return _copytree(snapshot, src, dst, symlinks, ignore, copy_function,
                     ignore_dangling_symlinks, dirs_exist_ok)
592
if hasattr(os.stat_result, 'st_file_attributes'):
    def _rmtree_islink(path):
        """Return a true value if *path* is a symlink or a mount-point
        reparse point; return False if it cannot be lstat()ed."""
        try:
            info = os.lstat(path)
        except OSError:
            return False
        if stat.S_ISLNK(info.st_mode):
            return True
        return (info.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
                and info.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)
else:
    def _rmtree_islink(path):
        """Return whether *path* is a symbolic link."""
        return os.path.islink(path)
605
606 # version vulnerable to race conditions
607 def _rmtree_unsafe(path, onexc):
608 try:
609 with os.scandir(path) as scandir_it:
610 entries = list(scandir_it)
611 except OSError as err:
612 onexc(os.scandir, path, err)
613 entries = []
614 for entry in entries:
615 fullname = entry.path
616 try:
617 is_dir = entry.is_dir(follow_symlinks=False)
618 except OSError:
619 is_dir = False
620
621 if is_dir and not entry.is_junction():
622 try:
623 if entry.is_symlink():
624 # This can only happen if someone replaces
625 # a directory with a symlink after the call to
626 # os.scandir or entry.is_dir above.
627 raise OSError("Cannot call rmtree on a symbolic link")
628 except OSError as err:
629 onexc(os.path.islink, fullname, err)
630 continue
631 _rmtree_unsafe(fullname, onexc)
632 else:
633 try:
634 os.unlink(fullname)
635 except OSError as err:
636 onexc(os.unlink, fullname, err)
637 try:
638 os.rmdir(path)
639 except OSError as err:
640 onexc(os.rmdir, path, err)
641
642 # Version using fd-based APIs to protect against races
def _rmtree_safe_fd(topfd, path, onexc):
    """Remove everything inside the directory open as *topfd*.

    *path* is the directory's name and is used only to build the paths
    reported to *onexc*; all filesystem calls are made relative to *topfd*.
    The directory itself is not removed here -- the caller does that.
    Every OSError is reported as onexc(func, fullname, exc).
    """
    try:
        with os.scandir(topfd) as scandir_it:
            entries = list(scandir_it)
    except OSError as err:
        err.filename = path
        onexc(os.scandir, path, err)
        return
    for entry in entries:
        fullname = os.path.join(path, entry.name)
        try:
            is_dir = entry.is_dir(follow_symlinks=False)
        except OSError:
            is_dir = False
        else:
            if is_dir:
                # Keep the lstat result so it can be compared with the
                # opened descriptor below.
                try:
                    orig_st = entry.stat(follow_symlinks=False)
                    is_dir = stat.S_ISDIR(orig_st.st_mode)
                except OSError as err:
                    onexc(os.lstat, fullname, err)
                    continue
        if is_dir:
            try:
                dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd)
                dirfd_closed = False
            except OSError as err:
                onexc(os.open, fullname, err)
            else:
                try:
                    # Recurse only if what was opened is the same object
                    # that was stat()ed above.
                    if os.path.samestat(orig_st, os.fstat(dirfd)):
                        _rmtree_safe_fd(dirfd, fullname, onexc)
                        try:
                            # Close the descriptor before removing the
                            # directory it refers to.
                            os.close(dirfd)
                            dirfd_closed = True
                            os.rmdir(entry.name, dir_fd=topfd)
                        except OSError as err:
                            onexc(os.rmdir, fullname, err)
                    else:
                        try:
                            # This can only happen if someone replaces
                            # a directory with a symlink after the call to
                            # os.scandir or stat.S_ISDIR above.
                            raise OSError("Cannot call rmtree on a symbolic "
                                          "link")
                        except OSError as err:
                            onexc(os.path.islink, fullname, err)
                finally:
                    # Close the descriptor on every path that did not
                    # already close it.
                    if not dirfd_closed:
                        os.close(dirfd)
        else:
            try:
                os.unlink(entry.name, dir_fd=topfd)
            except OSError as err:
                onexc(os.unlink, fullname, err)
698
# True when the platform offers everything _rmtree_safe_fd() relies on:
# dir_fd support in open/stat/unlink/rmdir, scandir() on a file descriptor,
# and stat() with follow_symlinks.
_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                     os.supports_dir_fd and
                     os.scandir in os.supports_fd and
                     os.stat in os.supports_follow_symlinks)
703
def rmtree(path, ignore_errors=False, onerror=None, *, onexc=None, dir_fd=None):
    """Recursively delete a directory tree.

    If dir_fd is not None, it should be a file descriptor open to a directory;
    path will then be relative to that directory.
    dir_fd may not be implemented on your platform.
    If it is unavailable, using it will raise a NotImplementedError.

    If ignore_errors is set, errors are ignored; otherwise, if onexc or
    onerror is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    the value of exc_info describes the exception. For onexc it is the
    exception instance, and for onerror it is a tuple as returned by
    sys.exc_info().  If ignore_errors is false and both onexc and
    onerror are None, the exception is reraised.

    onerror is deprecated and only remains for backwards compatibility.
    If both onerror and onexc are set, onerror is ignored and onexc is used.
    """

    if onerror is not None:
        warnings.warn("onerror argument is deprecated, use onexc instead",
                      DeprecationWarning, stacklevel=2)

    sys.audit("shutil.rmtree", path, dir_fd)
    # Pick the error handler.  ignore_errors wins over everything; a
    # caller-supplied onexc is used as is; otherwise fall back to onerror
    # (adapted to the onexc calling convention) or to re-raising.
    if ignore_errors:
        def onexc(*args):
            pass
    elif onexc is None:
        if onerror is None:
            def onexc(*args):
                # Called from inside an ``except`` block, so a bare raise
                # re-raises the exception being handled.
                raise
        else:
            # delegate to onerror
            def onexc(*args):
                func, path, exc = args
                if exc is None:
                    exc_info = None, None, None
                else:
                    exc_info = type(exc), exc, exc.__traceback__
                return onerror(func, path, exc_info)

    if _use_fd_functions:
        # While the unsafe rmtree works fine on bytes, the fd based does not.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        try:
            orig_st = os.lstat(path, dir_fd=dir_fd)
        except Exception as err:
            onexc(os.lstat, path, err)
            return
        try:
            fd = os.open(path, os.O_RDONLY, dir_fd=dir_fd)
            fd_closed = False
        except Exception as err:
            onexc(os.open, path, err)
            return
        try:
            if os.path.samestat(orig_st, os.fstat(fd)):
                _rmtree_safe_fd(fd, path, onexc)
                try:
                    # Close the descriptor before removing the directory.
                    os.close(fd)
                    fd_closed = True
                    os.rmdir(path, dir_fd=dir_fd)
                except OSError as err:
                    onexc(os.rmdir, path, err)
            else:
                try:
                    # symlinks to directories are forbidden, see bug #1669
                    raise OSError("Cannot call rmtree on a symbolic link")
                except OSError as err:
                    onexc(os.path.islink, path, err)
        finally:
            if not fd_closed:
                os.close(fd)
    else:
        if dir_fd is not None:
            raise NotImplementedError("dir_fd unavailable on this platform")
        try:
            if _rmtree_islink(path):
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
        except OSError as err:
            onexc(os.path.islink, path, err)
            # can't continue even if onexc hook returns
            return
        return _rmtree_unsafe(path, onexc)

# Allow introspection of whether or not the hardening against symlink
# attacks is supported on the current platform
rmtree.avoids_symlink_attacks = _use_fd_functions
801
def _basename(path):
    """Return the last component of *path*, ignoring trailing separators.

    Unlike os.path.basename(), this also yields the final component for a
    directory path written with a trailing slash.

    path: Union[PathLike, str]

    e.g.
    >>> os.path.basename('/bar/foo')
    'foo'
    >>> os.path.basename('/bar/foo/')
    ''
    >>> _basename('/bar/foo/')
    'foo'
    """
    text = os.fspath(path)
    separators = os.path.sep + (os.path.altsep or '')
    trimmed = text.rstrip(separators)
    return os.path.basename(trimmed)
819
def move(src, dst, copy_function=copy2):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command. Return the file or directory's
    destination.

    If the destination is a directory or a symlink to a directory, the source
    is moved inside the directory. The destination path must not already
    exist.

    If the destination already exists but is not a directory, it may be
    overwritten depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed. Symlinks are
    recreated under the new name if os.rename() fails because of cross
    filesystem renames.

    The optional `copy_function` argument is a callable that will be used
    to copy the source or it will be delegated to `copytree`.
    By default, copy2() is used, but any function that supports the same
    signature (like copy()) can be used.

    Raises Error if the destination path inside an existing directory
    already exists, or if a directory would be moved into itself, and
    PermissionError if a source directory cannot be removed afterwards.

    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.

    """
    sys.audit("shutil.move", src, dst)
    real_dst = dst
    if os.path.isdir(dst):
        # A symlink pointing at dst also compares as "the same file", but
        # it is a distinct object that must be moved *into* dst, so it is
        # excluded from the rename shortcut.
        if _samefile(src, dst) and not os.path.islink(src):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            os.rename(src, dst)
            # Return the destination here too, as documented.
            return real_dst

        # Using _basename instead of os.path.basename is important, as we must
        # ignore any trailing slash to avoid the basename returning ''
        real_dst = os.path.join(dst, _basename(src))

        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)
    try:
        os.rename(src, real_dst)
    except OSError:
        # rename() failed (e.g. across filesystems): copy, then delete.
        if os.path.islink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, real_dst)
            os.unlink(src)
        elif os.path.isdir(src):
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself"
                            " '%s'." % (src, dst))
            if (_is_immutable(src)
                    or (not os.access(src, os.W_OK) and os.listdir(src)
                        and sys.platform == 'darwin')):
                raise PermissionError("Cannot move the non-empty directory "
                                      "'%s': Lacking write permission to '%s'."
                                      % (src, src))
            copytree(src, real_dst, copy_function=copy_function,
                     symlinks=True)
            rmtree(src)
        else:
            copy_function(src, real_dst)
            os.unlink(src)
    return real_dst
885
def _destinsrc(src, dst):
    """Return whether the absolute form of *dst* equals or lies inside the
    absolute form of *src* (a purely textual prefix check)."""
    sep = os.path.sep

    def _with_trailing_sep(location):
        # Terminate with a separator so that "/a/b" does not match "/a/bc".
        absolute = os.path.abspath(location)
        return absolute if absolute.endswith(sep) else absolute + sep

    src_prefix = _with_trailing_sep(src)
    dst_prefix = _with_trailing_sep(dst)
    return dst_prefix.startswith(src_prefix)
894
def _is_immutable(src):
    """Return True if *src* carries the user or system immutable file flag.

    *src* is a path or an os.DirEntry.  Returns False on platforms whose
    stat results have no ``st_flags`` field.
    """
    st = _stat(src)
    # st_flags is a bit mask, so test the immutable bits rather than
    # comparing the whole value: other flags may be set alongside them.
    immutable_mask = stat.UF_IMMUTABLE | stat.SF_IMMUTABLE
    return bool(getattr(st, 'st_flags', 0) & immutable_mask)
899
def _get_gid(name):
    """Return the gid for group *name*.

    Returns None when *name* is None, when the grp module is unavailable,
    or when no such group exists.
    """
    if name is None:
        return None

    try:
        from grp import getgrnam
    except ImportError:
        return None

    try:
        entry = getgrnam(name)
    except KeyError:
        return None
    # Index 2 of the group entry is the numeric gid.
    return entry[2] if entry is not None else None
917
def _get_uid(name):
    """Return the uid for user *name*.

    Returns None when *name* is None, when the pwd module is unavailable,
    or when no such user exists.
    """
    if name is None:
        return None

    try:
        from pwd import getpwnam
    except ImportError:
        return None

    try:
        entry = getpwnam(name)
    except KeyError:
        return None
    # Index 2 of the password entry is the numeric uid.
    return entry[2] if entry is not None else None
935
def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
                  owner=None, group=None, logger=None, root_dir=None):
    """Create a (possibly compressed) tar file from all the files under
    'base_dir'.

    'compress' must be "gzip" (the default), "bzip2", "xz", or None; a
    format whose module is unavailable is rejected with ValueError.

    'owner' and 'group' can be used to define an owner and a group for the
    archive that is being built. If not provided, the current owner and group
    will be used.

    With 'dry_run' set, nothing is written; only the log messages are
    emitted and the name is computed.

    The output tar file will be named 'base_name' +  ".tar", possibly plus
    the appropriate compression extension (".gz", ".bz2", or ".xz").

    Returns the output filename.
    """
    if compress is None:
        tar_compression = ''
    else:
        known_formats = (
            ('gzip', _ZLIB_SUPPORTED, 'gz'),
            ('bzip2', _BZ2_SUPPORTED, 'bz2'),
            ('xz', _LZMA_SUPPORTED, 'xz'),
        )
        for label, available, suffix in known_formats:
            if available and compress == label:
                tar_compression = suffix
                break
        else:
            raise ValueError("bad value for 'compress', or compression format not "
                             "supported : {0}".format(compress))

    import tarfile  # late import for breaking circular dependency

    compress_ext = ('.' + tar_compression) if compress else ''
    archive_name = base_name + '.tar' + compress_ext
    archive_dir = os.path.dirname(archive_name)

    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # creating the tarball
    if logger is not None:
        logger.info('Creating tar archive')

    uid = _get_uid(owner)
    gid = _get_gid(group)

    def _set_uid_gid(tarinfo):
        # tarfile filter: stamp the requested owner/group on each member.
        if gid is not None:
            tarinfo.gid = gid
            tarinfo.gname = group
        if uid is not None:
            tarinfo.uid = uid
            tarinfo.uname = owner
        return tarinfo

    if not dry_run:
        archive = tarfile.open(archive_name, 'w|%s' % tar_compression)
        member_name = base_dir
        if root_dir is not None:
            base_dir = os.path.join(root_dir, base_dir)
        try:
            archive.add(base_dir, member_name, filter=_set_uid_gid)
        finally:
            archive.close()

    if root_dir is not None:
        archive_name = os.path.abspath(archive_name)
    return archive_name
1005
1006 def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0,
1007 logger=None, owner=None, group=None, root_dir=None):
1008 """Create a zip file from all the files under 'base_dir'.
1009
1010 The output zip file will be named 'base_name' + ".zip". Returns the
1011 name of the output zip file.
1012 """
1013 import zipfile # late import for breaking circular dependency
1014
1015 zip_filename = base_name + ".zip"
1016 archive_dir = os.path.dirname(base_name)
1017
1018 if archive_dir and not os.path.exists(archive_dir):
1019 if logger is not None:
1020 logger.info("creating %s", archive_dir)
1021 if not dry_run:
1022 os.makedirs(archive_dir)
1023
1024 if logger is not None:
1025 logger.info("creating '%s' and adding '%s' to it",
1026 zip_filename, base_dir)
1027
1028 if not dry_run:
1029 with zipfile.ZipFile(zip_filename, "w",
1030 compression=zipfile.ZIP_DEFLATED) as zf:
1031 arcname = os.path.normpath(base_dir)
1032 if root_dir is not None:
1033 base_dir = os.path.join(root_dir, base_dir)
1034 base_dir = os.path.normpath(base_dir)
1035 if arcname != os.curdir:
1036 zf.write(base_dir, arcname)
1037 if logger is not None:
1038 logger.info("adding '%s'", base_dir)
1039 for dirpath, dirnames, filenames in os.walk(base_dir):
1040 arcdirpath = dirpath
1041 if root_dir is not None:
1042 arcdirpath = os.path.relpath(arcdirpath, root_dir)
1043 arcdirpath = os.path.normpath(arcdirpath)
1044 for name in sorted(dirnames):
1045 path = os.path.join(dirpath, name)
1046 arcname = os.path.join(arcdirpath, name)
1047 zf.write(path, arcname)
1048 if logger is not None:
1049 logger.info("adding '%s'", path)
1050 for name in filenames:
1051 path = os.path.join(dirpath, name)
1052 path = os.path.normpath(path)
1053 if os.path.isfile(path):
1054 arcname = os.path.join(arcdirpath, name)
1055 zf.write(path, arcname)
1056 if logger is not None:
1057 logger.info("adding '%s'", path)
1058
1059 if root_dir is not None:
1060 zip_filename = os.path.abspath(zip_filename)
1061 return zip_filename
1062
# Both built-in archivers take a 'root_dir' keyword themselves, so
# make_archive() does not have to chdir on their behalf.
_make_tarball.supports_root_dir = _make_zipfile.supports_root_dir = True

# Registry of archive formats.  Each format name maps to a tuple of:
# * the archiving function
# * extra keyword arguments, as a list of (name, value) pairs
# * a human-readable description
_ARCHIVE_FORMATS = {
    'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"),
}

# Formats that rely on an optional compression module are only registered
# when that module could be imported.
if _ZLIB_SUPPORTED:
    _ARCHIVE_FORMATS.update({
        'gztar': (_make_tarball, [('compress', 'gzip')], "gzip'ed tar-file"),
        'zip': (_make_zipfile, [], "ZIP file"),
    })

if _BZ2_SUPPORTED:
    _ARCHIVE_FORMATS.update({
        'bztar': (_make_tarball, [('compress', 'bzip2')], "bzip2'ed tar-file"),
    })

if _LZMA_SUPPORTED:
    _ARCHIVE_FORMATS.update({
        'xztar': (_make_tarball, [('compress', 'xz')], "xz'ed tar-file"),
    })
1087
def get_archive_formats():
    """Return the formats currently registered for archiving.

    The result is a sorted list of (name, description) tuples, one per
    entry in the archive format registry.
    """
    return sorted((name, entry[2]) for name, entry in _ARCHIVE_FORMATS.items())
1097
def register_archive_format(name, function, extra_args=None, description=''):
    """Register an archive format.

    'name' is the name of the format and 'function' is the callable used
    to create archives of that format.  'extra_args', if provided, is a
    list or tuple of (name, value) pairs passed to the callable as keyword
    arguments.  'description' is returned by get_archive_formats().

    Raises TypeError if 'function' is not callable, if 'extra_args' is not
    a list or tuple, or if one of its elements is not a 2-item list or
    tuple.  Registering an existing name replaces the previous entry.
    """
    if extra_args is None:
        extra_args = []
    if not callable(function):
        # Wrap in a 1-tuple: a tuple-valued 'function' would otherwise be
        # unpacked as the format arguments and garble (or break) the message.
        raise TypeError('The %s object is not callable' % (function,))
    if not isinstance(extra_args, (tuple, list)):
        raise TypeError('extra_args needs to be a sequence')
    for element in extra_args:
        if not isinstance(element, (tuple, list)) or len(element) != 2:
            raise TypeError('extra_args elements are : (arg_name, value)')

    _ARCHIVE_FORMATS[name] = (function, extra_args, description)
1118
def unregister_archive_format(name):
    """Remove the archive format 'name' from the registry.

    Raises KeyError if no format is registered under that name.
    """
    _ARCHIVE_FORMATS.pop(name)
1121
def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar) and return its name.

    'base_name' is the name of the file to create, minus any
    format-specific extension.  'format' is the archive format: one of
    "zip", "tar", "gztar", "bztar" or "xztar", or any other registered
    format; an unregistered name raises ValueError.

    'root_dir' is the directory that acts as the root of the archive and
    must be an existing directory (NotADirectoryError otherwise).
    Archivers flagged with 'supports_root_dir' receive it as a keyword
    argument; for any other archiver the current working directory is
    changed to 'root_dir' for the duration of the call and restored
    afterwards.  'base_dir' is the directory archiving starts from, i.e.
    the common prefix of all archive members; it defaults to the current
    directory.

    'owner' and 'group' are forwarded to the archiver together with
    'dry_run' and 'logger'; they are used when creating a tar archive.
    """
    sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format) from None

    archiver = format_info[0]

    # Common options first, then the format's own (name, value) pairs.
    kwargs = {'dry_run': dry_run, 'logger': logger,
              'owner': owner, 'group': group}
    kwargs.update(format_info[1])

    if base_dir is None:
        base_dir = os.curdir

    supports_root_dir = getattr(archiver, 'supports_root_dir', False)
    previous_cwd = None
    if root_dir is not None:
        if not stat.S_ISDIR(os.stat(root_dir).st_mode):
            raise NotADirectoryError(errno.ENOTDIR, 'Not a directory', root_dir)

        if supports_root_dir:
            # Support path-like base_name here for backwards-compatibility.
            base_name = os.fspath(base_name)
            kwargs['root_dir'] = root_dir
        else:
            # The archiver only understands paths relative to the cwd, so
            # pin base_name to an absolute path before moving there.
            previous_cwd = os.getcwd()
            if logger is not None:
                logger.debug("changing into '%s'", root_dir)
            base_name = os.path.abspath(base_name)
            if not dry_run:
                os.chdir(root_dir)

    try:
        return archiver(base_name, base_dir, **kwargs)
    finally:
        if previous_cwd is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", previous_cwd)
            os.chdir(previous_cwd)
1184
1185
def get_unpack_formats():
    """Return the formats currently registered for unpacking.

    The result is a sorted list of (name, extensions, description)
    tuples, one per entry in the unpack format registry.
    """
    return sorted((name, entry[0], entry[3])
                  for name, entry in _UNPACK_FORMATS.items())
1196
def _check_unpack_options(extensions, function, extra_args):
    """Validate an unpacker before it is added to the registry.

    Raises RegistryError if any of `extensions` is already claimed by a
    registered unpack format, and TypeError if `function` is not callable.
    `extra_args` is accepted but not inspected.
    """
    # Map every extension already claimed to the format that owns it.
    owners = {ext: format_name
              for format_name, entry in _UNPACK_FORMATS.items()
              for ext in entry[0]}

    for extension in extensions:
        if extension not in owners:
            continue
        msg = '%s is already registered for "%s"'
        raise RegistryError(msg % (extension, owners[extension]))

    if not callable(function):
        raise TypeError('The registered function must be a callable')
1213
1214
def register_unpack_format(name, extensions, function, extra_args=None,
                           description=''):
    """Register an unpack format.

    `name` is the name of the format and `extensions` is a list of the
    file extensions that identify it.

    `function` is the callable used to unpack archives of this format.
    It receives the archive to unpack and must raise a ReadError
    exception if it is unable to handle it.

    `extra_args`, if provided, is a sequence of (name, value) tuples
    passed to the callable as keyword arguments.  `description` is
    returned by the get_unpack_formats() function.

    The registration is validated by _check_unpack_options() first.
    """
    extra_args = [] if extra_args is None else extra_args
    _check_unpack_options(extensions, function, extra_args)
    _UNPACK_FORMATS[name] = (extensions, function, extra_args, description)
1236
def unregister_unpack_format(name):
    """Remove the unpack format `name` from the registry.

    Raises KeyError if no format is registered under that name.
    """
    _UNPACK_FORMATS.pop(name)
1240
1241 def _ensure_directory(path):
1242 """Ensure that the parent directory of `path` exists"""
1243 dirname = os.path.dirname(path)
1244 if not os.path.isdir(dirname):
1245 os.makedirs(dirname)
1246
def _unpack_zipfile(filename, extract_dir):
    """Unpack zip `filename` to `extract_dir`.

    Raises ReadError if `filename` is not a zip file.  Members whose name
    starts with '/' or contains '..' are silently skipped.
    """
    import zipfile  # late import for breaking circular dependency

    if not zipfile.is_zipfile(filename):
        raise ReadError("%s is not a zip file" % filename)

    with zipfile.ZipFile(filename) as archive:
        for member in archive.infolist():
            member_name = member.filename

            # don't extract absolute paths or ones with .. in them
            if member_name.startswith('/') or '..' in member_name:
                continue

            destination = os.path.join(extract_dir, *member_name.split('/'))
            if not destination:
                continue

            _ensure_directory(destination)
            if member_name.endswith('/'):
                # directory entry: there is no file content to copy
                continue

            # regular file: stream its content to the destination
            with archive.open(member_name, 'r') as source, \
                    open(destination, 'wb') as target:
                copyfileobj(source, target)
1276
1277 def _unpack_tarfile(filename, extract_dir, *, filter=None):
1278 """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir`
1279 """
1280 import tarfile # late import for breaking circular dependency
1281 try:
1282 tarobj = tarfile.open(filename)
1283 except tarfile.TarError:
1284 raise ReadError(
1285 "%s is not a compressed or uncompressed tar file" % filename)
1286 try:
1287 tarobj.extractall(extract_dir, filter=filter)
1288 finally:
1289 tarobj.close()
1290
# Registry of unpack formats.  Each format name maps to a tuple of:
# * the list of file extensions handled
# * the unpacking function
# * extra keyword arguments, as a list of (name, value) pairs
# * a human-readable description
_UNPACK_FORMATS = {
    'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
    'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"),
}

# Compressed tar variants are only registered when the matching
# compression module could be imported.
if _ZLIB_SUPPORTED:
    _UNPACK_FORMATS.update({
        'gztar': (['.tar.gz', '.tgz'], _unpack_tarfile, [],
                  "gzip'ed tar-file"),
    })

if _BZ2_SUPPORTED:
    _UNPACK_FORMATS.update({
        'bztar': (['.tar.bz2', '.tbz2'], _unpack_tarfile, [],
                  "bzip2'ed tar-file"),
    })

if _LZMA_SUPPORTED:
    _UNPACK_FORMATS.update({
        'xztar': (['.tar.xz', '.txz'], _unpack_tarfile, [],
                  "xz'ed tar-file"),
    })
1312
def _find_unpack_format(filename):
    """Return the name of the first registered unpack format that has an
    extension `filename` ends with, or None if no format matches."""
    for format_name, entry in _UNPACK_FORMATS.items():
        if any(filename.endswith(extension) for extension in entry[0]):
            return format_name
    return None
1319
def unpack_archive(filename, extract_dir=None, format=None, *, filter=None):
    """Unpack an archive.

    `filename` is the name of the archive.

    `extract_dir` is the name of the target directory, where the archive
    is unpacked. If not provided, the current working directory is used.

    `format` is the archive format: one of "zip", "tar", "gztar", "bztar",
    or "xztar".  Or any other registered format.  If not provided,
    unpack_archive will use the filename extension and see if an unpacker
    was registered for that extension.

    A ValueError is raised if `format` is given but not registered.  A
    ReadError is raised if `format` is omitted and no registered unpacker
    matches the extension of `filename`.

    If `filter` is given, it is passed to the underlying extraction
    function as the `filter` keyword argument, taking precedence over any
    `filter` entry in the format's registered extra arguments.
    """
    sys.audit("shutil.unpack_archive", filename, extract_dir, format)

    if extract_dir is None:
        extract_dir = os.getcwd()

    extract_dir = os.fspath(extract_dir)
    filename = os.fspath(filename)

    if format is not None:
        try:
            format_info = _UNPACK_FORMATS[format]
        except KeyError:
            raise ValueError("Unknown unpack format '{0}'".format(format)) from None
    else:
        # we need to look at the registered unpackers supported extensions
        format = _find_unpack_format(filename)
        if format is None:
            raise ReadError("Unknown archive format '{0}'".format(filename))
        format_info = _UNPACK_FORMATS[format]

    # Single dispatch for both lookups, so the registered extra arguments
    # and the caller's filter are merged the same way on either path.
    func = format_info[1]
    kwargs = dict(format_info[2])
    if filter is not None:
        kwargs['filter'] = filter
    func(filename, extract_dir, **kwargs)
1367
1368
if hasattr(os, 'statvfs'):

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
    _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
    _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
    _ntuple_diskusage.free.__doc__ = 'Free space in bytes'

    def disk_usage(path):
        """Return disk usage statistics for the filesystem holding `path`.

        The result is a named tuple with the attributes 'total', 'used'
        and 'free', each computed from os.statvfs() as a block count
        multiplied by the fragment size.
        """
        vfs = os.statvfs(path)
        fragment_size = vfs.f_frsize
        return _ntuple_diskusage(
            total=vfs.f_blocks * fragment_size,
            used=(vfs.f_blocks - vfs.f_bfree) * fragment_size,
            # 'free' is derived from f_bavail, while 'used' is derived
            # from f_bfree, so total != used + free in general.
            free=vfs.f_bavail * fragment_size,
        )

elif _WINDOWS:

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        The result is a named tuple with the attributes 'total', 'used'
        and 'free'; 'used' is computed as total minus free.
        """
        total, free = nt._getdiskusage(path)
        return _ntuple_diskusage(total, total - free, free)
1403
1404
def chown(path, user=None, group=None):
    """Change the owning user and/or group of the given path.

    `user` may be a uid or a user name given as a string; `group` may be
    a gid (an int) or a group name.  Names are converted to their
    uid/gid.  An argument left as None keeps that part of the ownership
    unchanged.

    Raises ValueError if neither `user` nor `group` is given, and
    LookupError if a name cannot be resolved.
    """
    sys.audit('shutil.chown', path, user, group)

    if user is None and group is None:
        raise ValueError("user and/or group must be set")

    # -1 tells os.chown() to leave that id as it is.
    if user is None:
        uid = -1
    elif isinstance(user, str):
        # a string is a system user name; anything else is passed as the uid
        uid = _get_uid(user)
        if uid is None:
            raise LookupError("no such user: {!r}".format(user))
    else:
        uid = user

    if group is None:
        gid = -1
    elif isinstance(group, int):
        gid = group
    else:
        gid = _get_gid(group)
        if gid is None:
            raise LookupError("no such group: {!r}".format(group))

    os.chown(path, uid, gid)
1436
def get_terminal_size(fallback=(80, 24)):
    """Return the size of the terminal window as an os.terminal_size.

    Each dimension is first taken from its environment variable (COLUMNS
    for the width, LINES for the height) when that variable holds a
    positive integer.

    If either dimension is still unknown, the terminal connected to
    sys.__stdout__ is queried with os.get_terminal_size().  When that
    query fails (no usable stdout, not a terminal, or querying is
    unsupported) or reports zero for a dimension, the matching entry of
    `fallback` is used.  `fallback` defaults to (80, 24), the default
    size used by many terminal emulators.
    """
    def _from_environment(variable):
        # A missing or non-integer value counts as "not set".
        try:
            return int(os.environ[variable])
        except (KeyError, ValueError):
            return 0

    columns = _from_environment('COLUMNS')
    lines = _from_environment('LINES')

    # Both dimensions known: no need to query the terminal at all.
    if columns > 0 and lines > 0:
        return os.terminal_size((columns, lines))

    try:
        queried = os.get_terminal_size(sys.__stdout__.fileno())
    except (AttributeError, ValueError, OSError):
        # stdout is None, closed, detached, or not a terminal, or
        # os.get_terminal_size() is unsupported
        queried = os.terminal_size(fallback)

    if columns <= 0:
        columns = queried.columns or fallback[0]
    if lines <= 0:
        lines = queried.lines or fallback[1]
    return os.terminal_size((columns, lines))
1481
1482
1483 # Check that a given file can be accessed with the correct mode.
1484 # Additionally check that `file` is not a directory, as on Windows
1485 # directories pass the os.access check.
1486 def _access_check(fn, mode):
1487 return (os.path.exists(fn) and os.access(fn, mode)
1488 and not os.path.isdir(fn))
1489
1490
1491 def _win_path_needs_curdir(cmd, mode):
1492 """
1493 On Windows, we can use NeedCurrentDirectoryForExePath to figure out
1494 if we should add the cwd to PATH when searching for executables if
1495 the mode is executable.
1496 """
1497 return (not (mode & os.X_OK)) or _winapi.NeedCurrentDirectoryForExePath(
1498 os.fsdecode(cmd))
1499
1500
def which(cmd, mode=os.F_OK | os.X_OK, path=None):
    """Locate `cmd` on a search path and return the first match.

    The result is the path of the first candidate that passes the access
    check for `mode`, or None if there is no such file.

    `mode` defaults to os.F_OK | os.X_OK.  `path` defaults to the result
    of os.environ.get("PATH"), or can be overridden with a custom search
    path.  A `cmd` given as bytes yields a bytes result.

    """
    use_bytes = isinstance(cmd, bytes)

    # A command that carries a directory part is looked up in that
    # directory only instead of the PATH directories.  This includes
    # checking relative to the current directory, e.g. ./script
    dirname, cmd = os.path.split(cmd)
    if dirname:
        path = [dirname]
    else:
        if path is None:
            path = os.environ.get("PATH", None)
            if path is None:
                try:
                    path = os.confstr("CS_PATH")
                except (AttributeError, ValueError):
                    # os.confstr() or CS_PATH is not available
                    path = os.defpath
            # bpo-35755: Don't use os.defpath if the PATH environment variable
            # is set to an empty string

        # PATH='' doesn't match, whereas PATH=':' looks in the current
        # directory
        if not path:
            return None

        if use_bytes:
            path = os.fsencode(path).split(os.fsencode(os.pathsep))
        else:
            path = os.fsdecode(path).split(os.pathsep)

        if sys.platform == "win32" and _win_path_needs_curdir(cmd, mode):
            path.insert(0, os.fsencode(os.curdir) if use_bytes else os.curdir)

    if sys.platform == "win32":
        # PATHEXT is necessary to check on Windows.
        pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT
        pathext = [ext for ext in pathext_source.split(os.pathsep) if ext]

        if use_bytes:
            pathext = [os.fsencode(ext) for ext in pathext]

        # Always try the command as given first, then each PATHEXT variant.
        candidates = [cmd, *(cmd + ext for ext in pathext)]
    else:
        # On other platforms you don't have things like PATHEXT to tell you
        # what file suffixes are executable, so just pass on cmd as-is.
        candidates = [cmd]

    # Each directory is searched once, even if listed several times.
    visited = set()
    for directory in path:
        key = os.path.normcase(directory)
        if key in visited:
            continue
        visited.add(key)
        for candidate in candidates:
            full_name = os.path.join(directory, candidate)
            if _access_check(full_name, mode):
                return full_name
    return None