1 import fnmatch
2 import glob
3 import os
4 import os.path
5 import shutil
6 import stat
7
8 from .iterutil import iter_many
9
10
# Sentinel used as the default "relroot" argument by the filename helpers
# in this module.  When a helper sees it, the current working directory
# is used (via os.path.abspath()/os.path.relpath()) instead of an
# explicit root.
USE_CWD = object()


# Filename suffixes for C source and header files.
# NOTE(review): not referenced in the visible part of this module;
# presumably consumed by importers -- confirm.
C_SOURCE_SUFFIXES = ('.c', '.h')
15
16
def create_backup(old, backup=None):
    """Copy a file to a backup location and return the backup's filename.

    "old" is either a filename or an object with a "name" attribute
    (e.g. an open file).  If no usable filename can be determined then
    None is returned and nothing is copied.

    "backup" is the target filename.  If it is falsy or True then
    "<filename>.bak" is used.

    None is returned if the original file does not exist.  A
    FileNotFoundError about any other path is propagated.
    """
    filename = old if isinstance(old, str) else getattr(old, 'name', None)
    if not filename:
        return None

    if backup and backup is not True:
        target = backup
    else:
        target = f'{filename}.bak'

    try:
        shutil.copyfile(filename, target)
    except FileNotFoundError as exc:
        # Only a missing source file is tolerated.
        if exc.filename != filename:
            raise  # re-raise
        return None
    return target
33
34
35 ##################################
36 # filenames
37
def fix_filename(filename, relroot=USE_CWD, *,
                 fixroot=True,
                 _badprefix=f'..{os.path.sep}',
                 ):
    """Return a normalized, absolute-path copy of the given filename.

    If "relroot" is falsy or USE_CWD then the filename is resolved
    against the current working directory.  Otherwise it is resolved
    against "relroot", which is itself made absolute first unless
    "fixroot" is false.
    """
    if relroot and relroot is not USE_CWD:
        root = os.path.abspath(relroot) if fixroot else relroot
        return _fix_filename(filename, root)
    return os.path.abspath(filename)
48
49
def _fix_filename(filename, relroot, *,
                  _badprefix=f'..{os.path.sep}',
                  ):
    """Return the normalized filename as a path under "relroot".

    A relative filename is joined onto "relroot".  An absolute filename
    is returned (normalized) only if it can be reproduced by joining
    "relroot" with its path relative to "relroot".

    ValueError is raised if a relative filename normalizes to a path
    that starts in the parent directory, or if an absolute filename
    does not pass the check against "relroot".
    """
    orig = filename

    # First we normalize.
    filename = os.path.normpath(filename)
    # A bare ".." has no trailing separator, so the prefix check alone
    # would let it through.
    if filename == os.path.pardir or filename.startswith(_badprefix):
        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root)')

    # Now make sure it is absolute (relative to relroot).
    if not os.path.isabs(filename):
        filename = os.path.join(relroot, filename)
    else:
        relpath = os.path.relpath(filename, relroot)
        if os.path.join(relroot, relpath) != filename:
            raise ValueError(f'expected {relroot!r} as lroot, got {orig!r}')

    return filename
69
70
def fix_filenames(filenames, relroot=USE_CWD):
    """Return (fixed filenames, relroot) for the given filenames.

    The first item is a lazy generator of absolute filenames.  If
    "relroot" is falsy or USE_CWD then each one is resolved against
    the current working directory and "relroot" is returned unchanged.
    Otherwise "relroot" is made absolute and each filename is resolved
    against it.
    """
    if relroot and relroot is not USE_CWD:
        relroot = os.path.abspath(relroot)
        fixed = (_fix_filename(name, relroot) for name in filenames)
    else:
        fixed = (os.path.abspath(name) for name in filenames)
    return fixed, relroot
