1# -*- coding: utf-8 -*- 2# Copyright 2016 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Wrapper library around ts_mon. 7 8This library provides some wrapper functionality around ts_mon, to make it more 9friendly to developers. It also provides import safety, in case ts_mon is not 10deployed with your code. 11""" 12 13from __future__ import division 14from __future__ import print_function 15 16import collections 17import contextlib 18import ssl 19import time 20from functools import wraps 21 22import six 23from six.moves import queue as Queue 24 25from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging 26 27try: 28 from infra_libs import ts_mon 29except (ImportError, RuntimeError): 30 ts_mon = None 31 32 33# This number is chosen because 1.16^100 seconds is about 34# 32 days. This is a good compromise between bucket size 35# and dynamic range. 36_SECONDS_BUCKET_FACTOR = 1.16 37 38# If none, we create metrics in this process. Otherwise, we send metrics via 39# this Queue to a dedicated flushing processes. 40# These attributes are set by chromite.lib.ts_mon_config.SetupTsMonGlobalState. 
FLUSHING_PROCESS = None
MESSAGE_QUEUE = None

# Sentinel used to distinguish "argument not supplied" from an explicit None.
_MISSING = object()

# One serialized metric method call; instances are shipped over MESSAGE_QUEUE
# to the dedicated flushing process when indirect metrics are enabled.
MetricCall = collections.namedtuple('MetricCall', [
    'metric_name', 'metric_args', 'metric_kwargs',
    'method', 'method_args', 'method_kwargs',
    'reset_after'
])


def _FlushingProcessClosed():
  """Returns whether the metrics flushing process has been closed."""
  return (FLUSHING_PROCESS is not None and
          FLUSHING_PROCESS.exitcode is not None)


class ProxyMetric(object):
  """Redirects any method calls to the message queue."""

  def __init__(self, metric, metric_args, metric_kwargs):
    self.metric = metric
    self.metric_args = metric_args
    # |reset_after| is consumed here rather than forwarded to the real metric
    # constructor; the flushing process uses it to reset after each flush.
    self.reset_after = metric_kwargs.pop('reset_after', False)
    self.metric_kwargs = metric_kwargs

  def __getattr__(self, method_name):
    """Redirects all method calls to the MESSAGE_QUEUE."""
    def enqueue(*args, **kwargs):
      if not _FlushingProcessClosed():
        try:
          MESSAGE_QUEUE.put_nowait(
              MetricCall(
                  metric_name=self.metric,
                  metric_args=self.metric_args,
                  metric_kwargs=self.metric_kwargs,
                  method=method_name,
                  method_args=args,
                  method_kwargs=kwargs,
                  reset_after=self.reset_after))
        except Queue.Full:
          # Best-effort: dropping a metric is preferable to blocking the
          # caller on a full queue.
          logging.warning(
              "Metrics queue is full; skipped sending metric '%s'",
              self.metric)
      else:
        try:
          exit_code = FLUSHING_PROCESS.exitcode
        except AttributeError:
          exit_code = None
        logging.warning(
            'Flushing process has been closed (exit code %s),'
            " skipped sending metric '%s'",
            exit_code,
            self.metric)

    return enqueue


def _Indirect(fn):
  """Decorates a function to be indirect if MESSAGE_QUEUE is set.

  If MESSAGE_QUEUE is set, the indirect function will return a proxy metrics
  object; otherwise, it behaves normally.
  """
  @wraps(fn)
  def AddToQueueIfPresent(*args, **kwargs):
    if MESSAGE_QUEUE:
      return ProxyMetric(fn.__name__, args, kwargs)
    else:
      # Whether to reset the metric after the flush; this is only used by
      # |ProxyMetric|, so remove this from the kwargs.
      kwargs.pop('reset_after', None)
      return fn(*args, **kwargs)
  return AddToQueueIfPresent


class MockMetric(object):
  """Mock metric object, to be returned if ts_mon is not set up."""

  def _mock_method(self, *args, **kwargs):
    pass

  def __getattr__(self, _):
    return self._mock_method


