# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime
import logging
import os
import random
import time


from autotest_lib.client.common_lib import base_job, global_config, log
from autotest_lib.client.common_lib import time_utils

# Base interval between polls for finished jobs; JobResultWaiter scales it by
# a random factor on every sleep.
_DEFAULT_POLL_INTERVAL_SECONDS = 30.0

HQE_MAXIMUM_ABORT_RATE_FLOAT = global_config.global_config.get_config_value(
        'SCHEDULER', 'hqe_maximum_abort_rate_float', type=float,
        default=0.5)


def view_is_relevant(view):
    """
    Indicates whether the view of a given test is meaningful or not.

    @param view: a detailed test 'view' from the TKO DB to look at.
    @return True if this is a test result worth looking at further.
    """
    return not view['test_name'].startswith('CLIENT_JOB')


def view_is_for_suite_job(view):
    """
    Indicates whether the given test view is the view of Suite job.

    @param view: a detailed test 'view' from the TKO DB to look at.
    @return True if this is view of suite job.
    """
    return view['test_name'] == 'SERVER_JOB'


def view_is_for_infrastructure_fail(view):
    """
    Indicates whether the given test view is from an infra fail.

    Unlike view_is_for_suite_job(), this matches any test name that *ends*
    with 'SERVER_JOB', not only the exact name.

    @param view: a detailed test 'view' from the TKO DB to look at.
    @return True if this view indicates an infrastructure-side issue during
                 a test.
    """
    return view['test_name'].endswith('SERVER_JOB')


def is_for_infrastructure_fail(status):
    """
    Indicates whether the given Status is from an infra fail.

    @param status: the Status object to look at.
    @return True if this Status indicates an infrastructure-side issue during
                 a test.
    """
    return view_is_for_infrastructure_fail({'test_name': status.test_name})


def _abort_jobs_if_timedout(afe, jobs, start_time, timeout_mins):
    """
    Abort all of the jobs in jobs if the running time has passed the timeout.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param jobs: an iterable of Running frontend.Jobs
    @param start_time: Time to compare to the current time to see if a timeout
                       has occurred. It is compared against
                       datetime.datetime.utcnow().
    @param timeout_mins: Time in minutes to wait before aborting the jobs we
                         are waiting on.

    @returns True if there was a timeout, False if not.
    """
    deadline = start_time + datetime.timedelta(minutes=timeout_mins)
    if datetime.datetime.utcnow() < deadline:
        return False
    for job in jobs:
        logging.debug('Job: %s has timed out after %s minutes. Aborting job.',
                      job.id, timeout_mins)
        afe.run('abort_host_queue_entries', job=job.id)
    return True


def _collate_aborted(current_value, entry):
    """
    reduce() over a list of HostQueueEntries for a job; True if any aborted.

    Functor that can be reduce()ed over a list of
    HostQueueEntries for a job.  If any were aborted
    (|entry.aborted| exists and is True), then the reduce() will
    return True.

    Ex:
      entries = AFE.run('get_host_queue_entries', job=job.id)
      reduce(_collate_aborted, entries, False)

    @param current_value: the current accumulator (a boolean).
    @param entry: the current entry under consideration.
    @return the value of |entry.aborted| if it exists, False if not.
    """
    return current_value or ('aborted' in entry and entry['aborted'])


def _status_for_test(status):
    """
    Indicates whether the status of a given test is meaningful or not.

    @param status: frontend.TestStatus object to look at.
    @return True if this is a test result worth looking at further, i.e. its
            test name starts with neither 'SERVER_JOB' nor 'CLIENT_JOB'.
    """
    return not (status.test_name.startswith('SERVER_JOB') or
                status.test_name.startswith('CLIENT_JOB'))


class JobResultWaiter(object):
    """Class for waiting on job results."""

    def __init__(self, afe, tko):
        """Instantiate class

        @param afe: an instance of AFE as defined in server/frontend.py.
        @param tko: an instance of TKO as defined in server/frontend.py.
        """
        self._afe = afe
        self._tko = tko
        # IDs of the jobs whose results have not been yielded yet.
        self._job_ids = set()

    def add_job(self, job):
        """Add job to wait on.

        @param job: Job object to get results from, as defined in
                    server/frontend.py
        """
        self.add_jobs((job,))

    def add_jobs(self, jobs):
        """Add jobs to wait on.

        @param jobs: Iterable of Job object to get results from, as defined in
                     server/frontend.py
        """
        self._job_ids.update(job.id for job in jobs)

    def wait_for_results(self):
        """Wait for jobs to finish and return their results.

        The returned generator blocks until all jobs have finished,
        naturally.

        @yields an iterator of Statuses, one per test.
        """
        while self._job_ids:
            for job in self._get_finished_jobs():
                for result in _yield_job_results(self._afe, self._tko, job):
                    yield result
                self._job_ids.remove(job.id)
            # Only pause when there is still something left to poll for;
            # sleeping after the last job finished would just delay the end
            # of the generator by a whole poll interval.
            if self._job_ids:
                self._sleep()

    def _get_finished_jobs(self):
        """Return the pending jobs that the AFE reports as finished."""
        # This is an RPC call which serializes to JSON, so we can't pass
        # in sets.
        return self._afe.get_jobs(id__in=list(self._job_ids), finished=True)

    def _sleep(self):
        """Sleep for the poll interval scaled by a random factor.

        random.random() is in [0, 1), so the factor is in [0.5, 1.5).
        """
        time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))


