• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2#
3# Copyright 2011 Google Inc. All Rights Reserved.
4#
5"""Tools for recording and reporting timeline of abstract events.
6
7You can store any events provided that they can be stringified.
8"""
9
10__author__ = 'kbaclawski@google.com (Krystian Baclawski)'
11
import collections
import datetime
import time

try:
  import collections.abc as collections_abc
except ImportError:  # Python 2 keeps the ABCs directly in `collections`.
  collections_abc = collections
15
16
class _EventRecord(object):
  """Internal class.  Attaches extra information to an event.

  Alongside the event itself a record keeps the moment the event started
  (seconds since the epoch) and, once the event is over, how long it lasted
  (seconds).  A record without a stored duration counts as still running.
  """

  def __init__(self, event, time_started=None, time_elapsed=None):
    """Creates a record for |event|.

    Args:
      event: The recorded object; it is stored as is.
      time_started: Start time in seconds since the epoch.  When None the
        current time.time() value is used.
      time_elapsed: Duration in seconds or as a datetime.timedelta.  When
        None the record is left unfinished.
    """
    self._event = event
    # Test against None rather than truthiness: 0 (the epoch itself) is a
    # legitimate start time and must not be replaced by "now".
    if time_started is None:
      time_started = time.time()
    self._time_started = time_started
    self._time_elapsed = None

    # Same reasoning: a zero duration still means the event has finished.
    if time_elapsed is not None:
      self.time_elapsed = time_elapsed

  @property
  def event(self):
    """The recorded event object."""
    return self._event

  @property
  def time_started(self):
    """Start time in seconds since the epoch."""
    return self._time_started

  def _TimeElapsedGet(self):
    """Returns the duration as a datetime.timedelta.

    For an unfinished record this is the time that has passed since the
    start, measured at the moment of the call.
    """
    if self.has_finished:
      seconds = self._time_elapsed
    else:
      seconds = time.time() - self._time_started

    return datetime.timedelta(seconds=seconds)

  def _TimeElapsedSet(self, time_elapsed):
    """Stores the duration, given in seconds or as a datetime.timedelta."""
    if isinstance(time_elapsed, datetime.timedelta):
      # total_seconds() keeps the days and microseconds parts, which the
      # .seconds attribute alone would silently drop.
      self._time_elapsed = time_elapsed.total_seconds()
    else:
      self._time_elapsed = time_elapsed

  time_elapsed = property(_TimeElapsedGet, _TimeElapsedSet)

  @property
  def has_finished(self):
    """True once a duration has been stored for this record."""
    return self._time_elapsed is not None

  def GetTimeStartedFormatted(self):
    """Returns the start time as 'MM/DD/YYYY HH:MM:SS' in UTC."""
    return time.strftime('%m/%d/%Y %H:%M:%S', time.gmtime(self._time_started))

  def GetTimeElapsedRounded(self):
    """Returns the duration truncated to whole seconds, as a timedelta."""
    # int() truncates toward zero; total_seconds() keeps durations >= 1 day.
    return datetime.timedelta(seconds=int(self.time_elapsed.total_seconds()))

  def Finish(self):
    """Marks the record as finished now; does nothing if already finished."""
    if not self.has_finished:
      self._time_elapsed = time.time() - self._time_started
65
66
class _Transition(collections.namedtuple('_Transition', ('from_', 'to_'))):
  """Internal class.  A (from_, to_) pair describing a change of event / state."""

  def __str__(self):
    # The instance is itself the two-element tuple (from_, to_), so it can be
    # handed to the format operator directly once converted to a plain tuple.
    return '%s => %s' % tuple(self)
72
73
class EventHistory(collections_abc.Sequence):
  """Records events and provides human readable events timeline.

  The history is a sequence of event records kept in the order they were
  added.  Adding a new event finishes the previously added one.
  """

  def __init__(self, records=None):
    """Creates a history.

    Args:
      records: Optional list of event records to start with; an empty
        history is created when it is None or empty.
    """
    self._records = records or []

  def __len__(self):
    return len(self._records)

  def __iter__(self):
    return iter(self._records)

  def __getitem__(self, index):
    return self._records[index]

  @property
  def last(self):
    """The most recently added record, or None for an empty history."""
    if self._records:
      return self._records[-1]
    return None

  def AddEvent(self, event):
    """Finishes the last record (if any) and starts a record for |event|.

    Args:
      event: The event to record.

    Returns:
      The newly created record.
    """
    previous = self.last
    if previous is not None:
      previous.Finish()

    evrec = _EventRecord(event)
    self._records.append(evrec)
    return evrec

  def GetTotalTime(self):
    """Returns the summed duration of all records, in whole seconds.

    Returns:
      A datetime.timedelta, or None when the history is empty.
    """
    if not self._records:
      return None

    # total_seconds() (rather than .seconds) keeps the days component, so
    # the total does not wrap around for histories longer than 24 hours.
    total_seconds = sum(evrec.time_elapsed.total_seconds()
                        for evrec in self._records)

    return datetime.timedelta(seconds=int(total_seconds))

  def GetTransitionEventHistory(self):
    """Returns a new EventHistory whose events are transitions.

    Each pair of consecutive records yields a record for the transition
    between their events, carrying the start time and duration of the
    earlier one.  If the last record has not finished, a final transition to
    'NOW' is appended with the time elapsed so far.
    """
    records = []

    # Pair every record with its successor.
    for evrec, next_evrec in zip(self._records, self._records[1:]):
      records.append(_EventRecord(
          _Transition(evrec.event, next_evrec.event), evrec.time_started,
          evrec.time_elapsed))

    last = self.last
    if last is not None and not last.has_finished:
      records.append(_EventRecord(
          _Transition(last.event, 'NOW'), last.time_started,
          last.time_elapsed))

    return EventHistory(records)

  @staticmethod
  def _GetReport(history, report_name):
    """Formats |history| as a numbered, newline separated report.

    Args:
      history: The EventHistory to report on.
      report_name: Title placed on the first line of the report.

    Returns:
      The report as a single string.
    """
    report = [report_name]

    for num, evrec in enumerate(history, start=1):
      time_elapsed = str(evrec.GetTimeElapsedRounded())

      if not evrec.has_finished:
        # Strings have no append(); concatenate the marker instead.
        time_elapsed += ' (not finished)'

      report.append('%d) %s: %s: %s' % (num, evrec.GetTimeStartedFormatted(),
                                        evrec.event, time_elapsed))

    report.append('Total Time: %s' % history.GetTotalTime())

    return '\n'.join(report)

  def GetEventReport(self):
    """Returns the report listing the recorded events."""
    return EventHistory._GetReport(self, 'Timeline of events:')

  def GetTransitionEventReport(self):
    """Returns the report listing transitions between recorded events."""
    return EventHistory._GetReport(self.GetTransitionEventHistory(),
                                   'Timeline of transition events:')
150