# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module for swarming execution."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import json
import logging
import operator
import os
import urllib
import uuid

from lucifer import autotest
from skylab_suite import errors


# Default credentials file handed to the swarming client.
DEFAULT_SERVICE_ACCOUNT = (
    '/creds/skylab_swarming_bot/skylab_bot_service_account.json')

# Swarming pool names.
SKYLAB_DRONE_POOL = 'ChromeOSSkylab'
SKYLAB_SUITE_POOL = 'ChromeOSSkylab-suite'

# Swarming task state strings.
TASK_COMPLETED = 'COMPLETED'
TASK_COMPLETED_SUCCESS = 'COMPLETED (SUCCESS)'
TASK_COMPLETED_FAILURE = 'COMPLETED (FAILURE)'
TASK_EXPIRED = 'EXPIRED'
TASK_CANCELED = 'CANCELED'
TASK_TIMEDOUT = 'TIMED_OUT'
TASK_RUNNING = 'RUNNING'
TASK_PENDING = 'PENDING'
TASK_BOT_DIED = 'BOT_DIED'
TASK_NO_RESOURCE = 'NO_RESOURCE'
TASK_KILLED = 'KILLED'

# States in which a task is considered finished.
TASK_FINISHED_STATUS = [
    TASK_COMPLETED,
    TASK_EXPIRED,
    TASK_CANCELED,
    TASK_TIMEDOUT,
    TASK_BOT_DIED,
    TASK_NO_RESOURCE,
    TASK_KILLED,
]

# The swarming task failure status to retry. TASK_CANCELED won't get
# retried since it's intentionally aborted.
TASK_STATUS_TO_RETRY = [
    TASK_EXPIRED,
    TASK_TIMEDOUT,
    TASK_BOT_DIED,
    TASK_NO_RESOURCE,
]

DEFAULT_EXPIRATION_SECS = 10 * 60  # 10 minutes
DEFAULT_TIMEOUT_SECS = 60 * 60  # 1 hour

# A mapping of priorities for skylab hwtest tasks. In swarming, a
# lower number means a higher priority. Priorities lower than 48 are
# reserved for special tasks. The upper bound for priority is 255.
# Use the same priorities mapping as chromite/lib/constants.py
SKYLAB_HWTEST_PRIORITIES_MAP = {
    'Weekly': 230,
    'CTS': 215,
    'Daily': 200,
    'PostBuild': 170,
    'Default': 140,
    'Build': 110,
    'PFQ': 80,
    'CQ': 50,
    'Super': 49,
}
# (name, priority) pairs ordered by ascending priority number.
SORTED_SKYLAB_HWTEST_PRIORITY = sorted(
    SKYLAB_HWTEST_PRIORITIES_MAP.items(),
    key=operator.itemgetter(1))

SWARMING_DUT_READY_STATUS = 'ready'

_STAINLESS_LOGS_BROWSER_URL_TEMPLATE = (
    "https://stainless.corp.google.com"
    "/browse/chromeos-autotest-results/swarming-%(request_id)s/"
)


def _get_client_path():
    """Return the path to swarming.py under the user's home directory."""
    return os.path.join(
        os.path.expanduser('~'),
        'chromiumos/chromite/third_party/swarming.client/swarming.py')


def task_dependencies_from_labels(labels):
    """Parse dependencies from autotest labels.

    @param labels: A list of label string.

    @return a dict [key: value] to represent dependencies.

    @raises ValueError: if a dimension carries more than one value.
    """
    translation_autotest = autotest.deps_load(
        'skylab_inventory.translation.autotest')
    translation_swarming = autotest.deps_load(
        'skylab_inventory.translation.swarming')
    dimensions = translation_swarming.labels_to_dimensions(
        translation_autotest.from_autotest_labels(labels))
    dependencies = {}
    for k, v in dimensions.items():
        if isinstance(v, list):
            if len(v) > 1:
                # The message reads "value ... for key ...", so the value
                # list goes first and the key second.
                raise ValueError(
                    'Invalid dependencies: Multiple value %r for key %s'
                    % (v, k))

            dependencies[k] = v[0]

    return dependencies


