# -*- coding: utf-8 -*- # Copyright 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Wrapper for inframon's command-line flag based configuration.""" from __future__ import print_function import argparse import contextlib import multiprocessing import os import socket import signal import time from six.moves import queue as Queue import six from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging from autotest_lib.utils.frozen_chromite.lib import metrics from autotest_lib.utils.frozen_chromite.lib import parallel try: from infra_libs.ts_mon import config from infra_libs.ts_mon import BooleanField from infra_libs.ts_mon import IntegerField from infra_libs.ts_mon import StringField import googleapiclient.discovery except (ImportError, RuntimeError) as e: config = None logging.warning('Failed to import ts_mon, monitoring is disabled: %s', e) _WasSetup = False _CommonMetricFields = {} FLUSH_INTERVAL = 60 @contextlib.contextmanager def TrivialContextManager(): """Context manager with no side effects.""" yield def GetMetricFieldSpec(fields=None): """Return the corresponding field_spec for metric fields. Args: fields: Dictionary containing metric fields. Returns: field_spec: List containing any *Field object associated with metric. """ field_spec = [] if fields: for key, val in fields.items(): if isinstance(val, bool): field_spec.append(BooleanField(key)) elif isinstance(val, int): field_spec.append(IntegerField(key)) elif isinstance(val, six.string_types): field_spec.append(StringField(key)) else: logging.error("Couldn't classify the metric field %s:%s", key, val) return field_spec def AddCommonFields(fields=None, field_spec=None): """Add cbuildbot-wide common fields to a given field set. Args: fields: Dictionary containing metric fields to which common metric fields will be added. field_spec: List containing any *Field object associated with metric. Returns: Dictionary containing complete set of metric fields to be applied to metric and a list of corresponding field_spec. """ metric_fields = (dict(_CommonMetricFields) if _CommonMetricFields else {}) if metric_fields: metric_fields.update(fields or {}) return metric_fields, GetMetricFieldSpec(metric_fields) else: return fields, field_spec def SetupTsMonGlobalState(service_name, indirect=False, suppress_exception=True, short_lived=False, auto_flush=True, common_metric_fields=None, debug_file=None, task_num=0): """Uses a dummy argument parser to get the default behavior from ts-mon. Args: service_name: The name of the task we are sending metrics from. indirect: Whether to create a metrics.METRICS_QUEUE object and a separate process for indirect metrics flushing. Useful for forking, because forking would normally create a duplicate ts_mon thread. suppress_exception: True to silence any exception during the setup. Default is set to True. short_lived: Whether this process is short-lived and should use the autogen hostname prefix. auto_flush: Whether to create a thread to automatically flush metrics every minute. common_metric_fields: Dictionary containing the metric fields that will be added to all metrics. debug_file: If non-none, send metrics to this path instead of to PubSub. task_num: (Default 0) The task_num target field of the metrics to emit. """ if not config: return TrivialContextManager() # The flushing subprocess calls .flush manually. if indirect: auto_flush = False if common_metric_fields: _CommonMetricFields.update(common_metric_fields) # google-api-client has too much noisey logging. options = _GenerateTsMonArgparseOptions( service_name, short_lived, auto_flush, debug_file, task_num) if indirect: return _CreateTsMonFlushingProcess(options) else: _SetupTsMonFromOptions(options, suppress_exception) return TrivialContextManager() def _SetupTsMonFromOptions(options, suppress_exception): """Sets up ts-mon global state given parsed argparse options. Args: options: An argparse options object containing ts-mon flags. suppress_exception: True to silence any exception during the setup. Default is set to True. """ googleapiclient.discovery.logger.setLevel(logging.WARNING) try: config.process_argparse_options(options) logging.notice('ts_mon was set up.') global _WasSetup # pylint: disable=global-statement _WasSetup = True except Exception as e: logging.warning('Failed to configure ts_mon, monitoring is disabled: %s', e, exc_info=True) if not suppress_exception: raise def _GenerateTsMonArgparseOptions(service_name, short_lived, auto_flush, debug_file, task_num): """Generates an arg list for ts-mon to consume. Args: service_name: The name of the task we are sending metrics from. short_lived: Whether this process is short-lived and should use the autogen hostname prefix. auto_flush: Whether to create a thread to automatically flush metrics every minute. debug_file: If non-none, send metrics to this path instead of to PubSub. task_num: Override the default task num of 0. """ parser = argparse.ArgumentParser() config.add_argparse_options(parser) args = [ '--ts-mon-target-type', 'task', '--ts-mon-task-service-name', service_name, '--ts-mon-task-job-name', service_name, ] if debug_file: args.extend(['--ts-mon-endpoint', 'file://' + debug_file]) # Short lived processes will have autogen: prepended to their hostname and # use task-number=PID to trigger shorter retention policies under # chrome-infra@, and used by a Monarch precomputation to group across the # task number. # Furthermore, we assume they manually call ts_mon.Flush(), because the # ts_mon thread will drop messages if the process exits before it flushes. if short_lived: auto_flush = False fqdn = socket.getfqdn().lower() host = fqdn.split('.')[0] args.extend(['--ts-mon-task-hostname', 'autogen:' + host, '--ts-mon-task-number', str(os.getpid())]) elif task_num: args.extend(['--ts-mon-task-number', str(task_num)]) args.extend(['--ts-mon-flush', 'auto' if auto_flush else 'manual']) return parser.parse_args(args=args) @contextlib.contextmanager def _CreateTsMonFlushingProcess(options): """Creates a separate process to flush ts_mon metrics. Useful for multiprocessing scenarios where we don't want multiple ts-mon threads send contradictory metrics. Instead, functions in chromite.lib.metrics will send their calls to a Queue, which is consumed by a dedicated flushing process. Args: options: An argparse options object to configure ts-mon with. Side effects: Sets chromite.lib.metrics.MESSAGE_QUEUE, which causes the metric functions to send their calls to the Queue instead of creating the metrics. """ # If this is nested, we don't need to create another queue and another # message consumer. Do nothing to continue to use the existing queue. if metrics.MESSAGE_QUEUE or metrics.FLUSHING_PROCESS: return with parallel.Manager() as manager: message_q = manager.Queue() metrics.FLUSHING_PROCESS = multiprocessing.Process( target=lambda: _SetupAndConsumeMessages(message_q, options)) metrics.FLUSHING_PROCESS.start() # this makes the chromite.lib.metric functions use the queue. # note - we have to do this *after* forking the ConsumeMessages process. metrics.MESSAGE_QUEUE = message_q try: yield message_q finally: _CleanupMetricsFlushingProcess() def _CleanupMetricsFlushingProcess(): """Sends sentinal value to flushing process and .joins it.""" # Now that there is no longer a process to listen to the Queue, re-set it # to None so that any future metrics are created within this process. message_q = metrics.MESSAGE_QUEUE flushing_process = metrics.FLUSHING_PROCESS metrics.MESSAGE_QUEUE = None metrics.FLUSHING_PROCESS = None # If the process has already died, we don't need to try to clean it up. if not flushing_process.is_alive(): return # Send the sentinal value for "flush one more time and exit". try: message_q.put(None) # If the flushing process quits, the message Queue can become full. except IOError: if not flushing_process.is_alive(): return logging.info('Waiting for ts_mon flushing process to finish...') flushing_process.join(timeout=FLUSH_INTERVAL*2) if flushing_process.is_alive(): flushing_process.terminate() if flushing_process.exitcode: logging.warning('ts_mon_config flushing process did not exit cleanly.') logging.info('Finished waiting for ts_mon process.') def _SetupAndConsumeMessages(message_q, options): """Sets up ts-mon, and starts a MetricConsumer loop. Args: message_q: The metric multiprocessing.Queue to read from. options: An argparse options object to configure ts-mon with. """ # Configure ts-mon, but don't start up a sending thread. _SetupTsMonFromOptions(options, suppress_exception=True) if not _WasSetup: return return MetricConsumer(message_q).Consume() class MetricConsumer(object): """Configures ts_mon and gets metrics from a message queue. This class is meant to be used in a subprocess. It configures itself to receive a SIGHUP signal when the parent process dies, and catches the signal in order to have a chance to flush any pending metrics one more time before quitting. """ def __init__(self, message_q): # If our parent dies, finish flushing before exiting. self.reset_after_flush = [] self.last_flush = 0 self.pending = False self.message_q = message_q if parallel.ExitWithParent(signal.SIGHUP): signal.signal(signal.SIGHUP, lambda _sig, _stack: self._WaitToFlush()) def Consume(self): """Emits metrics from self.message_q, flushing periodically. The loop is terminated by a None entry on the Queue, which is a friendly signal from the parent process that it's time to shut down. Before returning, we wait to flush one more time to make sure that all the metrics were sent. """ message = self.message_q.get() while message: self._CallMetric(message) message = self._WaitForNextMessage() if self.pending: self._WaitToFlush() def _CallMetric(self, message): """Calls the metric method from |message|, ignoring exceptions.""" try: cls = getattr(metrics, message.metric_name) message.method_kwargs.setdefault('fields', {}) message.metric_kwargs.setdefault('field_spec', []) message.method_kwargs['fields'], message.metric_kwargs['field_spec'] = ( AddCommonFields(message.method_kwargs['fields'], message.metric_kwargs['field_spec'])) metric = cls(*message.metric_args, **message.metric_kwargs) if message.reset_after: self.reset_after_flush.append(metric) getattr(metric, message.method)( *message.method_args, **message.method_kwargs) self.pending = True except Exception: logging.exception('Caught an exception while running %s', _MethodCallRepr(message)) def _WaitForNextMessage(self): """Waits for a new message, flushing every |FLUSH_INTERVAL| seconds.""" while True: time_delta = self._FlushIfReady() try: timeout = FLUSH_INTERVAL - time_delta message = self.message_q.get(timeout=timeout) return message except Queue.Empty: pass def _WaitToFlush(self): """Sleeps until the next time we can call metrics.Flush(), then flushes.""" time_delta = time.time() - self.last_flush time.sleep(max(0, FLUSH_INTERVAL - time_delta)) metrics.Flush(reset_after=self.reset_after_flush) def _FlushIfReady(self): """Call metrics.Flush() if we are ready and have pending metrics. This allows us to only call flush every FLUSH_INTERVAL seconds. """ now = time.time() time_delta = now - self.last_flush if time_delta > FLUSH_INTERVAL: self.last_flush = now time_delta = 0 metrics.Flush(reset_after=self.reset_after_flush) self.pending = False return time_delta def _MethodCallRepr(message): """Gives a string representation of |obj|.|method|(*|args|, **|kwargs|) Args: message: A MetricCall object. """ if not message: return repr(message) obj = message.metric_name method = message.method args = message.method_args kwargs = message.method_kwargs args_strings = ([repr(x) for x in args] + [(str(k) + '=' + repr(v)) for k, v in kwargs.items()]) return '%s.%s(%s)' % (repr(obj), method, ', '.join(args_strings))