# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Utilities for using HDLC with ``pw_rpc``."""

from __future__ import annotations

from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
import io
import logging
import os
import platform
import queue
import select
import sys
import threading
import time
import socket
import subprocess
from typing import (
    Any,
    BinaryIO,
    Callable,
    Iterable,
    Sequence,
    TypeVar,
)
import warnings

import serial

from pw_protobuf_compiler import python_protos
import pw_rpc
from pw_rpc import callback_client

from pw_hdlc.decode import Frame, FrameDecoder
from pw_hdlc import encode

_LOG = logging.getLogger('pw_hdlc.rpc')

STDOUT_ADDRESS = 1
DEFAULT_ADDRESS = ord('R')
DEFAULT_CHANNEL_ID = 1
# Custom log level one step below DEBUG, used for per-frame byte dumps.
_VERBOSE = logging.DEBUG - 1


def channel_output(
    writer: Callable[[bytes], Any],
    address: int = DEFAULT_ADDRESS,
    delay_s: float = 0,
) -> Callable[[bytes], None]:
    """
    Returns a function that can be used as a channel output for ``pw_rpc``.

    Args:
      writer: Called with the encoded bytes to send.
      address: HDLC address placed in every UI frame.
      delay_s: When non-zero, the frame is written one byte at a time with a
        sleep of this many seconds before each byte.

    Returns:
      A callable that encodes its payload as an HDLC UI frame and passes the
      result to ``writer``.
    """

    if delay_s:

        def slow_write(data: bytes) -> None:
            """Slows down writes in case unbuffered serial is in use."""
            for byte in data:
                time.sleep(delay_s)
                writer(bytes([byte]))

        return lambda data: slow_write(encode.ui_frame(address, data))

    def write_hdlc(data: bytes) -> None:
        frame = encode.ui_frame(address, data)
        _LOG.log(_VERBOSE, 'Write %2d B: %s', len(frame), frame)
        writer(frame)

    return write_hdlc


FrameHandlers = dict[int, Callable[[Frame], Any]]
FrameTypeT = TypeVar('FrameTypeT')


class CancellableReader(ABC):
    """Wraps communication interfaces used for reading incoming data with the
    guarantee that the read request can be cancelled. Derived classes must
    implement the :py:func:`cancel_read()` method.

    Cancelling a read invalidates ongoing and future reads. The
    :py:func:`cancel_read()` method can only be called once.
    """

    def __init__(self, base_obj: Any, *read_args, **read_kwargs):
        """
        Args:
          base_obj: Object that offers a ``read()`` method with optional args
            and kwargs.
          read_args: Arguments for ``base_obj.read()`` function.
          read_kwargs: Keyword arguments for ``base_obj.read()`` function.
        """
        self._base_obj = base_obj
        self._read_args = read_args
        self._read_kwargs = read_kwargs

    def __enter__(self) -> CancellableReader:
        return self

    def __exit__(self, *exc_info) -> None:
        self.cancel_read()

    def read(self) -> bytes:
        """Reads bytes that contain parts of or full RPC packets."""
        return self._base_obj.read(*self._read_args, **self._read_kwargs)

    @abstractmethod
    def cancel_read(self) -> None:
        """Cancels a blocking read request and all future reads.

        Can only be called once.
        """


