# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime
import logging
import os
import random
import time


from autotest_lib.client.common_lib import base_job, global_config, log
from autotest_lib.client.common_lib import time_utils

# Base polling interval. Each actual sleep is jittered to between 0.5x and
# 1.5x of this value (see _JobResultWaiter._sleep).
_DEFAULT_POLL_INTERVAL_SECONDS = 30.0

HQE_MAXIMUM_ABORT_RATE_FLOAT = global_config.global_config.get_config_value(
        'SCHEDULER', 'hqe_maximum_abort_rate_float', type=float,
        default=0.5)


def view_is_relevant(view):
    """
    Indicates whether the view of a given test is meaningful or not.

    @param view: a detailed test 'view' from the TKO DB to look at.
    @return True if this is a test result worth looking at further, i.e. its
            test name does not start with 'CLIENT_JOB'.
    """
    return not view['test_name'].startswith('CLIENT_JOB')


def view_is_for_suite_job(view):
    """
    Indicates whether the given test view is the view of Suite job.

    @param view: a detailed test 'view' from the TKO DB to look at.
    @return True if this is view of suite job (test name is exactly
            'SERVER_JOB').
    """
    return view['test_name'] == 'SERVER_JOB'


def view_is_for_infrastructure_fail(view):
    """
    Indicates whether the given test view is from an infra fail.

    Matching is by suffix, so prefixed names such as 'jobname_SERVER_JOB'
    (the form _yield_job_results emits for non-test statuses) also qualify.

    @param view: a detailed test 'view' from the TKO DB to look at.
    @return True if this view indicates an infrastructure-side issue during
            a test.
    """
    return view['test_name'].endswith('SERVER_JOB')


def is_for_infrastructure_fail(status):
    """
    Indicates whether the given Status is from an infra fail.

    @param status: the Status object to look at.
    @return True if this Status indicates an infrastructure-side issue during
            a test.
    """
    return view_is_for_infrastructure_fail({'test_name': status.test_name})


def _abort_jobs_if_timedout(afe, jobs, start_time, timeout_mins):
    """
    Abort all of the jobs in jobs if the running time has passed the timeout.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param jobs: an iterable of Running frontend.Jobs
    @param start_time: Time to compare to the current time to see if a timeout
                       has occurred. It is compared against
                       datetime.datetime.utcnow(), so it is presumably a
                       naive UTC datetime.
    @param timeout_mins: Time in minutes to wait before aborting the jobs we
                         are waiting on.

    @returns True if there was a timeout, False if not.
    """
    deadline = start_time + datetime.timedelta(minutes=timeout_mins)
    if datetime.datetime.utcnow() < deadline:
        return False
    for job in jobs:
        logging.debug('Job: %s has timed out after %s minutes. Aborting job.',
                      job.id, timeout_mins)
        afe.run('abort_host_queue_entries', job=job.id)
    return True


def _collate_aborted(current_value, entry):
    """
    reduce() over a list of HostQueueEntries for a job; True if any aborted.

    Functor that can be reduced()ed over a list of
    HostQueueEntries for a job.  If any were aborted
    (|entry.aborted| exists and is True), then the reduce() will
    return True.

    Ex:
      entries = AFE.run('get_host_queue_entries', job=job.id)
      reduce(_collate_aborted, entries, False)

    @param current_value: the current accumulator (a boolean).
    @param entry: the current entry under consideration.
    @return the value of |entry.aborted| if it exists, False if not.
    """
    return current_value or ('aborted' in entry and entry['aborted'])


def _status_for_test(status):
    """
    Indicates whether the status of a given test is meaningful or not.

    @param status: frontend.TestStatus object to look at.
    @return True if this is a test result worth looking at further, i.e. its
            test name starts with neither 'SERVER_JOB' nor 'CLIENT_JOB'.
    """
    return not status.test_name.startswith(('SERVER_JOB', 'CLIENT_JOB'))


