1# Copyright 2023 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import annotations 6 7import argparse 8import datetime as dt 9import enum 10import json 11import logging 12import math 13import re 14import shlex 15from typing import (Any, Dict, Final, Iterable, List, Optional, Sequence, Type, 16 TypeVar, Union, cast) 17from urllib import parse as urlparse 18 19import hjson 20 21from crossbench import path as pth 22from crossbench import plt 23 24 25def type_str(value: Any) -> str: 26 return type(value).__name__ 27 28 29class PathParser: 30 31 PATH_PREFIX = re.compile(r"^(?:" 32 r"(?:\.\.?|~)?|" 33 r"[a-zA-Z]:" 34 r")(\\|/)[^\\/]") 35 36 @classmethod 37 def path(cls, value: pth.AnyPathLike, name: str = "value") -> pth.LocalPath: 38 value = ObjectParser.not_none(value, "path") 39 if not value: 40 raise argparse.ArgumentTypeError("Invalid empty path.") 41 try: 42 path = pth.LocalPath(value).expanduser() 43 except RuntimeError as e: 44 raise argparse.ArgumentTypeError( 45 f"Invalid Path {name} {repr(value)}': {e}") from e 46 return path 47 48 @classmethod 49 def existing_file_path(cls, 50 value: pth.AnyPathLike, 51 name: str = "value") -> pth.LocalPath: 52 path = cls.existing_path(value, name) 53 if not path.is_file(): 54 raise argparse.ArgumentTypeError( 55 f"{name} is not a file: {repr(str(path))}") 56 return path 57 58 @classmethod 59 def non_empty_file_path(cls, 60 value: pth.AnyPathLike, 61 name: str = "value") -> pth.LocalPath: 62 path: pth.LocalPath = cls.existing_file_path(value, name) 63 if path.stat().st_size == 0: 64 raise argparse.ArgumentTypeError( 65 f"{name} is an empty file: {repr(str(path))}") 66 return path 67 68 @classmethod 69 def file_path(cls, 70 value: pth.AnyPathLike, 71 name: str = "value") -> pth.LocalPath: 72 return cls.non_empty_file_path(value, name) 73 74 @classmethod 75 def dir_path(cls, 76 value: pth.AnyPathLike, 77 name: str = "value") -> pth.LocalPath: 78 path = cls.existing_path(value, name) 79 if not path.is_dir(): 80 raise argparse.ArgumentTypeError( 81 f"{name} is not a folder: '{repr(str(path))}'") 82 return path 83 84 @classmethod 85 def non_empty_dir_path(cls, 86 value: pth.AnyPathLike, 87 name: str = "value") -> pth.LocalPath: 88 dir_path = cls.dir_path(value, name) 89 for _ in dir_path.iterdir(): 90 return dir_path 91 raise argparse.ArgumentTypeError( 92 f"{name} dir must be non empty: {repr(str(dir_path))}") 93 94 @classmethod 95 def existing_path(cls, 96 value: pth.AnyPathLike, 97 name: str = "value") -> pth.LocalPath: 98 path = cls.path(value) 99 if not path.exists(): 100 raise argparse.ArgumentTypeError( 101 f"{name} path does not exist: {repr(str(path))}") 102 return path 103 104 @classmethod 105 def not_existing_path(cls, 106 value: pth.AnyPathLike, 107 name: str = "value") -> pth.LocalPath: 108 path = cls.path(value) 109 if path.exists(): 110 raise argparse.ArgumentTypeError( 111 f"{name} path already exists: {repr(str(path))}") 112 return path 113 114 @classmethod 115 def binary_path(cls, 116 value: Optional[pth.AnyPathLike], 117 name: str = "binary", 118 platform: Optional[plt.Platform] = None) -> pth.AnyPath: 119 platform = platform or plt.PLATFORM 120 maybe_path = platform.path(ObjectParser.not_none(value, name)) 121 if platform.is_file(maybe_path): 122 return maybe_path 123 maybe_bin = platform.search_binary(maybe_path) 124 if not maybe_bin: 125 raise argparse.ArgumentTypeError(f"Unknown binary: {value}") 126 return maybe_bin 127 128 @classmethod 129 def any_path(cls, 130 value: Optional[pth.AnyPathLike], 131 name: str = "value") -> pth.AnyPath: 132 """Parse a path than can be on a local or remote file system.""" 133 some_value: pth.AnyPathLike = ObjectParser.not_none(value, name) 134 if not some_value: 135 raise argparse.ArgumentTypeError(f"Expected non empty path {name}.") 136 return pth.AnyPath(some_value) 137 138 @classmethod 139 def local_binary_path(cls, 140 value: Optional[pth.AnyPathLike], 141 name: str = "binary") -> pth.LocalPath: 142 return cast(pth.LocalPath, cls.binary_path(value, name)) 143 144 @classmethod 145 def json_file_path(cls, value: pth.AnyPathLike) -> pth.LocalPath: 146 path = cls.file_path(value) 147 with path.open(encoding="utf-8") as f: 148 try: 149 json.load(f) 150 except ValueError as e: 151 message = _extract_decoding_error(f"Invalid json file '{path}':", path, 152 e) 153 raise argparse.ArgumentTypeError(message) from e 154 return path 155 156 @classmethod 157 def hjson_file_path(cls, value: pth.AnyPathLike) -> pth.LocalPath: 158 path = cls.file_path(value) 159 with path.open(encoding="utf-8") as f: 160 try: 161 hjson.load(f) 162 except ValueError as e: 163 message = _extract_decoding_error("Invalid hjson file '{path}':", path, 164 e) 165 raise argparse.ArgumentTypeError(message) from e 166 return path 167 168 169EnumT = TypeVar("EnumT", bound=enum.Enum) 170NotNoneT = TypeVar("NotNoneT", bound=Any) 171SequenceT = TypeVar("SequenceT", bound=Sequence) 172 173 174class ObjectParser: 175 176 @classmethod 177 def enum(cls, label: str, enum_cls: Type[EnumT], data: Any, 178 choices: Union[Type[EnumT], Iterable[EnumT]]) -> EnumT: 179 try: 180 # Try direct conversion, relying on the Enum._missing_ hook: 181 enum_value = enum_cls(data) 182 assert isinstance(enum_value, enum.Enum) 183 assert isinstance(enum_value, enum_cls) 184 return enum_value 185 except Exception as e: # pylint: disable=broad-except 186 logging.debug("Could not auto-convert data '%s' to enum %s: %s", data, 187 enum_cls, e) 188 189 for enum_instance in choices: 190 if data in (enum_instance, enum_instance.value): 191 return enum_instance 192 choices_str: str = ", ".join(repr(item.value) for item in choices) # pytype: disable=missing-parameter 193 raise argparse.ArgumentTypeError(f"Unknown {label}: {repr(data)}.\n" 194 f"Choices are {choices_str}.") 195 196 @classmethod 197 def inline_hjson(cls, value: Any) -> Any: 198 value_str = cls.non_empty_str(value, "hjson") 199 if value_str[0] != "{" or value_str[-1] != "}": 200 raise argparse.ArgumentTypeError( 201 "Invalid inline hjson, missing braces: '{value_str}'") 202 try: 203 return hjson.loads(value_str) 204 except ValueError as e: 205 message = _extract_decoding_error("Could not decode inline hjson", 206 value_str, e) 207 if "eof" in message: 208 message += "\n Likely missing quotes." 209 raise argparse.ArgumentTypeError(message) from e 210 211 @classmethod 212 def json_file(cls, value: pth.AnyPathLike) -> Any: 213 path = PathParser.file_path(value) 214 with path.open(encoding="utf-8") as f: 215 try: 216 return json.load(f) 217 except ValueError as e: 218 message = _extract_decoding_error(f"Invalid json file '{path}':", path, 219 e) 220 raise argparse.ArgumentTypeError(message) from e 221 222 @classmethod 223 def hjson_file(cls, value: pth.AnyPathLike) -> Any: 224 path = PathParser.file_path(value) 225 with path.open(encoding="utf-8") as f: 226 try: 227 return hjson.load(f) 228 except ValueError as e: 229 message = _extract_decoding_error("Invalid hjson file '{path}':", path, 230 e) 231 raise argparse.ArgumentTypeError(message) from e 232 233 @classmethod 234 def non_empty_hjson_file(cls, value: pth.AnyPathLike) -> Any: 235 data = cls.hjson_file(value) 236 if not data: 237 raise argparse.ArgumentTypeError( 238 "Expected hjson file with non-empty data, " 239 f"but got: {hjson.dumps(data)}") 240 return data 241 242 @classmethod 243 def dict_hjson_file(cls, value: pth.AnyPathLike) -> Any: 244 data = cls.non_empty_hjson_file(value) 245 if not isinstance(data, dict): 246 raise argparse.ArgumentTypeError( 247 "Expected object in hjson config '{value}', " 248 f"but got {type_str(data)}: {repr(data)}") 249 return data 250 251 @classmethod 252 def dict(cls, value: Any, name: str = "value") -> Dict: 253 if isinstance(value, dict): 254 return value 255 raise argparse.ArgumentTypeError( 256 f"Expected dict, but {name} is {type_str(value)}: {repr(value)}") 257 258 @classmethod 259 def non_empty_dict(cls, value: Any, name: str = "value") -> Dict: 260 dict_value = cls.dict(value) 261 if not dict_value: 262 raise argparse.ArgumentTypeError( 263 f"Expected {name} to be a non-empty dict.") 264 return dict_value 265 266 @classmethod 267 def sequence(cls, value: Any, name: str = "value") -> Sequence[Any]: 268 if isinstance(value, (list, tuple)): 269 return value 270 raise argparse.ArgumentTypeError( 271 f"Expected sequence, but {name} is {type_str(value)}: {repr(value)}") 272 273 @classmethod 274 def non_empty_sequence(cls, value: Any, name: str = "value") -> Sequence[Any]: 275 sequence_value = cls.sequence(value) 276 if not sequence_value: 277 raise argparse.ArgumentTypeError( 278 f"Expected {name} to be a non-empty sequence.") 279 return sequence_value 280 281 @classmethod 282 def any_str(cls, value: Any, name: str = "value") -> str: 283 value = cls.not_none(value, name) 284 if isinstance(value, str): 285 return value 286 raise argparse.ArgumentTypeError( 287 f"Expected str, but got {type_str(value)}: {value}") 288 289 @classmethod 290 def non_empty_str(cls, value: Any, name: str = "value") -> str: 291 value = cls.any_str(value, name) 292 if not isinstance(value, str): 293 raise argparse.ArgumentTypeError( 294 f"Expected non-empty string {name}, " 295 f"but got {type_str(value)}: {repr(value)}") 296 if not value: 297 raise argparse.ArgumentTypeError(f"Non-empty string {name} expected.") 298 return value 299 300 @classmethod 301 def url_str(cls, 302 value: str, 303 name: str = "url", 304 schemes: Optional[Sequence[str]] = None) -> str: 305 cls.url(value, name, schemes) 306 return value 307 308 @classmethod 309 def httpx_url_str(cls, value: Any, name: str = "url") -> str: 310 cls.url(value, name, schemes=("http", "https")) 311 return value 312 313 @classmethod 314 def base_url(cls, value: str, name: str = "url") -> urlparse.ParseResult: 315 url_str: str = cls.non_empty_str(value, name) 316 try: 317 return urlparse.urlparse(url_str) 318 except ValueError as e: 319 raise argparse.ArgumentTypeError( 320 f"Invalid {name}: {repr(value)}, {e}") from e 321 322 PORT_URL_PATH_RE = re.compile(r"^[0-9]+(?:/|$)") 323 324 @classmethod 325 def parse_fuzzy_url_str(cls, 326 value: str, 327 name: str = "url", 328 schemes: Sequence[str] = ("http", "https", "about", 329 "file"), 330 default_scheme: str = "https") -> str: 331 parsed = cls.parse_fuzzy_url(value, name, schemes, default_scheme) 332 return urlparse.urlunparse(parsed) 333 334 @classmethod 335 def parse_fuzzy_url(cls, 336 value: str, 337 name: str = "url", 338 schemes: Sequence[str] = ("http", "https", "about", 339 "file"), 340 default_scheme: str = "https") -> urlparse.ParseResult: 341 assert default_scheme, "missing default scheme value" 342 value = cls.non_empty_str(value, name) 343 if PathParser.PATH_PREFIX.match(value): 344 value = f"file://{value}" 345 else: 346 parsed = cls.base_url(value) 347 if not parsed.scheme: 348 value = f"{default_scheme}://{value}" 349 # Check if this was a url without a scheme but with ports, which gets 350 # "wrongly" parsed and the host ends up in result.scheme and port and path 351 # are merged into result.path. 352 if parsed.scheme not in schemes and not parsed.netloc: 353 if cls.PORT_URL_PATH_RE.match(parsed.path): 354 # foo.com:8080/test => https://foo.com:8080/test 355 value = f"{default_scheme}://{value}" 356 schemes = tuple(schemes) + (default_scheme,) 357 return cls.url(value, name, schemes) 358 359 @classmethod 360 def url(cls, 361 value: str, 362 name: str = "url", 363 schemes: Optional[Sequence[str]] = None) -> urlparse.ParseResult: 364 parsed = cls.base_url(value) 365 try: 366 scheme = parsed.scheme 367 if schemes and scheme not in schemes: 368 schemes_str = ",".join(map(repr, schemes)) 369 raise argparse.ArgumentTypeError( 370 f"Invalid {name}: Expected scheme to be one of {schemes_str}, " 371 f"but got {repr(parsed.scheme)} for url {repr(value)}") 372 if port := parsed.port: 373 _ = NumberParser.port_number(port, f"{name} port") 374 if scheme in ("file", "about"): 375 return parsed 376 hostname = parsed.hostname 377 if not hostname: 378 raise argparse.ArgumentTypeError( 379 f"Missing hostname in {name}: {repr(value)}") 380 if " " in hostname: 381 raise argparse.ArgumentTypeError( 382 f"Hostname in {name} contains invalid space: {repr(value)}") 383 except ValueError as e: 384 # Some ParseResult properties trigger errors, wrap all of them 385 raise argparse.ArgumentTypeError( 386 f"Invalid {name}: {repr(value)}, {e}") from e 387 return parsed 388 389 @classmethod 390 def bool(cls, value: Any, name: str = "value") -> bool: 391 if isinstance(value, bool): 392 return value 393 value = str(value).lower() 394 if value == "true": 395 return True 396 if value == "false": 397 return False 398 raise argparse.ArgumentTypeError( 399 f"Expected bool {name} but got {type_str(value)}: {repr(value)}") 400 401 402 @classmethod 403 def not_none(cls, value: Optional[NotNoneT], name: str = "value") -> NotNoneT: 404 if value is None: 405 raise argparse.ArgumentTypeError(f"Expected {name} to be not None.") 406 return value 407 408 @classmethod 409 def sh_cmd(cls, value: Any) -> List[str]: 410 value = cls.not_none(value, "shell cmd") 411 if not value: 412 raise argparse.ArgumentTypeError( 413 f"Expected non-empty shell cmd, but got: {value}") 414 if isinstance(value, (list, tuple)): 415 for i, part in enumerate(value): 416 cls.non_empty_str(part, f"cmd[{i}]") 417 return list(value) 418 if not isinstance(value, str): 419 raise argparse.ArgumentTypeError( 420 f"Expected string or list, but got {type_str(value)}: {value}") 421 try: 422 return shlex.split(value) 423 except ValueError as e: 424 raise argparse.ArgumentTypeError(f"Invalid shell cmd: {value} ") from e 425 426 @classmethod 427 def unique_sequence( 428 cls, 429 value: SequenceT, 430 name: str = "sequence", 431 error_cls: Type[Exception] = argparse.ArgumentTypeError) -> SequenceT: 432 unique = set() 433 duplicates = set() 434 for item in value: 435 if item in unique: 436 duplicates.add(item) 437 else: 438 unique.add(item) 439 if not duplicates: 440 return value 441 raise error_cls(f"Unexpected duplicates in {name}: {repr(duplicates)}") 442 443 @classmethod 444 def regexp(cls, value: Any, name: str = "regexp") -> re.Pattern: 445 try: 446 return re.compile(cls.any_str(value, name)) 447 except re.error as e: 448 raise argparse.ArgumentTypeError(f"Invalid regexp {name}: {value}") from e 449 450 451_MAX_LEN = 70 452 453 454def _extract_decoding_error(message: str, value: pth.AnyPathLike, 455 e: ValueError) -> str: 456 lineno = getattr(e, "lineno", -1) - 1 457 colno = getattr(e, "colno", -1) - 1 458 if lineno < 0 or colno < 0: 459 if isinstance(value, pth.LocalPath): 460 return f"{message}\n {str(e)}" 461 return f"{message}: {value}\n {str(e)}" 462 if isinstance(value, pth.AnyPath): 463 with pth.LocalPath(value).open(encoding="utf-8") as f: 464 line = f.readlines()[lineno] 465 else: 466 line = value.splitlines()[lineno] 467 if len(line) > _MAX_LEN: 468 # Only show line around error: 469 start = colno - _MAX_LEN // 2 470 end = colno + _MAX_LEN // 2 471 prefix = "..." 472 suffix = "..." 473 if start < 0: 474 start = 0 475 end = _MAX_LEN 476 prefix = "" 477 elif end > len(line): 478 end = len(line) 479 start = len(line) - _MAX_LEN 480 suffix = "" 481 colno -= start 482 line = prefix + line[start:end] + suffix 483 marker_space = (" " * len(prefix)) + (" " * colno) 484 else: 485 marker_space = " " * colno 486 marker = "_▲_" 487 # Adjust line to be aligned with marker size 488 line = (" " * (len(marker) // 2)) + line 489 return f"{message}\n {line}\n {marker_space}{marker}\n({str(e)})" 490 491 492class NumberParser: 493 494 @classmethod 495 def any_float(cls, value: Any, name: str = "float") -> float: 496 try: 497 return float(value) 498 except ValueError as e: 499 raise argparse.ArgumentTypeError(f"Invalid {name}: {repr(value)}") from e 500 501 @classmethod 502 def positive_zero_float(cls, value: Any, name: str = "float") -> float: 503 value_f = cls.any_float(value, name) 504 if not math.isfinite(value_f) or value_f < 0: 505 raise argparse.ArgumentTypeError( 506 f"Expected {name} >= 0, but got: {value_f}") 507 return value_f 508 509 @classmethod 510 def any_int(cls, value: Any, name: str = "value") -> int: 511 try: 512 return int(value) 513 except ValueError as e: 514 raise argparse.ArgumentTypeError( 515 f"Invalid integer {name}: {repr(value)}") from e 516 517 @classmethod 518 def positive_zero_int(cls, value: Any, name: str = "value") -> int: 519 value_i = cls.any_int(value, name) 520 if value_i < 0: 521 raise argparse.ArgumentTypeError( 522 f"Expected integer {name} >= 0, but got: {value_i}") 523 return value_i 524 525 @classmethod 526 def positive_int(cls, value: Any, name: str = "value") -> int: 527 value_i = cls.any_int(value, name) 528 if not math.isfinite(value_i) or value_i <= 0: 529 raise argparse.ArgumentTypeError( 530 f"Expected integer {name} > 0, but got: {value_i}") 531 return value_i 532 533 @classmethod 534 def port_number(cls, value: Any, name: str = "port") -> int: 535 port = cls.any_int(value, name) 536 if 1 <= port <= 65535: 537 return port 538 raise argparse.ArgumentTypeError( 539 f"Invalid Port: expected 1 <= {name} <= 65535, but got: {repr(port)}") 540 541 542class LateArgumentError(argparse.ArgumentTypeError): 543 """Signals argument parse errors after parser.parse_args(). 544 This is used to map errors back to the original argument, much like 545 argparse.ArgumentError does internally. However, since this happens after 546 the internal argument parsing we need this custom implementation to print 547 more descriptive error messages. 548 """ 549 550 def __init__(self, flag: str, message: str) -> None: 551 super().__init__(message) 552 self.flag = flag 553 self.message = message 554 555 556class DurationParseError(argparse.ArgumentTypeError): 557 pass 558 559 560class DurationParser: 561 562 @classmethod 563 def help(cls) -> str: 564 return "'12.5' == '12.5s', units=['ms', 's', 'm', 'h']" 565 566 _DURATION_RE: Final[re.Pattern] = re.compile( 567 r"(?P<value>(-?\d+(\.\d+)?)) ?(?P<unit>[a-z]+)?") 568 569 @classmethod 570 def _to_timedelta(cls, value: float, suffix: str) -> dt.timedelta: 571 if suffix in {"ms", "millis", "milliseconds"}: 572 return dt.timedelta(milliseconds=value) 573 if suffix in {"s", "sec", "secs", "second", "seconds"}: 574 return dt.timedelta(seconds=value) 575 if suffix in {"m", "min", "mins", "minute", "minutes"}: 576 return dt.timedelta(minutes=value) 577 if suffix in {"h", "hrs", "hour", "hours"}: 578 return dt.timedelta(hours=value) 579 raise DurationParseError(f"Error: {suffix} is not supported for duration. " 580 "Make sure to use a supported time unit/suffix") 581 582 @classmethod 583 def positive_duration(cls, 584 time_value: Any, 585 name: str = "duration") -> dt.timedelta: 586 duration: dt.timedelta = cls.any_duration(time_value) 587 if duration.total_seconds() <= 0: 588 raise DurationParseError(f"Expected non-zero {name}, but got {duration}") 589 return duration 590 591 @classmethod 592 def positive_or_zero_duration(cls, 593 time_value: Any, 594 name: str = "duration") -> dt.timedelta: 595 duration: dt.timedelta = cls.any_duration(time_value, name) 596 if duration.total_seconds() < 0: 597 raise DurationParseError(f"Expected positive {name}, but got {duration}") 598 return duration 599 600 @classmethod 601 def any_duration(cls, 602 time_value: Any, 603 name: str = "duration") -> dt.timedelta: 604 """ 605 This function will parse the measurement and the value from string value. 606 607 For example: 608 5s => dt.timedelta(seconds=5) 609 5m => 5*60 = dt.timedelta(minutes=5) 610 611 """ 612 if isinstance(time_value, dt.timedelta): 613 return time_value 614 if isinstance(time_value, (int, float)): 615 return dt.timedelta(seconds=time_value) 616 if not time_value: 617 raise DurationParseError(f"Expected non-empty {name} value.") 618 if not isinstance(time_value, str): 619 raise DurationParseError( 620 f"Unexpected {type_str(time_value)} for {name}: {time_value}") 621 622 match = cls._DURATION_RE.fullmatch(time_value) 623 if match is None: 624 raise DurationParseError(f"Unknown {name} format: '{time_value}'") 625 626 value = match.group("value") 627 if not value: 628 raise DurationParseError( 629 f"Error: {name} value not found." 630 f"Make sure to include a valid {name} value: '{time_value}'") 631 time_unit = match.group("unit") 632 try: 633 time_value = float(value) 634 except ValueError as e: 635 raise DurationParseError(f"{name} must be a valid number, {e}") from e 636 if not math.isfinite(time_value): 637 raise DurationParseError(f"{name} must be finite, but got: {time_value}") 638 639 if not time_unit: 640 # If no time unit provided we assume it is in seconds. 641 return dt.timedelta(seconds=time_value) 642 return cls._to_timedelta(time_value, time_unit) 643