#!/usr/bin/env python3
# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Custom swarming triggering script.

This script does custom swarming triggering logic, to enable device affinity
for our bots, while lumping all trigger calls under one logical step.

For the perf use case of device affinity, this script now enables soft device
affinity.  This means that it tries to smartly allocate jobs to bots based
on what is currently alive and what bot the task was last triggered on,
preferring that last triggered bot if available.  If the
--multiple-trigger-configs flag is specified then this script overrides
the soft device affinity functionality in favor of the provided ids.

The algorithm is roughly the following:

Find eligible bots, healthy or not.
  * Query swarming for eligible bots based on the dimensions passed in
    on the swarming call.  Determine their health status based on
    is not quarantined and is not is_dead

Of the eligible bots determine what bot id to run the shard on.
(Implementation in _select_config_indices_with_soft_affinity)
  * First query swarming for the last task that ran that shard with
    given dimensions.  Assuming they are returned with most recent first.
  * Check if the bot id that ran that task is alive, if so trigger
    on that bot again.
  * If that bot isn't alive, allocate to another alive bot or if no
    other alive bots exist, trigger on the same dead one.

Scripts inheriting must have roughly the same command line interface as
swarming.py trigger. It modifies it in the following ways:

 * Intercepts the dump-json argument, and creates its own by combining the
   results from each trigger call.
 * Intercepts the dimensions from the swarming call and determines what bots
   are healthy based on the above device affinity algorithm, and triggers
 * Adds a tag to the swarming trigger job with the shard so we know the last
   bot that ran this shard.

This script is normally called from the swarming recipe module in tools/build.

