• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Wrapper library around ts_mon.
7
8This library provides some wrapper functionality around ts_mon, to make it more
9friendly to developers. It also provides import safety, in case ts_mon is not
10deployed with your code.
11"""
12
13from __future__ import division
14from __future__ import print_function
15
16import collections
17import contextlib
18import ssl
19import time
20from functools import wraps
21
22import six
23from six.moves import queue as Queue
24
25from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging
26
27try:
28  from infra_libs import ts_mon
29except (ImportError, RuntimeError):
30  ts_mon = None
31
32
33# This number is chosen because 1.16^100 seconds is about
34# 32 days. This is a good compromise between bucket size
35# and dynamic range.
36_SECONDS_BUCKET_FACTOR = 1.16
37
# If None, we create metrics in this process. Otherwise, we send metrics via
# this Queue to a dedicated flushing process.
# These attributes are set by chromite.lib.ts_mon_config.SetupTsMonGlobalState.
41FLUSHING_PROCESS = None
42MESSAGE_QUEUE = None
43
44_MISSING = object()
45
46MetricCall = collections.namedtuple('MetricCall', [
47    'metric_name', 'metric_args', 'metric_kwargs',
48    'method', 'method_args', 'method_kwargs',
49    'reset_after'
50])
51
52
def _FlushingProcessClosed():
  """Returns whether the metrics flushing process has been closed."""
  if FLUSHING_PROCESS is None:
    return False
  # A non-None exitcode means the process has terminated.
  return FLUSHING_PROCESS.exitcode is not None
57
58
class ProxyMetric(object):
  """Redirects any method calls to the message queue."""

  def __init__(self, metric, metric_args, metric_kwargs):
    # |metric| is the name of the metric-constructor function to be invoked
    # by the flushing process.
    self.metric = metric
    self.metric_args = metric_args
    # reset_after is consumed here; it is not an argument of the underlying
    # ts_mon metric constructor.
    self.reset_after = metric_kwargs.pop('reset_after', False)
    self.metric_kwargs = metric_kwargs

  def __getattr__(self, method_name):
    """Redirects all method calls to the MESSAGE_QUEUE."""
    def enqueue(*args, **kwargs):
      if _FlushingProcessClosed():
        # The flushing process is gone; nothing will ever drain the queue.
        try:
          exit_code = FLUSHING_PROCESS.exitcode
        except AttributeError:
          exit_code = None
        logging.warning(
            'Flushing process has been closed (exit code %s),'
            " skipped sending metric '%s'",
            exit_code,
            self.metric)
        return
      try:
        MESSAGE_QUEUE.put_nowait(
            MetricCall(metric_name=self.metric,
                       metric_args=self.metric_args,
                       metric_kwargs=self.metric_kwargs,
                       method=method_name,
                       method_args=args,
                       method_kwargs=kwargs,
                       reset_after=self.reset_after))
      except Queue.Full:
        logging.warning(
            "Metrics queue is full; skipped sending metric '%s'",
            self.metric)

    return enqueue
97
98
def _Indirect(fn):
  """Decorates a metric constructor to be indirect if MESSAGE_QUEUE is set.

  When MESSAGE_QUEUE is set (i.e. a dedicated flushing process exists), the
  wrapped constructor returns a ProxyMetric that forwards method calls to
  that queue; otherwise the constructor behaves normally in this process.
  """
  @wraps(fn)
  def AddToQueueIfPresent(*args, **kwargs):
    if not MESSAGE_QUEUE:
      # reset_after is only meaningful to |ProxyMetric|; strip it before
      # calling the real constructor.
      kwargs.pop('reset_after', None)
      return fn(*args, **kwargs)
    return ProxyMetric(fn.__name__, args, kwargs)
  return AddToQueueIfPresent
115
116
class MockMetric(object):
  """Stand-in metric returned when ts_mon is not set up.

  Every attribute lookup resolves to a no-op method, so callers may invoke
  any metric API (set, add, increment, ...) without effect.
  """

  def _noop(self, *_args, **_kwargs):
    """Accepts any arguments and does nothing."""

  def __getattr__(self, _name):
    return self._noop
125
126
def _ImportSafe(fn):
  """Decorates |fn| to return a MockMetric when ts_mon was not imported."""
  @wraps(fn)
  def wrapper(*args, **kwargs):
    if not ts_mon:
      # ts_mon failed to import at module load; metrics become no-ops.
      return MockMetric()
    return fn(*args, **kwargs)

  return wrapper
137
138
class FieldSpecAdapter(object):
  """Infers the types of fields values to work around field_spec requirement.

  See: https://chromium-review.googlesource.com/c/432120/ for the change
  which added a required field_spec argument. This class is a temporary
  workaround to allow inferring the field_spec if it is not provided.

  The real metric is constructed lazily: the first method call (e.g. .set or
  .increment) supplies the fields, from which the field_spec is inferred.
  """
  # Maps Python value types to the ts_mon Field class used to describe them.
  # Empty when ts_mon is unavailable so the class still imports cleanly.
  FIELD_CLASSES = {} if ts_mon is None else {
      bool: ts_mon.BooleanField,
      int: ts_mon.IntegerField,
      str: ts_mon.StringField,
      six.text_type: ts_mon.StringField,
  }

  def __init__(self, metric_cls, *args, **kwargs):
    # Constructor and arguments are saved so the metric can be created on
    # the first method call, once a field_spec has been inferred.
    self._metric_cls = metric_cls
    self._args = args
    self._kwargs = kwargs
    # _MISSING sentinel distinguishes "not yet constructed" from any real
    # metric instance (including falsy ones).
    self._instance = _MISSING

  def __getattr__(self, prop):
    """Return a wrapper which constructs the metric object on demand.

    Args:
      prop: The property name

    Returns:
      If self._instance has been created, the instance's .|prop| property,
      otherwise, a wrapper function which creates the ._instance and then
      calls the |prop| method on the instance.
    """
    if self._instance is not _MISSING:
      return getattr(self._instance, prop)

    def func(*args, **kwargs):
      # Re-check: another earlier call may have constructed the instance
      # between __getattr__ and this invocation.
      if self._instance is not _MISSING:
        return getattr(self._instance, prop)(*args, **kwargs)
      fields = FieldSpecAdapter._InferFields(prop, args, kwargs)
      self._kwargs['field_spec'] = FieldSpecAdapter._InferFieldSpec(fields)
      self._instance = self._metric_cls(*self._args, **self._kwargs)
      return getattr(self._instance, prop)(*args, **kwargs)

    # Preserve the requested method name for introspection/debugging.
    func.__name__ = prop
    return func

  @staticmethod
  def _InferFields(method_name, args, kwargs):
    """Infers the fields argument.

    Args:
      method_name: The method called.
      args: The args list
      kwargs: The keyword args

    Returns:
      The fields dict if one can be located, else None (implicitly).
    """
    if 'fields' in kwargs:
      return kwargs['fields']

    # .increment(fields) takes fields as its first positional argument.
    if method_name == 'increment' and args:
      return args[0]

    # Other mutators (.set, .add, ...) take (value, fields).
    if len(args) >= 2:
      return args[1]

  @staticmethod
  def _InferFieldSpec(fields):
    """Infers the fields types from the given fields.

    Args:
      fields: A dictionary with metric fields.

    Returns:
      A list of ts_mon Field objects sorted by field name, or None if there
      are no fields or ts_mon is unavailable.
    """
    if not fields or not ts_mon:
      return None

    # Sorting keeps the spec deterministic regardless of dict order.
    return [FieldSpecAdapter.FIELD_CLASSES[type(v)](field)
            for (field, v) in sorted(fields.items())]
214
215
def _OptionalFieldSpec(fn):
  """Decorates a function to allow an optional description and field_spec."""
  @wraps(fn)
  def wrapper(*args, **kwargs):
    kwargs = dict(**kwargs)  # Never mutate the caller's kwargs.
    # Unlike .setdefault, this also replaces falsy values that are already
    # present: an empty or None description is not allowed.
    kwargs['description'] = kwargs.get('description') or 'No description.'
    has_spec = 'field_spec' in kwargs and kwargs['field_spec'] is not _MISSING
    if has_spec:
      return fn(*args, **kwargs)
    # No explicit spec: defer construction so the spec can be inferred from
    # the first method call's fields.
    return FieldSpecAdapter(fn, *args, **kwargs)
  return wrapper
230
231
232def _Metric(fn):
233  """A pipeline of decorators to apply to our metric constructors."""
234  return _OptionalFieldSpec(_ImportSafe(_Indirect(fn)))
235
236
# This is needed for the reset_after flag used by @_Indirect.
# pylint: disable=unused-argument
239
@_Metric
def CounterMetric(name, reset_after=False, description=None,
                  field_spec=_MISSING, start_time=None):
  """Returns a metric handle for a counter named |name|.

  Args:
    name: string name of the metric.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    start_time: Optional timestamp, forwarded to ts_mon.CounterMetric.
  """
  metric = ts_mon.CounterMetric(
      name, description=description, field_spec=field_spec,
      start_time=start_time)
  return metric
Counter = CounterMetric
248
249
@_Metric
def GaugeMetric(name, reset_after=False, description=None, field_spec=_MISSING):
  """Returns a metric handle for a gauge named |name|.

  Args:
    name: string name of the metric.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  metric = ts_mon.GaugeMetric(
      name, description=description, field_spec=field_spec)
  return metric
