# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Library to analyze timestamps."""

from datetime import datetime, timedelta, timezone
import logging
import pw_tokenizer
from pw_chrono_protos import chrono_pb2
from pw_tokenizer import proto as proto_detokenizer

_LOG = logging.getLogger(__name__)


def process_snapshot(
    serialized_snapshot: bytes,
    detokenizer: pw_tokenizer.Detokenizer | None = None,
) -> str:
    """Renders the timestamps of a serialized snapshot as text.

    Args:
        serialized_snapshot: Bytes parsed as a
            chrono_pb2.SnapshotTimestamps message.
        detokenizer: Optional detokenizer applied to each timestamp's clock
            parameters before formatting.

    Returns:
        An empty string if no timestamp could be formatted. Otherwise a header
        line followed by one line per formatted timestamp, ending with a
        newline.
    """
    snapshot = chrono_pb2.SnapshotTimestamps()
    snapshot.ParseFromString(serialized_snapshot)

    output: list[str] = []
    for timestamp in snapshot.timestamps:
        proto_detokenizer.detokenize_fields(
            detokenizer, timestamp.clock_parameters
        )
        try:
            time_info = TimePointInfo(timestamp)
            # NOTE(review): the leading indent of this line is reproduced as
            # seen in the reviewed source; confirm its width is as intended.
            output.append(f' {time_info.name_str()}: {time_info}')
        except (ValueError, OverflowError, OSError):
            # Formatting is best effort: besides the ValueError raised for
            # invalid clock parameters, out-of-range tick counts make
            # timedelta() / datetime.fromtimestamp() raise OverflowError or
            # OSError. Skip the bad entry instead of aborting the whole decode.
            _LOG.warning('Failed to decode timestamp:\n%s', timestamp)

    if not output:
        return ''
    plural = '' if len(output) == 1 else 's'
    output.insert(0, f'Snapshot capture timestamp{plural}')
    # The trailing empty entry makes the joined text end with a newline.
    output.append('')
    return '\n'.join(output)


class TimePointInfo:
    """Decodes pw.chrono.TimePoint protos into various representations."""

    # Unit suffixes for the tick rates (ticks per second) that have one.
    _PERIOD_SUFFIXES: dict[float, str] = {
        1.0: 's',
        1000.0: 'ms',
        1_000_000.0: 'us',
        1_000_000_000.0: 'ns',
    }

    def __init__(self, timepoint: chrono_pb2.TimePoint):
        """Wraps a timepoint after validating its clock parameters.

        Raises:
            ValueError: If the tick period numerator or denominator is zero.
        """
        parameters = timepoint.clock_parameters
        if (
            parameters.tick_period_seconds_denominator == 0
            or parameters.tick_period_seconds_numerator == 0
        ):
            raise ValueError('Invalid timepoint')
        self._timepoint = timepoint

    def ticks_per_sec(self) -> float:
        """Returns the tick rate: period denominator divided by numerator."""
        parameters = self._timepoint.clock_parameters
        return (
            parameters.tick_period_seconds_denominator
            / parameters.tick_period_seconds_numerator
        )

    def period_suffix(self) -> str:
        """Returns 's', 'ms', 'us' or 'ns' for a matching rate, else ''."""
        return self._PERIOD_SUFFIXES.get(self.ticks_per_sec(), '')

    def duration(self) -> timedelta:
        """Returns the tick count converted to a timedelta.

        The conversion uses floating-point division, so very large tick counts
        may lose precision.
        """
        return timedelta(
            seconds=self._timepoint.timestamp / self.ticks_per_sec()
        )

    def as_unix_time(self, tz: timezone = timezone.utc) -> datetime:
        """Returns the timepoint as a datetime in tz, read as a Unix time."""
        return datetime.fromtimestamp(self.duration().total_seconds(), tz)

    def tick_count_str(self) -> str:
        """Returns the raw tick count, with a unit suffix when one applies."""
        return f'{self._timepoint.timestamp} {self.period_suffix()}'.rstrip()

    def __str__(self) -> str:
        """Formats the timepoint according to its epoch type.

        TIME_SINCE_BOOT renders as a duration and UTC_WALL_CLOCK as a datetime,
        each followed by the tick count in parentheses. Any other epoch type
        renders as the tick count alone.
        """
        epoch_type = self._timepoint.clock_parameters.epoch_type
        if epoch_type == chrono_pb2.EpochType.Enum.TIME_SINCE_BOOT:
            return f'{self.duration()} ({self.tick_count_str()})'
        if epoch_type == chrono_pb2.EpochType.Enum.UTC_WALL_CLOCK:
            return f'{self.as_unix_time()} ({self.tick_count_str()})'
        return self.tick_count_str()

    def name_str(self) -> str:
        """Returns a human-readable label for the timepoint's clock.

        A non-empty clock name takes precedence. Otherwise the two known epoch
        types get fixed labels and anything else a generic label.
        """
        parameters = self._timepoint.clock_parameters
        try:
            epoch_str = chrono_pb2.EpochType.Enum.Name(parameters.epoch_type)
        except ValueError:
            # Enum value without a name: fall back to its numeric form.
            epoch_str = str(parameters.epoch_type)

        if parameters.name:
            return f'{parameters.name.decode()} (epoch {epoch_str})'
        if parameters.epoch_type == chrono_pb2.EpochType.Enum.TIME_SINCE_BOOT:
            return 'Time since boot'
        if parameters.epoch_type == chrono_pb2.EpochType.Enum.UTC_WALL_CLOCK:
            return 'UTC time'
        return f'Timestamp (epoch {epoch_str})'