"""Filesystem helpers: filename normalization, file discovery, file info."""

import fnmatch
import glob
import os
import os.path
import shutil
import stat

from .iterutil import iter_many


# Sentinel for "relroot": resolve relative to the current working directory.
USE_CWD = object()


C_SOURCE_SUFFIXES = ('.c', '.h')


def create_backup(old, backup=None):
    """Copy a file to a backup location and return the backup's filename.

    "old" is either a filename or an object with a "name" attribute
    (e.g. an open file).  If "backup" is not provided (or is True) then
    "<filename>.bak" is used.

    None is returned if no filename could be determined or if the
    original file does not exist.  A FileNotFoundError about any other
    path (e.g. a missing directory for the backup) is propagated.
    """
    if isinstance(old, str):
        filename = old
    else:
        filename = getattr(old, 'name', None)
    if not filename:
        return None
    if not backup or backup is True:
        backup = f'{filename}.bak'
    try:
        shutil.copyfile(filename, backup)
    except FileNotFoundError as exc:
        if exc.filename != filename:
            raise  # re-raise
        # The original does not exist, so there is nothing to back up.
        backup = None
    return backup


##################################
# filenames

def fix_filename(filename, relroot=USE_CWD, *,
                 fixroot=True,
                 _badprefix=f'..{os.path.sep}',
                 ):
    """Return a normalized, absolute-path copy of the given filename.

    If "relroot" is false-ish or USE_CWD then the filename is resolved
    against the current working directory (via os.path.abspath()).
    Otherwise it is resolved against "relroot", which is first made
    absolute unless "fixroot" is false.

    ValueError is raised if the filename does not fit under "relroot"
    (see _fix_filename()).
    """
    # Note: "_badprefix" is not used here; it is kept only so the
    # signature stays compatible for existing callers.
    if not relroot or relroot is USE_CWD:
        return os.path.abspath(filename)
    if fixroot:
        relroot = os.path.abspath(relroot)
    return _fix_filename(filename, relroot)


def _fix_filename(filename, relroot, *,
                  _badprefix=f'..{os.path.sep}',
                  ):
    """Return the normalized filename as an absolute path under relroot.

    ValueError is raised if the normalized filename starts with
    "_badprefix" ("../" by default) or if an absolute filename
    cannot be reproduced by joining "relroot" with its relative path.
    """
    orig = filename

    # First we normalize.
    filename = os.path.normpath(filename)
    if filename.startswith(_badprefix):
        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')

    # Now make sure it is absolute (relative to relroot).
    if not os.path.isabs(filename):
        filename = os.path.join(relroot, filename)
    else:
        # An already-absolute filename must round-trip through relroot.
        relpath = os.path.relpath(filename, relroot)
        if os.path.join(relroot, relpath) != filename:
            raise ValueError(f'expected {relroot!r} as lroot, got {orig!r}')

    return filename


def fix_filenames(filenames, relroot=USE_CWD):
    """Return (filenames, relroot) with every filename made absolute.

    The returned filenames are a lazy generator, so any ValueError
    from _fix_filename() is raised only once they are iterated.
    If "relroot" is given (and is not USE_CWD) then its absolute form
    is returned; otherwise it is returned unchanged.
    """
    if not relroot or relroot is USE_CWD:
        filenames = (os.path.abspath(v) for v in filenames)
    else:
        relroot = os.path.abspath(relroot)
        filenames = (_fix_filename(v, relroot) for v in filenames)
    return filenames, relroot


def format_filename(filename, relroot=USE_CWD, *,
                    fixroot=True,
                    normalize=True,
                    _badprefix=f'..{os.path.sep}',
                    ):
    """Return a consistent relative-path representation of the filename.

    If "normalize" is true then os.path.normpath() is applied first.
    If "relroot" is None then the (possibly normalized) filename is
    returned as-is.  If it is USE_CWD then the result is relative to
    the current working directory.  Otherwise the result is relative
    to "relroot", which is first made absolute unless "fixroot" is
    false.

    ValueError is raised if "fixroot" is false and "relroot" is empty,
    or if the resulting relative path starts with "_badprefix"
    ("../" by default).
    """
    orig = filename
    if normalize:
        filename = os.path.normpath(filename)
    if relroot is None:
        # Otherwise leave it as-is.
        return filename
    elif relroot is USE_CWD:
        # Make it relative to CWD.
        filename = os.path.relpath(filename)
    else:
        # Make it relative to "relroot".
        if fixroot:
            relroot = os.path.abspath(relroot)
        elif not relroot:
            raise ValueError('missing relroot')
        filename = os.path.relpath(filename, relroot)
    if filename.startswith(_badprefix):
        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')
    return filename