class SelectableReader(CancellableReader):
    """
    Wraps interfaces that work with ``select()`` to signal when data is
    received.

    These interfaces must provide a ``fileno()`` method.
    WINDOWS ONLY: Only sockets that originate from WinSock can be wrapped. File
    objects are not acceptable.
    """

    _STOP_CMD = b'STOP'

    def __init__(self, base_obj: Any, *read_args, **read_kwargs):
        """Wraps ``base_obj`` and creates the pipe used to cancel reads.

        Raises:
          ValueError: On Windows, when ``base_obj`` is not a socket.
        """
        assert hasattr(base_obj, 'fileno')
        if platform.system() == 'Windows' and not isinstance(
            base_obj, socket.socket
        ):
            raise ValueError('Only socket objects are selectable on Windows')
        super().__init__(base_obj, *read_args, **read_kwargs)
        # Writing to the pipe wakes up a select() call blocked in
        # _wait_for_read_or_cancel(). A value of -1 marks a closed end.
        self._cancel_signal_pipe_r_fd, self._cancel_signal_pipe_w_fd = os.pipe()
        self._waiting_for_read_or_cancel_lock = threading.Lock()

    def __exit__(self, *exc_info) -> None:
        self.cancel_read()
        with self._waiting_for_read_or_cancel_lock:
            # 0 is a valid file descriptor, so only negative values mean
            # "already closed".
            if self._cancel_signal_pipe_r_fd >= 0:
                os.close(self._cancel_signal_pipe_r_fd)
                self._cancel_signal_pipe_r_fd = -1

    def read(self) -> bytes:
        """Reads from the wrapped object, or returns ``b''`` if cancelled."""
        if self._wait_for_read_or_cancel():
            return super().read()
        return b''

    def _wait_for_read_or_cancel(self) -> bool:
        """Returns ``True`` when ready to read."""
        with self._waiting_for_read_or_cancel_lock:
            if self._base_obj.fileno() < 0 or self._cancel_signal_pipe_r_fd < 0:
                # The interface might've been closed already.
                return False
            ready_to_read, _, exception_list = select.select(
                [self._cancel_signal_pipe_r_fd, self._base_obj],
                [],
                [self._base_obj],
            )
            if self._cancel_signal_pipe_r_fd in ready_to_read:
                # A signal to stop the reading process was received.
                os.read(self._cancel_signal_pipe_r_fd, len(self._STOP_CMD))
                os.close(self._cancel_signal_pipe_r_fd)
                self._cancel_signal_pipe_r_fd = -1
                return False

            if exception_list:
                _LOG.error('Error reading interface')
                return False
            return True

    def cancel_read(self) -> None:
        """Signals a pending or future read to stop; later calls are no-ops."""
        # 0 is a valid file descriptor, so only negative values mean
        # "already closed".
        if self._cancel_signal_pipe_w_fd >= 0:
            os.write(self._cancel_signal_pipe_w_fd, self._STOP_CMD)
            os.close(self._cancel_signal_pipe_w_fd)
            self._cancel_signal_pipe_w_fd = -1


class SocketReader(SelectableReader):
    """Wraps a socket ``recv()`` function."""

    def __init__(self, base_obj: socket.socket, *read_args, **read_kwargs):
        super().__init__(base_obj, *read_args, **read_kwargs)

    def read(self) -> bytes:
        """Receives from the socket, or returns ``b''`` if cancelled."""
        if self._wait_for_read_or_cancel():
            return self._base_obj.recv(*self._read_args, **self._read_kwargs)
        return b''

    def __exit__(self, *exc_info) -> None:
        # Delegate to SelectableReader so the read end of the cancel pipe is
        # released as well; the socket is closed even if that step fails.
        try:
            super().__exit__(*exc_info)
        finally:
            self._base_obj.close()


class SerialReader(CancellableReader):
    """Wraps a :py:class:`serial.Serial` object."""

    def __init__(self, base_obj: serial.Serial, *read_args, **read_kwargs):
        super().__init__(base_obj, *read_args, **read_kwargs)

    def cancel_read(self) -> None:
        self._base_obj.cancel_read()

    def __exit__(self, *exc_info) -> None:
        self.cancel_read()
        self._base_obj.close()