def _ImportSafe(fn):
  """Decorator which causes |fn| to return MockMetric if ts_mon not imported."""
  @wraps(fn)
  def wrapper(*args, **kwargs):
    if ts_mon:
      return fn(*args, **kwargs)
    else:
      return MockMetric()

  return wrapper


class FieldSpecAdapter(object):
  """Infers the types of fields values to work around field_spec requirement.

  See: https://chromium-review.googlesource.com/c/432120/ for the change
  which added a required field_spec argument. This class is a temporary
  workaround to allow inferring the field_spec if is not provided.
  """

  # Maps Python value types to the corresponding ts_mon field class.
  FIELD_CLASSES = {} if ts_mon is None else {
      bool: ts_mon.BooleanField,
      int: ts_mon.IntegerField,
      str: ts_mon.StringField,
      six.text_type: ts_mon.StringField,
  }

  def __init__(self, metric_cls, *args, **kwargs):
    self._metric_cls = metric_cls
    self._args = args
    self._kwargs = kwargs
    self._instance = _MISSING

  def __getattr__(self, prop):
    """Return a wrapper which constructs the metric object on demand.

    Args:
      prop: The property name

    Returns:
      If self._instance has been created, the instance's .|prop| property,
      otherwise, a wrapper function which creates the ._instance and then
      calls the |prop| method on the instance.
    """
    if self._instance is not _MISSING:
      return getattr(self._instance, prop)

    def func(*args, **kwargs):
      # Re-check: another method call may have created the instance already.
      if self._instance is not _MISSING:
        return getattr(self._instance, prop)(*args, **kwargs)
      fields = FieldSpecAdapter._InferFields(prop, args, kwargs)
      self._kwargs['field_spec'] = FieldSpecAdapter._InferFieldSpec(fields)
      self._instance = self._metric_cls(*self._args, **self._kwargs)
      return getattr(self._instance, prop)(*args, **kwargs)

    func.__name__ = prop
    return func

  @staticmethod
  def _InferFields(method_name, args, kwargs):
    """Infers the fields argument.

    Args:
      method_name: The method called.
      args: The args list
      kwargs: The keyword args

    Returns:
      The fields dict, or None if it cannot be inferred.
    """
    if 'fields' in kwargs:
      return kwargs['fields']

    # .increment(fields) passes fields as the first positional argument;
    # other methods (.set, .add, ...) pass a value first, then fields.
    if method_name == 'increment' and args:
      return args[0]

    if len(args) >= 2:
      return args[1]

  @staticmethod
  def _InferFieldSpec(fields):
    """Infers the fields types from the given fields.

    Args:
      fields: A dictionary with metric fields.

    Returns:
      A list of ts_mon.Field objects sorted by field name, or None.
    """
    if not fields or not ts_mon:
      return None

    return [FieldSpecAdapter.FIELD_CLASSES[type(v)](field)
            for (field, v) in sorted(fields.items())]


def _OptionalFieldSpec(fn):
  """Decorates a function to allow an optional description and field_spec."""
  @wraps(fn)
  def wrapper(*args, **kwargs):
    kwargs = dict(**kwargs)  # It's bad practice to mutate **kwargs
    # Slightly different than .setdefault, this line sets a default even when
    # the key is present (as long as the value is not truthy). Empty or None is
    # not allowed for descriptions.
    kwargs['description'] = kwargs.get('description') or 'No description.'
    if 'field_spec' in kwargs and kwargs['field_spec'] is not _MISSING:
      return fn(*args, **kwargs)
    else:
      return FieldSpecAdapter(fn, *args, **kwargs)
  return wrapper


def _Metric(fn):
  """A pipeline of decorators to apply to our metric constructors."""
  return _OptionalFieldSpec(_ImportSafe(_Indirect(fn)))


# This is needed for the reset_after flag used by @Indirect.
# pylint: disable=unused-argument

@_Metric
def CounterMetric(name, reset_after=False, description=None,
                  field_spec=_MISSING, start_time=None):
  """Returns a metric handle for a counter named |name|."""
  return ts_mon.CounterMetric(name,
                              description=description, field_spec=field_spec,
                              start_time=start_time)
Counter = CounterMetric


