1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (c) 2024-2025 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17 18import math 19import subprocess # noqa: S404 20from collections.abc import AsyncIterator 21from contextlib import asynccontextmanager 22from dataclasses import dataclass, field 23from pathlib import Path 24from typing import List, Literal, Union 25 26import trio 27from pytest import fixture 28 29from .logs import ARK_ERR, ARK_OUT, logger 30from .runnable_module import RunnableModule 31 32LOG = logger(__name__) 33DEFAULT_ENTRY_POINT = "ETSGLOBAL::main" 34ComponentType = Literal[ 35 "all", 36 "alloc", 37 "mm-obj-events", 38 "classlinker", 39 "common", 40 "core", 41 "gc", 42 "gc_trigger", 43 "reference_processor", 44 "interpreter", 45 "compiler", 46 "llvm", 47 "pandafile", 48 "memorypool", 49 "runtime", 50 "trace", 51 "debugger", 52 "interop", 53 "verifier", 54 "compilation_queue", 55 "aot", 56 "events", 57 "ecmascript", 58 "scheduler", 59 "coroutines", 60 "task_manager", 61] 62 63 64@dataclass 65class Options: 66 """ 67 Options for ArkTS runtime execution. 68 """ 69 70 app_path: Path = Path("bin", "ark") 71 load_runtimes: Literal["core", "ecmascript", "ets"] = "ets" 72 interpreter_type: Literal["cpp", "irtoc", "llvm"] = "cpp" 73 boot_panda_files: List[Path] = field(default_factory=list) 74 debugger_library_path: Path = Path() 75 log_debug: List[ComponentType] = field(default_factory=lambda: ["debugger"]) 76 print_stdout: bool = False 77 78 79MessageMarker = Union[trio.Event] 80Message = Union[str, MessageMarker] 81SendChannel = trio.MemorySendChannel[Message] 82ReceiveChannel = trio.MemoryReceiveChannel[Message] 83 84 85def open_memory_channels(): 86 return trio.open_memory_channel[Message](math.inf) 87 88 89@dataclass 90class ProcessCapture: 91 stdout: List[str] = field(default_factory=list) 92 stderr: List[str] = field(default_factory=list) 93 94 95async def _capture_channel( 96 channel: ReceiveChannel, 97 array: List[str], 98) -> None: 99 async with channel: 100 async for data in channel: 101 if isinstance(data, str): 102 array.append(data) 103 elif isinstance(data, trio.Event): 104 data.set() 105 106 107class RuntimeProcess: 108 """ 109 The context of ArkTS runtime execution. 110 """ 111 112 def __init__( 113 self, 114 process: trio.Process, 115 nursery: trio.Nursery, 116 stdout_channel: ReceiveChannel, 117 stderr_channel: ReceiveChannel, 118 ) -> None: 119 self.stdout_parser: StreamParser | None = None 120 self.stderr_parser: StreamParser | None = None 121 self.process = process 122 self.nursery = nursery 123 self.capture = ProcessCapture() 124 125 nursery.start_soon( 126 _capture_channel, 127 stdout_channel.clone(), 128 self.capture.stdout, 129 name="Runtime STDOUT capture to list", 130 ) 131 nursery.start_soon( 132 _capture_channel, 133 stderr_channel.clone(), 134 self.capture.stderr, 135 name="Runtime STDERR capture to list", 136 ) 137 138 @property 139 def returncode(self) -> int | None: 140 return self.process.returncode 141 142 def start_parser(self, send_stdout: SendChannel, send_stderr: SendChannel): 143 if self.process.stdout: 144 self.stdout_parser = StreamParser(self.process.stdout, send_stdout.clone(), "STDOUT", ARK_OUT) 145 self.nursery.start_soon(self.stdout_parser.parse, name="Runtime STDOUT parser") 146 if self.process.stderr: 147 self.stderr_parser = StreamParser(self.process.stderr, send_stderr.clone(), "STDERR", ARK_ERR) 148 self.nursery.start_soon(self.stderr_parser.parse, name="Runtime STDERR parser") 149 150 def terminate(self) -> None: 151 self.process.terminate() 152 153 def kill(self) -> None: 154 self.process.kill() 155 156 async def wait(self) -> int: 157 return await self.process.wait() 158 159 def cancel(self) -> None: 160 self.nursery.cancel_scope.cancel() 161 162 async def close(self) -> int: 163 self.cancel() 164 result = self.process.poll() 165 if result is not None: 166 await trio.lowlevel.checkpoint() 167 return result 168 self.terminate() 169 return await self.wait() 170 171 async def sync_capture(self) -> None: 172 out = self.stdout_parser and self.stdout_parser.sync() 173 err = self.stderr_parser and self.stderr_parser.sync() 174 await trio.lowlevel.checkpoint() 175 if out: 176 await out.wait() 177 if err: 178 await err.wait() 179 180 181class StreamParser: 182 def __init__( 183 self, 184 stream: trio.abc.ReceiveStream, 185 channel: SendChannel, 186 name: str, 187 loglevel: int, 188 ) -> None: 189 self._sync_event = trio.Event() 190 self._sync_event.set() 191 self._stream = stream 192 self._channel = channel 193 self._name = name 194 self._loglevel = loglevel 195 196 def sync(self) -> trio.Event: 197 e = trio.Event() 198 self._sync_event = e 199 return e 200 201 async def parse(self) -> None: 202 try: 203 buffer = bytearray() 204 async with self._stream, self._channel: 205 await self._process_buffer(buffer) 206 except trio.Cancelled: 207 with trio.CancelScope(deadline=1, shield=True): 208 await self._process_buffer(buffer) 209 raise 210 finally: 211 if len(buffer) > 0: 212 msg = bytes(buffer) 213 self._send(msg) 214 self._sync_event.set() 215 216 def _send(self, data: bytes): 217 text = data.decode(errors="replace") 218 LOG.log(self._loglevel, text) 219 self._channel.send_nowait(text) 220 221 async def _process_buffer(self, buffer: bytearray): 222 async for data in self._stream: 223 LOG.debug("Read from '%s': %s", self._name, data) 224 if len(data) == 0: 225 continue 226 start = 0 227 while True: 228 end = data.find(b"\n", start) 229 if end == -1: 230 buffer += data[start:] 231 break 232 else: 233 msg = bytes(buffer + data[start:end]) 234 start = end + 1 235 buffer.clear() 236 self._send(msg) 237 self._sync_event.set() 238 239 240class Runtime: 241 """ 242 Controls the ArkTS runtime. 243 """ 244 245 def __init__(self, options: Options) -> None: 246 self.options = options 247 pass 248 249 @asynccontextmanager 250 async def run( 251 self, 252 nursery: trio.Nursery, 253 /, 254 module: RunnableModule, 255 entry_point: str = DEFAULT_ENTRY_POINT, 256 cwd: Path | None = None, 257 debug: bool = True, 258 profile: bool = False, 259 additional_options: Options | None = None, 260 ) -> AsyncIterator[RuntimeProcess]: 261 module.check_exists() 262 o = self.options 263 ao = additional_options 264 boot_panda_files = [str(f) for f in o.boot_panda_files + module.boot_abc] 265 if ao is not None: 266 boot_panda_files += [str(f) for f in ao.boot_panda_files] 267 268 command = [ 269 str(o.app_path), 270 f"--load-runtimes={o.load_runtimes}", 271 f"--interpreter-type={o.interpreter_type}", 272 f"--boot-panda-files={':'.join(boot_panda_files)}", 273 "--load-in-boot", 274 ] 275 debugger_library_option: str = f"--debugger-library-path={str(o.debugger_library_path)}" 276 if profile: 277 command.append("--sampling-profiler-create") 278 command.append("--workers-type=threadpool") 279 command.append(debugger_library_option) 280 elif debug: 281 command.append("--debugger-break-on-start") 282 command.append(debugger_library_option) 283 command.extend([f"--log-debug={c}" for c in o.log_debug]) 284 command.append(str(module.entry_abc)) 285 command.append(f"{module.entry_abc.stem}.{entry_point}") 286 287 send_stdout, receive_stdout = open_memory_channels() 288 send_stderr, receive_stderr = open_memory_channels() 289 LOG.debug("Memory channels have been created.") 290 async with send_stdout, receive_stdout, send_stderr, receive_stderr: 291 process: trio.Process = await trio.lowlevel.open_process( 292 command=command, 293 stdout=subprocess.PIPE, 294 stderr=subprocess.PIPE, 295 cwd=cwd, 296 ) 297 LOG.info("Exec runtime %s", command) 298 run_proc = RuntimeProcess( 299 process=process, 300 nursery=nursery, 301 stdout_channel=receive_stdout, 302 stderr_channel=receive_stderr, 303 ) 304 run_proc.start_parser(send_stdout=send_stdout, send_stderr=send_stderr) 305 306 async with trio.open_nursery() as runtime_nursery: 307 runtime_nursery.start_soon(_runtime_wait_task, process) 308 yield run_proc 309 310 311async def _runtime_wait_task(process: trio.Process): 312 LOG.debug("Waiting for Runtime.") 313 try: 314 returncode = await process.wait() 315 LOG.debug("Runtime exit status %s.", returncode) 316 except BaseException: 317 process.terminate() 318 with trio.CancelScope(deadline=1, shield=True): 319 try: 320 await process.wait() 321 except BaseException: 322 process.kill() 323 raise 324 raise 325 326 327@fixture 328def ark_runtime( 329 ark_runtime_options: Options, 330) -> Runtime: 331 """ 332 Return a :class:`runtime.Runtime` instance that controls the ArkTS runtime. 333 """ 334 return Runtime(options=ark_runtime_options) 335 336 337@fixture 338@asynccontextmanager 339async def ark_runtime_run_data_files( 340 nursery: trio.Nursery, 341 ark_runtime: Runtime, 342 module: RunnableModule, 343) -> AsyncIterator[RuntimeProcess]: 344 async with ark_runtime.run(nursery, module=module) as process: 345 yield process 346