• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Utilities for using HDLC with ``pw_rpc``."""
15
16from __future__ import annotations
17
18from abc import ABC, abstractmethod
19from concurrent.futures import ThreadPoolExecutor
20import io
21import logging
22import os
23import platform
24import queue
25import select
26import sys
27import threading
28import time
29import socket
30import subprocess
31from typing import (
32    Any,
33    BinaryIO,
34    Callable,
35    Iterable,
36    Sequence,
37    TypeVar,
38)
39import warnings
40
41import serial
42
43from pw_protobuf_compiler import python_protos
44import pw_rpc
45from pw_rpc import callback_client
46
47from pw_hdlc.decode import Frame, FrameDecoder
48from pw_hdlc import encode
49
# Module-level logger shared by every helper in this file.
_LOG = logging.getLogger('pw_hdlc.rpc')

# Default HDLC address HdlcRpcClient treats as device ``stdout`` output.
STDOUT_ADDRESS = 1
# Default HDLC address for RPC packets (the byte value of the letter 'R').
DEFAULT_ADDRESS = ord('R')
# Channel ID given to the single channel built by default_channels().
DEFAULT_CHANNEL_ID = 1
# Custom log level one step below DEBUG, used for raw read/write byte traces.
_VERBOSE = logging.DEBUG - 1
56
57
def channel_output(
    writer: Callable[[bytes], Any],
    address: int = DEFAULT_ADDRESS,
    delay_s: float = 0,
) -> Callable[[bytes], None]:
    """Builds a ``pw_rpc`` channel output that HDLC-encodes each payload.

    Args:
        writer: Callable that receives the encoded bytes.
        address: HDLC address placed in every UI frame.
        delay_s: When non-zero, the frame is handed to ``writer`` one byte at
            a time with this many seconds of sleep before each byte. Useful
            when unbuffered serial is in use.

    Returns:
        A function that accepts a payload, wraps it in an HDLC UI frame and
        passes the result to ``writer``.
    """

    def write_hdlc(data: bytes):
        frame = encode.ui_frame(address, data)
        _LOG.log(_VERBOSE, 'Write %2d B: %s', len(frame), frame)
        writer(frame)

    # Fast path: no throttling requested, emit whole frames at once.
    if not delay_s:
        return write_hdlc

    def write_hdlc_throttled(data: bytes) -> None:
        """Slows down writes in case unbuffered serial is in use."""
        for value in encode.ui_frame(address, data):
            time.sleep(delay_s)
            writer(bytes((value,)))

    return write_hdlc_throttled
83
84
# Maps an HDLC frame address to the callback invoked with frames for it.
FrameHandlers = dict[int, Callable[[Frame], Any]]
# Type variable for the frame-like objects passed from a data processor to a
# frame handler in DataReaderAndExecutor.
FrameTypeT = TypeVar('FrameTypeT')
87
88
class CancellableReader(ABC):
    """Base class for readers whose blocking read can be cancelled.

    The wrapped object supplies the actual ``read()``; subclasses supply
    :py:func:`cancel_read()`, which must unblock a pending read.

    Cancelling a read invalidates ongoing and future reads. The
    :py:func:`cancel_read()` method can only be called once.
    """

    def __init__(self, base_obj: Any, *read_args, **read_kwargs):
        """
        Args:
            base_obj: Object that offers a ``read()`` method with optional args
                and kwargs.
            read_args: Positional arguments forwarded to ``base_obj.read()``.
            read_kwargs: Keyword arguments forwarded to ``base_obj.read()``.
        """
        self._read_kwargs = read_kwargs
        self._read_args = read_args
        self._base_obj = base_obj

    @abstractmethod
    def cancel_read(self) -> None:
        """Cancels a blocking read request and all future reads.

        Can only be called once.
        """

    def read(self) -> bytes:
        """Reads bytes that contain parts of or full RPC packets."""
        read_from_base = self._base_obj.read
        return read_from_base(*self._read_args, **self._read_kwargs)

    def __enter__(self) -> CancellableReader:
        return self

    def __exit__(self, *exc_info) -> None:
        # Leaving the ``with`` block cancels any outstanding read.
        self.cancel_read()
126
127
class SelectableReader(CancellableReader):
    """
    Wraps interfaces that work with ``select()`` to signal when data is
    received.

    Cancellation is implemented with an internal pipe: ``cancel_read()`` writes
    a stop command to the pipe, which wakes up the ``select()`` call that is
    waiting on both the pipe and the wrapped object.

    These interfaces must provide a ``fileno()`` method.
    WINDOWS ONLY: Only sockets that originate from WinSock can be wrapped. File
    objects are not acceptable.
    """

    _STOP_CMD = b'STOP'

    def __init__(self, base_obj: Any, *read_args, **read_kwargs):
        """
        Args:
            base_obj: Object with ``fileno()`` and ``read()`` methods.
            read_args: Arguments for ``base_obj.read()`` function.
            read_kwargs: Keyword arguments for ``base_obj.read()`` function.

        Raises:
            ValueError: On Windows, when ``base_obj`` is not a socket.
        """
        assert hasattr(base_obj, 'fileno')
        if platform.system() == 'Windows' and not isinstance(
            base_obj, socket.socket
        ):
            raise ValueError('Only socket objects are selectable on Windows')
        super().__init__(base_obj, *read_args, **read_kwargs)
        # A descriptor value of -1 marks the corresponding pipe end as closed.
        self._cancel_signal_pipe_r_fd, self._cancel_signal_pipe_w_fd = os.pipe()
        self._waiting_for_read_or_cancel_lock = threading.Lock()

    def __exit__(self, *exc_info) -> None:
        """Cancels reads and closes the read end of the cancel pipe."""
        self.cancel_read()
        with self._waiting_for_read_or_cancel_lock:
            # 0 is a valid file descriptor, so only negative values mean
            # "already closed".
            if self._cancel_signal_pipe_r_fd >= 0:
                os.close(self._cancel_signal_pipe_r_fd)
                self._cancel_signal_pipe_r_fd = -1

    def read(self) -> bytes:
        """Returns data from the wrapped object, or ``b''`` if not readable."""
        if self._wait_for_read_or_cancel():
            return super().read()
        return b''

    def _wait_for_read_or_cancel(self) -> bool:
        """Returns ``True`` when ready to read."""
        with self._waiting_for_read_or_cancel_lock:
            if self._base_obj.fileno() < 0 or self._cancel_signal_pipe_r_fd < 0:
                # The interface might've been closed already.
                return False
            ready_to_read, _, exception_list = select.select(
                [self._cancel_signal_pipe_r_fd, self._base_obj],
                [],
                [self._base_obj],
            )
            if self._cancel_signal_pipe_r_fd in ready_to_read:
                # A signal to stop the reading process was received.
                os.read(self._cancel_signal_pipe_r_fd, len(self._STOP_CMD))
                os.close(self._cancel_signal_pipe_r_fd)
                self._cancel_signal_pipe_r_fd = -1
                return False

            if exception_list:
                _LOG.error('Error reading interface')
                return False
        return True

    def cancel_read(self) -> None:
        """Signals the stop command through the pipe and closes its write end.

        Calls after the first one are no-ops because the write descriptor is
        reset to -1.
        """
        # 0 is a valid file descriptor, so only negative values mean
        # "already closed".
        if self._cancel_signal_pipe_w_fd >= 0:
            os.write(self._cancel_signal_pipe_w_fd, self._STOP_CMD)
            os.close(self._cancel_signal_pipe_w_fd)
            self._cancel_signal_pipe_w_fd = -1
190
191
class SocketReader(SelectableReader):
    """Wraps a socket ``recv()`` function."""

    def __init__(self, base_obj: socket.socket, *read_args, **read_kwargs):
        """
        Args:
            base_obj: Socket to receive from.
            read_args: Arguments for ``base_obj.recv()``.
            read_kwargs: Keyword arguments for ``base_obj.recv()``.
        """
        super().__init__(base_obj, *read_args, **read_kwargs)

    def read(self) -> bytes:
        """Returns received bytes, or ``b''`` when the socket is not readable
        (for example because the read was cancelled)."""
        if self._wait_for_read_or_cancel():
            return self._base_obj.recv(*self._read_args, **self._read_kwargs)
        return b''

    def __exit__(self, *exc_info) -> None:
        """Cancels reads, releases the cancel pipe and closes the socket."""
        try:
            # The parent cancels the read and also closes the read end of the
            # cancel pipe; skipping it would leak that file descriptor.
            super().__exit__(*exc_info)
        finally:
            self._base_obj.close()
206
207
class SerialReader(CancellableReader):
    """Cancellable reader backed by a :py:class:`serial.Serial` object."""

    def __init__(self, base_obj: serial.Serial, *read_args, **read_kwargs):
        """
        Args:
            base_obj: Serial port to read from.
            read_args: Arguments for ``base_obj.read()``.
            read_kwargs: Keyword arguments for ``base_obj.read()``.
        """
        super().__init__(base_obj, *read_args, **read_kwargs)

    def __exit__(self, *exc_info) -> None:
        """Cancels any pending read, then closes the serial port."""
        self.cancel_read()
        port = self._base_obj
        port.close()

    def cancel_read(self) -> None:
        """Delegates cancellation to the serial object's ``cancel_read()``."""
        port = self._base_obj
        port.cancel_read()
