• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Wrapers for pyserial classes to log read and write data."""
15
16import collections
17import logging
18from dataclasses import dataclass
19import time
20from typing import Optional
21
22_LOG = logging.getLogger('pw_console')
23
24
25@dataclass
26class EventCountHistory:
27    """Track counts of events over time.
28
29    Example usage: ::
30
31        events = EventCountHistory(
32            base_count_units='Bytes',
33            display_unit_title='KiB/s',
34            display_unit_factor=0.001,
35            interval=1.0,
36            show_sparkline=True)
37
38        # Log 1 event now.
39        events.log(1)
40        time.sleep(1)
41
42        # Log 100 events at this time.
43        events.log(100)
44        time.sleep(1)
45
46        events.log(200)
47        time.sleep(1)
48        events.log(400)
49        print(events)
50        ▂▄█ 0.400 [KiB/s]
51
52    """
53
54    base_count_units: str = 'Bytes'
55    display_unit_title: str = 'KiB/s'
56    display_unit_factor: float = 0.001
57    interval: float = 1.0  # Number of seconds per sum of events.
58    history_limit: int = 20
59    scale_characters = ' ▁▂▃▄▅▆▇█'
60    history: collections.deque = collections.deque()
61    show_sparkline: bool = False
62    _this_count: int = 0
63    _last_count: int = 0
64    _last_update_time: float = time.time()
65
66    def log(self, count: int) -> None:
67        self._this_count += count
68
69        this_time = time.time()
70        if this_time - self._last_update_time >= self.interval:
71            self._last_update_time = this_time
72            self._last_count = self._this_count
73            self._this_count = 0
74            self.history.append(self._last_count)
75
76            if len(self.history) > self.history_limit:
77                self.history.popleft()
78
79    def last_count(self) -> float:
80        return self._last_count * self.display_unit_factor
81
82    def last_count_raw(self) -> int:
83        return self._last_count
84
85    def last_count_with_units(self) -> str:
86        return '{:.3f} [{}]'.format(
87            self._last_count * self.display_unit_factor,
88            self.display_unit_title)
89
90    def __repr__(self) -> str:
91        sparkline = ''
92        if self.show_sparkline:
93            sparkline = self.sparkline()
94        return ' '.join([sparkline, self.last_count_with_units()])
95
96    def __pt_formatted_text__(self):
97        return [('', self.__repr__())]
98
99    def sparkline(self,
100                  min_value: int = 0,
101                  max_value: Optional[int] = None) -> str:
102        msg = ''.rjust(self.history_limit)
103        if len(self.history) == 0:
104            return msg
105
106        minimum = min_value
107        maximum = max_value if max_value else max(self.history)
108        max_minus_min = maximum - min_value
109        if max_minus_min == 0:
110            return msg
111
112        msg = ''
113        for i in self.history:
114            # (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
115            index = int((((1.0 * i) - minimum) / max_minus_min) *
116                        len(self.scale_characters))
117            if index >= len(self.scale_characters):
118                index = len(self.scale_characters) - 1
119            msg += self.scale_characters[index]
120        return msg.rjust(self.history_limit)
121