#!/usr/bin/env python3
# Copyright 2018 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Custom swarming base trigger class.

This base class consolidates custom swarming triggering logic, to allow one bot
to conceptually span multiple Swarming configurations, while lumping all trigger
calls under one logical step. It also gives the subclasses the ability to
define their own logic for pruning the configurations they want to trigger
jobs on and what configurations to use.

See perf_device_triggerer.py for an example of how to use this base class.

"""

import copy
import json
import os
import subprocess
import sys
import tempfile
import time
import logging
import six

SRC_DIR = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# .exe on Windows.
EXECUTABLE_SUFFIX = '.exe' if sys.platform == 'win32' else ''

SWARMING_GO = os.path.join(SRC_DIR, 'tools', 'luci-go',
                           'swarming' + EXECUTABLE_SUFFIX)

_A_WEEK_IN_SECONDS = 60 * 60 * 24 * 7

# Spellings (lower-cased, stripped) that the verbose flag treats as "off".
_FALSE_FLAG_VALUES = frozenset(('', '0', 'false', 'no', 'off', 'n', 'f'))


def _parse_bool_flag(value):
    """Converts a command-line string into a bool.

    `type=bool` in argparse maps every non-empty string (including 'False')
    to True. This parser maps the usual "off" spellings to False and, for
    backward compatibility, everything else to True.
    """
    return value.strip().lower() not in _FALSE_FLAG_VALUES


def convert_to_go_swarming_args(args):
    """Translates swarming.py-style arguments for the Go `swarming` binary.

    * `--swarming` is renamed to `--server`.
    * `--dimension KEY VALUE` (three tokens) becomes `--dimension KEY=VALUE`.

    All other arguments are passed through unchanged, in order.

    Args:
      args: List of command line arguments.

    Returns:
      A new list of arguments; |args| is not modified.

    Raises:
      IndexError: if `--dimension` is not followed by two more arguments.
    """
    go_args = []
    i = 0
    while i < len(args):
        current_arg = args[i]
        if current_arg == '--swarming':
            current_arg = '--server'
        go_args.append(current_arg)
        i += 1
        if current_arg == '--dimension':
            # Fold the separate key and value tokens into one KEY=VALUE token.
            go_args.append('{}={}'.format(args[i], args[i + 1]))
            i += 2
    return go_args


def strip_unicode(obj):
    """Recursively re-encodes strings as utf-8 inside |obj|. Returns the result.

    Lists and dicts are rebuilt (dicts keep their concrete type); any other
    object is returned as-is.
    """
    # NOTE(review): six.text_type is `str` on Python 3, so this returns
    # `bytes` there. Callers in this file compare the results with `str`
    # arguments and format them into command lines -- confirm that is intended.
    if isinstance(obj, six.text_type):
        return obj.encode('utf-8', 'replace')
    if isinstance(obj, list):
        return list(map(strip_unicode, obj))

    if isinstance(obj, dict):
        new_obj = type(obj)(
            (strip_unicode(k), strip_unicode(v)) for k, v in obj.items())
        return new_obj
    return obj


class BaseTestTriggerer(object):  # pylint: disable=useless-object-inheritance
    """Base class for scripts that trigger swarming tasks on several configs.

    Subclasses are expected to override select_config_indices() and may
    override the other hooks (prune_test_specific_configs,
    append_additional_args, generate_shard_map).
    """

    def __init__(self):
        # List of dimension dictionaries, filled in by parse_bot_configs().
        self._bot_configs = None
        self._bot_statuses = []
        self._total_bots = 0

    def modify_args(self,
                    all_args,
                    bot_index,
                    shard_index,
                    total_shards,
                    temp_file,
                    shard_map=None):
        """Modifies the given argument list.

        Specifically, it does the following:
          * Adds a --dump-json argument, to read in the results of the
            individual trigger command.
          * Adds the dimensions associated with the bot config at the given
            index.
          * If the number of shards is greater than one, adds --env
            arguments to set the GTEST_SHARD_INDEX and GTEST_TOTAL_SHARDS
            environment variables to _shard_index_ and _total_shards_,
            respectively.
          * If a shard map is given, appends --use-dynamic-shards and
            --dynamic-shardmap=<json> at the very end of the list.

        The arguments are structured like this:
        <args to swarming trigger> -- <args to bot running isolate>
        This means we have to add arguments to specific locations in the
        argument list, to either affect the trigger command, or what the bot
        runs.

        Returns:
          The new argument list; |all_args| is not modified.
        """
        bot_args = ['--dump-json', temp_file]
        if total_shards > 1:
            bot_args.append('--env')
            bot_args.append('GTEST_SHARD_INDEX=%s' % shard_index)
            bot_args.append('--env')
            bot_args.append('GTEST_TOTAL_SHARDS=%s' % total_shards)
        if self._bot_configs:
            # Sorted so the generated command line is deterministic.
            for key, val in sorted(self._bot_configs[bot_index].items()):
                bot_args.append('--dimension')
                bot_args.append(key)
                bot_args.append(val)
        if '--' in all_args:
            # Trigger arguments must go before the '--' separator.
            dash_ind = all_args.index('--')
            additional_args = (all_args[:dash_ind] + bot_args +
                               all_args[dash_ind:])
        else:
            additional_args = all_args + bot_args
        additional_args = self.append_additional_args(additional_args,
                                                      shard_index)
        # crbug/1140389: debug print outs
        logging.info('DEBUG: Before adding shardmap args: %s', additional_args)
        if shard_map:
            shard_map_str = json.dumps(shard_map, separators=(',', ':'))
            shard_map_args = ['--use-dynamic-shards']
            shard_map_args.append('--dynamic-shardmap=%s' % shard_map_str)
            additional_args += shard_map_args
        return additional_args

    def append_additional_args(self, args, shard_index):
        """ Gives subclasses ability to append additional args if necessary

        Base class just returns given args."""
        del shard_index  # unused
        return args

    def parse_bot_configs(self, args):
        """Parses and validates args.multiple_trigger_configs.

        On success self._bot_configs holds a non-empty list of dictionaries.

        Raises:
          ValueError: if the value is missing, is not valid JSON, or is not a
            non-empty list of dictionaries.
        """
        try:
            self._bot_configs = strip_unicode(
                json.loads(args.multiple_trigger_configs))
        except (TypeError, json.JSONDecodeError) as e:
            # TypeError covers the flag being absent (None): the flag is
            # optional on the command line but required by this
            # implementation.
            raise ValueError(
                'Error while parsing JSON from bot config string %s: %s' %
                (args.multiple_trigger_configs, str(e))) from e
        # Validate the input.
        if not isinstance(self._bot_configs, list):
            raise ValueError('Bot configurations must be a list, were: %s' %
                             args.multiple_trigger_configs)
        if len(self._bot_configs) < 1:
            raise ValueError(
                'Bot configuration list must have at least one entry')
        if not all(isinstance(entry, dict) for entry in self._bot_configs):
            raise ValueError('Bot configurations must all be dictionaries')

    def _run_swarming_query(self, args):
        """Runs a Go `swarming` query and returns its parsed JSON output.

        The command is given a `-json <temp file>` argument; the temporary
        file is always removed afterwards.

        Raises:
          subprocess.CalledProcessError: if the command exits non-zero.
        """
        logging.info('Running Go `swarming` with args: %s', args)

        # Close the handle right away so the child process can write to the
        # file on Windows.
        handle, json_path = tempfile.mkstemp()
        os.close(handle)
        try:
            subprocess.check_call(args + ['-json', json_path])
            with open(json_path) as f:
                return json.load(f)
        finally:
            os.remove(json_path)

    def list_bots(self, dimensions, server='chromium-swarm.appspot.com'):
        """List bots having specified bot dimensions.

        Type of returned value is list of
        https://source.chromium.org/search?q=%22class%20BotInfo(messages.Message)%22%20f:luci%2Fappengine%2Fswarming&ssfr=1
        """

        args = [SWARMING_GO, 'bots', '-server', server]

        for key in sorted(dimensions):
            args.extend(['-dimension', '%s=%s' % (key, dimensions[key])])

        return self._run_swarming_query(args)

    def list_tasks(self, tags, limit=None, server='chromium-swarm.appspot.com'):
        """List tasks having specified task tags.

        Type of returned value is list of
        https://source.chromium.org/search?q=%22class%20TaskResult(messages.Message):%22%20f:luci%2Fappengine%2Fswarming&ssfr=1
        """

        args = [SWARMING_GO, 'tasks', '-server', server]

        for tag in sorted(tags):
            args.extend(['-tag', tag])

        # If a query uses a general dimension value, e.g., os:Mac, it will take
        # forever. We now limited the time range to be within a week.
        start_epoch_time = int(time.time()) - _A_WEEK_IN_SECONDS
        args.extend(['-start', str(start_epoch_time)])

        if limit is not None:
            args.extend(['-limit', str(limit)])

        return self._run_swarming_query(args)

    def remove_swarming_dimension(self, args, dimension):
        """Removes the first `--dimension <dimension> <value>` triple.

        Returns:
          A new list without the triple, or |args| itself if no
          `--dimension` argument is followed by |dimension|.
        """
        for i, argument in enumerate(args):
            # The bounds check keeps a trailing '--dimension' from raising.
            if (argument == '--dimension' and i + 1 < len(args)
                    and args[i + 1] == dimension):
                return args[:i] + args[i + 3:]
        return args

    def make_temp_file(self, prefix=None, suffix=None):
        """Creates an empty temporary file and returns its path."""
        # This trick of closing the file handle is needed on Windows in order to
        # make the file writeable.
        h, temp_file = tempfile.mkstemp(prefix=prefix, suffix=suffix)
        os.close(h)
        return temp_file

    def delete_temp_file(self, temp_file):
        """Deletes the file at |temp_file|."""
        os.remove(temp_file)

    def read_json_from_temp_file(self, temp_file):
        """Returns the parsed JSON content of |temp_file|."""
        with open(temp_file) as f:
            return json.load(f)

    def read_encoded_json_from_temp_file(self, temp_file):
        """Like read_json_from_temp_file, passed through strip_unicode()."""
        return strip_unicode(self.read_json_from_temp_file(temp_file))

    def write_json_to_file(self, merged_json, output_file):
        """Serializes |merged_json| as JSON into |output_file|."""
        with open(output_file, 'w') as f:
            json.dump(merged_json, f)

    def run_swarming_go(self,
                        args,
                        json_path,
                        shard_index,
                        shards,
                        merged_json=None):
        """Runs the Go `swarming` binary and merges the triggered tasks.

        Args:
          args: swarming.py-style arguments; converted with
            convert_to_go_swarming_args() before the call.
          json_path: File the swarming command dumps its result into. On
            success it is overwritten with the merged JSON.
          shard_index: Index of the shard being triggered.
          shards: Total number of shards.
          merged_json: Dictionary accumulating the tasks of all shards under
            the 'tasks' key; updated in place.

        Returns:
          The exit code of the swarming command. When it is non-zero the
          output file is not parsed and no tasks are merged.
        """

        logging.info('Running Go `swarming` with args: %s', args)

        if merged_json is None:
            merged_json = {}

        if 'tasks' not in merged_json:
            merged_json['tasks'] = {}

        ret = subprocess.call([SWARMING_GO] + convert_to_go_swarming_args(args))
        if ret:
            # The dump file may be empty or partial after a failure; parsing
            # it would only hide the real exit code behind a JSON error.
            return ret
        result_json = self.read_json_from_temp_file(json_path)

        for task in result_json['tasks']:
            request = task['request']
            invocation = task.get('task_result', {}).get('resultdb_info',
                                                         {}).get('invocation')
            if invocation:
                request['invocation'] = invocation
            request['shard_index'] = shard_index
            merged_key = request['task_id'] + ':%d:%d' % (shard_index, shards)
            merged_json['tasks'][merged_key] = request
        self.write_json_to_file(merged_json, json_path)
        return ret

    def prune_test_specific_configs(self, args):
        # Ability for base class to further prune configs to
        # run tests on.
        pass

    def select_config_indices(self, args):
        # Main implementation for base class to determine which bot config to
        # trigger for each shard.
        #
        # Returns a list of tuples (shard_index, bot_config_index).
        # bot_config_index is an index into self._bot_configs
        pass

    def indices_to_trigger(self, args):
        """Returns the indices of the swarming shards that should be
        triggered."""
        if args.shard_index is None:
            return list(range(args.shards))
        return [args.shard_index]

    def generate_shard_map(self, args, buildername, selected_config):
        """Returns shard map generated on runtime if needed."""
        pass  # pylint: disable=unnecessary-pass

    def trigger_tasks(self, args, remaining):
        """Triggers tasks for each bot.

        Args:
          args: Parsed arguments which we need to use.
          remaining: The remainder of the arguments, which should be passed to
            the swarming trigger calls.

        Returns:
          Exit code for the script.
        """
        if args.multiple_dimension_script_verbose:
            logging.basicConfig(level=logging.DEBUG)

        # crbug/1140389: debug print outs
        logging.info('DEBUG: init: %s', remaining)

        self.parse_bot_configs(args)
        # Prunes config list to the exact set of configurations to trigger jobs
        # on. This logic is specific to the base class if they want to prune
        # list further.
        self.prune_test_specific_configs(args)

        # In the remaining arguments, find the Swarming dimensions that are
        # specified by the bot configs and remove them, because for each shard,
        # we're going to select one of the bot configs and put all of its
        # Swarming dimensions on the command line.
        filtered_remaining_args = copy.deepcopy(remaining)
        for config in self._bot_configs:
            for k in config:
                filtered_remaining_args = self.remove_swarming_dimension(
                    filtered_remaining_args, k)
        # crbug/1140389: debug print outs
        logging.info('DEBUG: After filtered: %s', filtered_remaining_args)

        merged_json = {}
        #pylint: disable=assignment-from-no-return
        selected_config = self.select_config_indices(args)
        shard_map = self.generate_shard_map(
            args, self._findBuilderName(filtered_remaining_args),
            selected_config)
        #pylint: enable=assignment-from-no-return
        # Choose selected configs for this run of the test suite.
        for shard_index, bot_index in selected_config:
            # For each shard that we're going to distribute, do the following:
            # 1. Pick which bot configuration to use.
            # 2. Insert that bot configuration's dimensions as command line
            #    arguments, and invoke the swarming trigger command.
            #
            # json_temp holds the results of the trigger call. It is created
            # before the `try` so the `finally` clause never refers to an
            # unbound name if creation itself fails.
            json_temp = self.make_temp_file(prefix='base_trigger_dimensions',
                                            suffix='.json')
            try:
                # crbug/1140389: debug print outs
                logging.info('DEBUG: Before modify args: %s',
                             filtered_remaining_args)
                args_to_pass = self.modify_args(filtered_remaining_args,
                                                bot_index, shard_index,
                                                args.shards, json_temp,
                                                shard_map)
                # crbug/1140389: debug print outs
                logging.info('DEBUG: Before calling swarming: %s', args_to_pass)
                ret = self.run_swarming_go(args_to_pass, json_temp, shard_index,
                                           args.shards, merged_json)
                if ret:
                    sys.stderr.write('Failed to trigger a task, aborting\n')
                    return ret
            finally:
                self.delete_temp_file(json_temp)
        self.write_json_to_file(merged_json, args.dump_json)
        return 0

    def _findBuilderName(self, args):
        """Returns the builder name from a `--tag buildername:<name>` pair.

        Returns None when no such pair is present in |args|.
        """
        for flag, value in zip(args, args[1:]):
            if flag == '--tag' and value.startswith('buildername:'):
                return value.split(':', 1)[1]
        return None

    @staticmethod
    def setup_parser_contract(parser):
        """Adds the arguments shared by all trigger scripts to |parser|.

        Returns:
          The same |parser|, to allow chaining.
        """
        parser.add_argument(
            '--multiple-trigger-configs',
            type=str,
            required=False,
            help='The Swarming configurations to trigger tasks on, '
            'in the form of a JSON array of dictionaries (these are'
            ' Swarming dimension_sets). At least one entry is'
            ' required if you dont override parse_bot_configs')
        # Not `type=bool`: bool('False') is True, so the flag could never be
        # switched off explicitly.
        parser.add_argument('--multiple-dimension-script-verbose',
                            type=_parse_bool_flag,
                            default=False,
                            help='Turn on verbose logging')
        parser.add_argument(
            '--dump-json',
            required=True,
            help='(Swarming Trigger Script API) Where to dump the'
            ' resulting json which indicates which tasks were'
            ' triggered for which shards.')
        parser.add_argument(
            '--shards',
            type=int,
            default=1,
            help='How many shards to trigger. Duplicated from the'
            ' `swarming.py trigger` command.')
        parser.add_argument('--shard-index',
                            type=int,
                            default=None,
                            help='Which shard to trigger. Duplicated from the '
                            '`swarming.py trigger` command.')
        return parser