220
221
class DataReaderAndExecutor:
    """Reads incoming bytes on a background thread and dispatches frames.

    Frame handlers run in a ``ThreadPoolExecutor`` so that reading the input
    stream is decoupled from handling the data. That way, if a handler function
    takes a long time or crashes, the reading thread is not interrupted.
    """

    def __init__(
        self,
        reader: CancellableReader,
        on_read_error: Callable[[Exception], None],
        data_processor: Callable[[bytes], Iterable[FrameTypeT]],
        frame_handler: Callable[[FrameTypeT], None],
        handler_threads: int | None = 1,
    ):
        """Creates the data reader and frame delegator.

        Args:
            reader: Reads incoming bytes from the given transport, blocks until
              data is available or an exception is raised. Otherwise the reader
              will exit.
            on_read_error: Called when there is an error reading incoming bytes.
            data_processor: Processes read bytes and returns frame-like objects
              that the frame_handler can process.
            frame_handler: Handles a received frame.
            handler_threads: The number of threads in the executor pool.
        """
        # Collaborators supplied by the caller.
        self._reader = reader
        self._on_read_error = on_read_error
        self._data_processor = data_processor
        self._frame_handler = frame_handler
        self._handler_threads = handler_threads

        # Background reading machinery; the thread is started by start().
        self._reader_thread = threading.Thread(target=self._run)
        self._reader_thread_stop = threading.Event()

    def start(self) -> None:
        """Starts the reading process."""
        _LOG.debug('Starting read process')
        self._reader_thread_stop.clear()
        self._reader_thread.start()

    def stop(self) -> None:
        """Stops the reading process.

        Requests that the reader stop, cancels any pending read, and waits up
        to 30 seconds for the background thread to exit. A warning is issued
        if the thread is still running after that.
        """
        _LOG.debug('Stopping read process')
        self._reader_thread_stop.set()
        self._reader.cancel_read()
        self._reader_thread.join(30)
        if not self._reader_thread.is_alive():
            return
        warnings.warn(
            'Timed out waiting for read thread to terminate.\n'
            'Tip: Use a `CancellableReader` to cancel reads.'
        )

    def _run(self) -> None:
        """Reads raw data in a background thread."""
        stop_requested = self._reader_thread_stop.is_set
        with ThreadPoolExecutor(max_workers=self._handler_threads) as pool:
            while not stop_requested():
                try:
                    chunk = self._reader.read()
                except Exception as error:  # pylint: disable=broad-except
                    # Errors are expected while stopping (the stream or
                    # device behind the reader was likely closed), so they
                    # are only reported when no stop was requested.
                    if not stop_requested():
                        self._on_read_error(error)
                    _LOG.debug(
                        'DataReaderAndExecutor thread exiting due to exception',
                        exc_info=error,
                    )
                    return

                # Empty reads carry nothing to process; poll again.
                if chunk:
                    _LOG.log(_VERBOSE, 'Read %2d B: %s', len(chunk), chunk)

                    for frame in self._data_processor(chunk):
                        pool.submit(self._frame_handler, frame)