def make_logdog_annotation_url():
    """Return a unique LogDog annotation URL.

    If LOGDOG_SERVER is set to an empty value, return an empty string.

    @raises errors.DroneEnvironmentError: if LOGDOG_SERVER is not set.
    """
    logdog_server = get_logdog_server()
    if not logdog_server:
        return ''
    return ('logdog://%s/chromeos/skylab/%s/+/annotations'
            % (logdog_server, uuid.uuid4().hex))


def get_swarming_server():
    """Return the swarming server for the current environment.

    @raises errors.DroneEnvironmentError: if SWARMING_SERVER is not set.
    """
    try:
        return os.environ['SWARMING_SERVER']
    except KeyError:
        raise errors.DroneEnvironmentError(
            'SWARMING_SERVER environment variable not set'
        )


def get_logdog_server():
    """Return the LogDog server for the current environment.

    The value of the LOGDOG_SERVER environment variable is returned as-is,
    so it may be an empty string when the variable is set but empty.

    @raises errors.DroneEnvironmentError: if LOGDOG_SERVER is not set.
    """
    try:
        return os.environ['LOGDOG_SERVER']
    except KeyError:
        raise errors.DroneEnvironmentError(
            'LOGDOG_SERVER environment variable not set'
        )


def _namedtuple_to_dict(value):
    """Recursively converts a namedtuple to a dict.

    Nested namedtuples become dicts. Lists and tuples become lists, with
    any namedtuple elements converted as well.

    @param value: a namedtuple object.

    @return a dict object with the same value.
    """
    out = dict(value._asdict())
    # Iterate over a snapshot because values are replaced inside the loop.
    for k, v in list(out.items()):
        if hasattr(v, '_asdict'):
            out[k] = _namedtuple_to_dict(v)
        elif isinstance(v, (list, tuple)):
            out[k] = [
                _namedtuple_to_dict(elem) if hasattr(elem, '_asdict')
                else elem
                for elem in v
            ]

    return out


def get_task_link(task_id):
    """Return the task page URL on the swarming server for a task id.

    The server is read from the SWARMING_SERVER environment variable. If
    it is not set, the link starts with the literal text 'None'.

    @param task_id: A string swarming task id.
    """
    return '%s/user/task/%s' % (os.environ.get('SWARMING_SERVER'), task_id)


def get_stainless_logs_link(request_id):
    """Gets a link to the stainless logs for a given request ID.

    @param request_id: The id substituted into the stainless logs URL.
    """
    return _STAINLESS_LOGS_BROWSER_URL_TEMPLATE % {
        'request_id': request_id,
    }


def get_task_final_state(task):
    """Get the final state of a swarming task.

    A COMPLETED task is reported as TASK_COMPLETED_FAILURE or
    TASK_COMPLETED_SUCCESS depending on its 'failure' field. Any other
    state is returned unchanged.

    @param task: the json output of a swarming task fetched by API tasks.list.
    """
    state = task['state']
    if state == TASK_COMPLETED:
        state = (TASK_COMPLETED_FAILURE if task['failure'] else
                 TASK_COMPLETED_SUCCESS)

    return state


def get_task_dut_name(task_dimensions):
    """Get the DUT name of running this task.

    @param task_dimensions: a list of dict, e.g. [{'key': k, 'value': v}, ...]

    @return the first value of the 'dut_name' dimension, or '' if there is
            no such dimension.
    """
    for dimension in task_dimensions:
        if dimension['key'] == 'dut_name':
            return dimension['value'][0]

    return ''


def bot_available(bot):
    """Check whether a bot is available.

    @param bot: A dict describes a bot's dimensions, i.e. an element in return
                list of |query_bots_list|.

    @return True if a bot is available to run task, otherwise False.
    """
    return not (bot['is_dead'] or bot['quarantined'])