def _yield_job_results(afe, tko, job):
    """
    Yields the results of an individual job.

    Yields one Status object per test.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param tko: an instance of TKO as defined in server/frontend.py.
    @param job: Job object to get results from, as defined in
                server/frontend.py
    @yields an iterator of Statuses, one per test.
    """
    entries = afe.run('get_host_queue_entries', job=job.id)

    # This query uses the job id to search through the tko_test_view_2
    # table, for results of a test with a similar job_tag. The job_tag
    # is used to store results, and takes the form job_id-owner/host.
    # Many times when a job aborts during a test, the job_tag actually
    # exists and the results directory contains valid logs. If the job
    # was aborted prematurely i.e before it had a chance to create the
    # job_tag, this query will return no results. When statuses is not
    # empty it will contain frontend.TestStatus' with fields populated
    # using the results of the db query.
    statuses = tko.get_job_test_statuses_from_db(job.id)
    if not statuses:
        yield Status('ABORT', job.name)

    # We only care about the SERVER and CLIENT job failures when there
    # are no test failures.
    contains_test_failure = any(_status_for_test(s) and s.status != 'GOOD'
                                for s in statuses)
    for s in statuses:
        # TKO parser uniquely identifies a test run by
        # (test_name, subdir). In dynamic suite, we need to emit
        # a subdir for each status and make sure (test_name, subdir)
        # in the suite job's status log is unique.
        # For non-test status (i.e.SERVER_JOB, CLIENT_JOB),
        # we use 'job_tag' from tko_test_view_2, which looks like
        # '1246-owner/172.22.33.44'
        # For normal test status, we use 'job_tag/subdir'
        # which looks like '1246-owner/172.22.33.44/my_DummyTest.tag.subdir_tag'
        if _status_for_test(s):
            yield Status(s.status, s.test_name, s.reason,
                         s.test_started_time, s.test_finished_time,
                         job.id, job.owner, s.hostname, job.name,
                         subdir=os.path.join(s.job_tag, s.subdir))
        elif s.status != 'GOOD' and not contains_test_failure:
            # The name prefix normally comes from the first host queue entry.
            # If the RPC returned no entries, fall back to job.name instead
            # of raising IndexError.
            # NOTE(review): presumably both carry the same job name -- confirm.
            entry_job_name = (entries[0]['job']['name'] if entries
                              else job.name)
            yield Status(s.status,
                         '%s_%s' % (entry_job_name, s.test_name),
                         s.reason, s.test_started_time,
                         s.test_finished_time, job.id,
                         job.owner, s.hostname, job.name,
                         subdir=s.job_tag)