class DataReaderAndExecutor:
    """Reads incoming bytes and delegates frame handling to an executor.

    Executing callbacks in a ``ThreadPoolExecutor`` decouples reading the input
    stream from handling the data. That way, if a handler function takes a
    long time or crashes, this reading thread is not interrupted.
    """

    def __init__(
        self,
        reader: CancellableReader,
        on_read_error: Callable[[Exception], None],
        data_processor: Callable[[bytes], Iterable[FrameTypeT]],
        frame_handler: Callable[[FrameTypeT], None],
        handler_threads: int | None = 1,
    ):
        """Creates the data reader and frame delegator.

        Args:
          reader: Reads incoming bytes from the given transport, blocks until
            data is available or an exception is raised. Otherwise the reader
            will exit.
          on_read_error: Called when there is an error reading incoming bytes.
          data_processor: Processes read bytes and returns a frame-like object
            that the frame_handler can process.
          frame_handler: Handles a received frame.
          handler_threads: The number of threads in the executor pool.
        """

        self._reader = reader
        self._on_read_error = on_read_error
        self._data_processor = data_processor
        self._frame_handler = frame_handler
        self._handler_threads = handler_threads

        self._reader_thread = threading.Thread(target=self._run)
        self._reader_thread_stop = threading.Event()

    def start(self) -> None:
        """Starts the reading process."""
        _LOG.debug('Starting read process')
        self._reader_thread_stop.clear()
        self._reader_thread.start()

    def stop(self) -> None:
        """Stops the reading process.

        This requests that the reading process stop and waits
        for the background thread to exit.
        """
        _LOG.debug('Stopping read process')
        self._reader_thread_stop.set()
        self._reader.cancel_read()
        self._reader_thread.join(30)
        if self._reader_thread.is_alive():
            warnings.warn(
                'Timed out waiting for read thread to terminate.\n'
                'Tip: Use a `CancellableReader` to cancel reads.'
            )

    def _run(self) -> None:
        """Reads raw data in a background thread."""
        with ThreadPoolExecutor(max_workers=self._handler_threads) as executor:
            while not self._reader_thread_stop.is_set():
                try:
                    data = self._reader.read()
                except Exception as exc:  # pylint: disable=broad-except
                    # Don't report the read error if the thread is stopping.
                    # The stream or device backing _read was likely closed,
                    # so errors are expected.
                    if not self._reader_thread_stop.is_set():
                        self._on_read_error(exc)
                    _LOG.debug(
                        'DataReaderAndExecutor thread exiting due to exception',
                        exc_info=exc,
                    )
                    return

                if not data:
                    continue

                _LOG.log(_VERBOSE, 'Read %2d B: %s', len(data), data)

                for frame in self._data_processor(data):
                    executor.submit(self._frame_handler, frame)


# Writes to stdout by default, but sys.stdout.buffer is not guaranteed to exist
# (see https://docs.python.org/3/library/io.html#io.TextIOBase.buffer). Defer
# to sys.__stdout__.buffer if sys.stdout is wrapped with something that does not
# offer it.
def write_to_file(
    data: bytes,
    output: BinaryIO = getattr(sys.stdout, 'buffer', sys.__stdout__.buffer),
) -> None:
    """Writes ``data`` followed by a newline to ``output`` and flushes it."""
    output.write(data + b'\n')
    output.flush()


def default_channels(write: Callable[[bytes], Any]) -> list[pw_rpc.Channel]:
    """Returns one channel with ``DEFAULT_CHANNEL_ID`` that HDLC-encodes
    outgoing packets and sends them with ``write``."""
    return [pw_rpc.Channel(DEFAULT_CHANNEL_ID, channel_output(write))]


PathsModulesOrProtoLibrary = (
    Iterable[python_protos.PathOrModule] | python_protos.Library
)


class RpcClient:
    """An RPC client with configurable incoming data processing."""

    def __init__(
        self,
        reader_and_executor: DataReaderAndExecutor,
        paths_or_modules: PathsModulesOrProtoLibrary,
        channels: Iterable[pw_rpc.Channel],
        client_impl: pw_rpc.client.ClientImpl | None = None,
    ):
        """Creates an RPC client.

        Args:
          reader_and_executor: ``DataReaderAndExecutor`` instance.
          paths_or_modules: paths to .proto files or proto modules.
          channels: RPC channels to use for output.
          client_impl: The RPC client implementation. Defaults to the callback
            client implementation if not provided.
        """
        if isinstance(paths_or_modules, python_protos.Library):
            self.protos = paths_or_modules
        else:
            self.protos = python_protos.Library.from_paths(paths_or_modules)

        if client_impl is None:
            client_impl = callback_client.Impl()

        self.client = pw_rpc.Client.from_modules(
            client_impl, channels, self.protos.modules()
        )

        # Start background thread that reads and processes RPC packets.
        self._reader_and_executor = reader_and_executor
        self._reader_and_executor.start()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def close(self) -> None:
        """Stops the background reader."""
        self._reader_and_executor.stop()

    def rpcs(self, channel_id: int | None = None) -> Any:
        """Returns object for accessing services on the specified channel.

        This skips some intermediate layers to make it simpler to invoke RPCs
        from an ``HdlcRpcClient``. If only one channel is in use, the channel ID
        is not necessary.
        """
        if channel_id is None:
            return next(iter(self.client.channels())).rpcs

        return self.client.channel(channel_id).rpcs

    def handle_rpc_packet(self, packet: bytes) -> None:
        """Passes ``packet`` to the RPC client; logs an error if unhandled."""
        if not self.client.process_packet(packet):
            _LOG.error('Packet not handled by RPC client: %s', packet)