78
79
def format_filename(filename, relroot=USE_CWD, *,
                    fixroot=True,
                    normalize=True,
                    _badprefix=f'..{os.path.sep}',
                    ):
    """Return a consistent relative-path representation of the filename.

    If "normalize" is true then the filename is first passed through
    os.path.normpath().

    "relroot" controls what the result is relative to:

    * None: the (possibly normalized) filename is returned as-is
    * USE_CWD: relative to the current working directory
    * anything else: relative to that directory, which is made
      absolute first unless "fixroot" is false

    ValueError is raised if "fixroot" is false and "relroot" is empty,
    or if the resulting relative path starts in the parent directory.
    """
    orig = filename
    if normalize:
        filename = os.path.normpath(filename)
    if relroot is None:
        # Leave it as-is.
        return filename

    if relroot is USE_CWD:
        # Make it relative to CWD.
        filename = os.path.relpath(filename)
    else:
        # Make it relative to "relroot".
        if fixroot:
            relroot = os.path.abspath(relroot)
        elif not relroot:
            raise ValueError('missing relroot')
        filename = os.path.relpath(filename, relroot)

    # A bare ".." (e.g. the root's own parent) has no trailing
    # separator, so the prefix check alone would let it through.
    if filename == os.path.pardir or filename.startswith(_badprefix):
        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root)')
    return filename
105
106
def match_path_tail(path1, path2):
    """Return True if one path ends the other.

    Equal paths always match.  Two different absolute paths never
    match.  An absolute path can only be matched by a relative tail.
    """
    if path1 == path2:
        return True

    abs1 = os.path.isabs(path1)
    abs2 = os.path.isabs(path2)
    if abs1 and abs2:
        return False
    if abs1:
        return _match_tail(path1, path2)
    if abs2:
        return _match_tail(path2, path1)
    # Both are relative, so either one may be the tail of the other.
    return _match_tail(path1, path2) or _match_tail(path2, path1)
119
120
def _match_tail(path, tail):
    """Return True if "path" ends with a path separator plus "tail".

    "tail" must be a relative path.
    """
    assert not os.path.isabs(tail), repr(tail)
    # Require a separator so that only whole trailing components match.
    suffix = os.path.sep + tail
    return path.endswith(suffix)
124
125
126 ##################################
127 # find files
128
def match_glob(filename, pattern):
    """Return True if the filename matches the glob pattern.

    This is fnmatch.fnmatch() plus a workaround for "**/".
    """
    matched = fnmatch.fnmatch(filename, pattern)

    # fnmatch doesn't handle ** quite right.  It will not match the
    # following:
    #
    #  ('x/spam.py', 'x/**/*.py')
    #  ('spam.py', '**/*.py')
    #
    # though it *will* match the following:
    #
    #  ('x/y/spam.py', 'x/**/*.py')
    #  ('x/spam.py', '**/*.py')
    #
    # So we try again with the first "**/" dropped.  Only the
    # single-"**" case is accommodated.
    if not matched and '**/' in pattern:
        matched = fnmatch.fnmatch(filename, pattern.replace('**/', '', 1))
    return matched
149
150
def process_filenames(filenames, *,
                      start=None,
                      include=None,
                      exclude=None,
                      relroot=USE_CWD,
                      ):
    """Yield (filename, relfile, check, solo) for each given filename.

    "filename" is the fixed (absolute) filename and "relfile" is its
    formatted form relative to "relroot".  "check" is the callable
    produced by _get_check() for "start", "include" and "exclude".
    "solo" is passed through from iter_many().

    "start", "include" and "exclude" are fixed the same way as the
    filenames before they are used.
    """
    if relroot and relroot is not USE_CWD:
        relroot = os.path.abspath(relroot)

    def resolve(name):
        # "relroot" has already been made absolute above.
        return fix_filename(name, relroot, fixroot=False)

    if start:
        start = resolve(start)
    if include:
        include = {resolve(v) for v in include}
    if exclude:
        exclude = {resolve(v) for v in exclude}

    onempty = Exception('no filenames provided')
    for filename, solo in iter_many(filenames, onempty):
        filename = resolve(filename)
        relfile = format_filename(filename, relroot,
                                  fixroot=False, normalize=False)
        check, start = _get_check(filename, start, include, exclude)
        yield filename, relfile, check, solo
