import argparse
import sys
import time
import token
import tokenize
import traceback

from abc import abstractmethod
from typing import Any, Callable, cast, Dict, Optional, Tuple, Type, TypeVar

from pegen.tokenizer import exact_token_types
from pegen.tokenizer import Mark
from pegen.tokenizer import Tokenizer

T = TypeVar("T")
P = TypeVar("P", bound="Parser")
F = TypeVar("F", bound=Callable[..., Any])


def logger(method: F) -> F:
    """For non-memoized functions that we want to be logged.

    (In practice this is only non-leader left-recursive functions.)
    """
    method_name = method.__name__

    def logger_wrapper(self: P, *args: object) -> T:
        if not self._verbose:
            return method(self, *args)
        argsr = ",".join(repr(arg) for arg in args)
        fill = "  " * self._level
        print(f"{fill}{method_name}({argsr}) .... (looking at {self.showpeek()})")
        self._level += 1
        tree = method(self, *args)
        self._level -= 1
        print(f"{fill}... {method_name}({argsr}) --> {tree!s:.200}")
        return tree

    logger_wrapper.__wrapped__ = method  # type: ignore
    return cast(F, logger_wrapper)

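# Illustrative sketch (not part of the original module): in a generated
# parser, `logger` would typically decorate a rule method so that it still
# shows up in `-v` traces even though it is not memoized. The grammar rule
# and `ToyParser` here are hypothetical examples.
#
#     class ToyParser(Parser):
#         @logger
#         def start(self) -> Optional[Any]:
#             # start: NAME NEWLINE
#             mark = self.mark()
#             if (name := self.name()) and self.expect("NEWLINE"):
#                 return name
#             self.reset(mark)
#             return None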

def memoize(method: F) -> F:
    """Memoize a symbol method."""
    method_name = method.__name__

    def memoize_wrapper(self: P, *args: object) -> T:
        mark = self.mark()
        key = mark, method_name, args
        # Fast path: cache hit, and not verbose.
        if key in self._cache and not self._verbose:
            tree, endmark = self._cache[key]
            self.reset(endmark)
            return tree
        # Slow path: no cache hit, or verbose.
        verbose = self._verbose
        argsr = ",".join(repr(arg) for arg in args)
        fill = "  " * self._level
        if key not in self._cache:
            if verbose:
                print(f"{fill}{method_name}({argsr}) ... (looking at {self.showpeek()})")
            self._level += 1
            tree = method(self, *args)
            self._level -= 1
            if verbose:
                print(f"{fill}... {method_name}({argsr}) -> {tree!s:.200}")
            endmark = self.mark()
            self._cache[key] = tree, endmark
        else:
            tree, endmark = self._cache[key]
            if verbose:
                print(f"{fill}{method_name}({argsr}) -> {tree!s:.200}")
            self.reset(endmark)
        return tree

    memoize_wrapper.__wrapped__ = method  # type: ignore
    return cast(F, memoize_wrapper)

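# Illustrative sketch (assumed usage, mirroring how generated parsers apply
# this decorator): caching results keyed on (position, rule, args) is what
# makes this recursive-descent parser packrat-style, so each rule is run at
# most once per input position and backtracking cannot go exponential.
# `ToyParser` is a hypothetical example, not part of this module.
#
#     class ToyParser(Parser):
#         @memoize
#         def atom(self) -> Optional[Any]:
#             # atom: NAME | NUMBER
#             return self.name() or self.number()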

def memoize_left_rec(method: Callable[[P], Optional[T]]) -> Callable[[P], Optional[T]]:
    """Memoize a left-recursive symbol method."""
    method_name = method.__name__

    def memoize_left_rec_wrapper(self: P) -> Optional[T]:
        mark = self.mark()
        key = mark, method_name, ()
        # Fast path: cache hit, and not verbose.
        if key in self._cache and not self._verbose:
            tree, endmark = self._cache[key]
            self.reset(endmark)
            return tree
        # Slow path: no cache hit, or verbose.
        verbose = self._verbose
        fill = "  " * self._level
        if key not in self._cache:
            if verbose:
                print(f"{fill}{method_name} ... (looking at {self.showpeek()})")
            self._level += 1

            # For left-recursive rules we manipulate the cache and
            # loop until the rule shows no progress, then pick the
            # previous result.  For an explanation of why this works, see
            # https://github.com/PhilippeSigaud/Pegged/wiki/Left-Recursion
            # (But we use the memoization cache instead of a static
            # variable; perhaps this is similar to the approach in the paper
            # by Warth et al., http://web.cs.ucla.edu/~todd/research/pub.php?id=pepm08.)

            # Prime the cache with a failure.
            self._cache[key] = None, mark
            lastresult, lastmark = None, mark
            depth = 0
            if verbose:
                print(f"{fill}Recursive {method_name} at {mark} depth {depth}")

            while True:
                self.reset(mark)
                result = method(self)
                endmark = self.mark()
                depth += 1
                if verbose:
                    print(
                        f"{fill}Recursive {method_name} at {mark} depth {depth}: {result!s:.200} to {endmark}"
                    )
                if not result:
                    if verbose:
                        print(f"{fill}Fail with {lastresult!s:.200} to {lastmark}")
                    break
                if endmark <= lastmark:
                    if verbose:
                        print(f"{fill}Bailing with {lastresult!s:.200} to {lastmark}")
                    break
                self._cache[key] = lastresult, lastmark = result, endmark

            self.reset(lastmark)
            tree = lastresult

            self._level -= 1
            if verbose:
                print(f"{fill}{method_name}() -> {tree!s:.200} [cached]")
            if tree:
                endmark = self.mark()
            else:
                endmark = mark
                self.reset(endmark)
            self._cache[key] = tree, endmark
        else:
            tree, endmark = self._cache[key]
            if verbose:
                print(f"{fill}{method_name}() -> {tree!s:.200} [fresh]")
            if tree:
                self.reset(endmark)
        return tree

    memoize_left_rec_wrapper.__wrapped__ = method  # type: ignore
    return memoize_left_rec_wrapper

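# Worked example (hypothetical grammar, not part of this module): for the
# left-recursive rule `expr: expr '+' NUMBER | NUMBER`, the seed-growing
# loop above behaves roughly like this on input "1 + 2 + 3":
#
#   pass 1: the primed cache failure makes the nested `self.expr()` call
#           fail, so the NUMBER alternative matches just "1"
#   pass 2: with "1" cached, `expr '+' NUMBER` matches "1 + 2"
#   pass 3: with "1 + 2" cached, it matches "1 + 2 + 3"
#   pass 4: no further progress (endmark <= lastmark), so the loop bails
#           and the pass-3 result is kept
#
#     class ToyParser(Parser):
#         @memoize_left_rec
#         def expr(self) -> Optional[Any]:
#             mark = self.mark()
#             if (e := self.expr()) and self.expect("+") and (n := self.number()):
#                 return ("+", e, n)
#             self.reset(mark)
#             return self.number()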

class Parser:
    """Parsing base class."""

    def __init__(self, tokenizer: Tokenizer, *, verbose: bool = False):
        self._tokenizer = tokenizer
        self._verbose = verbose
        self._level = 0
        self._cache: Dict[Tuple[Mark, str, Tuple[Any, ...]], Tuple[Any, Mark]] = {}
        # Pass through common tokenizer methods.
        # TODO: Rename to _mark and _reset.
        self.mark = self._tokenizer.mark
        self.reset = self._tokenizer.reset

    @abstractmethod
    def start(self) -> Any:
        pass

    def showpeek(self) -> str:
        tok = self._tokenizer.peek()
        return f"{tok.start[0]}.{tok.start[1]}: {token.tok_name[tok.type]}:{tok.string!r}"

    @memoize
    def name(self) -> Optional[tokenize.TokenInfo]:
        tok = self._tokenizer.peek()
        if tok.type == token.NAME:
            return self._tokenizer.getnext()
        return None

    @memoize
    def number(self) -> Optional[tokenize.TokenInfo]:
        tok = self._tokenizer.peek()
        if tok.type == token.NUMBER:
            return self._tokenizer.getnext()
        return None

    @memoize
    def string(self) -> Optional[tokenize.TokenInfo]:
        tok = self._tokenizer.peek()
        if tok.type == token.STRING:
            return self._tokenizer.getnext()
        return None

    @memoize
    def op(self) -> Optional[tokenize.TokenInfo]:
        tok = self._tokenizer.peek()
        if tok.type == token.OP:
            return self._tokenizer.getnext()
        return None

    @memoize
    def expect(self, type: str) -> Optional[tokenize.TokenInfo]:
        tok = self._tokenizer.peek()
        if tok.string == type:
            return self._tokenizer.getnext()
        if type in exact_token_types:
            if tok.type == exact_token_types[type]:
                return self._tokenizer.getnext()
        if type in token.__dict__:
            if tok.type == token.__dict__[type]:
                return self._tokenizer.getnext()
        if tok.type == token.OP and tok.string == type:
            return self._tokenizer.getnext()
        return None

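    # Illustrative note (assumed usage): `expect` accepts either a literal
    # token string or a token-type name, tried in the order above. For
    # example, `self.expect("if")` matches by string, `self.expect("(")`
    # resolves through exact_token_types (the mapping of operator strings
    # to their exact token types), and `self.expect("NEWLINE")` resolves
    # through the name lookup in token.__dict__.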
    def positive_lookahead(self, func: Callable[..., T], *args: object) -> T:
        mark = self.mark()
        ok = func(*args)
        self.reset(mark)
        return ok

    def negative_lookahead(self, func: Callable[..., object], *args: object) -> bool:
        mark = self.mark()
        ok = func(*args)
        self.reset(mark)
        return not ok

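    # Illustrative sketch (hypothetical rule, not from the original file):
    # generated parsers use these lookaheads for PEG's `&` and `!`
    # operators; both probe with `func` and then restore the position.
    #
    #     # stmt: !'}' expr  -- parse an expr only if the next token is not '}'
    #     if self.negative_lookahead(self.expect, "}") and (e := self.expr()):
    #         return e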
    def make_syntax_error(self, filename: str = "<unknown>") -> SyntaxError:
        tok = self._tokenizer.diagnose()
        return SyntaxError(
            "pegen parse failure", (filename, tok.start[0], 1 + tok.start[1], tok.line)
        )


def simple_parser_main(parser_class: Type[Parser]) -> None:
    argparser = argparse.ArgumentParser()
    argparser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="Print timing stats; repeat for more debug output",
    )
    argparser.add_argument(
        "-q", "--quiet", action="store_true", help="Don't print the parsed program"
    )
    argparser.add_argument("filename", help="Input file ('-' to use stdin)")

    args = argparser.parse_args()
    verbose = args.verbose
    verbose_tokenizer = verbose >= 3
    verbose_parser = verbose == 2 or verbose >= 4

    t0 = time.time()

    filename = args.filename
    if filename == "" or filename == "-":
        filename = "<stdin>"
        file = sys.stdin
    else:
        file = open(args.filename)
    try:
        tokengen = tokenize.generate_tokens(file.readline)
        tokenizer = Tokenizer(tokengen, verbose=verbose_tokenizer)
        parser = parser_class(tokenizer, verbose=verbose_parser)
        tree = parser.start()
        try:
            if file.isatty():
                endpos = 0
            else:
                endpos = file.tell()
        except IOError:
            endpos = 0
    finally:
        if file is not sys.stdin:
            file.close()

    t1 = time.time()

    if not tree:
        err = parser.make_syntax_error(filename)
        traceback.print_exception(err.__class__, err, None)
        sys.exit(1)

    if not args.quiet:
        print(tree)

    if verbose:
        dt = t1 - t0
        diag = tokenizer.diagnose()
        nlines = diag.end[0]
        if diag.type == token.ENDMARKER:
            nlines -= 1
        print(f"Total time: {dt:.3f} sec; {nlines} lines", end="")
        if endpos:
            print(f" ({endpos} bytes)", end="")
        if dt:
            print(f"; {nlines / dt:.0f} lines/sec")
        else:
            print()
        print("Cache sizes:")
        print(f"  token array : {len(tokenizer._tokens):10}")
        print(f"        cache : {len(parser._cache):10}")
        ## print_memstats()
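# Illustrative sketch (assumed usage): a generated parser module would
# typically end with a guard like the following so the file can be run
# directly on an input file. `GeneratedParser` is a hypothetical name for
# the class emitted by pegen.
#
#     if __name__ == "__main__":
#         simple_parser_main(GeneratedParser)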