@_Metric
def GaugeMetric(name, reset_after=False, description=None, field_spec=_MISSING):
  """Returns a metric handle for a gauge named |name|."""
  return ts_mon.GaugeMetric(name, description=description,
                            field_spec=field_spec)
Gauge = GaugeMetric


@_Metric
def CumulativeMetric(name, reset_after=False, description=None,
                     field_spec=_MISSING):
  """Returns a metric handle for a cumulative float named |name|."""
  return ts_mon.CumulativeMetric(name, description=description,
                                 field_spec=field_spec)


@_Metric
def StringMetric(name, reset_after=False, description=None,
                 field_spec=_MISSING):
  """Returns a metric handle for a string named |name|."""
  return ts_mon.StringMetric(name, description=description,
                             field_spec=field_spec)
String = StringMetric


@_Metric
def BooleanMetric(name, reset_after=False, description=None,
                  field_spec=_MISSING):
  """Returns a metric handle for a boolean named |name|."""
  return ts_mon.BooleanMetric(name, description=description,
                              field_spec=field_spec)
Boolean = BooleanMetric


@_Metric
def FloatMetric(name, reset_after=False, description=None,
                field_spec=_MISSING):
  """Returns a metric handle for a float named |name|."""
  return ts_mon.FloatMetric(name, description=description,
                            field_spec=field_spec)
Float = FloatMetric