def match_path_tail(path1, path2):
    """Return True if one path ends the other."""
    if path1 == path2:
        return True
    if os.path.isabs(path1):
        if os.path.isabs(path2):
            # Two different absolute paths cannot be tails of each other.
            return False
        return _match_tail(path1, path2)
    elif os.path.isabs(path2):
        return _match_tail(path2, path1)
    else:
        return _match_tail(path1, path2) or _match_tail(path2, path1)


def _match_tail(path, tail):
    """Return True if "path" ends with a path separator followed by "tail"."""
    assert not os.path.isabs(tail), repr(tail)
    return path.endswith(os.path.sep + tail)


##################################
# find files

def match_glob(filename, pattern):
    """Return True if the filename matches the glob pattern.

    This is fnmatch.fnmatch() plus a workaround so that a single
    "**/" in the pattern may also match zero directories.
    """
    if fnmatch.fnmatch(filename, pattern):
        return True

    # fnmatch doesn't handle ** quite right.  It will not match the
    # following:
    #
    #  ('x/spam.py', 'x/**/*.py')
    #  ('spam.py', '**/*.py')
    #
    # though it *will* match the following:
    #
    #  ('x/y/spam.py', 'x/**/*.py')
    #  ('x/spam.py', '**/*.py')

    if '**/' not in pattern:
        return False

    # We only accommodate the single-"**" case.
    return fnmatch.fnmatch(filename, pattern.replace('**/', '', 1))


def process_filenames(filenames, *,
                      start=None,
                      include=None,
                      exclude=None,
                      relroot=USE_CWD,
                      ):
    """Yield (filename, relfile, check, solo) for each given filename.

    "filename" is the absolute form (see fix_filename()) and "relfile"
    the relative form (see format_filename()).  "check" is a callable
    that returns None if the file should be processed, or else a short
    string giving the reason to skip it ("<skipped>" for files before
    "start", "<excluded>" for files filtered out by "include" or
    "exclude").  "solo" is passed through from iter_many().

    "start", "include", and "exclude" are resolved against "relroot"
    in the same way as the filenames.
    """
    if relroot and relroot is not USE_CWD:
        relroot = os.path.abspath(relroot)
    if start:
        start = fix_filename(start, relroot, fixroot=False)
    if include:
        include = {fix_filename(v, relroot, fixroot=False)
                   for v in include}
    if exclude:
        exclude = {fix_filename(v, relroot, fixroot=False)
                   for v in exclude}

    # NOTE(review): presumably iter_many() raises this when no filenames
    # are given -- confirm against iterutil.
    onempty = Exception('no filenames provided')
    for filename, solo in iter_many(filenames, onempty):
        filename = fix_filename(filename, relroot, fixroot=False)
        relfile = format_filename(filename, relroot, fixroot=False, normalize=False)
        # "start" is cleared (set to None) once the start file is reached.
        check, start = _get_check(filename, start, include, exclude)
        yield filename, relfile, check, solo


def expand_filenames(filenames):
    """Yield the glob matches for each of the given filename patterns.

    For a pattern containing "**/", the matches for the pattern with
    every "**/" removed are yielded first, followed by the matches for
    the pattern itself.
    """
    for filename in filenames:
        # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)?
        if '**/' in filename:
            yield from glob.glob(filename.replace('**/', ''))
        yield from glob.glob(filename)


def _get_check(filename, start, include, exclude):
    """Return (check, start) for the given filename.

    While "start" is set and has not been reached yet, the returned
    check reports '<skipped>' and "start" is returned unchanged.
    Otherwise the check applies the include/exclude filters and
    None is returned in place of "start".
    """
    if start and filename != start:
        return (lambda: '<skipped>'), start

    def check():
        if _is_excluded(filename, exclude, include):
            return '<excluded>'
        return None
    return check, None


def _is_excluded(filename, exclude, include):
    """Return True if the filename is filtered out.

    If "include" is provided then only files matching one of its
    patterns are kept ("exclude" is not consulted in that case).
    Otherwise files matching any "exclude" pattern are filtered out.
    """
    if include:
        return not any(match_glob(filename, included)
                       for included in include)
    if exclude:
        return any(match_glob(filename, excluded)
                   for excluded in exclude)
    return False


