• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Library to analyze timestamps."""
15
16from datetime import datetime, timedelta, timezone
17import logging
18import pw_tokenizer
19from pw_chrono_protos import chrono_pb2
20from pw_tokenizer import proto as proto_detokenizer
21
22_LOG = logging.getLogger(__name__)
23
24
25def process_snapshot(
26    serialized_snapshot: bytes,
27    detokenizer: pw_tokenizer.Detokenizer | None = None,
28):
29    snapshot = chrono_pb2.SnapshotTimestamps()
30    snapshot.ParseFromString(serialized_snapshot)
31
32    output: list[str] = []
33    for timestamp in snapshot.timestamps or []:
34        proto_detokenizer.detokenize_fields(
35            detokenizer, timestamp.clock_parameters
36        )
37        try:
38            time_info = TimePointInfo(timestamp)
39            output.append(f'  {time_info.name_str()}: {time_info}')
40        except ValueError:
41            _LOG.warning('Failed to decode timestamp:\n%s', str(timestamp))
42
43    if not output:
44        return ''
45    plural = '' if len(output) == 1 else 's'
46    output.insert(0, f'Snapshot capture timestamp{plural}')
47    output.append('')
48    return '\n'.join(output)
49
50
51class TimePointInfo:
52    """Decodes pw.chrono.TimePoint protos into various representations."""
53
54    def __init__(self, timepoint: chrono_pb2.TimePoint):
55        self._timepoint = timepoint
56        parameters = self._timepoint.clock_parameters
57        if (
58            parameters.tick_period_seconds_denominator == 0
59            or parameters.tick_period_seconds_numerator == 0
60        ):
61            raise ValueError('Invalid timepoint')
62        self._timepoint = timepoint
63
64    def ticks_per_sec(self) -> float:
65        parameters = self._timepoint.clock_parameters
66        return (
67            parameters.tick_period_seconds_denominator
68            / parameters.tick_period_seconds_numerator
69        )
70
71    def period_suffix(self) -> str:
72        return {
73            1.0: 's',
74            1000.0: 'ms',
75            1_000_000.0: 'us',
76            1_000_000_000.0: 'ns',
77        }.get(self.ticks_per_sec(), '')
78
79    def duration(self) -> timedelta:
80        return timedelta(
81            seconds=self._timepoint.timestamp / self.ticks_per_sec()
82        )
83
84    def as_unix_time(self, tz: timezone = timezone.utc) -> datetime:
85        return datetime.fromtimestamp(self.duration().total_seconds(), tz)
86
87    def tick_count_str(self) -> str:
88        return f'{self._timepoint.timestamp} {self.period_suffix()}'.rstrip()
89
90    def __str__(self) -> str:
91        epoch_type = self._timepoint.clock_parameters.epoch_type
92        if epoch_type == chrono_pb2.EpochType.Enum.TIME_SINCE_BOOT:
93            return f'{self.duration()} ({self.tick_count_str()})'
94        if epoch_type == chrono_pb2.EpochType.Enum.UTC_WALL_CLOCK:
95            return f'{self.as_unix_time()} ({self.tick_count_str()})'
96        return self.tick_count_str()
97
98    def name_str(self) -> str:
99        parameters = self._timepoint.clock_parameters
100        try:
101            epoch_str = chrono_pb2.EpochType.Enum.Name(parameters.epoch_type)
102        except ValueError:
103            epoch_str = str(parameters.epoch_type)
104
105        if parameters.name:
106            return f'{parameters.name.decode()} (epoch {epoch_str})'
107        if parameters.epoch_type == chrono_pb2.EpochType.Enum.TIME_SINCE_BOOT:
108            return 'Time since boot'
109        if parameters.epoch_type == chrono_pb2.EpochType.Enum.UTC_WALL_CLOCK:
110            return 'UTC time'
111        return f'Timestamp (epoch {epoch_str})'
112