• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import argparse
8import datetime as dt
9import enum
10import json
11import logging
12import math
13import re
14import shlex
15from typing import (Any, Dict, Final, Iterable, List, Optional, Sequence, Type,
16                    TypeVar, Union, cast)
17from urllib import parse as urlparse
18
19import hjson
20
21from crossbench import path as pth
22from crossbench import plt
23
24
25def type_str(value: Any) -> str:
26  return type(value).__name__
27
28
29class PathParser:
30
31  PATH_PREFIX = re.compile(r"^(?:"
32                           r"(?:\.\.?|~)?|"
33                           r"[a-zA-Z]:"
34                           r")(\\|/)[^\\/]")
35
36  @classmethod
37  def path(cls, value: pth.AnyPathLike, name: str = "value") -> pth.LocalPath:
38    value = ObjectParser.not_none(value, "path")
39    if not value:
40      raise argparse.ArgumentTypeError("Invalid empty path.")
41    try:
42      path = pth.LocalPath(value).expanduser()
43    except RuntimeError as e:
44      raise argparse.ArgumentTypeError(
45          f"Invalid Path {name} {repr(value)}': {e}") from e
46    return path
47
48  @classmethod
49  def existing_file_path(cls,
50                         value: pth.AnyPathLike,
51                         name: str = "value") -> pth.LocalPath:
52    path = cls.existing_path(value, name)
53    if not path.is_file():
54      raise argparse.ArgumentTypeError(
55          f"{name} is not a file: {repr(str(path))}")
56    return path
57
58  @classmethod
59  def non_empty_file_path(cls,
60                          value: pth.AnyPathLike,
61                          name: str = "value") -> pth.LocalPath:
62    path: pth.LocalPath = cls.existing_file_path(value, name)
63    if path.stat().st_size == 0:
64      raise argparse.ArgumentTypeError(
65          f"{name} is an empty file: {repr(str(path))}")
66    return path
67
68  @classmethod
69  def file_path(cls,
70                value: pth.AnyPathLike,
71                name: str = "value") -> pth.LocalPath:
72    return cls.non_empty_file_path(value, name)
73
74  @classmethod
75  def dir_path(cls,
76               value: pth.AnyPathLike,
77               name: str = "value") -> pth.LocalPath:
78    path = cls.existing_path(value, name)
79    if not path.is_dir():
80      raise argparse.ArgumentTypeError(
81          f"{name} is not a folder: '{repr(str(path))}'")
82    return path
83
84  @classmethod
85  def non_empty_dir_path(cls,
86                         value: pth.AnyPathLike,
87                         name: str = "value") -> pth.LocalPath:
88    dir_path = cls.dir_path(value, name)
89    for _ in dir_path.iterdir():
90      return dir_path
91    raise argparse.ArgumentTypeError(
92        f"{name} dir must be non empty: {repr(str(dir_path))}")
93
94  @classmethod
95  def existing_path(cls,
96                    value: pth.AnyPathLike,
97                    name: str = "value") -> pth.LocalPath:
98    path = cls.path(value)
99    if not path.exists():
100      raise argparse.ArgumentTypeError(
101          f"{name} path does not exist: {repr(str(path))}")
102    return path
103
104  @classmethod
105  def not_existing_path(cls,
106                        value: pth.AnyPathLike,
107                        name: str = "value") -> pth.LocalPath:
108    path = cls.path(value)
109    if path.exists():
110      raise argparse.ArgumentTypeError(
111          f"{name} path already exists: {repr(str(path))}")
112    return path
113
114  @classmethod
115  def binary_path(cls,
116                  value: Optional[pth.AnyPathLike],
117                  name: str = "binary",
118                  platform: Optional[plt.Platform] = None) -> pth.AnyPath:
119    platform = platform or plt.PLATFORM
120    maybe_path = platform.path(ObjectParser.not_none(value, name))
121    if platform.is_file(maybe_path):
122      return maybe_path
123    maybe_bin = platform.search_binary(maybe_path)
124    if not maybe_bin:
125      raise argparse.ArgumentTypeError(f"Unknown binary: {value}")
126    return maybe_bin
127
128  @classmethod
129  def any_path(cls,
130               value: Optional[pth.AnyPathLike],
131               name: str = "value") -> pth.AnyPath:
132    """Parse a path than can be on a local or remote file system."""
133    some_value: pth.AnyPathLike = ObjectParser.not_none(value, name)
134    if not some_value:
135      raise argparse.ArgumentTypeError(f"Expected non empty path {name}.")
136    return pth.AnyPath(some_value)
137
138  @classmethod
139  def local_binary_path(cls,
140                        value: Optional[pth.AnyPathLike],
141                        name: str = "binary") -> pth.LocalPath:
142    return cast(pth.LocalPath, cls.binary_path(value, name))
143
144  @classmethod
145  def json_file_path(cls, value: pth.AnyPathLike) -> pth.LocalPath:
146    path = cls.file_path(value)
147    with path.open(encoding="utf-8") as f:
148      try:
149        json.load(f)
150      except ValueError as e:
151        message = _extract_decoding_error(f"Invalid json file '{path}':", path,
152                                          e)
153        raise argparse.ArgumentTypeError(message) from e
154    return path
155
156  @classmethod
157  def hjson_file_path(cls, value: pth.AnyPathLike) -> pth.LocalPath:
158    path = cls.file_path(value)
159    with path.open(encoding="utf-8") as f:
160      try:
161        hjson.load(f)
162      except ValueError as e:
163        message = _extract_decoding_error("Invalid hjson file '{path}':", path,
164                                          e)
165        raise argparse.ArgumentTypeError(message) from e
166    return path
167
168
169EnumT = TypeVar("EnumT", bound=enum.Enum)
170NotNoneT = TypeVar("NotNoneT", bound=Any)
171SequenceT = TypeVar("SequenceT", bound=Sequence)
172
173
174class ObjectParser:
175
176  @classmethod
177  def enum(cls, label: str, enum_cls: Type[EnumT], data: Any,
178           choices: Union[Type[EnumT], Iterable[EnumT]]) -> EnumT:
179    try:
180      # Try direct conversion, relying on the Enum._missing_ hook:
181      enum_value = enum_cls(data)
182      assert isinstance(enum_value, enum.Enum)
183      assert isinstance(enum_value, enum_cls)
184      return enum_value
185    except Exception as e:  # pylint: disable=broad-except
186      logging.debug("Could not auto-convert data '%s' to enum %s: %s", data,
187                    enum_cls, e)
188
189    for enum_instance in choices:
190      if data in (enum_instance, enum_instance.value):
191        return enum_instance
192    choices_str: str = ", ".join(repr(item.value) for item in choices)  # pytype: disable=missing-parameter
193    raise argparse.ArgumentTypeError(f"Unknown {label}: {repr(data)}.\n"
194                                     f"Choices are {choices_str}.")
195
196  @classmethod
197  def inline_hjson(cls, value: Any) -> Any:
198    value_str = cls.non_empty_str(value, "hjson")
199    if value_str[0] != "{" or value_str[-1] != "}":
200      raise argparse.ArgumentTypeError(
201          "Invalid inline hjson, missing braces: '{value_str}'")
202    try:
203      return hjson.loads(value_str)
204    except ValueError as e:
205      message = _extract_decoding_error("Could not decode inline hjson",
206                                        value_str, e)
207      if "eof" in message:
208        message += "\n   Likely missing quotes."
209      raise argparse.ArgumentTypeError(message) from e
210
211  @classmethod
212  def json_file(cls, value: pth.AnyPathLike) -> Any:
213    path = PathParser.file_path(value)
214    with path.open(encoding="utf-8") as f:
215      try:
216        return json.load(f)
217      except ValueError as e:
218        message = _extract_decoding_error(f"Invalid json file '{path}':", path,
219                                          e)
220        raise argparse.ArgumentTypeError(message) from e
221
222  @classmethod
223  def hjson_file(cls, value: pth.AnyPathLike) -> Any:
224    path = PathParser.file_path(value)
225    with path.open(encoding="utf-8") as f:
226      try:
227        return hjson.load(f)
228      except ValueError as e:
229        message = _extract_decoding_error("Invalid hjson file '{path}':", path,
230                                          e)
231        raise argparse.ArgumentTypeError(message) from e
232
233  @classmethod
234  def non_empty_hjson_file(cls, value: pth.AnyPathLike) -> Any:
235    data = cls.hjson_file(value)
236    if not data:
237      raise argparse.ArgumentTypeError(
238          "Expected hjson file with non-empty data, "
239          f"but got: {hjson.dumps(data)}")
240    return data
241
242  @classmethod
243  def dict_hjson_file(cls, value: pth.AnyPathLike) -> Any:
244    data = cls.non_empty_hjson_file(value)
245    if not isinstance(data, dict):
246      raise argparse.ArgumentTypeError(
247          "Expected object in hjson config '{value}', "
248          f"but got {type_str(data)}: {repr(data)}")
249    return data
250
251  @classmethod
252  def dict(cls, value: Any, name: str = "value") -> Dict:
253    if isinstance(value, dict):
254      return value
255    raise argparse.ArgumentTypeError(
256        f"Expected dict, but {name} is {type_str(value)}: {repr(value)}")
257
258  @classmethod
259  def non_empty_dict(cls, value: Any, name: str = "value") -> Dict:
260    dict_value = cls.dict(value)
261    if not dict_value:
262      raise argparse.ArgumentTypeError(
263          f"Expected {name} to be a non-empty dict.")
264    return dict_value
265
266  @classmethod
267  def sequence(cls, value: Any, name: str = "value") -> Sequence[Any]:
268    if isinstance(value, (list, tuple)):
269      return value
270    raise argparse.ArgumentTypeError(
271        f"Expected sequence, but {name} is {type_str(value)}: {repr(value)}")
272
273  @classmethod
274  def non_empty_sequence(cls, value: Any, name: str = "value") -> Sequence[Any]:
275    sequence_value = cls.sequence(value)
276    if not sequence_value:
277      raise argparse.ArgumentTypeError(
278          f"Expected {name} to be a non-empty sequence.")
279    return sequence_value
280
281  @classmethod
282  def any_str(cls, value: Any, name: str = "value") -> str:
283    value = cls.not_none(value, name)
284    if isinstance(value, str):
285      return value
286    raise argparse.ArgumentTypeError(
287        f"Expected str, but got {type_str(value)}: {value}")
288
289  @classmethod
290  def non_empty_str(cls, value: Any, name: str = "value") -> str:
291    value = cls.any_str(value, name)
292    if not isinstance(value, str):
293      raise argparse.ArgumentTypeError(
294          f"Expected non-empty string {name}, "
295          f"but got {type_str(value)}: {repr(value)}")
296    if not value:
297      raise argparse.ArgumentTypeError(f"Non-empty string {name} expected.")
298    return value
299
300  @classmethod
301  def url_str(cls,
302              value: str,
303              name: str = "url",
304              schemes: Optional[Sequence[str]] = None) -> str:
305    cls.url(value, name, schemes)
306    return value
307
308  @classmethod
309  def httpx_url_str(cls, value: Any, name: str = "url") -> str:
310    cls.url(value, name, schemes=("http", "https"))
311    return value
312
313  @classmethod
314  def base_url(cls, value: str, name: str = "url") -> urlparse.ParseResult:
315    url_str: str = cls.non_empty_str(value, name)
316    try:
317      return urlparse.urlparse(url_str)
318    except ValueError as e:
319      raise argparse.ArgumentTypeError(
320          f"Invalid {name}: {repr(value)}, {e}") from e
321
322  PORT_URL_PATH_RE = re.compile(r"^[0-9]+(?:/|$)")
323
324  @classmethod
325  def parse_fuzzy_url_str(cls,
326                          value: str,
327                          name: str = "url",
328                          schemes: Sequence[str] = ("http", "https", "about",
329                                                    "file"),
330                          default_scheme: str = "https") -> str:
331    parsed = cls.parse_fuzzy_url(value, name, schemes, default_scheme)
332    return urlparse.urlunparse(parsed)
333
334  @classmethod
335  def parse_fuzzy_url(cls,
336                      value: str,
337                      name: str = "url",
338                      schemes: Sequence[str] = ("http", "https", "about",
339                                                "file"),
340                      default_scheme: str = "https") -> urlparse.ParseResult:
341    assert default_scheme, "missing default scheme value"
342    value = cls.non_empty_str(value, name)
343    if PathParser.PATH_PREFIX.match(value):
344      value = f"file://{value}"
345    else:
346      parsed = cls.base_url(value)
347      if not parsed.scheme:
348        value = f"{default_scheme}://{value}"
349      # Check if this was a url without a scheme but with ports, which gets
350      # "wrongly" parsed and the host ends up in result.scheme and port and path
351      # are merged into result.path.
352      if parsed.scheme not in schemes and not parsed.netloc:
353        if cls.PORT_URL_PATH_RE.match(parsed.path):
354          # foo.com:8080/test => https://foo.com:8080/test
355          value = f"{default_scheme}://{value}"
356      schemes = tuple(schemes) + (default_scheme,)
357    return cls.url(value, name, schemes)
358
359  @classmethod
360  def url(cls,
361          value: str,
362          name: str = "url",
363          schemes: Optional[Sequence[str]] = None) -> urlparse.ParseResult:
364    parsed = cls.base_url(value)
365    try:
366      scheme = parsed.scheme
367      if schemes and scheme not in schemes:
368        schemes_str = ",".join(map(repr, schemes))
369        raise argparse.ArgumentTypeError(
370            f"Invalid {name}: Expected scheme to be one of {schemes_str}, "
371            f"but got {repr(parsed.scheme)} for url {repr(value)}")
372      if port := parsed.port:
373        _ = NumberParser.port_number(port, f"{name} port")
374      if scheme in ("file", "about"):
375        return parsed
376      hostname = parsed.hostname
377      if not hostname:
378        raise argparse.ArgumentTypeError(
379            f"Missing hostname in {name}: {repr(value)}")
380      if " " in hostname:
381        raise argparse.ArgumentTypeError(
382            f"Hostname in {name} contains invalid space: {repr(value)}")
383    except ValueError as e:
384      # Some ParseResult properties trigger errors, wrap all of them
385      raise argparse.ArgumentTypeError(
386          f"Invalid {name}: {repr(value)}, {e}") from e
387    return parsed
388
389  @classmethod
390  def bool(cls, value: Any, name: str = "value") -> bool:
391    if isinstance(value, bool):
392      return value
393    value = str(value).lower()
394    if value == "true":
395      return True
396    if value == "false":
397      return False
398    raise argparse.ArgumentTypeError(
399        f"Expected bool {name} but got {type_str(value)}: {repr(value)}")
400
401
402  @classmethod
403  def not_none(cls, value: Optional[NotNoneT], name: str = "value") -> NotNoneT:
404    if value is None:
405      raise argparse.ArgumentTypeError(f"Expected {name} to be not None.")
406    return value
407
408  @classmethod
409  def sh_cmd(cls, value: Any) -> List[str]:
410    value = cls.not_none(value, "shell cmd")
411    if not value:
412      raise argparse.ArgumentTypeError(
413          f"Expected non-empty shell cmd, but got: {value}")
414    if isinstance(value, (list, tuple)):
415      for i, part in enumerate(value):
416        cls.non_empty_str(part, f"cmd[{i}]")
417      return list(value)
418    if not isinstance(value, str):
419      raise argparse.ArgumentTypeError(
420          f"Expected string or list, but got {type_str(value)}: {value}")
421    try:
422      return shlex.split(value)
423    except ValueError as e:
424      raise argparse.ArgumentTypeError(f"Invalid shell cmd: {value} ") from e
425
426  @classmethod
427  def unique_sequence(
428      cls,
429      value: SequenceT,
430      name: str = "sequence",
431      error_cls: Type[Exception] = argparse.ArgumentTypeError) -> SequenceT:
432    unique = set()
433    duplicates = set()
434    for item in value:
435      if item in unique:
436        duplicates.add(item)
437      else:
438        unique.add(item)
439    if not duplicates:
440      return value
441    raise error_cls(f"Unexpected duplicates in {name}: {repr(duplicates)}")
442
443  @classmethod
444  def regexp(cls, value: Any, name: str = "regexp") -> re.Pattern:
445    try:
446      return re.compile(cls.any_str(value, name))
447    except re.error as e:
448      raise argparse.ArgumentTypeError(f"Invalid regexp {name}: {value}") from e
449
450
451_MAX_LEN = 70
452
453
454def _extract_decoding_error(message: str, value: pth.AnyPathLike,
455                            e: ValueError) -> str:
456  lineno = getattr(e, "lineno", -1) - 1
457  colno = getattr(e, "colno", -1) - 1
458  if lineno < 0 or colno < 0:
459    if isinstance(value, pth.LocalPath):
460      return f"{message}\n    {str(e)}"
461    return f"{message}: {value}\n    {str(e)}"
462  if isinstance(value, pth.AnyPath):
463    with pth.LocalPath(value).open(encoding="utf-8") as f:
464      line = f.readlines()[lineno]
465  else:
466    line = value.splitlines()[lineno]
467  if len(line) > _MAX_LEN:
468    # Only show line around error:
469    start = colno - _MAX_LEN // 2
470    end = colno + _MAX_LEN // 2
471    prefix = "..."
472    suffix = "..."
473    if start < 0:
474      start = 0
475      end = _MAX_LEN
476      prefix = ""
477    elif end > len(line):
478      end = len(line)
479      start = len(line) - _MAX_LEN
480      suffix = ""
481    colno -= start
482    line = prefix + line[start:end] + suffix
483    marker_space = (" " * len(prefix)) + (" " * colno)
484  else:
485    marker_space = " " * colno
486  marker = "_▲_"
487  # Adjust line to be aligned with marker size
488  line = (" " * (len(marker) // 2)) + line
489  return f"{message}\n    {line}\n    {marker_space}{marker}\n({str(e)})"
490
491
492class NumberParser:
493
494  @classmethod
495  def any_float(cls, value: Any, name: str = "float") -> float:
496    try:
497      return float(value)
498    except ValueError as e:
499      raise argparse.ArgumentTypeError(f"Invalid {name}: {repr(value)}") from e
500
501  @classmethod
502  def positive_zero_float(cls, value: Any, name: str = "float") -> float:
503    value_f = cls.any_float(value, name)
504    if not math.isfinite(value_f) or value_f < 0:
505      raise argparse.ArgumentTypeError(
506          f"Expected {name} >= 0, but got: {value_f}")
507    return value_f
508
509  @classmethod
510  def any_int(cls, value: Any, name: str = "value") -> int:
511    try:
512      return int(value)
513    except ValueError as e:
514      raise argparse.ArgumentTypeError(
515          f"Invalid integer {name}: {repr(value)}") from e
516
517  @classmethod
518  def positive_zero_int(cls, value: Any, name: str = "value") -> int:
519    value_i = cls.any_int(value, name)
520    if value_i < 0:
521      raise argparse.ArgumentTypeError(
522          f"Expected integer {name} >= 0, but got: {value_i}")
523    return value_i
524
525  @classmethod
526  def positive_int(cls, value: Any, name: str = "value") -> int:
527    value_i = cls.any_int(value, name)
528    if not math.isfinite(value_i) or value_i <= 0:
529      raise argparse.ArgumentTypeError(
530          f"Expected integer {name} > 0, but got: {value_i}")
531    return value_i
532
533  @classmethod
534  def port_number(cls, value: Any, name: str = "port") -> int:
535    port = cls.any_int(value, name)
536    if 1 <= port <= 65535:
537      return port
538    raise argparse.ArgumentTypeError(
539        f"Invalid Port: expected 1 <= {name} <= 65535, but got: {repr(port)}")
540
541
542class LateArgumentError(argparse.ArgumentTypeError):
543  """Signals argument parse errors after parser.parse_args().
544  This is used to map errors back to the original argument, much like
545  argparse.ArgumentError does internally. However, since this happens after
546  the internal argument parsing we need this custom implementation to print
547  more descriptive error messages.
548  """
549
550  def __init__(self, flag: str, message: str) -> None:
551    super().__init__(message)
552    self.flag = flag
553    self.message = message
554
555
556class DurationParseError(argparse.ArgumentTypeError):
557  pass
558
559
560class DurationParser:
561
562  @classmethod
563  def help(cls) -> str:
564    return "'12.5' == '12.5s',  units=['ms', 's', 'm', 'h']"
565
566  _DURATION_RE: Final[re.Pattern] = re.compile(
567      r"(?P<value>(-?\d+(\.\d+)?)) ?(?P<unit>[a-z]+)?")
568
569  @classmethod
570  def _to_timedelta(cls, value: float, suffix: str) -> dt.timedelta:
571    if suffix in {"ms", "millis", "milliseconds"}:
572      return dt.timedelta(milliseconds=value)
573    if suffix in {"s", "sec", "secs", "second", "seconds"}:
574      return dt.timedelta(seconds=value)
575    if suffix in {"m", "min", "mins", "minute", "minutes"}:
576      return dt.timedelta(minutes=value)
577    if suffix in {"h", "hrs", "hour", "hours"}:
578      return dt.timedelta(hours=value)
579    raise DurationParseError(f"Error: {suffix} is not supported for duration. "
580                             "Make sure to use a supported time unit/suffix")
581
582  @classmethod
583  def positive_duration(cls,
584                        time_value: Any,
585                        name: str = "duration") -> dt.timedelta:
586    duration: dt.timedelta = cls.any_duration(time_value)
587    if duration.total_seconds() <= 0:
588      raise DurationParseError(f"Expected non-zero {name}, but got {duration}")
589    return duration
590
591  @classmethod
592  def positive_or_zero_duration(cls,
593                                time_value: Any,
594                                name: str = "duration") -> dt.timedelta:
595    duration: dt.timedelta = cls.any_duration(time_value, name)
596    if duration.total_seconds() < 0:
597      raise DurationParseError(f"Expected positive {name}, but got {duration}")
598    return duration
599
600  @classmethod
601  def any_duration(cls,
602                   time_value: Any,
603                   name: str = "duration") -> dt.timedelta:
604    """
605    This function will parse the measurement and the value from string value.
606
607    For example:
608    5s => dt.timedelta(seconds=5)
609    5m => 5*60 = dt.timedelta(minutes=5)
610
611    """
612    if isinstance(time_value, dt.timedelta):
613      return time_value
614    if isinstance(time_value, (int, float)):
615      return dt.timedelta(seconds=time_value)
616    if not time_value:
617      raise DurationParseError(f"Expected non-empty {name} value.")
618    if not isinstance(time_value, str):
619      raise DurationParseError(
620          f"Unexpected {type_str(time_value)} for {name}: {time_value}")
621
622    match = cls._DURATION_RE.fullmatch(time_value)
623    if match is None:
624      raise DurationParseError(f"Unknown {name} format: '{time_value}'")
625
626    value = match.group("value")
627    if not value:
628      raise DurationParseError(
629          f"Error: {name} value not found."
630          f"Make sure to include a valid {name} value: '{time_value}'")
631    time_unit = match.group("unit")
632    try:
633      time_value = float(value)
634    except ValueError as e:
635      raise DurationParseError(f"{name} must be a valid number, {e}") from e
636    if not math.isfinite(time_value):
637      raise DurationParseError(f"{name} must be finite, but got: {time_value}")
638
639    if not time_unit:
640      # If no time unit provided we assume it is in seconds.
641      return dt.timedelta(seconds=time_value)
642    return cls._to_timedelta(time_value, time_unit)
643