class Status(object):
    """
    A class representing a test result.

    Stores all pertinent info about a test result and, given a callable
    to use, can record start, result, and end info appropriately.

    @var _status: status code, e.g. 'INFO', 'FAIL', etc.
    @var _test_name: the name of the test whose result this is.
    @var _reason: message explaining failure, if any.
    @var _begin_timestamp: when test started (int, in seconds since the epoch).
    @var _end_timestamp: when test finished (int, in seconds since the epoch).
    @var _id: the ID of the job that generated this Status.
    @var _owner: the owner of the job that generated this Status.

    @var STATUS_MAP: a dict mapping host queue entry status strings to canonical
                     status codes; e.g. 'Aborted' -> 'ABORT'
    """
    _status = None
    _test_name = None
    _reason = None
    _begin_timestamp = None
    _end_timestamp = None

    # Queued status can occur if the try job just aborted due to not completing
    # reimaging for all machines. The Queued corresponds to an 'ABORT'.
    STATUS_MAP = {'Failed': 'FAIL', 'Aborted': 'ABORT', 'Completed': 'GOOD',
                  'Queued' : 'ABORT'}

    class sle(base_job.status_log_entry):
        """
        Thin wrapper around status_log_entry that supports stringification.
        """
        def __str__(self):
            return self.render()

        def __repr__(self):
            return self.render()


    def __init__(self, status, test_name, reason='', begin_time_str=None,
                 end_time_str=None, job_id=None, owner=None, hostname=None,
                 job_name='', subdir=None):
        """
        Constructor

        @param status: status code, e.g. 'INFO', 'FAIL', etc.
        @param test_name: the name of the test whose result this is.
        @param reason: message explaining failure, if any; Optional.
        @param begin_time_str: when test started (in time_utils.TIME_FMT);
                               now() if None or 'None'.
        @param end_time_str: when test finished (in time_utils.TIME_FMT);
                             now() if None or 'None'.
        @param job_id: the ID of the job that generated this Status.
        @param owner: the owner of the job that generated this Status.
        @param hostname: The name of the host the test that generated this
                         result ran on.
        @param job_name: The job name; Contains the test name with/without the
                         experimental prefix, the tag and the build.
        @param subdir: The result directory of the test. It will be recorded
                       as the subdir in the status.log file.
        """
        self._status = status
        self._test_name = test_name
        self._reason = reason
        self._id = job_id
        self._owner = owner
        self._hostname = hostname
        self._job_name = job_name
        self._subdir = subdir
        # Autoserv drops a keyval of the started time which eventually makes its
        # way here.  Therefore, if we have a starting time, we may assume that
        # the test reached Running and actually began execution on a drone.
        # bool() keeps the property a real boolean instead of leaking the
        # raw string / None out of the 'and' expression.
        self._test_executed = bool(begin_time_str and
                                   begin_time_str != 'None')

        self._begin_timestamp = self._parse_timestamp(begin_time_str)
        self._end_timestamp = self._parse_timestamp(end_time_str)


    @staticmethod
    def _parse_timestamp(time_str):
        """
        Convert a time string into integer seconds since the epoch.

        @param time_str: a time in time_utils.TIME_FMT, or None / '' / 'None'
                         when no time is available.
        @return the parsed time via time.mktime(), or the current time if
                |time_str| is empty or the literal string 'None'.
        """
        if time_str and time_str != 'None':
            parsed = datetime.datetime.strptime(time_str, time_utils.TIME_FMT)
            return int(time.mktime(parsed.timetuple()))
        return int(time.time())


    def is_good(self):
        """ Returns true if status is good. """
        return self._status == 'GOOD'


    def is_warn(self):
        """ Returns true if status is warn. """
        return self._status == 'WARN'


    def is_testna(self):
        """ Returns true if status is TEST_NA """
        return self._status == 'TEST_NA'


    def is_worse_than(self, candidate):
        """
        Return whether |self| represents a "worse" failure than |candidate|.

        "Worse" is defined the same as it is for log message purposes in
        common_lib/log.py.  We also consider status with a specific error
        message to represent a "worse" failure than one without.

        @param candidate: a Status instance to compare to this one.
        @return True if |self| is "worse" than |candidate|.
        """
        if self._status != candidate._status:
            # A lower index in log.job_statuses counts as worse.
            return (log.job_statuses.index(self._status) <
                    log.job_statuses.index(candidate._status))
        # else, if the statuses are the same, only a reason breaks the tie.
        return bool(self._reason and not candidate._reason)


    def record_start(self, record_entry):
        """
        Use record_entry to log message about start of test.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry)
        """
        log_entry = Status.sle('START', self._subdir,
                               self._test_name, '',
                               None, self._begin_timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_result(self, record_entry):
        """
        Use record_entry to log message about result of test.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry)
        """
        log_entry = Status.sle(self._status, self._subdir,
                               self._test_name, self._reason, None,
                               self._end_timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_end(self, record_entry):
        """
        Use record_entry to log message about end of test.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry)
        """
        log_entry = Status.sle('END %s' % self._status, self._subdir,
                               self._test_name, '', None, self._end_timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_all(self, record_entry):
        """
        Use record_entry to log all messages about test results.

        Records the start, result and end entries, in that order.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry)
        """
        self.record_start(record_entry)
        self.record_result(record_entry)
        self.record_end(record_entry)


    def override_status(self, override):
        """
        Override the _status field of this Status.

        @param override: value with which to override _status.
        """
        self._status = override


    @property
    def test_name(self):
        """ Name of the test this status corresponds to. """
        return self._test_name


    @test_name.setter
    def test_name(self, value):
        """
        Test name setter.

        @param value: The test name.
        """
        self._test_name = value


    @property
    def id(self):
        """ Id of the job that corresponds to this status. """
        return self._id


    @property
    def owner(self):
        """ Owner of the job that corresponds to this status. """
        return self._owner


    @property
    def hostname(self):
        """ Host the job corresponding to this status ran on. """
        return self._hostname


    @property
    def reason(self):
        """ Reason the job corresponding to this status failed. """
        return self._reason


    @property
    def test_executed(self):
        """ If the test reached running an autoserv instance or not. """
        return self._test_executed


    @property
    def subdir(self):
        """Subdir of test this status corresponds to."""
        return self._subdir