1 """Utility functions for copying and archiving files and directory trees.
2
3 XXX The functions here don't copy the resource fork or other metadata on Mac.
4
5 """
6
7 import os
8 import sys
9 import stat
10 import fnmatch
11 import collections
12 import errno
13
14 try:
15 import zlib
16 del zlib
17 _ZLIB_SUPPORTED = True
18 except ImportError:
19 _ZLIB_SUPPORTED = False
20
21 try:
22 import bz2
23 del bz2
24 _BZ2_SUPPORTED = True
25 except ImportError:
26 _BZ2_SUPPORTED = False
27
28 try:
29 import lzma
30 del lzma
31 _LZMA_SUPPORTED = True
32 except ImportError:
33 _LZMA_SUPPORTED = False
34
35 _WINDOWS = os.name == 'nt'
36 posix = nt = None
37 if os.name == 'posix':
38 import posix
39 elif _WINDOWS:
40 import nt
41
42 COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
43 # This should never be removed, see rationale in:
44 # https://bugs.python.org/issue43743#msg393429
45 _USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
46 _HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS
47
48 # CMD defaults in Windows 10
49 _WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"
50
51 __all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
52 "copytree", "move", "rmtree", "Error", "SpecialFileError",
53 "ExecError", "make_archive", "get_archive_formats",
54 "register_archive_format", "unregister_archive_format",
55 "get_unpack_formats", "register_unpack_format",
56 "unregister_unpack_format", "unpack_archive",
57 "ignore_patterns", "chown", "which", "get_terminal_size",
58 "SameFileError"]
59 # disk_usage is added later, if available on the platform
60
61 class ESC[4;38;5;81mError(ESC[4;38;5;149mOSError):
62 pass
63
64 class ESC[4;38;5;81mSameFileError(ESC[4;38;5;149mError):
65 """Raised when source and destination are the same file."""
66
67 class ESC[4;38;5;81mSpecialFileError(ESC[4;38;5;149mOSError):
68 """Raised when trying to do a kind of operation (e.g. copying) which is
69 not supported on a special file (e.g. a named pipe)"""
70
71 class ESC[4;38;5;81mExecError(ESC[4;38;5;149mOSError):
72 """Raised when a command could not be executed"""
73
74 class ESC[4;38;5;81mReadError(ESC[4;38;5;149mOSError):
75 """Raised when an archive cannot be read"""
76
77 class ESC[4;38;5;81mRegistryError(ESC[4;38;5;149mException):
78 """Raised when a registry operation with the archiving
79 and unpacking registries fails"""
80
81 class ESC[4;38;5;81m_GiveupOnFastCopy(ESC[4;38;5;149mException):
82 """Raised as a signal to fallback on using raw read()/write()
83 file copy when fast-copy functions fail to do so.
84 """
85
86 def _fastcopy_fcopyfile(fsrc, fdst, flags):
87 """Copy a regular file content or metadata by using high-performance
88 fcopyfile(3) syscall (macOS).
89 """
90 try:
91 infd = fsrc.fileno()
92 outfd = fdst.fileno()
93 except Exception as err:
94 raise _GiveupOnFastCopy(err) # not a regular file
95
96 try:
97 posix._fcopyfile(infd, outfd, flags)
98 except OSError as err:
99 err.filename = fsrc.name
100 err.filename2 = fdst.name
101 if err.errno in {errno.EINVAL, errno.ENOTSUP}:
102 raise _GiveupOnFastCopy(err)
103 else:
104 raise err from None
105
106 def _fastcopy_sendfile(fsrc, fdst):
107 """Copy data from one regular mmap-like fd to another by using
108 high-performance sendfile(2) syscall.
109 This should work on Linux >= 2.6.33 only.
110 """
111 # Note: copyfileobj() is left alone in order to not introduce any
112 # unexpected breakage. Possible risks by using zero-copy calls
113 # in copyfileobj() are:
114 # - fdst cannot be open in "a"(ppend) mode
115 # - fsrc and fdst may be open in "t"(ext) mode
116 # - fsrc may be a BufferedReader (which hides unread data in a buffer),
117 # GzipFile (which decompresses data), HTTPResponse (which decodes
118 # chunks).
119 # - possibly others (e.g. encrypted fs/partition?)
120 global _USE_CP_SENDFILE
121 try:
122 infd = fsrc.fileno()
123 outfd = fdst.fileno()
124 except Exception as err:
125 raise _GiveupOnFastCopy(err) # not a regular file
126
127 # Hopefully the whole file will be copied in a single call.
128 # sendfile() is called in a loop 'till EOF is reached (0 return)
129 # so a bufsize smaller or bigger than the actual file size
130 # should not make any difference, also in case the file content
131 # changes while being copied.
132 try:
133 blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB
134 except OSError:
135 blocksize = 2 ** 27 # 128MiB
136 # On 32-bit architectures truncate to 1GiB to avoid OverflowError,
137 # see bpo-38319.
138 if sys.maxsize < 2 ** 32:
139 blocksize = min(blocksize, 2 ** 30)
140
141 offset = 0
142 while True:
143 try:
144 sent = os.sendfile(outfd, infd, offset, blocksize)
145 except OSError as err:
146 # ...in oder to have a more informative exception.
147 err.filename = fsrc.name
148 err.filename2 = fdst.name
149
150 if err.errno == errno.ENOTSOCK:
151 # sendfile() on this platform (probably Linux < 2.6.33)
152 # does not support copies between regular files (only
153 # sockets).
154 _USE_CP_SENDFILE = False
155 raise _GiveupOnFastCopy(err)
156
157 if err.errno == errno.ENOSPC: # filesystem is full
158 raise err from None
159
160 # Give up on first call and if no data was copied.
161 if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0:
162 raise _GiveupOnFastCopy(err)
163
164 raise err
165 else:
166 if sent == 0:
167 break # EOF
168 offset += sent
169
170 def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
171 """readinto()/memoryview() based variant of copyfileobj().
172 *fsrc* must support readinto() method and both files must be
173 open in binary mode.
174 """
175 # Localize variable access to minimize overhead.
176 fsrc_readinto = fsrc.readinto
177 fdst_write = fdst.write
178 with memoryview(bytearray(length)) as mv:
179 while True:
180 n = fsrc_readinto(mv)
181 if not n:
182 break
183 elif n < length:
184 with mv[:n] as smv:
185 fdst.write(smv)
186 else:
187 fdst_write(mv)
188
189 def copyfileobj(fsrc, fdst, length=0):
190 """copy data from file-like object fsrc to file-like object fdst"""
191 if not length:
192 length = COPY_BUFSIZE
193 # Localize variable access to minimize overhead.
194 fsrc_read = fsrc.read
195 fdst_write = fdst.write
196 while True:
197 buf = fsrc_read(length)
198 if not buf:
199 break
200 fdst_write(buf)
201
202 def _samefile(src, dst):
203 # Macintosh, Unix.
204 if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'):
205 try:
206 return os.path.samestat(src.stat(), os.stat(dst))
207 except OSError:
208 return False
209
210 if hasattr(os.path, 'samefile'):
211 try:
212 return os.path.samefile(src, dst)
213 except OSError:
214 return False
215
216 # All other platforms: check for same pathname.
217 return (os.path.normcase(os.path.abspath(src)) ==
218 os.path.normcase(os.path.abspath(dst)))
219
220 def _stat(fn):
221 return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn)
222
223 def _islink(fn):
224 return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn)
225
226 def copyfile(src, dst, *, follow_symlinks=True):
227 """Copy data from src to dst in the most efficient way possible.
228
229 If follow_symlinks is not set and src is a symbolic link, a new
230 symlink will be created instead of copying the file it points to.
231
232 """
233 sys.audit("shutil.copyfile", src, dst)
234
235 if _samefile(src, dst):
236 raise SameFileError("{!r} and {!r} are the same file".format(src, dst))
237
238 file_size = 0
239 for i, fn in enumerate([src, dst]):
240 try:
241 st = _stat(fn)
242 except OSError:
243 # File most likely does not exist
244 pass
245 else:
246 # XXX What about other special files? (sockets, devices...)
247 if stat.S_ISFIFO(st.st_mode):
248 fn = fn.path if isinstance(fn, os.DirEntry) else fn
249 raise SpecialFileError("`%s` is a named pipe" % fn)
250 if _WINDOWS and i == 0:
251 file_size = st.st_size
252
253 if not follow_symlinks and _islink(src):
254 os.symlink(os.readlink(src), dst)
255 else:
256 with open(src, 'rb') as fsrc:
257 try:
258 with open(dst, 'wb') as fdst:
259 # macOS
260 if _HAS_FCOPYFILE:
261 try:
262 _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
263 return dst
264 except _GiveupOnFastCopy:
265 pass
266 # Linux
267 elif _USE_CP_SENDFILE:
268 try:
269 _fastcopy_sendfile(fsrc, fdst)
270 return dst
271 except _GiveupOnFastCopy:
272 pass
273 # Windows, see:
274 # https://github.com/python/cpython/pull/7160#discussion_r195405230
275 elif _WINDOWS and file_size > 0:
276 _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
277 return dst
278
279 copyfileobj(fsrc, fdst)
280
281 # Issue 43219, raise a less confusing exception
282 except IsADirectoryError as e:
283 if not os.path.exists(dst):
284 raise FileNotFoundError(f'Directory does not exist: {dst}') from e
285 else:
286 raise
287
288 return dst
289
290 def copymode(src, dst, *, follow_symlinks=True):
291 """Copy mode bits from src to dst.
292
293 If follow_symlinks is not set, symlinks aren't followed if and only
294 if both `src` and `dst` are symlinks. If `lchmod` isn't available
295 (e.g. Linux) this method does nothing.
296
297 """
298 sys.audit("shutil.copymode", src, dst)
299
300 if not follow_symlinks and _islink(src) and os.path.islink(dst):
301 if hasattr(os, 'lchmod'):
302 stat_func, chmod_func = os.lstat, os.lchmod
303 else:
304 return
305 else:
306 stat_func, chmod_func = _stat, os.chmod
307
308 st = stat_func(src)
309 chmod_func(dst, stat.S_IMODE(st.st_mode))
310
311 if hasattr(os, 'listxattr'):
312 def _copyxattr(src, dst, *, follow_symlinks=True):
313 """Copy extended filesystem attributes from `src` to `dst`.
314
315 Overwrite existing attributes.
316
317 If `follow_symlinks` is false, symlinks won't be followed.
318
319 """
320
321 try:
322 names = os.listxattr(src, follow_symlinks=follow_symlinks)
323 except OSError as e:
324 if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
325 raise
326 return
327 for name in names:
328 try:
329 value = os.getxattr(src, name, follow_symlinks=follow_symlinks)
330 os.setxattr(dst, name, value, follow_symlinks=follow_symlinks)
331 except OSError as e:
332 if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
333 errno.EINVAL):
334 raise
335 else:
336 def _copyxattr(*args, **kwargs):
337 pass
338
339 def copystat(src, dst, *, follow_symlinks=True):
340 """Copy file metadata
341
342 Copy the permission bits, last access time, last modification time, and
343 flags from `src` to `dst`. On Linux, copystat() also copies the "extended
344 attributes" where possible. The file contents, owner, and group are
345 unaffected. `src` and `dst` are path-like objects or path names given as
346 strings.
347
348 If the optional flag `follow_symlinks` is not set, symlinks aren't
349 followed if and only if both `src` and `dst` are symlinks.
350 """
351 sys.audit("shutil.copystat", src, dst)
352
353 def _nop(*args, ns=None, follow_symlinks=None):
354 pass
355
356 # follow symlinks (aka don't not follow symlinks)
357 follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))
358 if follow:
359 # use the real function if it exists
360 def lookup(name):
361 return getattr(os, name, _nop)
362 else:
363 # use the real function only if it exists
364 # *and* it supports follow_symlinks
365 def lookup(name):
366 fn = getattr(os, name, _nop)
367 if fn in os.supports_follow_symlinks:
368 return fn
369 return _nop
370
371 if isinstance(src, os.DirEntry):
372 st = src.stat(follow_symlinks=follow)
373 else:
374 st = lookup("stat")(src, follow_symlinks=follow)
375 mode = stat.S_IMODE(st.st_mode)
376 lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
377 follow_symlinks=follow)
378 # We must copy extended attributes before the file is (potentially)
379 # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
380 _copyxattr(src, dst, follow_symlinks=follow)
381 try:
382 lookup("chmod")(dst, mode, follow_symlinks=follow)
383 except NotImplementedError:
384 # if we got a NotImplementedError, it's because
385 # * follow_symlinks=False,
386 # * lchown() is unavailable, and
387 # * either
388 # * fchownat() is unavailable or
389 # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
390 # (it returned ENOSUP.)
391 # therefore we're out of options--we simply cannot chown the
392 # symlink. give up, suppress the error.
393 # (which is what shutil always did in this circumstance.)
394 pass
395 if hasattr(st, 'st_flags'):
396 try:
397 lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
398 except OSError as why:
399 for err in 'EOPNOTSUPP', 'ENOTSUP':
400 if hasattr(errno, err) and why.errno == getattr(errno, err):
401 break
402 else:
403 raise
404
405 def copy(src, dst, *, follow_symlinks=True):
406 """Copy data and mode bits ("cp src dst"). Return the file's destination.
407
408 The destination may be a directory.
409
410 If follow_symlinks is false, symlinks won't be followed. This
411 resembles GNU's "cp -P src dst".
412
413 If source and destination are the same file, a SameFileError will be
414 raised.
415
416 """
417 if os.path.isdir(dst):
418 dst = os.path.join(dst, os.path.basename(src))
419 copyfile(src, dst, follow_symlinks=follow_symlinks)
420 copymode(src, dst, follow_symlinks=follow_symlinks)
421 return dst
422
423 def copy2(src, dst, *, follow_symlinks=True):
424 """Copy data and metadata. Return the file's destination.
425
426 Metadata is copied with copystat(). Please see the copystat function
427 for more information.
428
429 The destination may be a directory.
430
431 If follow_symlinks is false, symlinks won't be followed. This
432 resembles GNU's "cp -P src dst".
433 """
434 if os.path.isdir(dst):
435 dst = os.path.join(dst, os.path.basename(src))
436 copyfile(src, dst, follow_symlinks=follow_symlinks)
437 copystat(src, dst, follow_symlinks=follow_symlinks)
438 return dst
439
440 def ignore_patterns(*patterns):
441 """Function that can be used as copytree() ignore parameter.
442
443 Patterns is a sequence of glob-style patterns
444 that are used to exclude files"""
445 def _ignore_patterns(path, names):
446 ignored_names = []
447 for pattern in patterns:
448 ignored_names.extend(fnmatch.filter(names, pattern))
449 return set(ignored_names)
450 return _ignore_patterns
451
452 def _copytree(entries, src, dst, symlinks, ignore, copy_function,
453 ignore_dangling_symlinks, dirs_exist_ok=False):
454 if ignore is not None:
455 ignored_names = ignore(os.fspath(src), [x.name for x in entries])
456 else:
457 ignored_names = ()
458
459 os.makedirs(dst, exist_ok=dirs_exist_ok)
460 errors = []
461 use_srcentry = copy_function is copy2 or copy_function is copy
462
463 for srcentry in entries:
464 if srcentry.name in ignored_names:
465 continue
466 srcname = os.path.join(src, srcentry.name)
467 dstname = os.path.join(dst, srcentry.name)
468 srcobj = srcentry if use_srcentry else srcname
469 try:
470 is_symlink = srcentry.is_symlink()
471 if is_symlink and os.name == 'nt':
472 # Special check for directory junctions, which appear as
473 # symlinks but we want to recurse.
474 lstat = srcentry.stat(follow_symlinks=False)
475 if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT:
476 is_symlink = False
477 if is_symlink:
478 linkto = os.readlink(srcname)
479 if symlinks:
480 # We can't just leave it to `copy_function` because legacy
481 # code with a custom `copy_function` may rely on copytree
482 # doing the right thing.
483 os.symlink(linkto, dstname)
484 copystat(srcobj, dstname, follow_symlinks=not symlinks)
485 else:
486 # ignore dangling symlink if the flag is on
487 if not os.path.exists(linkto) and ignore_dangling_symlinks:
488 continue
489 # otherwise let the copy occur. copy2 will raise an error
490 if srcentry.is_dir():
491 copytree(srcobj, dstname, symlinks, ignore,
492 copy_function, ignore_dangling_symlinks,
493 dirs_exist_ok)
494 else:
495 copy_function(srcobj, dstname)
496 elif srcentry.is_dir():
497 copytree(srcobj, dstname, symlinks, ignore, copy_function,
498 ignore_dangling_symlinks, dirs_exist_ok)
499 else:
500 # Will raise a SpecialFileError for unsupported file types
501 copy_function(srcobj, dstname)
502 # catch the Error from the recursive copytree so that we can
503 # continue with other files
504 except Error as err:
505 errors.extend(err.args[0])
506 except OSError as why:
507 errors.append((srcname, dstname, str(why)))
508 try:
509 copystat(src, dst)
510 except OSError as why:
511 # Copying file access times may fail on Windows
512 if getattr(why, 'winerror', None) is None:
513 errors.append((src, dst, str(why)))
514 if errors:
515 raise Error(errors)
516 return dst
517
518 def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
519 ignore_dangling_symlinks=False, dirs_exist_ok=False):
520 """Recursively copy a directory tree and return the destination directory.
521
522 If exception(s) occur, an Error is raised with a list of reasons.
523
524 If the optional symlinks flag is true, symbolic links in the
525 source tree result in symbolic links in the destination tree; if
526 it is false, the contents of the files pointed to by symbolic
527 links are copied. If the file pointed by the symlink doesn't
528 exist, an exception will be added in the list of errors raised in
529 an Error exception at the end of the copy process.
530
531 You can set the optional ignore_dangling_symlinks flag to true if you
532 want to silence this exception. Notice that this has no effect on
533 platforms that don't support os.symlink.
534
535 The optional ignore argument is a callable. If given, it
536 is called with the `src` parameter, which is the directory
537 being visited by copytree(), and `names` which is the list of
538 `src` contents, as returned by os.listdir():
539
540 callable(src, names) -> ignored_names
541
542 Since copytree() is called recursively, the callable will be
543 called once for each directory that is copied. It returns a
544 list of names relative to the `src` directory that should
545 not be copied.
546
547 The optional copy_function argument is a callable that will be used
548 to copy each file. It will be called with the source path and the
549 destination path as arguments. By default, copy2() is used, but any
550 function that supports the same signature (like copy()) can be used.
551
552 If dirs_exist_ok is false (the default) and `dst` already exists, a
553 `FileExistsError` is raised. If `dirs_exist_ok` is true, the copying
554 operation will continue if it encounters existing directories, and files
555 within the `dst` tree will be overwritten by corresponding files from the
556 `src` tree.
557 """
558 sys.audit("shutil.copytree", src, dst)
559 with os.scandir(src) as itr:
560 entries = list(itr)
561 return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
562 ignore=ignore, copy_function=copy_function,
563 ignore_dangling_symlinks=ignore_dangling_symlinks,
564 dirs_exist_ok=dirs_exist_ok)
565
566 if hasattr(os.stat_result, 'st_file_attributes'):
567 # Special handling for directory junctions to make them behave like
568 # symlinks for shutil.rmtree, since in general they do not appear as
569 # regular links.
570 def _rmtree_isdir(entry):
571 try:
572 st = entry.stat(follow_symlinks=False)
573 return (stat.S_ISDIR(st.st_mode) and not
574 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
575 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
576 except OSError:
577 return False
578
579 def _rmtree_islink(path):
580 try:
581 st = os.lstat(path)
582 return (stat.S_ISLNK(st.st_mode) or
583 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
584 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
585 except OSError:
586 return False
587 else:
588 def _rmtree_isdir(entry):
589 try:
590 return entry.is_dir(follow_symlinks=False)
591 except OSError:
592 return False
593
594 def _rmtree_islink(path):
595 return os.path.islink(path)
596
597 # version vulnerable to race conditions
598 def _rmtree_unsafe(path, onerror):
599 try:
600 with os.scandir(path) as scandir_it:
601 entries = list(scandir_it)
602 except OSError:
603 onerror(os.scandir, path, sys.exc_info())
604 entries = []
605 for entry in entries:
606 fullname = entry.path
607 if _rmtree_isdir(entry):
608 try:
609 if entry.is_symlink():
610 # This can only happen if someone replaces
611 # a directory with a symlink after the call to
612 # os.scandir or entry.is_dir above.
613 raise OSError("Cannot call rmtree on a symbolic link")
614 except OSError:
615 onerror(os.path.islink, fullname, sys.exc_info())
616 continue
617 _rmtree_unsafe(fullname, onerror)
618 else:
619 try:
620 os.unlink(fullname)
621 except OSError:
622 onerror(os.unlink, fullname, sys.exc_info())
623 try:
624 os.rmdir(path)
625 except OSError:
626 onerror(os.rmdir, path, sys.exc_info())
627
628 # Version using fd-based APIs to protect against races
629 def _rmtree_safe_fd(topfd, path, onerror):
630 try:
631 with os.scandir(topfd) as scandir_it:
632 entries = list(scandir_it)
633 except OSError as err:
634 err.filename = path
635 onerror(os.scandir, path, sys.exc_info())
636 return
637 for entry in entries:
638 fullname = os.path.join(path, entry.name)
639 try:
640 is_dir = entry.is_dir(follow_symlinks=False)
641 except OSError:
642 is_dir = False
643 else:
644 if is_dir:
645 try:
646 orig_st = entry.stat(follow_symlinks=False)
647 is_dir = stat.S_ISDIR(orig_st.st_mode)
648 except OSError:
649 onerror(os.lstat, fullname, sys.exc_info())
650 continue
651 if is_dir:
652 try:
653 dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd)
654 dirfd_closed = False
655 except OSError:
656 onerror(os.open, fullname, sys.exc_info())
657 else:
658 try:
659 if os.path.samestat(orig_st, os.fstat(dirfd)):
660 _rmtree_safe_fd(dirfd, fullname, onerror)
661 try:
662 os.close(dirfd)
663 dirfd_closed = True
664 os.rmdir(entry.name, dir_fd=topfd)
665 except OSError:
666 onerror(os.rmdir, fullname, sys.exc_info())
667 else:
668 try:
669 # This can only happen if someone replaces
670 # a directory with a symlink after the call to
671 # os.scandir or stat.S_ISDIR above.
672 raise OSError("Cannot call rmtree on a symbolic "
673 "link")
674 except OSError:
675 onerror(os.path.islink, fullname, sys.exc_info())
676 finally:
677 if not dirfd_closed:
678 os.close(dirfd)
679 else:
680 try:
681 os.unlink(entry.name, dir_fd=topfd)
682 except OSError:
683 onerror(os.unlink, fullname, sys.exc_info())
684
685 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
686 os.supports_dir_fd and
687 os.scandir in os.supports_fd and
688 os.stat in os.supports_follow_symlinks)
689
690 def rmtree(path, ignore_errors=False, onerror=None, *, dir_fd=None):
691 """Recursively delete a directory tree.
692
693 If dir_fd is not None, it should be a file descriptor open to a directory;
694 path will then be relative to that directory.
695 dir_fd may not be implemented on your platform.
696 If it is unavailable, using it will raise a NotImplementedError.
697
698 If ignore_errors is set, errors are ignored; otherwise, if onerror
699 is set, it is called to handle the error with arguments (func,
700 path, exc_info) where func is platform and implementation dependent;
701 path is the argument to that function that caused it to fail; and
702 exc_info is a tuple returned by sys.exc_info(). If ignore_errors
703 is false and onerror is None, an exception is raised.
704
705 """
706 sys.audit("shutil.rmtree", path, dir_fd)
707 if ignore_errors:
708 def onerror(*args):
709 pass
710 elif onerror is None:
711 def onerror(*args):
712 raise
713 if _use_fd_functions:
714 # While the unsafe rmtree works fine on bytes, the fd based does not.
715 if isinstance(path, bytes):
716 path = os.fsdecode(path)
717 # Note: To guard against symlink races, we use the standard
718 # lstat()/open()/fstat() trick.
719 try:
720 orig_st = os.lstat(path, dir_fd=dir_fd)
721 except Exception:
722 onerror(os.lstat, path, sys.exc_info())
723 return
724 try:
725 fd = os.open(path, os.O_RDONLY, dir_fd=dir_fd)
726 fd_closed = False
727 except Exception:
728 onerror(os.open, path, sys.exc_info())
729 return
730 try:
731 if os.path.samestat(orig_st, os.fstat(fd)):
732 _rmtree_safe_fd(fd, path, onerror)
733 try:
734 os.close(fd)
735 fd_closed = True
736 os.rmdir(path, dir_fd=dir_fd)
737 except OSError:
738 onerror(os.rmdir, path, sys.exc_info())
739 else:
740 try:
741 # symlinks to directories are forbidden, see bug #1669
742 raise OSError("Cannot call rmtree on a symbolic link")
743 except OSError:
744 onerror(os.path.islink, path, sys.exc_info())
745 finally:
746 if not fd_closed:
747 os.close(fd)
748 else:
749 if dir_fd is not None:
750 raise NotImplementedError("dir_fd unavailable on this platform")
751 try:
752 if _rmtree_islink(path):
753 # symlinks to directories are forbidden, see bug #1669
754 raise OSError("Cannot call rmtree on a symbolic link")
755 except OSError:
756 onerror(os.path.islink, path, sys.exc_info())
757 # can't continue even if onerror hook returns
758 return
759 return _rmtree_unsafe(path, onerror)
760
761 # Allow introspection of whether or not the hardening against symlink
762 # attacks is supported on the current platform
763 rmtree.avoids_symlink_attacks = _use_fd_functions
764
765 def _basename(path):
766 """A basename() variant which first strips the trailing slash, if present.
767 Thus we always get the last component of the path, even for directories.
768
769 path: Union[PathLike, str]
770
771 e.g.
772 >>> os.path.basename('/bar/foo')
773 'foo'
774 >>> os.path.basename('/bar/foo/')
775 ''
776 >>> _basename('/bar/foo/')
777 'foo'
778 """
779 path = os.fspath(path)
780 sep = os.path.sep + (os.path.altsep or '')
781 return os.path.basename(path.rstrip(sep))
782
783 def move(src, dst, copy_function=copy2):
784 """Recursively move a file or directory to another location. This is
785 similar to the Unix "mv" command. Return the file or directory's
786 destination.
787
788 If the destination is a directory or a symlink to a directory, the source
789 is moved inside the directory. The destination path must not already
790 exist.
791
792 If the destination already exists but is not a directory, it may be
793 overwritten depending on os.rename() semantics.
794
795 If the destination is on our current filesystem, then rename() is used.
796 Otherwise, src is copied to the destination and then removed. Symlinks are
797 recreated under the new name if os.rename() fails because of cross
798 filesystem renames.
799
800 The optional `copy_function` argument is a callable that will be used
801 to copy the source or it will be delegated to `copytree`.
802 By default, copy2() is used, but any function that supports the same
803 signature (like copy()) can be used.
804
805 A lot more could be done here... A look at a mv.c shows a lot of
806 the issues this implementation glosses over.
807
808 """
809 sys.audit("shutil.move", src, dst)
810 real_dst = dst
811 if os.path.isdir(dst):
812 if _samefile(src, dst):
813 # We might be on a case insensitive filesystem,
814 # perform the rename anyway.
815 os.rename(src, dst)
816 return
817
818 # Using _basename instead of os.path.basename is important, as we must
819 # ignore any trailing slash to avoid the basename returning ''
820 real_dst = os.path.join(dst, _basename(src))
821
822 if os.path.exists(real_dst):
823 raise Error("Destination path '%s' already exists" % real_dst)
824 try:
825 os.rename(src, real_dst)
826 except OSError:
827 if os.path.islink(src):
828 linkto = os.readlink(src)
829 os.symlink(linkto, real_dst)
830 os.unlink(src)
831 elif os.path.isdir(src):
832 if _destinsrc(src, dst):
833 raise Error("Cannot move a directory '%s' into itself"
834 " '%s'." % (src, dst))
835 if (_is_immutable(src)
836 or (not os.access(src, os.W_OK) and os.listdir(src)
837 and sys.platform == 'darwin')):
838 raise PermissionError("Cannot move the non-empty directory "
839 "'%s': Lacking write permission to '%s'."
840 % (src, src))
841 copytree(src, real_dst, copy_function=copy_function,
842 symlinks=True)
843 rmtree(src)
844 else:
845 copy_function(src, real_dst)
846 os.unlink(src)
847 return real_dst
848
849 def _destinsrc(src, dst):
850 src = os.path.abspath(src)
851 dst = os.path.abspath(dst)
852 if not src.endswith(os.path.sep):
853 src += os.path.sep
854 if not dst.endswith(os.path.sep):
855 dst += os.path.sep
856 return dst.startswith(src)
857
858 def _is_immutable(src):
859 st = _stat(src)
860 immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE]
861 return hasattr(st, 'st_flags') and st.st_flags in immutable_states
862
863 def _get_gid(name):
864 """Returns a gid, given a group name."""
865 if name is None:
866 return None
867
868 try:
869 from grp import getgrnam
870 except ImportError:
871 return None
872
873 try:
874 result = getgrnam(name)
875 except KeyError:
876 result = None
877 if result is not None:
878 return result[2]
879 return None
880
881 def _get_uid(name):
882 """Returns an uid, given a user name."""
883 if name is None:
884 return None
885
886 try:
887 from pwd import getpwnam
888 except ImportError:
889 return None
890
891 try:
892 result = getpwnam(name)
893 except KeyError:
894 result = None
895 if result is not None:
896 return result[2]
897 return None
898
899 def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
900 owner=None, group=None, logger=None, root_dir=None):
901 """Create a (possibly compressed) tar file from all the files under
902 'base_dir'.
903
904 'compress' must be "gzip" (the default), "bzip2", "xz", or None.
905
906 'owner' and 'group' can be used to define an owner and a group for the
907 archive that is being built. If not provided, the current owner and group
908 will be used.
909
910 The output tar file will be named 'base_name' + ".tar", possibly plus
911 the appropriate compression extension (".gz", ".bz2", or ".xz").
912
913 Returns the output filename.
914 """
915 if compress is None:
916 tar_compression = ''
917 elif _ZLIB_SUPPORTED and compress == 'gzip':
918 tar_compression = 'gz'
919 elif _BZ2_SUPPORTED and compress == 'bzip2':
920 tar_compression = 'bz2'
921 elif _LZMA_SUPPORTED and compress == 'xz':
922 tar_compression = 'xz'
923 else:
924 raise ValueError("bad value for 'compress', or compression format not "
925 "supported : {0}".format(compress))
926
927 import tarfile # late import for breaking circular dependency
928
929 compress_ext = '.' + tar_compression if compress else ''
930 archive_name = base_name + '.tar' + compress_ext
931 archive_dir = os.path.dirname(archive_name)
932
933 if archive_dir and not os.path.exists(archive_dir):
934 if logger is not None:
935 logger.info("creating %s", archive_dir)
936 if not dry_run:
937 os.makedirs(archive_dir)
938
939 # creating the tarball
940 if logger is not None:
941 logger.info('Creating tar archive')
942
943 uid = _get_uid(owner)
944 gid = _get_gid(group)
945
946 def _set_uid_gid(tarinfo):
947 if gid is not None:
948 tarinfo.gid = gid
949 tarinfo.gname = group
950 if uid is not None:
951 tarinfo.uid = uid
952 tarinfo.uname = owner
953 return tarinfo
954
955 if not dry_run:
956 tar = tarfile.open(archive_name, 'w|%s' % tar_compression)
957 arcname = base_dir
958 if root_dir is not None:
959 base_dir = os.path.join(root_dir, base_dir)
960 try:
961 tar.add(base_dir, arcname, filter=_set_uid_gid)
962 finally:
963 tar.close()
964
965 if root_dir is not None:
966 archive_name = os.path.abspath(archive_name)
967 return archive_name
968
969 def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0,
970 logger=None, owner=None, group=None, root_dir=None):
971 """Create a zip file from all the files under 'base_dir'.
972
973 The output zip file will be named 'base_name' + ".zip". Returns the
974 name of the output zip file.
975 """
976 import zipfile # late import for breaking circular dependency
977
978 zip_filename = base_name + ".zip"
979 archive_dir = os.path.dirname(base_name)
980
981 if archive_dir and not os.path.exists(archive_dir):
982 if logger is not None:
983 logger.info("creating %s", archive_dir)
984 if not dry_run:
985 os.makedirs(archive_dir)
986
987 if logger is not None:
988 logger.info("creating '%s' and adding '%s' to it",
989 zip_filename, base_dir)
990
991 if not dry_run:
992 with zipfile.ZipFile(zip_filename, "w",
993 compression=zipfile.ZIP_DEFLATED) as zf:
994 arcname = os.path.normpath(base_dir)
995 if root_dir is not None:
996 base_dir = os.path.join(root_dir, base_dir)
997 base_dir = os.path.normpath(base_dir)
998 if arcname != os.curdir:
999 zf.write(base_dir, arcname)
1000 if logger is not None:
1001 logger.info("adding '%s'", base_dir)
1002 for dirpath, dirnames, filenames in os.walk(base_dir):
1003 arcdirpath = dirpath
1004 if root_dir is not None:
1005 arcdirpath = os.path.relpath(arcdirpath, root_dir)
1006 arcdirpath = os.path.normpath(arcdirpath)
1007 for name in sorted(dirnames):
1008 path = os.path.join(dirpath, name)
1009 arcname = os.path.join(arcdirpath, name)
1010 zf.write(path, arcname)
1011 if logger is not None:
1012 logger.info("adding '%s'", path)
1013 for name in filenames:
1014 path = os.path.join(dirpath, name)
1015 path = os.path.normpath(path)
1016 if os.path.isfile(path):
1017 arcname = os.path.join(arcdirpath, name)
1018 zf.write(path, arcname)
1019 if logger is not None:
1020 logger.info("adding '%s'", path)
1021
1022 if root_dir is not None:
1023 zip_filename = os.path.abspath(zip_filename)
1024 return zip_filename
1025
1026 # Maps the name of the archive format to a tuple containing:
1027 # * the archiving function
1028 # * extra keyword arguments
1029 # * description
1030 # * does it support the root_dir argument?
1031 _ARCHIVE_FORMATS = {
1032 'tar': (_make_tarball, [('compress', None)],
1033 "uncompressed tar file", True),
1034 }
1035
1036 if _ZLIB_SUPPORTED:
1037 _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
1038 "gzip'ed tar-file", True)
1039 _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file", True)
1040
1041 if _BZ2_SUPPORTED:
1042 _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
1043 "bzip2'ed tar-file", True)
1044
1045 if _LZMA_SUPPORTED:
1046 _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
1047 "xz'ed tar-file", True)
1048
1049 def get_archive_formats():
1050 """Returns a list of supported formats for archiving and unarchiving.
1051
1052 Each element of the returned sequence is a tuple (name, description)
1053 """
1054 formats = [(name, registry[2]) for name, registry in
1055 _ARCHIVE_FORMATS.items()]
1056 formats.sort()
1057 return formats
1058
1059 def register_archive_format(name, function, extra_args=None, description=''):
1060 """Registers an archive format.
1061
1062 name is the name of the format. function is the callable that will be
1063 used to create archives. If provided, extra_args is a sequence of
1064 (name, value) tuples that will be passed as arguments to the callable.
1065 description can be provided to describe the format, and will be returned
1066 by the get_archive_formats() function.
1067 """
1068 if extra_args is None:
1069 extra_args = []
1070 if not callable(function):
1071 raise TypeError('The %s object is not callable' % function)
1072 if not isinstance(extra_args, (tuple, list)):
1073 raise TypeError('extra_args needs to be a sequence')
1074 for element in extra_args:
1075 if not isinstance(element, (tuple, list)) or len(element) !=2:
1076 raise TypeError('extra_args elements are : (arg_name, value)')
1077
1078 _ARCHIVE_FORMATS[name] = (function, extra_args, description, False)
1079
1080 def unregister_archive_format(name):
1081 del _ARCHIVE_FORMATS[name]
1082
1083 def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
1084 dry_run=0, owner=None, group=None, logger=None):
1085 """Create an archive file (eg. zip or tar).
1086
1087 'base_name' is the name of the file to create, minus any format-specific
1088 extension; 'format' is the archive format: one of "zip", "tar", "gztar",
1089 "bztar", or "xztar". Or any other registered format.
1090
1091 'root_dir' is a directory that will be the root directory of the
1092 archive; ie. we typically chdir into 'root_dir' before creating the
1093 archive. 'base_dir' is the directory where we start archiving from;
1094 ie. 'base_dir' will be the common prefix of all files and
1095 directories in the archive. 'root_dir' and 'base_dir' both default
1096 to the current directory. Returns the name of the archive file.
1097
1098 'owner' and 'group' are used when creating a tar archive. By default,
1099 uses the current owner and group.
1100 """
1101 sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
1102 try:
1103 format_info = _ARCHIVE_FORMATS[format]
1104 except KeyError:
1105 raise ValueError("unknown archive format '%s'" % format) from None
1106
1107 kwargs = {'dry_run': dry_run, 'logger': logger,
1108 'owner': owner, 'group': group}
1109
1110 func = format_info[0]
1111 for arg, val in format_info[1]:
1112 kwargs[arg] = val
1113
1114 if base_dir is None:
1115 base_dir = os.curdir
1116
1117 supports_root_dir = format_info[3]
1118 save_cwd = None
1119 if root_dir is not None:
1120 stmd = os.stat(root_dir).st_mode
1121 if not stat.S_ISDIR(stmd):
1122 raise NotADirectoryError(errno.ENOTDIR, 'Not a directory', root_dir)
1123
1124 if supports_root_dir:
1125 # Support path-like base_name here for backwards-compatibility.
1126 base_name = os.fspath(base_name)
1127 kwargs['root_dir'] = root_dir
1128 else:
1129 save_cwd = os.getcwd()
1130 if logger is not None:
1131 logger.debug("changing into '%s'", root_dir)
1132 base_name = os.path.abspath(base_name)
1133 if not dry_run:
1134 os.chdir(root_dir)
1135
1136 try:
1137 filename = func(base_name, base_dir, **kwargs)
1138 finally:
1139 if save_cwd is not None:
1140 if logger is not None:
1141 logger.debug("changing back to '%s'", save_cwd)
1142 os.chdir(save_cwd)
1143
1144 return filename
1145
1146
1147 def get_unpack_formats():
1148 """Returns a list of supported formats for unpacking.
1149
1150 Each element of the returned sequence is a tuple
1151 (name, extensions, description)
1152 """
1153 formats = [(name, info[0], info[3]) for name, info in
1154 _UNPACK_FORMATS.items()]
1155 formats.sort()
1156 return formats
1157
1158 def _check_unpack_options(extensions, function, extra_args):
1159 """Checks what gets registered as an unpacker."""
1160 # first make sure no other unpacker is registered for this extension
1161 existing_extensions = {}
1162 for name, info in _UNPACK_FORMATS.items():
1163 for ext in info[0]:
1164 existing_extensions[ext] = name
1165
1166 for extension in extensions:
1167 if extension in existing_extensions:
1168 msg = '%s is already registered for "%s"'
1169 raise RegistryError(msg % (extension,
1170 existing_extensions[extension]))
1171
1172 if not callable(function):
1173 raise TypeError('The registered function must be a callable')
1174
1175
1176 def register_unpack_format(name, extensions, function, extra_args=None,
1177 description=''):
1178 """Registers an unpack format.
1179
1180 `name` is the name of the format. `extensions` is a list of extensions
1181 corresponding to the format.
1182
1183 `function` is the callable that will be
1184 used to unpack archives. The callable will receive archives to unpack.
1185 If it's unable to handle an archive, it needs to raise a ReadError
1186 exception.
1187
1188 If provided, `extra_args` is a sequence of
1189 (name, value) tuples that will be passed as arguments to the callable.
1190 description can be provided to describe the format, and will be returned
1191 by the get_unpack_formats() function.
1192 """
1193 if extra_args is None:
1194 extra_args = []
1195 _check_unpack_options(extensions, function, extra_args)
1196 _UNPACK_FORMATS[name] = extensions, function, extra_args, description
1197
1198 def unregister_unpack_format(name):
1199 """Removes the pack format from the registry."""
1200 del _UNPACK_FORMATS[name]
1201
1202 def _ensure_directory(path):
1203 """Ensure that the parent directory of `path` exists"""
1204 dirname = os.path.dirname(path)
1205 if not os.path.isdir(dirname):
1206 os.makedirs(dirname)
1207
1208 def _unpack_zipfile(filename, extract_dir):
1209 """Unpack zip `filename` to `extract_dir`
1210 """
1211 import zipfile # late import for breaking circular dependency
1212
1213 if not zipfile.is_zipfile(filename):
1214 raise ReadError("%s is not a zip file" % filename)
1215
1216 zip = zipfile.ZipFile(filename)
1217 try:
1218 for info in zip.infolist():
1219 name = info.filename
1220
1221 # don't extract absolute paths or ones with .. in them
1222 if name.startswith('/') or '..' in name:
1223 continue
1224
1225 targetpath = os.path.join(extract_dir, *name.split('/'))
1226 if not targetpath:
1227 continue
1228
1229 _ensure_directory(targetpath)
1230 if not name.endswith('/'):
1231 # file
1232 with zip.open(name, 'r') as source, \
1233 open(targetpath, 'wb') as target:
1234 copyfileobj(source, target)
1235 finally:
1236 zip.close()
1237
1238 def _unpack_tarfile(filename, extract_dir, *, filter=None):
1239 """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir`
1240 """
1241 import tarfile # late import for breaking circular dependency
1242 try:
1243 tarobj = tarfile.open(filename)
1244 except tarfile.TarError:
1245 raise ReadError(
1246 "%s is not a compressed or uncompressed tar file" % filename)
1247 try:
1248 tarobj.extractall(extract_dir, filter=filter)
1249 finally:
1250 tarobj.close()
1251
1252 # Maps the name of the unpack format to a tuple containing:
1253 # * extensions
1254 # * the unpacking function
1255 # * extra keyword arguments
1256 # * description
1257 _UNPACK_FORMATS = {
1258 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
1259 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"),
1260 }
1261
1262 if _ZLIB_SUPPORTED:
1263 _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [],
1264 "gzip'ed tar-file")
1265
1266 if _BZ2_SUPPORTED:
1267 _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [],
1268 "bzip2'ed tar-file")
1269
1270 if _LZMA_SUPPORTED:
1271 _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [],
1272 "xz'ed tar-file")
1273
1274 def _find_unpack_format(filename):
1275 for name, info in _UNPACK_FORMATS.items():
1276 for extension in info[0]:
1277 if filename.endswith(extension):
1278 return name
1279 return None
1280
1281 def unpack_archive(filename, extract_dir=None, format=None, *, filter=None):
1282 """Unpack an archive.
1283
1284 `filename` is the name of the archive.
1285
1286 `extract_dir` is the name of the target directory, where the archive
1287 is unpacked. If not provided, the current working directory is used.
1288
1289 `format` is the archive format: one of "zip", "tar", "gztar", "bztar",
1290 or "xztar". Or any other registered format. If not provided,
1291 unpack_archive will use the filename extension and see if an unpacker
1292 was registered for that extension.
1293
1294 In case none is found, a ValueError is raised.
1295
1296 If `filter` is given, it is passed to the underlying
1297 extraction function.
1298 """
1299 sys.audit("shutil.unpack_archive", filename, extract_dir, format)
1300
1301 if extract_dir is None:
1302 extract_dir = os.getcwd()
1303
1304 extract_dir = os.fspath(extract_dir)
1305 filename = os.fspath(filename)
1306
1307 if filter is None:
1308 filter_kwargs = {}
1309 else:
1310 filter_kwargs = {'filter': filter}
1311 if format is not None:
1312 try:
1313 format_info = _UNPACK_FORMATS[format]
1314 except KeyError:
1315 raise ValueError("Unknown unpack format '{0}'".format(format)) from None
1316
1317 func = format_info[1]
1318 func(filename, extract_dir, **dict(format_info[2]), **filter_kwargs)
1319 else:
1320 # we need to look at the registered unpackers supported extensions
1321 format = _find_unpack_format(filename)
1322 if format is None:
1323 raise ReadError("Unknown archive format '{0}'".format(filename))
1324
1325 func = _UNPACK_FORMATS[format][1]
1326 kwargs = dict(_UNPACK_FORMATS[format][2]) | filter_kwargs
1327 func(filename, extract_dir, **kwargs)
1328
1329
1330 if hasattr(os, 'statvfs'):
1331
1332 __all__.append('disk_usage')
1333 _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
1334 _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
1335 _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
1336 _ntuple_diskusage.free.__doc__ = 'Free space in bytes'
1337
1338 def disk_usage(path):
1339 """Return disk usage statistics about the given path.
1340
1341 Returned value is a named tuple with attributes 'total', 'used' and
1342 'free', which are the amount of total, used and free space, in bytes.
1343 """
1344 st = os.statvfs(path)
1345 free = st.f_bavail * st.f_frsize
1346 total = st.f_blocks * st.f_frsize
1347 used = (st.f_blocks - st.f_bfree) * st.f_frsize
1348 return _ntuple_diskusage(total, used, free)
1349
1350 elif _WINDOWS:
1351
1352 __all__.append('disk_usage')
1353 _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
1354
1355 def disk_usage(path):
1356 """Return disk usage statistics about the given path.
1357
1358 Returned values is a named tuple with attributes 'total', 'used' and
1359 'free', which are the amount of total, used and free space, in bytes.
1360 """
1361 total, free = nt._getdiskusage(path)
1362 used = total - free
1363 return _ntuple_diskusage(total, used, free)
1364
1365
1366 def chown(path, user=None, group=None):
1367 """Change owner user and group of the given path.
1368
1369 user and group can be the uid/gid or the user/group names, and in that case,
1370 they are converted to their respective uid/gid.
1371 """
1372 sys.audit('shutil.chown', path, user, group)
1373
1374 if user is None and group is None:
1375 raise ValueError("user and/or group must be set")
1376
1377 _user = user
1378 _group = group
1379
1380 # -1 means don't change it
1381 if user is None:
1382 _user = -1
1383 # user can either be an int (the uid) or a string (the system username)
1384 elif isinstance(user, str):
1385 _user = _get_uid(user)
1386 if _user is None:
1387 raise LookupError("no such user: {!r}".format(user))
1388
1389 if group is None:
1390 _group = -1
1391 elif not isinstance(group, int):
1392 _group = _get_gid(group)
1393 if _group is None:
1394 raise LookupError("no such group: {!r}".format(group))
1395
1396 os.chown(path, _user, _group)
1397
1398 def get_terminal_size(fallback=(80, 24)):
1399 """Get the size of the terminal window.
1400
1401 For each of the two dimensions, the environment variable, COLUMNS
1402 and LINES respectively, is checked. If the variable is defined and
1403 the value is a positive integer, it is used.
1404
1405 When COLUMNS or LINES is not defined, which is the common case,
1406 the terminal connected to sys.__stdout__ is queried
1407 by invoking os.get_terminal_size.
1408
1409 If the terminal size cannot be successfully queried, either because
1410 the system doesn't support querying, or because we are not
1411 connected to a terminal, the value given in fallback parameter
1412 is used. Fallback defaults to (80, 24) which is the default
1413 size used by many terminal emulators.
1414
1415 The value returned is a named tuple of type os.terminal_size.
1416 """
1417 # columns, lines are the working values
1418 try:
1419 columns = int(os.environ['COLUMNS'])
1420 except (KeyError, ValueError):
1421 columns = 0
1422
1423 try:
1424 lines = int(os.environ['LINES'])
1425 except (KeyError, ValueError):
1426 lines = 0
1427
1428 # only query if necessary
1429 if columns <= 0 or lines <= 0:
1430 try:
1431 size = os.get_terminal_size(sys.__stdout__.fileno())
1432 except (AttributeError, ValueError, OSError):
1433 # stdout is None, closed, detached, or not a terminal, or
1434 # os.get_terminal_size() is unsupported
1435 size = os.terminal_size(fallback)
1436 if columns <= 0:
1437 columns = size.columns or fallback[0]
1438 if lines <= 0:
1439 lines = size.lines or fallback[1]
1440
1441 return os.terminal_size((columns, lines))
1442
1443
1444 # Check that a given file can be accessed with the correct mode.
1445 # Additionally check that `file` is not a directory, as on Windows
1446 # directories pass the os.access check.
1447 def _access_check(fn, mode):
1448 return (os.path.exists(fn) and os.access(fn, mode)
1449 and not os.path.isdir(fn))
1450
1451
1452 def which(cmd, mode=os.F_OK | os.X_OK, path=None):
1453 """Given a command, mode, and a PATH string, return the path which
1454 conforms to the given mode on the PATH, or None if there is no such
1455 file.
1456
1457 `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
1458 of os.environ.get("PATH"), or can be overridden with a custom search
1459 path.
1460
1461 """
1462 # If we're given a path with a directory part, look it up directly rather
1463 # than referring to PATH directories. This includes checking relative to the
1464 # current directory, e.g. ./script
1465 if os.path.dirname(cmd):
1466 if _access_check(cmd, mode):
1467 return cmd
1468 return None
1469
1470 use_bytes = isinstance(cmd, bytes)
1471
1472 if path is None:
1473 path = os.environ.get("PATH", None)
1474 if path is None:
1475 try:
1476 path = os.confstr("CS_PATH")
1477 except (AttributeError, ValueError):
1478 # os.confstr() or CS_PATH is not available
1479 path = os.defpath
1480 # bpo-35755: Don't use os.defpath if the PATH environment variable is
1481 # set to an empty string
1482
1483 # PATH='' doesn't match, whereas PATH=':' looks in the current directory
1484 if not path:
1485 return None
1486
1487 if use_bytes:
1488 path = os.fsencode(path)
1489 path = path.split(os.fsencode(os.pathsep))
1490 else:
1491 path = os.fsdecode(path)
1492 path = path.split(os.pathsep)
1493
1494 if sys.platform == "win32":
1495 # The current directory takes precedence on Windows.
1496 curdir = os.curdir
1497 if use_bytes:
1498 curdir = os.fsencode(curdir)
1499 if curdir not in path:
1500 path.insert(0, curdir)
1501
1502 # PATHEXT is necessary to check on Windows.
1503 pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT
1504 pathext = [ext for ext in pathext_source.split(os.pathsep) if ext]
1505
1506 if use_bytes:
1507 pathext = [os.fsencode(ext) for ext in pathext]
1508 # See if the given file matches any of the expected path extensions.
1509 # This will allow us to short circuit when given "python.exe".
1510 # If it does match, only test that one, otherwise we have to try
1511 # others.
1512 if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
1513 files = [cmd]
1514 else:
1515 files = [cmd + ext for ext in pathext]
1516 else:
1517 # On other platforms you don't have things like PATHEXT to tell you
1518 # what file suffixes are executable, so just pass on cmd as-is.
1519 files = [cmd]
1520
1521 seen = set()
1522 for dir in path:
1523 normdir = os.path.normcase(dir)
1524 if not normdir in seen:
1525 seen.add(normdir)
1526 for thefile in files:
1527 name = os.path.join(dir, thefile)
1528 if _access_check(name, mode):
1529 return name
1530 return None