#!/usr/bin/python3 -u

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import errno
import fcntl
import json
import optparse
import os
import socket
import sys
import time
import traceback

import common
from autotest_lib.client.bin.result_tools import utils as result_utils
from autotest_lib.client.bin.result_tools import utils_lib as result_utils_lib
from autotest_lib.client.bin.result_tools import runner as result_runner
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import mail, pidfile
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server import site_utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.tko import db as tko_db, utils as tko_utils
from autotest_lib.tko import models, parser_lib
from autotest_lib.tko.perf_upload import perf_uploader
from autotest_lib.utils.side_effects import config_loader
import six

try:
    from autotest_lib.utils.frozen_chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_ParseOptions = collections.namedtuple(
    'ParseOptions', ['reparse', 'mail_on_failure', 'dry_run', 'suite_report',
                     'datastore_creds', 'export_to_gcloud_path',
                     'disable_perf_upload'])

_HARDCODED_CONTROL_FILE_NAMES = (
        # client side test control, as saved in old Autotest paths.
        'control',
        # server side test control, as saved in old Autotest paths.
        'control.srv',
        # All control files, as saved in skylab.
        'control.from_control_name',
)

# Max size for the parser is 350 MB, due to large suites getting throttled.
DEFAULT_MAX_RESULT_SIZE_KB = 350000


def parse_args():
    """Parse command line arguments."""
    # build up our options parser and parse sys.argv
    parser = optparse.OptionParser()
    parser.add_option("-m", help="Send mail for FAILED tests",
                      dest="mailit", action="store_true")
    parser.add_option("-r", help="Reparse the results of a job",
                      dest="reparse", action="store_true")
    parser.add_option("-o", help="Parse a single results directory",
                      dest="singledir", action="store_true")
    parser.add_option("-l", help=("Levels of subdirectories to include "
                                  "in the job name"),
                      type="int", dest="level", default=1)
    parser.add_option("-n", help="No blocking on an existing parse",
                      dest="noblock", action="store_true")
    parser.add_option("-s", help="Database server hostname",
                      dest="db_host", action="store")
    parser.add_option("-u", help="Database username", dest="db_user",
                      action="store")
    parser.add_option("-p", help="Database password", dest="db_pass",
                      action="store")
    parser.add_option("-d", help="Database name", dest="db_name",
                      action="store")
    parser.add_option("--dry-run", help="Do not actually commit any results.",
                      dest="dry_run", action="store_true", default=False)
    parser.add_option(
            "--detach", action="store_true",
            help="Detach parsing process from the caller process. Used by "
                 "monitor_db to safely restart without affecting parsing.",
            default=False)
    parser.add_option("--write-pidfile",
                      help="write pidfile (.parser_execute)",
                      dest="write_pidfile", action="store_true",
                      default=False)
    parser.add_option("--record-duration",
                      help="[DEPRECATED] Record timing to metadata db",
                      dest="record_duration", action="store_true",
                      default=False)
    parser.add_option("--suite-report",
                      help=("Allows parsing job to attempt to create a suite "
                            "timeline report, if it detects that the job "
                            "being parsed is a suite job."),
                      dest="suite_report", action="store_true",
                      default=False)
    parser.add_option("--datastore-creds",
                      help=("[DEPRECATED] "
                            "The path to the gcloud datastore credentials "
                            "file, which will be used to upload the suite "
                            "timeline report to gcloud."),
                      dest="datastore_creds",
                      action="store",
                      default=None)
    parser.add_option(
            "--export-to-gcloud-path",
            help=("[DEPRECATED] "
                  "The path to the export_to_gcloud script. Please find "
                  "the chromite path on your server; the script is under "
                  "chromite/bin/."),
            dest="export_to_gcloud_path",
            action="store",
            default=None)
    parser.add_option("--disable-perf-upload",
                      help="Do not upload perf results to chrome perf.",
                      dest="disable_perf_upload", action="store_true",
                      default=False)
    options, args = parser.parse_args()

    # we need at least one results directory
    if len(args) == 0:
        tko_utils.dprint("ERROR: at least one results directory must "
                         "be provided")
        parser.print_help()
        sys.exit(1)

    # pass the options back
    return options, args
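
# Illustrative invocation (hypothetical path; flags as defined above):
#
#   tko/parse -r -o /usr/local/autotest/results/123-chromeos-test
#
# -r forces a reparse of an already-parsed job, and -o treats the argument as
# a single results directory rather than a directory of job subdirectories.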


def format_failure_message(jobname, kernel, testname, status, reason):
    """Format a failure message with the given information.

    @param jobname: String representing the job name.
    @param kernel: String representing the kernel.
    @param testname: String representing the test name.
    @param status: String representing the test status.
    @param reason: String representing the reason.

    @return: Failure message as a string.
    """
    format_string = "%-12s %-20s %-12s %-10s %s"
    return format_string % (jobname, kernel, testname, status, reason)


def mailfailure(jobname, job, message):
    """Send an email about the failure.

    @param jobname: String representing the job name.
    @param job: A job object.
    @param message: The message to mail.
    """
    message_lines = [""]
    message_lines.append("The following tests FAILED for this job")
    message_lines.append("http://%s/results/%s" %
                         (socket.gethostname(), jobname))
    message_lines.append("")
    message_lines.append(format_failure_message("Job name", "Kernel",
                                                "Test name", "FAIL/WARN",
                                                "Failure reason"))
    message_lines.append(format_failure_message("=" * 8, "=" * 6, "=" * 8,
                                                "=" * 8, "=" * 14))
    message_header = "\n".join(message_lines)

    subject = "AUTOTEST: FAILED tests from job %s" % jobname
    mail.send("", job.user, "", subject, message_header + message)
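
# The header built above renders roughly like this (illustrative):
#
#   Job name     Kernel               Test name    FAIL/WARN  Failure reason
#   ========     ======               ========     ========   ==============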


def _invalidate_original_tests(orig_job_idx, retry_job_idx):
    """Invalidate original tests that were superseded by retry tests.

    Whenever a retry job completes, we want to invalidate the original
    job's test results, so that the consumers of the tko database
    (e.g. tko frontend, wmatrix) can figure out which results are the latest.

    When a retry job is parsed, we retrieve the original job's afe_job_id
    from the retry job's keyvals, which is then converted to a tko job_idx
    and passed into this method as |orig_job_idx|.

    In this method, we are going to invalidate the rows in tko_tests that are
    associated with the original job by flipping their 'invalid' bit to True.
    In addition, in tko_tests, we also maintain a pointer from the retry
    results to the original results, so that later we can always know which
    rows in tko_tests are retries and which are the corresponding original
    results. This is done by setting the field 'invalidates_test_idx' of the
    tests associated with the retry job.

    For example, assume Job(job_idx=105) is retried by Job(job_idx=108). After
    this method is run, their tko_tests rows will look like:
    __________________________________________________________________________
    test_idx | job_idx | test               | ... | invalid | invalidates_test_idx
    10       | 105     | example_Fail.Error | ... | 1       | NULL
    11       | 105     | example_Fail.Fail  | ... | 1       | NULL
    ...
    20       | 108     | example_Fail.Error | ... | 0       | 10
    21       | 108     | example_Fail.Fail  | ... | 0       | 11
    __________________________________________________________________________
    Note that the invalid bits of the rows for Job(job_idx=105) are set to
    '1', and the 'invalidates_test_idx' fields of the rows for
    Job(job_idx=108) are set to 10 and 11 (the test_idx of the rows for the
    original job).

    @param orig_job_idx: An integer representing the original job's
                         tko job_idx. Tests associated with this job will
                         be marked as 'invalid'.
    @param retry_job_idx: An integer representing the retry job's
                          tko job_idx. The field 'invalidates_test_idx'
                          of the tests associated with this job will be
                          updated.

    """
    msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx)
    if not orig_job_idx or not retry_job_idx:
        tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg)
        return
    # Using django models here makes things easier, but make sure that
    # before this method is called, all other relevant transactions have been
    # committed to avoid race conditions. In the long run, we might consider
    # making the rest of the parser use django models.
    orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx)
    retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx)

    # Invalidate original tests.
    orig_tests.update(invalid=True)

    # Maintain a dictionary that maps (test, subdir) to original tests.
    # Note that within the scope of a job, (test, subdir) uniquely
    # identifies a test run, but 'test' alone does not.
    # In a control file, one could run the same test with different
    # 'subdir_tag's, for example,
    #   job.run_test('example_Fail', tag='Error', subdir_tag='subdir_1')
    #   job.run_test('example_Fail', tag='Error', subdir_tag='subdir_2')
    # In tko, we will get
    #   (test='example_Fail.Error', subdir='example_Fail.Error.subdir_1')
    #   (test='example_Fail.Error', subdir='example_Fail.Error.subdir_2')
    invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
                         for orig_test in orig_tests}
    for retry in retry_tests:
        # It is possible that (retry.test, retry.subdir) doesn't exist
        # in invalidated_tests. This could happen when the original job
        # didn't run some of its tests. For example, a DUT may have been
        # offline since the beginning of the job, in which case
        # invalidated_tests will only have one entry for 'SERVER_JOB'.
        orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
        if orig_test:
            retry.invalidates_test = orig_test
            retry.save()
    tko_utils.dprint('DEBUG: Invalidated tests associated to job: ' + msg)
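
# Roughly the SQL the ORM calls above issue (illustrative, not the exact
# statements Django emits):
#   UPDATE tko_tests SET invalid = 1 WHERE job_idx = <orig_job_idx>;
#   UPDATE tko_tests SET invalidates_test_idx = <orig test_idx>
#          WHERE test_idx = <retry test_idx>;  -- one per matched retry row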


def _throttle_result_size(path):
    """Limit the total size of test results for the given path.

    @param path: Path of the result directory.
    """
    if not result_runner.ENABLE_RESULT_THROTTLING:
        tko_utils.dprint(
                'Result throttling is not enabled. Skipping throttling %s' %
                path)
        return

    max_result_size_KB = _max_result_size_from_control(path)
    if max_result_size_KB is None:
        max_result_size_KB = DEFAULT_MAX_RESULT_SIZE_KB

    try:
        result_utils.execute(path, max_result_size_KB)
    except Exception:
        tko_utils.dprint(
                'Failed to throttle result size of %s.\nDetails %s' %
                (path, traceback.format_exc()))


def _max_result_size_from_control(path):
    """Gets the max result size set in a control file, if any.

    If no override is found, returns None.
    """
    for control_file in _HARDCODED_CONTROL_FILE_NAMES:
        control = os.path.join(path, control_file)
        if not os.path.exists(control):
            continue

        try:
            max_result_size_KB = control_data.parse_control(
                    control, raise_warnings=False).max_result_size_KB
            if max_result_size_KB != DEFAULT_MAX_RESULT_SIZE_KB:
                return max_result_size_KB
        except IOError as e:
            tko_utils.dprint(
                    'Failed to access %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
        except control_data.ControlVariableException as e:
            tko_utils.dprint(
                    'Failed to parse %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
    return None
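
# A control file can override the cap read above with a declaration like the
# following (illustrative; assumes the standard control_data convention that
# the uppercase control variable maps to the parsed .max_result_size_KB
# attribute):
#
#   MAX_RESULT_SIZE_KB = 512000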


def export_tko_job_to_file(job, jobname, filename):
    """Exports the tko job to a file on disk.

    @param job: database object.
    @param jobname: the job name as a string.
    @param filename: the serialized binary destination path.
    """
    from autotest_lib.tko import job_serializer

    serializer = job_serializer.JobSerializer()
    serializer.serialize_to_binary(job, jobname, filename)


def parse_one(db, pid_file_manager, jobname, path, parse_options):
    """Parse a single job. Optionally send email on failure.

    @param db: database object.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param jobname: the tag used to search for an existing job in the db,
                    e.g. '1234-chromeos-test/host1'
    @param path: The path to the results to be parsed.
    @param parse_options: _ParseOptions instance.

    @return job: the parsed job object
    """
    reparse = parse_options.reparse
    mail_on_failure = parse_options.mail_on_failure
    dry_run = parse_options.dry_run
    suite_report = parse_options.suite_report

    tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    old_job_idx = db.find_job(jobname)
    if old_job_idx is not None and not reparse:
        tko_utils.dprint("! Job is already parsed, done")
        return None

    # look up the status version
    job_keyval = models.job.read_keyval(path)
    status_version = job_keyval.get("status_version", 0)

    parser = parser_lib.parser(status_version)
    job = parser.make_job(path)
    tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    status_log_path = _find_status_log_path(path)
    if not status_log_path:
        tko_utils.dprint("! Unable to parse job, no status file")
        return None
    _parse_status_log(parser, job, status_log_path)

    if old_job_idx is not None:
        job.job_idx = old_job_idx
        unmatched_tests = _match_existing_tests(db, job)
        if not dry_run:
            _delete_tests_from_db(db, unmatched_tests)

    job.afe_job_id = tko_utils.get_afe_job_id(jobname)
    job.skylab_task_id = tko_utils.get_skylab_task_id(jobname)
    job.afe_parent_job_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.skylab_parent_task_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.build = None
    job.board = None
    job.build_version = None
    job.suite = None
    if job.label:
        label_info = site_utils.parse_job_name(job.label)
        if label_info:
            job.build = label_info.get('build', None)
            job.build_version = label_info.get('build_version', None)
            job.board = label_info.get('board', None)
            job.suite = label_info.get('suite', None)

    if 'suite' in job.keyval_dict:
        job.suite = job.keyval_dict['suite']

    result_utils_lib.LOG = tko_utils.dprint

    # Do not throttle results for now (b/207409280).
    # _throttle_result_size(path)

    # Record test result size to job_keyvals.
    start_time = time.time()
    result_size_info = site_utils.collect_result_sizes(
            path, log=tko_utils.dprint)
    tko_utils.dprint('Finished collecting result sizes after %s seconds' %
                     (time.time() - start_time))
    job.keyval_dict.update(result_size_info._asdict())

    # TODO(dshi): Update sizes with sponge_invocation.xml and throttle it.

    # check for failures
    message_lines = [""]
    job_successful = True
    for test in job.tests:
        if not test.subdir:
            continue
        tko_utils.dprint("* testname, subdir, status, reason: %s %s %s %s"
                         % (test.testname, test.subdir, test.status,
                            test.reason))
        if test.status not in ('GOOD', 'WARN'):
            job_successful = False
            pid_file_manager.num_tests_failed += 1
            message_lines.append(format_failure_message(
                    jobname, test.kernel.base, test.subdir,
                    test.status, test.reason))

    message = "\n".join(message_lines)

    if not dry_run:
        # send out an email report of failure
        if len(message) > 2 and mail_on_failure:
            tko_utils.dprint("Sending email report of failure on %s to %s"
                             % (jobname, job.user))
            mailfailure(jobname, job, message)

        # Upload perf values to the perf dashboard, if applicable.
        if parse_options.disable_perf_upload:
            tko_utils.dprint("Skipping results upload to chrome perf as it "
                             "is disabled by config")
        else:
            for test in job.tests:
                perf_uploader.upload_test(job, test, jobname)

        _write_job_to_db(db, jobname, job)

        # Verify the job data was written to the database.
        if job.tests:
            tests_in_db = db.find_tests(job.job_idx)
            tests_in_db_count = len(tests_in_db) if tests_in_db else 0
            if tests_in_db_count != len(job.tests):
                tko_utils.dprint(
                        'Failed to find enough tests for job_idx: %d. The '
                        'job should have %d tests, only found %d tests.' %
                        (job.job_idx, len(job.tests), tests_in_db_count))
                metrics.Counter(
                        'chromeos/autotest/result/db_save_failure',
                        description='The number of times parse failed to '
                        'save job to TKO database.').increment()

        # Although the cursor has autocommit, we still need to force it to
        # commit existing changes before we can use django models, otherwise
        # it will deadlock when django models try to start a new
        # transaction while the current one has not finished yet.
        db.commit()

        # Handle retry job.
        orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID,
                                         None)
        if orig_afe_job_id:
            orig_job_idx = tko_models.Job.objects.get(
                    afe_job_id=orig_afe_job_id).job_idx
            _invalidate_original_tests(orig_job_idx, job.job_idx)

    # Serialize the job into a binary file, if configured to do so.
    export_tko_to_file = global_config.global_config.get_config_value(
            'AUTOSERV', 'export_tko_job_to_file', type=bool, default=False)

    binary_file_name = os.path.join(path, "job.serialize")
    if export_tko_to_file:
        export_tko_job_to_file(job, jobname, binary_file_name)

    if not dry_run:
        db.commit()

    # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    # the function, so that any failure, e.g. a db connection error, will stop
    # gs_offloader_instructions from being updated, and logs can be uploaded
    # for troubleshooting.
    if job_successful:
        # Check if we should not offload this test's results.
        if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
            # Update the gs_offloader_instructions json file.
            gs_instructions_file = os.path.join(
                    path, constants.GS_OFFLOADER_INSTRUCTIONS)
            gs_offloader_instructions = {}
            if os.path.exists(gs_instructions_file):
                with open(gs_instructions_file, 'r') as f:
                    gs_offloader_instructions = json.load(f)

            gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
            with open(gs_instructions_file, 'w') as f:
                json.dump(gs_offloader_instructions, f)
    return job
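
# Shape of the gs_offloader_instructions file written at the end of
# parse_one() above (illustrative; the actual key is whatever string
# constants.GS_OFFLOADER_NO_OFFLOAD holds):
#
#   {"<GS_OFFLOADER_NO_OFFLOAD>": true}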


def _write_job_to_db(db, jobname, job):
    """Write all TKO data associated with a job to the DB.

    This updates the job object as a side effect.

    @param db: tko.db.db_sql object.
    @param jobname: Name of the job to write.
    @param job: tko.models.job object.
    """
    db.insert_or_update_machine(job)
    db.insert_job(jobname, job)
    db.insert_or_update_task_reference(
            job,
            'skylab' if tko_utils.is_skylab_task(jobname) else 'afe',
    )
    db.update_job_keyvals(job)
    for test in job.tests:
        db.insert_test(job, test)


def _find_status_log_path(path):
    if os.path.exists(os.path.join(path, "status.log")):
        return os.path.join(path, "status.log")
    if os.path.exists(os.path.join(path, "status")):
        return os.path.join(path, "status")
    return ""


def _parse_status_log(parser, job, status_log_path):
    with open(status_log_path) as status_log:
        status_lines = status_log.readlines()
    parser.start(job)
    tests = parser.end(status_lines)

    # parser.end can return the same object multiple times, so filter out dups
    job.tests = []
    already_added = set()
    for test in tests:
        if test not in already_added:
            already_added.add(test)
            job.tests.append(test)


def _match_existing_tests(db, job):
    """Find entries in the DB corresponding to the job's tests, update job.

    @return: Any unmatched tests in the db.
    """
    old_job_idx = job.job_idx
    raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
                              {"job_idx": old_job_idx})
    if raw_old_tests:
        old_tests = dict(((test, subdir), test_idx)
                         for test_idx, subdir, test in raw_old_tests)
    else:
        old_tests = {}

    for test in job.tests:
        test_idx = old_tests.pop((test.testname, test.subdir), None)
        if test_idx is not None:
            test.test_idx = test_idx
        else:
            tko_utils.dprint("! Reparse returned new test "
                             "testname=%r subdir=%r" %
                             (test.testname, test.subdir))
    return old_tests
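
# Illustrative reparse flow (hypothetical values): if the DB already holds
#   {('dummy_Pass', 'dummy_Pass'): 7, ('dummy_Fail', 'dummy_Fail'): 8}
# for this job_idx and the reparsed job only produced dummy_Pass, then
# _match_existing_tests() reuses test_idx 7 for that test and returns
# {('dummy_Fail', 'dummy_Fail'): 8}, which parse_one() hands to
# _delete_tests_from_db() to purge the stale rows.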


def _delete_tests_from_db(db, tests):
    for test_idx in six.itervalues(tests):
        where = {'test_idx': test_idx}
        db.delete('tko_iteration_result', where)
        db.delete('tko_iteration_perf_value', where)
        db.delete('tko_iteration_attributes', where)
        db.delete('tko_test_attributes', where)
        db.delete('tko_test_labels_tests', {'test_id': test_idx})
        db.delete('tko_tests', where)


def _get_job_subdirs(path):
    """
    Returns a list of job subdirectories at path. Returns None if the path
    is itself a job directory. Does not recurse into the subdirs.
    """
    # if there's a .machines file, use it to get the subdirs
    machine_list = os.path.join(path, ".machines")
    if os.path.exists(machine_list):
        with open(machine_list, 'r') as ml:
            subdirs = set(line.strip() for line in ml.readlines())
        existing_subdirs = set(subdir for subdir in subdirs
                               if os.path.exists(os.path.join(path, subdir)))
        if len(existing_subdirs) != 0:
            return existing_subdirs

    # if this dir contains ONLY subdirectories, return them
    contents = set(os.listdir(path))
    contents.discard(".parse.lock")
    subdirs = set(sub for sub in contents if
                  os.path.isdir(os.path.join(path, sub)))
    if len(contents) == len(subdirs) != 0:
        return subdirs

    # this is a job directory, or something else we don't understand
    return None


def parse_leaf_path(db, pid_file_manager, path, level, parse_options):
    """Parse a leaf path.

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
    """
    job_elements = path.split("/")[-level:]
    jobname = "/".join(job_elements)
    db.run_with_retry(parse_one, db, pid_file_manager, jobname, path,
                      parse_options)
    return jobname


def parse_path(db, pid_file_manager, path, level, parse_options):
    """Parse a path.

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: A set of job names of the parsed jobs.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    """
    processed_jobs = set()
    job_subdirs = _get_job_subdirs(path)
    if job_subdirs is not None:
        # parse status.log in the current directory, if it exists.
        # Multi-machine synchronous server side tests record output in this
        # directory; without this check, we would not parse these results.
        if os.path.exists(os.path.join(path, 'status.log')):
            new_job = parse_leaf_path(db, pid_file_manager, path, level,
                                      parse_options)
            processed_jobs.add(new_job)
        # multi-machine job
        for subdir in job_subdirs:
            jobpath = os.path.join(path, subdir)
            new_jobs = parse_path(db, pid_file_manager, jobpath, level + 1,
                                  parse_options)
            processed_jobs.update(new_jobs)
    else:
        # single machine job
        new_job = parse_leaf_path(db, pid_file_manager, path, level,
                                  parse_options)
        processed_jobs.add(new_job)
    return processed_jobs
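
# Illustrative directory layout (hypothetical names) showing how parse_path()
# walks it when called with level=1 on results/123-chromeos-test:
#
#   results/123-chromeos-test/   <- _get_job_subdirs() finds host1, host2
#       status.log               <- parsed as '123-chromeos-test' if present
#       host1/                   <- leaf, parsed as '123-chromeos-test/host1'
#       host2/                   <- leaf, parsed as '123-chromeos-test/host2'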


def _detach_from_parent_process():
    """Allow reparenting the parse process away from the caller.

    When monitor_db is run via upstart, restarting the job sends SIGTERM to
    the whole process group. This makes us immune to that.
    """
    if os.getpid() != os.getpgid(0):
        os.setsid()


def main():
    """tko_parse entry point."""
    options, args = parse_args()

    # We are obliged to use indirect=False, not use the SetupTsMonGlobalState
    # context manager, and add a manual flush, because tko/parse is expected
    # to be a very short lived (<1 min) script when working effectively, and
    # we can't afford to either a) wait for up to 1 min for metrics to flush
    # at the end or b) drop metrics that were sent within the last minute of
    # execution.
    site_utils.SetupTsMonGlobalState('tko_parse', indirect=False,
                                     short_lived=True)
    try:
        with metrics.SuccessCounter('chromeos/autotest/tko_parse/runs'):
            _main_with_options(options, args)
    finally:
        metrics.Flush()


def _main_with_options(options, args):
    """Entry point with options parsed and metrics already set up."""
    # Record the processed jobs so that
    # we can send the duration of parsing to the metadata db.
    processed_jobs = set()

    if options.detach:
        _detach_from_parent_process()

    results_dir = os.path.abspath(args[0])
    assert os.path.exists(results_dir)

    _update_db_config_from_json(options, results_dir)

    parse_options = _ParseOptions(options.reparse, options.mailit,
                                  options.dry_run, options.suite_report,
                                  options.datastore_creds,
                                  options.export_to_gcloud_path,
                                  options.disable_perf_upload)

    pid_file_manager = pidfile.PidFileManager("parser", results_dir)

    if options.write_pidfile:
        pid_file_manager.open_file()

    try:
        # build up the list of job dirs to parse
        if options.singledir:
            jobs_list = [results_dir]
        else:
            jobs_list = [os.path.join(results_dir, subdir)
                         for subdir in os.listdir(results_dir)]

        # build up the database
        db = tko_db.db(autocommit=False, host=options.db_host,
                       user=options.db_user, password=options.db_pass,
                       database=options.db_name)

        # parse all the jobs
        for path in jobs_list:
            lockfile = open(os.path.join(path, ".parse.lock"), "w")
            flags = fcntl.LOCK_EX
            if options.noblock:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(lockfile, flags)
            except IOError as e:
                # lock is not available and nonblocking mode was requested
                if e.errno == errno.EWOULDBLOCK:
                    lockfile.close()
                    continue
                else:
                    raise  # something unexpected happened
            try:
                new_jobs = parse_path(db, pid_file_manager, path,
                                      options.level, parse_options)
                processed_jobs.update(new_jobs)

            finally:
                fcntl.flock(lockfile, fcntl.LOCK_UN)
                lockfile.close()

    except Exception:
        pid_file_manager.close_file(1)
        raise
    else:
        pid_file_manager.close_file(0)


def _update_db_config_from_json(options, test_results_dir):
    """Update DB config options using a side_effects_config.json file.

    @param options: parsed args to be updated.
    @param test_results_dir: path to the test results dir.

    @raises: json_format.ParseError if the file is not valid JSON.
             ValueError if the JSON config is incomplete.
             OSError if some files from the JSON config are missing.
    """
    # The results_dir passed to tko/parse is a subdir of the root results dir.
    config_dir = os.path.join(test_results_dir, os.pardir)
    tko_utils.dprint("Attempting to read side_effects.Config from %s" %
                     config_dir)
    config = config_loader.load(config_dir)

    if config:
        tko_utils.dprint("Validating side_effects.Config.tko")
        config_loader.validate_tko(config)

        tko_utils.dprint("Using the following DB config params from "
                         "side_effects.Config.tko:\n%s" % config.tko)
        options.db_host = config.tko.proxy_socket
        options.db_user = config.tko.mysql_user

        with open(config.tko.mysql_password_file, 'r') as f:
            options.db_pass = f.read().rstrip('\n')

        options.disable_perf_upload = not config.chrome_perf.enabled
    else:
        tko_utils.dprint("No side_effects.Config found in %s - "
                         "defaulting to DB config values from the shadow "
                         "config" % config_dir)


if __name__ == "__main__":
    main()