174
175
def expand_filenames(filenames):
    """Yield the glob.glob() matches for each of the given patterns.

    For a pattern containing "**/", the matches for the pattern with
    every "**/" removed are yielded first, followed by the matches for
    the pattern itself.
    """
    for pattern in filenames:
        # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)?
        candidates = [pattern]
        if '**/' in pattern:
            candidates.insert(0, pattern.replace('**/', ''))
        for candidate in candidates:
            yield from glob.glob(candidate)
182
183
def _get_check(filename, start, include, exclude):
    """Return (check, start) for the given filename.

    While "start" is set and the filename is not it, the returned
    check reports '<skipped>' and "start" is passed back unchanged.
    Otherwise the check reports '<excluded>' or None, based on
    "include" and "exclude", and None is passed back in place of
    "start".
    """
    if not start or filename == start:
        def check():
            if _is_excluded(filename, exclude, include):
                return '<excluded>'
            return None
        return check, None
    return (lambda: '<skipped>'), start
193
194
def _is_excluded(filename, exclude, include):
    """Return True if the filename should be left out.

    If "include" is non-empty then it takes precedence: only filenames
    matching one of its patterns are kept.  Otherwise a filename is
    left out only if it matches one of the "exclude" patterns.
    """
    if include:
        return not any(match_glob(filename, pattern) for pattern in include)
    if exclude:
        return any(match_glob(filename, pattern) for pattern in exclude)
    return False
208
209
def _walk_tree(root, *,
               _walk=os.walk,
               ):
    """Yield the joined path of every file name reported under "root".

    This is a wrapper around os.walk() that resolves the filenames.
    """
    for dirpath, _subdirs, filenames in _walk(root):
        yield from (os.path.join(dirpath, name) for name in filenames)
217
218
def walk_tree(root, *,
              suffix=None,
              walk=_walk_tree,
              ):
    """Yield each file in the tree under the given directory name.

    If "suffix" is provided then only files with that suffix will
    be included.  A non-string suffix results in ValueError.
    """
    if suffix and not isinstance(suffix, str):
        raise ValueError('suffix must be a string')

    for filename in walk(root):
        if not suffix or filename.endswith(suffix):
            yield filename
235
236
def glob_tree(root, *,
              suffix=None,
              _glob=glob.iglob,
              ):
    """Yield each file in the tree under the given directory name.

    If "suffix" is provided then only files with that suffix will
    be included.  A non-string suffix results in ValueError.

    Note that the pattern matches on names only, so with no suffix
    (or a suffix that a directory name happens to end with) matching
    directories are yielded as well.
    """
    suffix = suffix or ''
    if not isinstance(suffix, str):
        raise ValueError('suffix must be a string')

    # Without recursive=True, "**" acts like "*" and only the immediate
    # subdirectories get searched.  In recursive mode "**" also matches
    # zero directories, so this one pattern covers the files directly
    # under "root" too.
    yield from _glob(f'{root}/**/*{suffix}', recursive=True)