class Client(object):
    """Wrapper for interacting with swarming client."""

    # TODO(akeshet): Drop auth_json_path argument and use the same
    # SWARMING_CREDS envvar that is used to select creds for skylab tool.
    def __init__(self, auth_json_path=DEFAULT_SERVICE_ACCOUNT):
        self._auth_json_path = auth_json_path

    def query_task_by_tags(self, tags):
        """Get tasks for given tags.

        @param tags: A dict of tags for swarming tasks.

        @return a list, which contains all tasks queried by the given tags.
        """
        basic_swarming_cmd = self.get_basic_swarming_cmd('query')
        conditions = [('tags', '%s:%s' % (k, v)) for k, v in tags.items()]
        swarming_cmd = basic_swarming_cmd + ['tasks/list?%s' %
                                             urllib.urlencode(conditions)]
        cros_build_lib = autotest.chromite_load('cros_build_lib')
        result = cros_build_lib.RunCommand(swarming_cmd, capture_output=True)
        json_output = json.loads(result.output)
        return json_output.get('items', [])

    def query_task_by_id(self, task_id):
        """Get task for given id.

        @param task_id: A string to indicate a swarming task id.

        @return a dict, which contains the task with the given task_id.
        """
        basic_swarming_cmd = self.get_basic_swarming_cmd('query')
        swarming_cmd = basic_swarming_cmd + ['task/%s/result' % task_id]
        cros_build_lib = autotest.chromite_load('cros_build_lib')
        result = cros_build_lib.RunCommand(swarming_cmd, capture_output=True)
        return json.loads(result.output)

    def abort_task(self, task_id):
        """Abort a swarming task by its id.

        A failure of the cancel command is logged and not re-raised.

        @param task_id: A string swarming task id.
        """
        basic_swarming_cmd = self.get_basic_swarming_cmd('cancel')
        swarming_cmd = basic_swarming_cmd + ['--kill-running', task_id]
        cros_build_lib = autotest.chromite_load('cros_build_lib')
        try:
            cros_build_lib.RunCommand(swarming_cmd, log_output=True)
        except cros_build_lib.RunCommandError:
            logging.error('Task %s probably already gone, skip canceling it.',
                          task_id)

    def query_bots_list(self, dimensions):
        """Get bots list for given requirements.

        @param dimensions: A dict of dimensions for swarming bots.

        @return a list of bot dicts.
        """
        basic_swarming_cmd = self.get_basic_swarming_cmd('query')
        conditions = [('dimensions', '%s:%s' % (k, v))
                      for k, v in dimensions.items()]
        swarming_cmd = basic_swarming_cmd + ['bots/list?%s' %
                                             urllib.urlencode(conditions)]
        cros_build_lib = autotest.chromite_load('cros_build_lib')
        result = cros_build_lib.RunCommand(swarming_cmd, capture_output=True)
        return json.loads(result.output).get('items', [])

    def get_child_tasks(self, parent_task_id):
        """Get the child tasks based on a parent swarming task id.

        @param parent_task_id: The parent swarming task id.

        @return a list of dicts, each dict refers to the whole stats of a task,
            keys include 'name', 'bot_dimensions', 'tags', 'bot_id', 'state',
            etc. An empty list is returned when the response has no 'items'.
        """
        swarming_cmd = self.get_basic_swarming_cmd('query')
        swarming_cmd += ['tasks/list?tags=parent_task_id:%s' % parent_task_id]
        timeout_util = autotest.chromite_load('timeout_util')
        cros_build_lib = autotest.chromite_load('cros_build_lib')
        with timeout_util.Timeout(60):
            child_tasks = cros_build_lib.RunCommand(
                swarming_cmd, capture_output=True)
            # Use .get() like the other list queries in this class, so a
            # response without 'items' yields [] instead of a KeyError.
            return json.loads(child_tasks.output).get('items', [])

    def get_basic_swarming_cmd(self, command):
        """Build the common prefix of a swarming client command line.

        @param command: The swarming client subcommand, e.g. 'query' or
                        'cancel'.

        @return a list of command-line arguments: client path, subcommand,
                server, and the service account json flag when one is set.
        """
        cmd = [_get_client_path(), command, '--swarming', get_swarming_server()]
        if self._auth_json_path:
            cmd += ['--auth-service-account-json', self._auth_json_path]
        return cmd