• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from __future__ import annotations
2import dataclasses as dc
3import io
4import os
5from typing import Final, TYPE_CHECKING
6
7import libclinic
8from libclinic import fail
9from libclinic.language import Language
10from libclinic.block_parser import Block
11if TYPE_CHECKING:
12    from libclinic.app import Clinic
13
14
# Alias for a dict whose keys and values are both str.
# NOTE(review): presumably maps a template name to its text -- confirm
# against the call sites that use this alias.
TemplateDict = dict[str, str]
16
17
class CRenderData:
    """
    Container for the C code fragments described attribute by attribute
    below.  Every list starts out empty.
    """

    def __init__(self) -> None:
        # Variable declarations (C statements).
        # Each entry should be a full line ending with a \n eol character.
        self.declarations: list[str] = []

        # C statements that initialize the variables before the parse call.
        # Each entry should be a full line ending with a \n eol character.
        self.initializers: list[str] = []

        # C statements that dynamically modify the values produced by
        # the parse call, before the impl is called.
        self.modifications: list[str] = []

        # Entries of the "keywords" array for PyArg_ParseTuple:
        # individual strings representing the names.
        self.keywords: list[str] = []

        # The "format units" for PyArg_ParseTuple, as individual strings.
        self.format_units: list[str] = []

        # The varargs arguments for PyArg_ParseTuple.
        self.parse_arguments: list[str] = []

        # Parameter declarations for the impl function.
        self.impl_parameters: list[str] = []

        # Arguments passed to the impl function at the time it's called.
        self.impl_arguments: list[str] = []

        # Return converters: name of the variable that should receive
        # the value returned by the impl.
        self.return_value: str = "return_value"

        # Return converters: the code that converts the return value
        # from the parse function.  This is also the place to check
        # the _return_value for errors and "goto exit" if there are any.
        self.return_conversion: list[str] = []
        self.converter_retval: str = "_return_value"

        # C statements for operations that run after parsing has ended
        # but before cleaning up -- for example memory deallocations
        # which can only be done when no error happened during
        # argument parsing.
        self.post_parsing: list[str] = []

        # C statements that clean up after the impl call.
        self.cleanup: list[str] = []

        # C statements that generate critical sections
        # (per-object locking).
        self.lock: list[str] = []
        self.unlock: list[str] = []
73
74
75@dc.dataclass(slots=True, frozen=True)
76class Include:
77    """
78    An include like: #include "pycore_long.h"   // _Py_ID()
79    """
80    # Example: "pycore_long.h".
81    filename: str
82
83    # Example: "_Py_ID()".
84    reason: str
85
86    # None means unconditional include.
87    # Example: "#if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)".
88    condition: str | None
89
90    def sort_key(self) -> tuple[str, str]:
91        # order: '#if' comes before 'NO_CONDITION'
92        return (self.condition or 'NO_CONDITION', self.filename)
93
94
95@dc.dataclass(slots=True)
96class BlockPrinter:
97    language: Language
98    f: io.StringIO = dc.field(default_factory=io.StringIO)
99
100    # '#include "header.h"   // reason': column of '//' comment
101    INCLUDE_COMMENT_COLUMN: Final[int] = 35
102
103    def print_block(
104        self,
105        block: Block,
106        *,
107        header_includes: list[Include] | None = None,
108    ) -> None:
109        input = block.input
110        output = block.output
111        dsl_name = block.dsl_name
112        write = self.f.write
113
114        assert not ((dsl_name is None) ^ (output is None)), "you must specify dsl_name and output together, dsl_name " + repr(dsl_name)
115
116        if not dsl_name:
117            write(input)
118            return
119
120        write(self.language.start_line.format(dsl_name=dsl_name))
121        write("\n")
122
123        body_prefix = self.language.body_prefix.format(dsl_name=dsl_name)
124        if not body_prefix:
125            write(input)
126        else:
127            for line in input.split('\n'):
128                write(body_prefix)
129                write(line)
130                write("\n")
131
132        write(self.language.stop_line.format(dsl_name=dsl_name))
133        write("\n")
134
135        output = ''
136        if header_includes:
137            # Emit optional "#include" directives for C headers
138            output += '\n'
139
140            current_condition: str | None = None
141            for include in header_includes:
142                if include.condition != current_condition:
143                    if current_condition:
144                        output += '#endif\n'
145                    current_condition = include.condition
146                    if include.condition:
147                        output += f'{include.condition}\n'
148
149                if current_condition:
150                    line = f'#  include "{include.filename}"'
151                else:
152                    line = f'#include "{include.filename}"'
153                if include.reason:
154                    comment = f'// {include.reason}\n'
155                    line = line.ljust(self.INCLUDE_COMMENT_COLUMN - 1) + comment
156                output += line
157
158            if current_condition:
159                output += '#endif\n'
160
161        input = ''.join(block.input)
162        output += ''.join(block.output)
163        if output:
164            if not output.endswith('\n'):
165                output += '\n'
166            write(output)
167
168        arguments = "output={output} input={input}".format(
169            output=libclinic.compute_checksum(output, 16),
170            input=libclinic.compute_checksum(input, 16)
171        )
172        write(self.language.checksum_line.format(dsl_name=dsl_name, arguments=arguments))
173        write("\n")
174
175    def write(self, text: str) -> None:
176        self.f.write(text)
177
178
class BufferSeries:
    """
    A list of text buffers that grows on demand ("defaultlist").

    Indexing never fails: asking for an index that does not exist yet
    extends the series with empty buffers until it does.

    Negative indices address real items too; they extend the series
    to the left, e.g. o[-1] is the buffer just before o[0].
    """

    def __init__(self) -> None:
        # Logical index of the first element of self._array.
        self._start = 0
        self._array: list[list[str]] = []

    def __getitem__(self, i: int) -> list[str]:
        """Return the buffer at logical index `i`, creating it if needed."""
        offset = i - self._start
        if offset < 0:
            # Grow to the left: the requested index becomes the new start.
            padding: list[list[str]] = [[] for _ in range(-offset)]
            self._array = padding + self._array
            self._start += offset
            offset = 0
        # Grow to the right until the requested slot exists.
        shortfall = offset + 1 - len(self._array)
        for _ in range(shortfall):
            self._array.append([])
        return self._array[offset]

    def clear(self) -> None:
        """Empty every buffer in place, keeping the buffers themselves."""
        for buffer in self._array:
            buffer.clear()

    def dump(self) -> str:
        """Return the concatenated text of all buffers, then empty them."""
        text = "".join(
            chunk
            for buffer in self._array
            for chunk in buffer
        )
        self.clear()
        return text
