"""Autotest AFE Cleanup used by the scheduler"""

import contextlib
import logging
import random
import time

from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import scheduler_config
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_METRICS_PREFIX = 'chromeos/autotest/scheduler/cleanup'


class PeriodicCleanup(object):
    """Base class to schedule periodical cleanup work.

    Subclasses implement _cleanup(); the scheduler calls initialize() once
    and run_cleanup_maybe() repeatedly.
    """

    def __init__(self, db, clean_interval_minutes, run_at_initialize=False):
        """Initialize PeriodicCleanup.

        @param db: Database connection object.
        @param clean_interval_minutes: Minimum number of minutes between two
                cleanups triggered by run_cleanup_maybe().
        @param run_at_initialize: True to run the cleanup from initialize().
        """
        self._db = db
        self.clean_interval_minutes = clean_interval_minutes
        self._last_clean_time = time.time()
        self._run_at_initialize = run_at_initialize


    def initialize(self):
        """Method called by scheduler at the startup.

        Runs the cleanup once if run_at_initialize was requested.
        """
        if self._run_at_initialize:
            self._cleanup()


    def run_cleanup_maybe(self):
        """Run the cleanup if more than the clean interval has elapsed."""
        should_cleanup = (self._last_clean_time +
                          self.clean_interval_minutes * 60
                          < time.time())
        if should_cleanup:
            self._cleanup()
            # Stamp after the cleanup so the interval is measured from the
            # end of the previous run.
            self._last_clean_time = time.time()


    def _cleanup(self):
        """Abstract cleanup method."""
        raise NotImplementedError