307
308
# The default destination is stdout's binary buffer. Because sys.stdout.buffer
# is not guaranteed to exist
# (see https://docs.python.org/3/library/io.html#io.TextIOBase.buffer), the
# default falls back to sys.__stdout__.buffer when sys.stdout is wrapped with
# something that does not offer it.
def write_to_file(
    data: bytes,
    output: BinaryIO = getattr(sys.stdout, 'buffer', sys.__stdout__.buffer),
) -> None:
    """Writes ``data`` followed by a newline to ``output`` and flushes it.

    Args:
        data: Bytes to write.
        output: Binary stream to write to; stdout's buffer by default.
    """
    line = data + b'\n'
    output.write(line)
    output.flush()
319
320
def default_channels(write: Callable[[bytes], Any]) -> list[pw_rpc.Channel]:
    """Returns a one-element list holding the default RPC channel.

    The channel uses ``DEFAULT_CHANNEL_ID`` and sends its packets through
    ``channel_output(write)``.
    """
    hdlc_output = channel_output(write)
    channel = pw_rpc.Channel(DEFAULT_CHANNEL_ID, hdlc_output)
    return [channel]
323
324
# Accepted ways to hand protos to an RpcClient: an iterable of .proto paths or
# proto modules, or an already-built python_protos.Library.
PathsModulesOrProtoLibrary = (
    Iterable[python_protos.PathOrModule] | python_protos.Library
)
328
329
class RpcClient:
    """An RPC client with configurable incoming data processing."""

    def __init__(
        self,
        reader_and_executor: DataReaderAndExecutor,
        paths_or_modules: PathsModulesOrProtoLibrary,
        channels: Iterable[pw_rpc.Channel],
        client_impl: pw_rpc.client.ClientImpl | None = None,
    ):
        """Creates an RPC client and starts reading incoming data.

        Args:
          reader_and_executor: ``DataReaderAndExecutor`` instance.
          paths_or_modules: paths to .proto files or proto modules.
          channels: RPC channels to use for output.
          client_impl: The RPC client implementation. Defaults to the callback
            client implementation if not provided.
        """
        # Reuse a ready-made proto library; otherwise build one from the paths.
        self.protos = (
            paths_or_modules
            if isinstance(paths_or_modules, python_protos.Library)
            else python_protos.Library.from_paths(paths_or_modules)
        )

        impl = callback_client.Impl() if client_impl is None else client_impl
        self.client = pw_rpc.Client.from_modules(
            impl, channels, self.protos.modules()
        )

        # Start background thread that reads and processes RPC packets.
        self._reader_and_executor = reader_and_executor
        self._reader_and_executor.start()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def close(self) -> None:
        """Stops the background reader."""
        self._reader_and_executor.stop()

    def rpcs(self, channel_id: int | None = None) -> Any:
        """Returns object for accessing services on the specified channel.

        This skips some intermediate layers to make it simpler to invoke RPCs
        from an ``HdlcRpcClient``. If only one channel is in use, the channel ID
        is not necessary.
        """
        if channel_id is not None:
            return self.client.channel(channel_id).rpcs

        # No ID given: use the first channel the client reports.
        first_channel = next(iter(self.client.channels()))
        return first_channel.rpcs

    def handle_rpc_packet(self, packet: bytes) -> None:
        """Passes a packet to the RPC client, logging an error if unhandled."""
        handled = self.client.process_packet(packet)
        if not handled:
            _LOG.error('Packet not handled by RPC client: %s', packet)
