• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2017 The Chromium Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""Custom swarming triggering script.
6
7This script does custom swarming triggering logic, to enable device affinity
8for our bots, while lumping all trigger calls under one logical step.
9
10For the perf use case of device affinity, this script now enables soft device
11affinity.  This means that it tries to smartly allocate jobs to bots based
12on what is currently alive and what bot the task was last triggered on,
13preferring that last triggered bot if available.  If the
--multiple-trigger-configs flag is specified then this script overrides
15the soft device affinity functionality in favor of the provided ids.
16
17The algorithm is roughly the following:
18
19Find eligible bots, healthy or not.
20  * Query swarming for eligible bots based on the dimensions passed in
    on the swarming call.  Determine their health status based on
    whether each bot is not quarantined, not dead, and not in maintenance
    waiting for its device temperature to come down.
23
24Of the eligible bots determine what bot id to run the shard on.
25(Implementation in _select_config_indices_with_soft_affinity)
26  * First query swarming for the last task that ran that shard with
27    given dimensions.  Assuming they are returned with most recent first.
28  * Check if the bot id that ran that task is alive, if so trigger
29    on that bot again.
30  * If that bot isn't alive, allocate to another alive bot or if no
31    other alive bots exist, trigger on the same dead one.
32
33Scripts inheriting must have roughly the same command line interface as
34swarming.py trigger. It modifies it in the following ways:
35
36 * Intercepts the dump-json argument, and creates its own by combining the
37   results from each trigger call.
 * Intercepts the dimensions from the swarming call, determines which bots
   are healthy, and triggers each shard on a bot chosen by the above device
   affinity algorithm.
