• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import fnmatch
2import glob
3import os
4import os.path
5import shutil
6import stat
7
8from .iterutil import iter_many
9
10
11USE_CWD = object()
12
13
14C_SOURCE_SUFFIXES = ('.c', '.h')
15
16
17def create_backup(old, backup=None):
18    if isinstance(old, str):
19        filename = old
20    else:
21        filename = getattr(old, 'name', None)
22    if not filename:
23        return None
24    if not backup or backup is True:
25        backup = f'{filename}.bak'
26    try:
27        shutil.copyfile(filename, backup)
28    except FileNotFoundError as exc:
29        if exc.filename != filename:
30            raise   # re-raise
31        backup = None
32    return backup
33
34
35##################################
36# filenames
37
38def fix_filename(filename, relroot=USE_CWD, *,
39                 fixroot=True,
40                 _badprefix=f'..{os.path.sep}',
41                 ):
42    """Return a normalized, absolute-path copy of the given filename."""
43    if not relroot or relroot is USE_CWD:
44        return os.path.abspath(filename)
45    if fixroot:
46        relroot = os.path.abspath(relroot)
47    return _fix_filename(filename, relroot)
48
49
50def _fix_filename(filename, relroot, *,
51                  _badprefix=f'..{os.path.sep}',
52                  ):
53    orig = filename
54
55    # First we normalize.
56    filename = os.path.normpath(filename)
57    if filename.startswith(_badprefix):
58        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')
59
60    # Now make sure it is absolute (relative to relroot).
61    if not os.path.isabs(filename):
62        filename = os.path.join(relroot, filename)
63    else:
64        relpath = os.path.relpath(filename, relroot)
65        if os.path.join(relroot, relpath) != filename:
66            raise ValueError(f'expected {relroot!r} as lroot, got {orig!r}')
67
68    return filename
69
70
71def fix_filenames(filenames, relroot=USE_CWD):
72    if not relroot or relroot is USE_CWD:
73        filenames = (os.path.abspath(v) for v in filenames)
74    else:
75        relroot = os.path.abspath(relroot)
76        filenames = (_fix_filename(v, relroot) for v in filenames)
77    return filenames, relroot
78
79
80def format_filename(filename, relroot=USE_CWD, *,
81                    fixroot=True,
82                    normalize=True,
83                    _badprefix=f'..{os.path.sep}',
84                    ):
85    """Return a consistent relative-path representation of the filename."""
86    orig = filename
87    if normalize:
88        filename = os.path.normpath(filename)
89    if relroot is None:
90        # Otherwise leave it as-is.
91        return filename
92    elif relroot is USE_CWD:
93        # Make it relative to CWD.
94        filename = os.path.relpath(filename)
95    else:
96        # Make it relative to "relroot".
97        if fixroot:
98            relroot = os.path.abspath(relroot)
99        elif not relroot:
100            raise ValueError('missing relroot')
101        filename = os.path.relpath(filename, relroot)
102    if filename.startswith(_badprefix):
103        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')
104    return filename
105
106
107##################################
108# find files
109
110def match_glob(filename, pattern):
111    if fnmatch.fnmatch(filename, pattern):
112        return True
113
114    # fnmatch doesn't handle ** quite right.  It will not match the
115    # following:
116    #
117    #  ('x/spam.py', 'x/**/*.py')
118    #  ('spam.py', '**/*.py')
119    #
120    # though it *will* match the following:
121    #
122    #  ('x/y/spam.py', 'x/**/*.py')
123    #  ('x/spam.py', '**/*.py')
124
125    if '**/' not in pattern:
126        return False
127
128    # We only accommodate the single-"**" case.
129    return fnmatch.fnmatch(filename, pattern.replace('**/', '', 1))
130
131
132def process_filenames(filenames, *,
133                      start=None,
134                      include=None,
135                      exclude=None,
136                      relroot=USE_CWD,
137                      ):
138    if relroot and relroot is not USE_CWD:
139        relroot = os.path.abspath(relroot)
140    if start:
141        start = fix_filename(start, relroot, fixroot=False)
142    if include:
143        include = set(fix_filename(v, relroot, fixroot=False)
144                      for v in include)
145    if exclude:
146        exclude = set(fix_filename(v, relroot, fixroot=False)
147                      for v in exclude)
148
149    onempty = Exception('no filenames provided')
150    for filename, solo in iter_many(filenames, onempty):
151        filename = fix_filename(filename, relroot, fixroot=False)
152        relfile = format_filename(filename, relroot, fixroot=False, normalize=False)
153        check, start = _get_check(filename, start, include, exclude)
154        yield filename, relfile, check, solo
155
156
157def expand_filenames(filenames):
158    for filename in filenames:
159        # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)?
160        if '**/' in filename:
161            yield from glob.glob(filename.replace('**/', ''))
162        yield from glob.glob(filename)
163
164
165def _get_check(filename, start, include, exclude):
166    if start and filename != start:
167        return (lambda: '<skipped>'), start
168    else:
169        def check():
170            if _is_excluded(filename, exclude, include):
171                return '<excluded>'
172            return None
173        return check, None
174
175
176def _is_excluded(filename, exclude, include):
177    if include:
178        for included in include:
179            if match_glob(filename, included):
180                return False
181        return True
182    elif exclude:
183        for excluded in exclude:
184            if match_glob(filename, excluded):
185                return True
186        return False
187    else:
188        return False
189
190
191def _walk_tree(root, *,
192               _walk=os.walk,
193               ):
194    # A wrapper around os.walk that resolves the filenames.
195    for parent, _, names in _walk(root):
196        for name in names:
197            yield os.path.join(parent, name)
198
199
200def walk_tree(root, *,
201              suffix=None,
202              walk=_walk_tree,
203              ):
204    """Yield each file in the tree under the given directory name.
205
206    If "suffix" is provided then only files with that suffix will
207    be included.
208    """
209    if suffix and not isinstance(suffix, str):
210        raise ValueError('suffix must be a string')
211
212    for filename in walk(root):
213        if suffix and not filename.endswith(suffix):
214            continue
215        yield filename
216
217
218def glob_tree(root, *,
219              suffix=None,
220              _glob=glob.iglob,
221              ):
222    """Yield each file in the tree under the given directory name.
223
224    If "suffix" is provided then only files with that suffix will
225    be included.
226    """
227    suffix = suffix or ''
228    if not isinstance(suffix, str):
229        raise ValueError('suffix must be a string')
230
231    for filename in _glob(f'{root}/*{suffix}'):
232        yield filename
233    for filename in _glob(f'{root}/**/*{suffix}'):
234        yield filename
235
236
237def iter_files(root, suffix=None, relparent=None, *,
238               get_files=os.walk,
239               _glob=glob_tree,
240               _walk=walk_tree,
241               ):
242    """Yield each file in the tree under the given directory name.
243
244    If "root" is a non-string iterable then do the same for each of
245    those trees.
246
247    If "suffix" is provided then only files with that suffix will
248    be included.
249
250    if "relparent" is provided then it is used to resolve each
251    filename as a relative path.
252    """
253    if not isinstance(root, str):
254        roots = root
255        for root in roots:
256            yield from iter_files(root, suffix, relparent,
257                                  get_files=get_files,
258                                  _glob=_glob, _walk=_walk)
259        return
260
261    # Use the right "walk" function.
262    if get_files in (glob.glob, glob.iglob, glob_tree):
263        get_files = _glob
264    else:
265        _files = _walk_tree if get_files in (os.walk, walk_tree) else get_files
266        get_files = (lambda *a, **k: _walk(*a, walk=_files, **k))
267
268    # Handle a single suffix.
269    if suffix and not isinstance(suffix, str):
270        filenames = get_files(root)
271        suffix = tuple(suffix)
272    else:
273        filenames = get_files(root, suffix=suffix)
274        suffix = None
275
276    for filename in filenames:
277        if suffix and not isinstance(suffix, str):  # multiple suffixes
278            if not filename.endswith(suffix):
279                continue
280        if relparent:
281            filename = os.path.relpath(filename, relparent)
282        yield filename
283
284
285def iter_files_by_suffix(root, suffixes, relparent=None, *,
286                         walk=walk_tree,
287                         _iter_files=iter_files,
288                         ):
289    """Yield each file in the tree that has the given suffixes.
290
291    Unlike iter_files(), the results are in the original suffix order.
292    """
293    if isinstance(suffixes, str):
294        suffixes = [suffixes]
295    # XXX Ignore repeated suffixes?
296    for suffix in suffixes:
297        yield from _iter_files(root, suffix, relparent)
298
299
300##################################
301# file info
302
303# XXX posix-only?
304
305S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
306S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
307S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
308
309
310def is_readable(file, *, user=None, check=False):
311    filename, st, mode = _get_file_info(file)
312    if check:
313        try:
314            okay = _check_file(filename, S_IRANY)
315        except NotImplementedError:
316            okay = NotImplemented
317        if okay is not NotImplemented:
318            return okay
319        # Fall back to checking the mode.
320    return _check_mode(st, mode, S_IRANY, user)
321
322
323def is_writable(file, *, user=None, check=False):
324    filename, st, mode = _get_file_info(file)
325    if check:
326        try:
327            okay = _check_file(filename, S_IWANY)
328        except NotImplementedError:
329            okay = NotImplemented
330        if okay is not NotImplemented:
331            return okay
332        # Fall back to checking the mode.
333    return _check_mode(st, mode, S_IWANY, user)
334
335
336def is_executable(file, *, user=None, check=False):
337    filename, st, mode = _get_file_info(file)
338    if check:
339        try:
340            okay = _check_file(filename, S_IXANY)
341        except NotImplementedError:
342            okay = NotImplemented
343        if okay is not NotImplemented:
344            return okay
345        # Fall back to checking the mode.
346    return _check_mode(st, mode, S_IXANY, user)
347
348
349def _get_file_info(file):
350    filename = st = mode = None
351    if isinstance(file, int):
352        mode = file
353    elif isinstance(file, os.stat_result):
354        st = file
355    else:
356        if isinstance(file, str):
357            filename = file
358        elif hasattr(file, 'name') and os.path.exists(file.name):
359            filename = file.name
360        else:
361            raise NotImplementedError(file)
362        st = os.stat(filename)
363    return filename, st, mode or st.st_mode
364
365
366def _check_file(filename, check):
367    if not isinstance(filename, str):
368        raise Exception(f'filename required to check file, got {filename}')
369    if check & S_IRANY:
370        flags = os.O_RDONLY
371    elif check & S_IWANY:
372        flags = os.O_WRONLY
373    elif check & S_IXANY:
374        # We can worry about S_IXANY later
375        return NotImplemented
376    else:
377        raise NotImplementedError(check)
378
379    try:
380        fd = os.open(filename, flags)
381    except PermissionError:
382        return False
383    # We do not ignore other exceptions.
384    else:
385        os.close(fd)
386        return True
387
388
389def _get_user_info(user):
390    import pwd
391    username = uid = gid = groups = None
392    if user is None:
393        uid = os.geteuid()
394        #username = os.getlogin()
395        username = pwd.getpwuid(uid)[0]
396        gid = os.getgid()
397        groups = os.getgroups()
398    else:
399        if isinstance(user, int):
400            uid = user
401            entry = pwd.getpwuid(uid)
402            username = entry.pw_name
403        elif isinstance(user, str):
404            username = user
405            entry = pwd.getpwnam(username)
406            uid = entry.pw_uid
407        else:
408            raise NotImplementedError(user)
409        gid = entry.pw_gid
410        os.getgrouplist(username, gid)
411    return username, uid, gid, groups
412
413
414def _check_mode(st, mode, check, user):
415    orig = check
416    _, uid, gid, groups = _get_user_info(user)
417    if check & S_IRANY:
418        check -= S_IRANY
419        matched = False
420        if mode & stat.S_IRUSR:
421            if st.st_uid == uid:
422                matched = True
423        if mode & stat.S_IRGRP:
424            if st.st_uid == gid or st.st_uid in groups:
425                matched = True
426        if mode & stat.S_IROTH:
427            matched = True
428        if not matched:
429            return False
430    if check & S_IWANY:
431        check -= S_IWANY
432        matched = False
433        if mode & stat.S_IWUSR:
434            if st.st_uid == uid:
435                matched = True
436        if mode & stat.S_IWGRP:
437            if st.st_uid == gid or st.st_uid in groups:
438                matched = True
439        if mode & stat.S_IWOTH:
440            matched = True
441        if not matched:
442            return False
443    if check & S_IXANY:
444        check -= S_IXANY
445        matched = False
446        if mode & stat.S_IXUSR:
447            if st.st_uid == uid:
448                matched = True
449        if mode & stat.S_IXGRP:
450            if st.st_uid == gid or st.st_uid in groups:
451                matched = True
452        if mode & stat.S_IXOTH:
453            matched = True
454        if not matched:
455            return False
456    if check:
457        raise NotImplementedError((orig, check))
458    return True
459