• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
"""Wrappers for pyserial classes to log read and write data."""
15from __future__ import annotations
16
17from contextvars import ContextVar
18import logging
19import textwrap
20from typing import TYPE_CHECKING
21
22import serial
23
24from pw_console.widgets.event_count_history import EventCountHistory
25
26if TYPE_CHECKING:
27    from _typeshed import ReadableBuffer
28
# Module-level logger that receives every debug record emitted by this file.
# Records carry an `extra_metadata_fields` dict in `extra` (with keys such as
# 'view' and 'msg') alongside the formatted message.
_LOG = logging.getLogger('pw_console.serial_debug_logger')
30
31
def _log_hex_strings(data: bytes, prefix=''):
    """Log ``data`` as two aligned debug messages: hex numbers and characters.

    The first message lists each byte as a two digit hex number. The second
    lists one two-character cell per byte so that, with the same prefix, the
    two messages line up column for column:

    - Printable bytes are shown as the character, right-justified to width 2.
    - Bytes that ``repr`` shows as a short escape (for example ``\\n``,
      ``\\t``, ``\\r`` or a backslash) are shown as that two character escape.
    - All other bytes are shown as ``..``.

    Args:
        data: The bytes to display.
        prefix: Text placed in front of both log messages.
    """
    # Make a list of 2 character hex number strings.
    hex_numbers = textwrap.wrap(data.hex(), 2)

    hex_chars = []
    for byte in data:
        # Drop the leading b' (or b") and the trailing quote from the repr.
        # Slicing, rather than matching on the literal text "b'", also
        # handles the single quote byte: its repr is b"'" and would otherwise
        # pass through unstripped as a six character cell, breaking the
        # column alignment.
        text = repr(bytes([byte]))[2:-1]
        # Bytes with no printable form repr as \xNN; replace them with dots.
        hex_chars.append('..' if text.startswith('\\x') else text.rjust(2))

    hex_numbers_msg = ' '.join(hex_numbers)
    hex_chars_msg = ' '.join(hex_chars)

    _LOG.debug(
        '%s%s',
        prefix,
        hex_numbers_msg,
        extra=dict(
            extra_metadata_fields={
                'msg': hex_numbers_msg,
                'view': 'hex',
            }
        ),
    )
    _LOG.debug(
        '%s%s',
        prefix,
        hex_chars_msg,
        extra=dict(
            extra_metadata_fields={
                'msg': hex_chars_msg,
                'view': 'chars',
            }
        ),
    )
76
77
# Context variable holding the byte-count histories for serial traffic. Its
# default is a single dict, built once when this module is imported, with one
# EventCountHistory (interval=3) for each of 'total', 'read' and 'write'.
BANDWIDTH_HISTORY_CONTEXTVAR = ContextVar(
    'pw_console_bandwidth_history',
    default={
        direction: EventCountHistory(interval=3)
        for direction in ('total', 'read', 'write')
    },
)
86
87
class SerialWithLogging(serial.Serial):  # pylint: disable=too-many-ancestors
    """pyserial with read and write wrappers for logging.

    Every non-empty read or write is reported to the module logger at debug
    level in several views (raw bytes, hex numbers, characters and decoded
    text lines), and its size is recorded in the bandwidth histories obtained
    from ``BANDWIDTH_HISTORY_CONTEXTVAR``.
    """

    def __init__(self, *args, **kwargs):
        """Initialize ``serial.Serial`` and look up the bandwidth histories.

        All positional and keyword arguments are forwarded unchanged to
        ``serial.Serial``.
        """
        super().__init__(*args, **kwargs)
        self.pw_bps_history = BANDWIDTH_HISTORY_CONTEXTVAR.get()

    def _pw_log_data(self, mode: str, data: bytes) -> None:
        """Emit the debug log views for one non-empty transfer.

        Args:
            mode: Direction label, 'Read' or 'Write'. Used in the message
                prefix and stored in the 'mode' metadata field.
            data: The bytes that were read or are about to be written.
        """
        prefix = '%s %2d B: ' % (mode, len(data))
        _LOG.debug(
            '%s%s',
            prefix,
            data,
            extra=dict(
                extra_metadata_fields={
                    'mode': mode,
                    'bytes': len(data),
                    'view': 'bytes',
                    'msg': str(data),
                }
            ),
        )
        _log_hex_strings(data, prefix=prefix)

        # Print individual lines. Undecodable bytes are dropped rather than
        # raising, since the payload may be arbitrary binary data.
        for line in data.decode(encoding='utf-8', errors='ignore').splitlines():
            _LOG.debug(
                '%s',
                line,
                extra=dict(
                    extra_metadata_fields={
                        'msg': line,
                        'view': 'lines',
                    }
                ),
            )

    def read(self, size: int = 1) -> bytes:
        """Read up to ``size`` bytes, recording and logging what was received.

        The read size is always added to the 'read' and 'total' histories,
        including zero-length reads; debug messages are only produced when
        data was actually returned.

        Returns:
            The bytes returned by ``serial.Serial.read``.
        """
        data = super().read(size)
        self.pw_bps_history['read'].log(len(data))
        self.pw_bps_history['total'].log(len(data))

        if data:
            self._pw_log_data('Read', data)

        return data

    def write(self, data: ReadableBuffer) -> int | None:
        """Log ``data`` and then write it to the port.

        Non-empty ``bytes``, ``bytearray`` and ``memoryview`` payloads are
        added to the 'write' and 'total' histories and logged before the
        write is attempted. Other argument types are passed straight through
        to ``serial.Serial.write`` without logging.

        Returns:
            Whatever ``serial.Serial.write`` returns (the number of bytes
            written), so callers that check the result keep working.
        """
        if isinstance(data, (bytes, bytearray, memoryview)):
            # Take an immutable bytes copy so length, hex and text views are
            # all computed from the same byte-oriented snapshot.
            payload = bytes(data)
            if payload:
                self.pw_bps_history['write'].log(len(payload))
                self.pw_bps_history['total'].log(len(payload))
                self._pw_log_data('Write', payload)

        # Forward the original object and propagate the parent's result
        # instead of discarding it.
        return super().write(data)
171