• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import fnmatch
2import glob
3import os
4import os.path
5import shutil
6import stat
7
8from .iterutil import iter_many
9
10
11USE_CWD = object()
12
13
14C_SOURCE_SUFFIXES = ('.c', '.h')
15
16
17def create_backup(old, backup=None):
18    if isinstance(old, str):
19        filename = old
20    else:
21        filename = getattr(old, 'name', None)
22    if not filename:
23        return None
24    if not backup or backup is True:
25        backup = f'{filename}.bak'
26    try:
27        shutil.copyfile(filename, backup)
28    except FileNotFoundError as exc:
29        if exc.filename != filename:
30            raise   # re-raise
31        backup = None
32    return backup
33
34
35##################################
36# filenames
37
38def fix_filename(filename, relroot=USE_CWD, *,
39                 fixroot=True,
40                 _badprefix=f'..{os.path.sep}',
41                 ):
42    """Return a normalized, absolute-path copy of the given filename."""
43    if not relroot or relroot is USE_CWD:
44        return os.path.abspath(filename)
45    if fixroot:
46        relroot = os.path.abspath(relroot)
47    return _fix_filename(filename, relroot)
48
49
50def _fix_filename(filename, relroot, *,
51                  _badprefix=f'..{os.path.sep}',
52                  ):
53    orig = filename
54
55    # First we normalize.
56    filename = os.path.normpath(filename)
57    if filename.startswith(_badprefix):
58        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')
59
60    # Now make sure it is absolute (relative to relroot).
61    if not os.path.isabs(filename):
62        filename = os.path.join(relroot, filename)
63    else:
64        relpath = os.path.relpath(filename, relroot)
65        if os.path.join(relroot, relpath) != filename:
66            raise ValueError(f'expected {relroot!r} as lroot, got {orig!r}')
67
68    return filename
69
70
71def fix_filenames(filenames, relroot=USE_CWD):
72    if not relroot or relroot is USE_CWD:
73        filenames = (os.path.abspath(v) for v in filenames)
74    else:
75        relroot = os.path.abspath(relroot)
76        filenames = (_fix_filename(v, relroot) for v in filenames)
77    return filenames, relroot
78
79
80def format_filename(filename, relroot=USE_CWD, *,
81                    fixroot=True,
82                    normalize=True,
83                    _badprefix=f'..{os.path.sep}',
84                    ):
85    """Return a consistent relative-path representation of the filename."""
86    orig = filename
87    if normalize:
88        filename = os.path.normpath(filename)
89    if relroot is None:
90        # Otherwise leave it as-is.
91        return filename
92    elif relroot is USE_CWD:
93        # Make it relative to CWD.
94        filename = os.path.relpath(filename)
95    else:
96        # Make it relative to "relroot".
97        if fixroot:
98            relroot = os.path.abspath(relroot)
99        elif not relroot:
100            raise ValueError('missing relroot')
101        filename = os.path.relpath(filename, relroot)
102    if filename.startswith(_badprefix):
103        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')
104    return filename
105
106
107def match_path_tail(path1, path2):
108    """Return True if one path ends the other."""
109    if path1 == path2:
110        return True
111    if os.path.isabs(path1):
112        if os.path.isabs(path2):
113            return False
114        return _match_tail(path1, path2)
115    elif os.path.isabs(path2):
116        return _match_tail(path2, path1)
117    else:
118        return _match_tail(path1, path2) or _match_tail(path2, path1)
119
120
121def _match_tail(path, tail):
122    assert not os.path.isabs(tail), repr(tail)
123    return path.endswith(os.path.sep + tail)
124
125
126##################################
127# find files
128
129def match_glob(filename, pattern):
130    if fnmatch.fnmatch(filename, pattern):
131        return True
132
133    # fnmatch doesn't handle ** quite right.  It will not match the
134    # following:
135    #
136    #  ('x/spam.py', 'x/**/*.py')
137    #  ('spam.py', '**/*.py')
138    #
139    # though it *will* match the following:
140    #
141    #  ('x/y/spam.py', 'x/**/*.py')
142    #  ('x/spam.py', '**/*.py')
143
144    if '**/' not in pattern:
145        return False
146
147    # We only accommodate the single-"**" case.
148    return fnmatch.fnmatch(filename, pattern.replace('**/', '', 1))
149
150
151def process_filenames(filenames, *,
152                      start=None,
153                      include=None,
154                      exclude=None,
155                      relroot=USE_CWD,
156                      ):
157    if relroot and relroot is not USE_CWD:
158        relroot = os.path.abspath(relroot)
159    if start:
160        start = fix_filename(start, relroot, fixroot=False)
161    if include:
162        include = set(fix_filename(v, relroot, fixroot=False)
163                      for v in include)
164    if exclude:
165        exclude = set(fix_filename(v, relroot, fixroot=False)
166                      for v in exclude)
167
168    onempty = Exception('no filenames provided')
169    for filename, solo in iter_many(filenames, onempty):
170        filename = fix_filename(filename, relroot, fixroot=False)
171        relfile = format_filename(filename, relroot, fixroot=False, normalize=False)
172        check, start = _get_check(filename, start, include, exclude)
173        yield filename, relfile, check, solo
174
175
176def expand_filenames(filenames):
177    for filename in filenames:
178        # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)?
179        if '**/' in filename:
180            yield from glob.glob(filename.replace('**/', ''))
181        yield from glob.glob(filename)
182
183
184def _get_check(filename, start, include, exclude):
185    if start and filename != start:
186        return (lambda: '<skipped>'), start
187    else:
188        def check():
189            if _is_excluded(filename, exclude, include):
190                return '<excluded>'
191            return None
192        return check, None
193
194
195def _is_excluded(filename, exclude, include):
196    if include:
197        for included in include:
198            if match_glob(filename, included):
199                return False
200        return True
201    elif exclude:
202        for excluded in exclude:
203            if match_glob(filename, excluded):
204                return True
205        return False
206    else:
207        return False
208
209
210def _walk_tree(root, *,
211               _walk=os.walk,
212               ):
213    # A wrapper around os.walk that resolves the filenames.
214    for parent, _, names in _walk(root):
215        for name in names:
216            yield os.path.join(parent, name)
217
218
219def walk_tree(root, *,
220              suffix=None,
221              walk=_walk_tree,
222              ):
223    """Yield each file in the tree under the given directory name.
224
225    If "suffix" is provided then only files with that suffix will
226    be included.
227    """
228    if suffix and not isinstance(suffix, str):
229        raise ValueError('suffix must be a string')
230
231    for filename in walk(root):
232        if suffix and not filename.endswith(suffix):
233            continue
234        yield filename
235
236
237def glob_tree(root, *,
238              suffix=None,
239              _glob=glob.iglob,
240              ):
241    """Yield each file in the tree under the given directory name.
242
243    If "suffix" is provided then only files with that suffix will
244    be included.
245    """
246    suffix = suffix or ''
247    if not isinstance(suffix, str):
248        raise ValueError('suffix must be a string')
249
250    for filename in _glob(f'{root}/*{suffix}'):
251        yield filename
252    for filename in _glob(f'{root}/**/*{suffix}'):
253        yield filename
254
255
256def iter_files(root, suffix=None, relparent=None, *,
257               get_files=os.walk,
258               _glob=glob_tree,
259               _walk=walk_tree,
260               ):
261    """Yield each file in the tree under the given directory name.
262
263    If "root" is a non-string iterable then do the same for each of
264    those trees.
265
266    If "suffix" is provided then only files with that suffix will
267    be included.
268
269    if "relparent" is provided then it is used to resolve each
270    filename as a relative path.
271    """
272    if not isinstance(root, str):
273        roots = root
274        for root in roots:
275            yield from iter_files(root, suffix, relparent,
276                                  get_files=get_files,
277                                  _glob=_glob, _walk=_walk)
278        return
279
280    # Use the right "walk" function.
281    if get_files in (glob.glob, glob.iglob, glob_tree):
282        get_files = _glob
283    else:
284        _files = _walk_tree if get_files in (os.walk, walk_tree) else get_files
285        get_files = (lambda *a, **k: _walk(*a, walk=_files, **k))
286
287    # Handle a single suffix.
288    if suffix and not isinstance(suffix, str):
289        filenames = get_files(root)
290        suffix = tuple(suffix)
291    else:
292        filenames = get_files(root, suffix=suffix)
293        suffix = None
294
295    for filename in filenames:
296        if suffix and not isinstance(suffix, str):  # multiple suffixes
297            if not filename.endswith(suffix):
298                continue
299        if relparent:
300            filename = os.path.relpath(filename, relparent)
301        yield filename
302
303
304def iter_files_by_suffix(root, suffixes, relparent=None, *,
305                         walk=walk_tree,
306                         _iter_files=iter_files,
307                         ):
308    """Yield each file in the tree that has the given suffixes.
309
310    Unlike iter_files(), the results are in the original suffix order.
311    """
312    if isinstance(suffixes, str):
313        suffixes = [suffixes]
314    # XXX Ignore repeated suffixes?
315    for suffix in suffixes:
316        yield from _iter_files(root, suffix, relparent)
317
318
319##################################
320# file info
321
322# XXX posix-only?
323
324S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
325S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
326S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
327
328
329def is_readable(file, *, user=None, check=False):
330    filename, st, mode = _get_file_info(file)
331    if check:
332        try:
333            okay = _check_file(filename, S_IRANY)
334        except NotImplementedError:
335            okay = NotImplemented
336        if okay is not NotImplemented:
337            return okay
338        # Fall back to checking the mode.
339    return _check_mode(st, mode, S_IRANY, user)
340
341
342def is_writable(file, *, user=None, check=False):
343    filename, st, mode = _get_file_info(file)
344    if check:
345        try:
346            okay = _check_file(filename, S_IWANY)
347        except NotImplementedError:
348            okay = NotImplemented
349        if okay is not NotImplemented:
350            return okay
351        # Fall back to checking the mode.
352    return _check_mode(st, mode, S_IWANY, user)
353
354
355def is_executable(file, *, user=None, check=False):
356    filename, st, mode = _get_file_info(file)
357    if check:
358        try:
359            okay = _check_file(filename, S_IXANY)
360        except NotImplementedError:
361            okay = NotImplemented
362        if okay is not NotImplemented:
363            return okay
364        # Fall back to checking the mode.
365    return _check_mode(st, mode, S_IXANY, user)
366
367
368def _get_file_info(file):
369    filename = st = mode = None
370    if isinstance(file, int):
371        mode = file
372    elif isinstance(file, os.stat_result):
373        st = file
374    else:
375        if isinstance(file, str):
376            filename = file
377        elif hasattr(file, 'name') and os.path.exists(file.name):
378            filename = file.name
379        else:
380            raise NotImplementedError(file)
381        st = os.stat(filename)
382    return filename, st, mode or st.st_mode
383
384
385def _check_file(filename, check):
386    if not isinstance(filename, str):
387        raise Exception(f'filename required to check file, got {filename}')
388    if check & S_IRANY:
389        flags = os.O_RDONLY
390    elif check & S_IWANY:
391        flags = os.O_WRONLY
392    elif check & S_IXANY:
393        # We can worry about S_IXANY later
394        return NotImplemented
395    else:
396        raise NotImplementedError(check)
397
398    try:
399        fd = os.open(filename, flags)
400    except PermissionError:
401        return False
402    # We do not ignore other exceptions.
403    else:
404        os.close(fd)
405        return True
406
407
408def _get_user_info(user):
409    import pwd
410    username = uid = gid = groups = None
411    if user is None:
412        uid = os.geteuid()
413        #username = os.getlogin()
414        username = pwd.getpwuid(uid)[0]
415        gid = os.getgid()
416        groups = os.getgroups()
417    else:
418        if isinstance(user, int):
419            uid = user
420            entry = pwd.getpwuid(uid)
421            username = entry.pw_name
422        elif isinstance(user, str):
423            username = user
424            entry = pwd.getpwnam(username)
425            uid = entry.pw_uid
426        else:
427            raise NotImplementedError(user)
428        gid = entry.pw_gid
429        os.getgrouplist(username, gid)
430    return username, uid, gid, groups
431
432
433def _check_mode(st, mode, check, user):
434    orig = check
435    _, uid, gid, groups = _get_user_info(user)
436    if check & S_IRANY:
437        check -= S_IRANY
438        matched = False
439        if mode & stat.S_IRUSR:
440            if st.st_uid == uid:
441                matched = True
442        if mode & stat.S_IRGRP:
443            if st.st_uid == gid or st.st_uid in groups:
444                matched = True
445        if mode & stat.S_IROTH:
446            matched = True
447        if not matched:
448            return False
449    if check & S_IWANY:
450        check -= S_IWANY
451        matched = False
452        if mode & stat.S_IWUSR:
453            if st.st_uid == uid:
454                matched = True
455        if mode & stat.S_IWGRP:
456            if st.st_uid == gid or st.st_uid in groups:
457                matched = True
458        if mode & stat.S_IWOTH:
459            matched = True
460        if not matched:
461            return False
462    if check & S_IXANY:
463        check -= S_IXANY
464        matched = False
465        if mode & stat.S_IXUSR:
466            if st.st_uid == uid:
467                matched = True
468        if mode & stat.S_IXGRP:
469            if st.st_uid == gid or st.st_uid in groups:
470                matched = True
471        if mode & stat.S_IXOTH:
472            matched = True
473        if not matched:
474            return False
475    if check:
476        raise NotImplementedError((orig, check))
477    return True
478