1# Copyright 2018 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Definition of a CrOS suite in skylab. 6 7This file is a simplicication of dynamic_suite.suite without any useless 8features for skylab suite. 9 10Suite class in this file mainly has 2 features: 11 1. Integrate parameters from control file & passed in arguments. 12 2. Find proper child tests for a given suite. 13 14Use case: 15 See _run_suite() in skylab_suite.run_suite_skylab. 16""" 17 18from __future__ import absolute_import 19from __future__ import division 20from __future__ import print_function 21 22import collections 23import logging 24import os 25 26from lucifer import autotest 27from skylab_suite import errors 28from skylab_suite import swarming_lib 29 30 31SuiteSpec = collections.namedtuple( 32 'SuiteSpec', 33 [ 34 'builds', 35 'suite_name', 36 'suite_file_name', 37 'test_source_build', 38 'suite_args', 39 'priority', 40 'board', 41 'model', 42 'pool', 43 'job_keyvals', 44 'minimum_duts', 45 'timeout_mins', 46 'quota_account', 47 ]) 48 49SuiteHandlerSpec = collections.namedtuple( 50 'SuiteHandlerSpec', 51 [ 52 'suite_name', 53 'wait', 54 'suite_id', 55 'timeout_mins', 56 'passed_mins', 57 'test_retry', 58 'max_retries', 59 'provision_num_required', 60 ]) 61 62TestHandlerSpec = collections.namedtuple( 63 'TestHandlerSpec', 64 [ 65 'test_spec', 66 'remaining_retries', 67 'previous_retried_ids', 68 ]) 69 70TestSpec = collections.namedtuple( 71 'TestSpec', 72 [ 73 'test', 74 'priority', 75 'board', 76 'model', 77 'pool', 78 'build', 79 'keyvals', 80 # TODO(akeshet): Determine why this is necessary 81 # (can't this just be specified as its own dimension?) and 82 # delete it if it isn't necessary. 83 'bot_id', 84 'dut_name', 85 'expiration_secs', 86 'grace_period_secs', 87 'execution_timeout_secs', 88 'io_timeout_secs', 89 'quota_account', 90 ]) 91 92 93class SuiteHandler(object): 94 """The class for handling a CrOS suite run. 95 96 Its responsibility includes handling retries for child tests. 97 """ 98 99 def __init__(self, specs, client): 100 self._suite_name = specs.suite_name 101 self._wait = specs.wait 102 self._timeout_mins = specs.timeout_mins 103 self._provision_num_required = specs.provision_num_required 104 self._test_retry = specs.test_retry 105 self._max_retries = specs.max_retries 106 self.passed_mins = specs.passed_mins 107 108 # The swarming task id of the suite that this suite_handler is handling. 109 self._suite_id = specs.suite_id 110 # The swarming task id of current run_suite_skylab process. It could be 111 # different from self._suite_id if a suite_id is passed in. 112 self._task_id = os.environ.get('SWARMING_TASK_ID') 113 self._task_to_test_maps = {} 114 self.successfully_provisioned_duts = set() 115 self._client = client 116 117 # It only maintains the swarming task of the final run of each 118 # child task, i.e. it doesn't include failed swarming tasks of 119 # each child task which will get retried later. 120 self._active_child_tasks = [] 121 122 def should_wait(self): 123 """Return whether to wait for a suite's result.""" 124 return self._wait 125 126 def is_provision(self): 127 """Return whether the suite handler is for provision suite.""" 128 return self._suite_name == 'provision' 129 130 def set_suite_id(self, suite_id): 131 """Set swarming task id for a suite. 132 133 @param suite_id: The swarming task id of this suite. 134 """ 135 self._suite_id = suite_id 136 137 def add_test_by_task_id(self, task_id, test_handler_spec): 138 """Record a child test and its swarming task id. 139 140 @param task_id: the swarming task id of a child test. 141 @param test_handler_spec: a TestHandlerSpec object. 142 """ 143 self._task_to_test_maps[task_id] = test_handler_spec 144 145 def get_test_by_task_id(self, task_id): 146 """Get a child test by its swarming task id. 147 148 @param task_id: the swarming task id of a child test. 149 """ 150 return self._task_to_test_maps[task_id] 151 152 def remove_test_by_task_id(self, task_id): 153 """Delete a child test by its swarming task id. 154 155 @param task_id: the swarming task id of a child test. 156 """ 157 self._task_to_test_maps.pop(task_id, None) 158 159 def set_max_retries(self, max_retries): 160 """Set the max retries for a suite. 161 162 @param max_retries: The current maximum retries to set. 163 """ 164 self._max_retries = max_retries 165 166 @property 167 def task_to_test_maps(self): 168 """Get the task_to_test_maps of a suite.""" 169 return self._task_to_test_maps 170 171 @property 172 def timeout_mins(self): 173 """Get the timeout minutes of a suite.""" 174 return self._timeout_mins 175 176 @property 177 def suite_id(self): 178 """Get the swarming task id of a suite.""" 179 return self._suite_id 180 181 @property 182 def task_id(self): 183 """Get swarming task id of current process.""" 184 return self._task_id 185 186 @property 187 def max_retries(self): 188 """Get the max num of retries of a suite.""" 189 return self._max_retries 190 191 def get_active_child_tasks(self, suite_id): 192 """Get the child tasks which is actively monitored by a suite. 193 194 The active child tasks list includes tasks which are currently running 195 or finished without following retries. E.g. 196 Suite task X: 197 child task 1: x1 (first try x1_1, second try x1_2) 198 child task 2: x2 (first try: x2_1) 199 The final active child task list will include task x1_2 and x2_1, won't 200 include x1_1 since it's a task which is finished but get retried later. 201 """ 202 all_tasks = self._client.get_child_tasks(suite_id) 203 return [t for t in all_tasks if t['task_id'] in self._task_to_test_maps] 204 205 def handle_results(self, suite_id): 206 """Handle child tasks' results.""" 207 self._active_child_tasks = self.get_active_child_tasks(suite_id) 208 self.retried_tasks = [t for t in self._active_child_tasks 209 if self._should_retry(t)] 210 logging.info('Found %d tests to be retried.', len(self.retried_tasks)) 211 212 def _check_all_tasks_finished(self): 213 """Check whether all tasks are finished, including retried tasks.""" 214 finished_tasks = [t for t in self._active_child_tasks if 215 t['state'] in swarming_lib.TASK_FINISHED_STATUS] 216 logging.info('%d/%d child tasks finished, %d got retried.', 217 len(finished_tasks), len(self._active_child_tasks), 218 len(self.retried_tasks)) 219 return (len(finished_tasks) == len(self._active_child_tasks) 220 and not self.retried_tasks) 221 222 def _set_successful_provisioned_duts(self): 223 """Set successfully provisioned duts.""" 224 for t in self._active_child_tasks: 225 if (swarming_lib.get_task_final_state(t) == 226 swarming_lib.TASK_COMPLETED_SUCCESS): 227 dut_name = self.get_test_by_task_id( 228 t['task_id']).test_spec.dut_name 229 if dut_name: 230 self.successfully_provisioned_duts.add(dut_name) 231 232 def is_provision_successfully_finished(self): 233 """Check whether provision succeeds.""" 234 logging.info('Found %d successfully provisioned duts, ' 235 'the minimum requirement is %d', 236 len(self.successfully_provisioned_duts), 237 self._provision_num_required) 238 return (len(self.successfully_provisioned_duts) >= 239 self._provision_num_required) 240 241 def is_finished_waiting(self): 242 """Check whether the suite should finish its waiting.""" 243 if self.is_provision(): 244 self._set_successful_provisioned_duts() 245 return (self.is_provision_successfully_finished() or 246 self._check_all_tasks_finished()) 247 248 return self._check_all_tasks_finished() 249 250 def _should_retry(self, test_result): 251 """Check whether a test should be retried. 252 253 We will retry a test if: 254 1. The test-level retry is enabled for this suite. 255 2. The test fails. 256 3. The test is currently monitored by the suite, i.e. 257 it's not a previous retried test. 258 4. The test has remaining retries based on JOB_RETRIES in 259 its control file. 260 5. The suite-level max retries isn't hit. 261 262 @param test_result: A json test result from swarming API. 263 264 @return True if we should retry the test. 265 """ 266 task_id = test_result['task_id'] 267 state = test_result['state'] 268 is_failure = test_result['failure'] 269 return (self._test_retry and 270 ((state == swarming_lib.TASK_COMPLETED and is_failure) 271 or (state in swarming_lib.TASK_STATUS_TO_RETRY)) 272 and (task_id in self._task_to_test_maps) 273 and (self._task_to_test_maps[task_id].remaining_retries > 0) 274 and (self._max_retries > 0)) 275 276 277class Suite(object): 278 """The class for a CrOS suite.""" 279 EXPIRATION_SECS = swarming_lib.DEFAULT_EXPIRATION_SECS 280 281 def __init__(self, spec, client): 282 """Initialize a suite. 283 284 @param spec: A SuiteSpec object. 285 @param client: A swarming_lib.Client instance. 286 """ 287 self._ds = None 288 289 self.control_file = '' 290 self.test_specs = [] 291 self.builds = spec.builds 292 self.test_source_build = spec.test_source_build 293 self.suite_name = spec.suite_name 294 self.suite_file_name = spec.suite_file_name 295 self.priority = spec.priority 296 self.board = spec.board 297 self.model = spec.model 298 self.pool = spec.pool 299 self.job_keyvals = spec.job_keyvals 300 self.minimum_duts = spec.minimum_duts 301 self.timeout_mins = spec.timeout_mins 302 self.quota_account = spec.quota_account 303 self._client = client 304 305 @property 306 def ds(self): 307 """Getter for private |self._ds| property. 308 309 This ensures that once self.ds is called, there's a devserver ready 310 for it. 311 """ 312 if self._ds is None: 313 raise errors.InValidPropertyError( 314 'Property self.ds is None. Please call stage_suite_artifacts() ' 315 'before calling it.') 316 317 return self._ds 318 319 def _get_cros_build(self): 320 provision = autotest.load('server.cros.provision') 321 return self.builds.get(provision.CROS_VERSION_PREFIX, 322 self.builds.values()[0]) 323 324 def _create_suite_keyvals(self): 325 constants = autotest.load('server.cros.dynamic_suite.constants') 326 provision = autotest.load('server.cros.provision') 327 cros_build = self._get_cros_build() 328 keyvals = { 329 constants.JOB_BUILD_KEY: cros_build, 330 constants.JOB_SUITE_KEY: self.suite_name, 331 constants.JOB_BUILDS_KEY: self.builds 332 } 333 if (cros_build != self.test_source_build or 334 len(self.builds) > 1): 335 keyvals[constants.JOB_TEST_SOURCE_BUILD_KEY] = ( 336 self.test_source_build) 337 for prefix, build in self.builds.iteritems(): 338 if prefix == provision.FW_RW_VERSION_PREFIX: 339 keyvals[constants.FWRW_BUILD]= build 340 elif prefix == provision.FW_RO_VERSION_PREFIX: 341 keyvals[constants.FWRO_BUILD] = build 342 343 for key in self.job_keyvals: 344 if key in constants.INHERITED_KEYVALS: 345 keyvals[key] = self.job_keyvals[key] 346 347 return keyvals 348 349 def prepare(self): 350 """Prepare a suite job for execution.""" 351 self._stage_suite_artifacts() 352 self._parse_suite_args() 353 keyvals = self._create_suite_keyvals() 354 available_bots = self._get_available_bots() 355 if len(available_bots) < self.minimum_duts: 356 raise errors.NoAvailableDUTsError( 357 self.board, self.pool, len(available_bots), 358 self.minimum_duts) 359 360 tests = self._find_tests(available_bots_num=len(available_bots)) 361 self.test_specs = self._get_test_specs(tests, available_bots, keyvals) 362 363 def _create_test_spec(self, test, keyvals, bot_id='', dut_name=''): 364 return TestSpec( 365 test=test, 366 priority=self.priority, 367 board=self.board, 368 model=self.model, 369 pool=self.pool, 370 build=self.test_source_build, 371 bot_id=bot_id, 372 dut_name=dut_name, 373 keyvals=keyvals, 374 expiration_secs=self.timeout_mins * 60, 375 grace_period_secs=swarming_lib.DEFAULT_TIMEOUT_SECS, 376 execution_timeout_secs=self.timeout_mins * 60, 377 io_timeout_secs=swarming_lib.DEFAULT_TIMEOUT_SECS, 378 quota_account=self.quota_account, 379 ) 380 381 def _get_test_specs(self, tests, available_bots, keyvals): 382 return [self._create_test_spec(test, keyvals) for test in tests] 383 384 def _stage_suite_artifacts(self): 385 """Stage suite control files and suite-to-tests mapping file. 386 387 @param build: The build to stage artifacts. 388 """ 389 suite_common = autotest.load('server.cros.dynamic_suite.suite_common') 390 ds, _ = suite_common.stage_build_artifacts(self.test_source_build) 391 self._ds = ds 392 393 def _parse_suite_args(self): 394 """Get the suite args. 395 396 The suite args includes: 397 a. suite args in suite control file. 398 b. passed-in suite args by user. 399 """ 400 suite_common = autotest.load('server.cros.dynamic_suite.suite_common') 401 self.control_file = suite_common.get_control_file_by_build( 402 self.test_source_build, self.ds, self.suite_file_name) 403 404 def _find_tests(self, available_bots_num=0): 405 """Fetch the child tests.""" 406 control_file_getter = autotest.load( 407 'server.cros.dynamic_suite.control_file_getter') 408 suite_common = autotest.load('server.cros.dynamic_suite.suite_common') 409 cf_getter = control_file_getter.DevServerGetter( 410 self.test_source_build, self.ds) 411 tests = suite_common.retrieve_for_suite( 412 cf_getter, self.suite_name) 413 return suite_common.filter_tests( 414 tests, suite_common.name_in_tag_predicate(self.suite_name)) 415 416 def _get_available_bots(self): 417 """Get available bots for suites.""" 418 dimensions = {'pool': swarming_lib.SKYLAB_DRONE_POOL, 419 'label-board': self.board} 420 swarming_pool_deps = swarming_lib.task_dependencies_from_labels( 421 ['pool:%s' % self.pool]) 422 dimensions.update(swarming_pool_deps) 423 bots = self._client.query_bots_list(dimensions) 424 return [bot for bot in bots if swarming_lib.bot_available(bot)] 425 426 427class ProvisionSuite(Suite): 428 """The class for a CrOS provision suite.""" 429 EXPIRATION_SECS = swarming_lib.DEFAULT_EXPIRATION_SECS 430 431 def __init__(self, spec, client): 432 super(ProvisionSuite, self).__init__(spec, client) 433 self._num_required = spec.suite_args['num_required'] 434 435 def _find_tests(self, available_bots_num=0): 436 """Fetch the child tests for provision suite.""" 437 control_file_getter = autotest.load( 438 'server.cros.dynamic_suite.control_file_getter') 439 suite_common = autotest.load('server.cros.dynamic_suite.suite_common') 440 cf_getter = control_file_getter.DevServerGetter( 441 self.test_source_build, self.ds) 442 dummy_test = suite_common.retrieve_control_data_for_test( 443 cf_getter, 'dummy_Pass') 444 logging.info('Get %d available DUTs for provision.', available_bots_num) 445 if available_bots_num < self._num_required: 446 logging.warning('Not enough available DUTs for provision.') 447 raise errors.NoAvailableDUTsError( 448 self.board, self.pool, available_bots_num, 449 self._num_required) 450 451 return [dummy_test] * max(self._num_required, available_bots_num) 452 453 def _get_test_specs(self, tests, available_bots, keyvals): 454 test_specs = [] 455 for idx, test in enumerate(tests): 456 if idx < len(available_bots): 457 bot = available_bots[idx] 458 test_specs.append(self._create_test_spec( 459 test, keyvals, bot_id=bot['bot_id'], 460 dut_name=swarming_lib.get_task_dut_name( 461 bot['dimensions']))) 462 else: 463 test_specs.append(self._create_test_spec(test, keyvals)) 464 465 return test_specs 466