class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
    clean_interval_minutes in the SCHEDULER section.
    """

    def __init__(self, db, clean_interval_minutes):
        """Initialize UserCleanup.

        @param db: Database connection object.
        @param clean_interval_minutes: Minimum number of minutes between two
                cleanups.
        """
        super(UserCleanup, self).__init__(db, clean_interval_minutes)
        self._last_reverify_time = time.time()


    @metrics.SecondsTimerDecorator(_METRICS_PREFIX + '/user/durations')
    def _cleanup(self):
        """Run every periodic user cleanup step, in order."""
        logging.info('Running periodic cleanup')
        self._abort_timed_out_jobs()
        self._abort_jobs_past_max_runtime()
        self._clear_inactive_blocks()
        self._check_for_db_inconsistencies()
        self._reverify_dead_hosts()
        self._django_session_cleanup()


    def _abort_timed_out_jobs(self):
        """Abort jobs with incomplete queue entries that are past their
        timeout (created_on + timeout_mins).
        """
        logging.info(
                'Aborting all jobs that have timed out and are not complete')
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
                where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
        jobs = query.distinct()
        if not jobs:
            return

        with _cleanup_warning_banner('timed out jobs', len(jobs)):
            for job in jobs:
                logging.warning('Aborting job %d due to job timeout', job.id)
                job.abort()
        _report_detected_errors('jobs_timed_out', len(jobs))


    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
        rows = self._db.execute("""
            SELECT hqe.id FROM afe_host_queue_entries AS hqe
            WHERE NOT hqe.complete AND NOT hqe.aborted AND EXISTS
            (select * from afe_jobs where hqe.job_id=afe_jobs.id and
             hqe.started_on + INTERVAL afe_jobs.max_runtime_mins MINUTE < NOW())
            """)
        query = models.HostQueueEntry.objects.filter(
                id__in=[row[0] for row in rows])
        hqes = query.distinct()
        if not hqes:
            return

        with _cleanup_warning_banner('hqes past max runtime', len(hqes)):
            for queue_entry in hqes:
                logging.warning('Aborting entry %s due to max runtime',
                                queue_entry)
                queue_entry.abort()
        _report_detected_errors('hqes_past_max_runtime', len(hqes))


    def _check_for_db_inconsistencies(self):
        """Find and clear relations that involve invalid objects."""
        logging.info('Cleaning db inconsistencies')
        self._check_all_invalid_related_objects()


    def _check_invalid_related_objects_one_way(self, invalid_model,
                                               relation_field, valid_model):
        """Clear relations from invalid objects of one model to another model.

        Does nothing if invalid_model has no 'invalid' field.

        @param invalid_model: Model class whose invalid=True objects are
                checked.
        @param relation_field: Name of the attribute on invalid_model objects
                holding the related manager to clear.
        @param valid_model: Model class on the other side of the relation.
        """
        if 'invalid' not in invalid_model.get_field_dict():
            return

        invalid_objects = list(invalid_model.objects.filter(invalid=True))
        invalid_model.objects.populate_relationships(
                invalid_objects, valid_model, 'related_objects')
        if not invalid_objects:
            return

        num_objects_with_invalid_relations = 0
        errors = []
        for invalid_object in invalid_objects:
            if invalid_object.related_objects:
                related_objects = invalid_object.related_objects
                related_list = ', '.join(str(x) for x in related_objects)
                num_objects_with_invalid_relations += 1
                errors.append('Invalid %s is related to: %s' %
                              (invalid_object, related_list))
                related_manager = getattr(invalid_object, relation_field)
                related_manager.clear()

        # Only log warnings after we're sure we've seen at least one invalid
        # model with some valid relations to avoid empty banners from getting
        # printed.
        if errors:
            invalid_model_name = invalid_model.__name__
            valid_model_name = valid_model.__name__
            banner = 'invalid %s related to valid %s' % (invalid_model_name,
                                                         valid_model_name)
            with _cleanup_warning_banner(banner, len(errors)):
                for error in errors:
                    logging.warning(error)
            # NOTE(review): both metrics below always carry the same value
            # (one error string is appended per object counted); the second
            # one presumably meant to count individual relations -- confirm.
            _report_detected_errors(
                    'invalid_related_objects',
                    num_objects_with_invalid_relations,
                    fields={'invalid_model': invalid_model_name,
                            'valid_model': valid_model_name})
            _report_detected_errors(
                    'invalid_related_objects_relations',
                    len(errors),
                    fields={'invalid_model': invalid_model_name,
                            'valid_model': valid_model_name})


    def _check_invalid_related_objects(self, first_model, first_field,
                                       second_model, second_field):
        """Check a many-to-many relation for invalid objects on both sides.

        @param first_model: Model class on one side of the relation.
        @param first_field: Relation attribute name on first_model objects.
        @param second_model: Model class on the other side of the relation.
        @param second_field: Relation attribute name on second_model objects.
        """
        self._check_invalid_related_objects_one_way(
                first_model,
                first_field,
                second_model,
        )
        self._check_invalid_related_objects_one_way(
                second_model,
                second_field,
                first_model,
        )


    def _check_all_invalid_related_objects(self):
        """Run the invalid-relation check over every known model pair."""
        model_pairs = ((models.Host, 'labels', models.Label, 'host_set'),
                       (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'),
                       (models.AclGroup, 'users', models.User, 'aclgroup_set'),
                       (models.Test, 'dependency_labels', models.Label,
                        'test_set'))
        for first_model, first_field, second_model, second_field in model_pairs:
            self._check_invalid_related_objects(
                    first_model,
                    first_field,
                    second_model,
                    second_field,
            )


    def _clear_inactive_blocks(self):
        """Delete ineligible-host-queue rows whose job has no incomplete
        host queue entry left.
        """
        logging.info('Clear out blocks for all completed jobs.')
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        self._db.execute("""
                DELETE ihq FROM afe_ineligible_host_queues ihq
                WHERE NOT EXISTS
                    (SELECT job_id FROM afe_host_queue_entries hqe
                     WHERE NOT hqe.complete AND hqe.job_id = ihq.job_id)""")


    def _should_reverify_hosts_now(self):
        """Return True if the reverify period has elapsed.

        A configured reverify period of 0 disables reverification.
        """
        reverify_period_sec = (scheduler_config.config.reverify_period_minutes
                               * 60)
        if reverify_period_sec == 0:
            return False
        return (self._last_reverify_time + reverify_period_sec) <= time.time()


    def _choose_subset_of_hosts_to_reverify(self, hosts):
        """Given hosts needing verification, return a subset to reverify.

        @param hosts: List of hosts needing verification.

        @returns: A random sample of reverify_max_hosts_at_once hosts when
                that limit is positive and exceeded, otherwise all hosts,
                sorted.
        """
        max_at_once = scheduler_config.config.reverify_max_hosts_at_once
        if (max_at_once > 0 and len(hosts) > max_at_once):
            return random.sample(hosts, max_at_once)
        return sorted(hosts)


    def _reverify_dead_hosts(self):
        """Schedule a VERIFY special task for hosts stuck in Repair Failed.

        Only unlocked, valid hosts whose protection is not DO_NOT_VERIFY are
        considered, and only once per reverify period.
        """
        if not self._should_reverify_hosts_now():
            return

        self._last_reverify_time = time.time()
        logging.info('Checking for dead hosts to reverify')
        hosts = models.Host.objects.filter(
                status=models.Host.Status.REPAIR_FAILED,
                locked=False,
                invalid=False)
        hosts = hosts.exclude(
                protection=host_protections.Protection.DO_NOT_VERIFY)
        if not hosts:
            return

        hosts = list(hosts)
        total_hosts = len(hosts)
        hosts = self._choose_subset_of_hosts_to_reverify(hosts)
        logging.info('Reverifying dead hosts (%d of %d)', len(hosts),
                     total_hosts)
        with _cleanup_warning_banner('reverify dead hosts', len(hosts)):
            for host in hosts:
                logging.warning(host.hostname)
        _report_detected_errors('dead_hosts_triggered_reverify', len(hosts))
        _report_detected_errors('dead_hosts_require_reverify', total_hosts)
        for host in hosts:
            models.SpecialTask.schedule_special_task(
                    host=host, task=models.SpecialTask.Task.VERIFY)


    def _django_session_cleanup(self):
        """Clean up django_session since django doesn't do it for us.
        http://www.djangoproject.com/documentation/0.96/sessions/
        """
        logging.info('Deleting old sessions from django_session')
        # NOTE(review): TRUNCATE empties the whole table, not only old
        # sessions as the log message suggests.
        sql = 'TRUNCATE TABLE django_session'
        self._db.execute(sql)


class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs at the startup of monitor_db and every subsequent
    twenty four hours.
    """


    def __init__(self, db, drone_manager, run_at_initialize=True):
        """Initialize TwentyFourHourUpkeep.

        @param db: Database connection object.
        @param drone_manager: DroneManager to access drones.
        @param run_at_initialize: True to run cleanup when scheduler starts.
                                  Default is set to True.

        """
        self.drone_manager = drone_manager
        clean_interval_minutes = 24 * 60  # 24 hours
        super(TwentyFourHourUpkeep, self).__init__(
                db, clean_interval_minutes, run_at_initialize=run_at_initialize)


    @metrics.SecondsTimerDecorator(_METRICS_PREFIX + '/daily/durations')
    def _cleanup(self):
        """Run every daily upkeep step, in order."""
        logging.info('Running 24 hour clean up')
        self._check_for_uncleanable_db_inconsistencies()
        self._cleanup_orphaned_containers()


    def _check_for_uncleanable_db_inconsistencies(self):
        """Log and report DB inconsistencies found by the daily checks."""
        logging.info('Checking for uncleanable DB inconsistencies')
        self._check_for_active_and_complete_queue_entries()
        self._check_for_multiple_platform_hosts()
        self._check_for_no_platform_hosts()


    def _check_for_active_and_complete_queue_entries(self):
        """Report queue entries that are both active and complete.

        Entries whose status is 'Aborted' are set to inactive and saved; the
        others are only logged.
        """
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        num_bad_hqes = query.count()
        if num_bad_hqes == 0:
            return

        num_aborted = 0
        logging.warning('%d queue entries found with active=complete=1',
                        num_bad_hqes)
        with _cleanup_warning_banner('active and complete hqes', num_bad_hqes):
            for entry in query:
                if entry.status == 'Aborted':
                    entry.active = False
                    entry.save()
                    recovery_path = 'was also aborted, set active to False'
                    num_aborted += 1
                else:
                    recovery_path = 'can not recover'
                logging.warning('%s (recovery: %s)', entry.get_object_dict(),
                                recovery_path)
        _report_detected_errors('hqes_active_and_complete', num_bad_hqes)
        # NOTE(review): this counts entries that were fixed, yet it is
        # reported through the errors_detected gauge -- confirm whether the
        # recovered-errors counter was intended.
        _report_detected_errors('hqes_aborted_set_to_inactive', num_aborted)


    def _check_for_multiple_platform_hosts(self):
        """Log and report hosts that carry more than one platform label."""
        rows = self._db.execute("""
            SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count,
                   GROUP_CONCAT(afe_labels.name)
            FROM afe_hosts
            INNER JOIN afe_hosts_labels ON
                    afe_hosts.id = afe_hosts_labels.host_id
            INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
            WHERE afe_labels.platform
            GROUP BY afe_hosts.id
            HAVING platform_count > 1
            ORDER BY hostname""")

        if rows:
            logging.warning('Cleanup found hosts with multiple platforms')
            with _cleanup_warning_banner('hosts with multiple platforms',
                                         len(rows)):
                for row in rows:
                    logging.warning(' '.join(str(item) for item in row))
            _report_detected_errors('hosts_with_multiple_platforms', len(rows))


    def _check_for_no_platform_hosts(self):
        """Log and report valid hosts that have no platform label."""
        rows = self._db.execute("""
            SELECT hostname
            FROM afe_hosts
            LEFT JOIN afe_hosts_labels
              ON afe_hosts.id = afe_hosts_labels.host_id
              AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels
                                                WHERE platform)
            WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id IS NULL""")
        if rows:
            with _cleanup_warning_banner('hosts with no platform', len(rows)):
                for row in rows:
                    logging.warning(row[0])
            _report_detected_errors('hosts_with_no_platform', len(rows))


    def _cleanup_orphaned_containers(self):
        """Cleanup orphaned containers in each drone.

        The function queues a lxc_cleanup call in each drone without waiting for
        the script to finish, as the cleanup procedure could take minutes and the
        script output is logged.

        """
        # NOTE(review): the value is used for its truthiness only; if
        # get_config_value returns the raw string, 'False' would count as
        # enabled -- confirm whether a bool conversion is needed.
        ssp_enabled = global_config.global_config.get_config_value(
                'AUTOSERV', 'enable_ssp_container')
        if not ssp_enabled:
            logging.info(
                    'Server-side packaging is not enabled, no need to clean '
                    'up orphaned containers.')
            return
        self.drone_manager.cleanup_orphaned_containers()