Gauge = GaugeMetric
256
257
@_Metric
def CumulativeMetric(name, reset_after=False, description=None,
                     field_spec=_MISSING):
  """Returns a metric handle for a cumulative float named |name|.

  Args:
    name: string name of the metric.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  metric = ts_mon.CumulativeMetric(
      name, description=description, field_spec=field_spec)
  return metric
264
265
@_Metric
def StringMetric(name, reset_after=False, description=None,
                 field_spec=_MISSING):
  """Returns a metric handle for a string named |name|.

  Args:
    name: string name of the metric.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  metric = ts_mon.StringMetric(
      name, description=description, field_spec=field_spec)
  return metric
String = StringMetric
273
274
@_Metric
def BooleanMetric(name, reset_after=False, description=None,
                  field_spec=_MISSING):
  """Returns a metric handle for a boolean named |name|.

  Args:
    name: string name of the metric.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  metric = ts_mon.BooleanMetric(
      name, description=description, field_spec=field_spec)
  return metric
Boolean = BooleanMetric
282
283
@_Metric
def FloatMetric(name, reset_after=False, description=None, field_spec=_MISSING):
  """Returns a metric handle for a float named |name|.

  Args:
    name: string name of the metric.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  metric = ts_mon.FloatMetric(
      name, description=description, field_spec=field_spec)
  return metric
