1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (c) 2024 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17 18import base64 19from collections import OrderedDict, UserList 20from dataclasses import dataclass, field 21from pathlib import Path 22from subprocess import CalledProcessError 23from typing import Any, Dict, Generic, List, Literal, TypeVar, Union 24 25import trio 26from cdp import debugger, runtime 27 28from arkdb.compiler import CompileError, EvaluateCompileExpressionArgs, StringCodeCompiler 29from arkdb.compiler_verification.expression_verifier import ExpressionVerifier 30from arkdb.debug_client import DebuggerClient 31from arkdb.logs import logger 32from arkdb.mirrors import mirror_array, mirror_object, mirror_primitive, mirror_undefined 33from arkdb.source_meta import Breakpoint, SourceMeta 34 35LOG = logger(__name__) 36 37DEFAULT_DEPTH = 3 38 39 40class Locator: 41 def __init__(self, client: DebuggerClient, meta: SourceMeta | None = None) -> None: 42 self.client = client 43 self.cache: Dict[runtime.RemoteObjectId, PropertyDescriptorCacheEntry] = {} 44 self.meta = meta 45 46 async def _get_properties(self, id: runtime.RemoteObjectId) -> List[runtime.PropertyDescriptor]: 47 result, _, _, _ = await self.client.get_properties(id, generate_preview=False) 48 return result 49 50 async def properties(self, id: runtime.RemoteObjectId) -> "RemoteObjectProperties": 51 if (entry := self.cache.get(id, None)) is not None: 52 await trio.lowlevel.checkpoint() 53 entry.seen.append("other") 54 return entry.properties 55 56 props = await self._get_properties(id) 57 new_entry = PropertyDescriptorCacheEntry( 58 id=id, 59 seen=["first"], 60 properties=RemoteObjectProperties([PropertyDescriptor(locator=self, data=p) for p in props]), 61 ) 62 self.cache[id] = new_entry 63 return new_entry.properties 64 65 def remote_object(self, data: runtime.RemoteObject) -> "RemoteObject": 66 return RemoteObject(locator=self, data=data) 67 68 69T = TypeVar("T") 70 71 72class Wrap(Generic[T]): 73 def __init__(self, locator: Locator, data: T) -> None: 74 self.locator = locator 75 self.data = data 76 77 @property 78 def client(self): 79 return self.locator.client 80 81 def __rich_repr__(self): 82 yield self.data 83 84 85class RemoteObjectProperties(UserList["PropertyDescriptor"]): 86 def __init__(self, initlist=None): 87 super().__init__(initlist) 88 89 async def mirror_values(self, depth: int): 90 await trio.lowlevel.checkpoint() 91 if depth: 92 return {p.name: await p.mirror_value(depth=depth - 1) for p in self.data} 93 return EllipsisTerminator() 94 95 96@dataclass 97class PropertyDescriptorCacheEntry: 98 id: runtime.RemoteObjectId 99 properties: RemoteObjectProperties 100 seen: List[str] = field(default_factory=list) 101 102 103class PropertyDescriptor(Wrap[runtime.PropertyDescriptor]): 104 105 def __init__(self, locator: Locator, data: runtime.PropertyDescriptor) -> None: 106 super().__init__(locator, data) 107 108 @property 109 def name(self) -> str: 110 return self.data.name 111 112 def remote_object(self): 113 return self.locator.remote_object(self.data.value) if self.data.value is not None else None 114 115 async def mirror_value(self, *, depth: int) -> Any: 116 obj = self.remote_object() 117 if obj: 118 return await obj.mirror_value(depth=depth) 119 await trio.lowlevel.checkpoint() 120 return None 121 122 123class EllipsisTerminator: 124 def __repr__(self): 125 return "<...>" 126 127 128class RemoteObject(Wrap[runtime.RemoteObject]): 129 async def properties(self) -> RemoteObjectProperties: 130 if self.data.object_id is None: 131 await trio.lowlevel.checkpoint() 132 return RemoteObjectProperties() 133 return await self.locator.properties(self.data.object_id) 134 135 async def mirror_value(self, *, depth: int = DEFAULT_DEPTH) -> Any: 136 obj = self.data 137 match obj.type_: 138 case "object": 139 return await self._mirror_object(obj, depth=depth) 140 case "undefined": 141 await trio.lowlevel.checkpoint() 142 return mirror_undefined() 143 case _: 144 await trio.lowlevel.checkpoint() 145 return mirror_primitive(obj.type_, obj.value) 146 147 async def _mirror_object(self, obj: runtime.RemoteObject, *, depth: int = DEFAULT_DEPTH) -> Any: 148 match obj.subtype: 149 case "null": 150 await trio.lowlevel.checkpoint() 151 return None 152 case None: 153 if obj.class_name is None: 154 raise RuntimeError(f"class_name is None {obj!r}") 155 props = await (await self.properties()).mirror_values(depth=depth) 156 if isinstance(props, EllipsisTerminator): 157 return props 158 return mirror_object(obj.class_name, **props) 159 case "array": 160 if obj.class_name is None: 161 raise RuntimeError(f"class_name is None {obj!r}") 162 props = await (await self.properties()).mirror_values(depth=depth) 163 if isinstance(props, EllipsisTerminator): 164 return [props] 165 return mirror_array(obj.class_name, props.values()) 166 case subtype: 167 raise RuntimeError(f"Unexpected subtype '{subtype}'") 168 169 170class Scope(Wrap[debugger.Scope]): 171 async def mirror_variables(self, *, depth: int = DEFAULT_DEPTH) -> OrderedDict[str, Any]: 172 return OrderedDict([(p.name, await p.mirror_value(depth=depth)) for p in await self.object_().properties()]) 173 174 def object_(self) -> RemoteObject: 175 return self.locator.remote_object(self.data.object_) 176 177 178class Frame(Wrap[debugger.CallFrame]): 179 180 def scope(self, type: Literal["local", "global"] = "local") -> Scope: 181 return Scope( 182 locator=self.locator, 183 data=next(filter(lambda s: s.type_ == type, self.data.scope_chain)), 184 ) 185 186 def scopes(self): 187 for s in self.data.scope_chain: 188 yield Scope(locator=self.locator, data=s) 189 190 def this(self): 191 return self.locator.remote_object(self.data.this) if self.data.this is not None else None 192 193 194def compile_expression( 195 code_compiler: StringCodeCompiler, 196 eval_args: EvaluateCompileExpressionArgs, 197 verifier: ExpressionVerifier | None = None, 198 allow_compiler_failure: bool = False, 199): 200 try: 201 compiled_expression = code_compiler.compile_expression(eval_args) 202 except CalledProcessError as e: 203 if allow_compiler_failure: 204 raise CompileError(e.output) from e 205 if verifier is not None: 206 verifier(compiled_expression) 207 return base64.b64encode(compiled_expression.panda_file.read_bytes()).decode("utf-8") 208 209 210class Paused(Wrap[debugger.Paused]): 211 def frame(self, id: Union[int, debugger.CallFrameId] = 0) -> Frame: 212 frame_id = id if isinstance(id, debugger.CallFrameId) else debugger.CallFrameId(str(id)) 213 return Frame( 214 locator=self.locator, 215 data=next(filter(lambda f: f.call_frame_id == frame_id, self.data.call_frames)), 216 ) 217 218 def frames(self): 219 for f in self.data.call_frames: 220 yield Frame(locator=self.locator, data=f) 221 222 def local_scope(self) -> Scope: 223 return self.frame().scope() 224 225 def hit_breakpoints(self) -> List[Breakpoint]: 226 if self.data.hit_breakpoints is None: 227 return [] 228 if self.locator.meta is None: 229 return [] 230 return [ 231 b for hit in self.data.hit_breakpoints for b in self.locator.meta.get_breakpoint(debugger.BreakpointId(hit)) 232 ] 233 234 def hit_breakpoint_labels(self): 235 return set([b.label for b in self.hit_breakpoints() if b.label]) 236 237 async def resume_and_wait_for_paused(self): 238 return await self.client.resume_and_wait_for_paused() 239 240 async def evaluate( 241 self, 242 expression: str | Path, 243 abc_files: list[Path], 244 verifier: ExpressionVerifier | None = None, 245 allow_compiler_failure: bool = False, 246 ): 247 assert len(self.data.call_frames) > 0 248 paused_file = Path(self.data.call_frames[0].url) 249 # Must be 1-based. 250 paused_code_line = self.data.call_frames[0].location.line_number + 1 251 252 compiled_expression_bytecode = compile_expression( 253 self.client.code_compiler, 254 EvaluateCompileExpressionArgs( 255 ets_expression=expression, 256 eval_panda_files=abc_files, 257 eval_source=paused_file, 258 eval_line=paused_code_line, 259 eval_log_level="debug", 260 ast_parser=verifier.ast_parser if verifier else None, 261 ), 262 verifier=verifier, 263 allow_compiler_failure=allow_compiler_failure, 264 ) 265 266 return await self.client.evaluate(compiled_expression_bytecode) 267 268 269def paused_locator(paused: debugger.Paused, client: DebuggerClient, meta: SourceMeta | None = None) -> Paused: 270 return Paused(locator=Locator(client=client, meta=meta), data=paused) 271