#!/usr/bin/python -u

import collections
import errno
import fcntl
import json
import optparse
import os
import socket
import subprocess
import sys
import time
import traceback

import common
from autotest_lib.client.bin.result_tools import utils as result_utils
from autotest_lib.client.bin.result_tools import utils_lib as result_utils_lib
from autotest_lib.client.bin.result_tools import runner as result_runner
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import mail, pidfile
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server import site_utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.site_utils.sponge_lib import sponge_utils
from autotest_lib.tko import db as tko_db, utils as tko_utils
from autotest_lib.tko import models, parser_lib
from autotest_lib.tko.perf_upload import perf_uploader

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_ParseOptions = collections.namedtuple(
        'ParseOptions', ['reparse', 'mail_on_failure', 'dry_run',
                         'suite_report', 'datastore_creds',
                         'export_to_gcloud_path'])

_HARDCODED_CONTROL_FILE_NAMES = (
    # client side test control, as saved in old Autotest paths.
    'control',
    # server side test control, as saved in old Autotest paths.
    'control.srv',
    # All control files, as saved in skylab.
    'control.from_control_name',
)

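# Typical invocations, for reference (illustrative; the flags are defined in
# parse_args() below, and the paths and job IDs are made up):
#   tko/parse.py -o /usr/local/autotest/results/123-chromeos-test
#   tko/parse.py -r -l 2 -o /usr/local/autotest/results/123-chromeos-test
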
def parse_args():
    """Parse args."""
    # build up our options parser and parse sys.argv
    parser = optparse.OptionParser()
    parser.add_option("-m", help="Send mail for FAILED tests",
                      dest="mailit", action="store_true")
    parser.add_option("-r", help="Reparse the results of a job",
                      dest="reparse", action="store_true")
    parser.add_option("-o", help="Parse a single results directory",
                      dest="singledir", action="store_true")
    parser.add_option("-l", help=("Levels of subdirectories to include "
                                  "in the job name"),
                      type="int", dest="level", default=1)
    parser.add_option("-n", help="No blocking on an existing parse",
                      dest="noblock", action="store_true")
    parser.add_option("-s", help="Database server hostname",
                      dest="db_host", action="store")
    parser.add_option("-u", help="Database username", dest="db_user",
                      action="store")
    parser.add_option("-p", help="Database password", dest="db_pass",
                      action="store")
    parser.add_option("-d", help="Database name", dest="db_name",
                      action="store")
    parser.add_option("--dry-run", help="Do not actually commit any results.",
                      dest="dry_run", action="store_true", default=False)
    parser.add_option(
            "--detach", action="store_true",
            help="Detach parsing process from the caller process. Used by "
                 "monitor_db to safely restart without affecting parsing.",
            default=False)
    parser.add_option("--write-pidfile",
                      help="write pidfile (.parser_execute)",
                      dest="write_pidfile", action="store_true",
                      default=False)
    parser.add_option("--record-duration",
                      help="[DEPRECATED] Record timing to metadata db",
                      dest="record_duration", action="store_true",
                      default=False)
    parser.add_option("--suite-report",
                      help=("Allows the parsing job to attempt to create a "
                            "suite timeline report, if it detects that the "
                            "job being parsed is a suite job."),
                      dest="suite_report", action="store_true",
                      default=False)
    parser.add_option("--datastore-creds",
                      help=("The path to the gcloud datastore credentials "
                            "file, which will be used to upload the suite "
                            "timeline report to gcloud. If not specified, "
                            "the one defined in shadow_config will be used."),
                      dest="datastore_creds", action="store", default=None)
    parser.add_option("--export-to-gcloud-path",
                      help=("The path to the export_to_gcloud script. Please "
                            "find the chromite path on your server. The "
                            "script is under chromite/bin/."),
                      dest="export_to_gcloud_path", action="store",
                      default=None)
    options, args = parser.parse_args()

    # we need a results directory
    if len(args) == 0:
        tko_utils.dprint("ERROR: at least one results directory must "
                         "be provided")
        parser.print_help()
        sys.exit(1)

    if not options.datastore_creds:
        gcloud_creds = global_config.global_config.get_config_value(
                'GCLOUD', 'cidb_datastore_writer_creds', default=None)
        options.datastore_creds = (site_utils.get_creds_abspath(gcloud_creds)
                                   if gcloud_creds else None)

    if not options.export_to_gcloud_path:
        export_script = 'chromiumos/chromite/bin/export_to_gcloud'
        # If it is a lab server, the script is under ~chromeos-test/
        if os.path.exists(os.path.expanduser('~chromeos-test/%s' %
                                             export_script)):
            path = os.path.expanduser('~chromeos-test/%s' % export_script)
        # If it is a local workstation, it is probably under ~/
        elif os.path.exists(os.path.expanduser('~/%s' % export_script)):
            path = os.path.expanduser('~/%s' % export_script)
        # If it is not found anywhere, the default will be set to None.
        else:
            path = None
        options.export_to_gcloud_path = path

    # pass the options back
    return options, args


def format_failure_message(jobname, kernel, testname, status, reason):
    """Format a failure message with the given information.

    @param jobname: String representing the job name.
    @param kernel: String representing the kernel.
    @param testname: String representing the test name.
    @param status: String representing the test status.
    @param reason: String representing the reason.

    @return: Failure message as a string.
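
    Example (illustrative values):
        format_failure_message('123-suite/host1', '3.8.11',
                               'dummy_Fail.Fail', 'FAIL', 'Failed on purpose')
        returns one line with the five fields padded to fixed-width columns,
        lining up under the header row built in mailfailure() below.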
    """
    format_string = "%-12s %-20s %-12s %-10s %s"
    return format_string % (jobname, kernel, testname, status, reason)


def mailfailure(jobname, job, message):
    """Send an email about the failure.

    @param jobname: String representing the job name.
    @param job: A job object.
    @param message: The message to mail.
    """
    message_lines = [""]
    message_lines.append("The following tests FAILED for this job")
    message_lines.append("http://%s/results/%s" %
                         (socket.gethostname(), jobname))
    message_lines.append("")
    message_lines.append(format_failure_message("Job name", "Kernel",
                                                "Test name", "FAIL/WARN",
                                                "Failure reason"))
    message_lines.append(format_failure_message("=" * 8, "=" * 6, "=" * 8,
                                                "=" * 8, "=" * 14))
    message_header = "\n".join(message_lines)

    subject = "AUTOTEST: FAILED tests from job %s" % jobname
    mail.send("", job.user, "", subject, message_header + message)


def _invalidate_original_tests(orig_job_idx, retry_job_idx):
    """Invalidate a retry job's original tests.

    Whenever a retry job is complete, we want to invalidate the original
    job's test results, so that the consumers of the tko database
    (e.g. tko frontend, wmatrix) can figure out which results are the latest.

    When a retry job is parsed, we retrieve the original job's afe_job_id
    from the retry job's keyvals, which is then converted to a tko job_idx
    and passed into this method as |orig_job_idx|.

    In this method, we are going to invalidate the rows in tko_tests that are
    associated with the original job by flipping their 'invalid' bit to True.
    In addition, in tko_tests, we also maintain a pointer from the retry
    results to the original results, so that later we can always know which
    rows in tko_tests are retries and which are the corresponding original
    results. This is done by setting the field 'invalidates_test_idx' of the
    tests associated with the retry job.

    For example, assume Job(job_idx=105) is retried by Job(job_idx=108);
    after this method is run, their tko_tests rows will look like:
    __________________________________________________________________________
    test_idx | job_idx | test             | ... | invalid | invalidates_test_idx
    10       | 105     | dummy_Fail.Error | ... | 1       | NULL
    11       | 105     | dummy_Fail.Fail  | ... | 1       | NULL
    ...
    20       | 108     | dummy_Fail.Error | ... | 0       | 10
    21       | 108     | dummy_Fail.Fail  | ... | 0       | 11
    __________________________________________________________________________
    Note the invalid bits of the rows for Job(job_idx=105) are set to '1'.
    And the 'invalidates_test_idx' fields of the rows for Job(job_idx=108)
    are set to 10 and 11 (the test_idx of the rows for the original job).

    @param orig_job_idx: An integer representing the original job's
                         tko job_idx. Tests associated with this job will
                         be marked as 'invalid'.
    @param retry_job_idx: An integer representing the retry job's
                          tko job_idx. The field 'invalidates_test_idx'
                          of the tests associated with this job will be
                          updated.

    """
    msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx)
    if not orig_job_idx or not retry_job_idx:
        tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg)
    # Using django models here makes things easier, but make sure that
    # before this method is called, all other relevant transactions have been
    # committed to avoid a race condition. In the long run, we might consider
    # making the rest of the parser use django models.
    orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx)
    retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx)

    # Invalidate original tests.
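    # Issued through the Django ORM; roughly equivalent SQL (illustrative):
    #   UPDATE tko_tests SET invalid = 1 WHERE job_idx = <orig_job_idx>;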
    orig_tests.update(invalid=True)

    # Maintain a dictionary that maps (test, subdir) to original tests.
    # Note that within the scope of a job, (test, subdir) uniquely
    # identifies a test run, but 'test' alone does not.
    # In a control file, one could run the same test with different
    # 'subdir_tag's, for example,
    #   job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_1')
    #   job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_2')
    # In tko, we will get
    #   (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_1')
    #   (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_2')
    invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
                         for orig_test in orig_tests}
    for retry in retry_tests:
        # It is possible that (retry.test, retry.subdir) doesn't exist
        # in invalidated_tests. This could happen when the original job
        # didn't run some of its tests. For example, a dut goes offline
        # from the beginning of the job, in which case invalidated_tests
        # will only have one entry for 'SERVER_JOB'.
        orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
        if orig_test:
            retry.invalidates_test = orig_test
            retry.save()
    tko_utils.dprint('DEBUG: Invalidated tests associated with job: ' + msg)


def _throttle_result_size(path):
    """Limit the total size of test results for the given path.

    @param path: Path of the result directory.
    """
    if not result_runner.ENABLE_RESULT_THROTTLING:
        tko_utils.dprint(
                'Result throttling is not enabled. Skipping throttling %s' %
                path)
        return

    max_result_size_KB = _max_result_size_from_control(path)
    if max_result_size_KB is None:
        max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB

    try:
        result_utils.execute(path, max_result_size_KB)
    except:
        tko_utils.dprint(
                'Failed to throttle result size of %s.\nDetails %s' %
                (path, traceback.format_exc()))


def _max_result_size_from_control(path):
    """Gets the max result size set in a control file, if any.

    If no override is found, returns None.
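
    Illustrative override in a control file (the exact variable name is
    assumed; it is whatever control_data.parse_control() surfaces as
    max_result_size_KB):
        MAX_RESULT_SIZE_KB = 512000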
    """
    for control_file in _HARDCODED_CONTROL_FILE_NAMES:
        control = os.path.join(path, control_file)
        if not os.path.exists(control):
            continue

        try:
            max_result_size_KB = control_data.parse_control(
                    control, raise_warnings=False).max_result_size_KB
            if max_result_size_KB != control_data.DEFAULT_MAX_RESULT_SIZE_KB:
                return max_result_size_KB
        except IOError as e:
            tko_utils.dprint(
                    'Failed to access %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
        except control_data.ControlVariableException as e:
            tko_utils.dprint(
                    'Failed to parse %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
    return None


def export_tko_job_to_file(job, jobname, filename):
    """Exports the tko job to a disk file.

    @param job: database object.
    @param jobname: the job name as string.
    @param filename: The path of the file to write the serialized job to.
    """
    from autotest_lib.tko import job_serializer

    serializer = job_serializer.JobSerializer()
    serializer.serialize_to_binary(job, jobname, filename)


def parse_one(db, pid_file_manager, jobname, path, parse_options):
    """Parse a single job. Optionally send email on failure.

    @param db: database object.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param jobname: the tag used to search for the existing job in the db,
                    e.g. '1234-chromeos-test/host1'
    @param path: The path to the results to be parsed.
    @param parse_options: _ParseOptions instance.
    """
    reparse = parse_options.reparse
    mail_on_failure = parse_options.mail_on_failure
    dry_run = parse_options.dry_run
    suite_report = parse_options.suite_report
    datastore_creds = parse_options.datastore_creds
    export_to_gcloud_path = parse_options.export_to_gcloud_path

    tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    old_job_idx = db.find_job(jobname)
    if old_job_idx is not None and not reparse:
        tko_utils.dprint("! Job is already parsed, done")
        return

    # look up the status version
    job_keyval = models.job.read_keyval(path)
    status_version = job_keyval.get("status_version", 0)

    parser = parser_lib.parser(status_version)
    job = parser.make_job(path)
    tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    status_log_path = _find_status_log_path(path)
    if not status_log_path:
        tko_utils.dprint("! Unable to parse job, no status file")
        return
    _parse_status_log(parser, job, status_log_path)

    if old_job_idx is not None:
        job.job_idx = old_job_idx
        unmatched_tests = _match_existing_tests(db, job)
        if not dry_run:
            _delete_tests_from_db(db, unmatched_tests)

    job.afe_job_id = tko_utils.get_afe_job_id(jobname)
    job.skylab_task_id = tko_utils.get_skylab_task_id(jobname)
    job.afe_parent_job_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.skylab_parent_task_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.build = None
    job.board = None
    job.build_version = None
    job.suite = None
    if job.label:
        label_info = site_utils.parse_job_name(job.label)
        if label_info:
            job.build = label_info.get('build', None)
            job.build_version = label_info.get('build_version', None)
            job.board = label_info.get('board', None)
            job.suite = label_info.get('suite', None)

    result_utils_lib.LOG = tko_utils.dprint
    _throttle_result_size(path)

    # Record test result size to job_keyvals
    start_time = time.time()
    result_size_info = site_utils.collect_result_sizes(
            path, log=tko_utils.dprint)
    tko_utils.dprint('Finished collecting result sizes after %s seconds' %
                     (time.time() - start_time))
    job.keyval_dict.update(result_size_info.__dict__)

    # TODO(dshi): Update sizes with sponge_invocation.xml and throttle it.

    # check for failures
    message_lines = [""]
    job_successful = True
    for test in job.tests:
        if not test.subdir:
            continue
        tko_utils.dprint("* testname, subdir, status, reason: %s %s %s %s"
                         % (test.testname, test.subdir, test.status,
                            test.reason))
        if test.status not in ('GOOD', 'WARN'):
            job_successful = False
            pid_file_manager.num_tests_failed += 1
            message_lines.append(format_failure_message(
                    jobname, test.kernel.base, test.subdir,
                    test.status, test.reason))

    message = "\n".join(message_lines)

    if not dry_run:
        # send out an email report of the failure
        if len(message) > 2 and mail_on_failure:
            tko_utils.dprint("Sending email report of failure on %s to %s"
                             % (jobname, job.user))
            mailfailure(jobname, job, message)

        # Upload perf values to the perf dashboard, if applicable.
        for test in job.tests:
            perf_uploader.upload_test(job, test, jobname)

        # Upload job details to Sponge.
        sponge_url = sponge_utils.upload_results(job, log=tko_utils.dprint)
        if sponge_url:
            job.keyval_dict['sponge_url'] = sponge_url

        _write_job_to_db(db, jobname, job)

        # Verify the job data is written to the database.
        if job.tests:
            tests_in_db = db.find_tests(job.job_idx)
            tests_in_db_count = len(tests_in_db) if tests_in_db else 0
            if tests_in_db_count != len(job.tests):
                tko_utils.dprint(
                        'Failed to find enough tests for job_idx: %d. The '
                        'job should have %d tests, only found %d tests.' %
                        (job.job_idx, len(job.tests), tests_in_db_count))
                metrics.Counter(
                        'chromeos/autotest/result/db_save_failure',
                        description='The number of times parse failed to '
                                    'save job to TKO database.').increment()

        # Although the cursor has autocommit, we still need to force it to
        # commit existing changes before we can use django models, otherwise
        # it will go into deadlock when django models try to start a new
        # transaction while the current one has not finished yet.
        db.commit()

        # Handle retry job.
        orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID,
                                         None)
        if orig_afe_job_id:
            orig_job_idx = tko_models.Job.objects.get(
                    afe_job_id=orig_afe_job_id).job_idx
            _invalidate_original_tests(orig_job_idx, job.job_idx)

    # Serialize the job into a binary file.
    export_tko_to_file = global_config.global_config.get_config_value(
            'AUTOSERV', 'export_tko_job_to_file', type=bool, default=False)

    binary_file_name = os.path.join(path, "job.serialize")
    if export_tko_to_file:
        export_tko_job_to_file(job, jobname, binary_file_name)

    if not dry_run:
        db.commit()

    # Generate a suite report.
    # Check whether this is a suite job: a suite job is hostless, its jobname
    # ends with '/hostless' (i.e. <JOB_ID>-<USERNAME>/hostless), and its suite
    # field is not NULL. Only generate the timeline report when
    # datastore_parent_key is given.
    datastore_parent_key = job_keyval.get('datastore_parent_key', None)
    provision_job_id = job_keyval.get('provision_job_id', None)
    if (suite_report and jobname.endswith('/hostless')
            and job.suite and datastore_parent_key):
        tko_utils.dprint('Start dumping suite timing report...')
        timing_log = os.path.join(path, 'suite_timing.log')
        dump_cmd = ("%s/site_utils/dump_suite_report.py %s "
                    "--output='%s' --debug" %
                    (common.autotest_dir, job.afe_job_id,
                     timing_log))
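        # Illustrative resulting command (paths depend on the install root):
        #   /usr/local/autotest/site_utils/dump_suite_report.py 12345 \
        #       --output='<results>/suite_timing.log' --debug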
        if provision_job_id is not None:
            dump_cmd += " --provision_job_id=%d" % int(provision_job_id)

        subprocess.check_output(dump_cmd, shell=True)
        tko_utils.dprint('Successfully finished dumping suite timing report')

        if (datastore_creds and export_to_gcloud_path
                and os.path.exists(export_to_gcloud_path)):
            upload_cmd = [export_to_gcloud_path, datastore_creds,
                          timing_log, '--parent_key',
                          datastore_parent_key]
            tko_utils.dprint('Start exporting timeline report to gcloud')
            subprocess.check_output(upload_cmd)
            tko_utils.dprint('Successfully exported timeline report to '
                             'gcloud')
        else:
            tko_utils.dprint('DEBUG: skip exporting suite timeline to '
                             'gcloud, because either the gcloud creds or '
                             'the export_to_gcloud script is not found.')

    # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    # the function, so that any failure, e.g. a db connection error, will
    # prevent gs_offloader_instructions from being updated, and logs can be
    # uploaded for troubleshooting.
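    # When the instructions file is written below, it is a small JSON blob
    # that maps constants.GS_OFFLOADER_NO_OFFLOAD to true (illustrative
    # shape), which tells gs_offloader to skip offloading these results.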
    if job_successful:
        # Check if we should not offload this test's results.
        if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
            # Update the gs_offloader_instructions json file.
            gs_instructions_file = os.path.join(
                    path, constants.GS_OFFLOADER_INSTRUCTIONS)
            gs_offloader_instructions = {}
            if os.path.exists(gs_instructions_file):
                with open(gs_instructions_file, 'r') as f:
                    gs_offloader_instructions = json.load(f)

            gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
            with open(gs_instructions_file, 'w') as f:
                json.dump(gs_offloader_instructions, f)


def _write_job_to_db(db, jobname, job):
    """Write all TKO data associated with a job to the DB.

    This updates the job object as a side effect.

    @param db: tko.db.db_sql object.
    @param jobname: Name of the job to write.
    @param job: tko.models.job object.
    """
    db.insert_or_update_machine(job)
    db.insert_job(jobname, job)
    db.insert_or_update_task_reference(
            job,
            'skylab' if tko_utils.is_skylab_task(jobname) else 'afe',
    )
    db.update_job_keyvals(job)
    for test in job.tests:
        db.insert_test(job, test)


def _find_status_log_path(path):
    """Return the status log path under |path|, or '' if none exists."""
    if os.path.exists(os.path.join(path, "status.log")):
        return os.path.join(path, "status.log")
    if os.path.exists(os.path.join(path, "status")):
        return os.path.join(path, "status")
    return ""


def _parse_status_log(parser, job, status_log_path):
    """Feed the status log to the parser and populate job.tests."""
    status_lines = open(status_log_path).readlines()
    parser.start(job)
    tests = parser.end(status_lines)

    # parser.end can return the same object multiple times, so filter out dups
    job.tests = []
    already_added = set()
    for test in tests:
        if test not in already_added:
            already_added.add(test)
            job.tests.append(test)


def _match_existing_tests(db, job):
    """Find entries in the DB corresponding to the job's tests, update job.

    @return: Any unmatched tests in the db.
    """
    old_job_idx = job.job_idx
    raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
                              {"job_idx": old_job_idx})
    if raw_old_tests:
        old_tests = dict(((test, subdir), test_idx)
                         for test_idx, subdir, test in raw_old_tests)
    else:
        old_tests = {}

    for test in job.tests:
        test_idx = old_tests.pop((test.testname, test.subdir), None)
        if test_idx is not None:
            test.test_idx = test_idx
        else:
            tko_utils.dprint("! Reparse returned new test "
                             "testname=%r subdir=%r" %
                             (test.testname, test.subdir))
    return old_tests


def _delete_tests_from_db(db, tests):
    """Delete the given tests, by test_idx, and their related rows."""
    for test_idx in tests.itervalues():
        where = {'test_idx': test_idx}
        db.delete('tko_iteration_result', where)
        db.delete('tko_iteration_perf_value', where)
        db.delete('tko_iteration_attributes', where)
        db.delete('tko_test_attributes', where)
        db.delete('tko_test_labels_tests', {'test_id': test_idx})
        db.delete('tko_tests', where)


def _get_job_subdirs(path):
    """
    Returns a list of job subdirectories at path. Returns None if the path
    is itself a job directory. Does not recurse into the subdirs.
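
    Illustrative example (paths assumed): for <results>/123-user containing a
    .machines file that lists host1 and host2 (and both subdirs exist), this
    returns set(['host1', 'host2']); for the leaf directory
    <results>/123-user/host1, which holds status.log and other result files,
    it returns None.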
    """
    # if there's a .machines file, use it to get the subdirs
    machine_list = os.path.join(path, ".machines")
    if os.path.exists(machine_list):
        subdirs = set(line.strip() for line in open(machine_list))
        existing_subdirs = set(subdir for subdir in subdirs
                               if os.path.exists(os.path.join(path, subdir)))
        if len(existing_subdirs) != 0:
            return existing_subdirs

    # if this dir contains ONLY subdirectories, return them
    contents = set(os.listdir(path))
    contents.discard(".parse.lock")
    subdirs = set(sub for sub in contents if
                  os.path.isdir(os.path.join(path, sub)))
    if len(contents) == len(subdirs) != 0:
        return subdirs

    # this is a job directory, or something else we don't understand
    return None


def parse_leaf_path(db, pid_file_manager, path, level, parse_options):
    """Parse a leaf path.

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
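
    Illustrative: with level=2 and a path of
    '/usr/local/autotest/results/123-chromeos-test/host1', the job name is
    '123-chromeos-test/host1'; with level=1 it would just be 'host1'.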
    """
    job_elements = path.split("/")[-level:]
    jobname = "/".join(job_elements)
    db.run_with_retry(parse_one, db, pid_file_manager, jobname, path,
                      parse_options)
    return jobname


def parse_path(db, pid_file_manager, path, level, parse_options):
    """Parse a path.

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: A set of job names of the parsed jobs, e.g.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    """
    processed_jobs = set()
    job_subdirs = _get_job_subdirs(path)
    if job_subdirs is not None:
        # Parse status.log in the current directory, if it exists.
        # Multi-machine synchronous server side tests record output in this
        # directory. Without this check, we do not parse these results.
        if os.path.exists(os.path.join(path, 'status.log')):
            new_job = parse_leaf_path(db, pid_file_manager, path, level,
                                      parse_options)
            processed_jobs.add(new_job)
        # multi-machine job
        for subdir in job_subdirs:
            jobpath = os.path.join(path, subdir)
            new_jobs = parse_path(db, pid_file_manager, jobpath, level + 1,
                                  parse_options)
            processed_jobs.update(new_jobs)
    else:
        # single machine job
        new_job = parse_leaf_path(db, pid_file_manager, path, level,
                                  parse_options)
        processed_jobs.add(new_job)
    return processed_jobs


def _detach_from_parent_process():
    """Allow reparenting the parse process away from the caller.

    When monitor_db is run via upstart, restarting the job sends SIGTERM to
    the whole process group. This makes us immune from that.
    """
    if os.getpid() != os.getpgid(0):
        os.setsid()


def main():
    """tko_parse entry point."""
    options, args = parse_args()

    # We are obliged to use indirect=False, not use the SetupTsMonGlobalState
    # context manager, and add a manual flush, because tko/parse is expected to
    # be a very short lived (<1 min) script when working effectively, and we
    # can't afford to either a) wait for up to 1 min for metrics to flush at
    # the end or b) drop metrics that were sent within the last minute of
    # execution.
    site_utils.SetupTsMonGlobalState('tko_parse', indirect=False,
                                     short_lived=True)
    try:
        with metrics.SuccessCounter('chromeos/autotest/tko_parse/runs'):
            _main_with_options(options, args)
    finally:
        metrics.Flush()


def _main_with_options(options, args):
    """Entry point with options parsed and metrics already set up."""
    # Record the processed jobs so that
    # we can send the duration of parsing to metadata db.
    processed_jobs = set()

    if options.detach:
        _detach_from_parent_process()

    parse_options = _ParseOptions(options.reparse, options.mailit,
                                  options.dry_run, options.suite_report,
                                  options.datastore_creds,
                                  options.export_to_gcloud_path)
    results_dir = os.path.abspath(args[0])
    assert os.path.exists(results_dir)

    pid_file_manager = pidfile.PidFileManager("parser", results_dir)

    if options.write_pidfile:
        pid_file_manager.open_file()

    try:
        # build up the list of job dirs to parse
        if options.singledir:
            jobs_list = [results_dir]
        else:
            jobs_list = [os.path.join(results_dir, subdir)
                         for subdir in os.listdir(results_dir)]

        # build up the database
        db = tko_db.db(autocommit=False, host=options.db_host,
                       user=options.db_user, password=options.db_pass,
                       database=options.db_name)

        # parse all the jobs
        for path in jobs_list:
            lockfile = open(os.path.join(path, ".parse.lock"), "w")
            flags = fcntl.LOCK_EX
            if options.noblock:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(lockfile, flags)
            except IOError as e:
                # lock is not available and nonblock has been requested
                if e.errno == errno.EWOULDBLOCK:
                    lockfile.close()
                    continue
                else:
                    raise  # something unexpected happened
            try:
                new_jobs = parse_path(db, pid_file_manager, path,
                                      options.level, parse_options)
                processed_jobs.update(new_jobs)

            finally:
                fcntl.flock(lockfile, fcntl.LOCK_UN)
                lockfile.close()

    except Exception as e:
        pid_file_manager.close_file(1)
        raise
    else:
        pid_file_manager.close_file(0)


if __name__ == "__main__":
    main()