def _walk_tree(root, *,
               _walk=os.walk,
               ):
    """Yield the path (parent joined with name) of each file under root."""
    # A wrapper around os.walk that resolves the filenames.
    for parent, _, names in _walk(root):
        for name in names:
            yield os.path.join(parent, name)


def walk_tree(root, *,
              suffix=None,
              walk=_walk_tree,
              ):
    """Yield each file in the tree under the given directory name.

    If "suffix" is provided then only files with that suffix will
    be included.
    """
    if suffix and not isinstance(suffix, str):
        raise ValueError('suffix must be a string')

    for filename in walk(root):
        if suffix and not filename.endswith(suffix):
            continue
        yield filename


def glob_tree(root, *,
              suffix=None,
              _glob=glob.iglob,
              ):
    """Yield each file in the tree under the given directory name.

    If "suffix" is provided then only files with that suffix will
    be included.
    """
    suffix = suffix or ''
    if not isinstance(suffix, str):
        raise ValueError('suffix must be a string')

    # NOTE(review): "_glob" is called without recursive=True.  With
    # glob.iglob() that makes "**" behave like "*", so only entries
    # directly under root and one directory below it are matched.
    yield from _glob(f'{root}/*{suffix}')
    yield from _glob(f'{root}/**/*{suffix}')


def iter_files(root, suffix=None, relparent=None, *,
               get_files=os.walk,
               _glob=glob_tree,
               _walk=walk_tree,
               ):
    """Yield each file in the tree under the given directory name.

    If "root" is a non-string iterable then do the same for each of
    those trees.

    If "suffix" is provided then only files with that suffix will
    be included.  A non-string iterable of suffixes is also supported.

    if "relparent" is provided then it is used to resolve each
    filename as a relative path.

    "get_files" selects how the tree is traversed: one of the glob
    functions selects "_glob"; anything else is used (through "_walk")
    as the function that lists the files under a root, with os.walk
    and walk_tree mapped to the default _walk_tree().
    """
    if not isinstance(root, str):
        roots = root
        for root in roots:
            yield from iter_files(root, suffix, relparent,
                                  get_files=get_files,
                                  _glob=_glob, _walk=_walk)
        return

    # Use the right "walk" function.
    if get_files in (glob.glob, glob.iglob, glob_tree):
        get_files = _glob
    else:
        _files = _walk_tree if get_files in (os.walk, walk_tree) else get_files
        get_files = (lambda *a, **k: _walk(*a, walk=_files, **k))

    # Handle a single suffix.
    if suffix and not isinstance(suffix, str):
        # Multiple suffixes: get everything and filter here.
        filenames = get_files(root)
        suffixes = tuple(suffix)
    else:
        # At most one suffix: the tree function does the filtering.
        filenames = get_files(root, suffix=suffix)
        suffixes = None

    for filename in filenames:
        if suffixes and not filename.endswith(suffixes):
            continue
        if relparent:
            filename = os.path.relpath(filename, relparent)
        yield filename


def iter_files_by_suffix(root, suffixes, relparent=None, *,
                         walk=walk_tree,
                         _iter_files=iter_files,
                         ):
    """Yield each file in the tree that has the given suffixes.

    Unlike iter_files(), the results are in the original suffix order.
    """
    # NOTE(review): "walk" is accepted but never used; it is kept so
    # the signature stays compatible for existing callers.
    if isinstance(suffixes, str):
        suffixes = [suffixes]
    # XXX Ignore repeated suffixes?
    for suffix in suffixes:
        yield from _iter_files(root, suffix, relparent)


##################################
# file info

# XXX posix-only?

S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

# For each kind of access: (combined mask, user bit, group bit, other bit).
_PERMISSION_CLASSES = (
    (S_IRANY, stat.S_IRUSR, stat.S_IRGRP, stat.S_IROTH),
    (S_IWANY, stat.S_IWUSR, stat.S_IWGRP, stat.S_IWOTH),
    (S_IXANY, stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH),
)


def is_readable(file, *, user=None, check=False):
    """Return True if the file is readable by the user.

    "file" may be a filename, an object with a "name" attribute,
    an os.stat_result, or an int mode.  "user" may be a uid, a
    username, or None (the current user).  If "check" is true then
    actually opening the file is tried first; otherwise (or if that
    is not supported) the answer is derived from the mode bits.
    """
    return _is_accessible(file, S_IRANY, user, check)


def is_writable(file, *, user=None, check=False):
    """Return True if the file is writable by the user.

    The arguments are the same as for is_readable().
    """
    return _is_accessible(file, S_IWANY, user, check)


