1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (c) 2024 Huawei Device Co., Ltd. 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17 18import math 19import subprocess # noqa: S404 20from collections.abc import AsyncIterator 21from contextlib import asynccontextmanager 22from dataclasses import dataclass, field 23from pathlib import Path 24from typing import List, Literal, Union 25 26import trio 27from pytest import fixture 28 29from .logs import ARK_ERR, ARK_OUT, logger 30from .runnable_module import RunnableModule 31 32LOG = logger(__name__) 33DEFAULT_ENTRY_POINT = "ETSGLOBAL::main" 34ComponentType = Literal[ 35 "all", 36 "alloc", 37 "mm-obj-events", 38 "classlinker", 39 "common", 40 "core", 41 "gc", 42 "gc_trigger", 43 "reference_processor", 44 "interpreter", 45 "compiler", 46 "llvm", 47 "pandafile", 48 "memorypool", 49 "runtime", 50 "trace", 51 "debugger", 52 "interop", 53 "verifier", 54 "compilation_queue", 55 "aot", 56 "events", 57 "ecmascript", 58 "scheduler", 59 "coroutines", 60 "task_manager", 61] 62 63 64@dataclass 65class Options: 66 """ 67 Options for ArkTS runtime execution. 68 """ 69 70 app_path: Path = Path("bin", "ark") 71 load_runtimes: Literal["core", "ecmascript", "ets"] = "ets" 72 interpreter_type: Literal["cpp", "irtoc", "llvm"] = "cpp" 73 boot_panda_files: List[Path] = field(default_factory=list) 74 debugger_library_path: Path = Path() 75 log_debug: List[ComponentType] = field(default_factory=lambda: ["debugger"]) 76 print_stdout: bool = False 77 78 79MessageMarker = Union[trio.Event] 80Message = Union[str, MessageMarker] 81SendChannel = trio.MemorySendChannel[Message] 82ReceiveChannel = trio.MemoryReceiveChannel[Message] 83 84 85def open_memory_channels(): 86 return trio.open_memory_channel[Message](math.inf) 87 88 89@dataclass 90class ProcessCapture: 91 stdout: List[str] = field(default_factory=list) 92 stderr: List[str] = field(default_factory=list) 93 94 95async def _capture_channel( 96 channel: ReceiveChannel, 97 array: List[str], 98) -> None: 99 async with channel: 100 async for data in channel: 101 if isinstance(data, str): 102 array.append(data) 103 elif isinstance(data, trio.Event): 104 data.set() 105 106 107class RuntimeProcess: 108 """ 109 The context of ArkTS runtime execution. 110 """ 111 112 def __init__( 113 self, 114 process: trio.Process, 115 nursery: trio.Nursery, 116 stdout_channel: ReceiveChannel, 117 stderr_channel: ReceiveChannel, 118 ) -> None: 119 self.stdout_parser: StreamParser | None = None 120 self.stderr_parser: StreamParser | None = None 121 self.process = process 122 self.nursery = nursery 123 self.capture = ProcessCapture() 124 125 nursery.start_soon( 126 _capture_channel, 127 stdout_channel.clone(), 128 self.capture.stdout, 129 name="Runtime STDOUT capture to list", 130 ) 131 nursery.start_soon( 132 _capture_channel, 133 stderr_channel.clone(), 134 self.capture.stderr, 135 name="Runtime STDERR capture to list", 136 ) 137 138 @property 139 def returncode(self) -> int | None: 140 return self.process.returncode 141 142 def start_parser(self, send_stdout: SendChannel, send_stderr: SendChannel): 143 if self.process.stdout: 144 self.stdout_parser = StreamParser(self.process.stdout, send_stdout.clone(), "STDOUT", ARK_OUT) 145 self.nursery.start_soon(self.stdout_parser.parse, name="Runtime STDOUT parser") 146 if self.process.stderr: 147 self.stderr_parser = StreamParser(self.process.stderr, send_stderr.clone(), "STDERR", ARK_ERR) 148 self.nursery.start_soon(self.stderr_parser.parse, name="Runtime STDERR parser") 149 150 def terminate(self) -> None: 151 self.process.terminate() 152 153 def kill(self) -> None: 154 self.process.kill() 155 156 async def wait(self) -> int: 157 return await self.process.wait() 158 159 def cancel(self) -> None: 160 self.nursery.cancel_scope.cancel() 161 162 async def close(self) -> int: 163 self.cancel() 164 result = self.process.poll() 165 if result is not None: 166 await trio.lowlevel.checkpoint() 167 return result 168 self.terminate() 169 return await self.wait() 170 171 async def sync_capture(self) -> None: 172 out = self.stdout_parser and self.stdout_parser.sync() 173 err = self.stderr_parser and self.stderr_parser.sync() 174 await trio.lowlevel.checkpoint() 175 if out: 176 await out.wait() 177 if err: 178 await err.wait() 179 180 181class StreamParser: 182 def __init__( 183 self, 184 stream: trio.abc.ReceiveStream, 185 channel: SendChannel, 186 name: str, 187 loglevel: int, 188 ) -> None: 189 self._sync_event = trio.Event() 190 self._sync_event.set() 191 self._stream = stream 192 self._channel = channel 193 self._name = name 194 self._loglevel = loglevel 195 196 def sync(self) -> trio.Event: 197 e = trio.Event() 198 self._sync_event = e 199 return e 200 201 async def parse(self) -> None: 202 try: 203 buffer = bytearray() 204 async with self._stream, self._channel: 205 await self._process_buffer(buffer) 206 except trio.Cancelled: 207 with trio.CancelScope(deadline=1, shield=True): 208 await self._process_buffer(buffer) 209 raise 210 finally: 211 if len(buffer) > 0: 212 msg = bytes(buffer) 213 self._send(msg) 214 self._sync_event.set() 215 216 def _send(self, data: bytes): 217 text = data.decode(errors="replace") 218 LOG.log(self._loglevel, text) 219 self._channel.send_nowait(text) 220 221 async def _process_buffer(self, buffer: bytearray): 222 async for data in self._stream: 223 LOG.debug("Read from '%s': %s", self._name, data) 224 if len(data) == 0: 225 continue 226 start = 0 227 while True: 228 end = data.find(b"\n", start) 229 if end == -1: 230 buffer += data[start:] 231 break 232 else: 233 msg = bytes(buffer + data[start:end]) 234 start = end + 1 235 buffer.clear() 236 self._send(msg) 237 self._sync_event.set() 238 239 240class Runtime: 241 """ 242 Controls the ArkTS runtime. 243 """ 244 245 def __init__(self, options: Options) -> None: 246 self.options = options 247 pass 248 249 @asynccontextmanager 250 async def run( 251 self, 252 nursery: trio.Nursery, 253 /, 254 module: RunnableModule, 255 entry_point: str = DEFAULT_ENTRY_POINT, 256 cwd: Path | None = None, 257 debug: bool = True, 258 ) -> AsyncIterator[RuntimeProcess]: 259 module.check_exists() 260 o = self.options 261 boot_panda_files = [str(f) for f in o.boot_panda_files + module.boot_abc] 262 command = [ 263 str(o.app_path), 264 f"--load-runtimes={o.load_runtimes}", 265 f"--interpreter-type={o.interpreter_type}", 266 f"--boot-panda-files={':'.join(boot_panda_files)}", 267 ] 268 if debug: 269 command.append("--debugger-break-on-start") 270 command.append(f"--debugger-library-path={str(o.debugger_library_path)}") 271 command.extend([f"--log-debug={c}" for c in o.log_debug]) 272 command.append(str(module.entry_abc)) 273 command.append(entry_point) 274 275 send_stdout, receive_stdout = open_memory_channels() 276 send_stderr, receive_stderr = open_memory_channels() 277 LOG.debug("Memory channels have been created.") 278 async with send_stdout, receive_stdout, send_stderr, receive_stderr: 279 process: trio.Process = await trio.lowlevel.open_process( 280 command=command, 281 stdout=subprocess.PIPE, 282 stderr=subprocess.PIPE, 283 cwd=cwd, 284 ) 285 LOG.info("Exec runtime %s", command) 286 run_proc = RuntimeProcess( 287 process=process, 288 nursery=nursery, 289 stdout_channel=receive_stdout, 290 stderr_channel=receive_stderr, 291 ) 292 run_proc.start_parser(send_stdout=send_stdout, send_stderr=send_stderr) 293 294 async with trio.open_nursery() as runtime_nursery: 295 runtime_nursery.start_soon(_runtime_wait_task, process) 296 yield run_proc 297 298 299async def _runtime_wait_task(process: trio.Process): 300 LOG.debug("Waiting for Runtime.") 301 try: 302 returncode = await process.wait() 303 LOG.debug("Runtime exit status %s.", returncode) 304 except BaseException: 305 process.terminate() 306 with trio.CancelScope(deadline=1, shield=True): 307 try: 308 await process.wait() 309 except BaseException: 310 process.kill() 311 raise 312 raise 313 314 315@fixture 316def ark_runtime( 317 ark_runtime_options: Options, 318) -> Runtime: 319 """ 320 Return a :class:`runtime.Runtime` instance that controls the ArkTS runtime. 321 """ 322 return Runtime(options=ark_runtime_options) 323 324 325@fixture 326@asynccontextmanager 327async def ark_runtime_run_data_files( 328 nursery: trio.Nursery, 329 ark_runtime: Runtime, 330 module: RunnableModule, 331) -> AsyncIterator[RuntimeProcess]: 332 async with ark_runtime.run(nursery, module=module) as process: 333 yield process 334