#!/usr/bin/python -u
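"""Parse autotest job results directories and record them in the TKO database.

For each results directory given on the command line, this script parses the
job's status logs, optionally e-mails a report of FAILED tests, uploads perf
values and Sponge results, and commits the parsed job to the TKO database.
See parse_args() for the supported options.

Illustrative invocation (the paths here are hypothetical):
    ./parse.py -l 2 -r /usr/local/autotest/results/123-someuser
"""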

import collections
import datetime
import errno
import fcntl
import json
import optparse
import os
import socket
import subprocess
import sys
import time
import traceback

import common
from autotest_lib.client.bin.result_tools import utils as result_utils
from autotest_lib.client.bin.result_tools import utils_lib as result_utils_lib
from autotest_lib.client.bin.result_tools import runner as result_runner
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import mail, pidfile
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server import site_utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.site_utils import job_overhead
from autotest_lib.site_utils.sponge_lib import sponge_utils
from autotest_lib.tko import db as tko_db, utils as tko_utils
from autotest_lib.tko import models, parser_lib
from autotest_lib.tko.perf_upload import perf_uploader

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


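# Options for a single parse run; constructed in main() from the command-line
# flags.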
_ParseOptions = collections.namedtuple(
    'ParseOptions', ['reparse', 'mail_on_failure', 'dry_run', 'suite_report',
                     'datastore_creds', 'export_to_gcloud_path'])

def parse_args():
    """Parse args."""
    # build up our options parser and parse sys.argv
    parser = optparse.OptionParser()
    parser.add_option("-m", help="Send mail for FAILED tests",
                      dest="mailit", action="store_true")
    parser.add_option("-r", help="Reparse the results of a job",
                      dest="reparse", action="store_true")
    parser.add_option("-o", help="Parse a single results directory",
                      dest="singledir", action="store_true")
    parser.add_option("-l", help=("Levels of subdirectories to include "
                                  "in the job name"),
                      type="int", dest="level", default=1)
    parser.add_option("-n", help="No blocking on an existing parse",
                      dest="noblock", action="store_true")
    parser.add_option("-s", help="Database server hostname",
                      dest="db_host", action="store")
    parser.add_option("-u", help="Database username", dest="db_user",
                      action="store")
    parser.add_option("-p", help="Database password", dest="db_pass",
                      action="store")
    parser.add_option("-d", help="Database name", dest="db_name",
                      action="store")
    parser.add_option("--dry-run", help="Do not actually commit any results.",
                      dest="dry_run", action="store_true", default=False)
    parser.add_option(
            "--detach", action="store_true",
            help="Detach parsing process from the caller process. Used by "
                 "monitor_db to safely restart without affecting parsing.",
            default=False)
    parser.add_option("--write-pidfile",
                      help="write pidfile (.parser_execute)",
                      dest="write_pidfile", action="store_true",
                      default=False)
    parser.add_option("--record-duration",
                      help="Record timing to metadata db",
                      dest="record_duration", action="store_true",
                      default=False)
    parser.add_option("--suite-report",
                      help=("Allows parsing job to attempt to create a suite "
                            "timeline report, if it detects that the job being "
                            "parsed is a suite job."),
                      dest="suite_report", action="store_true",
                      default=False)
    parser.add_option("--datastore-creds",
                      help=("The path to gcloud datastore credentials file, "
                            "which will be used to upload suite timeline "
                            "report to gcloud. If not specified, the one "
                            "defined in shadow_config will be used."),
                      dest="datastore_creds", action="store", default=None)
    parser.add_option("--export-to-gcloud-path",
                      help=("The path to export_to_gcloud script. Please find "
                            "chromite path on your server. The script is under "
                            "chromite/bin/."),
                      dest="export_to_gcloud_path", action="store",
                      default=None)
    options, args = parser.parse_args()

    # we need a results directory
    if len(args) == 0:
        tko_utils.dprint("ERROR: at least one results directory must "
                         "be provided")
        parser.print_help()
        sys.exit(1)

    if not options.datastore_creds:
        gcloud_creds = global_config.global_config.get_config_value(
            'GCLOUD', 'cidb_datastore_writer_creds', default=None)
        options.datastore_creds = (site_utils.get_creds_abspath(gcloud_creds)
                                   if gcloud_creds else None)

    if not options.export_to_gcloud_path:
        export_script = 'chromiumos/chromite/bin/export_to_gcloud'
        # If it is a lab server, the script is under ~chromeos-test/
        if os.path.exists(os.path.expanduser('~chromeos-test/%s' %
                                             export_script)):
            path = os.path.expanduser('~chromeos-test/%s' % export_script)
        # If it is a local workstation, it is probably under ~/
        elif os.path.exists(os.path.expanduser('~/%s' % export_script)):
            path = os.path.expanduser('~/%s' % export_script)
        # If it is not found anywhere, the default will be set to None.
        else:
            path = None
        options.export_to_gcloud_path = path

    # pass the options back
    return options, args


def format_failure_message(jobname, kernel, testname, status, reason):
    """Format failure message with the given information.

    @param jobname: String representing the job name.
    @param kernel: String representing the kernel.
    @param testname: String representing the test name.
    @param status: String representing the test status.
    @param reason: String representing the reason.

    @return: Failure message as a string.
    """
    format_string = "%-12s %-20s %-12s %-10s %s"
    return format_string % (jobname, kernel, testname, status, reason)


def mailfailure(jobname, job, message):
    """Send an email about the failure.

    @param jobname: String representing the job name.
    @param job: A job object.
    @param message: The message to mail.
    """
    message_lines = [""]
    message_lines.append("The following tests FAILED for this job")
    message_lines.append("http://%s/results/%s" %
                         (socket.gethostname(), jobname))
    message_lines.append("")
    message_lines.append(format_failure_message("Job name", "Kernel",
                                                "Test name", "FAIL/WARN",
                                                "Failure reason"))
    message_lines.append(format_failure_message("=" * 8, "=" * 6, "=" * 8,
                                                "=" * 8, "=" * 14))
    message_header = "\n".join(message_lines)

    subject = "AUTOTEST: FAILED tests from job %s" % jobname
    mail.send("", job.user, "", subject, message_header + message)


def _invalidate_original_tests(orig_job_idx, retry_job_idx):
    """Retry tests invalidate the original tests.

    Whenever a retry job is complete, we want to invalidate the original
    job's test results, such that the consumers of the tko database
    (e.g. tko frontend, wmatrix) can figure out which results are the latest.

    When a retry job is parsed, we retrieve the original job's afe_job_id
    from the retry job's keyvals, which is then converted to tko job_idx and
    passed into this method as |orig_job_idx|.

    In this method, we are going to invalidate the rows in tko_tests that are
    associated with the original job by flipping their 'invalid' bit to True.
    In addition, in tko_tests, we also maintain a pointer from the retry results
    to the original results, so that later we can always know which rows in
    tko_tests are retries and which are the corresponding original results.
    This is done by setting the field 'invalidates_test_idx' of the tests
    associated with the retry job.

    For example, assume Job(job_idx=105) is retried by Job(job_idx=108); after
    this method is run, their tko_tests rows will look like:
    __________________________________________________________________________
    test_idx| job_idx | test            | ... | invalid | invalidates_test_idx
    10      | 105     | dummy_Fail.Error| ... | 1       | NULL
    11      | 105     | dummy_Fail.Fail | ... | 1       | NULL
    ...
    20      | 108     | dummy_Fail.Error| ... | 0       | 10
    21      | 108     | dummy_Fail.Fail | ... | 0       | 11
    __________________________________________________________________________
    Note the invalid bits of the rows for Job(job_idx=105) are set to '1'.
    And the 'invalidates_test_idx' fields of the rows for Job(job_idx=108)
    are set to 10 and 11 (the test_idx of the rows for the original job).

    @param orig_job_idx: An integer representing the original job's
                         tko job_idx. Tests associated with this job will
                         be marked as 'invalid'.
    @param retry_job_idx: An integer representing the retry job's
                          tko job_idx. The field 'invalidates_test_idx'
                          of the tests associated with this job will be updated.

    """
    msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx)
    if not orig_job_idx or not retry_job_idx:
        tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg)
    # Using django models here makes things easier, but make sure that
    # before this method is called, all other relevant transactions have been
    # committed to avoid a race condition. In the long run, we might consider
    # making the rest of the parser use django models.
    orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx)
    retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx)

    # Invalidate original tests.
    orig_tests.update(invalid=True)

    # Maintain a dictionary that maps (test, subdir) to original tests.
    # Note that within the scope of a job, (test, subdir) uniquely
    # identifies a test run, but 'test' does not.
    # In a control file, one could run the same test with different
    # 'subdir_tag', for example,
    #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_1')
    #     job.run_test('dummy_Fail', tag='Error', subdir_tag='subdir_2')
    # In tko, we will get
    #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_1')
    #    (test='dummy_Fail.Error', subdir='dummy_Fail.Error.subdir_2')
    invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
                         for orig_test in orig_tests}
    for retry in retry_tests:
        # It is possible that (retry.test, retry.subdir) doesn't exist
        # in invalidated_tests. This could happen when the original job
        # didn't run some of its tests. For example, a DUT may have been
        # offline since the beginning of the job, in which case
        # invalidated_tests will only have one entry for 'SERVER_JOB'.
        orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
        if orig_test:
            retry.invalidates_test = orig_test
            retry.save()
    tko_utils.dprint('DEBUG: Invalidated tests associated with job: ' + msg)


def _throttle_result_size(path):
    """Limit the total size of test results for the given path.

    @param path: Path of the result directory.
    """
    if not result_runner.ENABLE_RESULT_THROTTLING:
        tko_utils.dprint(
                'Result throttling is not enabled. Skipping throttling %s' %
                path)
        return

    max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB
    # Client-side tests save the test control to the file `control`, while
    # server-side tests save it to `control.srv`.
    for control_file in ['control', 'control.srv']:
        control = os.path.join(path, control_file)
        try:
            max_result_size_KB = control_data.parse_control(
                    control, raise_warnings=False).max_result_size_KB
            # Any value different from the default is considered to be the one
            # set in the test control file.
            if max_result_size_KB != control_data.DEFAULT_MAX_RESULT_SIZE_KB:
                break
        except IOError as e:
            tko_utils.dprint(
                    'Failed to access %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
        except control_data.ControlVariableException as e:
            tko_utils.dprint(
                    'Failed to parse %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))

    try:
        result_utils.execute(path, max_result_size_KB)
    except:
        tko_utils.dprint(
                'Failed to throttle result size of %s.\nDetails %s' %
                (path, traceback.format_exc()))


def export_tko_job_to_file(job, jobname, filename):
    """Export the tko job to a file on disk.

    @param job: database object.
    @param jobname: the job name as string.
    @param filename: The path of the file the serialized job is written to.
    """
    try:
        from autotest_lib.tko import job_serializer

        serializer = job_serializer.JobSerializer()
        serializer.serialize_to_binary(job, jobname, filename)
    except ImportError:
        tko_utils.dprint("WARNING: tko_pb2.py doesn't exist. Create it by "
                         "compiling tko/tko.proto.")


def parse_one(db, jobname, path, parse_options):
    """Parse a single job. Optionally send email on failure.

    @param db: database object.
    @param jobname: the tag used to search for existing job in db,
                    e.g. '1234-chromeos-test/host1'
    @param path: The path to the results to be parsed.
    @param parse_options: _ParseOptions instance.
    """
    reparse = parse_options.reparse
    mail_on_failure = parse_options.mail_on_failure
    dry_run = parse_options.dry_run
    suite_report = parse_options.suite_report
    datastore_creds = parse_options.datastore_creds
    export_to_gcloud_path = parse_options.export_to_gcloud_path

    tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    old_job_idx = db.find_job(jobname)
    # old tests is a dict from tuple (test_name, subdir) to test_idx
    old_tests = {}
    if old_job_idx is not None:
        if not reparse:
            tko_utils.dprint("! Job is already parsed, done")
            return

        raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
                                  {"job_idx": old_job_idx})
        if raw_old_tests:
            old_tests = dict(((test, subdir), test_idx)
                             for test_idx, subdir, test in raw_old_tests)

    # look up the status version
    job_keyval = models.job.read_keyval(path)
    status_version = job_keyval.get("status_version", 0)

    # parse out the job
    parser = parser_lib.parser(status_version)
    job = parser.make_job(path)
    status_log = os.path.join(path, "status.log")
    if not os.path.exists(status_log):
        status_log = os.path.join(path, "status")
    if not os.path.exists(status_log):
        tko_utils.dprint("! Unable to parse job, no status file")
        return

    # parse the status logs
    tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    status_lines = open(status_log).readlines()
    parser.start(job)
    tests = parser.end(status_lines)

    # parser.end can return the same object multiple times, so filter out dups
    job.tests = []
    already_added = set()
    for test in tests:
        if test not in already_added:
            already_added.add(test)
            job.tests.append(test)

    # try and port test_idx over from the old tests, but if old tests stop
    # matching up with new ones just give up
    if reparse and old_job_idx is not None:
        job.index = old_job_idx
        for test in job.tests:
            test_idx = old_tests.pop((test.testname, test.subdir), None)
            if test_idx is not None:
                test.test_idx = test_idx
            else:
                tko_utils.dprint("! Reparse returned new test "
                                 "testname=%r subdir=%r" %
                                 (test.testname, test.subdir))
        if not dry_run:
            for test_idx in old_tests.itervalues():
                where = {'test_idx' : test_idx}
                db.delete('tko_iteration_result', where)
                db.delete('tko_iteration_perf_value', where)
                db.delete('tko_iteration_attributes', where)
                db.delete('tko_test_attributes', where)
                db.delete('tko_test_labels_tests', {'test_id': test_idx})
                db.delete('tko_tests', where)

    job.build = None
    job.board = None
    job.build_version = None
    job.suite = None
    if job.label:
        label_info = site_utils.parse_job_name(job.label)
        if label_info:
            job.build = label_info.get('build', None)
            job.build_version = label_info.get('build_version', None)
            job.board = label_info.get('board', None)
            job.suite = label_info.get('suite', None)

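    # Route result_tools logging through the parser's debug output (dprint).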
    result_utils_lib.LOG = tko_utils.dprint
    _throttle_result_size(path)

    # Record test result size to job_keyvals
    start_time = time.time()
    result_size_info = site_utils.collect_result_sizes(
            path, log=tko_utils.dprint)
    tko_utils.dprint('Finished collecting result sizes after %s seconds' %
                     (time.time()-start_time))
    job.keyval_dict.update(result_size_info.__dict__)

    # TODO(dshi): Update sizes with sponge_invocation.xml and throttle it.

    # check for failures
    message_lines = [""]
    job_successful = True
    for test in job.tests:
        if not test.subdir:
            continue
        tko_utils.dprint("* testname, subdir, status, reason: %s %s %s %s"
                         % (test.testname, test.subdir, test.status,
                            test.reason))
        if test.status != 'GOOD':
            job_successful = False
            message_lines.append(format_failure_message(
                jobname, test.kernel.base, test.subdir,
                test.status, test.reason))
    try:
        message = "\n".join(message_lines)

        if not dry_run:
            # send out an email report of the failure
            if len(message) > 2 and mail_on_failure:
                tko_utils.dprint("Sending email report of failure on %s to %s"
                                 % (jobname, job.user))
                mailfailure(jobname, job, message)

            # Upload perf values to the perf dashboard, if applicable.
            for test in job.tests:
                perf_uploader.upload_test(job, test, jobname)

            # Upload job details to Sponge.
            sponge_url = sponge_utils.upload_results(job, log=tko_utils.dprint)
            if sponge_url:
                job.keyval_dict['sponge_url'] = sponge_url

            # write the job into the database.
            job_data = db.insert_job(
                jobname, job,
                parent_job_id=job_keyval.get(constants.PARENT_JOB_ID, None))

            # Verify the job data is written to the database.
            if job.tests:
                tests_in_db = db.find_tests(job_data['job_idx'])
                tests_in_db_count = len(tests_in_db) if tests_in_db else 0
                if tests_in_db_count != len(job.tests):
                    tko_utils.dprint(
                            'Failed to find enough tests for job_idx: %d. The '
                            'job should have %d tests, only found %d tests.' %
                            (job_data['job_idx'], len(job.tests),
                             tests_in_db_count))
                    metrics.Counter(
                            'chromeos/autotest/result/db_save_failure',
                            description='The number of times parse failed to '
                            'save job to TKO database.').increment()

            # Although the cursor has autocommit, we still need to force it to
            # commit existing changes before we can use django models, otherwise
            # it will deadlock when django models try to start a new
            # transaction while the current one has not finished yet.
            db.commit()

            # Handle retry job.
            orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID,
                                             None)
            if orig_afe_job_id:
                orig_job_idx = tko_models.Job.objects.get(
                        afe_job_id=orig_afe_job_id).job_idx
                _invalidate_original_tests(orig_job_idx, job.index)
    except Exception as e:
        tko_utils.dprint("Hit exception while uploading to tko db:\n%s" %
                         traceback.format_exc())
        raise e

    # Serializing job into a binary file
    export_tko_to_file = global_config.global_config.get_config_value(
            'AUTOSERV', 'export_tko_job_to_file', type=bool, default=False)

    binary_file_name = os.path.join(path, "job.serialize")
    if export_tko_to_file:
        export_tko_job_to_file(job, jobname, binary_file_name)

    if reparse:
        site_export_file = "autotest_lib.tko.site_export"
        site_export = utils.import_site_function(__file__,
                                                 site_export_file,
                                                 "site_export",
                                                 _site_export_dummy)
        site_export(binary_file_name)

    if not dry_run:
        db.commit()

    # Generate a suite report.
    # A suite job is a hostless job whose jobname looks like
    # <JOB_ID>-<USERNAME>/hostless and whose suite field is not NULL. Only
    # generate the timeline report when datastore_parent_key is given.
    try:
        datastore_parent_key = job_keyval.get('datastore_parent_key', None)
        if (suite_report and jobname.endswith('/hostless')
            and job_data['suite'] and datastore_parent_key):
            tko_utils.dprint('Start dumping suite timing report...')
            timing_log = os.path.join(path, 'suite_timing.log')
            dump_cmd = ("%s/site_utils/dump_suite_report.py %s "
                        "--output='%s' --debug" %
                        (common.autotest_dir, job_data['afe_job_id'],
                         timing_log))
            subprocess.check_output(dump_cmd, shell=True)
            tko_utils.dprint('Successfully finished dumping suite timing report')

            if (datastore_creds and export_to_gcloud_path
                and os.path.exists(export_to_gcloud_path)):
                upload_cmd = [export_to_gcloud_path, datastore_creds,
                              timing_log, '--parent_key',
                              datastore_parent_key]
                tko_utils.dprint('Start exporting timeline report to gcloud')
                subprocess.check_output(upload_cmd)
                tko_utils.dprint('Successfully exported timeline report to '
                                 'gcloud')
            else:
                tko_utils.dprint('DEBUG: skip exporting suite timeline to '
                                 'gcloud, because either gcloud creds or '
                                 'export_to_gcloud script is not found.')
    except Exception as e:
        tko_utils.dprint("WARNING: failed to dump/export suite report. "
                         "Error:\n%s" % e)

    # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    # the function, so that any failure, e.g., a db connection error, will
    # prevent gs_offloader_instructions from being updated and the logs can
    # still be uploaded for troubleshooting.
    if job_successful:
        # Check if we should not offload this test's results.
        if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
            # Update the gs_offloader_instructions json file.
            gs_instructions_file = os.path.join(
                    path, constants.GS_OFFLOADER_INSTRUCTIONS)
            gs_offloader_instructions = {}
            if os.path.exists(gs_instructions_file):
                with open(gs_instructions_file, 'r') as f:
                    gs_offloader_instructions = json.load(f)

            gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
            with open(gs_instructions_file, 'w') as f:
                json.dump(gs_offloader_instructions, f)


def _site_export_dummy(binary_file_name):
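    """No-op fallback used when no site-specific site_export hook exists."""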
    pass


def _get_job_subdirs(path):
    """
    Returns a list of job subdirectories at path. Returns None if the path
    is itself a job directory. Does not recurse into the subdirs.
    """
    # if there's a .machines file, use it to get the subdirs
    machine_list = os.path.join(path, ".machines")
    if os.path.exists(machine_list):
        subdirs = set(line.strip() for line in open(machine_list))
        existing_subdirs = set(subdir for subdir in subdirs
                               if os.path.exists(os.path.join(path, subdir)))
        if len(existing_subdirs) != 0:
            return existing_subdirs

    # if this dir contains ONLY subdirectories, return them
    contents = set(os.listdir(path))
    contents.discard(".parse.lock")
    subdirs = set(sub for sub in contents if
                  os.path.isdir(os.path.join(path, sub)))
    if len(contents) == len(subdirs) != 0:
        return subdirs

    # this is a job directory, or something else we don't understand
    return None


def parse_leaf_path(db, path, level, parse_options):
    """Parse a leaf path.

    @param db: database handle.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
    """
    job_elements = path.split("/")[-level:]
    jobname = "/".join(job_elements)
    try:
        db.run_with_retry(parse_one, db, jobname, path, parse_options)
    except Exception as e:
        tko_utils.dprint("Error parsing leaf path: %s\nException:\n%s\n%s" %
                         (path, e, traceback.format_exc()))
    return jobname


def parse_path(db, path, level, parse_options):
    """Parse a path

    @param db: database handle.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: A set of job names of the parsed jobs.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    """
    processed_jobs = set()
    job_subdirs = _get_job_subdirs(path)
    if job_subdirs is not None:
        # parse status.log in current directory, if it exists. multi-machine
        # synchronous server side tests record output in this directory. without
        # this check, we do not parse these results.
        if os.path.exists(os.path.join(path, 'status.log')):
            new_job = parse_leaf_path(db, path, level, parse_options)
            processed_jobs.add(new_job)
        # multi-machine job
        for subdir in job_subdirs:
            jobpath = os.path.join(path, subdir)
            new_jobs = parse_path(db, jobpath, level + 1, parse_options)
            processed_jobs.update(new_jobs)
    else:
        # single machine job
        new_job = parse_leaf_path(db, path, level, parse_options)
        processed_jobs.add(new_job)
    return processed_jobs


def record_parsing(processed_jobs, duration_secs):
    """Record the time spent on parsing to metadata db.

    @param processed_jobs: A set of job names of the parsed jobs.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    @param duration_secs: Total time spent on parsing, in seconds.
    """

    for job_name in processed_jobs:
        job_id, hostname = tko_utils.get_afe_job_id_and_hostname(job_name)
        if not job_id or not hostname:
            tko_utils.dprint('ERROR: cannot parse job name %s, '
                             'will not send duration to metadata db.'
                             % job_name)
            continue
        else:
            job_overhead.record_state_duration(
                    job_id, hostname, job_overhead.STATUS.PARSING,
                    duration_secs)

def _detach_from_parent_process():
    """Allow reparenting the parse process away from caller.

    When monitor_db is run via upstart, restarting the job sends SIGTERM to
    the whole process group. This makes us immune from that.
    """
    if os.getpid() != os.getpgid(0):
        os.setsid()

def main():
    """Main entrance."""
    start_time = datetime.datetime.now()
    # Record the processed jobs so that
    # we can send the duration of parsing to metadata db.
    processed_jobs = set()

    options, args = parse_args()

    if options.detach:
        _detach_from_parent_process()

    parse_options = _ParseOptions(options.reparse, options.mailit,
                                  options.dry_run, options.suite_report,
                                  options.datastore_creds,
                                  options.export_to_gcloud_path)
    results_dir = os.path.abspath(args[0])
    assert os.path.exists(results_dir)

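    # Set up ts_mon metrics reporting for this short-lived parser process.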
    site_utils.SetupTsMonGlobalState('tko_parse', indirect=False,
                                     short_lived=True)

    pid_file_manager = pidfile.PidFileManager("parser", results_dir)

    if options.write_pidfile:
        pid_file_manager.open_file()

    try:
        # build up the list of job dirs to parse
        if options.singledir:
            jobs_list = [results_dir]
        else:
            jobs_list = [os.path.join(results_dir, subdir)
                         for subdir in os.listdir(results_dir)]

        # build up the database
        db = tko_db.db(autocommit=False, host=options.db_host,
                       user=options.db_user, password=options.db_pass,
                       database=options.db_name)

        # parse all the jobs
        for path in jobs_list:
            lockfile = open(os.path.join(path, ".parse.lock"), "w")
            flags = fcntl.LOCK_EX
            if options.noblock:
                flags |= fcntl.LOCK_NB
            try:
                fcntl.flock(lockfile, flags)
            except IOError as e:
                # lock is not available and nonblock has been requested
                if e.errno == errno.EWOULDBLOCK:
                    lockfile.close()
                    continue
                else:
                    raise # something unexpected happened
            try:
                new_jobs = parse_path(db, path, options.level, parse_options)
                processed_jobs.update(new_jobs)

            finally:
                fcntl.flock(lockfile, fcntl.LOCK_UN)
                lockfile.close()

    except Exception as e:
        pid_file_manager.close_file(1)
        raise
    else:
        pid_file_manager.close_file(0)
    finally:
        metrics.Flush()
    duration_secs = (datetime.datetime.now() - start_time).total_seconds()
    if options.record_duration:
        record_parsing(processed_jobs, duration_secs)


if __name__ == "__main__":
    main()