389
390
class HdlcRpcClient(RpcClient):
    """An RPC client configured to run over HDLC.

    Expects HDLC frames to have addresses that dictate how to parse the HDLC
    payloads.
    """

    def __init__(
        self,
        reader: CancellableReader,
        paths_or_modules: PathsModulesOrProtoLibrary,
        channels: Iterable[pw_rpc.Channel],
        output: Callable[[bytes], Any] = write_to_file,
        client_impl: pw_rpc.client.ClientImpl | None = None,
        *,
        _incoming_packet_filter_for_testing: (
            pw_rpc.ChannelManipulator | None
        ) = None,
        rpc_frames_address: int = DEFAULT_ADDRESS,
        log_frames_address: int = STDOUT_ADDRESS,
        extra_frame_handlers: FrameHandlers | None = None,
    ):
        """Creates an RPC client configured to communicate using HDLC.

        Args:
          reader: Readable object used to receive RPC packets.
          paths_or_modules: paths to .proto files or proto modules.
          channels: RPC channels to use for output.
          output: where to write ``stdout`` output from the device.
          client_impl: The RPC Client implementation. Defaults to the callback
            client implementation if not provided.
          _incoming_packet_filter_for_testing: Optional channel manipulator
            that incoming RPC payloads are routed through before they reach
            the RPC client.
          rpc_frames_address: the address used in the HDLC frames for RPC
            packets. This can be the channel ID, or any custom address.
          log_frames_address: the address used in the HDLC frames for ``stdout``
            output from the device.
          extra_frame_handlers: Optional mapping of HDLC frame addresses to
            their callbacks. Entries override the RPC and log handlers when
            they use the same address.
        """
        # Set up frame handling.
        rpc_output: Callable[[bytes], Any] = self.handle_rpc_packet
        if _incoming_packet_filter_for_testing is not None:
            _incoming_packet_filter_for_testing.send_packet = rpc_output
            rpc_output = _incoming_packet_filter_for_testing

        frame_handlers: FrameHandlers = {
            rpc_frames_address: lambda frame: rpc_output(frame.data),
            log_frames_address: lambda frame: output(frame.data),
        }
        if extra_frame_handlers:
            frame_handlers.update(extra_frame_handlers)

        def handle_frame(frame: Frame) -> None:
            # Suppress raising any frame errors to avoid crashes on data
            # processing, which may hide or drop other data.
            try:
                if not frame.ok():
                    _LOG.error('Failed to parse frame: %s', frame.status.value)
                    _LOG.debug('%s', frame.data)
                    return

                # Only the lookup is guarded by ``except KeyError``. A
                # KeyError raised inside a handler must not be mistaken for a
                # missing handler; it is logged with its traceback below.
                try:
                    handler = frame_handlers[frame.address]
                except KeyError:
                    _LOG.warning(
                        'Unhandled frame for address %d: %s',
                        frame.address,
                        frame,
                    )
                    return

                handler(frame)
            except:  # pylint: disable=bare-except
                _LOG.exception('Exception in HDLC frame handler thread')

        decoder = FrameDecoder()

        def on_read_error(exc: Exception) -> None:
            _LOG.error('data reader encountered an error', exc_info=exc)

        reader_and_executor = DataReaderAndExecutor(
            reader, on_read_error, decoder.process_valid_frames, handle_frame
        )
        super().__init__(
            reader_and_executor, paths_or_modules, channels, client_impl
        )
