1import fnmatch 2import glob 3import os 4import os.path 5import shutil 6import stat 7 8from .iterutil import iter_many 9 10 11USE_CWD = object() 12 13 14C_SOURCE_SUFFIXES = ('.c', '.h') 15 16 17def create_backup(old, backup=None): 18 if isinstance(old, str): 19 filename = old 20 else: 21 filename = getattr(old, 'name', None) 22 if not filename: 23 return None 24 if not backup or backup is True: 25 backup = f'{filename}.bak' 26 try: 27 shutil.copyfile(filename, backup) 28 except FileNotFoundError as exc: 29 if exc.filename != filename: 30 raise # re-raise 31 backup = None 32 return backup 33 34 35################################## 36# filenames 37 38def fix_filename(filename, relroot=USE_CWD, *, 39 fixroot=True, 40 _badprefix=f'..{os.path.sep}', 41 ): 42 """Return a normalized, absolute-path copy of the given filename.""" 43 if not relroot or relroot is USE_CWD: 44 return os.path.abspath(filename) 45 if fixroot: 46 relroot = os.path.abspath(relroot) 47 return _fix_filename(filename, relroot) 48 49 50def _fix_filename(filename, relroot, *, 51 _badprefix=f'..{os.path.sep}', 52 ): 53 orig = filename 54 55 # First we normalize. 56 filename = os.path.normpath(filename) 57 if filename.startswith(_badprefix): 58 raise ValueError(f'bad filename {orig!r} (resolves beyond relative root') 59 60 # Now make sure it is absolute (relative to relroot). 61 if not os.path.isabs(filename): 62 filename = os.path.join(relroot, filename) 63 else: 64 relpath = os.path.relpath(filename, relroot) 65 if os.path.join(relroot, relpath) != filename: 66 raise ValueError(f'expected {relroot!r} as lroot, got {orig!r}') 67 68 return filename 69 70 71def fix_filenames(filenames, relroot=USE_CWD): 72 if not relroot or relroot is USE_CWD: 73 filenames = (os.path.abspath(v) for v in filenames) 74 else: 75 relroot = os.path.abspath(relroot) 76 filenames = (_fix_filename(v, relroot) for v in filenames) 77 return filenames, relroot 78 79 80def format_filename(filename, relroot=USE_CWD, *, 81 fixroot=True, 82 normalize=True, 83 _badprefix=f'..{os.path.sep}', 84 ): 85 """Return a consistent relative-path representation of the filename.""" 86 orig = filename 87 if normalize: 88 filename = os.path.normpath(filename) 89 if relroot is None: 90 # Otherwise leave it as-is. 91 return filename 92 elif relroot is USE_CWD: 93 # Make it relative to CWD. 94 filename = os.path.relpath(filename) 95 else: 96 # Make it relative to "relroot". 97 if fixroot: 98 relroot = os.path.abspath(relroot) 99 elif not relroot: 100 raise ValueError('missing relroot') 101 filename = os.path.relpath(filename, relroot) 102 if filename.startswith(_badprefix): 103 raise ValueError(f'bad filename {orig!r} (resolves beyond relative root') 104 return filename 105 106 107################################## 108# find files 109 110def match_glob(filename, pattern): 111 if fnmatch.fnmatch(filename, pattern): 112 return True 113 114 # fnmatch doesn't handle ** quite right. It will not match the 115 # following: 116 # 117 # ('x/spam.py', 'x/**/*.py') 118 # ('spam.py', '**/*.py') 119 # 120 # though it *will* match the following: 121 # 122 # ('x/y/spam.py', 'x/**/*.py') 123 # ('x/spam.py', '**/*.py') 124 125 if '**/' not in pattern: 126 return False 127 128 # We only accommodate the single-"**" case. 129 return fnmatch.fnmatch(filename, pattern.replace('**/', '', 1)) 130 131 132def process_filenames(filenames, *, 133 start=None, 134 include=None, 135 exclude=None, 136 relroot=USE_CWD, 137 ): 138 if relroot and relroot is not USE_CWD: 139 relroot = os.path.abspath(relroot) 140 if start: 141 start = fix_filename(start, relroot, fixroot=False) 142 if include: 143 include = set(fix_filename(v, relroot, fixroot=False) 144 for v in include) 145 if exclude: 146 exclude = set(fix_filename(v, relroot, fixroot=False) 147 for v in exclude) 148 149 onempty = Exception('no filenames provided') 150 for filename, solo in iter_many(filenames, onempty): 151 filename = fix_filename(filename, relroot, fixroot=False) 152 relfile = format_filename(filename, relroot, fixroot=False, normalize=False) 153 check, start = _get_check(filename, start, include, exclude) 154 yield filename, relfile, check, solo 155 156 157def expand_filenames(filenames): 158 for filename in filenames: 159 # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)? 160 if '**/' in filename: 161 yield from glob.glob(filename.replace('**/', '')) 162 yield from glob.glob(filename) 163 164 165def _get_check(filename, start, include, exclude): 166 if start and filename != start: 167 return (lambda: '<skipped>'), start 168 else: 169 def check(): 170 if _is_excluded(filename, exclude, include): 171 return '<excluded>' 172 return None 173 return check, None 174 175 176def _is_excluded(filename, exclude, include): 177 if include: 178 for included in include: 179 if match_glob(filename, included): 180 return False 181 return True 182 elif exclude: 183 for excluded in exclude: 184 if match_glob(filename, excluded): 185 return True 186 return False 187 else: 188 return False 189 190 191def _walk_tree(root, *, 192 _walk=os.walk, 193 ): 194 # A wrapper around os.walk that resolves the filenames. 195 for parent, _, names in _walk(root): 196 for name in names: 197 yield os.path.join(parent, name) 198 199 200def walk_tree(root, *, 201 suffix=None, 202 walk=_walk_tree, 203 ): 204 """Yield each file in the tree under the given directory name. 205 206 If "suffix" is provided then only files with that suffix will 207 be included. 208 """ 209 if suffix and not isinstance(suffix, str): 210 raise ValueError('suffix must be a string') 211 212 for filename in walk(root): 213 if suffix and not filename.endswith(suffix): 214 continue 215 yield filename 216 217 218def glob_tree(root, *, 219 suffix=None, 220 _glob=glob.iglob, 221 ): 222 """Yield each file in the tree under the given directory name. 223 224 If "suffix" is provided then only files with that suffix will 225 be included. 226 """ 227 suffix = suffix or '' 228 if not isinstance(suffix, str): 229 raise ValueError('suffix must be a string') 230 231 for filename in _glob(f'{root}/*{suffix}'): 232 yield filename 233 for filename in _glob(f'{root}/**/*{suffix}'): 234 yield filename 235 236 237def iter_files(root, suffix=None, relparent=None, *, 238 get_files=os.walk, 239 _glob=glob_tree, 240 _walk=walk_tree, 241 ): 242 """Yield each file in the tree under the given directory name. 243 244 If "root" is a non-string iterable then do the same for each of 245 those trees. 246 247 If "suffix" is provided then only files with that suffix will 248 be included. 249 250 if "relparent" is provided then it is used to resolve each 251 filename as a relative path. 252 """ 253 if not isinstance(root, str): 254 roots = root 255 for root in roots: 256 yield from iter_files(root, suffix, relparent, 257 get_files=get_files, 258 _glob=_glob, _walk=_walk) 259 return 260 261 # Use the right "walk" function. 262 if get_files in (glob.glob, glob.iglob, glob_tree): 263 get_files = _glob 264 else: 265 _files = _walk_tree if get_files in (os.walk, walk_tree) else get_files 266 get_files = (lambda *a, **k: _walk(*a, walk=_files, **k)) 267 268 # Handle a single suffix. 269 if suffix and not isinstance(suffix, str): 270 filenames = get_files(root) 271 suffix = tuple(suffix) 272 else: 273 filenames = get_files(root, suffix=suffix) 274 suffix = None 275 276 for filename in filenames: 277 if suffix and not isinstance(suffix, str): # multiple suffixes 278 if not filename.endswith(suffix): 279 continue 280 if relparent: 281 filename = os.path.relpath(filename, relparent) 282 yield filename 283 284 285def iter_files_by_suffix(root, suffixes, relparent=None, *, 286 walk=walk_tree, 287 _iter_files=iter_files, 288 ): 289 """Yield each file in the tree that has the given suffixes. 290 291 Unlike iter_files(), the results are in the original suffix order. 292 """ 293 if isinstance(suffixes, str): 294 suffixes = [suffixes] 295 # XXX Ignore repeated suffixes? 296 for suffix in suffixes: 297 yield from _iter_files(root, suffix, relparent) 298 299 300################################## 301# file info 302 303# XXX posix-only? 304 305S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH 306S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH 307S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH 308 309 310def is_readable(file, *, user=None, check=False): 311 filename, st, mode = _get_file_info(file) 312 if check: 313 try: 314 okay = _check_file(filename, S_IRANY) 315 except NotImplementedError: 316 okay = NotImplemented 317 if okay is not NotImplemented: 318 return okay 319 # Fall back to checking the mode. 320 return _check_mode(st, mode, S_IRANY, user) 321 322 323def is_writable(file, *, user=None, check=False): 324 filename, st, mode = _get_file_info(file) 325 if check: 326 try: 327 okay = _check_file(filename, S_IWANY) 328 except NotImplementedError: 329 okay = NotImplemented 330 if okay is not NotImplemented: 331 return okay 332 # Fall back to checking the mode. 333 return _check_mode(st, mode, S_IWANY, user) 334 335 336def is_executable(file, *, user=None, check=False): 337 filename, st, mode = _get_file_info(file) 338 if check: 339 try: 340 okay = _check_file(filename, S_IXANY) 341 except NotImplementedError: 342 okay = NotImplemented 343 if okay is not NotImplemented: 344 return okay 345 # Fall back to checking the mode. 346 return _check_mode(st, mode, S_IXANY, user) 347 348 349def _get_file_info(file): 350 filename = st = mode = None 351 if isinstance(file, int): 352 mode = file 353 elif isinstance(file, os.stat_result): 354 st = file 355 else: 356 if isinstance(file, str): 357 filename = file 358 elif hasattr(file, 'name') and os.path.exists(file.name): 359 filename = file.name 360 else: 361 raise NotImplementedError(file) 362 st = os.stat(filename) 363 return filename, st, mode or st.st_mode 364 365 366def _check_file(filename, check): 367 if not isinstance(filename, str): 368 raise Exception(f'filename required to check file, got {filename}') 369 if check & S_IRANY: 370 flags = os.O_RDONLY 371 elif check & S_IWANY: 372 flags = os.O_WRONLY 373 elif check & S_IXANY: 374 # We can worry about S_IXANY later 375 return NotImplemented 376 else: 377 raise NotImplementedError(check) 378 379 try: 380 fd = os.open(filename, flags) 381 except PermissionError: 382 return False 383 # We do not ignore other exceptions. 384 else: 385 os.close(fd) 386 return True 387 388 389def _get_user_info(user): 390 import pwd 391 username = uid = gid = groups = None 392 if user is None: 393 uid = os.geteuid() 394 #username = os.getlogin() 395 username = pwd.getpwuid(uid)[0] 396 gid = os.getgid() 397 groups = os.getgroups() 398 else: 399 if isinstance(user, int): 400 uid = user 401 entry = pwd.getpwuid(uid) 402 username = entry.pw_name 403 elif isinstance(user, str): 404 username = user 405 entry = pwd.getpwnam(username) 406 uid = entry.pw_uid 407 else: 408 raise NotImplementedError(user) 409 gid = entry.pw_gid 410 os.getgrouplist(username, gid) 411 return username, uid, gid, groups 412 413 414def _check_mode(st, mode, check, user): 415 orig = check 416 _, uid, gid, groups = _get_user_info(user) 417 if check & S_IRANY: 418 check -= S_IRANY 419 matched = False 420 if mode & stat.S_IRUSR: 421 if st.st_uid == uid: 422 matched = True 423 if mode & stat.S_IRGRP: 424 if st.st_uid == gid or st.st_uid in groups: 425 matched = True 426 if mode & stat.S_IROTH: 427 matched = True 428 if not matched: 429 return False 430 if check & S_IWANY: 431 check -= S_IWANY 432 matched = False 433 if mode & stat.S_IWUSR: 434 if st.st_uid == uid: 435 matched = True 436 if mode & stat.S_IWGRP: 437 if st.st_uid == gid or st.st_uid in groups: 438 matched = True 439 if mode & stat.S_IWOTH: 440 matched = True 441 if not matched: 442 return False 443 if check & S_IXANY: 444 check -= S_IXANY 445 matched = False 446 if mode & stat.S_IXUSR: 447 if st.st_uid == uid: 448 matched = True 449 if mode & stat.S_IXGRP: 450 if st.st_uid == gid or st.st_uid in groups: 451 matched = True 452 if mode & stat.S_IXOTH: 453 matched = True 454 if not matched: 455 return False 456 if check: 457 raise NotImplementedError((orig, check)) 458 return True 459