1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Wrapers for pyserial classes to log read and write data.""" 15from __future__ import annotations 16 17from contextvars import ContextVar 18import logging 19import textwrap 20from typing import TYPE_CHECKING 21 22import serial 23 24from pw_console.widgets.event_count_history import EventCountHistory 25 26if TYPE_CHECKING: 27 from _typeshed import ReadableBuffer 28 29_LOG = logging.getLogger('pw_console.serial_debug_logger') 30 31 32def _log_hex_strings(data: bytes, prefix=''): 33 """Create alinged hex number and character view log messages.""" 34 # Make a list of 2 character hex number strings. 35 hex_numbers = textwrap.wrap(data.hex(), 2) 36 37 hex_chars = [ 38 ('<' + str(b.to_bytes(1, byteorder='big')) + '>') 39 .replace("<b'\\x", '', 1) # Remove b'\x from the beginning 40 .replace("<b'", '', 1) # Remove b' from the beginning 41 .replace("'>", '', 1) # Remove ' from the end 42 .rjust(2) 43 for b in data 44 ] 45 46 # Replace non-printable bytes with dots. 47 for i, num in enumerate(hex_numbers): 48 if num == hex_chars[i]: 49 hex_chars[i] = '..' 50 51 hex_numbers_msg = ' '.join(hex_numbers) 52 hex_chars_msg = ' '.join(hex_chars) 53 54 _LOG.debug( 55 '%s%s', 56 prefix, 57 hex_numbers_msg, 58 extra=dict( 59 extra_metadata_fields={ 60 'msg': hex_numbers_msg, 61 'view': 'hex', 62 } 63 ), 64 ) 65 _LOG.debug( 66 '%s%s', 67 prefix, 68 hex_chars_msg, 69 extra=dict( 70 extra_metadata_fields={ 71 'msg': hex_chars_msg, 72 'view': 'chars', 73 } 74 ), 75 ) 76 77 78BANDWIDTH_HISTORY_CONTEXTVAR = ContextVar( 79 'pw_console_bandwidth_history', 80 default={ 81 'total': EventCountHistory(interval=3), 82 'read': EventCountHistory(interval=3), 83 'write': EventCountHistory(interval=3), 84 }, 85) 86 87 88class SerialWithLogging(serial.Serial): # pylint: disable=too-many-ancestors 89 """pyserial with read and write wrappers for logging.""" 90 91 def __init__(self, *args, **kwargs): 92 super().__init__(*args, **kwargs) 93 self.pw_bps_history = BANDWIDTH_HISTORY_CONTEXTVAR.get() 94 95 def read(self, size: int = 1) -> bytes: 96 data = super().read(size) 97 self.pw_bps_history['read'].log(len(data)) 98 self.pw_bps_history['total'].log(len(data)) 99 100 if len(data) > 0: 101 prefix = 'Read %2d B: ' % len(data) 102 _LOG.debug( 103 '%s%s', 104 prefix, 105 data, 106 extra=dict( 107 extra_metadata_fields={ 108 'mode': 'Read', 109 'bytes': len(data), 110 'view': 'bytes', 111 'msg': str(data), 112 } 113 ), 114 ) 115 _log_hex_strings(data, prefix=prefix) 116 117 # Print individual lines 118 for line in data.decode( 119 encoding='utf-8', errors='ignore' 120 ).splitlines(): 121 _LOG.debug( 122 '%s', 123 line, 124 extra=dict( 125 extra_metadata_fields={ 126 'msg': line, 127 'view': 'lines', 128 } 129 ), 130 ) 131 132 return data 133 134 def write(self, data: ReadableBuffer) -> None: 135 if isinstance(data, bytes) and len(data) > 0: 136 self.pw_bps_history['write'].log(len(data)) 137 self.pw_bps_history['total'].log(len(data)) 138 139 prefix = 'Write %2d B: ' % len(data) 140 _LOG.debug( 141 '%s%s', 142 prefix, 143 data, 144 extra=dict( 145 extra_metadata_fields={ 146 'mode': 'Write', 147 'bytes': len(data), 148 'view': 'bytes', 149 'msg': str(data), 150 } 151 ), 152 ) 153 _log_hex_strings(data, prefix=prefix) 154 155 # Print individual lines 156 for line in data.decode( 157 encoding='utf-8', errors='ignore' 158 ).splitlines(): 159 _LOG.debug( 160 '%s', 161 line, 162 extra=dict( 163 extra_metadata_fields={ 164 'msg': line, 165 'view': 'lines', 166 } 167 ), 168 ) 169 170 super().write(data) 171