class HdlcRpcClient(RpcClient):
    """An RPC client configured to run over HDLC.

    Expects HDLC frames to have addresses that dictate how to parse the HDLC
    payloads.
    """

    def __init__(
        self,
        reader: CancellableReader,
        paths_or_modules: PathsModulesOrProtoLibrary,
        channels: Iterable[pw_rpc.Channel],
        output: Callable[[bytes], Any] = write_to_file,
        client_impl: pw_rpc.client.ClientImpl | None = None,
        *,
        _incoming_packet_filter_for_testing: (
            pw_rpc.ChannelManipulator | None
        ) = None,
        rpc_frames_address: int = DEFAULT_ADDRESS,
        log_frames_address: int = STDOUT_ADDRESS,
        extra_frame_handlers: FrameHandlers | None = None,
    ):
        """Creates an RPC client configured to communicate using HDLC.

        Args:
          reader: Readable object used to receive RPC packets.
          paths_or_modules: paths to .proto files or proto modules.
          channels: RPC channels to use for output.
          output: where to write ``stdout`` output from the device.
          client_impl: The RPC Client implementation. Defaults to the callback
            client implementation if not provided.
          _incoming_packet_filter_for_testing: Optional manipulator that
            incoming RPC payloads are routed through before they reach
            ``handle_rpc_packet``.
          rpc_frames_address: the address used in the HDLC frames for RPC
            packets. This can be the channel ID, or any custom address.
          log_frames_address: the address used in the HDLC frames for ``stdout``
            output from the device.
          extra_frame_handlers: Optional mapping of HDLC frame addresses to
            their callbacks. Entries override the default RPC and log handlers
            when the addresses collide.
        """
        # Set up frame handling.
        rpc_output: Callable[[bytes], Any] = self.handle_rpc_packet
        if _incoming_packet_filter_for_testing is not None:
            _incoming_packet_filter_for_testing.send_packet = rpc_output
            rpc_output = _incoming_packet_filter_for_testing

        frame_handlers: FrameHandlers = {
            rpc_frames_address: lambda frame: rpc_output(frame.data),
            log_frames_address: lambda frame: output(frame.data),
        }
        if extra_frame_handlers:
            frame_handlers.update(extra_frame_handlers)

        def handle_frame(frame: Frame) -> None:
            # Suppress raising any frame errors to avoid crashes on data
            # processing, which may hide or drop other data.
            try:
                if not frame.ok():
                    _LOG.error('Failed to parse frame: %s', frame.status.value)
                    _LOG.debug('%s', frame.data)
                    return

                # Look the handler up separately from calling it so that a
                # KeyError raised inside a handler is reported as a handler
                # failure rather than as an unhandled address.
                try:
                    handler = frame_handlers[frame.address]
                except KeyError:
                    _LOG.warning(
                        'Unhandled frame for address %d: %s',
                        frame.address,
                        frame,
                    )
                    return

                handler(frame)
            except:  # pylint: disable=bare-except
                _LOG.exception('Exception in HDLC frame handler thread')

        decoder = FrameDecoder()

        def on_read_error(exc: Exception) -> None:
            _LOG.error('data reader encountered an error', exc_info=exc)

        reader_and_executor = DataReaderAndExecutor(
            reader, on_read_error, decoder.process_valid_frames, handle_frame
        )
        super().__init__(
            reader_and_executor, paths_or_modules, channels, client_impl
        )


class NoEncodingSingleChannelRpcClient(RpcClient):
    """An RPC client without any frame encoding with a single channel output.

    The caveat is that the provided read function must read entire frames.
    """

    def __init__(
        self,
        reader: CancellableReader,
        paths_or_modules: PathsModulesOrProtoLibrary,
        channel: pw_rpc.Channel,
        client_impl: pw_rpc.client.ClientImpl | None = None,
    ):
        """Creates an RPC client over a single channel with no frame encoding.

        Args:
          reader: Readable object used to receive RPC packets.
          paths_or_modules: paths to .proto files or proto modules.
          channel: RPC channel to use for output.
          client_impl: The RPC Client implementation. Defaults to the callback
            client implementation if not provided.
        """

        def process_data(data: bytes):
            # Each read is treated as exactly one complete packet.
            yield data

        def on_read_error(exc: Exception) -> None:
            _LOG.error('data reader encountered an error', exc_info=exc)

        reader_and_executor = DataReaderAndExecutor(
            reader, on_read_error, process_data, self.handle_rpc_packet
        )
        super().__init__(
            reader_and_executor, paths_or_modules, [channel], client_impl
        )


