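"""Analysis pass for the bytecode interpreter cases generator.

Converts the parser's output (insts, ops, macros, families and pseudos)
into Instructions, Uops and Families, computing properties, stack effects
and opcode assignments along the way.
"""
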
from dataclasses import dataclass
import lexer
import parser
import re
from typing import Optional


@dataclass
class Properties:
    escapes: bool
    error_with_pop: bool
    error_without_pop: bool
    deopts: bool
    oparg: bool
    jumps: bool
    eval_breaker: bool
    ends_with_eval_breaker: bool
    needs_this: bool
    always_exits: bool
    stores_sp: bool
    uses_co_consts: bool
    uses_co_names: bool
    uses_locals: bool
    has_free: bool
    side_exit: bool
    pure: bool
    tier: int | None = None
    oparg_and_1: bool = False
    const_oparg: int = -1

    def dump(self, indent: str) -> None:
        text = ", ".join([f"{key}: {value}" for (key, value) in self.__dict__.items()])
        print(indent, text, sep="")

    @staticmethod
    def from_list(properties: list["Properties"]) -> "Properties":
        return Properties(
            escapes=any(p.escapes for p in properties),
            error_with_pop=any(p.error_with_pop for p in properties),
            error_without_pop=any(p.error_without_pop for p in properties),
            deopts=any(p.deopts for p in properties),
            oparg=any(p.oparg for p in properties),
            jumps=any(p.jumps for p in properties),
            eval_breaker=any(p.eval_breaker for p in properties),
            ends_with_eval_breaker=any(p.ends_with_eval_breaker for p in properties),
            needs_this=any(p.needs_this for p in properties),
            always_exits=any(p.always_exits for p in properties),
            stores_sp=any(p.stores_sp for p in properties),
            uses_co_consts=any(p.uses_co_consts for p in properties),
            uses_co_names=any(p.uses_co_names for p in properties),
            uses_locals=any(p.uses_locals for p in properties),
            has_free=any(p.has_free for p in properties),
            side_exit=any(p.side_exit for p in properties),
            pure=all(p.pure for p in properties),
        )

    @property
    def infallible(self) -> bool:
        return not self.error_with_pop and not self.error_without_pop


SKIP_PROPERTIES = Properties(
    escapes=False,
    error_with_pop=False,
    error_without_pop=False,
    deopts=False,
    oparg=False,
    jumps=False,
    eval_breaker=False,
    ends_with_eval_breaker=False,
    needs_this=False,
    always_exits=False,
    stores_sp=False,
    uses_co_consts=False,
    uses_co_names=False,
    uses_locals=False,
    has_free=False,
    side_exit=False,
    pure=False,
)


@dataclass
class Skip:
    "Unused cache entry"
    size: int

    @property
    def name(self) -> str:
        return f"unused/{self.size}"

    @property
    def properties(self) -> Properties:
        return SKIP_PROPERTIES


@dataclass
class StackItem:
    name: str
    type: str | None
    condition: str | None
    size: str
    peek: bool = False

    def __str__(self) -> str:
        cond = f" if ({self.condition})" if self.condition else ""
        size = f"[{self.size}]" if self.size != "1" else ""
        type = "" if self.type is None else f"{self.type} "
        return f"{type}{self.name}{size}{cond} {self.peek}"

    def is_array(self) -> bool:
        return self.type == "PyObject **"

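# For illustration (hypothetical item): str(StackItem("value", "PyObject *",
# "oparg & 1", "1")) renders as "PyObject * value if (oparg & 1) False";
# the trailing flag is the peek marker.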

@dataclass
class StackEffect:
    inputs: list[StackItem]
    outputs: list[StackItem]

    def __str__(self) -> str:
        return f"({', '.join([str(i) for i in self.inputs])} -- {', '.join([str(i) for i in self.outputs])})"


@dataclass
class CacheEntry:
    name: str
    size: int

    def __str__(self) -> str:
        return f"{self.name}/{self.size}"


@dataclass
class Uop:
    name: str
    context: parser.Context | None
    annotations: list[str]
    stack: StackEffect
    caches: list[CacheEntry]
    body: list[lexer.Token]
    properties: Properties
    _size: int = -1
    implicitly_created: bool = False
    replicated: int = 0
    replicates: "Uop | None" = None

    def dump(self, indent: str) -> None:
        print(
            indent, self.name, ", ".join(self.annotations) if self.annotations else ""
        )
        print(indent, self.stack, ", ".join([str(c) for c in self.caches]))
        self.properties.dump("    " + indent)

    @property
    def size(self) -> int:
        if self._size < 0:
            self._size = sum(c.size for c in self.caches)
        return self._size

    def why_not_viable(self) -> str | None:
        if self.name == "_SAVE_RETURN_OFFSET":
            return None  # Adjusts next_instr, but only in tier 1 code
        if "INSTRUMENTED" in self.name:
            return "is instrumented"
        if "replaced" in self.annotations:
            return "is replaced"
        if self.name in ("INTERPRETER_EXIT", "JUMP_BACKWARD"):
            return "has tier 1 control flow"
        if self.properties.needs_this:
            return "uses the 'this_instr' variable"
        if len([c for c in self.caches if c.name != "unused"]) > 1:
            return "has more than one cache entry"
        if self.properties.error_with_pop and self.properties.error_without_pop:
            return "has both popping and not-popping errors"
        if self.properties.eval_breaker:
            if self.properties.error_with_pop or self.properties.error_without_pop:
                return "has error handling and eval-breaker check"
            if self.properties.side_exit:
                return "exits and eval-breaker check"
            if self.properties.deopts:
                return "deopts and eval-breaker check"
        return None

    def is_viable(self) -> bool:
        return self.why_not_viable() is None

    def is_super(self) -> bool:
        for tkn in self.body:
            if tkn.kind == "IDENTIFIER" and tkn.text == "oparg1":
                return True
        return False


Part = Uop | Skip


@dataclass
class Instruction:
    name: str
    parts: list[Part]
    _properties: Properties | None
    is_target: bool = False
    family: Optional["Family"] = None
    opcode: int = -1

    @property
    def properties(self) -> Properties:
        if self._properties is None:
            self._properties = self._compute_properties()
        return self._properties

    def _compute_properties(self) -> Properties:
        return Properties.from_list([part.properties for part in self.parts])

    def dump(self, indent: str) -> None:
        print(indent, self.name, "=", ", ".join([part.name for part in self.parts]))
        self.properties.dump("    " + indent)

    @property
    def size(self) -> int:
        return 1 + sum(part.size for part in self.parts)

    def is_super(self) -> bool:
        if len(self.parts) != 1:
            return False
        uop = self.parts[0]
        if isinstance(uop, Uop):
            return uop.is_super()
        else:
            return False


@dataclass
class PseudoInstruction:
    name: str
    targets: list[Instruction]
    flags: list[str]
    opcode: int = -1

    def dump(self, indent: str) -> None:
        print(indent, self.name, "->", " or ".join([t.name for t in self.targets]))

    @property
    def properties(self) -> Properties:
        return Properties.from_list([i.properties for i in self.targets])


@dataclass
class Family:
    name: str
    size: str
    members: list[Instruction]

    def dump(self, indent: str) -> None:
        print(indent, self.name, "= ", ", ".join([m.name for m in self.members]))


@dataclass
class Analysis:
    instructions: dict[str, Instruction]
    uops: dict[str, Uop]
    families: dict[str, Family]
    pseudos: dict[str, PseudoInstruction]
    opmap: dict[str, int]
    have_arg: int
    min_instrumented: int


def analysis_error(message: str, tkn: lexer.Token) -> SyntaxError:
    # To do -- support file and line output
    # Construct a SyntaxError instance from message and token
    return lexer.make_syntax_error(message, tkn.filename, tkn.line, tkn.column, "")


def override_error(
    name: str,
    context: parser.Context | None,
    prev_context: parser.Context | None,
    token: lexer.Token,
) -> SyntaxError:
    return analysis_error(
        f"Duplicate definition of '{name}' @ {context}; "
        f"previous definition @ {prev_context}",
        token,
    )


def convert_stack_item(
    item: parser.StackEffect, replace_op_arg_1: str | None
) -> StackItem:
    cond = item.cond
    if replace_op_arg_1 and OPARG_AND_1.match(item.cond):
        cond = replace_op_arg_1
    return StackItem(item.name, item.type, cond, (item.size or "1"))


def analyze_stack(
    op: parser.InstDef, replace_op_arg_1: str | None = None
) -> StackEffect:
    inputs: list[StackItem] = [
        convert_stack_item(i, replace_op_arg_1)
        for i in op.inputs
        if isinstance(i, parser.StackEffect)
    ]
    outputs: list[StackItem] = [
        convert_stack_item(i, replace_op_arg_1) for i in op.outputs
    ]
    # Mark variables with matching names at the base of the stack as "peek".
    modified = False
    for input, output in zip(inputs, outputs):
        if input.name == output.name and not modified:
            input.peek = output.peek = True
        else:
            modified = True
    return StackEffect(inputs, outputs)
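
# For example (hypothetical effect): given "(left, right -- left, res)", the
# loop above marks only "left" as a peek, since "res" no longer matches
# "right" and everything above it on the stack is considered modified.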


def analyze_caches(inputs: list[parser.InputEffect]) -> list[CacheEntry]:
    caches: list[parser.CacheEffect] = [
        i for i in inputs if isinstance(i, parser.CacheEffect)
    ]
    for cache in caches:
        if cache.name == "unused":
            raise analysis_error(
                "Unused cache entry in op. Move to enclosing macro.", cache.tokens[0]
            )
    return [CacheEntry(i.name, int(i.size)) for i in caches]


def variable_used(node: parser.InstDef, name: str) -> bool:
    """Determine whether a variable with a given name is used in a node."""
    return any(
        token.kind == "IDENTIFIER" and token.text == name for token in node.tokens
    )


def tier_variable(node: parser.InstDef) -> int | None:
    """Return the tier annotation of a node (1 or 2), or None if there is none."""
    for token in node.tokens:
        if token.kind == "ANNOTATION":
            if token.text == "specializing":
                return 1
            if re.fullmatch(r"tier\d", token.text):
                return int(token.text[-1])
    return None
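
# For example: a "specializing" annotation pins the op to tier 1, while a
# "tier2" annotation yields 2 here.
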


def has_error_with_pop(op: parser.InstDef) -> bool:
    return (
        variable_used(op, "ERROR_IF")
        or variable_used(op, "pop_1_error")
        or variable_used(op, "exception_unwind")
        or variable_used(op, "resume_with_error")
    )


def has_error_without_pop(op: parser.InstDef) -> bool:
    return (
        variable_used(op, "ERROR_NO_POP")
        or variable_used(op, "pop_1_error")
        or variable_used(op, "exception_unwind")
        or variable_used(op, "resume_with_error")
    )


NON_ESCAPING_FUNCTIONS = (
    "Py_INCREF",
    "_PyManagedDictPointer_IsValues",
    "_PyObject_GetManagedDict",
    "_PyObject_ManagedDictPointer",
    "_PyObject_InlineValues",
    "_PyDictValues_AddToInsertionOrder",
    "Py_DECREF",
    "Py_XDECREF",
    "_Py_DECREF_SPECIALIZED",
    "DECREF_INPUTS_AND_REUSE_FLOAT",
    "PyUnicode_Append",
    "_PyLong_IsZero",
    "Py_SIZE",
    "Py_TYPE",
    "PyList_GET_ITEM",
    "PyList_SET_ITEM",
    "PyTuple_GET_ITEM",
    "PyList_GET_SIZE",
    "PyTuple_GET_SIZE",
    "Py_ARRAY_LENGTH",
    "Py_Unicode_GET_LENGTH",
    "PyUnicode_READ_CHAR",
    "_Py_SINGLETON",
    "PyUnicode_GET_LENGTH",
    "_PyLong_IsCompact",
    "_PyLong_IsNonNegativeCompact",
    "_PyLong_CompactValue",
    "_PyLong_DigitCount",
    "_Py_NewRef",
    "_Py_IsImmortal",
    "PyLong_FromLong",
    "_Py_STR",
    "_PyLong_Add",
    "_PyLong_Multiply",
    "_PyLong_Subtract",
    "Py_NewRef",
    "_PyList_ITEMS",
    "_PyTuple_ITEMS",
    "_PyList_AppendTakeRef",
    "_Py_atomic_load_uintptr_relaxed",
    "_PyFrame_GetCode",
    "_PyThreadState_HasStackSpace",
    "_PyUnicode_Equal",
    "_PyFrame_SetStackPointer",
    "_PyType_HasFeature",
    "PyUnicode_Concat",
    "_PyList_FromArraySteal",
    "_PyTuple_FromArraySteal",
    "PySlice_New",
    "_Py_LeaveRecursiveCallPy",
    "CALL_STAT_INC",
    "STAT_INC",
    "maybe_lltrace_resume_frame",
    "_PyUnicode_JoinArray",
    "_PyEval_FrameClearAndPop",
    "_PyFrame_StackPush",
    "PyCell_New",
    "PyFloat_AS_DOUBLE",
    "_PyFrame_PushUnchecked",
    "Py_FatalError",
)

ESCAPING_FUNCTIONS = (
    "import_name",
    "import_from",
)


def makes_escaping_api_call(instr: parser.InstDef) -> bool:
    if "CALL_INTRINSIC" in instr.name:
        return True
    if instr.name == "_BINARY_OP":
        return True
    tkns = iter(instr.tokens)
    for tkn in tkns:
        if tkn.kind != lexer.IDENTIFIER:
            continue
        try:
            next_tkn = next(tkns)
        except StopIteration:
            return False
        if next_tkn.kind != lexer.LPAREN:
            continue
        if tkn.text in ESCAPING_FUNCTIONS:
            return True
        if tkn.text == "tp_vectorcall":
            return True
        if not tkn.text.startswith("Py") and not tkn.text.startswith("_Py"):
            continue
        if tkn.text.endswith("Check"):
            continue
        if tkn.text.startswith("Py_Is"):
            continue
        if tkn.text.endswith("CheckExact"):
            continue
        if tkn.text in NON_ESCAPING_FUNCTIONS:
            continue
        return True
    return False
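
# In effect: any call to a Py*/_Py* function is assumed to escape (it may run
# arbitrary code) unless it is a known-safe predicate (Py_Is*, *Check,
# *CheckExact) or appears in NON_ESCAPING_FUNCTIONS above. For example,
# PyObject_GetAttr(...) would count as escaping; PyTuple_GET_ITEM(...) would not.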


EXITS = {
    "DISPATCH",
    "GO_TO_INSTRUCTION",
    "Py_UNREACHABLE",
    "DISPATCH_INLINED",
    "DISPATCH_GOTO",
}


def eval_breaker_at_end(op: parser.InstDef) -> bool:
    # If the body ends with "CHECK_EVAL_BREAKER();", the macro name is the
    # fifth-to-last token, followed by "(", ")", ";" and the closing "}".
    return op.tokens[-5].text == "CHECK_EVAL_BREAKER"


def always_exits(op: parser.InstDef) -> bool:
    depth = 0
    tkn_iter = iter(op.tokens)
    for tkn in tkn_iter:
        if tkn.kind == "LBRACE":
            depth += 1
        elif tkn.kind == "RBRACE":
            depth -= 1
        elif depth > 1:
            continue
        elif tkn.kind == "GOTO" or tkn.kind == "RETURN":
            return True
        elif tkn.kind == "KEYWORD":
            if tkn.text in EXITS:
                return True
        elif tkn.kind == "IDENTIFIER":
            if tkn.text in EXITS:
                return True
            if tkn.text == "DEOPT_IF" or tkn.text == "ERROR_IF":
                next(tkn_iter)  # '('
                t = next(tkn_iter)
                if t.text == "true":
                    return True
    return False
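
# For example: a body of "{ DISPATCH(); }" or one containing "DEOPT_IF(true);"
# always exits; an exit nested more than one brace deep (e.g. inside an
# if-block) is ignored.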


def stack_effect_only_peeks(instr: parser.InstDef) -> bool:
    stack_inputs = [s for s in instr.inputs if not isinstance(s, parser.CacheEffect)]
    if len(stack_inputs) != len(instr.outputs):
        return False
    if len(stack_inputs) == 0:
        return False
    if any(s.cond for s in stack_inputs) or any(s.cond for s in instr.outputs):
        return False
    return all(
        (s.name == other.name and s.type == other.type and s.size == other.size)
        for s, other in zip(stack_inputs, instr.outputs)
    )
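
# For example: an unconditional effect such as "(value -- value)" only peeks;
# "(left, right -- res)" does not.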


OPARG_AND_1 = re.compile(r"\(*oparg *& *1")


def effect_depends_on_oparg_1(op: parser.InstDef) -> bool:
    for effect in op.inputs:
        if isinstance(effect, parser.CacheEffect):
            continue
        if not effect.cond:
            continue
        if OPARG_AND_1.match(effect.cond):
            return True
    for effect in op.outputs:
        if not effect.cond:
            continue
        if OPARG_AND_1.match(effect.cond):
            return True
    return False
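
# For example: a conditional stack item like "res if (oparg & 1)" matches
# OPARG_AND_1, so the op's stack effect depends on the low bit of oparg and
# the uop can be split into _0/_1 variants (see make_uop below).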


def compute_properties(op: parser.InstDef) -> Properties:
    has_free = (
        variable_used(op, "PyCell_New")
        or variable_used(op, "PyCell_GetRef")
        or variable_used(op, "PyCell_SetTakeRef")
        or variable_used(op, "PyCell_SwapTakeRef")
    )
    deopts_if = variable_used(op, "DEOPT_IF")
    exits_if = variable_used(op, "EXIT_IF")
    if deopts_if and exits_if:
        tkn = op.tokens[0]
        raise lexer.make_syntax_error(
            "Op cannot contain both EXIT_IF and DEOPT_IF",
            tkn.filename,
            tkn.line,
            tkn.column,
            op.name,
        )
    error_with_pop = has_error_with_pop(op)
    error_without_pop = has_error_without_pop(op)
    return Properties(
        escapes=makes_escaping_api_call(op),
        error_with_pop=error_with_pop,
        error_without_pop=error_without_pop,
        deopts=deopts_if,
        side_exit=exits_if,
        oparg=variable_used(op, "oparg"),
        jumps=variable_used(op, "JUMPBY"),
        eval_breaker=variable_used(op, "CHECK_EVAL_BREAKER"),
        ends_with_eval_breaker=eval_breaker_at_end(op),
        needs_this=variable_used(op, "this_instr"),
        always_exits=always_exits(op),
        stores_sp=variable_used(op, "SYNC_SP"),
        uses_co_consts=variable_used(op, "FRAME_CO_CONSTS"),
        uses_co_names=variable_used(op, "FRAME_CO_NAMES"),
        uses_locals=(variable_used(op, "GETLOCAL") or variable_used(op, "SETLOCAL"))
        and not has_free,
        has_free=has_free,
        pure="pure" in op.annotations,
        tier=tier_variable(op),
    )


def make_uop(
    name: str,
    op: parser.InstDef,
    inputs: list[parser.InputEffect],
    uops: dict[str, Uop],
) -> Uop:
    result = Uop(
        name=name,
        context=op.context,
        annotations=op.annotations,
        stack=analyze_stack(op),
        caches=analyze_caches(inputs),
        body=op.block.tokens,
        properties=compute_properties(op),
    )
    if effect_depends_on_oparg_1(op) and "split" in op.annotations:
        result.properties.oparg_and_1 = True
        for bit in ("0", "1"):
            name_x = name + "_" + bit
            properties = compute_properties(op)
            if properties.oparg:
                # May not need oparg anymore
                properties.oparg = any(
                    token.text == "oparg" for token in op.block.tokens
                )
            rep = Uop(
                name=name_x,
                context=op.context,
                annotations=op.annotations,
                stack=analyze_stack(op, bit),
                caches=analyze_caches(inputs),
                body=op.block.tokens,
                properties=properties,
            )
            rep.replicates = result
            uops[name_x] = rep
    for anno in op.annotations:
        if anno.startswith("replicate"):
            # e.g. "replicate(8)" -> 8
            result.replicated = int(anno[10:-1])
            break
    else:
        return result
    for oparg in range(result.replicated):
        name_x = name + "_" + str(oparg)
        properties = compute_properties(op)
        properties.oparg = False
        properties.const_oparg = oparg
        rep = Uop(
            name=name_x,
            context=op.context,
            annotations=op.annotations,
            stack=analyze_stack(op),
            caches=analyze_caches(inputs),
            body=op.block.tokens,
            properties=properties,
        )
        rep.replicates = result
        uops[name_x] = rep

    return result
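
# A "replicate(N)" annotation thus yields N specialized copies, _NAME_0 ..
# _NAME_(N-1), each with a fixed const_oparg, alongside the generic uop.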


def add_op(op: parser.InstDef, uops: dict[str, Uop]) -> None:
    assert op.kind == "op"
    if op.name in uops:
        if "override" not in op.annotations:
            raise override_error(
                op.name, op.context, uops[op.name].context, op.tokens[0]
            )
    uops[op.name] = make_uop(op.name, op, op.inputs, uops)


def add_instruction(
    name: str, parts: list[Part], instructions: dict[str, Instruction]
) -> None:
    instructions[name] = Instruction(name, parts, None)


def desugar_inst(
    inst: parser.InstDef, instructions: dict[str, Instruction], uops: dict[str, Uop]
) -> None:
    assert inst.kind == "inst"
    name = inst.name
    op_inputs: list[parser.InputEffect] = []
    parts: list[Part] = []
    uop_index = -1
    # Move unused cache entries to the Instruction, removing them from the Uop.
    for input in inst.inputs:
        if isinstance(input, parser.CacheEffect) and input.name == "unused":
            parts.append(Skip(input.size))
        else:
            op_inputs.append(input)
            if uop_index < 0:
                uop_index = len(parts)
                # Placeholder for the uop.
                parts.append(Skip(0))
    uop = make_uop("_" + inst.name, inst, op_inputs, uops)
    uop.implicitly_created = True
    uops[inst.name] = uop
    if uop_index < 0:
        parts.append(uop)
    else:
        parts[uop_index] = uop
    add_instruction(name, parts, instructions)
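
# For example (hypothetical cache layout): an inst whose inputs are
# "counter/1, unused/2" yields parts [uop, Skip(2)]; the unused entries stay
# on the Instruction but are dropped from the derived uop.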


def add_macro(
    macro: parser.Macro, instructions: dict[str, Instruction], uops: dict[str, Uop]
) -> None:
    parts: list[Uop | Skip] = []
    for part in macro.uops:
        match part:
            case parser.OpName():
                if part.name not in uops:
                    raise analysis_error(f"No Uop named {part.name}", macro.tokens[0])
                parts.append(uops[part.name])
            case parser.CacheEffect():
                parts.append(Skip(part.size))
            case _:
                assert False
    assert parts
    add_instruction(macro.name, parts, instructions)


def add_family(
    pfamily: parser.Family,
    instructions: dict[str, Instruction],
    families: dict[str, Family],
) -> None:
    family = Family(
        pfamily.name,
        pfamily.size,
        [instructions[member_name] for member_name in pfamily.members],
    )
    for member in family.members:
        member.family = family
    # The head of the family is an implicit jump target for DEOPTs
    instructions[family.name].is_target = True
    families[family.name] = family


def add_pseudo(
    pseudo: parser.Pseudo,
    instructions: dict[str, Instruction],
    pseudos: dict[str, PseudoInstruction],
) -> None:
    pseudos[pseudo.name] = PseudoInstruction(
        pseudo.name,
        [instructions[target] for target in pseudo.targets],
        pseudo.flags,
    )


def assign_opcodes(
    instructions: dict[str, Instruction],
    families: dict[str, Family],
    pseudos: dict[str, PseudoInstruction],
) -> tuple[dict[str, int], int, int]:
    """Assign opcodes and return the opmap,
    have_arg and min_instrumented values."""
    instmap: dict[str, int] = {}

    # 0 is reserved for cache entries. This helps debugging.
    instmap["CACHE"] = 0

    # 17 is reserved as it is the initial value for the specializing counter.
    # This helps catch cases where we attempt to execute a cache.
    instmap["RESERVED"] = 17

    # 149 is RESUME - it is hard coded as such in Tools/build/deepfreeze.py
    instmap["RESUME"] = 149

    # This is an historical oddity.
    instmap["BINARY_OP_INPLACE_ADD_UNICODE"] = 3

    instmap["INSTRUMENTED_LINE"] = 254

    instrumented = [name for name in instructions if name.startswith("INSTRUMENTED")]

    # Special case: this instruction is implemented in ceval.c
    # rather than bytecodes.c, so we need to add it explicitly
    # here (at least until we add something to bytecodes.c to
    # declare external instructions).
    instrumented.append("INSTRUMENTED_LINE")

    specialized: set[str] = set()
    no_arg: list[str] = []
    has_arg: list[str] = []

    for family in families.values():
        specialized.update(inst.name for inst in family.members)

    for inst in instructions.values():
        name = inst.name
        if name in specialized:
            continue
        if name in instrumented:
            continue
        if inst.properties.oparg:
            has_arg.append(name)
        else:
            no_arg.append(name)

    # Specialized ops appear in their own section.
    # Instrumented opcodes are at the end of the valid range.
    min_internal = 150
    min_instrumented = 254 - (len(instrumented) - 1)
    assert min_internal + len(specialized) < min_instrumented

    next_opcode = 1

    def add_instruction(name: str) -> None:
        nonlocal next_opcode
        if name in instmap:
            return  # Pre-defined name
        while next_opcode in instmap.values():
            next_opcode += 1
        instmap[name] = next_opcode
        next_opcode += 1

    for name in sorted(no_arg):
        add_instruction(name)
    for name in sorted(has_arg):
        add_instruction(name)
    # For compatibility
    next_opcode = min_internal
    for name in sorted(specialized):
        add_instruction(name)
    next_opcode = min_instrumented
    for name in instrumented:
        add_instruction(name)

    for name in instructions:
        instructions[name].opcode = instmap[name]

    for op, name in enumerate(sorted(pseudos), 256):
        instmap[name] = op
        pseudos[name].opcode = op

    return instmap, len(no_arg), min_instrumented


def analyze_forest(forest: list[parser.AstNode]) -> Analysis:
    instructions: dict[str, Instruction] = {}
    uops: dict[str, Uop] = {}
    families: dict[str, Family] = {}
    pseudos: dict[str, PseudoInstruction] = {}
    for node in forest:
        match node:
            case parser.InstDef():
                if node.kind == "inst":
                    desugar_inst(node, instructions, uops)
                else:
                    assert node.kind == "op"
                    add_op(node, uops)
            case parser.Macro():
                pass
            case parser.Family():
                pass
            case parser.Pseudo():
                pass
            case _:
                assert False
    for node in forest:
        if isinstance(node, parser.Macro):
            add_macro(node, instructions, uops)
    for node in forest:
        match node:
            case parser.Family():
                add_family(node, instructions, families)
            case parser.Pseudo():
                add_pseudo(node, instructions, pseudos)
            case _:
                pass
    for uop in uops.values():
        tkn_iter = iter(uop.body)
        for tkn in tkn_iter:
            if tkn.kind == "IDENTIFIER" and tkn.text == "GO_TO_INSTRUCTION":
                if next(tkn_iter).kind != "LPAREN":
                    continue
                target = next(tkn_iter)
                if target.kind != "IDENTIFIER":
                    continue
                if target.text in instructions:
                    instructions[target.text].is_target = True
    # Special case BINARY_OP_INPLACE_ADD_UNICODE.
    # BINARY_OP_INPLACE_ADD_UNICODE is not a normal family member, as it is
    # the wrong size, but we need it to maintain an historical optimization.
    if "BINARY_OP_INPLACE_ADD_UNICODE" in instructions:
        inst = instructions["BINARY_OP_INPLACE_ADD_UNICODE"]
        inst.family = families["BINARY_OP"]
        families["BINARY_OP"].members.append(inst)
    opmap, first_arg, min_instrumented = assign_opcodes(instructions, families, pseudos)
    return Analysis(
        instructions, uops, families, pseudos, opmap, first_arg, min_instrumented
    )


def analyze_files(filenames: list[str]) -> Analysis:
    return analyze_forest(parser.parse_files(filenames))


def dump_analysis(analysis: Analysis) -> None:
    print("Uops:")
    for u in analysis.uops.values():
        u.dump("    ")
    print("Instructions:")
    for i in analysis.instructions.values():
        i.dump("    ")
    print("Families:")
    for f in analysis.families.values():
        f.dump("    ")
    print("Pseudos:")
    for p in analysis.pseudos.values():
        p.dump("    ")


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("No input")
    else:
        filenames = sys.argv[1:]
        dump_analysis(analyze_files(filenames))
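# Usage sketch (the module's filename and the input path are illustrative):
#     python3 analyzer.py Python/bytecodes.c
# dumps the analyzed uops, instructions, families and pseudos.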