#!/usr/bin/python -u
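"""Parse job results from an autotest results directory into the TKO database.

Optionally mails failure reports, uploads perf values and Sponge results, and
generates suite timeline reports for suite (hostless) jobs.
"""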

import collections
import errno
import fcntl
import json
import optparse
import os
import socket
import subprocess
import sys
import time
import traceback

import common
from autotest_lib.client.bin.result_tools import utils as result_utils
from autotest_lib.client.bin.result_tools import utils_lib as result_utils_lib
from autotest_lib.client.bin.result_tools import runner as result_runner
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import mail, pidfile
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server import site_utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.site_utils.sponge_lib import sponge_utils
from autotest_lib.tko import db as tko_db, utils as tko_utils
from autotest_lib.tko import models, parser_lib
from autotest_lib.tko.perf_upload import perf_uploader

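# chromite may not be available in every environment this script runs in; fall
# back to the mock metrics object from client.common_lib.utils so that metric
# calls still resolve.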
try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_ParseOptions = collections.namedtuple(
    'ParseOptions', ['reparse', 'mail_on_failure', 'dry_run', 'suite_report',
                     'datastore_creds', 'export_to_gcloud_path'])

_HARDCODED_CONTROL_FILE_NAMES = (
        # client side test control, as saved in old Autotest paths.
        'control',
        # server side test control, as saved in old Autotest paths.
        'control.srv',
        # All control files, as saved in skylab.
        'control.from_control_name',
)


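# Example invocations (the results paths and job IDs below are hypothetical,
# shown only for illustration):
#   tko/parse.py -l 2 /usr/local/autotest/results/123-chromeos-test
#   tko/parse.py -r -o -l 2 /usr/local/autotest/results/123-chromeos-test/host1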
def parse_args():
    """Parse args."""
    # build up our options parser and parse sys.argv
    parser = optparse.OptionParser()
    parser.add_option("-m", help="Send mail for FAILED tests",
                      dest="mailit", action="store_true")
    parser.add_option("-r", help="Reparse the results of a job",
                      dest="reparse", action="store_true")
    parser.add_option("-o", help="Parse a single results directory",
                      dest="singledir", action="store_true")
    parser.add_option("-l", help=("Levels of subdirectories to include "
                                  "in the job name"),
                      type="int", dest="level", default=1)
    parser.add_option("-n", help="No blocking on an existing parse",
                      dest="noblock", action="store_true")
    parser.add_option("-s", help="Database server hostname",
                      dest="db_host", action="store")
    parser.add_option("-u", help="Database username", dest="db_user",
                      action="store")
    parser.add_option("-p", help="Database password", dest="db_pass",
                      action="store")
    parser.add_option("-d", help="Database name", dest="db_name",
                      action="store")
    parser.add_option("--dry-run", help="Do not actually commit any results.",
                      dest="dry_run", action="store_true", default=False)
    parser.add_option(
            "--detach", action="store_true",
            help="Detach parsing process from the caller process. Used by "
                 "monitor_db to safely restart without affecting parsing.",
            default=False)
    parser.add_option("--write-pidfile",
                      help="write pidfile (.parser_execute)",
                      dest="write_pidfile", action="store_true",
                      default=False)
    parser.add_option("--record-duration",
                      help="[DEPRECATED] Record timing to metadata db",
                      dest="record_duration", action="store_true",
                      default=False)
    parser.add_option("--suite-report",
                      help=("Allows the parsing job to attempt to create a "
                            "suite timeline report, if it detects that the "
                            "job being parsed is a suite job."),
                      dest="suite_report", action="store_true",
                      default=False)
    parser.add_option("--datastore-creds",
                      help=("The path to the gcloud datastore credentials "
                            "file, which will be used to upload the suite "
                            "timeline report to gcloud. If not specified, "
                            "the one defined in shadow_config will be used."),
                      dest="datastore_creds", action="store", default=None)
    parser.add_option("--export-to-gcloud-path",
                      help=("The path to the export_to_gcloud script. Find "
                            "the chromite path on your server; the script "
                            "is under chromite/bin/."),
                      dest="export_to_gcloud_path", action="store",
                      default=None)
    options, args = parser.parse_args()

    # we need a results directory
    if len(args) == 0:
        tko_utils.dprint("ERROR: at least one results directory must "
                         "be provided")
        parser.print_help()
        sys.exit(1)

    if not options.datastore_creds:
        gcloud_creds = global_config.global_config.get_config_value(
            'GCLOUD', 'cidb_datastore_writer_creds', default=None)
        options.datastore_creds = (site_utils.get_creds_abspath(gcloud_creds)
                                   if gcloud_creds else None)

    if not options.export_to_gcloud_path:
        export_script = 'chromiumos/chromite/bin/export_to_gcloud'
        # If it is a lab server, the script is under ~chromeos-test/
        if os.path.exists(os.path.expanduser('~chromeos-test/%s' %
                                             export_script)):
            path = os.path.expanduser('~chromeos-test/%s' % export_script)
        # If it is a local workstation, it is probably under ~/
        elif os.path.exists(os.path.expanduser('~/%s' % export_script)):
            path = os.path.expanduser('~/%s' % export_script)
        # If it is not found anywhere, the default will be set to None.
        else:
            path = None
        options.export_to_gcloud_path = path

    # pass the options back
    return options, args


def format_failure_message(jobname, kernel, testname, status, reason):
    """Format failure message with the given information.

    @param jobname: String representing the job name.
    @param kernel: String representing the kernel.
    @param testname: String representing the test name.
    @param status: String representing the test status.
    @param reason: String representing the reason.

    @return: Failure message as a string.
    """
    format_string = "%-12s %-20s %-12s %-10s %s"
    return format_string % (jobname, kernel, testname, status, reason)


def mailfailure(jobname, job, message):
    """Send an email about the failure.

    @param jobname: String representing the job name.
    @param job: A job object.
    @param message: The message to mail.
    """
    message_lines = [""]
    message_lines.append("The following tests FAILED for this job")
    message_lines.append("http://%s/results/%s" %
                         (socket.gethostname(), jobname))
    message_lines.append("")
    message_lines.append(format_failure_message("Job name", "Kernel",
                                                "Test name", "FAIL/WARN",
                                                "Failure reason"))
    message_lines.append(format_failure_message("=" * 8, "=" * 6, "=" * 8,
                                                "=" * 8, "=" * 14))
    message_header = "\n".join(message_lines)

    subject = "AUTOTEST: FAILED tests from job %s" % jobname
    mail.send("", job.user, "", subject, message_header + message)


def _invalidate_original_tests(orig_job_idx, retry_job_idx):
    """Retry tests invalidate the original tests.

    Whenever a retry job is complete, we want to invalidate the original
    job's test results, so that consumers of the tko database
    (e.g. the tko frontend, wmatrix) can figure out which results are the
    latest.

    When a retry job is parsed, we retrieve the original job's afe_job_id
    from the retry job's keyvals, which is then converted to a tko job_idx
    and passed into this method as |orig_job_idx|.

    This method invalidates the rows in tko_tests that are associated with
    the original job by flipping their 'invalid' bit to True.
    In addition, in tko_tests, we also maintain a pointer from the retry
    results to the original results, so that later we can always know which
    rows in tko_tests are retries and which are the corresponding original
    results. This is done by setting the field 'invalidates_test_idx' of the
    tests associated with the retry job.

    For example, assume Job(job_idx=105) is retried by Job(job_idx=108). After
    this method is run, their tko_tests rows will look like:
    __________________________________________________________________________
    test_idx| job_idx | test            | ... | invalid | invalidates_test_idx
    10      | 105     | dummy_Fail.Error| ... | 1       | NULL
    11      | 105     | dummy_Fail.Fail | ... | 1       | NULL
    ...
    20      | 108     | dummy_Fail.Error| ... | 0       | 10
    21      | 108     | dummy_Fail.Fail | ... | 0       | 11
    __________________________________________________________________________
    Note that the invalid bits of the rows for Job(job_idx=105) are set to '1',
    and the 'invalidates_test_idx' fields of the rows for Job(job_idx=108)
    are set to 10 and 11 (the test_idx of the rows for the original job).

    @param orig_job_idx: An integer representing the original job's
                         tko job_idx. Tests associated with this job will
                         be marked as 'invalid'.
    @param retry_job_idx: An integer representing the retry job's
                          tko job_idx. The field 'invalidates_test_idx'
                          of the tests associated with this job will be updated.

    """
    msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx)
    if not orig_job_idx or not retry_job_idx:
        tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg)
    # Using django models here makes things easier, but make sure that
    # before this method is called, all other relevant transactions have been
    # committed to avoid a race condition. In the long run, we might consider
    # making the rest of the parser use django models.
    orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx)
    retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx)

    # Invalidate original tests.
    orig_tests.update(invalid=True)

    # Maintain a dictionary that maps (test, subdir) to original tests.
    # Note that within the scope of a job, (test, subdir) uniquely
    # identifies a test run, but 'test' alone does not.
    # In a control file, one could run the same test with different
    # 'subdir_tag' values, for example,
    #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_1')
    #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_2')
    # In tko, we will get
    #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_1')
    #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_2')
    invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
                         for orig_test in orig_tests}
    for retry in retry_tests:
        # It is possible that (retry.test, retry.subdir) doesn't exist
        # in invalidated_tests. This could happen when the original job
        # didn't run some of its tests, e.g. because a DUT went offline
        # at the beginning of the job, in which case invalidated_tests
        # will only have one entry, for 'SERVER_JOB'.
        orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
        if orig_test:
            retry.invalidates_test = orig_test
            retry.save()
    tko_utils.dprint('DEBUG: Invalidated tests associated with job: ' + msg)


def _throttle_result_size(path):
    """Limit the total size of test results for the given path.

    @param path: Path of the result directory.
    """
    if not result_runner.ENABLE_RESULT_THROTTLING:
        tko_utils.dprint(
                'Result throttling is not enabled. Skipping throttling %s' %
                path)
        return

    max_result_size_KB = _max_result_size_from_control(path)
    if max_result_size_KB is None:
        max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB

    try:
        result_utils.execute(path, max_result_size_KB)
    except:
        tko_utils.dprint(
                'Failed to throttle result size of %s.\nDetails %s' %
                (path, traceback.format_exc()))


def _max_result_size_from_control(path):
    """Gets the max result size set in a control file, if any.

    If no override is found, returns None.
    """
    for control_file in _HARDCODED_CONTROL_FILE_NAMES:
        control = os.path.join(path, control_file)
        if not os.path.exists(control):
            continue

        try:
            max_result_size_KB = control_data.parse_control(
                    control, raise_warnings=False).max_result_size_KB
            if max_result_size_KB != control_data.DEFAULT_MAX_RESULT_SIZE_KB:
                return max_result_size_KB
        except IOError as e:
            tko_utils.dprint(
                    'Failed to access %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
        except control_data.ControlVariableException as e:
            tko_utils.dprint(
                    'Failed to parse %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
    return None


def export_tko_job_to_file(job, jobname, filename):
    """Exports the tko job to a file on disk.

    @param job: the tko job object to serialize.
    @param jobname: the job name as a string.
    @param filename: The path of the output file for the serialized job.
    """
    from autotest_lib.tko import job_serializer

    serializer = job_serializer.JobSerializer()
    serializer.serialize_to_binary(job, jobname, filename)


def parse_one(db, pid_file_manager, jobname, path, parse_options):
    """Parse a single job. Optionally send email on failure.

    @param db: database object.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param jobname: the tag used to search for existing job in db,
                    e.g. '1234-chromeos-test/host1'
    @param path: The path to the results to be parsed.
    @param parse_options: _ParseOptions instance.
    """
    reparse = parse_options.reparse
    mail_on_failure = parse_options.mail_on_failure
    dry_run = parse_options.dry_run
    suite_report = parse_options.suite_report
    datastore_creds = parse_options.datastore_creds
    export_to_gcloud_path = parse_options.export_to_gcloud_path

    tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    old_job_idx = db.find_job(jobname)
    if old_job_idx is not None and not reparse:
        tko_utils.dprint("! Job is already parsed, done")
        return

    # look up the status version
    job_keyval = models.job.read_keyval(path)
    status_version = job_keyval.get("status_version", 0)

    parser = parser_lib.parser(status_version)
    job = parser.make_job(path)
    tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    status_log_path = _find_status_log_path(path)
    if not status_log_path:
        tko_utils.dprint("! Unable to parse job, no status file")
        return
    _parse_status_log(parser, job, status_log_path)

    if old_job_idx is not None:
        job.job_idx = old_job_idx
        unmatched_tests = _match_existing_tests(db, job)
        if not dry_run:
            _delete_tests_from_db(db, unmatched_tests)

    job.afe_job_id = tko_utils.get_afe_job_id(jobname)
    job.skylab_task_id = tko_utils.get_skylab_task_id(jobname)
    job.afe_parent_job_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.skylab_parent_task_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.build = None
    job.board = None
    job.build_version = None
    job.suite = None
    if job.label:
        label_info = site_utils.parse_job_name(job.label)
        if label_info:
            job.build = label_info.get('build', None)
            job.build_version = label_info.get('build_version', None)
            job.board = label_info.get('board', None)
            job.suite = label_info.get('suite', None)

    result_utils_lib.LOG = tko_utils.dprint
    _throttle_result_size(path)

    # Record test result size to job_keyvals
    start_time = time.time()
    result_size_info = site_utils.collect_result_sizes(
            path, log=tko_utils.dprint)
    tko_utils.dprint('Finished collecting result sizes after %s seconds' %
                     (time.time()-start_time))
    job.keyval_dict.update(result_size_info.__dict__)

    # TODO(dshi): Update sizes with sponge_invocation.xml and throttle it.

    # check for failures
    message_lines = [""]
    job_successful = True
    for test in job.tests:
        if not test.subdir:
            continue
        tko_utils.dprint("* testname, subdir, status, reason: %s %s %s %s"
                         % (test.testname, test.subdir, test.status,
                            test.reason))
        if test.status not in ('GOOD', 'WARN'):
            job_successful = False
            pid_file_manager.num_tests_failed += 1
            message_lines.append(format_failure_message(
                jobname, test.kernel.base, test.subdir,
                test.status, test.reason))

    message = "\n".join(message_lines)

    if not dry_run:
        # send out an email report of failure
        if len(message) > 2 and mail_on_failure:
            tko_utils.dprint("Sending email report of failure on %s to %s"
                                % (jobname, job.user))
            mailfailure(jobname, job, message)

        # Upload perf values to the perf dashboard, if applicable.
        for test in job.tests:
            perf_uploader.upload_test(job, test, jobname)

        # Upload job details to Sponge.
        sponge_url = sponge_utils.upload_results(job, log=tko_utils.dprint)
        if sponge_url:
            job.keyval_dict['sponge_url'] = sponge_url

        _write_job_to_db(db, jobname, job)

        # Verify the job data is written to the database.
        if job.tests:
            tests_in_db = db.find_tests(job.job_idx)
            tests_in_db_count = len(tests_in_db) if tests_in_db else 0
            if tests_in_db_count != len(job.tests):
                tko_utils.dprint(
                        'Failed to find enough tests for job_idx: %d. The '
                        'job should have %d tests, only found %d tests.' %
                        (job.job_idx, len(job.tests), tests_in_db_count))
                metrics.Counter(
                        'chromeos/autotest/result/db_save_failure',
                        description='The number of times parse failed to '
                        'save job to TKO database.').increment()

        # Although the cursor has autocommit, we still need to force it to
        # commit existing changes before we can use django models; otherwise
        # django models will deadlock when they try to start a new
        # transaction while the current one has not finished yet.
        db.commit()

        # Handle retry job.
        orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID,
                                            None)
        if orig_afe_job_id:
            orig_job_idx = tko_models.Job.objects.get(
                    afe_job_id=orig_afe_job_id).job_idx
            _invalidate_original_tests(orig_job_idx, job.job_idx)

    # Serializing job into a binary file
    export_tko_to_file = global_config.global_config.get_config_value(
            'AUTOSERV', 'export_tko_job_to_file', type=bool, default=False)

    binary_file_name = os.path.join(path, "job.serialize")
    if export_tko_to_file:
        export_tko_job_to_file(job, jobname, binary_file_name)

    if not dry_run:
        db.commit()

    # Generate a suite report.
    # Check whether this is a suite job. A suite job is a hostless job: its
    # jobname ends with <JOB_ID>-<USERNAME>/hostless and its suite field is
    # not NULL. Only generate the timeline report when datastore_parent_key
    # is given.
    datastore_parent_key = job_keyval.get('datastore_parent_key', None)
    provision_job_id = job_keyval.get('provision_job_id', None)
    if (suite_report and jobname.endswith('/hostless')
        and job.suite and datastore_parent_key):
        tko_utils.dprint('Start dumping suite timing report...')
        timing_log = os.path.join(path, 'suite_timing.log')
        dump_cmd = ("%s/site_utils/dump_suite_report.py %s "
                    "--output='%s' --debug" %
                    (common.autotest_dir, job.afe_job_id,
                        timing_log))

        if provision_job_id is not None:
            dump_cmd += " --provision_job_id=%d" % int(provision_job_id)

        subprocess.check_output(dump_cmd, shell=True)
        tko_utils.dprint('Successfully finished dumping suite timing report')

        if (datastore_creds and export_to_gcloud_path
            and os.path.exists(export_to_gcloud_path)):
            upload_cmd = [export_to_gcloud_path, datastore_creds,
                            timing_log, '--parent_key',
                            datastore_parent_key]
            tko_utils.dprint('Start exporting timeline report to gcloud')
            subprocess.check_output(upload_cmd)
            tko_utils.dprint('Successfully exported timeline report to '
                                'gcloud')
        else:
            tko_utils.dprint('DEBUG: Skipping export of suite timeline to '
                                'gcloud, because either the gcloud creds or '
                                'the export_to_gcloud script was not found.')

    # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    # the function, so that any failure (e.g. a db connection error) stops
    # gs_offloader_instructions from being updated and the logs can still be
    # uploaded for troubleshooting.
    if job_successful:
        # Check if we should not offload this test's results.
        if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
            # Update the gs_offloader_instructions json file.
            gs_instructions_file = os.path.join(
                    path, constants.GS_OFFLOADER_INSTRUCTIONS)
            gs_offloader_instructions = {}
            if os.path.exists(gs_instructions_file):
                with open(gs_instructions_file, 'r') as f:
                    gs_offloader_instructions = json.load(f)

            gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
            with open(gs_instructions_file, 'w') as f:
                json.dump(gs_offloader_instructions, f)


def _write_job_to_db(db, jobname, job):
    """Write all TKO data associated with a job to DB.

    This updates the job object as a side effect.

    @param db: tko.db.db_sql object.
    @param jobname: Name of the job to write.
    @param job: tko.models.job object.
    """
    db.insert_or_update_machine(job)
    db.insert_job(jobname, job)
    db.insert_or_update_task_reference(
            job,
            'skylab' if tko_utils.is_skylab_task(jobname) else 'afe',
    )
    db.update_job_keyvals(job)
    for test in job.tests:
        db.insert_test(job, test)


def _find_status_log_path(path):
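    """Return the path to the job's status log, if one exists.

    Prefers 'status.log' over the legacy 'status' file; returns an empty
    string when neither file is present under the given path.

    @param path: Path of the result directory.
    """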
    if os.path.exists(os.path.join(path, "status.log")):
        return os.path.join(path, "status.log")
    if os.path.exists(os.path.join(path, "status")):
        return os.path.join(path, "status")
    return ""


def _parse_status_log(parser, job, status_log_path):
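    """Parse the status log and attach the resulting tests to the job.

    @param parser: status log parser returned by parser_lib.parser().
    @param job: tko.models.job object to attach the parsed tests to.
    @param status_log_path: Path of the status log file to parse.
    """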
    status_lines = open(status_log_path).readlines()
    parser.start(job)
    tests = parser.end(status_lines)

    # parser.end can return the same object multiple times, so filter out dups
    job.tests = []
    already_added = set()
    for test in tests:
        if test not in already_added:
            already_added.add(test)
            job.tests.append(test)


def _match_existing_tests(db, job):
    """Find entries in the DB corresponding to the job's tests, update job.

    @return: Any unmatched tests in the db.
    """
    old_job_idx = job.job_idx
    raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
                                {"job_idx": old_job_idx})
    if raw_old_tests:
        old_tests = dict(((test, subdir), test_idx)
                            for test_idx, subdir, test in raw_old_tests)
    else:
        old_tests = {}

    for test in job.tests:
        test_idx = old_tests.pop((test.testname, test.subdir), None)
        if test_idx is not None:
            test.test_idx = test_idx
        else:
            tko_utils.dprint("! Reparse returned new test "
                                "testname=%r subdir=%r" %
                                (test.testname, test.subdir))
    return old_tests


def _delete_tests_from_db(db, tests):
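    """Delete the given tests and their related rows from the database.

    @param db: tko.db.db_sql object.
    @param tests: Dict mapping (testname, subdir) to test_idx, as returned
                  by _match_existing_tests().
    """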
    for test_idx in tests.itervalues():
        where = {'test_idx': test_idx}
        db.delete('tko_iteration_result', where)
        db.delete('tko_iteration_perf_value', where)
        db.delete('tko_iteration_attributes', where)
        db.delete('tko_test_attributes', where)
        db.delete('tko_test_labels_tests', {'test_id': test_idx})
        db.delete('tko_tests', where)


def _get_job_subdirs(path):
    """
    Returns a set of job subdirectories at path. Returns None if the path
    is itself a job directory. Does not recurse into the subdirs.
    """
    # if there's a .machines file, use it to get the subdirs
    machine_list = os.path.join(path, ".machines")
    if os.path.exists(machine_list):
        subdirs = set(line.strip() for line in open(machine_list))
        existing_subdirs = set(subdir for subdir in subdirs
                               if os.path.exists(os.path.join(path, subdir)))
        if len(existing_subdirs) != 0:
            return existing_subdirs

    # if this dir contains ONLY subdirectories, return them
    contents = set(os.listdir(path))
    contents.discard(".parse.lock")
    subdirs = set(sub for sub in contents if
                  os.path.isdir(os.path.join(path, sub)))
    if len(contents) == len(subdirs) != 0:
        return subdirs

    # this is a job directory, or something else we don't understand
    return None


def parse_leaf_path(db, pid_file_manager, path, level, parse_options):
    """Parse a leaf path.

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
    """
    job_elements = path.split("/")[-level:]
    jobname = "/".join(job_elements)
    db.run_with_retry(parse_one, db, pid_file_manager, jobname, path,
                      parse_options)
    return jobname


def parse_path(db, pid_file_manager, path, level, parse_options):
    """Parse a path

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: A set of job names of the parsed jobs.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    """
    processed_jobs = set()
    job_subdirs = _get_job_subdirs(path)
    if job_subdirs is not None:
        # parse status.log in current directory, if it exists. multi-machine
        # synchronous server side tests record output in this directory. without
        # this check, we do not parse these results.
        if os.path.exists(os.path.join(path, 'status.log')):
            new_job = parse_leaf_path(db, pid_file_manager, path, level,
                                      parse_options)
            processed_jobs.add(new_job)
        # multi-machine job
        for subdir in job_subdirs:
            jobpath = os.path.join(path, subdir)
            new_jobs = parse_path(db, pid_file_manager, jobpath, level + 1,
                                  parse_options)
            processed_jobs.update(new_jobs)
    else:
        # single machine job
        new_job = parse_leaf_path(db, pid_file_manager, path, level,
                                  parse_options)
        processed_jobs.add(new_job)
    return processed_jobs


def _detach_from_parent_process():
    """Allow reparenting the parse process away from caller.

    When monitor_db is run via upstart, restarting the job sends SIGTERM to
    the whole process group. This makes us immune from that.
    """
    if os.getpid() != os.getpgid(0):
        os.setsid()


def main():
    """tko_parse entry point."""
    options, args = parse_args()

    # We are obliged to use indirect=False, not use the SetupTsMonGlobalState
    # context manager, and add a manual flush, because tko/parse is expected to
    # be a very short lived (<1 min) script when working effectively, and we
    # can't afford to either a) wait for up to 1min for metrics to flush at the
    # end or b) drop metrics that were sent within the last minute of execution.
    site_utils.SetupTsMonGlobalState('tko_parse', indirect=False,
                                     short_lived=True)
    try:
        with metrics.SuccessCounter('chromeos/autotest/tko_parse/runs'):
            _main_with_options(options, args)
    finally:
        metrics.Flush()


def _main_with_options(options, args):
    """Entry point with options parsed and metrics already set up."""
    # Record the processed jobs so that
    # we can send the duration of parsing to metadata db.
    processed_jobs = set()

    if options.detach:
        _detach_from_parent_process()

    parse_options = _ParseOptions(options.reparse, options.mailit,
                                  options.dry_run, options.suite_report,
                                  options.datastore_creds,
                                  options.export_to_gcloud_path)
    results_dir = os.path.abspath(args[0])
    assert os.path.exists(results_dir)

    pid_file_manager = pidfile.PidFileManager("parser", results_dir)

    if options.write_pidfile:
        pid_file_manager.open_file()

    try:
        # build up the list of job dirs to parse
        if options.singledir:
            jobs_list = [results_dir]
        else:
            jobs_list = [os.path.join(results_dir, subdir)
                         for subdir in os.listdir(results_dir)]

        # build up the database
        db = tko_db.db(autocommit=False, host=options.db_host,
                       user=options.db_user, password=options.db_pass,
                       database=options.db_name)

        # parse all the jobs
        for path in jobs_list:
            lockfile = open(os.path.join(path, ".parse.lock"), "w")
            flags = fcntl.LOCK_EX
            if options.noblock:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(lockfile, flags)
            except IOError as e:
                # lock is not available and nonblock has been requested
                if e.errno == errno.EWOULDBLOCK:
                    lockfile.close()
                    continue
                else:
                    raise  # something unexpected happened
            try:
                new_jobs = parse_path(db, pid_file_manager, path, options.level,
                                      parse_options)
                processed_jobs.update(new_jobs)

            finally:
                fcntl.flock(lockfile, fcntl.LOCK_UN)
                lockfile.close()

    except Exception as e:
        pid_file_manager.close_file(1)
        raise
    else:
        pid_file_manager.close_file(0)


if __name__ == "__main__":
    main()