Float = FloatMetric
290
291
@_Metric
def CumulativeDistributionMetric(name, reset_after=False, description=None,
                                 bucketer=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution named |name|.

  Args:
    name: string name of the metric.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    bucketer: A ts_mon bucketer instance, or None for the default.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  metric = ts_mon.CumulativeDistributionMetric(
      name, description=description, bucketer=bucketer, field_spec=field_spec)
  return metric
CumulativeDistribution = CumulativeDistributionMetric
299
300
@_Metric
def DistributionMetric(name, reset_after=False, description=None,
                       bucketer=None, field_spec=_MISSING):
  """Returns a metric handle for a distribution named |name|.

  Note that despite the generic name, this is backed by ts_mon's
  NonCumulativeDistributionMetric.

  Args:
    name: string name of the metric.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    bucketer: A ts_mon bucketer instance, or None for the default.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  metric = ts_mon.NonCumulativeDistributionMetric(
      name, description=description, bucketer=bucketer, field_spec=field_spec)
  return metric
Distribution = DistributionMetric
308
309
@_Metric
def CumulativeSmallIntegerDistribution(name, reset_after=False,
                                       description=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution named |name|.

  This differs slightly from CumulativeDistribution, in that the underlying
  metric uses a uniform bucketer rather than a geometric one.

  This metric type is suitable for holding a distribution of numbers that are
  nonnegative integers in the range of 0 to 100.

  Args:
    name: string name of the metric.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  # Width-1 buckets give exact counts for small integer values.
  uniform_bucketer = ts_mon.FixedWidthBucketer(1)
  return ts_mon.CumulativeDistributionMetric(
      name,
      bucketer=uniform_bucketer,
      description=description,
      field_spec=field_spec)
326
327
@_Metric
def CumulativeSecondsDistribution(name, scale=1, reset_after=False,
                                  description=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution named |name|.

  The distribution handle returned by this method is better suited than the
  default one for recording handling times, in seconds.

  This metric handle has bucketing that is optimized for time intervals
  (in seconds) in the range of 1 second to 32 days. Use |scale| to adjust this
  (e.g. scale=0.1 covers a range from .1 seconds to 3.2 days).

  Args:
    name: string name of metric
    scale: scaling factor of buckets, and size of the first bucket. default: 1
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  # Geometric buckets with growth factor 1.16 span ~1s to ~32 days over 100
  # buckets (see _SECONDS_BUCKET_FACTOR).
  bucketer = ts_mon.GeometricBucketer(growth_factor=_SECONDS_BUCKET_FACTOR,
                                      scale=scale)
  return ts_mon.CumulativeDistributionMetric(
      name, bucketer=bucketer, units=ts_mon.MetricsDataUnits.SECONDS,
      description=description, field_spec=field_spec)

SecondsDistribution = CumulativeSecondsDistribution
354
355
@_Metric
def PercentageDistribution(
    name, num_buckets=1000, reset_after=False,
    description=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution for percentage.

  The distribution handle returned by this method is better suited for
  reporting percentage values than the default one. The bucketing is optimized
  for values in [0,100].

  Args:
    name: The name of this metric.
    num_buckets: This metric buckets the percentage values before
        reporting. This argument controls the number of the bucket the range
        [0,100] is divided in. The default gives you 0.1% resolution.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  # The last bucket actually covers [100, 100 + 1.0/num_buckets), so it
  # corresponds to values that exactly match 100%.
  bucketer = ts_mon.FixedWidthBucketer(100 / num_buckets, num_buckets)
  return ts_mon.CumulativeDistributionMetric(
      name, bucketer=bucketer,
      description=description, field_spec=field_spec)
382
383
@contextlib.contextmanager
def SecondsTimer(name, fields=None, description=None, field_spec=_MISSING,
                 scale=1, record_on_exception=True, add_exception_field=False):
  """Record the time of an operation to a CumulativeSecondsDistributionMetric.

  Records the time taken inside of the context block, to the
  CumulativeSecondsDistribution named |name|, with the given fields.

  Examples:
    # Time the doSomething() call, with field values that are independent of the
    # results of the operation.
    with SecondsTimer('timer/name', fields={'foo': 'bar'},
                      description='My timer',
                      field_spec=[ts_mon.StringField('foo'),
                                  ts_mon.BooleanField('success')]):
      doSomething()

    # Time the doSomethingElse call, with field values that depend on the
    # results of that operation. Note that it is important that a default value
    # is specified for these fields, in case an exception is thrown by
    # doSomethingElse()
    f = {'success': False, 'foo': 'bar'}
    with SecondsTimer('timer/name', fields=f, description='My timer',
                      field_spec=[ts_mon.StringField('foo')]) as c:
      doSomethingElse()
      c['success'] = True

    # Incorrect Usage!
    with SecondsTimer('timer/name', description='My timer') as c:
      doSomething()
      c['foo'] = bar # 'foo' is not a valid field, because no default
                     # value for it was specified in the context constructor.
                     # It will be silently ignored.

  Args:
    name: The name of the metric to create
    fields: The fields of the metric to create.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    scale: A float to scale the CumulativeSecondsDistribution buckets by.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
        to the FieldSpec provided, and set its value to True iff an exception
        was raised in the context.
  """
  # Only extend the caller's field_spec when the exception field was actually
  # requested. Previously the append was unconditional, which could leave the
  # declared spec containing a field that was never populated when
  # add_exception_field=False.
  if (add_exception_field and
      field_spec is not None and field_spec is not _MISSING):
    field_spec.append(ts_mon.BooleanField('encountered_exception'))

  m = CumulativeSecondsDistribution(
      name, scale=scale, description=description, field_spec=field_spec)
  # Copy so the caller's dict is never mutated by the bookkeeping below.
  f = dict(fields or {})
  keys = list(f)
  t0 = _GetSystemClock()

  error = True
  try:
    yield f
    error = False
  finally:
    # Populate the exception field whenever it was requested, so the reported
    # fields always match the declared field_spec. (Previously this was
    # skipped when record_on_exception was False, producing a mismatch on the
    # success path.)
    if add_exception_field:
      keys.append('encountered_exception')
      f.setdefault('encountered_exception', error)
    # Filter out keys that were not part of the initial key set. This is to
    # avoid inconsistent fields.
    # TODO(akeshet): Doing this filtering isn't super efficient. Would be better
    # to implement some key-restricted subclass or wrapper around dict, and just
    # yield that above rather than yielding a regular dict.
    if record_on_exception or not error:
      dt = _GetSystemClock() - t0
      # TODO(ayatane): Handle backward clock jumps.  See _GetSystemClock.
      if dt >= 0:
        m.add(dt, fields={k: f[k] for k in keys})
457
458
def SecondsTimerDecorator(name, fields=None, description=None,
                          field_spec=_MISSING, scale=1,
                          record_on_exception=True, add_exception_field=False):
  """Decorator to time the duration of function calls.

  Examples:
    @SecondsTimerDecorator('timer/name', fields={'foo': 'bar'},
                           description='My timer',
                           field_spec=[ts_mon.StringField('foo')])
    def Foo(bar):
      return doStuff()

    is equivalent to

    def Foo(bar):
      with SecondsTimer('timer/name', fields={'foo': 'bar'},
                        description='My timer',
                        field_spec=[ts_mon.StringField('foo')])
        return doStuff()

  Args:
    name: The name of the metric to create
    fields: The fields of the metric to create
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    scale: A float to scale the distribution by
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
        to the FieldSpec provided, and set its value to True iff an exception
        was raised in the context.
  """
  def decorator(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
      # Delegate all timing/recording semantics to SecondsTimer.
      timer = SecondsTimer(name, fields=fields, description=description,
                           field_spec=field_spec, scale=scale,
                           record_on_exception=record_on_exception,
                           add_exception_field=add_exception_field)
      with timer:
        return fn(*args, **kwargs)

    return wrapper

  return decorator
502
503
@contextlib.contextmanager
def SecondsInstanceTimer(name, fields=None, description=None,
                         field_spec=_MISSING, record_on_exception=True,
                         add_exception_field=False):
  """Record the time of an operation to a FloatMetric.

  Records the time taken inside of the context block, to the
  Float metric named |name|, with the given fields.  This is
  a non-cumulative metric; this represents the absolute time
  taken for a specific block.  The duration is stored in a float
  to provide flexibility in the future for higher accuracy.

  Examples:
    # Time the doSomething() call, with field values that are independent of the
    # results of the operation.
    with SecondsInstanceTimer('timer/name', fields={'foo': 'bar'},
                              description='My timer',
                              field_spec=[ts_mon.StringField('foo'),
                                          ts_mon.BooleanField('success')]):
      doSomething()

    # Time the doSomethingElse call, with field values that depend on the
    # results of that operation. Note that it is important that a default value
    # is specified for these fields, in case an exception is thrown by
    # doSomethingElse()
    f = {'success': False, 'foo': 'bar'}
    with SecondsInstanceTimer('timer/name', fields=f, description='My timer',
                              field_spec=[ts_mon.StringField('foo')]) as c:
      doSomethingElse()
      c['success'] = True

    # Incorrect Usage!
    with SecondsInstanceTimer('timer/name', description='My timer') as c:
      doSomething()
      c['foo'] = bar # 'foo' is not a valid field, because no default
                     # value for it was specified in the context constructor.
                     # It will be silently ignored.

  Args:
    name: The name of the metric to create
    fields: The fields of the metric to create.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
        to the FieldSpec provided, and set its value to True iff an exception
        was raised in the context.

  Yields:
    Float based metric measing the duration of execution.
  """
  # Only extend the caller's field_spec when the exception field was actually
  # requested; an unconditional append could leave the spec with a field that
  # is never populated when add_exception_field=False.
  if (add_exception_field and
      field_spec is not None and field_spec is not _MISSING):
    field_spec.append(ts_mon.BooleanField('encountered_exception'))

  m = FloatMetric(name, description=description, field_spec=field_spec)
  # Copy so the caller's dict is never mutated by the bookkeeping below.
  f = dict(fields or {})
  keys = list(f)
  t0 = _GetSystemClock()

  error = True
  try:
    yield f
    error = False
  finally:
    # Populate the exception field whenever it was requested, so the reported
    # fields always match the declared field_spec (previously skipped when
    # record_on_exception was False).
    if add_exception_field:
      keys.append('encountered_exception')
      f.setdefault('encountered_exception', error)
    # Filter out keys that were not part of the initial key set. This is to
    # avoid inconsistent fields.
    # TODO(akeshet): Doing this filtering isn't super efficient. Would be better
    # to implement some key-restricted subclass or wrapper around dict, and just
    # yield that above rather than yielding a regular dict.
    if record_on_exception or not error:
      # NOTE(review): unlike SecondsTimer, a negative dt (backward clock jump)
      # is still recorded here — preserved as-is; see _GetSystemClock TODO.
      dt = _GetSystemClock() - t0
      m.set(dt, fields={k: f[k] for k in keys})
579
580
def SecondsInstanceTimerDecorator(name, fields=None, description=None,
                                  field_spec=_MISSING,
                                  record_on_exception=True,
                                  add_exception_field=False):
  """Decorator to time the gauge duration of function calls.

  Examples:
    @SecondsInstanceTimerDecorator('timer/name', fields={'foo': 'bar'},
                                   description='My timer',
                                   field_spec=[ts_mon.StringField('foo'),
                                               ts_mon.BooleanField('success')]):

    def Foo(bar):
      return doStuff()

    is equivalent to

    def Foo(bar):
      with SecondsInstanceTimer('timer/name', fields={'foo': 'bar'},
                                description='My timer',
                                field_spec=[ts_mon.StringField('foo'),
                                            ts_mon.BooleanField('success')]):
        return doStuff()

  Args:
    name: The name of the metric to create
    fields: The fields of the metric to create
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
        to the FieldSpec provided, and set its value to True iff an exception
        was raised in the context.

  Returns:
    A SecondsInstanceTimer metric decorator.
  """
  def decorator(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
      # Delegate all timing/recording semantics to SecondsInstanceTimer.
      timer = SecondsInstanceTimer(name, fields=fields,
                                   description=description,
                                   field_spec=field_spec,
                                   record_on_exception=record_on_exception,
                                   add_exception_field=add_exception_field)
      with timer:
        return fn(*args, **kwargs)

    return wrapper

  return decorator
630
631
@contextlib.contextmanager
def SuccessCounter(name, fields=None, description=None, field_spec=_MISSING):
  """Create a counter that tracks if something succeeds.

  Args:
    name: The name of the metric to create
    fields: The fields of the metric
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  counter = Counter(name)
  # Copy so the caller's dict is never mutated.
  f = dict(fields or {})
  # 'success' is reported in addition to the caller's fields.
  keys = list(f) + ['success']
  success = False
  try:
    yield f
    success = True
  finally:
    # setdefault lets the caller override 'success' inside the block.
    f.setdefault('success', success)
    counter.increment(fields={k: f[k] for k in keys})
655
656
@contextlib.contextmanager
def Presence(name, fields=None, description=None, field_spec=_MISSING):
  """A counter of 'active' things.

  This keeps track of how many name's are active at any given time. However,
  it's only suitable for long running tasks, since the initial true value may
  never be written out if the task doesn't run for at least a minute.

  Args:
    name: The name of the metric to create.
    fields: The fields of the metric.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  # Fix: pass the caller's description through; it was previously dropped
  # (a literal None was passed instead of the parameter).
  b = Boolean(name, description=description, field_spec=field_spec)
  b.set(True, fields=fields)
  try:
    yield
  finally:
    # Always mark inactive on exit, even if the block raised.
    b.set(False, fields=fields)
671
672
class RuntimeBreakdownTimer(object):
  """Record the time of an operation and the breakdown into sub-steps.

  Examples:
    with RuntimeBreakdownTimer('timer/name', fields={'foo':'bar'},
                               description='My timer',
                               field_spec=[ts_mon.StringField('foo')]) as timer:
      with timer.Step('first_step'):
        doFirstStep()
      with timer.Step('second_step'):
        doSecondStep()
      # The time spent next will show up under .../timer/name/breakdown_no_step
      doSomeNonStepWork()

  This will emit the following metrics:
  - .../timer/name/total_duration - A CumulativeSecondsDistribution metric for
        the time spent inside the outer with block.
  - .../timer/name/breakdown/first_step and
    .../timer/name/breakdown/second_step - PercentageDistribution metrics for
        the fraction of time devoted to each substep.
  - .../timer/name/breakdown_unaccounted - PercentageDistribution metric for the
        fraction of time that is not accounted for in any of the substeps.
  - .../timer/name/bucketing_loss - PercentageDistribution metric buckets values
        before reporting them as distributions. This causes small errors in the
        reported values because they are rounded to the reported buckets lower
        bound. This is a CumulativeMetric measuring the total rounding error
        accrued in reporting all the percentages. The worst case bucketing loss
        for x steps is (x+1)/10. So, if you time across 9 steps, you should
        expect no more than 1% rounding error.
  [experimental]
  - .../timer/name/duration_breakdown - A Float metric, with one stream per Step
        indicating the ratio of time spent in that step. The different steps are
        differentiated via a field with key 'step_name'. Since some of the time
        can be spent outside any steps, these ratios will sum to <= 1.

  NB: This helper can only be used if the field values are known at the
  beginning of the outer context and do not change as a result of any of the
  operations timed.
  """

  # Number of buckets used for all PercentageDistribution metrics emitted
  # by this class (0.1% resolution over [0, 100]).
  PERCENT_BUCKET_COUNT = 1000

  # Per-step record: the step's name and its wall-clock duration in seconds.
  _StepMetrics = collections.namedtuple('_StepMetrics', ['name', 'time_s'])

  def __init__(self, name, fields=None, description=None, field_spec=_MISSING):
    self._name = name
    self._fields = fields
    self._field_spec = field_spec
    self._description = description
    # Wall-clock reading at __enter__; set when the context is entered.
    self._outer_t0 = None
    # Total seconds spent in the outer context; set by _RecordTotalTime.
    self._total_time_s = 0
    # Guards against nested Step() usage, which is not supported.
    self._inside_step = False
    # List of _StepMetrics, one per completed Step.
    self._step_metrics = []

  def __enter__(self):
    self._outer_t0 = _GetSystemClock()
    return self

  def __exit__(self, _type, _value, _traceback):
    # Must run first: the breakdown getters below divide by _total_time_s.
    self._RecordTotalTime()

    outer_timer = CumulativeSecondsDistribution(
        '%s/total_duration' % (self._name,),
        field_spec=self._field_spec,
        description=self._description)
    outer_timer.add(self._total_time_s, fields=self._fields)

    for name, percent in self._GetStepBreakdowns().items():
      step_metric = PercentageDistribution(
          '%s/breakdown/%s' % (self._name, name),
          num_buckets=self.PERCENT_BUCKET_COUNT,
          field_spec=self._field_spec,
          description=self._description)
      step_metric.add(percent, fields=self._fields)

      # Copy the user's fields so 'step_name' can be added without mutation.
      fields = dict(self._fields) if self._fields is not None else dict()
      fields['step_name'] = name
      # TODO(pprabhu): Convert _GetStepBreakdowns() to return ratios instead of
      # percentage when the old PercentageDistribution reporting is deleted.
      Float('%s/duration_breakdown' % self._name).set(percent / 100,
                                                      fields=fields)

    unaccounted_metric = PercentageDistribution(
        '%s/breakdown_unaccounted' % self._name,
        num_buckets=self.PERCENT_BUCKET_COUNT,
        field_spec=self._field_spec,
        description=self._description)
    unaccounted_metric.add(self._GetUnaccountedBreakdown(), fields=self._fields)

    bucketing_loss_metric = CumulativeMetric(
        '%s/bucketing_loss' % self._name,
        field_spec=self._field_spec,
        description=self._description)
    bucketing_loss_metric.increment_by(self._GetBucketingLoss(),
                                       fields=self._fields)

  @contextlib.contextmanager
  def Step(self, step_name):
    """Start a new step named step_name in the timed operation.

    Note that it is not possible to start a step inside a step. i.e.,

    with RuntimeBreakdownTimer('timer') as timer:
      with timer.Step('outer_step'):
        with timer.Step('inner_step'):
          # will by design raise an exception.

    Args:
      step_name: The name of the step being timed.
    """
    if self._inside_step:
      # Nested steps are dropped (logged, not raised) so the caller's code
      # still runs.
      logging.error('RuntimeBreakdownTimer.Step is not reentrant. '
                    'Dropping step: %s', step_name)
      yield
      return

    self._inside_step = True
    t0 = _GetSystemClock()
    try:
      yield
    finally:
      self._inside_step = False
      step_time_s = _GetSystemClock() - t0
      # TODO(ayatane): Handle backward clock jumps.  See _GetSystemClock.
      step_time_s = max(0, step_time_s)
      self._step_metrics.append(self._StepMetrics(step_name, step_time_s))

  def _GetStepBreakdowns(self):
    """Returns percentage of time spent in each step.

    Must be called after |_RecordTotalTime|.
    """
    if not self._total_time_s:
      return {}
    return {x.name: (x.time_s * 100) / self._total_time_s
            for x in self._step_metrics}

  def _GetUnaccountedBreakdown(self):
    """Returns the percentage time spent outside of all steps.

    Must be called after |_RecordTotalTime|.
    """
    breakdown_percentages = sum(self._GetStepBreakdowns().values())
    # Clamp at 0 in case step times slightly exceed the total.
    return max(0, 100 - breakdown_percentages)

  def _GetBucketingLoss(self):
    """Compute the actual loss in reported percentages due to bucketing.

    Must be called after |_RecordTotalTime|.
    """
    reported = list(self._GetStepBreakdowns().values())
    reported.append(self._GetUnaccountedBreakdown())
    bucket_width = 100 / self.PERCENT_BUCKET_COUNT
    # Each value is rounded down to its bucket's lower bound; the remainder
    # modulo the bucket width is the per-value rounding error.
    return sum(x % bucket_width for x in reported)

  def _RecordTotalTime(self):
    self._total_time_s = _GetSystemClock() - self._outer_t0
    # TODO(ayatane): Handle backward clock jumps.  See _GetSystemClock.
    self._total_time_s = max(0, self._total_time_s)
832
833
834def _GetSystemClock():
835  """Return a clock time.
836
837  The only thing that the return value can be used for is to subtract from
838  other instances to determine time elapsed.
839  """
840  # TODO(ayatane): We should use a monotonic clock to measure this,
841  # but Python 2 does not have one.
842  return time.time()
843
844
def Flush(reset_after=()):
  """Flushes metrics, but warns on transient errors.

  Args:
    reset_after: A list of metrics to reset after flushing.
  """
  if not ts_mon:
    return

  try:
    ts_mon.flush()
    # Drain in place with pop() so the caller's list is emptied as metrics
    # are reset.
    while reset_after:
      metric = reset_after.pop()
      metric.reset()
  except ssl.SSLError as e:
    logging.warning('Caught transient network error while flushing: %s', e)
  except Exception as e:
    logging.error('Caught exception while flushing: %s', e)
862