def _report_recovered_errors(metric_name, count, fields=None):
    """Reports a counter metric for recovered errors

    Kept under its own name so that it is not shadowed by
    _report_detected_errors; no caller in this module uses it.

    @param metric_name: Name of the metric to report about.
    @param count: How many "errors" were fixed this cycle.
    @param fields: Optional fields to include with the metric.
    """
    m = '%s/errors_recovered/%s' % (_METRICS_PREFIX, metric_name)
    # Build a fresh dict per call rather than sharing a mutable default.
    metrics.Counter(m).increment_by(count, fields=fields or {})


def _report_detected_errors(metric_name, gauge, fields=None):
    """Reports a gauge metric for errors detected

    @param metric_name: Name of the metric to report about.
    @param gauge: Outstanding number of unrecoverable errors of this type.
    @param fields: Optional fields to include with the metric.
    """
    m = '%s/errors_detected/%s' % (_METRICS_PREFIX, metric_name)
    # Build a fresh dict per call rather than sharing a mutable default.
    metrics.Gauge(m).set(gauge, fields=fields or {})


@contextlib.contextmanager
def _cleanup_warning_banner(banner, error_count=None):
    """Put a clear context in the logs around list of errors

    The END line is logged even if the wrapped body raises.

    @param banner: The identifying header to print for context.
    @param error_count: If not None, the number of errors detected.
    """
    if error_count is not None:
        banner += ' (total: %d)' % error_count
    logging.warning('#### START: %s ####', banner)
    try:
        yield
    finally:
        logging.warning('#### END: %s ####', banner)