• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Module for CrOS dynamic test suite generation and execution."""
6
7from __future__ import absolute_import
8from __future__ import division
9from __future__ import print_function
10
11import contextlib
12import itertools
13import json
14import logging
15import os
16import re
17import time
18
19from lucifer import autotest
20from skylab_suite import cros_suite
21from skylab_suite import swarming_lib
22
23
24SKYLAB_DRONE_SWARMING_WORKER = '/opt/infra-tools/skylab_swarming_worker'
25SKYLAB_SUITE_USER = 'skylab_suite_runner'
26SKYLAB_TOOL = '/opt/infra-tools/skylab'
27
28SUITE_WAIT_SLEEP_INTERVAL_SECONDS = 30
29
30# See #5 in crbug.com/873886 for more details.
31_NOT_SUPPORTED_DEPENDENCIES = ['skip_provision', 'cleanup-reboot', 'rpm',
32                               'modem_repair']
33
34
def run(client, test_specs, suite_handler, dry_run=False):
    """Run a CrOS dynamic test suite, resuming it when it already exists.

    A suite_handler that already carries a suite_id is treated as an existing
    suite to resume; otherwise a brand-new suite is started.

    @param client: A swarming_lib.Client instance.
    @param test_specs: A list of cros_suite.TestSpec objects.
    @param suite_handler: A cros_suite.SuiteHandler object.
    @param dry_run: Whether to kick off dry runs of the tests.
    """
    assert isinstance(client, swarming_lib.Client)
    if not suite_handler.suite_id:
        # No suite id yet: start a fresh suite.
        _run_suite(test_specs, suite_handler, dry_run)
        return
    # A suite id is already known: pick that suite back up.
    _resume_suite(client, test_specs, suite_handler, dry_run)
50
51
def _resume_suite(client, test_specs, suite_handler, dry_run=False):
    """Resume a suite and its child tasks by given suite id.

    Child tasks that already exist for the suite are registered with
    suite_handler; test specs with no matching child task are scheduled now.
    Waits for results only when suite_handler.should_wait() says so.

    @param client: A swarming_lib.Client instance.
    @param test_specs: A list of cros_suite.TestSpec objects.
    @param suite_handler: A cros_suite.SuiteHandler whose suite_id is set.
    @param dry_run: Whether to kick off dry runs of the tests.
    """
    assert isinstance(client, swarming_lib.Client)
    parent_id = suite_handler.suite_id
    existing_children = client.get_child_tasks(parent_id)
    pending_specs = _get_unscheduled_test_specs(
            test_specs, suite_handler, existing_children)

    logging.info('Not yet scheduled test_specs: %r', pending_specs)
    _create_test_tasks(pending_specs, suite_handler, parent_id, dry_run)

    if parent_id is None or not suite_handler.should_wait():
        return
    _wait_for_results(suite_handler, dry_run=dry_run)
65
66
67def _get_unscheduled_test_specs(test_specs, suite_handler, all_tasks):
68    not_yet_scheduled = []
69    for test_spec in test_specs:
70        if suite_handler.is_provision():
71            # We cannot check bot_id because pending tasks do not have it yet.
72            bot_id_tag = 'id:%s' % test_spec.bot_id
73            tasks = [t for t in all_tasks if bot_id_tag in t['tags']]
74        else:
75            tasks = [t for t in all_tasks if t['name']==test_spec.test.name]
76
77        if not tasks:
78            not_yet_scheduled.append(test_spec)
79            continue
80
81        current_task = _get_current_task(tasks)
82        test_task_id = (current_task['task_id'] if current_task
83                        else tasks[0]['task_id'])
84        remaining_retries = test_spec.test.job_retries - len(tasks)
85        previous_retried_ids = [t['task_id'] for t in tasks
86                                if t['task_id'] != test_task_id]
87        suite_handler.add_test_by_task_id(
88                test_task_id,
89                cros_suite.TestHandlerSpec(
90                        test_spec=test_spec,
91                        remaining_retries=remaining_retries,
92                        previous_retried_ids=previous_retried_ids))
93
94    return not_yet_scheduled
95
96
def _get_current_task(tasks):
    """Return the single still-running task among tasks, if there is one.

    @param tasks: A list of task dicts including task_id, state, etc.

    @return the dict of the task whose state is not in
        swarming_lib.TASK_FINISHED_STATUS, or None if all tasks finished.

    @raises ValueError: as soon as a second unfinished task is found.
    """
    running = None
    for task in tasks:
        if task['state'] in swarming_lib.TASK_FINISHED_STATUS:
            continue
        if running:
            raise ValueError(
                    'Parent task has 2 same running child tasks: %s, %s'
                    % (running['task_id'], task['task_id']))
        running = task
    return running
115
116
117def _run_suite(test_specs, suite_handler, dry_run=False):
118    """Make a new suite."""
119    suite_id = os.environ.get('SWARMING_TASK_ID')
120    if not suite_id:
121        raise ValueError("Unable to determine suite's task id from env var "
122                         "SWARMING_TASK_ID.")
123    _create_test_tasks(test_specs, suite_handler, suite_id, dry_run)
124    suite_handler.set_suite_id(suite_id)
125
126    if suite_handler.should_wait():
127        _wait_for_results(suite_handler, dry_run=dry_run)
128
129
130def _create_test_tasks(test_specs, suite_handler, suite_id, dry_run=False):
131    """Create test tasks for a list of tests (TestSpecs).
132
133    Given a list of TestSpec object, this function will schedule them on
134    swarming one by one, and add them to the swarming_task_id-to-test map
135    of suite_handler to keep monitoring them.
136
137    @param test_specs: A list of cros_suite.TestSpec objects to schedule.
138    @param suite_handler: A cros_suite.SuiteHandler object to monitor the
139        test_specs' progress.
140    @param suite_id: A string ID for a suite task, it's the parent task id for
141        these to-be-scheduled test_specs.
142    @param dry_run: Whether to kick off dry runs of the tests.
143    """
144    for test_spec in test_specs:
145        test_task_id = _create_test_task(
146                test_spec,
147                suite_id=suite_id,
148                is_provision=suite_handler.is_provision(),
149                dry_run=dry_run)
150        suite_handler.add_test_by_task_id(
151                test_task_id,
152                cros_suite.TestHandlerSpec(
153                        test_spec=test_spec,
154                        remaining_retries=test_spec.test.job_retries - 1,
155                        previous_retried_ids=[]))
156
157
def _create_test_task(test_spec, suite_id=None,
                      is_provision=False, dry_run=False):
    """Create a test task for a given test spec.

    Builds a `skylab create-test` command line for test_spec, runs it, and
    parses the created swarming task id out of the tool's output.

    @param test_spec: A cros_suite.TestSpec object.
    @param suite_id: the suite task id of the test. When not None it is added
        as a parent_task_id tag and as the parent job id keyval.
    @param is_provision: Not used by this function; accepted so callers can
        pass it uniformly.
    @param dry_run: If true, only log the command and don't actually create
        the task.

    @return the swarming task id of this task, or None for a dry run.

    @raises ValueError: if no task id can be found in the tool's output.
    """
    logging.info('Creating task for test %s', test_spec.test.name)
    # The tool location can be overridden through the SKYLAB_TOOL env var.
    skylab_tool_path = os.environ.get('SKYLAB_TOOL', SKYLAB_TOOL)

    cmd = [
        skylab_tool_path, 'create-test',
        '-board', test_spec.board,
        '-image', test_spec.build,
        '-service-account-json', os.environ['SWARMING_CREDS'],
        ]
    if _is_dev():
        cmd += ['-dev']
    if test_spec.pool:
        # TODO(akeshet): Clean up this hack around pool name translation.
        autotest_pool_label = 'pool:%s' % test_spec.pool
        pool_dependency_value = swarming_lib.task_dependencies_from_labels(
            [autotest_pool_label])['label-pool']
        cmd += ['-pool', pool_dependency_value]

    if test_spec.model:
        cmd += ['-model', test_spec.model]
    if test_spec.quota_account:
        cmd += ['-qs-account', test_spec.quota_account]
    if test_spec.test.test_type.lower() == 'client':
        cmd += ['-client-test']

    tags = _compute_tags(test_spec.build, suite_id)
    dimensions = _compute_dimensions(
            test_spec.bot_id, test_spec.test.dependencies)
    keyvals_flat = _compute_job_keyvals_flat(test_spec.keyvals, suite_id)

    for tag in tags:
        cmd += ['-tag', tag]
    for keyval in keyvals_flat:
        cmd += ['-keyval', keyval]
    # Appended without flags: the test name, then the 'key:value' dimensions.
    cmd += [test_spec.test.name]
    cmd += dimensions

    if dry_run:
        logging.info('Would have created task with command %s', cmd)
        return None

    # TODO(akeshet): Avoid this late chromite import.
    cros_build_lib = autotest.chromite_load('cros_build_lib')
    result = cros_build_lib.RunCommand(cmd, capture_output=True)
    # TODO(akeshet): Use -json flag and json-parse output of the command instead
    # of regex matching to determine task_id.
    # re.match anchors at the start and '.' does not cross newlines, so this
    # only matches a single-line (optionally newline-terminated) output.
    m = re.match('.*id=(.*)$', result.output)
    if m is None:
        # Surface the unexpected tool output instead of an opaque
        # AttributeError from calling .group() on None.
        raise ValueError('Unable to find a task id in skylab create-test '
                         'output: %r' % result.output)
    task_id = m.group(1)
    logging.info('Created task with id %s', task_id)
    return task_id
218
219
220# TODO(akeshet): Eliminate the need for this, by either adding an explicit
221# swarming_server argument to skylab tool, or having the tool respect the
222# SWARMING_SERVER environment variable. See crbug.com/948774
223def _is_dev():
224    """Detect whether skylab tool should be invoked with -dev flag."""
225    return 'chromium-swarm-dev' in os.environ['SWARMING_SERVER']
226
227def _compute_tags(build, suite_id):
228    tags = [
229        'build:%s' % build,
230    ]
231    if suite_id is not None:
232        tags += ['parent_task_id:%s' % suite_id]
233    return tags
234
235
def _compute_dimensions(bot_id, dependencies):
    """Return the swarming dimension strings for a test task.

    @param bot_id: optional bot id; when truthy, emitted first as 'id:<bot_id>'.
    @param dependencies: test dependency labels; unsupported ones are dropped
        before being translated to swarming dimensions.

    @return a list of 'key:value' strings: the bot id (if any) followed by
        the translated dependencies in sorted order.
    """
    bot_dims = ['id:%s' % bot_id] if bot_id else []
    supported = _filter_unsupported_dependencies(dependencies)
    label_dims = swarming_lib.task_dependencies_from_labels(supported)
    return bot_dims + sorted('%s:%s' % item for item in label_dims.items())
247
248
249def _compute_job_keyvals_flat(keyvals, suite_id):
250    # Job keyvals calculation.
251    job_keyvals = keyvals.copy()
252    if suite_id is not None:
253        # TODO(akeshet): Avoid this late autotest constants import.
254        constants = autotest.load('server.cros.dynamic_suite.constants')
255        job_keyvals[constants.PARENT_JOB_ID] = suite_id
256    keyvals_flat = sorted(
257        ['%s:%s' % (k, v) for k, v in job_keyvals.items()])
258    return keyvals_flat
259
260
def _filter_unsupported_dependencies(dependencies):
    """Return the dependencies Skylab supports, warning about the rest.

    Each dependency listed in _NOT_SUPPORTED_DEPENDENCIES is dropped and
    logged as a warning; kept dependencies retain their original order.
    """
    kept = []
    for dependency in dependencies:
        if dependency not in _NOT_SUPPORTED_DEPENDENCIES:
            kept.append(dependency)
            continue
        logging.warning('Dependency %s is not supported in skylab', dependency)
    return kept
270
271
@contextlib.contextmanager
def disable_logging(logging_level):
    """Suppress log records at logging_level and below inside the with-block.

    On exit, even if the body raised, logging.disable is reset to
    logging.NOTSET. This happens regardless of any disable level that was
    active before entering.
    """
    restore_level = logging.NOTSET
    try:
        logging.disable(logging_level)
        yield
    finally:
        logging.disable(restore_level)
280
281
def _loop_and_wait_forever(suite_handler, dry_run):
    """Poll child tasks until suite_handler reports it is done waiting.

    Each round processes results, stops if suite_handler is finished waiting,
    and otherwise retries every task in suite_handler.retried_tasks, then
    sleeps SUITE_WAIT_SLEEP_INTERVAL_SECONDS. There is no internal time
    limit; callers bound it externally.

    @param suite_handler: a cros_suite.SuiteHandler object.
    @param dry_run: Whether retried tests are dry runs.
    """
    for rounds in itertools.count(0):
        slept_seconds = rounds * SUITE_WAIT_SLEEP_INTERVAL_SECONDS
        # INFO logging is muted except on rounds that land on a multiple of
        # 300 seconds of accumulated sleep.
        muted_level = logging.INFO if slept_seconds % 300 else logging.NOTSET
        with disable_logging(muted_level):
            suite_handler.handle_results(suite_handler.suite_id)
            done = suite_handler.is_finished_waiting()
        if done:
            break

        # NOTE(review): _retry_test adds and removes tests on suite_handler
        # while this iterates retried_tasks; confirm retried_tasks is a
        # snapshot rather than a live view.
        for task in suite_handler.retried_tasks:
            _retry_test(suite_handler, task['task_id'], dry_run=dry_run)

        time.sleep(SUITE_WAIT_SLEEP_INTERVAL_SECONDS)
296
297
def _wait_for_results(suite_handler, dry_run=False):
    """Wait for child tasks to finish, bounded by the suite's remaining time.

    Results are processed through suite_handler while waiting; nothing is
    returned. Hitting the time limit is logged as an error, not raised.

    @param suite_handler: a cros_suite.SuiteHandler object.
    @param dry_run: Whether tests retried while waiting are dry runs.
    """
    timeout_util = autotest.chromite_load('timeout_util')
    # NOTE(review): if passed_mins >= timeout_mins this is <= 0; how
    # timeout_util.Timeout treats such a value is not visible here — confirm.
    remaining_seconds = (suite_handler.timeout_mins * 60 -
                         suite_handler.passed_mins * 60)
    try:
        with timeout_util.Timeout(remaining_seconds):
            _loop_and_wait_forever(suite_handler, dry_run)
    except timeout_util.TimeoutError:
        logging.error('Timeout in waiting for child tasks.')
    else:
        logging.info('Finished to wait for child tasks.')
313
314
def _retry_test(suite_handler, task_id, dry_run=False):
    """Retry a test of the suite and move its bookkeeping to the new task.

    Steps performed:
        1. Schedule the test again.
        2. Register the new swarming task id with suite_handler, with one
           fewer remaining retry and task_id appended to its
           previous_retried_ids.
        3. Reduce the suite-level max retries by 1.
        4. Remove the previous attempt (task_id) from suite_handler, since
           it is no longer actively monitored by the suite.

    @param suite_handler: a cros_suite.SuiteHandler object.
    @param task_id: The swarming task id for the retried test.
    @param dry_run: Whether to retry a dry run of the test.
    """
    previous = suite_handler.get_test_by_task_id(task_id)
    retries_left = previous.remaining_retries - 1
    logging.info('Retrying test %s, remaining %d retries.',
                 previous.test_spec.test.name, retries_left)
    new_task_id = _create_test_task(
            previous.test_spec,
            suite_id=suite_handler.suite_id,
            is_provision=suite_handler.is_provision(),
            dry_run=dry_run)
    suite_handler.add_test_by_task_id(
            new_task_id,
            cros_suite.TestHandlerSpec(
                    test_spec=previous.test_spec,
                    remaining_retries=retries_left,
                    previous_retried_ids=(previous.previous_retried_ids
                                          + [task_id])))
    suite_handler.set_max_retries(suite_handler.max_retries - 1)
    suite_handler.remove_test_by_task_id(task_id)
348
349
350def _convert_dict_to_string(input_dict):
351    """Convert dictionary to a string.
352
353    @param input_dict: A dictionary.
354    """
355    for k, v in input_dict.iteritems():
356        if isinstance(v, dict):
357            input_dict[k] = _convert_dict_to_string(v)
358        else:
359            input_dict[k] = str(v)
360
361    return json.dumps(input_dict)
362