# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This module is designed to report metadata in a separated thread to avoid the
performance overhead of sending data to Elasticsearch using HTTP.

"""

import logging
import Queue
import socket
import time
import threading

import common
from autotest_lib.client.common_lib import utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_METADATA_METRICS_PREFIX = 'chromeos/autotest/es_metadata_reporter/'

# Number of seconds to wait before checking queue again for uploading data.
_REPORT_INTERVAL_SECONDS = 5

_MAX_METADATA_QUEUE_SIZE = 1000000
_MAX_UPLOAD_SIZE = 50000
# The number of seconds for upload to fail continuously. After that, upload will
# be limited to 1 entry.
_MAX_UPLOAD_FAIL_DURATION = 600
# Number of entries to retry when the previous upload failed continuously for
# the duration of _MAX_UPLOAD_FAIL_DURATION.
_MIN_RETRY_ENTRIES = 10
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

# Held by the reporter thread for its whole lifetime; locked == running.
_report_lock = threading.Lock()
# Set by abort() to ask the reporter thread to exit; cleared by the thread
# on the way out.
_abort = threading.Event()
# Tracks the queue-full condition so the error is only logged on the edge.
_queue_full = threading.Event()
# Lazily-populated metrics fields shared by all uploads.
_metrics_fields = {}

def _get_metrics_fields():
    """Get the fields information to be uploaded to metrics.

    @return: A dict of metrics fields, populated with the local hostname on
             first use and cached afterwards.
    """
    if not _metrics_fields:
        _metrics_fields['hostname'] = socket.gethostname()

    return _metrics_fields


def queue(data):
    """Queue metadata to be uploaded in reporter thread.

    If the queue is full, an error will be logged for the first time the queue
    becomes full. The call does not wait or raise Queue.Full exception, so
    there is no overhead on the performance of caller, e.g., scheduler.

    @param data: A metadata entry, which should be a dictionary.
    """
    if not is_running():
        return

    try:
        metadata_queue.put_nowait(data)
        if _queue_full.is_set():
            logging.info('Metadata queue is available to receive new data '
                         'again.')
            _queue_full.clear()
    except Queue.Full:
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                          'to %d.', _MAX_METADATA_QUEUE_SIZE)


def _run():
    """Report metadata in the queue until being aborted.

    Runs in the reporter thread. Drains up to `upload_size` entries from
    metadata_queue every _REPORT_INTERVAL_SECONDS, emits queue/upload metrics,
    and exits when abort() sets the _abort event. On exit the _abort event is
    cleared and _report_lock is released so is_running() reports False.
    """
    # Time when the first time upload failed. None if the last upload succeeded.
    first_failed_upload = None
    upload_size = _MIN_RETRY_ENTRIES

    try:
        while True:
            start_time = time.time()
            data_list = []
            if (first_failed_upload and
                time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION):
                # Uploads have been failing for too long; throttle back to the
                # minimum batch size.
                upload_size = _MIN_RETRY_ENTRIES
            else:
                upload_size = min(upload_size*2, _MAX_UPLOAD_SIZE)
            while (not metadata_queue.empty() and len(data_list) < upload_size):
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                # NOTE: the Elasticsearch upload itself has been removed; only
                # the metrics about the attempted batch are reported, and
                # `success` is therefore always False here.
                success = False
                fields = _get_metrics_fields().copy()
                fields['success'] = success
                metrics.Gauge(
                        _METADATA_METRICS_PREFIX + 'upload/batch_sizes').set(
                                len(data_list), fields=fields)
                metrics.Counter(
                        _METADATA_METRICS_PREFIX + 'upload/attempts').increment(
                                fields=fields)

            metrics.Gauge(_METADATA_METRICS_PREFIX + 'queue_size').set(
                    metadata_queue.qsize(), fields=_get_metrics_fields())
            if _abort.is_set():
                # abort() was called; the drain above already flushed up to
                # `upload_size` pending entries, so exit now.
                break
            sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time
            if sleep_time < 0:
                sleep_time = 0.5
            # Sleep until the next report interval, waking early on abort().
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        logging.exception('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()


def is_running():
    """Check if metadata_reporter is running.

    @return: True if metadata_reporter is running.
    """
    return _report_lock.locked()


def start():
    """Start the thread to report metadata.

    No-op (with an error log) if a reporter thread is already running.
    """
    # The lock makes sure there is only one reporting thread working.
    if is_running():
        logging.error('There is already a metadata reporter thread.')
        return

    logging.warning('Elasticsearch db deprecated, no metadata will be '
                    'reported.')

    _report_lock.acquire()
    reporting_thread = threading.Thread(target=_run)
    # Make it a daemon thread so it doesn't need to be closed explicitly.
    reporting_thread.setDaemon(True)
    reporting_thread.start()
    logging.info('Metadata reporting thread is started.')


def abort():
    """Abort the thread to report metadata.

    The call will wait up to 5 seconds for existing data to be uploaded.
    """
    if not is_running():
        logging.error('The metadata reporting thread has already exited.')
        return

    _abort.set()
    logging.info('Waiting up to %s seconds for metadata reporting thread to '
                 'complete.', _REPORT_INTERVAL_SECONDS)
    # Poll until the reporter thread releases _report_lock in its `finally`
    # clause, or the deadline passes. (Waiting on the _abort event here would
    # return immediately, since this function just set it.)
    deadline = time.time() + _REPORT_INTERVAL_SECONDS
    while is_running() and time.time() < deadline:
        time.sleep(0.1)