@_Metric
def CumulativeDistributionMetric(name, reset_after=False, description=None,
                                 bucketer=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution named |name|."""
  return ts_mon.CumulativeDistributionMetric(
      name, description=description, bucketer=bucketer, field_spec=field_spec)
CumulativeDistribution = CumulativeDistributionMetric


@_Metric
def DistributionMetric(name, reset_after=False, description=None,
                       bucketer=None, field_spec=_MISSING):
  """Returns a metric handle for a distribution named |name|."""
  return ts_mon.NonCumulativeDistributionMetric(
      name, description=description, bucketer=bucketer, field_spec=field_spec)
Distribution = DistributionMetric


@_Metric
def CumulativeSmallIntegerDistribution(name, reset_after=False,
                                       description=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution named |name|.

  This differs slightly from CumulativeDistribution, in that the underlying
  metric uses a uniform bucketer rather than a geometric one.

  This metric type is suitable for holding a distribution of numbers that are
  nonnegative integers in the range of 0 to 100.
  """
  return ts_mon.CumulativeDistributionMetric(
      name,
      bucketer=ts_mon.FixedWidthBucketer(1),
      description=description,
      field_spec=field_spec)


@_Metric
def CumulativeSecondsDistribution(name, scale=1, reset_after=False,
                                  description=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution named |name|.

  The distribution handle returned by this method is better suited than the
  default one for recording handling times, in seconds.

  This metric handle has bucketing that is optimized for time intervals
  (in seconds) in the range of 1 second to 32 days. Use |scale| to adjust this
  (e.g. scale=0.1 covers a range from .1 seconds to 3.2 days).

  Args:
    name: string name of metric
    scale: scaling factor of buckets, and size of the first bucket. default: 1
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  b = ts_mon.GeometricBucketer(growth_factor=_SECONDS_BUCKET_FACTOR,
                               scale=scale)
  return ts_mon.CumulativeDistributionMetric(
      name, bucketer=b, units=ts_mon.MetricsDataUnits.SECONDS,
      description=description, field_spec=field_spec)

SecondsDistribution = CumulativeSecondsDistribution


@_Metric
def PercentageDistribution(
    name, num_buckets=1000, reset_after=False,
    description=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution for percentage.

  The distribution handle returned by this method is better suited for
  reporting percentage values than the default one. The bucketing is optimized
  for values in [0,100].

  Args:
    name: The name of this metric.
    num_buckets: This metric buckets the percentage values before
        reporting. This argument controls the number of the bucket the range
        [0,100] is divided in. The default gives you 0.1% resolution.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  # The last bucket actually covers [100, 100 + 1.0/num_buckets), so it
  # corresponds to values that exactly match 100%.
  bucket_width = 100 / num_buckets
  b = ts_mon.FixedWidthBucketer(bucket_width, num_buckets)
  return ts_mon.CumulativeDistributionMetric(
      name, bucketer=b,
      description=description, field_spec=field_spec)


def _AppendExceptionField(field_spec):
  """Returns a copy of |field_spec| with an 'encountered_exception' field.

  The caller's list is copied, not mutated: timer decorators reuse one
  field_spec list across many invocations, and appending in place would grow
  it by one field per call.

  Args:
    field_spec: A sequence of ts_mon.Field objects, None, or _MISSING.

  Returns:
    The extended copy, or the original value if it was None or _MISSING.
  """
  if field_spec is None or field_spec is _MISSING:
    return field_spec
  return list(field_spec) + [ts_mon.BooleanField('encountered_exception')]


@contextlib.contextmanager
def SecondsTimer(name, fields=None, description=None, field_spec=_MISSING,
                 scale=1, record_on_exception=True, add_exception_field=False):
  """Record the time of an operation to a CumulativeSecondsDistributionMetric.

  Records the time taken inside of the context block, to the
  CumulativeSecondsDistribution named |name|, with the given fields.

  Examples:
    # Time the doSomething() call, with field values that are independent of
    # the results of the operation.
    with SecondsTimer('timer/name', fields={'foo': 'bar'},
                      description='My timer',
                      field_spec=[ts_mon.StringField('foo'),
                                  ts_mon.BooleanField('success')]):
      doSomething()

    # Time the doSomethingElse call, with field values that depend on the
    # results of that operation. Note that it is important that a default value
    # is specified for these fields, in case an exception is thrown by
    # doSomethingElse()
    f = {'success': False, 'foo': 'bar'}
    with SecondsTimer('timer/name', fields=f, description='My timer',
                      field_spec=[ts_mon.StringField('foo')]) as c:
      doSomethingElse()
      c['success'] = True

    # Incorrect Usage!
    with SecondsTimer('timer/name', description='My timer') as c:
      doSomething()
      c['foo'] = bar  # 'foo' is not a valid field, because no default
                      # value for it was specified in the context constructor.
                      # It will be silently ignored.

  Args:
    name: The name of the metric to create
    fields: The fields of the metric to create.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    scale: A float to scale the CumulativeSecondsDistribution buckets by.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
        to the FieldSpec provided, and set its value to True iff an exception
        was raised in the context.
  """
  # Only extend the spec when the exception field will actually be reported;
  # previously the field was appended unconditionally (and in place), which
  # both mutated the caller's list and declared a field that was never set
  # when add_exception_field was False.
  if add_exception_field:
    field_spec = _AppendExceptionField(field_spec)

  m = CumulativeSecondsDistribution(
      name, scale=scale, description=description, field_spec=field_spec)
  f = fields or {}
  f = dict(f)
  keys = list(f)
  t0 = _GetSystemClock()

  error = True
  try:
    yield f
    error = False
  finally:
    if add_exception_field:
      keys.append('encountered_exception')
      f.setdefault('encountered_exception', error)
    # Filter out keys that were not part of the initial key set. This is to
    # avoid inconsistent fields.
    # TODO(akeshet): Doing this filtering isn't super efficient. Would be
    # better to implement some key-restricted subclass or wrapper around dict,
    # and just yield that above rather than yielding a regular dict.
    if record_on_exception or not error:
      dt = _GetSystemClock() - t0
      # TODO(ayatane): Handle backward clock jumps. See _GetSystemClock.
      if dt >= 0:
        m.add(dt, fields={k: f[k] for k in keys})


def SecondsTimerDecorator(name, fields=None, description=None,
                          field_spec=_MISSING, scale=1,
                          record_on_exception=True, add_exception_field=False):
  """Decorator to time the duration of function calls.

  Examples:
    @SecondsTimerDecorator('timer/name', fields={'foo': 'bar'},
                           description='My timer',
                           field_spec=[ts_mon.StringField('foo')])
    def Foo(bar):
      return doStuff()

    is equivalent to

    def Foo(bar):
      with SecondsTimer('timer/name', fields={'foo': 'bar'},
                        description='My timer',
                        field_spec=[ts_mon.StringField('foo')])
        return doStuff()

  Args:
    name: The name of the metric to create
    fields: The fields of the metric to create
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    scale: A float to scale the distribution by
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
        to the FieldSpec provided, and set its value to True iff an exception
        was raised in the context.
  """
  def decorator(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
      with SecondsTimer(name, fields=fields, description=description,
                        field_spec=field_spec, scale=scale,
                        record_on_exception=record_on_exception,
                        add_exception_field=add_exception_field):
        return fn(*args, **kwargs)

    return wrapper

  return decorator


@contextlib.contextmanager
def SecondsInstanceTimer(name, fields=None, description=None,
                         field_spec=_MISSING, record_on_exception=True,
                         add_exception_field=False):
  """Record the time of an operation to a FloatMetric.

  Records the time taken inside of the context block, to the
  Float metric named |name|, with the given fields. This is
  a non-cumulative metric; this represents the absolute time
  taken for a specific block. The duration is stored in a float
  to provide flexibility in the future for higher accuracy.

  Examples:
    # Time the doSomething() call, with field values that are independent of
    # the results of the operation.
    with SecondsInstanceTimer('timer/name', fields={'foo': 'bar'},
                              description='My timer',
                              field_spec=[ts_mon.StringField('foo'),
                                          ts_mon.BooleanField('success')]):
      doSomething()

    # Time the doSomethingElse call, with field values that depend on the
    # results of that operation. Note that it is important that a default value
    # is specified for these fields, in case an exception is thrown by
    # doSomethingElse()
    f = {'success': False, 'foo': 'bar'}
    with SecondsInstanceTimer('timer/name', fields=f, description='My timer',
                              field_spec=[ts_mon.StringField('foo')]) as c:
      doSomethingElse()
      c['success'] = True

    # Incorrect Usage!
    with SecondsInstanceTimer('timer/name', description='My timer') as c:
      doSomething()
      c['foo'] = bar  # 'foo' is not a valid field, because no default
                      # value for it was specified in the context constructor.
                      # It will be silently ignored.

  Args:
    name: The name of the metric to create
    fields: The fields of the metric to create.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
        to the FieldSpec provided, and set its value to True iff an exception
        was raised in the context.

  Yields:
    Float based metric measuring the duration of execution.
  """
  # See SecondsTimer: copy-and-extend the spec only when the field will be
  # reported, instead of unconditionally appending to the caller's list.
  if add_exception_field:
    field_spec = _AppendExceptionField(field_spec)

  m = FloatMetric(name, description=description, field_spec=field_spec)
  f = dict(fields or {})
  keys = list(f)
  t0 = _GetSystemClock()

  error = True
  try:
    yield f
    error = False
  finally:
    if add_exception_field:
      keys.append('encountered_exception')
      f.setdefault('encountered_exception', error)
    # Filter out keys that were not part of the initial key set. This is to
    # avoid inconsistent fields.
    # TODO(akeshet): Doing this filtering isn't super efficient. Would be
    # better to implement some key-restricted subclass or wrapper around dict,
    # and just yield that above rather than yielding a regular dict.
    if record_on_exception or not error:
      dt = _GetSystemClock() - t0
      m.set(dt, fields={k: f[k] for k in keys})


def SecondsInstanceTimerDecorator(name, fields=None, description=None,
                                  field_spec=_MISSING,
                                  record_on_exception=True,
                                  add_exception_field=False):
  """Decorator to time the gauge duration of function calls.

  Examples:
    @SecondsInstanceTimerDecorator('timer/name', fields={'foo': 'bar'},
                                   description='My timer',
                                   field_spec=[ts_mon.StringField('foo'),
                                               ts_mon.BooleanField('success')])
    def Foo(bar):
      return doStuff()

    is equivalent to

    def Foo(bar):
      with SecondsInstanceTimer('timer/name', fields={'foo': 'bar'},
                                description='My timer',
                                field_spec=[ts_mon.StringField('foo'),
                                            ts_mon.BooleanField('success')]):
        return doStuff()

  Args:
    name: The name of the metric to create
    fields: The fields of the metric to create
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
        to the FieldSpec provided, and set its value to True iff an exception
        was raised in the context.

  Returns:
    A SecondsInstanceTimer metric decorator.
  """
  def decorator(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
      with SecondsInstanceTimer(name, fields=fields, description=description,
                                field_spec=field_spec,
                                record_on_exception=record_on_exception,
                                add_exception_field=add_exception_field):
        return fn(*args, **kwargs)

    return wrapper

  return decorator


@contextlib.contextmanager
def SuccessCounter(name, fields=None, description=None, field_spec=_MISSING):
  """Create a counter that tracks if something succeeds.

  Args:
    name: The name of the metric to create
    fields: The fields of the metric
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  # Forward description/field_spec; they were previously accepted but dropped.
  c = Counter(name, description=description, field_spec=field_spec)
  f = fields or {}
  f = f.copy()
  # We add in the additional field success.
  keys = list(f) + ['success']
  success = False
  try:
    yield f
    success = True
  finally:
    f.setdefault('success', success)
    f = {k: f[k] for k in keys}
    c.increment(fields=f)


@contextlib.contextmanager
def Presence(name, fields=None, description=None, field_spec=_MISSING):
  """A counter of 'active' things.

  This keeps track of how many name's are active at any given time. However,
  it's only suitable for long running tasks, since the initial true value may
  never be written out if the task doesn't run for at least a minute.
  """
  # Forward the caller's description; it was previously hard-coded to None.
  b = Boolean(name, description=description, field_spec=field_spec)
  b.set(True, fields=fields)
  try:
    yield
  finally:
    b.set(False, fields=fields)


class RuntimeBreakdownTimer(object):
  """Record the time of an operation and the breakdown into sub-steps.

  Examples:
    with RuntimeBreakdownTimer('timer/name', fields={'foo':'bar'},
                               description='My timer',
                               field_spec=[ts_mon.StringField('foo')]) as timer:
      with timer.Step('first_step'):
        doFirstStep()
      with timer.Step('second_step'):
        doSecondStep()
      # The time spent next will show up under
      # .../timer/name/breakdown_no_step
      doSomeNonStepWork()

  This will emit the following metrics:
  - .../timer/name/total_duration - A CumulativeSecondsDistribution metric for
    the time spent inside the outer with block.
  - .../timer/name/breakdown/first_step and
    .../timer/name/breakdown/second_step - PercentageDistribution metrics for
    the fraction of time devoted to each substep.
  - .../timer/name/breakdown_unaccounted - PercentageDistribution metric for
    the fraction of time that is not accounted for in any of the substeps.
  - .../timer/name/bucketing_loss - PercentageDistribution metric buckets
    values before reporting them as distributions. This causes small errors in
    the reported values because they are rounded to the reported buckets lower
    bound. This is a CumulativeMetric measuring the total rounding error
    accrued in reporting all the percentages. The worst case bucketing loss
    for x steps is (x+1)/10. So, if you time across 9 steps, you should
    expect no more than 1% rounding error.
    [experimental]
  - .../timer/name/duration_breakdown - A Float metric, with one stream per
    Step indicating the ratio of time spent in that step. The different steps
    are differentiated via a field with key 'step_name'. Since some of the
    time can be spent outside any steps, these ratios will sum to <= 1.

  NB: This helper can only be used if the field values are known at the
  beginning of the outer context and do not change as a result of any of the
  operations timed.
  """

  PERCENT_BUCKET_COUNT = 1000

  _StepMetrics = collections.namedtuple('_StepMetrics', ['name', 'time_s'])

  def __init__(self, name, fields=None, description=None, field_spec=_MISSING):
    self._name = name
    self._fields = fields
    self._field_spec = field_spec
    self._description = description
    self._outer_t0 = None
    self._total_time_s = 0
    self._inside_step = False
    self._step_metrics = []

  def __enter__(self):
    self._outer_t0 = _GetSystemClock()
    return self

  def __exit__(self, _type, _value, _traceback):
    self._RecordTotalTime()

    outer_timer = CumulativeSecondsDistribution(
        '%s/total_duration' % (self._name,),
        field_spec=self._field_spec,
        description=self._description)
    outer_timer.add(self._total_time_s, fields=self._fields)

    for name, percent in self._GetStepBreakdowns().items():
      step_metric = PercentageDistribution(
          '%s/breakdown/%s' % (self._name, name),
          num_buckets=self.PERCENT_BUCKET_COUNT,
          field_spec=self._field_spec,
          description=self._description)
      step_metric.add(percent, fields=self._fields)

      fields = dict(self._fields) if self._fields is not None else dict()
      fields['step_name'] = name
      # TODO(pprabhu): Convert _GetStepBreakdowns() to return ratios instead
      # of percentage when the old PercentageDistribution reporting is
      # deleted.
      Float('%s/duration_breakdown' % self._name).set(percent / 100,
                                                      fields=fields)

    unaccounted_metric = PercentageDistribution(
        '%s/breakdown_unaccounted' % self._name,
        num_buckets=self.PERCENT_BUCKET_COUNT,
        field_spec=self._field_spec,
        description=self._description)
    unaccounted_metric.add(self._GetUnaccountedBreakdown(),
                           fields=self._fields)

    bucketing_loss_metric = CumulativeMetric(
        '%s/bucketing_loss' % self._name,
        field_spec=self._field_spec,
        description=self._description)
    bucketing_loss_metric.increment_by(self._GetBucketingLoss(),
                                       fields=self._fields)

  @contextlib.contextmanager
  def Step(self, step_name):
    """Start a new step named step_name in the timed operation.

    Note that it is not possible to start a step inside a step. i.e.,

    with RuntimeBreakdownTimer('timer') as timer:
      with timer.Step('outer_step'):
        with timer.Step('inner_step'):
          # will by design raise an exception.

    Args:
      step_name: The name of the step being timed.
    """
    if self._inside_step:
      logging.error('RuntimeBreakdownTimer.Step is not reentrant. '
                    'Dropping step: %s', step_name)
      yield
      return

    self._inside_step = True
    t0 = _GetSystemClock()
    try:
      yield
    finally:
      self._inside_step = False
      step_time_s = _GetSystemClock() - t0
      # TODO(ayatane): Handle backward clock jumps. See _GetSystemClock.
      step_time_s = max(0, step_time_s)
      self._step_metrics.append(self._StepMetrics(step_name, step_time_s))

  def _GetStepBreakdowns(self):
    """Returns percentage of time spent in each step.

    Must be called after |_RecordTotalTime|.
    """
    if not self._total_time_s:
      return {}
    return {x.name: (x.time_s * 100) / self._total_time_s
            for x in self._step_metrics}

  def _GetUnaccountedBreakdown(self):
    """Returns the percentage time spent outside of all steps.

    Must be called after |_RecordTotalTime|.
    """
    breakdown_percentages = sum(self._GetStepBreakdowns().values())
    return max(0, 100 - breakdown_percentages)

  def _GetBucketingLoss(self):
    """Compute the actual loss in reported percentages due to bucketing.

    Must be called after |_RecordTotalTime|.
    """
    reported = list(self._GetStepBreakdowns().values())
    reported.append(self._GetUnaccountedBreakdown())
    bucket_width = 100 / self.PERCENT_BUCKET_COUNT
    # Each reported value is rounded down to its bucket's lower bound; the
    # remainder modulo the bucket width is exactly the rounding error.
    return sum(x % bucket_width for x in reported)

  def _RecordTotalTime(self):
    self._total_time_s = _GetSystemClock() - self._outer_t0
    # TODO(ayatane): Handle backward clock jumps. See _GetSystemClock.
    self._total_time_s = max(0, self._total_time_s)


def _GetSystemClock():
  """Return a clock time.

  The only thing that the return value can be used for is to subtract from
  other instances to determine time elapsed.
  """
  # TODO(ayatane): We should use a monotonic clock to measure this,
  # but Python 2 does not have one.
  return time.time()


def Flush(reset_after=()):
  """Flushes metrics, but warns on transient errors.

  Args:
    reset_after: A list of metrics to reset after flushing. Note: the list is
        consumed (emptied) by this call.
  """
  if not ts_mon:
    return

  try:
    ts_mon.flush()
    while reset_after:
      reset_after.pop().reset()
  except ssl.SSLError as e:
    logging.warning('Caught transient network error while flushing: %s', e)
  except Exception as e:
    logging.error('Caught exception while flushing: %s', e)