• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (c) 2024 Huawei Device Co., Ltd.
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import base64
19from collections import OrderedDict, UserList
20from dataclasses import dataclass, field
21from pathlib import Path
22from subprocess import CalledProcessError
23from typing import Any, Dict, Generic, List, Literal, TypeVar, Union
24
25import trio
26from cdp import debugger, runtime
27
28from arkdb.compiler import CompileError, EvaluateCompileExpressionArgs, StringCodeCompiler
29from arkdb.compiler_verification.expression_verifier import ExpressionVerifier
30from arkdb.debug_client import DebuggerClient
31from arkdb.logs import logger
32from arkdb.mirrors import mirror_array, mirror_object, mirror_primitive, mirror_undefined
33from arkdb.source_meta import Breakpoint, SourceMeta
34
35LOG = logger(__name__)
36
37DEFAULT_DEPTH = 3
38
39
40class Locator:
41    def __init__(self, client: DebuggerClient, meta: SourceMeta | None = None) -> None:
42        self.client = client
43        self.cache: Dict[runtime.RemoteObjectId, PropertyDescriptorCacheEntry] = {}
44        self.meta = meta
45
46    async def _get_properties(self, id: runtime.RemoteObjectId) -> List[runtime.PropertyDescriptor]:
47        result, _, _, _ = await self.client.get_properties(id, generate_preview=False)
48        return result
49
50    async def properties(self, id: runtime.RemoteObjectId) -> "RemoteObjectProperties":
51        if (entry := self.cache.get(id, None)) is not None:
52            await trio.lowlevel.checkpoint()
53            entry.seen.append("other")
54            return entry.properties
55
56        props = await self._get_properties(id)
57        new_entry = PropertyDescriptorCacheEntry(
58            id=id,
59            seen=["first"],
60            properties=RemoteObjectProperties([PropertyDescriptor(locator=self, data=p) for p in props]),
61        )
62        self.cache[id] = new_entry
63        return new_entry.properties
64
65    def remote_object(self, data: runtime.RemoteObject) -> "RemoteObject":
66        return RemoteObject(locator=self, data=data)
67
68
69T = TypeVar("T")
70
71
72class Wrap(Generic[T]):
73    def __init__(self, locator: Locator, data: T) -> None:
74        self.locator = locator
75        self.data = data
76
77    @property
78    def client(self):
79        return self.locator.client
80
81    def __rich_repr__(self):
82        yield self.data
83
84
85class RemoteObjectProperties(UserList["PropertyDescriptor"]):
86    def __init__(self, initlist=None):
87        super().__init__(initlist)
88
89    async def mirror_values(self, depth: int):
90        await trio.lowlevel.checkpoint()
91        if depth:
92            return {p.name: await p.mirror_value(depth=depth - 1) for p in self.data}
93        return EllipsisTerminator()
94
95
96@dataclass
97class PropertyDescriptorCacheEntry:
98    id: runtime.RemoteObjectId
99    properties: RemoteObjectProperties
100    seen: List[str] = field(default_factory=list)
101
102
103class PropertyDescriptor(Wrap[runtime.PropertyDescriptor]):
104
105    def __init__(self, locator: Locator, data: runtime.PropertyDescriptor) -> None:
106        super().__init__(locator, data)
107
108    @property
109    def name(self) -> str:
110        return self.data.name
111
112    def remote_object(self):
113        return self.locator.remote_object(self.data.value) if self.data.value is not None else None
114
115    async def mirror_value(self, *, depth: int) -> Any:
116        obj = self.remote_object()
117        if obj:
118            return await obj.mirror_value(depth=depth)
119        await trio.lowlevel.checkpoint()
120        return None
121
122
123class EllipsisTerminator:
124    def __repr__(self):
125        return "<...>"
126
127
128class RemoteObject(Wrap[runtime.RemoteObject]):
129    async def properties(self) -> RemoteObjectProperties:
130        if self.data.object_id is None:
131            await trio.lowlevel.checkpoint()
132            return RemoteObjectProperties()
133        return await self.locator.properties(self.data.object_id)
134
135    async def mirror_value(self, *, depth: int = DEFAULT_DEPTH) -> Any:
136        obj = self.data
137        match obj.type_:
138            case "object":
139                return await self._mirror_object(obj, depth=depth)
140            case "undefined":
141                await trio.lowlevel.checkpoint()
142                return mirror_undefined()
143            case _:
144                await trio.lowlevel.checkpoint()
145                return mirror_primitive(obj.type_, obj.value)
146
147    async def _mirror_object(self, obj: runtime.RemoteObject, *, depth: int = DEFAULT_DEPTH) -> Any:
148        match obj.subtype:
149            case "null":
150                await trio.lowlevel.checkpoint()
151                return None
152            case None:
153                if obj.class_name is None:
154                    raise RuntimeError(f"class_name is None {obj!r}")
155                props = await (await self.properties()).mirror_values(depth=depth)
156                if isinstance(props, EllipsisTerminator):
157                    return props
158                return mirror_object(obj.class_name, **props)
159            case "array":
160                if obj.class_name is None:
161                    raise RuntimeError(f"class_name is None {obj!r}")
162                props = await (await self.properties()).mirror_values(depth=depth)
163                if isinstance(props, EllipsisTerminator):
164                    return [props]
165                return mirror_array(obj.class_name, props.values())
166            case subtype:
167                raise RuntimeError(f"Unexpected subtype '{subtype}'")
168
169
170class Scope(Wrap[debugger.Scope]):
171    async def mirror_variables(self, *, depth: int = DEFAULT_DEPTH) -> OrderedDict[str, Any]:
172        return OrderedDict([(p.name, await p.mirror_value(depth=depth)) for p in await self.object_().properties()])
173
174    def object_(self) -> RemoteObject:
175        return self.locator.remote_object(self.data.object_)
176
177
178class Frame(Wrap[debugger.CallFrame]):
179
180    def scope(self, type: Literal["local", "global"] = "local") -> Scope:
181        return Scope(
182            locator=self.locator,
183            data=next(filter(lambda s: s.type_ == type, self.data.scope_chain)),
184        )
185
186    def scopes(self):
187        for s in self.data.scope_chain:
188            yield Scope(locator=self.locator, data=s)
189
190    def this(self):
191        return self.locator.remote_object(self.data.this) if self.data.this is not None else None
192
193
194def compile_expression(
195    code_compiler: StringCodeCompiler,
196    eval_args: EvaluateCompileExpressionArgs,
197    verifier: ExpressionVerifier | None = None,
198    allow_compiler_failure: bool = False,
199):
200    try:
201        compiled_expression = code_compiler.compile_expression(eval_args)
202    except CalledProcessError as e:
203        if allow_compiler_failure:
204            raise CompileError(e.output) from e
205    if verifier is not None:
206        verifier(compiled_expression)
207    return base64.b64encode(compiled_expression.panda_file.read_bytes()).decode("utf-8")
208
209
210class Paused(Wrap[debugger.Paused]):
211    def frame(self, id: Union[int, debugger.CallFrameId] = 0) -> Frame:
212        frame_id = id if isinstance(id, debugger.CallFrameId) else debugger.CallFrameId(str(id))
213        return Frame(
214            locator=self.locator,
215            data=next(filter(lambda f: f.call_frame_id == frame_id, self.data.call_frames)),
216        )
217
218    def frames(self):
219        for f in self.data.call_frames:
220            yield Frame(locator=self.locator, data=f)
221
222    def local_scope(self) -> Scope:
223        return self.frame().scope()
224
225    def hit_breakpoints(self) -> List[Breakpoint]:
226        if self.data.hit_breakpoints is None:
227            return []
228        if self.locator.meta is None:
229            return []
230        return [
231            b for hit in self.data.hit_breakpoints for b in self.locator.meta.get_breakpoint(debugger.BreakpointId(hit))
232        ]
233
234    def hit_breakpoint_labels(self):
235        return set([b.label for b in self.hit_breakpoints() if b.label])
236
237    async def resume_and_wait_for_paused(self):
238        return await self.client.resume_and_wait_for_paused()
239
240    async def evaluate(
241        self,
242        expression: str | Path,
243        abc_files: list[Path],
244        verifier: ExpressionVerifier | None = None,
245        allow_compiler_failure: bool = False,
246    ):
247        assert len(self.data.call_frames) > 0
248        paused_file = Path(self.data.call_frames[0].url)
249        # Must be 1-based.
250        paused_code_line = self.data.call_frames[0].location.line_number + 1
251
252        compiled_expression_bytecode = compile_expression(
253            self.client.code_compiler,
254            EvaluateCompileExpressionArgs(
255                ets_expression=expression,
256                eval_panda_files=abc_files,
257                eval_source=paused_file,
258                eval_line=paused_code_line,
259                eval_log_level="debug",
260                ast_parser=verifier.ast_parser if verifier else None,
261            ),
262            verifier=verifier,
263            allow_compiler_failure=allow_compiler_failure,
264        )
265
266        return await self.client.evaluate(compiled_expression_bytecode)
267
268
269def paused_locator(paused: debugger.Paused, client: DebuggerClient, meta: SourceMeta | None = None) -> Paused:
270    return Paused(locator=Locator(client=client, meta=meta), data=paused)
271