#!/usr/bin/python2 -u

import collections
import errno
import fcntl
import json
import optparse
import os
import socket
import subprocess
import sys
import time
import traceback

import common
from autotest_lib.client.bin.result_tools import utils as result_utils
from autotest_lib.client.bin.result_tools import utils_lib as result_utils_lib
from autotest_lib.client.bin.result_tools import runner as result_runner
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import mail, pidfile
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server import site_utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.site_utils.sponge_lib import sponge_utils
from autotest_lib.tko import db as tko_db, utils as tko_utils
from autotest_lib.tko import models, parser_lib
from autotest_lib.tko.perf_upload import perf_uploader
from autotest_lib.utils.side_effects import config_loader

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_ParseOptions = collections.namedtuple(
        'ParseOptions', ['reparse', 'mail_on_failure', 'dry_run', 'suite_report',
                         'datastore_creds', 'export_to_gcloud_path',
                         'disable_perf_upload'])

_HARDCODED_CONTROL_FILE_NAMES = (
        # client side test control, as saved in old Autotest paths.
        'control',
        # server side test control, as saved in old Autotest paths.
        'control.srv',
        # All control files, as saved in skylab.
        'control.from_control_name',
)


def parse_args():
    """Parse args."""
    # build up our options parser and parse sys.argv
    parser = optparse.OptionParser()
    parser.add_option("-m", help="Send mail for FAILED tests",
                      dest="mailit", action="store_true")
    parser.add_option("-r", help="Reparse the results of a job",
                      dest="reparse", action="store_true")
    parser.add_option("-o", help="Parse a single results directory",
                      dest="singledir", action="store_true")
    parser.add_option("-l", help=("Levels of subdirectories to include "
                                  "in the job name"),
                      type="int", dest="level", default=1)
    parser.add_option("-n", help="No blocking on an existing parse",
                      dest="noblock", action="store_true")
    parser.add_option("-s", help="Database server hostname",
                      dest="db_host", action="store")
    parser.add_option("-u", help="Database username", dest="db_user",
                      action="store")
    parser.add_option("-p", help="Database password", dest="db_pass",
                      action="store")
    parser.add_option("-d", help="Database name", dest="db_name",
                      action="store")
    parser.add_option("--dry-run", help="Do not actually commit any results.",
                      dest="dry_run", action="store_true", default=False)
    parser.add_option(
            "--detach", action="store_true",
            help="Detach parsing process from the caller process. Used by "
Used by " 82 "monitor_db to safely restart without affecting parsing.", 83 default=False) 84 parser.add_option("--write-pidfile", 85 help="write pidfile (.parser_execute)", 86 dest="write_pidfile", action="store_true", 87 default=False) 88 parser.add_option("--record-duration", 89 help="[DEPRECATED] Record timing to metadata db", 90 dest="record_duration", action="store_true", 91 default=False) 92 parser.add_option("--suite-report", 93 help=("Allows parsing job to attempt to create a suite " 94 "timeline report, if it detects that the job being " 95 "parsed is a suite job."), 96 dest="suite_report", action="store_true", 97 default=False) 98 parser.add_option("--datastore-creds", 99 help=("The path to gcloud datastore credentials file, " 100 "which will be used to upload suite timeline " 101 "report to gcloud. If not specified, the one " 102 "defined in shadow_config will be used."), 103 dest="datastore_creds", action="store", default=None) 104 parser.add_option("--export-to-gcloud-path", 105 help=("The path to export_to_gcloud script. Please find " 106 "chromite path on your server. The script is under " 107 "chromite/bin/."), 108 dest="export_to_gcloud_path", action="store", 109 default=None) 110 parser.add_option("--disable-perf-upload", 111 help=("Do not upload perf results to chrome perf."), 112 dest="disable_perf_upload", action="store_true", 113 default=False) 114 options, args = parser.parse_args() 115 116 # we need a results directory 117 if len(args) == 0: 118 tko_utils.dprint("ERROR: at least one results directory must " 119 "be provided") 120 parser.print_help() 121 sys.exit(1) 122 123 if not options.datastore_creds: 124 gcloud_creds = global_config.global_config.get_config_value( 125 'GCLOUD', 'cidb_datastore_writer_creds', default=None) 126 options.datastore_creds = (site_utils.get_creds_abspath(gcloud_creds) 127 if gcloud_creds else None) 128 129 if not options.export_to_gcloud_path: 130 export_script = 'chromiumos/chromite/bin/export_to_gcloud' 131 # If it is a lab server, the script is under ~chromeos-test/ 132 if os.path.exists(os.path.expanduser('~chromeos-test/%s' % 133 export_script)): 134 path = os.path.expanduser('~chromeos-test/%s' % export_script) 135 # If it is a local workstation, it is probably under ~/ 136 elif os.path.exists(os.path.expanduser('~/%s' % export_script)): 137 path = os.path.expanduser('~/%s' % export_script) 138 # If it is not found anywhere, the default will be set to None. 139 else: 140 path = None 141 options.export_to_gcloud_path = path 142 143 # pass the options back 144 return options, args 145 146 147def format_failure_message(jobname, kernel, testname, status, reason): 148 """Format failure message with the given information. 149 150 @param jobname: String representing the job name. 151 @param kernel: String representing the kernel. 152 @param testname: String representing the test name. 153 @param status: String representing the test status. 154 @param reason: String representing the reason. 155 156 @return: Failure message as a string. 157 """ 158 format_string = "%-12s %-20s %-12s %-10s %s" 159 return format_string % (jobname, kernel, testname, status, reason) 160 161 162def mailfailure(jobname, job, message): 163 """Send an email about the failure. 164 165 @param jobname: String representing the job name. 166 @param job: A job object. 167 @param message: The message to mail. 
168 """ 169 message_lines = [""] 170 message_lines.append("The following tests FAILED for this job") 171 message_lines.append("http://%s/results/%s" % 172 (socket.gethostname(), jobname)) 173 message_lines.append("") 174 message_lines.append(format_failure_message("Job name", "Kernel", 175 "Test name", "FAIL/WARN", 176 "Failure reason")) 177 message_lines.append(format_failure_message("=" * 8, "=" * 6, "=" * 8, 178 "=" * 8, "=" * 14)) 179 message_header = "\n".join(message_lines) 180 181 subject = "AUTOTEST: FAILED tests from job %s" % jobname 182 mail.send("", job.user, "", subject, message_header + message) 183 184 185def _invalidate_original_tests(orig_job_idx, retry_job_idx): 186 """Retry tests invalidates original tests. 187 188 Whenever a retry job is complete, we want to invalidate the original 189 job's test results, such that the consumers of the tko database 190 (e.g. tko frontend, wmatrix) could figure out which results are the latest. 191 192 When a retry job is parsed, we retrieve the original job's afe_job_id 193 from the retry job's keyvals, which is then converted to tko job_idx and 194 passed into this method as |orig_job_idx|. 195 196 In this method, we are going to invalidate the rows in tko_tests that are 197 associated with the original job by flipping their 'invalid' bit to True. 198 In addition, in tko_tests, we also maintain a pointer from the retry results 199 to the original results, so that later we can always know which rows in 200 tko_tests are retries and which are the corresponding original results. 201 This is done by setting the field 'invalidates_test_idx' of the tests 202 associated with the retry job. 203 204 For example, assume Job(job_idx=105) are retried by Job(job_idx=108), after 205 this method is run, their tko_tests rows will look like: 206 __________________________________________________________________________ 207 test_idx| job_idx | test | ... | invalid | invalidates_test_idx 208 10 | 105 | dummy_Fail.Error| ... | 1 | NULL 209 11 | 105 | dummy_Fail.Fail | ... | 1 | NULL 210 ... 211 20 | 108 | dummy_Fail.Error| ... | 0 | 10 212 21 | 108 | dummy_Fail.Fail | ... | 0 | 11 213 __________________________________________________________________________ 214 Note the invalid bits of the rows for Job(job_idx=105) are set to '1'. 215 And the 'invalidates_test_idx' fields of the rows for Job(job_idx=108) 216 are set to 10 and 11 (the test_idx of the rows for the original job). 217 218 @param orig_job_idx: An integer representing the original job's 219 tko job_idx. Tests associated with this job will 220 be marked as 'invalid'. 221 @param retry_job_idx: An integer representing the retry job's 222 tko job_idx. The field 'invalidates_test_idx' 223 of the tests associated with this job will be updated. 224 225 """ 226 msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx) 227 if not orig_job_idx or not retry_job_idx: 228 tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg) 229 # Using django models here makes things easier, but make sure that 230 # before this method is called, all other relevant transactions have been 231 # committed to avoid race condition. In the long run, we might consider 232 # to make the rest of parser use django models. 233 orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx) 234 retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx) 235 236 # Invalidate original tests. 
    orig_tests.update(invalid=True)

    # Maintain a dictionary that maps (test, subdir) to original tests.
    # Note that within the scope of a job, (test, subdir) uniquely
    # identifies a test run, but 'test' does not.
    # In a control file, one could run the same test with different
    # 'subdir_tag', for example,
    #   job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_1')
    #   job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_2')
    # In tko, we will get
    #   (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_1')
    #   (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_2')
    invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
                         for orig_test in orig_tests}
    for retry in retry_tests:
        # It is possible that (retry.test, retry.subdir) doesn't exist
        # in invalidated_tests. This could happen when the original job
        # didn't run some of its tests. For example, a dut went offline
        # at the beginning of the job, in which case invalidated_tests
        # will only have one entry for 'SERVER_JOB'.
        orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
        if orig_test:
            retry.invalidates_test = orig_test
            retry.save()
    tko_utils.dprint('DEBUG: Invalidated tests associated to job: ' + msg)


def _throttle_result_size(path):
    """Limit the total size of test results for the given path.

    @param path: Path of the result directory.
    """
    if not result_runner.ENABLE_RESULT_THROTTLING:
        tko_utils.dprint(
                'Result throttling is not enabled. Skipping throttling %s' %
                path)
        return

    max_result_size_KB = _max_result_size_from_control(path)
    if max_result_size_KB is None:
        max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB

    try:
        result_utils.execute(path, max_result_size_KB)
    except:
        tko_utils.dprint(
                'Failed to throttle result size of %s.\nDetails %s' %
                (path, traceback.format_exc()))


def _max_result_size_from_control(path):
    """Gets the max result size set in a control file, if any.

    If no override is found, returns None.
    """
    for control_file in _HARDCODED_CONTROL_FILE_NAMES:
        control = os.path.join(path, control_file)
        if not os.path.exists(control):
            continue

        try:
            max_result_size_KB = control_data.parse_control(
                    control, raise_warnings=False).max_result_size_KB
            if max_result_size_KB != control_data.DEFAULT_MAX_RESULT_SIZE_KB:
                return max_result_size_KB
        except IOError as e:
            tko_utils.dprint(
                    'Failed to access %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
        except control_data.ControlVariableException as e:
            tko_utils.dprint(
                    'Failed to parse %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
    return None


def export_tko_job_to_file(job, jobname, filename):
    """Exports the tko job to a file on disk.

    @param job: database object.
    @param jobname: the job name as string.
    @param filename: The path of the file the serialized job is written to.
    """
    from autotest_lib.tko import job_serializer

    serializer = job_serializer.JobSerializer()
    serializer.serialize_to_binary(job, jobname, filename)


def parse_one(db, pid_file_manager, jobname, path, parse_options):
    """Parse a single job. Optionally send email on failure.

    @param db: database object.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param jobname: the tag used to search for the existing job in the db,
                    e.g. '1234-chromeos-test/host1'
    @param path: The path to the results to be parsed.
    @param parse_options: _ParseOptions instance.
    """
    reparse = parse_options.reparse
    mail_on_failure = parse_options.mail_on_failure
    dry_run = parse_options.dry_run
    suite_report = parse_options.suite_report
    datastore_creds = parse_options.datastore_creds
    export_to_gcloud_path = parse_options.export_to_gcloud_path

    tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    old_job_idx = db.find_job(jobname)
    if old_job_idx is not None and not reparse:
        tko_utils.dprint("! Job is already parsed, done")
        return

    # look up the status version
    job_keyval = models.job.read_keyval(path)
    status_version = job_keyval.get("status_version", 0)

    parser = parser_lib.parser(status_version)
    job = parser.make_job(path)
    tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    status_log_path = _find_status_log_path(path)
    if not status_log_path:
        tko_utils.dprint("! Unable to parse job, no status file")
        return
    _parse_status_log(parser, job, status_log_path)

    if old_job_idx is not None:
        job.job_idx = old_job_idx
        unmatched_tests = _match_existing_tests(db, job)
        if not dry_run:
            _delete_tests_from_db(db, unmatched_tests)

    job.afe_job_id = tko_utils.get_afe_job_id(jobname)
    job.skylab_task_id = tko_utils.get_skylab_task_id(jobname)
    job.afe_parent_job_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.skylab_parent_task_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.build = None
    job.board = None
    job.build_version = None
    job.suite = None
    if job.label:
        label_info = site_utils.parse_job_name(job.label)
        if label_info:
            job.build = label_info.get('build', None)
            job.build_version = label_info.get('build_version', None)
            job.board = label_info.get('board', None)
            job.suite = label_info.get('suite', None)

    if 'suite' in job.keyval_dict:
        job.suite = job.keyval_dict['suite']

    result_utils_lib.LOG = tko_utils.dprint
    _throttle_result_size(path)

    # Record test result size to job_keyvals
    start_time = time.time()
    result_size_info = site_utils.collect_result_sizes(
            path, log=tko_utils.dprint)
    tko_utils.dprint('Finished collecting result sizes after %s seconds' %
                     (time.time() - start_time))
    job.keyval_dict.update(result_size_info.__dict__)

    # TODO(dshi): Update sizes with sponge_invocation.xml and throttle it.

    # check for failures
    message_lines = [""]
    job_successful = True
    for test in job.tests:
        if not test.subdir:
            continue
        tko_utils.dprint("* testname, subdir, status, reason: %s %s %s %s"
                         % (test.testname, test.subdir, test.status,
                            test.reason))
        if test.status not in ('GOOD', 'WARN'):
            job_successful = False
            pid_file_manager.num_tests_failed += 1
            message_lines.append(format_failure_message(
                    jobname, test.kernel.base, test.subdir,
                    test.status, test.reason))

    message = "\n".join(message_lines)

    if not dry_run:
        # send out an email report of failure
        if len(message) > 2 and mail_on_failure:
            tko_utils.dprint("Sending email report of failure on %s to %s"
                             % (jobname, job.user))
            mailfailure(jobname, job, message)

        # Upload perf values to the perf dashboard, if applicable.
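        # (disable_perf_upload comes from the --disable-perf-upload flag, or
        # from chrome_perf.enabled when a side_effects.Config is present; see
        # _update_db_config_from_json below.)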
        if parse_options.disable_perf_upload:
            tko_utils.dprint("Skipping results upload to chrome perf as it is "
                             "disabled by config")
        else:
            for test in job.tests:
                perf_uploader.upload_test(job, test, jobname)

        # Upload job details to Sponge.
        sponge_url = sponge_utils.upload_results(job, log=tko_utils.dprint)
        if sponge_url:
            job.keyval_dict['sponge_url'] = sponge_url

        _write_job_to_db(db, jobname, job)

        # Verify the job data is written to the database.
        if job.tests:
            tests_in_db = db.find_tests(job.job_idx)
            tests_in_db_count = len(tests_in_db) if tests_in_db else 0
            if tests_in_db_count != len(job.tests):
                tko_utils.dprint(
                        'Failed to find enough tests for job_idx: %d. The '
                        'job should have %d tests, only found %d tests.' %
                        (job.job_idx, len(job.tests), tests_in_db_count))
                metrics.Counter(
                        'chromeos/autotest/result/db_save_failure',
                        description='The number of times parse failed to '
                                    'save job to TKO database.').increment()

        # Although the cursor has autocommit, we still need to force it to
        # commit existing changes before we can use django models, otherwise
        # it will deadlock when django models try to start a new
        # transaction while the current one has not finished yet.
        db.commit()

        # Handle retry job.
        orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID,
                                         None)
        if orig_afe_job_id:
            orig_job_idx = tko_models.Job.objects.get(
                    afe_job_id=orig_afe_job_id).job_idx
            _invalidate_original_tests(orig_job_idx, job.job_idx)

    # Serialize the job into a binary file, if configured to do so.
    export_tko_to_file = global_config.global_config.get_config_value(
            'AUTOSERV', 'export_tko_job_to_file', type=bool, default=False)

    binary_file_name = os.path.join(path, "job.serialize")
    if export_tko_to_file:
        export_tko_job_to_file(job, jobname, binary_file_name)

    if not dry_run:
        db.commit()

    # Generate a suite report.
    # Check whether this is a suite job. A suite job will be a hostless job,
    # its jobname will be <JOB_ID>-<USERNAME>/hostless, and its suite field
    # will not be NULL. Only generate the timeline report when
    # datastore_parent_key is given.
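    # For example, a suite job parsed as '12345-chromeos-test/hostless' with a
    # 'suite' keyval such as 'bvt-inline' (illustrative values) would qualify.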
    datastore_parent_key = job_keyval.get('datastore_parent_key', None)
    provision_job_id = job_keyval.get('provision_job_id', None)
    if (suite_report and jobname.endswith('/hostless')
        and job.suite and datastore_parent_key):
        tko_utils.dprint('Start dumping suite timing report...')
        timing_log = os.path.join(path, 'suite_timing.log')
        dump_cmd = ("%s/site_utils/dump_suite_report.py %s "
                    "--output='%s' --debug" %
                    (common.autotest_dir, job.afe_job_id,
                     timing_log))

        if provision_job_id is not None:
            dump_cmd += " --provision_job_id=%d" % int(provision_job_id)

        subprocess.check_output(dump_cmd, shell=True)
        tko_utils.dprint('Successfully finished dumping suite timing report')

        if (datastore_creds and export_to_gcloud_path
            and os.path.exists(export_to_gcloud_path)):
            upload_cmd = [export_to_gcloud_path, datastore_creds,
                          timing_log, '--parent_key',
                          datastore_parent_key]
            tko_utils.dprint('Start exporting timeline report to gcloud')
            subprocess.check_output(upload_cmd)
            tko_utils.dprint('Successfully exported timeline report to '
                             'gcloud')
        else:
            tko_utils.dprint('DEBUG: skip exporting suite timeline to '
                             'gcloud, because either gcloud creds or the '
                             'export_to_gcloud script is not found.')

    # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    # the function, so any failure, e.g., a db connection error, will stop
    # gs_offloader_instructions from being updated, and the logs can be
    # uploaded for troubleshooting.
    if job_successful:
        # Check if we should not offload this test's results.
        if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
            # Update the gs_offloader_instructions json file.
            gs_instructions_file = os.path.join(
                    path, constants.GS_OFFLOADER_INSTRUCTIONS)
            gs_offloader_instructions = {}
            if os.path.exists(gs_instructions_file):
                with open(gs_instructions_file, 'r') as f:
                    gs_offloader_instructions = json.load(f)

            gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
            with open(gs_instructions_file, 'w') as f:
                json.dump(gs_offloader_instructions, f)


def _write_job_to_db(db, jobname, job):
    """Write all TKO data associated with a job to the DB.

    This updates the job object as a side effect.

    @param db: tko.db.db_sql object.
    @param jobname: Name of the job to write.
    @param job: tko.models.job object.
    """
    db.insert_or_update_machine(job)
    db.insert_job(jobname, job)
    db.insert_or_update_task_reference(
            job,
            'skylab' if tko_utils.is_skylab_task(jobname) else 'afe',
    )
    db.update_job_keyvals(job)
    for test in job.tests:
        db.insert_test(job, test)


def _find_status_log_path(path):
    if os.path.exists(os.path.join(path, "status.log")):
        return os.path.join(path, "status.log")
    if os.path.exists(os.path.join(path, "status")):
        return os.path.join(path, "status")
    return ""


def _parse_status_log(parser, job, status_log_path):
    status_lines = open(status_log_path).readlines()
    parser.start(job)
    tests = parser.end(status_lines)

    # parser.end can return the same object multiple times, so filter out dups
    job.tests = []
    already_added = set()
    for test in tests:
        if test not in already_added:
            already_added.add(test)
            job.tests.append(test)


def _match_existing_tests(db, job):
    """Find entries in the DB corresponding to the job's tests; update the job.

    @return: Any unmatched tests in the db.
    """
    old_job_idx = job.job_idx
    raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
                              {"job_idx": old_job_idx})
    if raw_old_tests:
        old_tests = dict(((test, subdir), test_idx)
                         for test_idx, subdir, test in raw_old_tests)
    else:
        old_tests = {}

    for test in job.tests:
        test_idx = old_tests.pop((test.testname, test.subdir), None)
        if test_idx is not None:
            test.test_idx = test_idx
        else:
            tko_utils.dprint("! Reparse returned new test "
                             "testname=%r subdir=%r" %
                             (test.testname, test.subdir))
    return old_tests


def _delete_tests_from_db(db, tests):
    for test_idx in tests.itervalues():
        where = {'test_idx': test_idx}
        db.delete('tko_iteration_result', where)
        db.delete('tko_iteration_perf_value', where)
        db.delete('tko_iteration_attributes', where)
        db.delete('tko_test_attributes', where)
        db.delete('tko_test_labels_tests', {'test_id': test_idx})
        db.delete('tko_tests', where)


def _get_job_subdirs(path):
    """
    Returns a list of job subdirectories at path. Returns None if the path
    is itself a job directory. Does not recurse into the subdirs.
    """
    # if there's a .machines file, use it to get the subdirs
    machine_list = os.path.join(path, ".machines")
    if os.path.exists(machine_list):
        subdirs = set(line.strip() for line in open(machine_list))
        existing_subdirs = set(subdir for subdir in subdirs
                               if os.path.exists(os.path.join(path, subdir)))
        if len(existing_subdirs) != 0:
            return existing_subdirs

    # if this dir contains ONLY subdirectories, return them
    contents = set(os.listdir(path))
    contents.discard(".parse.lock")
    subdirs = set(sub for sub in contents if
                  os.path.isdir(os.path.join(path, sub)))
    if len(contents) == len(subdirs) != 0:
        return subdirs

    # this is a job directory, or something else we don't understand
    return None


def parse_leaf_path(db, pid_file_manager, path, level, parse_options):
    """Parse a leaf path.

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
    """
    job_elements = path.split("/")[-level:]
    jobname = "/".join(job_elements)
    db.run_with_retry(parse_one, db, pid_file_manager, jobname, path,
                      parse_options)
    return jobname


def parse_path(db, pid_file_manager, path, level, parse_options):
    """Parse a path.

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: A set of job names of the parsed jobs, e.g.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    """
    processed_jobs = set()
    job_subdirs = _get_job_subdirs(path)
    if job_subdirs is not None:
        # Parse status.log in the current directory, if it exists.
        # Multi-machine synchronous server side tests record output in this
        # directory. Without this check, we do not parse these results.
        if os.path.exists(os.path.join(path, 'status.log')):
            new_job = parse_leaf_path(db, pid_file_manager, path, level,
                                      parse_options)
            processed_jobs.add(new_job)
        # multi-machine job
        for subdir in job_subdirs:
            jobpath = os.path.join(path, subdir)
            new_jobs = parse_path(db, pid_file_manager, jobpath, level + 1,
                                  parse_options)
            processed_jobs.update(new_jobs)
    else:
        # single machine job
        new_job = parse_leaf_path(db, pid_file_manager, path, level,
                                  parse_options)
        processed_jobs.add(new_job)
    return processed_jobs


def _detach_from_parent_process():
    """Allow reparenting the parse process away from caller.

    When monitor_db is run via upstart, restarting the job sends SIGTERM to
    the whole process group. This makes us immune from that.
    """
    if os.getpid() != os.getpgid(0):
        os.setsid()


def main():
    """tko_parse entry point."""
    options, args = parse_args()

    # We are obliged to use indirect=False, not use the SetupTsMonGlobalState
    # context manager, and add a manual flush, because tko/parse is expected to
    # be a very short lived (<1 min) script when working effectively, and we
    # can't afford to either a) wait for up to 1min for metrics to flush at the
    # end or b) drop metrics that were sent within the last minute of execution.
    site_utils.SetupTsMonGlobalState('tko_parse', indirect=False,
                                     short_lived=True)
    try:
        with metrics.SuccessCounter('chromeos/autotest/tko_parse/runs'):
            _main_with_options(options, args)
    finally:
        metrics.Flush()


def _main_with_options(options, args):
    """Entry point with options parsed and metrics already set up."""
    # Record the processed jobs so that
    # we can send the duration of parsing to metadata db.
    processed_jobs = set()

    if options.detach:
        _detach_from_parent_process()

    results_dir = os.path.abspath(args[0])
    assert os.path.exists(results_dir)

    _update_db_config_from_json(options, results_dir)

    parse_options = _ParseOptions(options.reparse, options.mailit,
                                  options.dry_run, options.suite_report,
                                  options.datastore_creds,
                                  options.export_to_gcloud_path,
                                  options.disable_perf_upload)

    pid_file_manager = pidfile.PidFileManager("parser", results_dir)

    if options.write_pidfile:
        pid_file_manager.open_file()

    try:
        # build up the list of job dirs to parse
        if options.singledir:
            jobs_list = [results_dir]
        else:
            jobs_list = [os.path.join(results_dir, subdir)
                         for subdir in os.listdir(results_dir)]

        # build up the database
        db = tko_db.db(autocommit=False, host=options.db_host,
                       user=options.db_user, password=options.db_pass,
                       database=options.db_name)

        # parse all the jobs
        for path in jobs_list:
            lockfile = open(os.path.join(path, ".parse.lock"), "w")
            flags = fcntl.LOCK_EX
            if options.noblock:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(lockfile, flags)
            except IOError as e:
                # lock is not available and nonblocking has been requested
                if e.errno == errno.EWOULDBLOCK:
                    lockfile.close()
                    continue
                else:
                    raise  # something unexpected happened
            try:
                new_jobs = parse_path(db, pid_file_manager, path, options.level,
                                      parse_options)
                processed_jobs.update(new_jobs)

            finally:
                fcntl.flock(lockfile, fcntl.LOCK_UN)
                lockfile.close()

    except Exception:
        pid_file_manager.close_file(1)
        raise
    else:
        pid_file_manager.close_file(0)


def _update_db_config_from_json(options, test_results_dir):
    """Update DB config options using a side_effects_config.json file.

    @param options: parsed args to be updated.
    @param test_results_dir: path to the test results dir.

    @raises: json_format.ParseError if the file is not valid JSON.
             ValueError if the JSON config is incomplete.
             OSError if some files from the JSON config are missing.
    """
    # The results_dir passed to tko/parse is a subdir of the root results dir.
    config_dir = os.path.join(test_results_dir, os.pardir)
    tko_utils.dprint("Attempting to read side_effects.Config from %s" %
                     config_dir)
    config = config_loader.load(config_dir)

    if config:
        tko_utils.dprint("Validating side_effects.Config.tko")
        config_loader.validate_tko(config)

        tko_utils.dprint("Using the following DB config params from "
                         "side_effects.Config.tko:\n%s" % config.tko)
        options.db_host = config.tko.proxy_socket
        options.db_user = config.tko.mysql_user

        with open(config.tko.mysql_password_file, 'r') as f:
            options.db_pass = f.read().rstrip('\n')

        options.disable_perf_upload = not config.chrome_perf.enabled
    else:
        tko_utils.dprint("No side_effects.Config found in %s - "
                         "defaulting to DB config values from shadow config" %
                         config_dir)


if __name__ == "__main__":
    main()