213
214
@dc.dataclass(slots=True, repr=False)
class Destination:
    """
    A named output destination of type 'buffer', 'file' or 'suppress'.

    A 'file' destination takes exactly one extra argument: a filename
    template, expanded with str.format_map() using fields derived from
    clinic.filename:

        {path}                the filename as given
        {dirname}             its directory ('.' when there is none)
        {basename}            the filename without the directory
        {basename_root}       basename without its extension
        {basename_extension}  the extension of basename, including
                              the leading '.' ('' when there is none)

    The other destination types take no extra arguments.
    """
    name: str
    type: str
    clinic: Clinic
    buffers: BufferSeries = dc.field(init=False, default_factory=BufferSeries)
    filename: str = dc.field(init=False)  # set in __post_init__

    args: dc.InitVar[tuple[str, ...]] = ()

    def __post_init__(self, args: tuple[str, ...]) -> None:
        """Validate type and argument count; compute filename for 'file'."""
        valid_types = ('buffer', 'file', 'suppress')
        if self.type not in valid_types:
            fail(
                f"Invalid destination type {self.type!r} for {self.name}, "
                f"must be {', '.join(valid_types)}"
            )
        extra_arguments = 1 if self.type == "file" else 0
        if len(args) < extra_arguments:
            fail(f"Not enough arguments for destination "
                 f"{self.name!r} new {self.type!r}")
        if len(args) > extra_arguments:
            fail(f"Too many arguments for destination {self.name!r} new {self.type!r}")
        if self.type == 'file':
            path = self.clinic.filename
            dirname, basename = os.path.split(path)
            # Split the extension off the basename, not off the full path,
            # so that {basename_root} never contains the directory part.
            basename_root, basename_extension = os.path.splitext(basename)
            fields: dict[str, str] = {
                'path': path,
                'dirname': dirname or '.',
                'basename': basename,
                'basename_root': basename_root,
                'basename_extension': basename_extension,
            }
            self.filename = args[0].format_map(fields)

    def __repr__(self) -> str:
        # filename is only set for 'file' destinations, so it must not
        # be touched for the other types.
        if self.type == 'file':
            type_repr = f"type='file' file={self.filename!r}"
        else:
            type_repr = f"type={self.type!r}"
        return f"<clinic.Destination {self.name!r} {type_repr}>"

    def clear(self) -> None:
        """Empty the buffers; only allowed for 'buffer' destinations."""
        if self.type != 'buffer':
            fail(f"Can't clear destination {self.name!r}: it's not of type 'buffer'")
        self.buffers.clear()

    def dump(self) -> str:
        """Return the accumulated text of the buffers and empty them."""
        return self.buffers.dump()
264
265
# Alias for a dict with str keys and Destination values.
# NOTE(review): presumably keyed by Destination.name -- confirm against
# the call sites that use this alias.
DestinationDict = dict[str, Destination]
267
268
class CodeGen:
    """
    Tracks the "#ifndef" symbols and C header includes registered so far,
    together with the limited C API flag.
    """

    def __init__(self, limited_capi: bool) -> None:
        self.limited_capi = limited_capi
        self._ifndef_symbols: set[str] = set()
        # Maps an include name to its Include instance.
        self._includes: dict[str, Include] = {}

    def add_ifndef_symbol(self, name: str) -> bool:
        """Record `name`; return True only the first time it is seen."""
        is_new = name not in self._ifndef_symbols
        if is_new:
            self._ifndef_symbols.add(name)
        return is_new

    def add_include(self, name: str, reason: str,
                    *, condition: str | None = None) -> None:
        """
        Register the header `name`.

        A header that is already registered is left untouched (only a
        single reason is kept), with one exception: an unconditional
        request replaces an existing conditional include.
        """
        previous = self._includes.get(name)
        if previous is not None:
            overrides = bool(previous.condition) and not condition
            if not overrides:
                # Already included: keep the first entry and its reason.
                return

        self._includes[name] = Include(name, reason, condition)

    def get_includes(self) -> list[Include]:
        """Return the registered includes ordered by Include.sort_key."""
        includes = list(self._includes.values())
        includes.sort(key=Include.sort_key)
        return includes
303