def _try_connect(port: int, attempts: int = 10) -> socket.socket:
    """Tries to connect to the specified port up to the given number of times.

    This is helpful when connecting to a process that was started by this
    script. The process may need time to start listening for connections, and
    that length of time can vary. This retries with a short delay rather than
    having to wait for the worst case delay every time.

    Args:
      port: Port on ``localhost`` to connect to.
      attempts: Maximum number of connection attempts.

    Returns:
      The connected socket.

    Raises:
      ConnectionRefusedError: If the final attempt is refused.
    """
    timeout_s = 0.001
    while True:
        time.sleep(timeout_s)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(('localhost', port))
        except ConnectionRefusedError:
            sock.close()
            attempts -= 1
            if attempts <= 0:
                raise
        except BaseException:
            # Any other failure is not retried; release the socket before
            # propagating the error.
            sock.close()
            raise
        else:
            return sock

        # Back off exponentially between attempts.
        timeout_s *= 2


class SocketSubprocess:
    """Executes a subprocess and connects to it with a socket."""

    def __init__(self, command: Sequence, port: int) -> None:
        """Starts ``command`` and connects to ``port`` on localhost.

        If the connection cannot be established, the subprocess is terminated
        and the error is re-raised.
        """
        self._server_process = subprocess.Popen(command, stdin=subprocess.PIPE)
        self.stdin = self._server_process.stdin

        try:
            self.socket: socket.socket = _try_connect(port)
        except:
            self._server_process.terminate()
            self._server_process.communicate()
            raise

    def close(self) -> None:
        """Closes the socket and terminates the subprocess."""
        try:
            self.socket.close()
        finally:
            self._server_process.terminate()
            self._server_process.communicate()

    def __enter__(self) -> SocketSubprocess:
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()


class HdlcRpcLocalServerAndClient:
    """Runs an RPC server in a subprocess and connects to it over a socket.

    This can be used to run a local RPC server in an integration test.
    """

    def __init__(
        self,
        server_command: Sequence,
        port: int,
        protos: PathsModulesOrProtoLibrary,
        *,
        incoming_processor: pw_rpc.ChannelManipulator | None = None,
        outgoing_processor: pw_rpc.ChannelManipulator | None = None,
    ) -> None:
        """Creates a new ``HdlcRpcLocalServerAndClient``."""

        self.server = SocketSubprocess(server_command, port)

        # Bytes received from the socket are handed to the RPC client through
        # this queue.
        self._bytes_queue: queue.SimpleQueue[bytes] = queue.SimpleQueue()
        self._read_thread = threading.Thread(target=self._read_from_socket)
        self._read_thread.start()

        # Collects frames received on the log (stdout) address.
        self.output = io.BytesIO()

        self.channel_output: Any = self.server.socket.sendall

        self._incoming_processor = incoming_processor
        if outgoing_processor is not None:
            outgoing_processor.send_packet = self.channel_output
            self.channel_output = outgoing_processor

        class QueueReader(CancellableReader):
            def read(self) -> bytes:
                # The timeout lets the reader loop re-check its stop flag,
                # since cancel_read() below cannot interrupt a blocked get().
                try:
                    return self._base_obj.get(timeout=3)
                except queue.Empty:
                    return b''

            def cancel_read(self) -> None:
                pass

        self._rpc_client = HdlcRpcClient(
            QueueReader(self._bytes_queue),
            protos,
            default_channels(self.channel_output),
            self.output.write,
            _incoming_packet_filter_for_testing=incoming_processor,
        )
        self.client = self._rpc_client.client

    def _read_from_socket(self):
        """Forwards socket data to the queue until an empty read occurs."""
        while True:
            data = self.server.socket.recv(4096)
            self._bytes_queue.put(data)
            if not data:
                return

    def close(self):
        """Shuts down the server, the RPC client and the socket thread."""
        self.server.close()
        self.output.close()
        self._rpc_client.close()
        self._read_thread.join()

    def __enter__(self) -> HdlcRpcLocalServerAndClient:
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()