254
255
def iter_files(root, suffix=None, relparent=None, *,
               get_files=os.walk,
               _glob=glob_tree,
               _walk=walk_tree,
               ):
    """Yield each file in the tree under the given directory name.

    If "root" is a non-string iterable then do the same for each of
    those trees.

    If "suffix" is provided then only files with that suffix will
    be included.  It may be a single string or an iterable of strings.

    if "relparent" is provided then it is used to resolve each
    filename as a relative path.
    """
    if not isinstance(root, str):
        # Multiple roots: handle each one in turn.
        for subroot in root:
            yield from iter_files(subroot, suffix, relparent,
                                  get_files=get_files,
                                  _glob=_glob, _walk=_walk)
        return

    # Use the right "walk" function.
    if get_files in (glob.glob, glob.iglob, glob_tree):
        find = _glob
    else:
        if get_files in (os.walk, walk_tree):
            walker = _walk_tree
        else:
            walker = get_files

        def find(*args, **kwargs):
            return _walk(*args, walk=walker, **kwargs)

    # A single suffix is handled by the finder; multiple suffixes are
    # filtered here.
    if suffix and not isinstance(suffix, str):
        filenames = find(root)
        suffixes = tuple(suffix)
    else:
        filenames = find(root, suffix=suffix)
        suffixes = None

    for filename in filenames:
        if suffixes and not filename.endswith(suffixes):
            continue
        if relparent:
            filename = os.path.relpath(filename, relparent)
        yield filename
302
303
def iter_files_by_suffix(root, suffixes, relparent=None, *,
                         walk=walk_tree,
                         _iter_files=iter_files,
                         ):
    """Yield each file in the tree that has the given suffixes.

    Unlike iter_files(), the results are in the original suffix order:
    the tree is iterated once per suffix.

    Note that "walk" is accepted but not currently used.
    """
    # XXX Ignore repeated suffixes?
    ordered = [suffixes] if isinstance(suffixes, str) else suffixes
    for suffix in ordered:
        yield from _iter_files(root, suffix, relparent)
317
318
319 ##################################
320 # file info
321
322 # XXX posix-only?
323
# Combined permission masks: the user, group and other bits for each
# kind of access (read, write, execute).
S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
327
328
def is_readable(file, *, user=None, check=False):
    """Return True if the file is readable.

    "file" may be anything accepted by _get_file_info().

    If "check" is true then the file is actually probed via
    _check_file().  If that is not supported then the permission bits
    are checked for "user" instead, as they are when "check" is false.
    """
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            result = _check_file(filename, S_IRANY)
        except NotImplementedError:
            pass
        else:
            if result is not NotImplemented:
                return result
    # Fall back to checking the mode.
    return _check_mode(st, mode, S_IRANY, user)
340
341
def is_writable(file, *, user=None, check=False):
    """Return True if the file is writable.

    "file" may be anything accepted by _get_file_info().

    If "check" is true then the file is actually probed via
    _check_file().  If that is not supported then the permission bits
    are checked for "user" instead, as they are when "check" is false.
    """
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            result = _check_file(filename, S_IWANY)
        except NotImplementedError:
            pass
        else:
            if result is not NotImplemented:
                return result
    # Fall back to checking the mode.
    return _check_mode(st, mode, S_IWANY, user)
353
354
def is_executable(file, *, user=None, check=False):
    """Return True if the file is executable.

    "file" may be anything accepted by _get_file_info().

    If "check" is true then the file is actually probed via
    _check_file().  If that is not supported then the permission bits
    are checked for "user" instead, as they are when "check" is false.
    """
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            result = _check_file(filename, S_IXANY)
        except NotImplementedError:
            pass
        else:
            if result is not NotImplemented:
                return result
    # Fall back to checking the mode.
    return _check_mode(st, mode, S_IXANY, user)
366
367
def _get_file_info(file):
    """Return (filename, stat result, mode) for the given file.

    "file" may be one of the following:

    * an int: used as the mode; filename and stat result are None
    * an os.stat_result: used as-is; filename is None
    * a str: the filename, which gets stat'ed
    * an object with a "name" attribute naming an existing file

    Anything else results in NotImplementedError.
    """
    filename = st = mode = None
    if isinstance(file, int):
        mode = file
    elif isinstance(file, os.stat_result):
        st = file
    else:
        if isinstance(file, str):
            filename = file
        elif hasattr(file, 'name') and os.path.exists(file.name):
            filename = file.name
        else:
            raise NotImplementedError(file)
        st = os.stat(filename)
    # Compare against None rather than relying on truthiness: an
    # explicit mode of 0 is valid and comes with no stat result.
    if mode is None:
        mode = st.st_mode
    return filename, st, mode