class _JobResultWaiter(object):
    """Class for waiting on job results."""

    def __init__(self, afe, tko):
        """Instantiate class

        @param afe: an instance of AFE as defined in server/frontend.py.
        @param tko: an instance of TKO as defined in server/frontend.py.
        """
        self._afe = afe
        self._tko = tko
        # Ids of jobs that have not yet been reported as finished.
        self._job_ids = set()

    def add_job(self, job):
        """Add job to wait on.

        @param job: Job object to get results from, as defined in
                    server/frontend.py
        """
        self.add_jobs((job,))

    def add_jobs(self, jobs):
        """Add jobs to wait on.

        @param jobs: Iterable of Job objects to get results from, as defined
                     in server/frontend.py
        """
        self._job_ids.update(job.id for job in jobs)

    def wait_for_results(self):
        """Wait for jobs to finish and return their results.

        The returned generator blocks until all jobs have finished,
        naturally.  Jobs may be added (via add_job/add_jobs) while the
        generator is suspended; they are picked up on the next poll.

        @yields an iterator of Statuses, one per test.
        """
        while self._job_ids:
            for job in self._get_finished_jobs():
                for result in _yield_job_results(self._afe, self._tko, job):
                    yield result
                self._job_ids.remove(job.id)
            # Only wait for another poll if something is still outstanding;
            # otherwise the generator would idle for a full poll interval
            # before finishing.
            if self._job_ids:
                self._sleep()

    def _get_finished_jobs(self):
        """Return the finished jobs among those still being waited on."""
        # This is an RPC call which serializes to JSON, so we can't pass
        # in sets.
        return self._afe.get_jobs(id__in=list(self._job_ids), finished=True)

    def _sleep(self):
        """Sleep for a jittered poll interval (0.5x to 1.5x the default)."""
        time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))


def _yield_job_results(afe, tko, job):
    """
    Yields the results of an individual job.

    Yields one Status object per test.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param tko: an instance of TKO as defined in server/frontend.py.
    @param job: Job object to get results from, as defined in
                server/frontend.py
    @yields an iterator of Statuses, one per test.
    """
    entries = afe.run('get_host_queue_entries', job=job.id)

    # This query uses the job id to search through the tko_test_view_2
    # table, for results of a test with a similar job_tag. The job_tag
    # is used to store results, and takes the form job_id-owner/host.
    # Many times when a job aborts during a test, the job_tag actually
    # exists and the results directory contains valid logs. If the job
    # was aborted prematurely i.e before it had a chance to create the
    # job_tag, this query will return no results. When statuses is not
    # empty it will contain frontend.TestStatus' with fields populated
    # using the results of the db query.
    statuses = tko.get_job_test_statuses_from_db(job.id)
    if not statuses:
        yield Status('ABORT', job.name)
        return

    # Name used to prefix SERVER_JOB/CLIENT_JOB statuses. Fall back to the
    # Job object's own name if the AFE returned no host queue entries,
    # rather than failing with an IndexError.
    hqe_job_name = entries[0]['job']['name'] if entries else job.name

    # We only care about the SERVER and CLIENT job failures when there
    # are no test failures.
    contains_test_failure = any(_status_for_test(s) and s.status != 'GOOD'
                                for s in statuses)
    for s in statuses:
        # TKO parser uniquely identifies a test run by
        # (test_name, subdir). In dynamic suite, we need to emit
        # a subdir for each status and make sure (test_name, subdir)
        # in the suite job's status log is unique.
        # For non-test status (i.e.SERVER_JOB, CLIENT_JOB),
        # we use 'job_tag' from tko_test_view_2, which looks like
        # '1246-owner/172.22.33.44'
        # For normal test status, we use 'job_tag/subdir'
        # which looks like '1246-owner/172.22.33.44/my_DummyTest.tag.subdir_tag'
        if _status_for_test(s):
            yield Status(s.status, s.test_name, s.reason,
                         s.test_started_time, s.test_finished_time,
                         job.id, job.owner, s.hostname, job.name,
                         subdir=os.path.join(s.job_tag, s.subdir))
        elif s.status != 'GOOD' and not contains_test_failure:
            yield Status(s.status,
                         '%s_%s' % (hqe_job_name, s.test_name),
                         s.reason, s.test_started_time,
                         s.test_finished_time, job.id,
                         job.owner, s.hostname, job.name,
                         subdir=s.job_tag)


