• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python3 -u
2
3from __future__ import absolute_import
4from __future__ import division
5from __future__ import print_function
6
7import collections
8import errno
9import fcntl
10import json
11import optparse
12import os
13import socket
14import sys
15import time
16import traceback
17
18import common
19from autotest_lib.client.bin.result_tools import utils as result_utils
20from autotest_lib.client.bin.result_tools import utils_lib as result_utils_lib
21from autotest_lib.client.bin.result_tools import runner as result_runner
22from autotest_lib.client.common_lib import control_data
23from autotest_lib.client.common_lib import global_config
24from autotest_lib.client.common_lib import mail, pidfile
25from autotest_lib.client.common_lib import utils
26from autotest_lib.frontend import setup_django_environment
27from autotest_lib.frontend.tko import models as tko_models
28from autotest_lib.server import site_utils
29from autotest_lib.server.cros.dynamic_suite import constants
30from autotest_lib.tko import db as tko_db, utils as tko_utils
31from autotest_lib.tko import models, parser_lib
32from autotest_lib.tko.perf_upload import perf_uploader
33from autotest_lib.utils.side_effects import config_loader
34import six
35
36try:
37    from autotest_lib.utils.frozen_chromite.lib import metrics
38except ImportError:
39    metrics = utils.metrics_mock
40
41
# Bundle of command-line options threaded from main() into parse_one().
_ParseOptions = collections.namedtuple(
    'ParseOptions', ['reparse', 'mail_on_failure', 'dry_run', 'suite_report',
                     'datastore_creds', 'export_to_gcloud_path',
                     'disable_perf_upload'])

# Candidate control-file names probed by _max_result_size_from_control().
_HARDCODED_CONTROL_FILE_NAMES = (
        # client side test control, as saved in old Autotest paths.
        'control',
        # server side test control, as saved in old Autotest paths.
        'control.srv',
        # All control files, as saved in skylab.
        'control.from_control_name',
)

# Max size for the parser is 350mb due to large suites getting throttled.
DEFAULT_MAX_RESULT_SIZE_KB = 350000
58
59
def parse_args():
    """Build the option parser, parse sys.argv and validate the result.

    @return: (options, args) where args holds the results directories.
    """
    opt_parser = optparse.OptionParser()

    # Short flags controlling parse behavior.
    opt_parser.add_option("-m", help="Send mail for FAILED tests",
                          dest="mailit", action="store_true")
    opt_parser.add_option("-r", help="Reparse the results of a job",
                          dest="reparse", action="store_true")
    opt_parser.add_option("-o", help="Parse a single results directory",
                          dest="singledir", action="store_true")
    opt_parser.add_option("-l",
                          help=("Levels of subdirectories to include "
                                "in the job name"),
                          type="int", dest="level", default=1)
    opt_parser.add_option("-n", help="No blocking on an existing parse",
                          dest="noblock", action="store_true")

    # Database connection parameters.
    opt_parser.add_option("-s", help="Database server hostname",
                          dest="db_host", action="store")
    opt_parser.add_option("-u", help="Database username",
                          dest="db_user", action="store")
    opt_parser.add_option("-p", help="Database password",
                          dest="db_pass", action="store")
    opt_parser.add_option("-d", help="Database name",
                          dest="db_name", action="store")

    # Long flags.
    opt_parser.add_option("--dry-run",
                          help="Do not actually commit any results.",
                          dest="dry_run", action="store_true", default=False)
    opt_parser.add_option(
            "--detach", action="store_true", default=False,
            help="Detach parsing process from the caller process. Used by "
                 "monitor_db to safely restart without affecting parsing.")
    opt_parser.add_option("--write-pidfile",
                          help="write pidfile (.parser_execute)",
                          dest="write_pidfile", action="store_true",
                          default=False)
    opt_parser.add_option("--record-duration",
                          help="[DEPRECATED] Record timing to metadata db",
                          dest="record_duration", action="store_true",
                          default=False)
    opt_parser.add_option("--suite-report",
                          help=("Allows parsing job to attempt to create a "
                                "suite timeline report, if it detects that "
                                "the job being parsed is a suite job."),
                          dest="suite_report", action="store_true",
                          default=False)
    opt_parser.add_option("--datastore-creds",
                          help=("[DEPRECATED] "
                                "The path to gcloud datastore credentials "
                                "file, which will be used to upload suite "
                                "timeline report to gcloud."),
                          dest="datastore_creds", action="store",
                          default=None)
    opt_parser.add_option("--export-to-gcloud-path",
                          help=("[DEPRECATED] "
                                "The path to export_to_gcloud script. Please "
                                "find chromite path on your server. The "
                                "script is under chromite/bin/."),
                          dest="export_to_gcloud_path", action="store",
                          default=None)
    opt_parser.add_option("--disable-perf-upload",
                          help=("Do not upload perf results to chrome perf."),
                          dest="disable_perf_upload", action="store_true",
                          default=False)

    options, args = opt_parser.parse_args()

    # At least one results directory is required.
    if not args:
        tko_utils.dprint("ERROR: at least one results directory must "
                         "be provided")
        opt_parser.print_help()
        sys.exit(1)

    return options, args