473
474
class NoEncodingSingleChannelRpcClient(RpcClient):
    """A single-channel RPC client that applies no frame encoding.

    The caveat is that the provided read function must read entire frames,
    since every chunk that is read is handed to the RPC client as one packet.
    """

    def __init__(
        self,
        reader: CancellableReader,
        paths_or_modules: PathsModulesOrProtoLibrary,
        channel: pw_rpc.Channel,
        client_impl: pw_rpc.client.ClientImpl | None = None,
    ):
        """Creates an RPC client over a single channel with no frame encoding.

        Args:
          reader: Readable object used to receive RPC packets.
          paths_or_modules: paths to .proto files or proto modules.
          channel: RPC channel to use for output.
          client_impl: The RPC Client implementation. Defaults to the callback
            client implementation if not provided.
        """

        def log_read_error(exc: Exception) -> None:
            _LOG.error('data reader encountered an error', exc_info=exc)

        def pass_through(data: bytes):
            # Each read is treated as exactly one complete packet.
            yield data

        super().__init__(
            DataReaderAndExecutor(
                reader, log_read_error, pass_through, self.handle_rpc_packet
            ),
            paths_or_modules,
            [channel],
            client_impl,
        )
510
511
512def _try_connect(port: int, attempts: int = 10) -> socket.socket:
513    """Tries to connect to the specified port up to the given number of times.
514
515    This is helpful when connecting to a process that was started by this
516    script. The process may need time to start listening for connections, and
517    that length of time can vary. This retries with a short delay rather than
518    having to wait for the worst case delay every time.
519    """
520    timeout_s = 0.001
521    while True:
522        time.sleep(timeout_s)
523
524        try:
525            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
526            sock.connect(('localhost', port))
527            return sock
528        except ConnectionRefusedError:
529            sock.close()
530            attempts -= 1
531            if attempts <= 0:
532                raise
533
534            timeout_s *= 2
535
536
class SocketSubprocess:
    """Launches a subprocess and opens a socket connection to it."""

    def __init__(self, command: Sequence, port: int) -> None:
        """Starts ``command`` and connects to ``port`` on localhost.

        If the connection cannot be established, the subprocess is terminated
        and the original exception is re-raised.
        """
        process = subprocess.Popen(command, stdin=subprocess.PIPE)
        self._server_process = process
        self.stdin = process.stdin

        try:
            self.socket: socket.socket = _try_connect(port)
        except BaseException:
            self._stop_server()
            raise

    def _stop_server(self) -> None:
        """Terminates the subprocess and waits for it to finish."""
        self._server_process.terminate()
        self._server_process.communicate()

    def close(self) -> None:
        """Closes the socket, then shuts the subprocess down regardless."""
        try:
            self.socket.close()
        finally:
            self._stop_server()

    def __enter__(self) -> SocketSubprocess:
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()
563
564
class HdlcRpcLocalServerAndClient:
    """Runs an RPC server in a subprocess and connects to it over a socket.

    This can be used to run a local RPC server in an integration test.
    """

    def __init__(
        self,
        server_command: Sequence,
        port: int,
        protos: PathsModulesOrProtoLibrary,
        *,
        incoming_processor: pw_rpc.ChannelManipulator | None = None,
        outgoing_processor: pw_rpc.ChannelManipulator | None = None,
    ) -> None:
        """Creates a new ``HdlcRpcLocalServerAndClient``.

        Args:
          server_command: Command used to launch the server subprocess.
          port: Port on which to connect to the server.
          protos: Paths to .proto files, proto modules, or a proto library.
          incoming_processor: Optional manipulator given to the RPC client as
            its incoming packet filter.
          outgoing_processor: Optional manipulator placed in front of the
            socket for outgoing data.
        """

        self.server = SocketSubprocess(server_command, port)

        # A dedicated thread moves socket data into a queue that the RPC
        # client's reader drains.
        self._bytes_queue: queue.SimpleQueue[bytes] = queue.SimpleQueue()
        self._read_thread = threading.Thread(target=self._read_from_socket)
        self._read_thread.start()

        # Collects the ``stdout`` frames the RPC client receives.
        self.output = io.BytesIO()

        send_to_server = self.server.socket.sendall
        self.channel_output: Any = send_to_server

        self._incoming_processor = incoming_processor
        if outgoing_processor is not None:
            # Route outgoing data through the manipulator, which forwards to
            # the socket.
            outgoing_processor.send_packet = send_to_server
            self.channel_output = outgoing_processor

        class QueueReader(CancellableReader):
            def read(self) -> bytes:
                # Time out periodically and report an empty read instead of
                # blocking forever on an empty queue.
                try:
                    return self._base_obj.get(timeout=3)
                except queue.Empty:
                    return b''

            def cancel_read(self) -> None:
                pass

        queue_reader = QueueReader(self._bytes_queue)
        self._rpc_client = HdlcRpcClient(
            queue_reader,
            protos,
            default_channels(self.channel_output),
            self.output.write,
            _incoming_packet_filter_for_testing=incoming_processor,
        )
        self.client = self._rpc_client.client

    def _read_from_socket(self):
        """Forwards socket data to the queue until an empty read occurs.

        The empty chunk itself is also queued before the loop ends.
        """
        chunk = None
        while chunk is None or chunk:
            chunk = self.server.socket.recv(4096)
            self._bytes_queue.put(chunk)

    def close(self):
        """Shuts down the server, output buffer, RPC client and read thread."""
        self.server.close()
        self.output.close()
        self._rpc_client.close()
        self._read_thread.join()

    def __enter__(self) -> HdlcRpcLocalServerAndClient:
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()
634