1# Copyright 2018 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Module for CrOS dynamic test suite generation and execution.""" 6 7from __future__ import absolute_import 8from __future__ import division 9from __future__ import print_function 10 11import contextlib 12import itertools 13import json 14import logging 15import os 16import re 17import time 18 19from lucifer import autotest 20from skylab_suite import cros_suite 21from skylab_suite import swarming_lib 22 23 24SKYLAB_DRONE_SWARMING_WORKER = '/opt/infra-tools/skylab_swarming_worker' 25SKYLAB_SUITE_USER = 'skylab_suite_runner' 26SKYLAB_TOOL = '/opt/infra-tools/skylab' 27 28SUITE_WAIT_SLEEP_INTERVAL_SECONDS = 30 29 30# See #5 in crbug.com/873886 for more details. 31_NOT_SUPPORTED_DEPENDENCIES = ['skip_provision', 'cleanup-reboot', 'rpm', 32 'modem_repair'] 33 34 35def run(client, test_specs, suite_handler, dry_run=False): 36 """Run a CrOS dynamic test suite. 37 38 @param client: A swarming_lib.Client instance. 39 @param test_specs: A list of cros_suite.TestSpec objects. 40 @param suite_handler: A cros_suite.SuiteHandler object. 41 @param dry_run: Whether to kick off dry runs of the tests. 42 """ 43 assert isinstance(client, swarming_lib.Client) 44 if suite_handler.suite_id: 45 # Resume an existing suite. 46 _resume_suite(client, test_specs, suite_handler, dry_run) 47 else: 48 # Make a new suite. 49 _run_suite(test_specs, suite_handler, dry_run) 50 51 52def _resume_suite(client, test_specs, suite_handler, dry_run=False): 53 """Resume a suite and its child tasks by given suite id.""" 54 assert isinstance(client, swarming_lib.Client) 55 suite_id = suite_handler.suite_id 56 all_tasks = client.get_child_tasks(suite_id) 57 not_yet_scheduled = _get_unscheduled_test_specs( 58 test_specs, suite_handler, all_tasks) 59 60 logging.info('Not yet scheduled test_specs: %r', not_yet_scheduled) 61 _create_test_tasks(not_yet_scheduled, suite_handler, suite_id, dry_run) 62 63 if suite_id is not None and suite_handler.should_wait(): 64 _wait_for_results(suite_handler, dry_run=dry_run) 65 66 67def _get_unscheduled_test_specs(test_specs, suite_handler, all_tasks): 68 not_yet_scheduled = [] 69 for test_spec in test_specs: 70 if suite_handler.is_provision(): 71 # We cannot check bot_id because pending tasks do not have it yet. 72 bot_id_tag = 'id:%s' % test_spec.bot_id 73 tasks = [t for t in all_tasks if bot_id_tag in t['tags']] 74 else: 75 tasks = [t for t in all_tasks if t['name']==test_spec.test.name] 76 77 if not tasks: 78 not_yet_scheduled.append(test_spec) 79 continue 80 81 current_task = _get_current_task(tasks) 82 test_task_id = (current_task['task_id'] if current_task 83 else tasks[0]['task_id']) 84 remaining_retries = test_spec.test.job_retries - len(tasks) 85 previous_retried_ids = [t['task_id'] for t in tasks 86 if t['task_id'] != test_task_id] 87 suite_handler.add_test_by_task_id( 88 test_task_id, 89 cros_suite.TestHandlerSpec( 90 test_spec=test_spec, 91 remaining_retries=remaining_retries, 92 previous_retried_ids=previous_retried_ids)) 93 94 return not_yet_scheduled 95 96 97def _get_current_task(tasks): 98 """Get current running task. 99 100 @param tasks: A list of task dicts including task_id, state, etc. 101 102 @return a dict representing the current running task. 103 """ 104 current_task = None 105 for t in tasks: 106 if t['state'] not in swarming_lib.TASK_FINISHED_STATUS: 107 if current_task: 108 raise ValueError( 109 'Parent task has 2 same running child tasks: %s, %s' 110 % (current_task['task_id'], t['task_id'])) 111 112 current_task = t 113 114 return current_task 115 116 117def _run_suite(test_specs, suite_handler, dry_run=False): 118 """Make a new suite.""" 119 suite_id = os.environ.get('SWARMING_TASK_ID') 120 if not suite_id: 121 raise ValueError("Unable to determine suite's task id from env var " 122 "SWARMING_TASK_ID.") 123 _create_test_tasks(test_specs, suite_handler, suite_id, dry_run) 124 suite_handler.set_suite_id(suite_id) 125 126 if suite_handler.should_wait(): 127 _wait_for_results(suite_handler, dry_run=dry_run) 128 129 130def _create_test_tasks(test_specs, suite_handler, suite_id, dry_run=False): 131 """Create test tasks for a list of tests (TestSpecs). 132 133 Given a list of TestSpec object, this function will schedule them on 134 swarming one by one, and add them to the swarming_task_id-to-test map 135 of suite_handler to keep monitoring them. 136 137 @param test_specs: A list of cros_suite.TestSpec objects to schedule. 138 @param suite_handler: A cros_suite.SuiteHandler object to monitor the 139 test_specs' progress. 140 @param suite_id: A string ID for a suite task, it's the parent task id for 141 these to-be-scheduled test_specs. 142 @param dry_run: Whether to kick off dry runs of the tests. 143 """ 144 for test_spec in test_specs: 145 test_task_id = _create_test_task( 146 test_spec, 147 suite_id=suite_id, 148 is_provision=suite_handler.is_provision(), 149 dry_run=dry_run) 150 suite_handler.add_test_by_task_id( 151 test_task_id, 152 cros_suite.TestHandlerSpec( 153 test_spec=test_spec, 154 remaining_retries=test_spec.test.job_retries - 1, 155 previous_retried_ids=[])) 156 157 158def _create_test_task(test_spec, suite_id=None, 159 is_provision=False, dry_run=False): 160 """Create a test task for a given test spec. 161 162 @param test_spec: A cros_suite.TestSpec object. 163 @param suite_id: the suite task id of the test. 164 @param dry_run: If true, don't actually create task. 165 166 @return the swarming task id of this task. 167 """ 168 logging.info('Creating task for test %s', test_spec.test.name) 169 skylab_tool_path = os.environ.get('SKYLAB_TOOL', SKYLAB_TOOL) 170 171 cmd = [ 172 skylab_tool_path, 'create-test', 173 '-board', test_spec.board, 174 '-image', test_spec.build, 175 '-service-account-json', os.environ['SWARMING_CREDS'], 176 ] 177 if _is_dev(): 178 cmd += ['-dev'] 179 if test_spec.pool: 180 # TODO(akeshet): Clean up this hack around pool name translation. 181 autotest_pool_label = 'pool:%s' % test_spec.pool 182 pool_dependency_value = swarming_lib.task_dependencies_from_labels( 183 [autotest_pool_label])['label-pool'] 184 cmd += ['-pool', pool_dependency_value] 185 186 if test_spec.model: 187 cmd += ['-model', test_spec.model] 188 if test_spec.quota_account: 189 cmd += ['-qs-account', test_spec.quota_account] 190 if test_spec.test.test_type.lower() == 'client': 191 cmd += ['-client-test'] 192 193 tags = _compute_tags(test_spec.build, suite_id) 194 dimensions = _compute_dimensions( 195 test_spec.bot_id, test_spec.test.dependencies) 196 keyvals_flat = _compute_job_keyvals_flat(test_spec.keyvals, suite_id) 197 198 for tag in tags: 199 cmd += ['-tag', tag] 200 for keyval in keyvals_flat: 201 cmd += ['-keyval', keyval] 202 cmd += [test_spec.test.name] 203 cmd += dimensions 204 205 if dry_run: 206 logging.info('Would have created task with command %s', cmd) 207 return 208 209 # TODO(akeshet): Avoid this late chromite import. 210 cros_build_lib = autotest.chromite_load('cros_build_lib') 211 result = cros_build_lib.RunCommand(cmd, capture_output=True) 212 # TODO(akeshet): Use -json flag and json-parse output of the command instead 213 # of regex matching to determine task_id. 214 m = re.match('.*id=(.*)$', result.output) 215 task_id = m.group(1) 216 logging.info('Created task with id %s', task_id) 217 return task_id 218 219 220# TODO(akeshet): Eliminate the need for this, by either adding an explicit 221# swarming_server argument to skylab tool, or having the tool respect the 222# SWARMING_SERVER environment variable. See crbug.com/948774 223def _is_dev(): 224 """Detect whether skylab tool should be invoked with -dev flag.""" 225 return 'chromium-swarm-dev' in os.environ['SWARMING_SERVER'] 226 227def _compute_tags(build, suite_id): 228 tags = [ 229 'build:%s' % build, 230 ] 231 if suite_id is not None: 232 tags += ['parent_task_id:%s' % suite_id] 233 return tags 234 235 236def _compute_dimensions(bot_id, dependencies): 237 dimensions = [] 238 if bot_id: 239 dimensions += ['id:%s' % bot_id] 240 deps = _filter_unsupported_dependencies(dependencies) 241 flattened_swarming_deps = sorted([ 242 '%s:%s' % (k, v) for 243 k, v in swarming_lib.task_dependencies_from_labels(deps).items() 244 ]) 245 dimensions += flattened_swarming_deps 246 return dimensions 247 248 249def _compute_job_keyvals_flat(keyvals, suite_id): 250 # Job keyvals calculation. 251 job_keyvals = keyvals.copy() 252 if suite_id is not None: 253 # TODO(akeshet): Avoid this late autotest constants import. 254 constants = autotest.load('server.cros.dynamic_suite.constants') 255 job_keyvals[constants.PARENT_JOB_ID] = suite_id 256 keyvals_flat = sorted( 257 ['%s:%s' % (k, v) for k, v in job_keyvals.items()]) 258 return keyvals_flat 259 260 261def _filter_unsupported_dependencies(dependencies): 262 """Filter out Skylab-unsupported test dependencies, with a warning.""" 263 deps = [] 264 for dep in dependencies: 265 if dep in _NOT_SUPPORTED_DEPENDENCIES: 266 logging.warning('Dependency %s is not supported in skylab', dep) 267 else: 268 deps.append(dep) 269 return deps 270 271 272@contextlib.contextmanager 273def disable_logging(logging_level): 274 """Context manager for disabling logging of a given logging level.""" 275 try: 276 logging.disable(logging_level) 277 yield 278 finally: 279 logging.disable(logging.NOTSET) 280 281 282def _loop_and_wait_forever(suite_handler, dry_run): 283 """Wait for child tasks to finish or break.""" 284 for iterations in itertools.count(0): 285 # Log progress every 300 seconds. 286 no_logging = bool(iterations * SUITE_WAIT_SLEEP_INTERVAL_SECONDS % 300) 287 with disable_logging(logging.INFO if no_logging else logging.NOTSET): 288 suite_handler.handle_results(suite_handler.suite_id) 289 if suite_handler.is_finished_waiting(): 290 break 291 292 for t in suite_handler.retried_tasks: 293 _retry_test(suite_handler, t['task_id'], dry_run=dry_run) 294 295 time.sleep(SUITE_WAIT_SLEEP_INTERVAL_SECONDS) 296 297 298def _wait_for_results(suite_handler, dry_run=False): 299 """Wait for child tasks to finish and return their results. 300 301 @param suite_handler: a cros_suite.SuiteHandler object. 302 """ 303 timeout_util = autotest.chromite_load('timeout_util') 304 try: 305 with timeout_util.Timeout(suite_handler.timeout_mins * 60 - 306 suite_handler.passed_mins * 60): 307 _loop_and_wait_forever(suite_handler, dry_run) 308 except timeout_util.TimeoutError: 309 logging.error('Timeout in waiting for child tasks.') 310 return 311 312 logging.info('Finished to wait for child tasks.') 313 314 315def _retry_test(suite_handler, task_id, dry_run=False): 316 """Retry test for a suite. 317 318 We will execute the following actions for retrying a test: 319 1. Schedule the test. 320 2. Add the test with the new swarming task id to the suite's 321 retry handler, but reduce its remaining retries by 1. 322 3. Reduce the suite-level max retries by 1. 323 4. Remove prevous failed test from retry handler since it's not 324 actively monitored by the suite. 325 326 @param suite_handler: a cros_suite.SuiteHandler object. 327 @param task_id: The swarming task id for the retried test. 328 @param dry_run: Whether to retry a dry run of the test. 329 """ 330 last_retry_spec = suite_handler.get_test_by_task_id(task_id) 331 logging.info('Retrying test %s, remaining %d retries.', 332 last_retry_spec.test_spec.test.name, 333 last_retry_spec.remaining_retries - 1) 334 retried_task_id = _create_test_task( 335 last_retry_spec.test_spec, 336 suite_id=suite_handler.suite_id, 337 is_provision=suite_handler.is_provision(), 338 dry_run=dry_run) 339 previous_retried_ids = last_retry_spec.previous_retried_ids + [task_id] 340 suite_handler.add_test_by_task_id( 341 retried_task_id, 342 cros_suite.TestHandlerSpec( 343 test_spec=last_retry_spec.test_spec, 344 remaining_retries=last_retry_spec.remaining_retries - 1, 345 previous_retried_ids=previous_retried_ids)) 346 suite_handler.set_max_retries(suite_handler.max_retries - 1) 347 suite_handler.remove_test_by_task_id(task_id) 348 349 350def _convert_dict_to_string(input_dict): 351 """Convert dictionary to a string. 352 353 @param input_dict: A dictionary. 354 """ 355 for k, v in input_dict.iteritems(): 356 if isinstance(v, dict): 357 input_dict[k] = _convert_dict_to_string(v) 358 else: 359 input_dict[k] = str(v) 360 361 return json.dumps(input_dict) 362