# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This module reports metadata in a separate thread to avoid the performance
overhead of sending data to Elasticsearch over HTTP.
"""

import logging
import Queue
import time
import threading

import common
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.scheduler import email_manager
# The metadata_reporter thread runs inside the scheduler process, so it does
# not need to set up django; otherwise, the following import would be needed:
# from autotest_lib.frontend import setup_django_environment
from autotest_lib.site_utils import server_manager_utils


# Number of seconds to wait before checking the queue again for data to
# upload.
_REPORT_INTERVAL_SECONDS = 5

# Maximum number of metadata entries to buffer. Entries queued while the
# buffer is full are dropped.
_MAX_METADATA_QUEUE_SIZE = 1000000
# Maximum number of entries to upload in a single bulk request.
_MAX_UPLOAD_SIZE = 50000
# Number of seconds uploads may fail continuously. After that, the upload
# size is throttled down to _MIN_RETRY_ENTRIES.
_MAX_UPLOAD_FAIL_DURATION = 600
# Number of entries to retry after uploads have failed continuously for
# _MAX_UPLOAD_FAIL_DURATION seconds.
_MIN_RETRY_ENTRIES = 10
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

_report_lock = threading.Lock()
_abort = threading.Event()
_queue_full = threading.Event()

def queue(data):
    """Queue metadata to be uploaded in the reporter thread.

    If the queue is full, an error is logged the first time the queue
    becomes full. The call does not wait or raise a Queue.Full exception,
    so it adds no overhead to the caller, e.g., the scheduler.

    @param data: A metadata entry, which should be a dictionary.
    """
    try:
        metadata_queue.put_nowait(data)
        if _queue_full.is_set():
            logging.info('Metadata queue is available to receive new data '
                         'again.')
            _queue_full.clear()
    except Queue.Full:
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is '
                          'set to %d.', _MAX_METADATA_QUEUE_SIZE)


def _email_alert():
    """Send an email alert that metadata upload has been failing.

    The alert is sent only when the server database is in use, and only from
    the scheduler server, not from shards.
    """
    if not server_manager_utils.use_server_db():
        logging.debug('Server database is not in use, email alert is '
                      'skipped.')
        return
    try:
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')
    except server_manager_utils.ServerActionError:
        # Only email an alert if the server is a scheduler, not a shard.
        return
    subject = ('Metadata upload has been failing for %d seconds' %
               _MAX_UPLOAD_FAIL_DURATION)
    email_manager.manager.enqueue_notify_email(subject, '')
    email_manager.manager.send_queued_emails()


def _run():
    """Report metadata in the queue until being aborted.
    """
    # Time when upload first failed. None if the last upload succeeded.
    first_failed_upload = None
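    # Upload size follows a simple adaptive policy (summarizing the loop
    # below): while uploads succeed, upload_size doubles each cycle, capped
    # at _MAX_UPLOAD_SIZE; once uploads have been failing for more than
    # _MAX_UPLOAD_FAIL_DURATION seconds, it is throttled back down to
    # _MIN_RETRY_ENTRIES and a one-time email alert is sent.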
    # True if an email alert was sent after upload had been failing
    # continuously for _MAX_UPLOAD_FAIL_DURATION seconds.
    email_alert = False
    upload_size = _MIN_RETRY_ENTRIES
    try:
        while True:
            start_time = time.time()
            data_list = []
            if (first_failed_upload and
                time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION):
                upload_size = _MIN_RETRY_ENTRIES
                if not email_alert:
                    _email_alert()
                    email_alert = True
            else:
                upload_size = min(upload_size*2, _MAX_UPLOAD_SIZE)
            while (not metadata_queue.empty() and
                   len(data_list) < upload_size):
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                if autotest_es.bulk_post(data_list=data_list):
                    time_used = time.time() - start_time
                    logging.info('%d entries of metadata uploaded in %s '
                                 'seconds.', len(data_list), time_used)
                    first_failed_upload = None
                    email_alert = False
                else:
                    logging.warn('Failed to upload %d entries of metadata, '
                                 'they will be retried later.',
                                 len(data_list))
                    for data in data_list:
                        queue(data)
                    if not first_failed_upload:
                        first_failed_upload = time.time()
            # Exit after the upload attempt, so data queued before abort()
            # was called gets a final chance to be uploaded. Without this
            # check the loop would never terminate.
            if _abort.is_set():
                break
            sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time
            if sleep_time < 0:
                sleep_time = 0.5
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        logging.error('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()


def start():
    """Start the thread to report metadata.
    """
    # The lock makes sure there is only one reporting thread working.
    if _report_lock.locked():
        logging.error('There is already a metadata reporter thread.')
        return

    _report_lock.acquire()
    reporting_thread = threading.Thread(target=_run)
    # Make it a daemon thread so it doesn't need to be closed explicitly.
    reporting_thread.setDaemon(True)
    reporting_thread.start()
    logging.info('Metadata reporting thread is started.')


def abort():
    """Abort the thread to report metadata.

    The call will wait up to 5 seconds for existing data to be uploaded.
    """
    if not _report_lock.locked():
        logging.error('The metadata reporting thread has already exited.')
        return

    _abort.set()
    logging.info('Waiting up to %s seconds for metadata reporting thread to '
                 'complete.', _REPORT_INTERVAL_SECONDS)
    # Waiting on _abort itself would return immediately because the flag was
    # just set above. Instead, poll until the reporter thread releases
    # _report_lock in its finally block, or until the timeout expires.
    end_time = time.time() + _REPORT_INTERVAL_SECONDS
    while _report_lock.locked() and time.time() < end_time:
        time.sleep(0.1)
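

# A minimal usage sketch (illustrative only; it assumes this module is
# importable as autotest_lib.site_utils.metadata_reporter and that
# autotest_es is configured to reach an Elasticsearch instance; the entry
# fields shown are hypothetical):
#
#   from autotest_lib.site_utils import metadata_reporter
#
#   metadata_reporter.start()
#   # queue() never blocks, even when the buffer is full.
#   metadata_reporter.queue({'_type': 'job_time_breakdown',
#                            'hostname': 'localhost',
#                            'duration': 1.5})
#   ...
#   # Flush pending entries and stop the reporter thread.
#   metadata_reporter.abort()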