#!/usr/bin/python2 -u

import collections
import errno
import fcntl
import json
import optparse
import os
import socket
import subprocess
import sys
import time
import traceback

import common
from autotest_lib.client.bin.result_tools import utils as result_utils
from autotest_lib.client.bin.result_tools import utils_lib as result_utils_lib
from autotest_lib.client.bin.result_tools import runner as result_runner
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import mail, pidfile
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server import site_utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.site_utils.sponge_lib import sponge_utils
from autotest_lib.tko import db as tko_db, utils as tko_utils
from autotest_lib.tko import models, parser_lib
from autotest_lib.tko.perf_upload import perf_uploader
from autotest_lib.utils.side_effects import config_loader

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_ParseOptions = collections.namedtuple(
    'ParseOptions', ['reparse', 'mail_on_failure', 'dry_run', 'suite_report',
                     'datastore_creds', 'export_to_gcloud_path',
                     'disable_perf_upload'])

_HARDCODED_CONTROL_FILE_NAMES = (
        # client side test control, as saved in old Autotest paths.
        'control',
        # server side test control, as saved in old Autotest paths.
        'control.srv',
        # All control files, as saved in skylab.
        'control.from_control_name',
)


def parse_args():
    """Parse args."""
    # build up our options parser and parse sys.argv
    parser = optparse.OptionParser()
    parser.add_option("-m", help="Send mail for FAILED tests",
                      dest="mailit", action="store_true")
    parser.add_option("-r", help="Reparse the results of a job",
                      dest="reparse", action="store_true")
    parser.add_option("-o", help="Parse a single results directory",
                      dest="singledir", action="store_true")
    parser.add_option("-l", help=("Levels of subdirectories to include "
                                  "in the job name"),
                      type="int", dest="level", default=1)
    parser.add_option("-n", help="No blocking on an existing parse",
                      dest="noblock", action="store_true")
    parser.add_option("-s", help="Database server hostname",
                      dest="db_host", action="store")
    parser.add_option("-u", help="Database username", dest="db_user",
                      action="store")
    parser.add_option("-p", help="Database password", dest="db_pass",
                      action="store")
    parser.add_option("-d", help="Database name", dest="db_name",
                      action="store")
    parser.add_option("--dry-run", help="Do not actually commit any results.",
                      dest="dry_run", action="store_true", default=False)
    parser.add_option(
            "--detach", action="store_true",
            help="Detach parsing process from the caller process. Used by "
                 "monitor_db to safely restart without affecting parsing.",
            default=False)
    parser.add_option("--write-pidfile",
                      help="write pidfile (.parser_execute)",
                      dest="write_pidfile", action="store_true",
                      default=False)
    parser.add_option("--record-duration",
                      help="[DEPRECATED] Record timing to metadata db",
                      dest="record_duration", action="store_true",
                      default=False)
    parser.add_option("--suite-report",
                      help=("Allows parsing job to attempt to create a suite "
                            "timeline report, if it detects that the job being "
                            "parsed is a suite job."),
                      dest="suite_report", action="store_true",
                      default=False)
    parser.add_option("--datastore-creds",
                      help=("The path to gcloud datastore credentials file, "
                            "which will be used to upload suite timeline "
                            "report to gcloud. If not specified, the one "
                            "defined in shadow_config will be used."),
                      dest="datastore_creds", action="store", default=None)
    parser.add_option("--export-to-gcloud-path",
                      help=("The path to export_to_gcloud script. Please find "
                            "chromite path on your server. The script is under "
                            "chromite/bin/."),
                      dest="export_to_gcloud_path", action="store",
                      default=None)
    parser.add_option("--disable-perf-upload",
                      help=("Do not upload perf results to chrome perf."),
                      dest="disable_perf_upload", action="store_true",
                      default=False)
    options, args = parser.parse_args()

    # we need a results directory
    if len(args) == 0:
        tko_utils.dprint("ERROR: at least one results directory must "
                         "be provided")
        parser.print_help()
        sys.exit(1)

    if not options.datastore_creds:
        gcloud_creds = global_config.global_config.get_config_value(
            'GCLOUD', 'cidb_datastore_writer_creds', default=None)
        options.datastore_creds = (site_utils.get_creds_abspath(gcloud_creds)
                                   if gcloud_creds else None)

    if not options.export_to_gcloud_path:
        export_script = 'chromiumos/chromite/bin/export_to_gcloud'
        # If it is a lab server, the script is under ~chromeos-test/
        if os.path.exists(os.path.expanduser('~chromeos-test/%s' %
                                             export_script)):
            path = os.path.expanduser('~chromeos-test/%s' % export_script)
        # If it is a local workstation, it is probably under ~/
        elif os.path.exists(os.path.expanduser('~/%s' % export_script)):
            path = os.path.expanduser('~/%s' % export_script)
        # If it is not found anywhere, the default will be set to None.
        else:
            path = None
        options.export_to_gcloud_path = path

    # pass the options back
    return options, args


def format_failure_message(jobname, kernel, testname, status, reason):
    """Format failure message with the given information.

    @param jobname: String representing the job name.
    @param kernel: String representing the kernel.
    @param testname: String representing the test name.
    @param status: String representing the test status.
    @param reason: String representing the reason.

    @return: Failure message as a string.
    """
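    # Illustrative example (made-up values): calling
    #     format_failure_message('123-foo', '3.8.0', 'dummy_Fail', 'FAIL',
    #                            'oops')
    # renders one fixed-width row:
    #     '123-foo      3.8.0                dummy_Fail   FAIL       oops'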
    format_string = "%-12s %-20s %-12s %-10s %s"
    return format_string % (jobname, kernel, testname, status, reason)


def mailfailure(jobname, job, message):
    """Send an email about the failure.

    @param jobname: String representing the job name.
    @param job: A job object.
    @param message: The message to mail.
    """
    message_lines = [""]
    message_lines.append("The following tests FAILED for this job")
    message_lines.append("http://%s/results/%s" %
                         (socket.gethostname(), jobname))
    message_lines.append("")
    message_lines.append(format_failure_message("Job name", "Kernel",
                                                "Test name", "FAIL/WARN",
                                                "Failure reason"))
    message_lines.append(format_failure_message("=" * 8, "=" * 6, "=" * 8,
                                                "=" * 8, "=" * 14))
    message_header = "\n".join(message_lines)

    subject = "AUTOTEST: FAILED tests from job %s" % jobname
    mail.send("", job.user, "", subject, message_header + message)


def _invalidate_original_tests(orig_job_idx, retry_job_idx):
    """Invalidate the original tests that a retry job replaces.

    Whenever a retry job is complete, we want to invalidate the original
    job's test results, so that the consumers of the tko database
    (e.g. tko frontend, wmatrix) can figure out which results are the latest.

    When a retry job is parsed, we retrieve the original job's afe_job_id
    from the retry job's keyvals, which is then converted to a tko job_idx and
    passed into this method as |orig_job_idx|.

    In this method, we invalidate the rows in tko_tests that are
    associated with the original job by flipping their 'invalid' bit to True.
    In addition, in tko_tests, we also maintain a pointer from the retry results
    to the original results, so that later we can always know which rows in
    tko_tests are retries and which are the corresponding original results.
    This is done by setting the field 'invalidates_test_idx' of the tests
    associated with the retry job.

    For example, assume Job(job_idx=105) is retried by Job(job_idx=108); after
    this method is run, their tko_tests rows will look like:
    __________________________________________________________________________
    test_idx| job_idx | test            | ... | invalid | invalidates_test_idx
    10      | 105     | dummy_Fail.Error| ... | 1       | NULL
    11      | 105     | dummy_Fail.Fail | ... | 1       | NULL
    ...
    20      | 108     | dummy_Fail.Error| ... | 0       | 10
    21      | 108     | dummy_Fail.Fail | ... | 0       | 11
    __________________________________________________________________________
    Note the invalid bits of the rows for Job(job_idx=105) are set to '1',
    and the 'invalidates_test_idx' fields of the rows for Job(job_idx=108)
    are set to 10 and 11 (the test_idx of the rows for the original job).

    @param orig_job_idx: An integer representing the original job's
                         tko job_idx. Tests associated with this job will
                         be marked as 'invalid'.
    @param retry_job_idx: An integer representing the retry job's
                          tko job_idx. The field 'invalidates_test_idx'
                          of the tests associated with this job will be updated.

    """
    msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx)
    if not orig_job_idx or not retry_job_idx:
        tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg)
        return
    # Using django models here makes things easier, but make sure that
    # before this method is called, all other relevant transactions have been
    # committed to avoid race conditions. In the long run, we might consider
    # making the rest of the parser use django models.
    orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx)
    retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx)

    # Invalidate original tests.
    orig_tests.update(invalid=True)

    # Maintain a dictionary that maps (test, subdir) to original tests.
    # Note that within the scope of a job, (test, subdir) uniquely
    # identifies a test run, but 'test' does not.
    # In a control file, one could run the same test with different
    # 'subdir_tag', for example,
    #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_1')
    #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_2')
    # In tko, we will get
    #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_1')
    #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_2')
    invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
                         for orig_test in orig_tests}
    for retry in retry_tests:
        # It is possible that (retry.test, retry.subdir) doesn't exist
        # in invalidated_tests. This could happen when the original job
        # didn't run some of its tests. For example, if a dut went offline
        # at the beginning of the job, invalidated_tests will only have
        # one entry for 'SERVER_JOB'.
        orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
        if orig_test:
            retry.invalidates_test = orig_test
            retry.save()
    tko_utils.dprint('DEBUG: Invalidated tests associated with job: ' + msg)


def _throttle_result_size(path):
    """Limit the total size of test results for the given path.

    @param path: Path of the result directory.
    """
    if not result_runner.ENABLE_RESULT_THROTTLING:
        tko_utils.dprint(
                'Result throttling is not enabled. Skipping throttling %s' %
                path)
        return

    max_result_size_KB = _max_result_size_from_control(path)
    if max_result_size_KB is None:
        max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB

    try:
        result_utils.execute(path, max_result_size_KB)
    except Exception:
        tko_utils.dprint(
                'Failed to throttle result size of %s.\nDetails %s' %
                (path, traceback.format_exc()))


def _max_result_size_from_control(path):
    """Gets the max result size set in a control file, if any.

    If no override is found, returns None.
    """
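    # A control file can override the default cap by declaring a max result
    # size, which control_data.parse_control() exposes as the
    # max_result_size_KB attribute. Illustrative control-file line (the
    # upper-case variable name is assumed, per control-file convention):
    #     MAX_RESULT_SIZE_KB = 512000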
    for control_file in _HARDCODED_CONTROL_FILE_NAMES:
        control = os.path.join(path, control_file)
        if not os.path.exists(control):
            continue

        try:
            max_result_size_KB = control_data.parse_control(
                    control, raise_warnings=False).max_result_size_KB
            if max_result_size_KB != control_data.DEFAULT_MAX_RESULT_SIZE_KB:
                return max_result_size_KB
        except IOError as e:
            tko_utils.dprint(
                    'Failed to access %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
        except control_data.ControlVariableException as e:
            tko_utils.dprint(
                    'Failed to parse %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
    return None


def export_tko_job_to_file(job, jobname, filename):
    """Exports the tko job to a disk file.

    @param job: database object.
    @param jobname: the job name as string.
    @param filename: The path of the file the job will be serialized to.
    """
    from autotest_lib.tko import job_serializer

    serializer = job_serializer.JobSerializer()
    serializer.serialize_to_binary(job, jobname, filename)


def parse_one(db, pid_file_manager, jobname, path, parse_options):
    """Parse a single job. Optionally send email on failure.

    @param db: database object.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param jobname: the tag used to search for existing job in db,
                    e.g. '1234-chromeos-test/host1'
    @param path: The path to the results to be parsed.
    @param parse_options: _ParseOptions instance.
    """
    reparse = parse_options.reparse
    mail_on_failure = parse_options.mail_on_failure
    dry_run = parse_options.dry_run
    suite_report = parse_options.suite_report
    datastore_creds = parse_options.datastore_creds
    export_to_gcloud_path = parse_options.export_to_gcloud_path

    tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    old_job_idx = db.find_job(jobname)
    if old_job_idx is not None and not reparse:
        tko_utils.dprint("! Job is already parsed, done")
        return

    # look up the status version
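    # The job-level keyval file is plain "key=value" lines in the results
    # dir, e.g. (illustrative values only):
    #     status_version=1
    #     suite=dummy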
    job_keyval = models.job.read_keyval(path)
    status_version = job_keyval.get("status_version", 0)

    parser = parser_lib.parser(status_version)
    job = parser.make_job(path)
    tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    status_log_path = _find_status_log_path(path)
    if not status_log_path:
        tko_utils.dprint("! Unable to parse job, no status file")
        return
    _parse_status_log(parser, job, status_log_path)

    if old_job_idx is not None:
        job.job_idx = old_job_idx
        unmatched_tests = _match_existing_tests(db, job)
        if not dry_run:
            _delete_tests_from_db(db, unmatched_tests)

    job.afe_job_id = tko_utils.get_afe_job_id(jobname)
    job.skylab_task_id = tko_utils.get_skylab_task_id(jobname)
    job.afe_parent_job_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.skylab_parent_task_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.build = None
    job.board = None
    job.build_version = None
    job.suite = None
    if job.label:
        label_info = site_utils.parse_job_name(job.label)
        if label_info:
            job.build = label_info.get('build', None)
            job.build_version = label_info.get('build_version', None)
            job.board = label_info.get('board', None)
            job.suite = label_info.get('suite', None)

    if 'suite' in job.keyval_dict:
        job.suite = job.keyval_dict['suite']

    result_utils_lib.LOG = tko_utils.dprint
    _throttle_result_size(path)

    # Record test result size to job_keyvals
    start_time = time.time()
    result_size_info = site_utils.collect_result_sizes(
            path, log=tko_utils.dprint)
    tko_utils.dprint('Finished collecting result sizes after %s seconds' %
                     (time.time()-start_time))
    job.keyval_dict.update(result_size_info.__dict__)

    # TODO(dshi): Update sizes with sponge_invocation.xml and throttle it.

    # check for failures
    message_lines = [""]
    job_successful = True
    for test in job.tests:
        if not test.subdir:
            continue
        tko_utils.dprint("* testname, subdir, status, reason: %s %s %s %s"
                         % (test.testname, test.subdir, test.status,
                            test.reason))
        if test.status not in ('GOOD', 'WARN'):
            job_successful = False
            pid_file_manager.num_tests_failed += 1
            message_lines.append(format_failure_message(
                jobname, test.kernel.base, test.subdir,
                test.status, test.reason))

    message = "\n".join(message_lines)

    if not dry_run:
        # send out an email report of the failure
        if len(message) > 2 and mail_on_failure:
            tko_utils.dprint("Sending email report of failure on %s to %s"
                             % (jobname, job.user))
            mailfailure(jobname, job, message)

        # Upload perf values to the perf dashboard, if applicable.
        if parse_options.disable_perf_upload:
            tko_utils.dprint("Skipping results upload to chrome perf as it is "
                             "disabled by config")
        else:
            for test in job.tests:
                perf_uploader.upload_test(job, test, jobname)

        # Upload job details to Sponge.
        sponge_url = sponge_utils.upload_results(job, log=tko_utils.dprint)
        if sponge_url:
            job.keyval_dict['sponge_url'] = sponge_url

        _write_job_to_db(db, jobname, job)

        # Verify the job data is written to the database.
        if job.tests:
            tests_in_db = db.find_tests(job.job_idx)
            tests_in_db_count = len(tests_in_db) if tests_in_db else 0
            if tests_in_db_count != len(job.tests):
                tko_utils.dprint(
                        'Failed to find enough tests for job_idx: %d. The '
                        'job should have %d tests, only found %d tests.' %
                        (job.job_idx, len(job.tests), tests_in_db_count))
                metrics.Counter(
                        'chromeos/autotest/result/db_save_failure',
                        description='The number of times parse failed to '
                        'save job to TKO database.').increment()

        # Although the cursor has autocommit, we still need to force it to
        # commit existing changes before we can use django models, otherwise
        # it will deadlock when django models try to start a new
        # transaction while the current one has not finished yet.
        db.commit()

        # Handle retry job.
        orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID,
                                         None)
        if orig_afe_job_id:
            orig_job_idx = tko_models.Job.objects.get(
                    afe_job_id=orig_afe_job_id).job_idx
            _invalidate_original_tests(orig_job_idx, job.job_idx)

    # Serialize the job into a binary file, if requested.
    export_tko_to_file = global_config.global_config.get_config_value(
            'AUTOSERV', 'export_tko_job_to_file', type=bool, default=False)

    binary_file_name = os.path.join(path, "job.serialize")
    if export_tko_to_file:
        export_tko_job_to_file(job, jobname, binary_file_name)

    if not dry_run:
        db.commit()

    # Generate a suite report.
    # Check whether this is a suite job: a suite job is hostless, its jobname
    # has the form <JOB_ID>-<USERNAME>/hostless, and its suite field is not
    # NULL. Only generate a timeline report when datastore_parent_key is given.
    datastore_parent_key = job_keyval.get('datastore_parent_key', None)
    provision_job_id = job_keyval.get('provision_job_id', None)
    if (suite_report and jobname.endswith('/hostless')
        and job.suite and datastore_parent_key):
        tko_utils.dprint('Start dumping suite timing report...')
        timing_log = os.path.join(path, 'suite_timing.log')
        dump_cmd = ("%s/site_utils/dump_suite_report.py %s "
                    "--output='%s' --debug" %
                    (common.autotest_dir, job.afe_job_id,
                     timing_log))

        if provision_job_id is not None:
            dump_cmd += " --provision_job_id=%d" % int(provision_job_id)

        subprocess.check_output(dump_cmd, shell=True)
        tko_utils.dprint('Successfully finished dumping suite timing report')

        if (datastore_creds and export_to_gcloud_path
            and os.path.exists(export_to_gcloud_path)):
            upload_cmd = [export_to_gcloud_path, datastore_creds,
                          timing_log, '--parent_key',
                          datastore_parent_key]
            tko_utils.dprint('Start exporting timeline report to gcloud')
            subprocess.check_output(upload_cmd)
            tko_utils.dprint('Successfully exported timeline report to '
                             'gcloud')
        else:
            tko_utils.dprint('DEBUG: skip exporting suite timeline to '
                             'gcloud, because either gcloud creds or '
                             'export_to_gcloud script is not found.')

    # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    # the function, so any failure, e.g., db connection error, will stop
    # gs_offloader_instructions from being updated, and logs can be uploaded
    # for troubleshooting.
    if job_successful:
        # Check if we should not offload this test's results.
        if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
            # Update the gs_offloader_instructions json file.
            gs_instructions_file = os.path.join(
                    path, constants.GS_OFFLOADER_INSTRUCTIONS)
            gs_offloader_instructions = {}
            if os.path.exists(gs_instructions_file):
                with open(gs_instructions_file, 'r') as f:
                    gs_offloader_instructions = json.load(f)

            gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
            with open(gs_instructions_file, 'w') as f:
                json.dump(gs_offloader_instructions, f)
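            # The instructions file ends up holding a single small JSON
            # object, e.g. (illustrative; the real key is the value of
            # constants.GS_OFFLOADER_NO_OFFLOAD):
            #     {"no_offload": true}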


def _write_job_to_db(db, jobname, job):
    """Write all TKO data associated with a job to DB.

    This updates the job object as a side effect.

    @param db: tko.db.db_sql object.
    @param jobname: Name of the job to write.
    @param job: tko.models.job object.
    """
    db.insert_or_update_machine(job)
    db.insert_job(jobname, job)
    db.insert_or_update_task_reference(
            job,
            'skylab' if tko_utils.is_skylab_task(jobname) else 'afe',
    )
    db.update_job_keyvals(job)
    for test in job.tests:
        db.insert_test(job, test)


def _find_status_log_path(path):
    if os.path.exists(os.path.join(path, "status.log")):
        return os.path.join(path, "status.log")
    if os.path.exists(os.path.join(path, "status")):
        return os.path.join(path, "status")
    return ""


def _parse_status_log(parser, job, status_log_path):
    status_lines = open(status_log_path).readlines()
    parser.start(job)
    tests = parser.end(status_lines)

    # parser.end can return the same object multiple times, so filter out dups
    job.tests = []
    already_added = set()
    for test in tests:
        if test not in already_added:
            already_added.add(test)
            job.tests.append(test)


def _match_existing_tests(db, job):
    """Find entries in the DB corresponding to the job's tests, update job.

    @return: Any unmatched tests in the db.
    """
    old_job_idx = job.job_idx
    raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
                              {"job_idx": old_job_idx})
    if raw_old_tests:
        old_tests = dict(((test, subdir), test_idx)
                         for test_idx, subdir, test in raw_old_tests)
    else:
        old_tests = {}

    for test in job.tests:
        test_idx = old_tests.pop((test.testname, test.subdir), None)
        if test_idx is not None:
            test.test_idx = test_idx
        else:
            tko_utils.dprint("! Reparse returned new test "
                             "testname=%r subdir=%r" %
                             (test.testname, test.subdir))
    return old_tests


def _delete_tests_from_db(db, tests):
    for test_idx in tests.itervalues():
        where = {'test_idx': test_idx}
        db.delete('tko_iteration_result', where)
        db.delete('tko_iteration_perf_value', where)
        db.delete('tko_iteration_attributes', where)
        db.delete('tko_test_attributes', where)
        db.delete('tko_test_labels_tests', {'test_id': test_idx})
        db.delete('tko_tests', where)


def _get_job_subdirs(path):
    """
    Returns a list of job subdirectories at path. Returns None if the path
    is itself a job directory. Does not recurse into the subdirs.
    """
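    # The three cases, illustrated:
    #   - path/.machines lists one subdir per line ('host1\nhost2\n')
    #     -> {'host1', 'host2'}
    #   - path contains only subdirectories -> that set of subdirs
    #   - anything else is treated as a leaf job directory -> None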
    # if there's a .machines file, use it to get the subdirs
    machine_list = os.path.join(path, ".machines")
    if os.path.exists(machine_list):
        subdirs = set(line.strip() for line in open(machine_list))
        existing_subdirs = set(subdir for subdir in subdirs
                               if os.path.exists(os.path.join(path, subdir)))
        if len(existing_subdirs) != 0:
            return existing_subdirs

    # if this dir contains ONLY subdirectories, return them
    contents = set(os.listdir(path))
    contents.discard(".parse.lock")
    subdirs = set(sub for sub in contents if
                  os.path.isdir(os.path.join(path, sub)))
    if len(contents) == len(subdirs) != 0:
        return subdirs

    # this is a job directory, or something else we don't understand
    return None


def parse_leaf_path(db, pid_file_manager, path, level, parse_options):
    """Parse a leaf path.

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
    """
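    # Example (illustrative path): with level=2,
    # '/usr/local/autotest/results/123-debug_user/host1' splits into
    # job_elements ['123-debug_user', 'host1'], giving the jobname
    # '123-debug_user/host1'.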
    job_elements = path.split("/")[-level:]
    jobname = "/".join(job_elements)
    db.run_with_retry(parse_one, db, pid_file_manager, jobname, path,
                      parse_options)
    return jobname


def parse_path(db, pid_file_manager, path, level, parse_options):
    """Parse a path.

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: A set of job names of the parsed jobs.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    """
    processed_jobs = set()
    job_subdirs = _get_job_subdirs(path)
    if job_subdirs is not None:
        # Parse status.log in the current directory, if it exists.
        # Multi-machine synchronous server side tests record output in this
        # directory. Without this check, we would not parse these results.
        if os.path.exists(os.path.join(path, 'status.log')):
            new_job = parse_leaf_path(db, pid_file_manager, path, level,
                                      parse_options)
            processed_jobs.add(new_job)
        # multi-machine job
        for subdir in job_subdirs:
            jobpath = os.path.join(path, subdir)
            new_jobs = parse_path(db, pid_file_manager, jobpath, level + 1,
                                  parse_options)
            processed_jobs.update(new_jobs)
    else:
        # single machine job
        new_job = parse_leaf_path(db, pid_file_manager, path, level,
                                  parse_options)
        processed_jobs.add(new_job)
    return processed_jobs


def _detach_from_parent_process():
    """Allow reparenting the parse process away from the caller.

    When monitor_db is run via upstart, restarting the job sends SIGTERM to
    the whole process group. Detaching makes us immune to that.
    """
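    # os.setsid() moves this process into a new session (and a new process
    # group), so a signal sent to the caller's process group no longer
    # reaches us. setsid() fails if we are already a process group leader,
    # hence the getpid() != getpgid(0) guard.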
    if os.getpid() != os.getpgid(0):
        os.setsid()


def main():
    """tko_parse entry point."""
    options, args = parse_args()

    # We are obliged to use indirect=False, not use the SetupTsMonGlobalState
    # context manager, and add a manual flush, because tko/parse is expected to
    # be a very short lived (<1 min) script when working effectively, and we
    # can't afford to either a) wait for up to 1min for metrics to flush at the
    # end or b) drop metrics that were sent within the last minute of execution.
    site_utils.SetupTsMonGlobalState('tko_parse', indirect=False,
                                     short_lived=True)
    try:
        with metrics.SuccessCounter('chromeos/autotest/tko_parse/runs'):
            _main_with_options(options, args)
    finally:
        metrics.Flush()


def _main_with_options(options, args):
    """Entry point with options parsed and metrics already set up."""
    # Record the processed jobs so that
    # we can send the duration of parsing to metadata db.
    processed_jobs = set()

    if options.detach:
        _detach_from_parent_process()

    results_dir = os.path.abspath(args[0])
    assert os.path.exists(results_dir)

    _update_db_config_from_json(options, results_dir)

    parse_options = _ParseOptions(options.reparse, options.mailit,
                                  options.dry_run, options.suite_report,
                                  options.datastore_creds,
                                  options.export_to_gcloud_path,
                                  options.disable_perf_upload)

    pid_file_manager = pidfile.PidFileManager("parser", results_dir)

    if options.write_pidfile:
        pid_file_manager.open_file()

    try:
        # build up the list of job dirs to parse
        if options.singledir:
            jobs_list = [results_dir]
        else:
            jobs_list = [os.path.join(results_dir, subdir)
                         for subdir in os.listdir(results_dir)]

        # build up the database
        db = tko_db.db(autocommit=False, host=options.db_host,
                       user=options.db_user, password=options.db_pass,
                       database=options.db_name)

        # parse all the jobs
        for path in jobs_list:
            lockfile = open(os.path.join(path, ".parse.lock"), "w")
            flags = fcntl.LOCK_EX
            if options.noblock:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(lockfile, flags)
            except IOError as e:
                # lock is not available and nonblock has been requested
                if e.errno == errno.EWOULDBLOCK:
                    lockfile.close()
                    continue
                else:
                    raise  # something unexpected happened
            try:
                new_jobs = parse_path(db, pid_file_manager, path, options.level,
                                      parse_options)
                processed_jobs.update(new_jobs)

            finally:
                fcntl.flock(lockfile, fcntl.LOCK_UN)
                lockfile.close()

    except Exception:
        pid_file_manager.close_file(1)
        raise
    else:
        pid_file_manager.close_file(0)


def _update_db_config_from_json(options, test_results_dir):
    """Update DB config options using a side_effects_config.json file.

    @param options: parsed args to be updated.
    @param test_results_dir: path to test results dir.

    @raises: json_format.ParseError if the file is not valid JSON.
             ValueError if the JSON config is incomplete.
             OSError if some files from the JSON config are missing.
    """
    # results_dir passed to tko/parse is a subdir of the root results dir
    config_dir = os.path.join(test_results_dir, os.pardir)
    tko_utils.dprint("Attempting to read side_effects.Config from %s" %
                     config_dir)
    config = config_loader.load(config_dir)

    if config:
        tko_utils.dprint("Validating side_effects.Config.tko")
        config_loader.validate_tko(config)

        tko_utils.dprint("Using the following DB config params from "
                         "side_effects.Config.tko:\n%s" % config.tko)
        options.db_host = config.tko.proxy_socket
        options.db_user = config.tko.mysql_user

        with open(config.tko.mysql_password_file, 'r') as f:
            options.db_pass = f.read().rstrip('\n')

        options.disable_perf_upload = not config.chrome_perf.enabled
    else:
        tko_utils.dprint("No side_effects.Config found in %s - "
                         "defaulting to DB config values from shadow config" %
                         config_dir)


if __name__ == "__main__":
    main()