383
384
def _check_file(filename, check):
    """Return True if the file can actually be opened for the access.

    "check" is a set of permission bits.  Read bits take precedence
    over write bits.  NotImplemented is returned if only execute bits
    are given, and NotImplementedError is raised if none of the known
    bits are set.

    False is returned if opening the file fails with PermissionError.
    """
    if not isinstance(filename, str):
        raise Exception(f'filename required to check file, got {filename}')

    for mask, flags in ((S_IRANY, os.O_RDONLY), (S_IWANY, os.O_WRONLY)):
        if check & mask:
            break
    else:
        if check & S_IXANY:
            # We can worry about S_IXANY later
            return NotImplemented
        raise NotImplementedError(check)

    try:
        fd = os.open(filename, flags)
    except PermissionError:
        return False
    # We do not ignore other exceptions.
    os.close(fd)
    return True
406
407
def _get_user_info(user):
    """Return (username, uid, gid, groups) for the given user.

    If "user" is None then the info is taken from the current process
    (os.geteuid(), os.getgid() and os.getgroups()).  Otherwise "user"
    is a uid (int) or a username (str) and the info is looked up in
    the password database.  Any other type results in
    NotImplementedError.
    """
    import pwd
    if user is None:
        uid = os.geteuid()
        username = pwd.getpwuid(uid)[0]
        gid = os.getgid()
        groups = os.getgroups()
    else:
        if isinstance(user, int):
            entry = pwd.getpwuid(user)
        elif isinstance(user, str):
            entry = pwd.getpwnam(user)
        else:
            raise NotImplementedError(user)
        username = entry.pw_name
        uid = entry.pw_uid
        gid = entry.pw_gid
        # Keep the result; otherwise "groups" would be None for any
        # explicitly requested user.
        groups = os.getgrouplist(username, gid)
    return username, uid, gid, groups
431
432
def _mode_grants(st, mode, usrbit, grpbit, othbit, uid, gid, groups):
    """Return True if any of the given permission bits applies to the user."""
    if mode & usrbit and st.st_uid == uid:
        return True
    if mode & grpbit:
        # Group permissions apply to the file's *group*, not its owner.
        if st.st_gid == gid or st.st_gid in (groups or ()):
            return True
    return bool(mode & othbit)


def _check_mode(st, mode, check, user):
    """Return True if the mode bits grant the requested access to the user.

    "st" is the file's stat result (its st_uid and st_gid are used),
    "mode" holds the permission bits, "check" is some combination of
    S_IRANY, S_IWANY and S_IXANY, and "user" is passed to
    _get_user_info().

    Each requested kind of access must be granted through the owner
    bit (if the user owns the file), the group bit (if the file's
    group is one of the user's groups) or the "other" bit.  Note that
    this is more permissive than the OS, which consults only the first
    applicable class.

    NotImplementedError is raised if "check" has unrecognized bits
    left over once every recognized kind of access has been granted.
    """
    orig = check
    _, uid, gid, groups = _get_user_info(user)
    kinds = (
        (S_IRANY, stat.S_IRUSR, stat.S_IRGRP, stat.S_IROTH),
        (S_IWANY, stat.S_IWUSR, stat.S_IWGRP, stat.S_IWOTH),
        (S_IXANY, stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH),
    )
    for anybits, usrbit, grpbit, othbit in kinds:
        if not check & anybits:
            continue
        # Clear the bits rather than subtracting the whole mask, which
        # is only correct when every bit of the mask is set.
        check &= ~anybits
        if not _mode_grants(st, mode, usrbit, grpbit, othbit,
                            uid, gid, groups):
            return False
    if check:
        raise NotImplementedError((orig, check))
    return True