def is_executable(file, *, user=None, check=False):
    """Return True if the file is executable by the user.

    The arguments are the same as for is_readable().  Note that
    checking by opening the file is not supported for execution,
    so the mode bits are always what gets used.
    """
    return _is_accessible(file, S_IXANY, user, check)


def _is_accessible(file, anybits, user, check):
    """Shared implementation of is_readable(), is_writable(), etc."""
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            okay = _check_file(filename, anybits)
        except NotImplementedError:
            okay = NotImplemented
        if okay is not NotImplemented:
            return okay
        # Fall back to checking the mode.
    return _check_mode(st, mode, anybits, user)


def _get_file_info(file):
    """Return (filename, stat result, mode) for the given file.

    For an int, only the mode is known (filename and stat are None).
    For an os.stat_result, the filename is None.  Otherwise "file"
    must be a filename or have a "name" attribute that is the path
    of an existing file; NotImplementedError is raised if not.
    """
    filename = st = mode = None
    if isinstance(file, int):
        mode = file
    elif isinstance(file, os.stat_result):
        st = file
    else:
        if isinstance(file, str):
            filename = file
        elif hasattr(file, 'name') and os.path.exists(file.name):
            filename = file.name
        else:
            raise NotImplementedError(file)
        st = os.stat(filename)
    # An explicit mode of 0 is valid, so compare against None rather
    # than relying on truthiness (there is no stat to fall back to).
    if mode is None:
        mode = st.st_mode
    return filename, st, mode


def _check_file(filename, check):
    """Return True if the file can actually be opened for the access.

    False is returned if opening fails with PermissionError.
    NotImplemented is returned for execute access, which cannot be
    checked this way.  NotImplementedError is raised for any other
    unsupported "check" value.
    """
    if not isinstance(filename, str):
        raise Exception(f'filename required to check file, got {filename}')
    if check & S_IRANY:
        flags = os.O_RDONLY
    elif check & S_IWANY:
        flags = os.O_WRONLY
    elif check & S_IXANY:
        # We can worry about S_IXANY later
        return NotImplemented
    else:
        raise NotImplementedError(check)

    try:
        fd = os.open(filename, flags)
    except PermissionError:
        return False
    # We do not ignore other exceptions.
    else:
        os.close(fd)
        return True


def _get_user_info(user):
    """Return (username, uid, gid, groups) for the given user.

    "user" may be None (the current process), a uid, or a username.
    For the current process the username is None if the effective
    uid has no password database entry.  NotImplementedError is
    raised for any other kind of "user"; an unknown explicit uid or
    username results in KeyError from the pwd module.
    """
    import pwd

    if user is None:
        uid = os.geteuid()
        try:
            username = pwd.getpwuid(uid).pw_name
        except KeyError:
            # The uid has no passwd entry (e.g. an arbitrary uid in a
            # container).  The ids are still enough to check modes.
            username = None
        return username, uid, os.getgid(), os.getgroups()

    if isinstance(user, int):
        uid = user
        entry = pwd.getpwuid(uid)
        username = entry.pw_name
    elif isinstance(user, str):
        username = user
        entry = pwd.getpwnam(username)
        uid = entry.pw_uid
    else:
        raise NotImplementedError(user)
    gid = entry.pw_gid
    groups = os.getgrouplist(username, gid)
    return username, uid, gid, groups


def _mode_grants(st, mode, uid, gid, groups, usrbit, grpbit, othbit):
    """Return True if any applicable permission bit is set in the mode.

    This is a simple "any class matches" test, not the full POSIX
    precedence rules.  Without a stat result the owner and group of
    the file are unknown, so only the "other" bit can apply.
    """
    if mode & othbit:
        return True
    if st is None:
        return False
    if mode & usrbit and st.st_uid == uid:
        return True
    if mode & grpbit and (st.st_gid == gid or st.st_gid in groups):
        return True
    return False


def _check_mode(st, mode, check, user):
    """Return True if the mode grants the user all the requested access.

    "check" is a combination of S_IRANY, S_IWANY, and S_IXANY.
    NotImplementedError is raised if it has any other bits set.
    """
    orig = check
    _, uid, gid, groups = _get_user_info(user)
    for anybits, usrbit, grpbit, othbit in _PERMISSION_CLASSES:
        if not check & anybits:
            continue
        # Clear the bits we are handling so leftovers can be detected.
        check &= ~anybits
        if not _mode_grants(st, mode, uid, gid, groups,
                            usrbit, grpbit, othbit):
            return False
    if check:
        raise NotImplementedError((orig, check))
    return True