40 * Adds a tag to the swarming trigger job with the shard so we know the last
41   bot that ran this shard.
42
43This script is normally called from the swarming recipe module in tools/build.
44
45"""
46
47import argparse
48import copy
49import os
50import subprocess
51import sys
52import logging
53import random
54
55import base_test_triggerer
56
# SRC_DIR is reached by taking dirname() three times starting from this
# file's absolute path; //tools/perf beneath it is put on sys.path so the
# perf imports that follow can be resolved.
_THIS_FILE = os.path.abspath(__file__)
SRC_DIR = os.path.dirname(os.path.dirname(os.path.dirname(_THIS_FILE)))
_PERF_TOOLS_DIR = os.path.join(SRC_DIR, 'tools', 'perf')
sys.path.append(_PERF_TOOLS_DIR)
60
61# //tools/perf imports.
62import generate_perf_sharding
63from core import bot_platforms
64
65
class Bot(object):  # pylint: disable=useless-object-inheritance
    """A swarming bot eligible to run the task, with its health status."""

    def __init__(self, bot_id, is_alive):
        self._bot_id, self._is_alive = bot_id, is_alive

    def id(self):
        """Returns the swarming id of this bot."""
        return self._bot_id

    def is_alive(self):
        """Returns the health flag this bot was constructed with."""
        return self._is_alive

    def as_json_config(self):
        """Returns this bot as a bot-config dict holding only its 'id'."""
        config = {}
        config['id'] = self.id()
        return config
81
82
class PerfDeviceTriggerer(base_test_triggerer.BaseTestTriggerer):
    """Triggers perf shards on swarming bots with soft device affinity.

    Unless --multiple-trigger-configs supplies explicit bot configs, this
    queries swarming for every bot matching the requested dimensions and
    prefers to run each shard on the bot that last ran it, falling back to
    other healthy bots and, as a last resort, dead ones. With
    --use-dynamic-shards it instead assigns one randomly ordered shard to
    every alive eligible bot.
    """

    def __init__(self, args, swarming_args):
        """Initializes the triggerer.

        Args:
          args: parsed known command-line arguments (argparse namespace).
          swarming_args: remaining command-line arguments; scanned for
            '--dimension KEY VALUE' and '--swarming URL' values.
        """
        # pylint: disable=super-with-arguments
        super(PerfDeviceTriggerer, self).__init__()
        # pylint: enable=super-with-arguments
        # Set once a shard-tagged task query fails; later lookups then go
        # straight to the query without the shard tag.
        self._sharded_query_failed = False

        if args.multiple_dimension_script_verbose:
            # logging.basicConfig() does nothing if the root logger already
            # has handlers (main() installs one at INFO), so also lower the
            # root logger's level explicitly for this flag to take effect.
            logging.basicConfig(level=logging.DEBUG)
            logging.getLogger().setLevel(logging.DEBUG)

        if not args.multiple_trigger_configs:
            # Represents the list of current dimensions requested
            # by the parent swarming job.
            self._dimensions = self._get_swarming_dimensions(swarming_args)

            # Store what swarming server we need and whether or not we need
            # to send down authentication with it
            self._swarming_server = self._get_swarming_server(swarming_args)

            # Map of all existing bots in swarming that satisfy the current
            # set of dimensions indexed by bot id.
            # Note: this assumes perf bot dimensions are unique between
            # configurations.
            self._eligible_bots_by_ids = (
                self._query_swarming_for_eligible_bot_configs(self._dimensions))

    def generate_shard_map(self, args, buildername, selected_config):
        """Returns a dynamically generated shard map, or None.

        A map is only generated when --use-dynamic-shards is set, buildername
        resolves to a known perf bot platform, and at least one shard was
        selected. The bot id chosen for each shard is recorded in the map's
        'extra_infos' under the key 'bot #<shard_index>'.

        Args:
          args: parsed known command-line arguments.
          buildername: name of the builder being triggered.
          selected_config: (shard_index, bot_config_index) pairs as returned
            by select_config_indices().
        """
        num_of_shards = len(selected_config)
        builder = bot_platforms.find_bot_platform(buildername)
        if not (args.use_dynamic_shards and builder and num_of_shards):
            return None

        logging.info(
            'Generating dynamic shardmap for builder: %s with %d shards',
            buildername, num_of_shards)
        shard_map = generate_perf_sharding.GenerateShardMap(
            builder=builder, num_of_shards=num_of_shards)
        for shard_index, bot_index in selected_config:
            bot_id = self._bot_configs[bot_index]['id']
            shard_map['extra_infos']['bot #%s' % shard_index] = bot_id
        return shard_map

    def append_additional_args(self, args, shard_index):
        """Returns args plus a '--tag shard:<shard_index>' pair.

        The tag lets _query_swarming_for_last_shard_id() find the bot that
        last ran a given shard. It is inserted before a '--' separator, when
        one is present, so it is applied to the swarming task itself.
        """
        shard_tag = ['--tag', 'shard:%d' % shard_index]
        if '--' in args:
            dash_ind = args.index('--')
            return args[:dash_ind] + shard_tag + args[dash_ind:]
        return args + shard_tag

    def parse_bot_configs(self, args):
        """Populates self._bot_configs.

        With --multiple-trigger-configs the base class parses the explicit
        configs; otherwise there is one {'id': bot_id} config per eligible
        bot found in swarming.
        """
        if args.multiple_trigger_configs:
            # pylint: disable=super-with-arguments
            super(PerfDeviceTriggerer, self).parse_bot_configs(args)
            # pylint: enable=super-with-arguments
        else:
            self._bot_configs = [
                bot.as_json_config()
                for bot in self._eligible_bots_by_ids.values()
            ]

    def select_config_indices(self, args):
        """Returns the (shard_index, bot_config_index) pairs to trigger.

        Explicit --multiple-trigger-configs take precedence: shard i runs on
        config i regardless of bot health. Otherwise shards are placed by
        dynamic sharding (--use-dynamic-shards) or by soft device affinity.
        """
        if args.multiple_trigger_configs:
            # If specific bot ids were passed in, we want to trigger a job for
            # every valid config regardless of health status since
            # each config represents exactly one bot in the perf swarming pool.
            # Must return here: the strategies below read swarming state that
            # __init__ only sets up when no explicit configs are given.
            return [(index, index)
                    for index in range(len(self.indices_to_trigger(args)))]
        if args.use_dynamic_shards:
            return self._select_config_indices_with_dynamic_sharding()
        return self._select_config_indices_with_soft_affinity(args)

    def _select_config_indices_with_dynamic_sharding(self):
        """Assigns one shard to every alive eligible bot, in random order.

        Returns:
          (shard_index, bot_config_index) pairs sorted by shard index; there
          are as many shards as alive eligible bots.
        """
        alive_bot_ids = [
            bot_id for bot_id, bot in self._eligible_bots_by_ids.items()
            if bot.is_alive()
        ]
        shard_indexes = list(range(len(alive_bot_ids)))
        random.shuffle(shard_indexes)
        selected_config = sorted(
            (shard_index, self._find_bot_config_index(bot_id))
            for shard_index, bot_id in zip(shard_indexes, alive_bot_ids))

        for shard_index, bot_index in selected_config:
            logging.info('Shard %d\n\tBot: %s', shard_index,
                         self._bot_configs[bot_index]['id'])

        return selected_config

    def _select_config_indices_with_soft_affinity(self, args):
        """Assigns shards to bots, preferring the bot that last ran each one.

        A shard keeps its previous bot if that bot is still eligible and
        unclaimed. Shards without a bot get an unallocated healthy bot when
        one exists, otherwise an unallocated dead one. Finally, shards still
        on a dead bot are moved to any remaining unallocated healthy bot.

        Returns:
          (shard_index, bot_config_index) pairs in indices_to_trigger() order.

        Raises:
          ValueError: more shards were requested than eligible bots exist.
        """
        shard_indices = list(self.indices_to_trigger(args))
        trigger_count = len(shard_indices)
        # First make sure the number of shards doesn't exceed the
        # number of eligible bots. This means there is a config error somewhere.
        if trigger_count > len(self._eligible_bots_by_ids):
            self._print_device_affinity_info({}, {}, self._eligible_bots_by_ids,
                                             trigger_count)
            raise ValueError(
                'Not enough available machines exist in swarming '
                'pool.  Shards requested (%d) exceeds available bots '
                '(%d).' % (trigger_count, len(self._eligible_bots_by_ids)))

        # Work on a copy so claiming bots never mutates the eligible-bot map.
        unallocated_bots_by_ids = copy.deepcopy(self._eligible_bots_by_ids)
        shard_to_bot_assignment_map = {}
        for shard_index in shard_indices:
            bot_id = self._query_swarming_for_last_shard_id(shard_index)
            # Claim the previous bot if it is still unallocated; None marks a
            # shard that still needs a bot.
            shard_to_bot_assignment_map[shard_index] = (
                unallocated_bots_by_ids.pop(bot_id, None) if bot_id else None)

        # Maintain the current map for debugging purposes
        existing_shard_bot_to_shard_map = copy.deepcopy(
            shard_to_bot_assignment_map)
        # Now create sets of remaining healthy and bad bots
        unallocated_healthy_bots = {
            bot for bot in unallocated_bots_by_ids.values() if bot.is_alive()
        }
        unallocated_bad_bots = {
            bot for bot in unallocated_bots_by_ids.values()
            if not bot.is_alive()
        }

        # Try assigning healthy bots for new shards first. The size check
        # above guarantees a bot (healthy or dead) is left for every shard.
        for shard_index, bot in sorted(shard_to_bot_assignment_map.items()):
            if bot is not None:
                continue
            if unallocated_healthy_bots:
                shard_to_bot_assignment_map[shard_index] = (
                    unallocated_healthy_bots.pop())
                logging.info('First time shard %d has been triggered',
                             shard_index)
            else:
                shard_to_bot_assignment_map[shard_index] = (
                    unallocated_bad_bots.pop())

        # Handle the rest of shards that were assigned dead bots:
        for shard_index, bot in sorted(shard_to_bot_assignment_map.items()):
            if bot.is_alive() or not unallocated_healthy_bots:
                continue
            healthy_bot = unallocated_healthy_bots.pop()
            shard_to_bot_assignment_map[shard_index] = healthy_bot
            logging.info(
                'Device affinity broken for shard #%d. bot %s is dead,'
                ' new mapping to bot %s', shard_index, bot.id(),
                healthy_bot.id())

        # Now populate the indices into the bot_configs array
        selected_configs = [
            (shard_index,
             self._find_bot_config_index(
                 shard_to_bot_assignment_map[shard_index].id()))
            for shard_index in shard_indices
        ]
        self._print_device_affinity_info(shard_to_bot_assignment_map,
                                         existing_shard_bot_to_shard_map,
                                         self._eligible_bots_by_ids,
                                         trigger_count)
        return selected_configs

    def _print_device_affinity_info(self, new_map, existing_map, health_map,
                                    num_shards):
        """Logs the previous/new bot per shard and a healthy/dead summary.

        Args:
          new_map: shard index -> Bot after assignment.
          existing_map: shard index -> Bot (or None) from affinity alone.
          health_map: bot id -> Bot for every eligible bot.
          num_shards: shards 0 .. num_shards - 1 are reported.
        """
        logging.info('')
        for shard_index in range(num_shards):
            existing = existing_map.get(shard_index)
            new = new_map.get(shard_index)
            logging.info('Shard %d\n\tprevious: %s\n\tnew: %s', shard_index,
                         existing.id() if existing else '',
                         new.id() if new else '')

        healthy_bots = [b.id() for b in health_map.values() if b.is_alive()]
        dead_bots = [b.id() for b in health_map.values() if not b.is_alive()]
        logging.info('Shards needed: %d', num_shards)
        logging.info('Total bots (dead + healthy): %d',
                     len(dead_bots) + len(healthy_bots))
        logging.info('Healthy bots, %d: %s', len(healthy_bots), healthy_bots)
        logging.info('Dead Bots, %d: %s', len(dead_bots), dead_bots)
        logging.info('')

    def _query_swarming_for_eligible_bot_configs(self, dimensions):
        """Query Swarming to figure out which bots are available.

        A bot counts as alive when it is not dead, not quarantined, and its
        maintenance message does not mention 'Device temperature'.

        Returns: a dictionary in which the keys are the bot id and
        the values are Bot object that indicate the health status
        of the bots.
        """
        perf_bots = {}
        for bot in self.list_bots(dimensions, server=self._swarming_server):
            # Device maintenance is usually quick, and we can wait for it to
            # finish. However, if the device is too hot, it can take a long time
            # for it to cool down, so check for 'Device temperature' in
            # maintenance_msg. A missing or null message is treated as empty.
            maintenance_msg = bot.get('maintenance_msg') or ''
            alive = (not bot.get('is_dead') and not bot.get('quarantined')
                     and 'Device temperature' not in maintenance_msg)
            perf_bots[bot['bot_id']] = Bot(bot['bot_id'], alive)
        return perf_bots

    def _find_bot_config_index(self, bot_id):
        """Returns the index of the bot config whose 'id' is bot_id, or None."""
        return next((index for index, config in enumerate(self._bot_configs)
                     if config['id'] == bot_id), None)

    def _query_swarming_for_last_shard_id(self, shard_index):
        """Per shard, query swarming for the last bot that ran the task.

        Example: swarming.py query -S server-url.com --limit 1 \\
          'tasks/list?tags=os:Windows&tags=pool:chrome.tests.perf&tags=shard:12'

        Returns:
          The bot id of the most recent matching task, read from its
          'bot_id' field or else from an 'id:' tag; None if no task matched
          or no id could be found.
        """
        values = sorted('%s:%s' % (k, v) for k, v in self._dimensions.items())
        # Append the shard as a tag
        values_with_shard = sorted(values + ['shard:%s' % shard_index])

        # TODO(eyaich): For now we are ignoring the state of the returned
        # task (ie completed, timed_out, bot_died, etc) as we are just
        # answering the question "What bot did we last trigger this shard on?"
        # Evaluate if this is the right decision going forward.

        # Query for the last task that ran with these dimensions and this shard.
        # Query with the shard param first. This will sometimes time out for
        # queries we've never done before; after the first failure every later
        # lookup uses the query without the shard tag.
        tasks = None
        if not self._sharded_query_failed:
            try:
                tasks = self.list_tasks(values_with_shard,
                                        limit='1',
                                        server=self._swarming_server)
            except subprocess.CalledProcessError:
                self._sharded_query_failed = True
        if self._sharded_query_failed:
            tasks = self.list_tasks(values,
                                    limit='1',
                                    server=self._swarming_server)

        if not tasks:
            # No eligible shard for this bot
            return None
        # We queried with a limit of 1 so we could only get back
        # the most recent which is what we care about.
        task = tasks[0]
        if 'bot_id' in task:
            return task['bot_id']
        for tag in task.get('tags', ()):
            if tag.startswith('id:'):
                return tag[len('id:'):]
        return None

    def _get_swarming_dimensions(self, args):
        """Collects '--dimension KEY VALUE' triples from args into a dict.

        A later occurrence of the same KEY overrides an earlier one.
        """
        dimensions = {}
        for i in range(len(args) - 2):
            if args[i] == '--dimension':
                dimensions[args[i + 1]] = args[i + 2]
        return dimensions

    def _get_swarming_server(self, args):
        """Returns the swarming server named in args, or None if absent.

        Uses the value following the first argument containing '--swarming'
        and strips its protocol prefix (e.g. 'https://'); a value without a
        '//' is returned unchanged.
        """
        for i, argument in enumerate(args):
            if '--swarming' in argument:
                server = args[i + 1]
                # Strip out the protocol, if there is one.
                _, separator, without_protocol = server.partition('//')
                return without_protocol if separator else server
        return None
374
375
def main():
    """Parses the command line and triggers the perf test shards.

    Returns:
      The result of PerfDeviceTriggerer.trigger_tasks(), which the
      __main__ guard below hands to sys.exit().
    """
    log_format = ('(%(levelname)s) %(asctime)s pid=%(process)d'
                  '  %(module)s.%(funcName)s:%(lineno)d  %(message)s')
    logging.basicConfig(level=logging.INFO, format=log_format)

    # Start from the argument contract shared with the base triggerer, then
    # add the perf-specific flag on top.
    parser = argparse.ArgumentParser(description=__doc__)
    parser = base_test_triggerer.BaseTestTriggerer.setup_parser_contract(parser)
    parser.add_argument(
        '--use-dynamic-shards',
        action='store_true',
        required=False,
        help=('Ignore --shards and the existing shard map. Will '
              'generate a shard map at run time and use as much '
              'device as possible.'))

    # Arguments the parser does not recognize are handed to the triggerer
    # as the swarming arguments.
    known_args, swarming_args = parser.parse_known_args()
    triggerer = PerfDeviceTriggerer(known_args, swarming_args)
    return triggerer.trigger_tasks(known_args, swarming_args)


if __name__ == '__main__':
    sys.exit(main())
397