# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Wrappers for pyserial classes to log read and write data."""

import collections
import logging
from dataclasses import dataclass, field
import time
from typing import Optional

_LOG = logging.getLogger('pw_console')


@dataclass
class EventCountHistory:
    """Track counts of events over time.

    Counts passed to log() are accumulated until at least ``interval``
    seconds have elapsed since the previous rollover; the accumulated sum is
    then stored as the "last count" and appended to ``history``, which keeps
    at most ``history_limit`` entries.

    Example usage: ::

        events = EventCountHistory(
            base_count_units='Bytes',
            display_unit_title='KiB/s',
            display_unit_factor=0.001,
            interval=1.0,
            show_sparkline=True)

        # Log 1 event now.
        events.log(1)
        time.sleep(1)

        # Log 100 events at this time.
        events.log(100)
        time.sleep(1)

        events.log(200)
        time.sleep(1)
        events.log(400)
        print(events)
        ▂▄█ 0.400 [KiB/s]

    """

    base_count_units: str = 'Bytes'
    display_unit_title: str = 'KiB/s'
    display_unit_factor: float = 0.001
    interval: float = 1.0  # Number of seconds per sum of events.
    history_limit: int = 20
    # Glyphs used by sparkline(), ordered from lowest to highest value.
    # Unannotated on purpose: this is a class attribute, not a dataclass field.
    scale_characters = ' ▁▂▃▄▅▆▇█'
    # default_factory gives every instance its own deque. A bare deque()
    # default would be shared between instances (and is rejected outright by
    # dataclasses on Python 3.11+ because deque is unhashable).
    history: collections.deque = field(default_factory=collections.deque)
    show_sparkline: bool = False
    _this_count: int = 0
    _last_count: int = 0
    # Captured at instance creation, not at class definition time, so the
    # first interval is measured from when this tracker was constructed.
    _last_update_time: float = field(default_factory=time.time)

    def log(self, count: int) -> None:
        """Add ``count`` events to the current interval.

        If at least ``interval`` seconds have passed since the last rollover,
        the running total becomes the last count, is appended to ``history``
        and the running total is reset to zero.
        """
        self._this_count += count

        now = time.time()
        if now - self._last_update_time < self.interval:
            return

        self._last_update_time = now
        self._last_count = self._this_count
        self._this_count = 0
        self.history.append(self._last_count)

        # Drop the oldest sample once the history grows past its limit.
        if len(self.history) > self.history_limit:
            self.history.popleft()

    def last_count(self) -> float:
        """Return the last interval's count scaled by display_unit_factor."""
        return self._last_count * self.display_unit_factor

    def last_count_raw(self) -> int:
        """Return the last interval's count in unscaled base units."""
        return self._last_count

    def last_count_with_units(self) -> str:
        """Return the scaled last count as text, e.g. ``0.400 [KiB/s]``."""
        return f'{self.last_count():.3f} [{self.display_unit_title}]'

    def __repr__(self) -> str:
        """Return the optional sparkline and the last count with units.

        The two parts are always joined by a single space, so the result
        starts with a space when show_sparkline is False.
        """
        sparkline = self.sparkline() if self.show_sparkline else ''
        return ' '.join([sparkline, self.last_count_with_units()])

    def __pt_formatted_text__(self):
        """Return repr(self) as a one-fragment list of (style, text) tuples."""
        return [('', self.__repr__())]

    def sparkline(self,
                  min_value: int = 0,
                  max_value: Optional[int] = None) -> str:
        """Render ``history`` as a string of block characters.

        Args:
          min_value: Value mapped to the lowest glyph. Samples below it are
            drawn with the lowest glyph.
          max_value: Value mapped to the highest glyph. When None or 0 the
            largest value in ``history`` is used. Samples above it are drawn
            with the highest glyph.

        Returns:
          A string right-justified to ``history_limit`` characters. It is all
          spaces when there is no history or when the value range is zero.
        """
        blank = ''.rjust(self.history_limit)
        if not self.history:
            return blank

        maximum = max_value if max_value else max(self.history)
        value_range = maximum - min_value
        if value_range == 0:
            return blank

        glyphs = self.scale_characters
        last_index = len(glyphs) - 1

        def glyph_for(value) -> str:
            # Linear map of [min_value, maximum] onto the glyph indices.
            index = int(((1.0 * value) - min_value) / value_range *
                        len(glyphs))
            # Clamp both ends: a value above the maximum must not overrun the
            # glyph table and a value below min_value must not wrap around
            # through a negative index.
            return glyphs[min(max(index, 0), last_index)]

        line = ''.join(glyph_for(value) for value in self.history)
        return line.rjust(self.history_limit)