# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

5"""
6This module is designed to report metadata in a separated thread to avoid the
7performance overhead of sending data to Elasticsearch using HTTP.
8
9"""

import logging
import Queue
import socket
import threading
import time

import common
from autotest_lib.client.common_lib import utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_METADATA_METRICS_PREFIX = 'chromeos/autotest/es_metadata_reporter/'

# Number of seconds to wait before checking the queue again for data to upload.
_REPORT_INTERVAL_SECONDS = 5

_MAX_METADATA_QUEUE_SIZE = 1000000
_MAX_UPLOAD_SIZE = 50000
# Number of seconds of continuous upload failure after which the upload batch
# size is reduced to _MIN_RETRY_ENTRIES.
_MAX_UPLOAD_FAIL_DURATION = 600
# Number of entries to retry with when the previous uploads have failed
# continuously for _MAX_UPLOAD_FAIL_DURATION seconds.
_MIN_RETRY_ENTRIES = 10
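# Taken together (as implemented in _run() below): the upload batch size
# starts at _MIN_RETRY_ENTRIES and doubles every report cycle (10, 20, 40,
# ...) up to _MAX_UPLOAD_SIZE, and is clamped back to _MIN_RETRY_ENTRIES once
# uploads have been failing continuously for _MAX_UPLOAD_FAIL_DURATION
# seconds.
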
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

_report_lock = threading.Lock()
_abort = threading.Event()
_queue_full = threading.Event()
_metrics_fields = {}
# The reporting thread, kept so abort() can wait for it to finish.
_reporting_thread = None


def _get_metrics_fields():
    """Get the fields information to be uploaded to metrics."""
    if not _metrics_fields:
        _metrics_fields['hostname'] = socket.gethostname()

    return _metrics_fields


def queue(data):
    """Queue metadata to be uploaded in the reporter thread.

    If the queue is full, an error is logged the first time the queue becomes
    full. The call does not wait or raise a Queue.Full exception, so there is
    no performance overhead for the caller, e.g., the scheduler.

    @param data: A metadata entry, which should be a dictionary.
    """
    if not is_running():
        return

    try:
        metadata_queue.put_nowait(data)
        if _queue_full.is_set():
            logging.info('Metadata queue is available to receive new data '
                         'again.')
            _queue_full.clear()
    except Queue.Full:
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                          'to %d.', _MAX_METADATA_QUEUE_SIZE)


def _run():
    """Report metadata in the queue until being aborted."""
    # Time of the first failed upload since the last successful one. None if
    # the last upload succeeded.
    first_failed_upload = None
    upload_size = _MIN_RETRY_ENTRIES

    try:
        while True:
            start_time = time.time()
            data_list = []
            if (first_failed_upload and
                time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION):
                upload_size = _MIN_RETRY_ENTRIES
            else:
                upload_size = min(upload_size * 2, _MAX_UPLOAD_SIZE)
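            # Drain up to upload_size entries from the queue without blocking.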
            while not metadata_queue.empty() and len(data_list) < upload_size:
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                # The actual upload to Elasticsearch has been removed (see the
                # deprecation warning in start()), so every attempt is
                # reported with success=False.
                success = False
                fields = _get_metrics_fields().copy()
                fields['success'] = success
                metrics.Gauge(
                        _METADATA_METRICS_PREFIX + 'upload/batch_sizes').set(
                                len(data_list), fields=fields)
                metrics.Counter(
                        _METADATA_METRICS_PREFIX + 'upload/attempts').increment(
                                fields=fields)

            # Report the current queue backlog every cycle.
            metrics.Gauge(_METADATA_METRICS_PREFIX + 'queue_size').set(
                    metadata_queue.qsize(), fields=_get_metrics_fields())
            # Sleep for the remainder of the report interval; if this cycle
            # overran the interval, yield briefly instead.
            sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time
            if sleep_time < 0:
                sleep_time = 0.5
            _abort.wait(timeout=sleep_time)
            # Exit once abort is requested and the buffered entries have been
            # drained, so that the finally clause below can run.
            if _abort.is_set() and metadata_queue.empty():
                break
    except Exception as e:
        logging.exception('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()


def is_running():
    """Check if metadata_reporter is running.

    @return: True if metadata_reporter is running.
    """
    return _report_lock.locked()


def start():
    """Start the thread to report metadata."""
    global _reporting_thread
    # The lock makes sure there is only one reporting thread working.
    if is_running():
        logging.error('There is already a metadata reporter thread.')
        return

    logging.warning('Elasticsearch db is deprecated, no metadata will be '
                    'reported.')

    _report_lock.acquire()
    _reporting_thread = threading.Thread(target=_run)
    # Make it a daemon thread so it doesn't need to be closed explicitly.
    _reporting_thread.setDaemon(True)
    _reporting_thread.start()
    logging.info('Metadata reporting thread is started.')


def abort():
    """Abort the thread to report metadata.

    The call will wait up to _REPORT_INTERVAL_SECONDS seconds for existing
    data to be uploaded.
    """
    if not is_running():
        logging.error('The metadata reporting thread has already exited.')
        return

    _abort.set()
    logging.info('Waiting up to %s seconds for metadata reporting thread to '
                 'complete.', _REPORT_INTERVAL_SECONDS)
    # Join the thread rather than waiting on _abort, which was just set and
    # would otherwise return immediately.
    if _reporting_thread:
        _reporting_thread.join(_REPORT_INTERVAL_SECONDS)
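

# A hypothetical smoke test, not part of the original module: start the
# reporter, queue a few illustrative entries, and shut it down again. The
# entry keys are assumptions for illustration only.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    start()
    for i in range(3):
        queue({'_type': 'example', 'index': i})
    abort()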