136
137
def format_failure_message(jobname, kernel, testname, status, reason):
    """Render one fixed-width failure line for the email report.

    @param jobname: String representing the job name.
    @param kernel: String representing the kernel.
    @param testname: String representing the test name.
    @param status: String representing the test status.
    @param reason: String representing the reason.

    @return: Failure message as a string.
    """
    # Columns are left-aligned and padded to line up in a plain-text email.
    return "%-12s %-20s %-12s %-10s %s" % (
            jobname, kernel, testname, status, reason)
151
152
def mailfailure(jobname, job, message):
    """Email the job owner a report of the job's failed tests.

    @param jobname: String representing the job name.
    @param job: A job object.
    @param message: The message to mail.
    """
    # Header: link to the results, then a fixed-width table heading.
    header_lines = [
        "",
        "The following tests FAILED for this job",
        "http://%s/results/%s" % (socket.gethostname(), jobname),
        "",
        format_failure_message("Job name", "Kernel", "Test name",
                               "FAIL/WARN", "Failure reason"),
        format_failure_message("=" * 8, "=" * 6, "=" * 8,
                               "=" * 8, "=" * 14),
    ]
    subject = "AUTOTEST: FAILED tests from job %s" % jobname
    mail.send("", job.user, "", subject, "\n".join(header_lines) + message)
174
175
def _invalidate_original_tests(orig_job_idx, retry_job_idx):
    """Retry tests invalidates original tests.

    Whenever a retry job is complete, we want to invalidate the original
    job's test results, such that the consumers of the tko database
    (e.g. tko frontend, wmatrix) could figure out which results are the latest.

    When a retry job is parsed, we retrieve the original job's afe_job_id
    from the retry job's keyvals, which is then converted to tko job_idx and
    passed into this method as |orig_job_idx|.

    In this method, we are going to invalidate the rows in tko_tests that are
    associated with the original job by flipping their 'invalid' bit to True.
    In addition, in tko_tests, we also maintain a pointer from the retry results
    to the original results, so that later we can always know which rows in
    tko_tests are retries and which are the corresponding original results.
    This is done by setting the field 'invalidates_test_idx' of the tests
    associated with the retry job.

    For example, assume Job(job_idx=105) are retried by Job(job_idx=108), after
    this method is run, their tko_tests rows will look like:
    __________________________________________________________________________
    test_idx| job_idx | test              | ... | invalid | invalidates_test_idx
    10      | 105     | example_Fail.Error| ... | 1       | NULL
    11      | 105     | example_Fail.Fail | ... | 1       | NULL
    ...
    20      | 108     | example_Fail.Error| ... | 0       | 10
    21      | 108     | example_Fail.Fail | ... | 0       | 11
    __________________________________________________________________________
    Note the invalid bits of the rows for Job(job_idx=105) are set to '1'.
    And the 'invalidates_test_idx' fields of the rows for Job(job_idx=108)
    are set to 10 and 11 (the test_idx of the rows for the original job).

    @param orig_job_idx: An integer representing the original job's
                         tko job_idx. Tests associated with this job will
                         be marked as 'invalid'.
    @param retry_job_idx: An integer representing the retry job's
                          tko job_idx. The field 'invalidates_test_idx'
                          of the tests associated with this job will be updated.

    """
    msg = 'orig_job_idx: %s, retry_job_idx: %s' % (orig_job_idx, retry_job_idx)
    if not orig_job_idx or not retry_job_idx:
        tko_utils.dprint('ERROR: Could not invalidate tests: ' + msg)
        # Bug fix: bail out instead of falling through and running the
        # invalidation queries below with a missing/zero job_idx.
        return
    # Using django models here makes things easier, but make sure that
    # before this method is called, all other relevant transactions have been
    # committed to avoid race condition. In the long run, we might consider
    # to make the rest of parser use django models.
    orig_tests = tko_models.Test.objects.filter(job__job_idx=orig_job_idx)
    retry_tests = tko_models.Test.objects.filter(job__job_idx=retry_job_idx)

    # Invalidate original tests.
    orig_tests.update(invalid=True)

    # Maintain a dictionary that maps (test, subdir) to original tests.
    # Note that within the scope of a job, (test, subdir) uniquelly
    # identifies a test run, but 'test' does not.
    # In a control file, one could run the same test with different
    # 'subdir_tag', for example,
    #     job.run_test('example_Fail', tag='Error', subdir_tag='subdir_1')
    #     job.run_test('example_Fail', tag='Error', subdir_tag='subdir_2')
    # In tko, we will get
    #    (test='example_Fail.Error', subdir='example_Fail.Error.subdir_1')
    #    (test='example_Fail.Error', subdir='example_Fail.Error.subdir_2')
    invalidated_tests = {(orig_test.test, orig_test.subdir): orig_test
                         for orig_test in orig_tests}
    for retry in retry_tests:
        # It is possible that (retry.test, retry.subdir) doesn't exist
        # in invalidated_tests. This could happen when the original job
        # didn't run some of its tests. For example, a dut goes offline
        # since the beginning of the job, in which case invalidated_tests
        # will only have one entry for 'SERVER_JOB'.
        orig_test = invalidated_tests.get((retry.test, retry.subdir), None)
        if orig_test:
            retry.invalidates_test = orig_test
            retry.save()
    tko_utils.dprint('DEBUG: Invalidated tests associated to job: ' + msg)