def wait_for_child_results(afe, tko, parent_job_id):
    """
    Wait for results of all tests in jobs with given parent id.

    New jobs could be added by calling send(new_jobs) on the generator; that
    send() call itself returns None.  Polls for results every
    _DEFAULT_POLL_INTERVAL_SECONDS, jittered by 0.5x-1.5x.  Yields one
    Status object per test as results become available.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param tko: an instance of TKO as defined in server/frontend.py.
    @param parent_job_id: Parent job id for the jobs to wait on.
    @yields an iterator of Statuses, one per test.
    """
    waiter = _JobResultWaiter(afe, tko)
    waiter.add_jobs(afe.get_jobs(parent_job_id=parent_job_id))
    for result in waiter.wait_for_results():
        new_jobs = (yield result)
        if new_jobs:
            waiter.add_jobs(new_jobs)
            # Return nothing if 'send' is called
            yield None


def wait_for_results(afe, tko, jobs):
    """
    Wait for results of all tests in all jobs in |jobs|.

    New jobs could be added by calling send(new_jobs) on the generator; that
    send() call itself returns None.  Polls for results every
    _DEFAULT_POLL_INTERVAL_SECONDS, jittered by 0.5x-1.5x.  Yields one
    Status object per test as results become available.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param tko: an instance of TKO as defined in server/frontend.py.
    @param jobs: a list of Job objects, as defined in server/frontend.py.
    @yields an iterator of Statuses, one per test.
    """
    waiter = _JobResultWaiter(afe, tko)
    waiter.add_jobs(jobs)
    for result in waiter.wait_for_results():
        new_jobs = (yield result)
        if new_jobs:
            waiter.add_jobs(new_jobs)
            # Return nothing if 'send' is called
            yield None