"""

import argparse
import copy
import os
import subprocess
import sys
import logging
import random

import base_test_triggerer

SRC_DIR = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(os.path.join(SRC_DIR, 'tools', 'perf'))

# //tools/perf imports.
import generate_perf_sharding
from core import bot_platforms


class Bot(object):  # pylint: disable=useless-object-inheritance
    """Eligible bots to run the task."""

    def __init__(self, bot_id, is_alive):
        self._bot_id = bot_id
        self._is_alive = is_alive

    def id(self):
        """Returns the swarming bot id this object was built with."""
        return self._bot_id

    def is_alive(self):
        """Returns the health flag this object was built with."""
        return self._is_alive

    def as_json_config(self):
        """Returns the bot as a trigger config dict: {'id': <bot id>}."""
        return {'id': self._bot_id}


class PerfDeviceTriggerer(base_test_triggerer.BaseTestTriggerer):
    """Triggers perf shards on specific bots, preferring the last bot used."""

    def __init__(self, args, swarming_args):
        """Reads the swarming arguments and queries for eligible bots.

        Args:
          args: parsed arguments; multiple_trigger_configs and
            multiple_dimension_script_verbose are read here.
          swarming_args: the remaining (unparsed) swarming command line, as
            a list of strings.
        """
        # pylint: disable=super-with-arguments
        super(PerfDeviceTriggerer, self).__init__()
        # pylint: enable=super-with-arguments
        self._sharded_query_failed = False

        # Defaults so the attributes always exist, even when explicit
        # trigger configs are supplied and swarming is never queried.
        self._dimensions = {}
        self._swarming_server = None
        self._eligible_bots_by_ids = {}

        if not args.multiple_trigger_configs:
            # Represents the list of current dimensions requested
            # by the parent swarming job.
            self._dimensions = self._get_swarming_dimensions(swarming_args)

            # Store what swarming server we need and whether or not we need
            # to send down authentication with it
            self._swarming_server = self._get_swarming_server(swarming_args)

            # Map of all existing bots in swarming that satisfy the current
            # set of dimensions indexed by bot id.
            # Note: this assumes perf bot dimensions are unique between
            # configurations.
            self._eligible_bots_by_ids = (
                self._query_swarming_for_eligible_bot_configs(
                    self._dimensions))

        if args.multiple_dimension_script_verbose:
            # basicConfig() does nothing once the root logger already has
            # handlers (main() installs one before building this object),
            # so also set the root level explicitly.
            logging.basicConfig(level=logging.DEBUG)
            logging.getLogger().setLevel(logging.DEBUG)

    def generate_shard_map(self, args, buildername, selected_config):
        """Builds a dynamic shard map, or returns None when not applicable.

        Args:
          args: parsed arguments; use_dynamic_shards is read here.
          buildername: name passed to bot_platforms.find_bot_platform().
          selected_config: list of (shard_index, bot_config_index) pairs.

        Returns:
          The generated shard map with one 'bot #<shard>' entry per pair
          added under 'extra_infos', or None if dynamic sharding is off,
          the builder is unknown, or selected_config is empty.
        """
        shard_map = None
        num_of_shards = len(selected_config)
        builder = bot_platforms.find_bot_platform(buildername)
        if args.use_dynamic_shards and builder and num_of_shards:
            logging.info(
                'Generating dynamic shardmap for builder: %s with %d shards',
                buildername, num_of_shards)
            shard_map = generate_perf_sharding.GenerateShardMap(
                builder=builder, num_of_shards=num_of_shards)
            for shard_index, bot_index in selected_config:
                bot_id = self._bot_configs[bot_index]['id']
                shard_map['extra_infos']['bot #%s' % shard_index] = bot_id
        return shard_map

    def append_additional_args(self, args, shard_index):
        """Returns a copy of args with a '--tag shard:<index>' pair added."""
        # Append a tag to the swarming task with the shard number
        # so we can query for the last bot that ran a specific shard.
        shard_tag = ['--tag', 'shard:%d' % shard_index]
        # Need to append this before the dash if present so it gets fed to
        # the swarming task itself.
        if '--' in args:
            dash_ind = args.index('--')
            return args[:dash_ind] + shard_tag + args[dash_ind:]
        return args + shard_tag

    def parse_bot_configs(self, args):
        """Populates self._bot_configs.

        With --multiple-trigger-configs the base class parses them;
        otherwise one {'id': ...} config is created per eligible bot.
        """
        if args.multiple_trigger_configs:
            # pylint: disable=super-with-arguments
            super(PerfDeviceTriggerer, self).parse_bot_configs(args)
            # pylint: enable=super-with-arguments
        else:
            # For each eligible bot, append the dimension
            # to the eligible bot_configs
            self._bot_configs = [
                bot.as_json_config()
                for bot in self._eligible_bots_by_ids.values()
            ]

    def select_config_indices(self, args):
        """Returns a list of (shard_index, bot_config_index) pairs."""
        if args.multiple_trigger_configs:
            # If specific bot ids were passed in, we want to trigger a job
            # for every valid config regardless of health status since
            # each config represents exactly one bot in the perf swarming
            # pool.  Return here: the paths below need the eligible-bot
            # data that is only queried when no explicit configs are given.
            return [(index, index)
                    for index in range(len(self.indices_to_trigger(args)))]
        if args.use_dynamic_shards:
            return self._select_config_indices_with_dynamic_sharding()
        return self._select_config_indices_with_soft_affinity(args)

    def _select_config_indices_with_dynamic_sharding(self):
        """Assigns one shard to every alive bot, in random order.

        Returns:
          A list of (shard_index, bot_config_index) pairs sorted by shard
          index, with as many entries as there are alive eligible bots.
        """
        alive_bot_ids = [
            bot_id for bot_id, b in self._eligible_bots_by_ids.items()
            if b.is_alive()
        ]
        trigger_count = len(alive_bot_ids)

        indexes = list(range(trigger_count))
        random.shuffle(indexes)
        selected_config = [(indexes[i],
                            self._find_bot_config_index(alive_bot_ids[i]))
                           for i in range(trigger_count)]
        selected_config.sort()

        for shard_index, bot_index in selected_config:
            logging.info('Shard %d\n\tBot: %s', shard_index,
                         self._bot_configs[bot_index]['id'])

        return selected_config

    def _select_config_indices_with_soft_affinity(self, args):
        """Assigns each shard to a bot, preferring the bot that last ran it.

        Returns:
          A list of (shard_index, bot_config_index) pairs, one per shard
          returned by indices_to_trigger(args).

        Raises:
          ValueError: if more shards are requested than eligible bots exist.
        """
        trigger_count = len(self.indices_to_trigger(args))
        # First make sure the number of shards doesn't exceed the
        # number of eligible bots. This means there is a config error
        # somewhere.
        if trigger_count > len(self._eligible_bots_by_ids):
            self._print_device_affinity_info({}, {},
                                             self._eligible_bots_by_ids,
                                             trigger_count)
            raise ValueError(
                'Not enough available machines exist in swarming '
                'pool.  Shards requested (%d) exceeds available bots '
                '(%d).' % (trigger_count, len(self._eligible_bots_by_ids)))

        shard_to_bot_assignment_map = {}
        unallocated_bots_by_ids = copy.deepcopy(self._eligible_bots_by_ids)
        for shard_index in self.indices_to_trigger(args):
            bot_id = self._query_swarming_for_last_shard_id(shard_index)
            if bot_id and bot_id in unallocated_bots_by_ids:
                # Keep the previous bot and remove it from the free pool.
                shard_to_bot_assignment_map[shard_index] = (
                    unallocated_bots_by_ids.pop(bot_id))
            else:
                shard_to_bot_assignment_map[shard_index] = None

        # Maintain the current map for debugging purposes
        existing_shard_bot_to_shard_map = copy.deepcopy(
            shard_to_bot_assignment_map)
        # Now create sets of remaining healthy and bad bots
        unallocated_healthy_bots = {
            b
            for b in unallocated_bots_by_ids.values() if b.is_alive()
        }
        unallocated_bad_bots = {
            b
            for b in unallocated_bots_by_ids.values() if not b.is_alive()
        }

        # Try assigning healthy bots for new shards first.
        # sorted() snapshots the items, so the map can be updated in the
        # loop body.
        for shard_index, bot in sorted(shard_to_bot_assignment_map.items()):
            if not bot and unallocated_healthy_bots:
                shard_to_bot_assignment_map[shard_index] = (
                    unallocated_healthy_bots.pop())
                logging.info('First time shard %d has been triggered',
                             shard_index)
            elif not bot:
                # The size check above guarantees a bot is left to take.
                shard_to_bot_assignment_map[shard_index] = (
                    unallocated_bad_bots.pop())

        # Handle the rest of shards that were assigned dead bots:
        for shard_index, bot in sorted(shard_to_bot_assignment_map.items()):
            if not bot.is_alive() and unallocated_healthy_bots:
                dead_bot = bot
                healthy_bot = unallocated_healthy_bots.pop()
                shard_to_bot_assignment_map[shard_index] = healthy_bot
                logging.info(
                    'Device affinity broken for shard #%d. bot %s is dead,'
                    ' new mapping to bot %s', shard_index, dead_bot.id(),
                    healthy_bot.id())

        # Now populate the indices into the bot_configs array
        selected_configs = []
        for shard_index in self.indices_to_trigger(args):
            selected_configs.append(
                (shard_index,
                 self._find_bot_config_index(
                     shard_to_bot_assignment_map[shard_index].id())))
        self._print_device_affinity_info(shard_to_bot_assignment_map,
                                         existing_shard_bot_to_shard_map,
                                         self._eligible_bots_by_ids,
                                         trigger_count)
        return selected_configs

    def _print_device_affinity_info(self, new_map, existing_map, health_map,
                                    num_shards):
        """Logs the previous/new bot for each shard and the bot health summary.

        Args:
          new_map: shard index -> Bot (or None) after assignment.
          existing_map: shard index -> Bot (or None) before reassignment.
          health_map: bot id -> Bot for every eligible bot.
          num_shards: number of shards; indices 0..num_shards-1 are logged.
        """
        logging.info('')
        for shard_index in range(num_shards):
            existing = existing_map.get(shard_index, None)
            new = new_map.get(shard_index, None)
            existing_id = existing.id() if existing else ''
            new_id = new.id() if new else ''
            logging.info('Shard %d\n\tprevious: %s\n\tnew: %s', shard_index,
                         existing_id, new_id)

        healthy_bots = []
        dead_bots = []
        for b in health_map.values():
            if b.is_alive():
                healthy_bots.append(b.id())
            else:
                dead_bots.append(b.id())
        logging.info('Shards needed: %d', num_shards)
        logging.info('Total bots (dead + healthy): %d',
                     len(dead_bots) + len(healthy_bots))
        logging.info('Healthy bots, %d: %s', len(healthy_bots), healthy_bots)
        logging.info('Dead Bots, %d: %s', len(dead_bots), dead_bots)
        logging.info('')

    def _query_swarming_for_eligible_bot_configs(self, dimensions):
        """Query Swarming to figure out which bots are available.

        Returns: a dictionary in which the keys are the bot id and
        the values are Bot object that indicate the health status
        of the bots.
        """

        query_result = self.list_bots(dimensions,
                                      server=self._swarming_server)
        perf_bots = {}
        for bot in query_result:
            # Device maintenance is usually quick, and we can wait for it to
            # finish. However, if the device is too hot, it can take a long
            # time for it to cool down, so check for 'Device temperature' in
            # maintenance_msg.
            alive = (not bot.get('is_dead') and not bot.get('quarantined')
                     and 'Device temperature' not in bot.get(
                         'maintenance_msg', ''))
            perf_bots[bot['bot_id']] = Bot(bot['bot_id'], alive)
        return perf_bots

    def _find_bot_config_index(self, bot_id):
        """Returns the index in self._bot_configs for bot_id, or None."""
        # Find the index into the bot_config map that
        # maps to the bot id in question
        for i, dimensions in enumerate(self._bot_configs):
            if dimensions['id'] == bot_id:
                return i
        return None

    def _query_swarming_for_last_shard_id(self, shard_index):
        """Per shard, query swarming for the last bot that ran the task.

        Example: swarming.py query -S server-url.com --limit 1 \\
          'tasks/list?tags=os:Windows&tags=pool:chrome.tests.perf&tags=shard:12'

        Returns:
          The bot id of the most recent matching task, or None if no task
          was found or the task carries no bot id.
        """
        values = sorted('%s:%s' % (k, v)
                        for k, v in self._dimensions.items())

        # Append the shard as a tag
        values_with_shard = sorted(values +
                                   ['%s:%s' % ('shard', str(shard_index))])

        # TODO(eyaich): For now we are ignoring the state of the returned
        # task (ie completed, timed_out, bot_died, etc) as we are just
        # answering the question "What bot did we last trigger this shard
        # on?"  Evaluate if this is the right decision going forward.

        # Query for the last task that ran with these dimensions and this
        # shard.  Query with the shard param first. This will sometimes time
        # out for queries we've never done before, so try querying without
        # it if that happens.  Once it has failed, the sharded query is
        # skipped for all later shards too.
        tasks = None
        if not self._sharded_query_failed:
            try:
                tasks = self.list_tasks(values_with_shard,
                                        limit='1',
                                        server=self._swarming_server)
            except subprocess.CalledProcessError:
                self._sharded_query_failed = True
        if self._sharded_query_failed:
            tasks = self.list_tasks(values,
                                    limit='1',
                                    server=self._swarming_server)

        if tasks:
            # We queried with a limit of 1 so we could only get back
            # the most recent which is what we care about.
            task = tasks[0]
            if 'bot_id' in task:
                return task['bot_id']
            # Fall back to an 'id:<bot>' tag; a task without tags simply
            # yields no bot.
            for tag in task.get('tags', []):
                if tag.startswith('id:'):
                    return tag[len('id:'):]
        # No eligible shard for this bot
        return None

    def _get_swarming_dimensions(self, args):
        """Collects every '--dimension KEY VALUE' triple in args into a dict."""
        dimensions = {}
        for flag, key, value in zip(args, args[1:], args[2:]):
            if flag == '--dimension':
                dimensions[key] = value
        return dimensions

    def _get_swarming_server(self, args):
        """Returns the swarming server host with its protocol stripped.

        Looks for the first argument containing '--swarming' and uses the
        argument that follows it.  Returns None when no such pair exists.
        """
        # Stop one short of the end: the flag needs a value after it.
        for i, argument in enumerate(args[:-1]):
            if '--swarming' in argument:
                server = args[i + 1]
                # Strip out the protocol; a value without '//' is returned
                # unchanged.
                return server.split('//', 1)[-1]
        return None


def main():
    """Parses the command line and triggers the swarming tasks."""
    logging.basicConfig(level=logging.INFO,
                        format='(%(levelname)s) %(asctime)s pid=%(process)d'
                        '  %(module)s.%(funcName)s:%(lineno)d  %(message)s')
    # Setup args for common contract of base class
    parser = base_test_triggerer.BaseTestTriggerer.setup_parser_contract(
        argparse.ArgumentParser(description=__doc__))
    parser.add_argument('--use-dynamic-shards',
                        action='store_true',
                        required=False,
                        help='Ignore --shards and the existing shard map. Will '
                        'generate a shard map at run time and use as much '
                        'device as possible.')
    args, remaining = parser.parse_known_args()

    triggerer = PerfDeviceTriggerer(args, remaining)
    return triggerer.trigger_tasks(args, remaining)


if __name__ == '__main__':
    sys.exit(main())