253
254
def _throttle_result_size(path):
    """Limit the total size of test results for the given path.

    Throttling is best-effort: any failure is logged and parsing continues.

    @param path: Path of the result directory.
    """
    if not result_runner.ENABLE_RESULT_THROTTLING:
        tko_utils.dprint(
                'Result throttling is not enabled. Skipping throttling %s' %
                path)
        return

    # Prefer a per-job override from the control file, if one exists.
    max_result_size_KB = _max_result_size_from_control(path)
    if max_result_size_KB is None:
        max_result_size_KB = DEFAULT_MAX_RESULT_SIZE_KB

    try:
        result_utils.execute(path, max_result_size_KB)
    except Exception:
        # Narrowed from a bare `except:`, which also swallowed SystemExit
        # and KeyboardInterrupt; failures here must not abort parsing.
        tko_utils.dprint(
                'Failed to throttle result size of %s.\nDetails %s' %
                (path, traceback.format_exc()))
276
277
def _max_result_size_from_control(path):
    """Gets the max result size set in a control file, if any.

    Probes each known control-file name under |path| and returns the first
    max_result_size_KB value that differs from the default. If no override
    is found, returns None.

    @param path: Path of the result directory containing the control file.

    @return: The overridden max result size in KB, or None.
    """
    for control_file in _HARDCODED_CONTROL_FILE_NAMES:
        control = os.path.join(path, control_file)
        if not os.path.exists(control):
            continue

        try:
            max_result_size_KB = control_data.parse_control(
                    control, raise_warnings=False).max_result_size_KB
            # Only treat a non-default value as an explicit override.
            if max_result_size_KB != DEFAULT_MAX_RESULT_SIZE_KB:
                return max_result_size_KB
        except IOError as e:
            # Unreadable control file: log and try the next candidate.
            tko_utils.dprint(
                    'Failed to access %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
        except control_data.ControlVariableException as e:
            # Malformed control file: log and try the next candidate.
            tko_utils.dprint(
                    'Failed to parse %s. Error: %s\nDetails %s' %
                    (control, e, traceback.format_exc()))
    return None
302
303
def export_tko_job_to_file(job, jobname, filename):
    """Exports the tko job to disk file.

    @param job: database object.
    @param jobname: the job name as string.
    @param filename: the serialized binary destination path.
    """
    # Imported lazily; only needed when serialization has been requested.
    from autotest_lib.tko import job_serializer

    job_serializer.JobSerializer().serialize_to_binary(job, jobname, filename)
315
316
def parse_one(db, pid_file_manager, jobname, path, parse_options):
    """Parse a single job. Optionally send email on failure.

    @param db: database object.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param jobname: the tag used to search for existing job in db,
                    e.g. '1234-chromeos-test/host1'
    @param path: The path to the results to be parsed.
    @param parse_options: _ParseOptions instance.

    @return job: the parsed job object
    """
    reparse = parse_options.reparse
    mail_on_failure = parse_options.mail_on_failure
    dry_run = parse_options.dry_run
    # NOTE(review): suite_report is unpacked here but never used below.
    suite_report = parse_options.suite_report

    tko_utils.dprint("\nScanning %s (%s)" % (jobname, path))
    old_job_idx = db.find_job(jobname)
    # Skip jobs that are already in the DB unless a reparse was requested.
    if old_job_idx is not None and not reparse:
        tko_utils.dprint("! Job is already parsed, done")
        return None

    # look up the status version
    job_keyval = models.job.read_keyval(path)
    status_version = job_keyval.get("status_version", 0)

    parser = parser_lib.parser(status_version)
    job = parser.make_job(path)
    tko_utils.dprint("+ Parsing dir=%s, jobname=%s" % (path, jobname))
    status_log_path = _find_status_log_path(path)
    if not status_log_path:
        tko_utils.dprint("! Unable to parse job, no status file")
        return None
    _parse_status_log(parser, job, status_log_path)

    # Reparse: reuse the existing job row and drop DB tests that no longer
    # appear in the freshly parsed results.
    if old_job_idx is not None:
        job.job_idx = old_job_idx
        unmatched_tests = _match_existing_tests(db, job)
        if not dry_run:
            _delete_tests_from_db(db, unmatched_tests)

    job.afe_job_id = tko_utils.get_afe_job_id(jobname)
    job.skylab_task_id = tko_utils.get_skylab_task_id(jobname)
    job.afe_parent_job_id = job_keyval.get(constants.PARENT_JOB_ID)
    job.skylab_parent_task_id = job_keyval.get(constants.PARENT_JOB_ID)
    # Derive build/board/suite from the job label when it can be parsed.
    job.build = None
    job.board = None
    job.build_version = None
    job.suite = None
    if job.label:
        label_info = site_utils.parse_job_name(job.label)
        if label_info:
            job.build = label_info.get('build', None)
            job.build_version = label_info.get('build_version', None)
            job.board = label_info.get('board', None)
            job.suite = label_info.get('suite', None)

    # A 'suite' keyval takes precedence over the label-derived suite.
    if 'suite' in job.keyval_dict:
        job.suite = job.keyval_dict['suite']

    result_utils_lib.LOG =  tko_utils.dprint

    # Do not throttle results for now (b/207409280)
    # _throttle_result_size(path)

    # Record test result size to job_keyvals
    start_time = time.time()
    result_size_info = site_utils.collect_result_sizes(
            path, log=tko_utils.dprint)
    tko_utils.dprint('Finished collecting result sizes after %s seconds' %
                     (time.time()-start_time))
    job.keyval_dict.update(result_size_info._asdict())

    # TODO(dshi): Update sizes with sponge_invocation.xml and throttle it.

    # check for failures
    message_lines = [""]
    job_successful = True
    for test in job.tests:
        if not test.subdir:
            continue
        tko_utils.dprint("* testname, subdir, status, reason: %s %s %s %s"
                         % (test.testname, test.subdir, test.status,
                            test.reason))
        if test.status not in ('GOOD', 'WARN'):
            job_successful = False
            pid_file_manager.num_tests_failed += 1
            message_lines.append(format_failure_message(
                jobname, test.kernel.base, test.subdir,
                test.status, test.reason))

    message = "\n".join(message_lines)

    if not dry_run:
        # send out a email report of failure
        if len(message) > 2 and mail_on_failure:
            tko_utils.dprint("Sending email report of failure on %s to %s"
                                % (jobname, job.user))
            mailfailure(jobname, job, message)

        # Upload perf values to the perf dashboard, if applicable.
        if parse_options.disable_perf_upload:
            tko_utils.dprint("Skipping results upload to chrome perf as it is "
                "disabled by config")
        else:
            for test in job.tests:
                perf_uploader.upload_test(job, test, jobname)

        _write_job_to_db(db, jobname, job)

        # Verify the job data is written to the database.
        if job.tests:
            tests_in_db = db.find_tests(job.job_idx)
            tests_in_db_count = len(tests_in_db) if tests_in_db else 0
            if tests_in_db_count != len(job.tests):
                tko_utils.dprint(
                        'Failed to find enough tests for job_idx: %d. The '
                        'job should have %d tests, only found %d tests.' %
                        (job.job_idx, len(job.tests), tests_in_db_count))
                metrics.Counter(
                        'chromeos/autotest/result/db_save_failure',
                        description='The number of times parse failed to '
                        'save job to TKO database.').increment()

        # Although the cursor has autocommit, we still need to force it to
        # commit existing changes before we can use django models, otherwise
        # it will go into deadlock when django models try to start a new
        # trasaction while the current one has not finished yet.
        db.commit()

        # Handle retry job.
        orig_afe_job_id = job_keyval.get(constants.RETRY_ORIGINAL_JOB_ID,
                                            None)
        if orig_afe_job_id:
            orig_job_idx = tko_models.Job.objects.get(
                    afe_job_id=orig_afe_job_id).job_idx
            _invalidate_original_tests(orig_job_idx, job.job_idx)

    # Serializing job into a binary file
    export_tko_to_file = global_config.global_config.get_config_value(
            'AUTOSERV', 'export_tko_job_to_file', type=bool, default=False)

    binary_file_name = os.path.join(path, "job.serialize")
    if export_tko_to_file:
        export_tko_job_to_file(job, jobname, binary_file_name)

    if not dry_run:
        db.commit()

    # Mark GS_OFFLOADER_NO_OFFLOAD in gs_offloader_instructions at the end of
    # the function, so any failure, e.g., db connection error, will stop
    # gs_offloader_instructions being updated, and logs can be uploaded for
    # troubleshooting.
    if job_successful:
        # Check if we should not offload this test's results.
        if job_keyval.get(constants.JOB_OFFLOAD_FAILURES_KEY, False):
            # Update the gs_offloader_instructions json file.
            gs_instructions_file = os.path.join(
                    path, constants.GS_OFFLOADER_INSTRUCTIONS)
            gs_offloader_instructions = {}
            if os.path.exists(gs_instructions_file):
                with open(gs_instructions_file, 'r') as f:
                    gs_offloader_instructions = json.load(f)

            gs_offloader_instructions[constants.GS_OFFLOADER_NO_OFFLOAD] = True
            with open(gs_instructions_file, 'w') as f:
                json.dump(gs_offloader_instructions, f)
    return job
486
487
def _write_job_to_db(db, jobname, job):
    """Write all TKO data associated with a job to DB.

    This updates the job object as a side effect.

    @param db: tko.db.db_sql object.
    @param jobname: Name of the job to write.
    @param job: tko.models.job object.
    """
    db.insert_or_update_machine(job)
    db.insert_job(jobname, job)
    # Tag the task reference with the scheduling system that produced it.
    reference_type = 'skylab' if tko_utils.is_skylab_task(jobname) else 'afe'
    db.insert_or_update_task_reference(job, reference_type)
    db.update_job_keyvals(job)
    for test_entry in job.tests:
        db.insert_test(job, test_entry)
506
507
508def _find_status_log_path(path):
509    if os.path.exists(os.path.join(path, "status.log")):
510        return os.path.join(path, "status.log")
511    if os.path.exists(os.path.join(path, "status")):
512        return os.path.join(path, "status")
513    return ""
514
515
516def _parse_status_log(parser, job, status_log_path):
517    status_lines = open(status_log_path).readlines()
518    parser.start(job)
519    tests = parser.end(status_lines)
520
521    # parser.end can return the same object multiple times, so filter out dups
522    job.tests = []
523    already_added = set()
524    for test in tests:
525        if test not in already_added:
526            already_added.add(test)
527            job.tests.append(test)
528
529
530def _match_existing_tests(db, job):
531    """Find entries in the DB corresponding to the job's tests, update job.
532
533    @return: Any unmatched tests in the db.
534    """
535    old_job_idx = job.job_idx
536    raw_old_tests = db.select("test_idx,subdir,test", "tko_tests",
537                                {"job_idx": old_job_idx})
538    if raw_old_tests:
539        old_tests = dict(((test, subdir), test_idx)
540                            for test_idx, subdir, test in raw_old_tests)
541    else:
542        old_tests = {}
543
544    for test in job.tests:
545        test_idx = old_tests.pop((test.testname, test.subdir), None)
546        if test_idx is not None:
547            test.test_idx = test_idx
548        else:
549            tko_utils.dprint("! Reparse returned new test "
550                                "testname=%r subdir=%r" %
551                                (test.testname, test.subdir))
552    return old_tests
553
554
555def _delete_tests_from_db(db, tests):
556    for test_idx in six.itervalues(tests):
557        where = {'test_idx' : test_idx}
558        db.delete('tko_iteration_result', where)
559        db.delete('tko_iteration_perf_value', where)
560        db.delete('tko_iteration_attributes', where)
561        db.delete('tko_test_attributes', where)
562        db.delete('tko_test_labels_tests', {'test_id': test_idx})
563        db.delete('tko_tests', where)
564
565
566def _get_job_subdirs(path):
567    """
568    Returns a list of job subdirectories at path. Returns None if the test
569    is itself a job directory. Does not recurse into the subdirs.
570    """
571    # if there's a .machines file, use it to get the subdirs
572    machine_list = os.path.join(path, ".machines")
573    if os.path.exists(machine_list):
574        with open(machine_list, 'r') as ml:
575            subdirs = set(line.strip() for line in ml.readlines())
576        existing_subdirs = set(subdir for subdir in subdirs
577                               if os.path.exists(os.path.join(path, subdir)))
578        if len(existing_subdirs) != 0:
579            return existing_subdirs
580
581    # if this dir contains ONLY subdirectories, return them
582    contents = set(os.listdir(path))
583    contents.discard(".parse.lock")
584    subdirs = set(sub for sub in contents if
585                  os.path.isdir(os.path.join(path, sub)))
586    if len(contents) == len(subdirs) != 0:
587        return subdirs
588
589    # this is a job directory, or something else we don't understand
590    return None
591
592
def parse_leaf_path(db, pid_file_manager, path, level, parse_options):
    """Parse a leaf path.

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: The job name of the parsed job, e.g. '123-chromeos-test/host1'
    """
    # The job name is the last |level| components of the results path.
    jobname = "/".join(path.split("/")[-level:])
    db.run_with_retry(parse_one, db, pid_file_manager, jobname, path,
                      parse_options)
    return jobname
609
610
def parse_path(db, pid_file_manager, path, level, parse_options):
    """Parse a path

    @param db: database handle.
    @param pid_file_manager: pidfile.PidFileManager object.
    @param path: The path to the results to be parsed.
    @param level: Integer, level of subdirectories to include in the job name.
    @param parse_options: _ParseOptions instance.

    @returns: A set of job names of the parsed jobs.
              set(['123-chromeos-test/host1', '123-chromeos-test/host2'])
    """
    processed = set()
    subdirs = _get_job_subdirs(path)
    if subdirs is None:
        # single machine job
        processed.add(parse_leaf_path(db, pid_file_manager, path, level,
                                      parse_options))
        return processed

    # parse status.log in current directory, if it exists. multi-machine
    # synchronous server side tests record output in this directory. without
    # this check, we do not parse these results.
    if os.path.exists(os.path.join(path, 'status.log')):
        processed.add(parse_leaf_path(db, pid_file_manager, path, level,
                                      parse_options))
    # multi-machine job: recurse into each machine subdirectory.
    for subdir in subdirs:
        processed.update(parse_path(db, pid_file_manager,
                                    os.path.join(path, subdir), level + 1,
                                    parse_options))
    return processed
645
646
def _detach_from_parent_process():
    """Allow reparenting the parse process away from caller.

    When monitor_db is run via upstart, restarting the job sends SIGTERM to
    the whole process group. This makes us immune from that.
    """
    # Become a session leader unless we already lead our own process group
    # (setsid() would fail for an existing group leader).
    if os.getpgid(0) != os.getpid():
        os.setsid()
655
656
def main():
    """tko_parse entry point."""
    options, args = parse_args()

    # Metrics are set up with indirect=False and flushed manually, rather
    # than via the SetupTsMonGlobalState context manager, because tko/parse
    # is expected to be a very short lived (<1 min) script when working
    # effectively, and we can't afford to either a) wait for up to 1min for
    # metrics to flush at the end or b) drop metrics that were sent within
    # the last minute of execution.
    site_utils.SetupTsMonGlobalState('tko_parse', indirect=False,
                                     short_lived=True)
    try:
        with metrics.SuccessCounter('chromeos/autotest/tko_parse/runs'):
            _main_with_options(options, args)
    finally:
        metrics.Flush()
673
674
def _main_with_options(options, args):
    """Entry point with options parsed and metrics already set up.

    @param options: optparse options, as returned by parse_args().
    @param args: Positional args; args[0] is the root results directory.
    """
    # Record the processed jobs so that
    # we can send the duration of parsing to metadata db.
    processed_jobs = set()

    if options.detach:
        _detach_from_parent_process()

    results_dir = os.path.abspath(args[0])
    assert os.path.exists(results_dir)

    _update_db_config_from_json(options, results_dir)

    parse_options = _ParseOptions(options.reparse, options.mailit,
                                  options.dry_run, options.suite_report,
                                  options.datastore_creds,
                                  options.export_to_gcloud_path,
                                  options.disable_perf_upload)

    pid_file_manager = pidfile.PidFileManager("parser", results_dir)

    if options.write_pidfile:
        pid_file_manager.open_file()

    try:
        # build up the list of job dirs to parse
        if options.singledir:
            jobs_list = [results_dir]
        else:
            jobs_list = [os.path.join(results_dir, subdir)
                         for subdir in os.listdir(results_dir)]

        # build up the database
        db = tko_db.db(autocommit=False, host=options.db_host,
                       user=options.db_user, password=options.db_pass,
                       database=options.db_name)

        # parse all the jobs
        for path in jobs_list:
            lockfile = open(os.path.join(path, ".parse.lock"), "w")
            flags = fcntl.LOCK_EX
            if options.noblock:
                flags |= fcntl.LOCK_NB
            locked = False
            try:
                try:
                    fcntl.flock(lockfile, flags)
                    locked = True
                except IOError as e:
                    # lock is not available and nonblock has been requested
                    if e.errno == errno.EWOULDBLOCK:
                        continue
                    raise  # something unexpected happened
                new_jobs = parse_path(db, pid_file_manager, path,
                                      options.level, parse_options)
                processed_jobs.update(new_jobs)
            finally:
                # Release the lock only if it was acquired, and always close
                # the lock file. The original leaked the file descriptor when
                # flock() failed for a reason other than EWOULDBLOCK.
                if locked:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                lockfile.close()

    except Exception:
        pid_file_manager.close_file(1)
        raise
    else:
        pid_file_manager.close_file(0)
742
743
def _update_db_config_from_json(options, test_results_dir):
    """Update DB config options using a side_effects_config.json file.

    @param options: parsed args to be updated.
    @param test_results_dir: path to test results dir.

    @raises: json_format.ParseError if the file is not a valid JSON.
             ValueError if the JSON config is incomplete.
             OSError if some files from the JSON config are missing.
    """
    # results_dir passed to tko/parse is a subdir of the root results dir
    config_dir = os.path.join(test_results_dir, os.pardir)
    tko_utils.dprint("Attempting to read side_effects.Config from %s" %
        config_dir)
    config = config_loader.load(config_dir)

    if config:
        tko_utils.dprint("Validating side_effects.Config.tko")
        config_loader.validate_tko(config)

        tko_utils.dprint("Using the following DB config params from "
            "side_effects.Config.tko:\n%s" % config.tko)
        options.db_host = config.tko.proxy_socket
        options.db_user = config.tko.mysql_user

        # The password lives in a separate file; strip the trailing newline
        # so it can be handed to the DB layer verbatim.
        with open(config.tko.mysql_password_file, 'r') as f:
            options.db_pass = f.read().rstrip('\n')

        # Perf uploading is also toggled by the side-effects config.
        options.disable_perf_upload = not config.chrome_perf.enabled
    else:
        tko_utils.dprint("No side_effects.Config found in %s - "
            "defaulting to DB config values from shadow config" % config_dir)
776
777
# Script entry point: run the parser when invoked directly.
if __name__ == "__main__":
    main()
780