class Status(object):
    """
    A class representing a test result.

    Stores all pertinent info about a test result and, given a callable
    to use, can record start, result, and end info appropriately.

    @var _status: status code, e.g. 'INFO', 'FAIL', etc.
    @var _test_name: the name of the test whose result this is.
    @var _reason: message explaining failure, if any.
    @var _begin_timestamp: when test started (int, in seconds since the epoch).
    @var _end_timestamp: when test finished (int, in seconds since the epoch).
    @var _id: the ID of the job that generated this Status.
    @var _owner: the owner of the job that generated this Status.
    @var _hostname: the host the test ran on.
    @var _job_name: the name of the job that generated this Status.
    @var _subdir: the subdir recorded for this result in status.log.
    @var _test_executed: bool; True if a begin time was supplied.

    @var STATUS_MAP: a dict mapping host queue entry status strings to canonical
                     status codes; e.g. 'Aborted' -> 'ABORT'
    """
    _status = None
    _test_name = None
    _reason = None
    _begin_timestamp = None
    _end_timestamp = None

    # Queued status can occur if the try job just aborted due to not completing
    # reimaging for all machines. The Queued corresponds to an 'ABORT'.
    STATUS_MAP = {'Failed': 'FAIL', 'Aborted': 'ABORT', 'Completed': 'GOOD',
                  'Queued': 'ABORT'}

    class sle(base_job.status_log_entry):
        """
        Thin wrapper around status_log_entry that supports stringification.
        """
        def __str__(self):
            return self.render()

        def __repr__(self):
            return self.render()


    def __init__(self, status, test_name, reason='', begin_time_str=None,
                 end_time_str=None, job_id=None, owner=None, hostname=None,
                 job_name='', subdir=None):
        """
        Constructor

        @param status: status code, e.g. 'INFO', 'FAIL', etc.
        @param test_name: the name of the test whose result this is.
        @param reason: message explaining failure, if any; Optional.
        @param begin_time_str: when test started (in time_utils.TIME_FMT);
                               now() if None or 'None'.
        @param end_time_str: when test finished (in time_utils.TIME_FMT);
                             now() if None or 'None'.
        @param job_id: the ID of the job that generated this Status.
        @param owner: the owner of the job that generated this Status.
        @param hostname: The name of the host the test that generated this
                         result ran on.
        @param job_name: The job name; Contains the test name with/without the
                         experimental prefix, the tag and the build.
        @param subdir: The result directory of the test. It will be recorded
                       as the subdir in the status.log file.
        """
        self._status = status
        self._test_name = test_name
        self._reason = reason
        self._id = job_id
        self._owner = owner
        self._hostname = hostname
        self._job_name = job_name
        self._subdir = subdir
        # Autoserv drops a keyval of the started time which eventually makes its
        # way here.  Therefore, if we have a starting time, we may assume that
        # the test reached Running and actually began execution on a drone.
        self._test_executed = bool(begin_time_str and begin_time_str != 'None')

        self._begin_timestamp = self._time_str_to_timestamp(begin_time_str)
        self._end_timestamp = self._time_str_to_timestamp(end_time_str)


    @staticmethod
    def _time_str_to_timestamp(time_str):
        """
        Convert a time string into integer seconds since the epoch.

        @param time_str: a time formatted per time_utils.TIME_FMT, or a falsy
                         value / the literal string 'None'.
        @return int seconds since the epoch (converted with time.mktime, i.e.
                the parsed time is treated as local time); the current time
                if |time_str| is falsy or 'None'.
        """
        if time_str and time_str != 'None':
            parsed = datetime.datetime.strptime(time_str, time_utils.TIME_FMT)
            return int(time.mktime(parsed.timetuple()))
        return int(time.time())


    def is_good(self):
        """ Returns true if status is good. """
        return self._status == 'GOOD'


    def is_warn(self):
        """ Returns true if status is warn. """
        return self._status == 'WARN'


    def is_testna(self):
        """ Returns true if status is TEST_NA """
        return self._status == 'TEST_NA'


    def is_worse_than(self, candidate):
        """
        Return whether |self| represents a "worse" failure than |candidate|.

        "Worse" is defined the same as it is for log message purposes in
        common_lib/log.py: a status appearing earlier in log.job_statuses is
        worse.  We also consider status with a specific error message to
        represent a "worse" failure than one without.

        @param candidate: a Status instance to compare to this one.
        @return True if |self| is "worse" than |candidate|.
        """
        if self._status != candidate._status:
            return (log.job_statuses.index(self._status) <
                    log.job_statuses.index(candidate._status))
        # else, if the statuses are the same...
        if self._reason and not candidate._reason:
            return True
        return False


    def record_start(self, record_entry):
        """
        Use record_entry to log message about start of test.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry, log_in_subdir=False)
        """
        log_entry = Status.sle('START', self._subdir,
                               self._test_name, '',
                               None, self._begin_timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_result(self, record_entry):
        """
        Use record_entry to log message about result of test.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry, log_in_subdir=False)
        """
        log_entry = Status.sle(self._status, self._subdir,
                               self._test_name, self._reason, None,
                               self._end_timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_end(self, record_entry):
        """
        Use record_entry to log message about end of test.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry, log_in_subdir=False)
        """
        log_entry = Status.sle('END %s' % self._status, self._subdir,
                               self._test_name, '', None, self._end_timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_all(self, record_entry):
        """
        Use record_entry to log all messages about test results.

        Records the start, result and end entries, in that order.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry, log_in_subdir=False)
        """
        self.record_start(record_entry)
        self.record_result(record_entry)
        self.record_end(record_entry)


    def override_status(self, override):
        """
        Override the _status field of this Status.

        @param override: value with which to override _status.
        """
        self._status = override


    @property
    def test_name(self):
        """ Name of the test this status corresponds to. """
        return self._test_name


    @test_name.setter
    def test_name(self, value):
        """
        Test name setter.

        @param value: The test name.
        """
        self._test_name = value


    @property
    def id(self):
        """ Id of the job that corresponds to this status. """
        return self._id


    @property
    def owner(self):
        """ Owner of the job that corresponds to this status. """
        return self._owner


    @property
    def hostname(self):
        """ Host the job corresponding to this status ran on. """
        return self._hostname


    @property
    def reason(self):
        """ Reason the job corresponding to this status failed. """
        return self._reason


    @property
    def test_executed(self):
        """ If the test reached running an autoserv instance or not (bool). """
        return self._test_executed

    @property
    def subdir(self):
        """Subdir of test this status corresponds to."""
        return self._subdir