# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime
import functools
import hashlib
import logging
import os.path

from recipe_engine import config_types
from recipe_engine import recipe_api
from recipe_engine import util as recipe_util

import state


# TODO(borenet): This module was copied from build.git and heavily modified to
# remove dependencies on other modules in build.git.  It belongs in a different
# repo. Remove this once it has been moved.


# Minimally supported version of swarming.py script (reported by --version).
MINIMAL_SWARMING_VERSION = (0, 8, 6)


def text_for_task(task):
  """Returns an HTML snippet describing the 'id' and 'os' dimensions of a task.

  Args:
    task: object exposing a 'dimensions' mapping (e.g. a SwarmingTask).

  Returns:
    The available lines joined with '<br/>'; an empty string when neither
    dimension is set.
  """
  lines = []

  if task.dimensions.get('id'):  # pragma: no cover
    lines.append('Bot id: %r' % task.dimensions['id'])
  if task.dimensions.get('os'):
    lines.append('Run on OS: %r' % task.dimensions['os'])

  return '<br/>'.join(lines)


def parse_time(value):
  """Converts serialized time from the API to datetime.datetime.

  Raises:
    ValueError: if |value| matches none of the supported formats.
  """
  # When microseconds are 0, the '.123456' suffix is elided. This means the
  # serialized format is not consistent, which confuses the hell out of python.
  # TODO(maruel): Remove third format once we enforce version >=0.8.2.
  for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S'):
    try:
      return datetime.datetime.strptime(value, fmt)
    except ValueError:  # pragma: no cover
      pass
  raise ValueError('Failed to parse %s' % value)  # pragma: no cover


class ReadOnlyDict(dict):
  """dict subclass that rejects item assignment (d[key] = value).

  Only __setitem__ is overridden; other mutating dict methods are inherited
  unchanged.
  """

  def __setitem__(self, key, value):
    raise TypeError('ReadOnlyDict is immutable')


class SwarmingApi(recipe_api.RecipeApi):
  """Recipe module to use swarming.py tool to run tasks on Swarming.

  General usage:
    1. Tweak default task parameters applied to all swarming tasks (such as
       default_dimensions and default_priority).
    2. Isolate some test using 'isolate' recipe module. Get isolated hash as
       a result of that process.
    3. Create a task configuration using 'task(...)' method, providing
       isolated hash obtained previously.
    4. Tweak the task parameters. This step is optional.
    5. Launch the task on swarming by calling 'trigger_task(...)'.
    6. Continue doing useful work locally while the task is running concurrently
       on swarming.
    7. Wait for task to finish and collect its result (exit code, logs)
       by calling 'collect_task(...)'.

  See also example.py for concrete code.
  """

  State = state.State

  #############################################################################
  # The below are helper functions to help transition between the old and new #
  # swarming result formats. TODO(martiniss): remove these                    #
  #############################################################################

  def _is_expired(self, shard):
    """Returns True if the shard's 'state' is EXPIRED (enum or string form)."""
    # FIXME: We really should only have one format for enums. We want to move to
    # strings, currently have numbers.
    return (
        shard.get('state') == self.State.EXPIRED or
        shard.get('state') == 'EXPIRED')

  def _is_timed_out(self, shard):
    """Returns True if the shard's 'state' is TIMED_OUT (enum or string form)."""
    # FIXME: We really should only have one format for enums. We want to move to
    # strings, currently have numbers.
    return (
        shard.get('state') == self.State.TIMED_OUT or
        shard.get('state') == 'TIMED_OUT')

  def _get_exit_code(self, shard):
    """Returns the shard's exit code, or None if it has none.

    A truthy 'exit_code' entry is returned as-is. Otherwise the first element
    of 'exit_codes' is returned converted to str, or None when that list is
    missing or empty.
    """
    if shard.get('exit_code'):
      return shard.get('exit_code')  # pragma: no cover
    lst = shard.get('exit_codes', [])
    return str(lst[0]) if lst else None

  def __init__(self, **kwargs):
    super(SwarmingApi, self).__init__(**kwargs)
    # All tests default to a x86-64 bot running with no GPU. This simplifies
    # management so that new tests are not executed on exotic bots by accidents
    # even if misconfigured.
    self._default_dimensions = {
      'cpu': 'x86-64',
      'gpu': 'none',
    }
    # Expirations are set to mildly good values and will be tightened soon.
    self._default_expiration = 60*60
    self._default_env = {}
    self._default_hard_timeout = 60*60
    self._default_idempotent = False
    self._default_io_timeout = 20*60
    # The default priority is extremely low and should be increased depending
    # on the type of task.
    self._default_priority = 200
    self._default_tags = set()
    self._default_user = None
    # Names (SwarmingTask.task_name) of tasks triggered but not yet collected.
    self._pending_tasks = set()
    self._show_isolated_out_in_collect_step = True
    self._show_shards_in_collect_step = False
    self._swarming_server = 'https://chromium-swarm.appspot.com'
    self._verbose = False

  @recipe_util.returns_placeholder
  def summary(self):
    """JSON output placeholder; passed as --task-summary-json when collecting."""
    return self.m.json.output()

  @property
  def swarming_server(self):
    """URL of Swarming server to use, default is a production one."""
    return self._swarming_server

  @swarming_server.setter
  def swarming_server(self, value):
    """Changes URL of Swarming server to use."""
    self._swarming_server = value

  @property
  def verbose(self):
    """True to run swarming scripts with verbose output."""
    return self._verbose

  @verbose.setter
  def verbose(self, value):
    """Enables or disables verbose output in swarming scripts."""
    assert isinstance(value, bool), value
    self._verbose = value

  @property
  def default_expiration(self):
    """Number of seconds that the server will wait to find a bot able to run the
    task.

    If no bot runs the task by this number of seconds, the task is canceled as
    EXPIRED.

    This value can be changed per individual task.
    """
    return self._default_expiration

  @default_expiration.setter
  def default_expiration(self, value):
    assert 30 <= value <= 24*60*60, value
    self._default_expiration = value

  @property
  def default_hard_timeout(self):
    """Number of seconds in which the task must complete.

    If the task takes more than this amount of time, the process is assumed to
    be hung. It is forcibly killed via SIGTERM then SIGKILL after a grace period
    (default: 30s). Then the task is marked as TIMED_OUT.

    This value can be changed per individual task.
    """
    return self._default_hard_timeout

  @default_hard_timeout.setter
  def default_hard_timeout(self, value):
    assert 30 <= value <= 6*60*60, value
    self._default_hard_timeout = value

  @property
  def default_io_timeout(self):
    """Number of seconds at which interval the task must write to stdout or
    stderr.

    If the task takes more than this amount of time between writes to stdout or
    stderr, the process is assumed to be hung. It is forcibly killed via SIGTERM
    then SIGKILL after a grace period (default: 30s). Then the task is marked as
    TIMED_OUT.

    This value can be changed per individual task.
    """
    return self._default_io_timeout

  @default_io_timeout.setter
  def default_io_timeout(self, value):
    assert 30 <= value <= 6*60*60, value
    self._default_io_timeout = value

  @property
  def default_idempotent(self):
    """Bool to specify if task deduplication can be done.

    When set, the server will search for another task that ran in the last days
    that had the exact same properties. If it finds one, the task will not be
    run at all, the previous results will be returned as-is.

    For more infos, see:
    https://github.com/luci/luci-py/blob/master/appengine/swarming/doc/User-Guide.md#task-idempotency

    This value can be changed per individual task.
    """
    return self._default_idempotent

  @default_idempotent.setter
  def default_idempotent(self, value):
    assert isinstance(value, bool), value
    self._default_idempotent = value

  @property
  def default_user(self):
    """String to represent who triggered the task.

    The user should be an email address when someone requested testing via
    pre-commit or manual testing.

    This value can be changed per individual task.
    """
    return self._default_user

  @default_user.setter
  def default_user(self, value):
    assert value is None or isinstance(value, basestring), value
    self._default_user = value

  @property
  def default_dimensions(self):
    """Returns a copy of the default Swarming dimensions to run task on.

    The dimensions are what is used to filter which bots are able to run the
    task successfully. This is particularly useful to discern between OS
    versions, type of CPU, GPU card or VM, or preallocated pool.

    Example:
      {'cpu': 'x86-64', 'os': 'Windows-XP-SP3'}

    This value can be changed per individual task.
    """
    return ReadOnlyDict(self._default_dimensions)

  def set_default_dimension(self, key, value):
    """Sets default dimension |key| to |value|; a value of None removes it."""
    assert isinstance(key, basestring), key
    assert isinstance(value, basestring) or value is None, value
    if value is None:
      self._default_dimensions.pop(key, None)
    else:
      self._default_dimensions[key] = value  # pragma: no cover

  @property
  def default_env(self):
    """Returns a copy of the default environment variable to run tasks with.

    By default the environment variable is not modified. Additional environment
    variables can be specified for each task.

    This value can be changed per individual task.
    """
    return ReadOnlyDict(self._default_env)

  def set_default_env(self, key, value):
    """Sets default environment variable |key| to the string |value|."""
    assert isinstance(key, basestring), key
    assert isinstance(value, basestring), value
    self._default_env[key] = value

  @property
  def default_priority(self):
    """Swarming task priority for tasks triggered from the recipe.

    Priority ranges from 1 to 255. The lower the value, the more important the
    task is and will preempt any task with a lower priority.

    This value can be changed per individual task.
    """
    return self._default_priority

  @default_priority.setter
  def default_priority(self, value):
    assert 1 <= value <= 255, value
    self._default_priority = value

  def add_default_tag(self, tag):
    """Adds a tag to the Swarming tasks triggered.

    Tags are used for maintenance, they can be used to calculate the number of
    tasks run for a day to calculate the cost of a type of task (CQ, ASAN, etc).

    Tags can be added per individual task.
    """
    assert ':' in tag, tag
    self._default_tags.add(tag)

  @property
  def show_isolated_out_in_collect_step(self):
    """Show the shard's isolated out link in each collect step."""
    return self._show_isolated_out_in_collect_step

  @show_isolated_out_in_collect_step.setter
  def show_isolated_out_in_collect_step(self, value):
    self._show_isolated_out_in_collect_step = value

  @property
  def show_shards_in_collect_step(self):
    """Show the shard link in each collect step."""
    return self._show_shards_in_collect_step

  @show_shards_in_collect_step.setter
  def show_shards_in_collect_step(self, value):
    self._show_shards_in_collect_step = value

  @staticmethod
  def prefered_os_dimension(platform):
    """Given a platform name returns the prefered Swarming OS dimension.

    Platform name is usually provided by 'platform' recipe module, it's one
    of 'win', 'linux', 'mac'. This function returns more concrete Swarming OS
    dimension that represent this platform on Swarming by default.

    Recipes are free to use other OS dimension if there's a need for it. For
    example WinXP try bot recipe may explicitly specify 'Windows-XP-SP3'
    dimension.

    Raises:
      KeyError: if |platform| is not one of the names listed above.
    """
    return {
      'linux': 'Ubuntu-14.04',
      'mac': 'Mac-10.9',
      'win': 'Windows-7-SP1',
    }[platform]

  def task(self, title, isolated_hash, ignore_task_failure=False, shards=1,
           task_output_dir=None, extra_args=None, idempotent=None,
           cipd_packages=None, build_properties=None, merge=None):
    """Returns a new SwarmingTask instance to run an isolated executable on
    Swarming.

    For google test executables, use gtest_task() instead.

    At the time of this writing, this code is used by V8, Skia and iOS.

    The return value can be customized if necessary (see SwarmingTask class
    below). Pass it to 'trigger_task' to launch it on swarming. Later pass the
    same instance to 'collect_task' to wait for the task to finish and fetch its
    results.

    Args:
      title: name of the test, used as part of a task ID.
      isolated_hash: hash of isolated test on isolate server, the test should
          be already isolated there, see 'isolate' recipe module.
      ignore_task_failure: whether to ignore the test failure of swarming
        tasks. By default, this is set to False.
      shards: if defined, the number of shards to use for the task. By default
          this value is either 1 or based on the title.
      task_output_dir: if defined, the directory where task results are placed.
          The caller is responsible for removing this folder when finished.
      extra_args: list of command line arguments to pass to isolated tasks.
      idempotent: whether this task is considered idempotent. Defaults
          to self.default_idempotent if not specified.
      cipd_packages: list of 3-tuples corresponding to CIPD packages needed for
          the task: ('path', 'package_name', 'version'), defined as follows:
              path: Path relative to the Swarming root dir in which to install
                  the package.
              package_name: Name of the package to install,
                  eg. "infra/tools/authutil/${platform}"
              version: Version of the package, either a package instance ID,
                  ref, or tag key/value pair.
      build_properties: An optional dict containing various build properties.
          These are typically but not necessarily the properties emitted by
          bot_update.
      merge: An optional dict containing:
          "script": path to a script to call to post process and merge the
              collected outputs from the tasks. The script should take one
              named (but required) parameter, '-o' (for output), that represents
              the path that the merged results should be written to, and accept
              N additional paths to result files to merge. The merged results
              should be in the JSON Results File Format
              (https://www.chromium.org/developers/the-json-test-results-format)
              and may optionally contain a top level "links" field that
              may contain a dict mapping link text to URLs, for a set of
              links that will be included in the buildbot output.
          "args": an optional list of additional arguments to pass to the
              above script.
    """
    if idempotent is None:
      idempotent = self.default_idempotent
    return SwarmingTask(
        title=title,
        isolated_hash=isolated_hash,
        dimensions=self._default_dimensions,
        env=self._default_env,
        priority=self.default_priority,
        shards=shards,
        buildername=self.m.properties.get('buildername'),
        buildnumber=self.m.properties.get('buildnumber'),
        user=self.default_user,
        expiration=self.default_expiration,
        io_timeout=self.default_io_timeout,
        hard_timeout=self.default_hard_timeout,
        idempotent=idempotent,
        ignore_task_failure=ignore_task_failure,
        extra_args=extra_args,
        collect_step=self._default_collect_step,
        task_output_dir=task_output_dir,
        cipd_packages=cipd_packages,
        build_properties=build_properties,
        merge=merge)

  def check_client_version(self, step_test_data=None):
    """Yields steps to verify compatibility with swarming_client version."""
    return self.m.swarming_client.ensure_script_version(
        'swarming.py', MINIMAL_SWARMING_VERSION, step_test_data)

  def trigger_task(self, task, **kwargs):
    """Triggers one task.

    If the task is sharded, will trigger all shards. This step just posts
    the task and immediately returns. Use 'collect_task' to wait for a task to
    finish and grab its result.

    Behaves as a regular recipe step: returns StepData with step results
    on success or raises StepFailure if step fails.

    Args:
      task: SwarmingTask instance.
      kwargs: passed to recipe step constructor as-is.
    """
    assert isinstance(task, SwarmingTask)
    assert task.task_name not in self._pending_tasks, (
        'Triggered same task twice: %s' % task.task_name)
    assert 'os' in task.dimensions, task.dimensions
    self._pending_tasks.add(task.task_name)

    # Trigger parameters.
    args = [
      'trigger',
      '--swarming', self.swarming_server,
      '--isolate-server', self.m.isolate.isolate_server,
      '--priority', str(task.priority),
      '--shards', str(task.shards),
      '--task-name', task.task_name,
      '--dump-json', self.m.json.output(),
      '--expiration', str(task.expiration),
      '--io-timeout', str(task.io_timeout),
      '--hard-timeout', str(task.hard_timeout),
    ]
    for name, value in sorted(task.dimensions.iteritems()):
      assert isinstance(value, basestring), value
      args.extend(['--dimension', name, value])
    for name, value in sorted(task.env.iteritems()):
      assert isinstance(value, basestring), value
      args.extend(['--env', name, value])

    # Default tags.
    tags = set(task.tags)
    tags.update(self._default_tags)
    tags.add('data:' + task.isolated_hash)
    tags.add('name:' + task.title.split(' ')[0])
    mastername = self.m.properties.get('mastername')
    if mastername:  # pragma: no cover
      tags.add('master:' + mastername)
    if task.buildername:  # pragma: no cover
      tags.add('buildername:' + task.buildername)
    if task.buildnumber:  # pragma: no cover
      tags.add('buildnumber:%s' % task.buildnumber)
    if task.dimensions.get('os'):
      tags.add('os:' + task.dimensions['os'])
    if self.m.properties.get('bot_id'):  # pragma: no cover
      tags.add('slavename:%s' % self.m.properties['bot_id'])
    tags.add('stepname:%s' % self.get_step_name('', task))
    rietveld = self.m.properties.get('rietveld')
    issue = self.m.properties.get('issue')
    patchset = self.m.properties.get('patchset')
    if rietveld and issue and patchset:
      # The expected format is strict to the usage of buildbot properties on the
      # Chromium Try Server. Fix if necessary.
      tags.add('rietveld:%s/%s/#ps%s' % (rietveld, issue, patchset))
    for tag in sorted(tags):
      assert ':' in tag, tag
      args.extend(['--tag', tag])

    if self.verbose:
      args.append('--verbose')
    if task.idempotent:
      args.append('--idempotent')
    if task.user:
      args.extend(['--user', task.user])

    if task.cipd_packages:
      for path, pkg, version in task.cipd_packages:
        args.extend(['--cipd-package', '%s:%s:%s' % (path, pkg, version)])

    # What isolated command to trigger.
    args.extend(('--isolated', task.isolated_hash))

    # Additional command line args for isolated command.
    if task.extra_args:  # pragma: no cover
      args.append('--')
      args.extend(task.extra_args)

    # The step can fail only on infra failures, so mark it as 'infra_step'.
    try:
      return self.m.python(
          name=self.get_step_name('trigger', task),
          script=self.m.swarming_client.path.join('swarming.py'),
          args=args,
          step_test_data=functools.partial(
              self._gen_trigger_step_test_data, task),
          infra_step=True,
          **kwargs)
    finally:
      # Store trigger output with the |task|, print links to triggered shards.
      step_result = self.m.step.active_result
      step_result.presentation.step_text += text_for_task(task)

      # Compare the presentation *status* (not the presentation object itself,
      # which never equals a status constant). EXCEPTION is checked as well
      # because this step is triggered with infra_step=True.
      if step_result.presentation.status not in (
          self.m.step.FAILURE, self.m.step.EXCEPTION):
        task._trigger_output = step_result.json.output
        links = step_result.presentation.links
        for index in xrange(task.shards):
          url = task.get_shard_view_url(index)
          if url:
            links['shard #%d' % index] = url
      # Always attach the task to the step result, as collect_task does.
      assert not hasattr(step_result, 'swarming_task')
      step_result.swarming_task = task

  def collect_task(self, task, **kwargs):
    """Waits for a single triggered task to finish.

    If the task is sharded, will wait for all shards to finish. Behaves as
    a regular recipe step: returns StepData with step results on success or
    raises StepFailure if task fails.

    Args:
      task: SwarmingTask instance, previously triggered with 'trigger' method.
      kwargs: passed to recipe step constructor as-is.
    """
    # TODO(vadimsh): Raise InfraFailure on Swarming failures.
    assert isinstance(task, SwarmingTask)
    assert task.task_name in self._pending_tasks, (
        'Trying to collect a task that was not triggered: %s' %
        task.task_name)
    self._pending_tasks.remove(task.task_name)

    try:
      return task.collect_step(task, **kwargs)
    finally:
      try:
        self.m.step.active_result.swarming_task = task
      except Exception:  # pragma: no cover
        # If we don't have an active_result, something failed very early,
        # so we eat this exception and let that one propagate.
        pass

  def trigger(self, tasks, **kwargs):  # pragma: no cover
    """Batch version of 'trigger_task'.

    Deprecated, to be removed soon. Use 'trigger_task' in a loop instead,
    properly handling exceptions. This method doesn't handle trigger failures
    well (it aborts on a first failure).
    """
    return [self.trigger_task(t, **kwargs) for t in tasks]

  def collect(self, tasks, **kwargs):  # pragma: no cover
    """Batch version of 'collect_task'.

    Deprecated, to be removed soon. Use 'collect_task' in a loop instead,
    properly handling exceptions. This method doesn't handle collect failures
    well (it aborts on a first failure).
    """
    return [self.collect_task(t, **kwargs) for t in tasks]

  # To keep compatibility with some build_internal code. To be removed as well.
  collect_each = collect

  @staticmethod
  def _display_pending(summary_json, step_presentation):
    """Shows max pending time in seconds across all shards if it exceeds 10s."""
    # Shards without a 'started_ts' are left out of the computation.
    pending_times = [
      (parse_time(shard['started_ts']) -
        parse_time(shard['created_ts'])).total_seconds()
      for shard in summary_json.get('shards', []) if shard.get('started_ts')
    ]
    max_pending = max(pending_times) if pending_times else 0

    # Only display annotation when pending more than 10 seconds to reduce noise.
    if max_pending > 10:
      step_presentation.step_text += '<br>swarming pending %ds' % max_pending

  def _default_collect_step(
      self, task, merged_test_output=None,
      step_test_data=None,
      **kwargs):
    """Produces a step that collects a result of an arbitrary task.

    Args:
      task: SwarmingTask instance to collect.
      merged_test_output: value passed as the '-o' argument of collect_task.py;
          defaults to a JSON output placeholder.
      step_test_data: optional test data for the step; defaults to a canned
          summary for task.shards shards plus an empty JSON output.
      kwargs: passed to the python step as-is.
    """
    task_output_dir = task.task_output_dir or self.m.raw_io.output_dir()

    # If we don't already have a Placeholder, wrap the task_output_dir in one
    # so we can read out of it later w/ step_result.raw_io.output_dir.
    if not isinstance(task_output_dir, recipe_util.Placeholder):
      task_output_dir = self.m.raw_io.output_dir(leak_to=task_output_dir)

    task_args = [
      '-o', merged_test_output or self.m.json.output(),
      '--task-output-dir', task_output_dir,
    ]

    merge_script = (task.merge.get('script')
                    or self.resource('noop_merge.py'))
    merge_args = (task.merge.get('args') or [])

    task_args.extend([
      '--merge-script', merge_script,
      '--merge-additional-args', self.m.json.dumps(merge_args),
    ])

    if task.build_properties:  # pragma: no cover
      # Recipe properties take precedence over the task's build properties.
      properties = dict(task.build_properties)
      properties.update(self.m.properties)
      task_args.extend([
        '--build-properties', self.m.json.dumps(properties),
      ])

    task_args.append('--')
    # Arguments for the actual 'collect' command.
    collect_cmd = [
      'python',
      '-u',
      self.m.swarming_client.path.join('swarming.py'),
    ]
    collect_cmd.extend(self.get_collect_cmd_args(task))
    collect_cmd.extend([
      '--task-summary-json', self.summary(),
    ])

    task_args.extend(collect_cmd)

    allowed_return_codes = {0}
    if task.ignore_task_failure:  # pragma: no cover
      allowed_return_codes = 'any'

    # The call to collect_task emits two JSON files:
    #  1) a task summary JSON emitted by swarming
    #  2) a gtest results JSON emitted by the task
    # This builds an instance of StepTestData that covers both.
    step_test_data = step_test_data or (
        self.test_api.canned_summary_output(task.shards) +
        self.m.json.test_api.output({}))

    try:
      with self.m.context(cwd=self.m.path['start_dir']):
        return self.m.python(
            name=self.get_step_name('', task),
            script=self.resource('collect_task.py'),
            args=task_args,
            ok_ret=allowed_return_codes,
            step_test_data=lambda: step_test_data,
            **kwargs)
    finally:
      step_result = None
      try:
        step_result = self.m.step.active_result
        step_result.presentation.step_text = text_for_task(task)
        summary_json = step_result.swarming.summary
        self._handle_summary_json(task, summary_json, step_result)

        links = {}
        if hasattr(step_result, 'json') and hasattr(step_result.json, 'output'):
          links = step_result.json.output.get('links', {})
        for k, v in links.iteritems():  # pragma: no cover
          step_result.presentation.links[k] = v
      except Exception as e:
        # NOTE(review): this broad handler also catches anything raised by
        # _handle_summary_json above (presumably including its InfraFailure, if
        # that derives from Exception) -- confirm this is intended.
        if step_result:
          step_result.presentation.logs['no_results_exc'] = [str(e)]

  def get_step_name(self, prefix, task):
    """SwarmingTask -> name of a step of a waterfall.

    Will take a task title (+ step name prefix) and append OS dimension to it.

    Args:
      prefix: prefix to prepend to task title, like 'trigger'.
      task: SwarmingTask instance.

    Returns:
      '[<prefix>] <task title> on <OS>'
    """
    prefix = '[%s] ' % prefix if prefix else ''
    task_os = task.dimensions['os']

    # The ' on <OS>' suffix is omitted when the task's OS matches the preferred
    # dimension or the platform name of the bot running the recipe.
    bot_os = self.prefered_os_dimension(self.m.platform.name)
    suffix = ('' if (
        task_os == bot_os or task_os.lower() == self.m.platform.name.lower())
        else ' on %s' % task_os)
    # Note: properly detecting dimensions of the bot the recipe is running
    # on is somewhat non-trivial. It is not safe to assume it uses default
    # or preferred dimensions for its OS. For example, the version of the OS
    # can differ.
    return ''.join((prefix, task.title, suffix))

  def _handle_summary_json(self, task, summary, step_result):
    """Adds per-shard links to the step and fails it on infra failures.

    Args:
      task: SwarmingTask the summary belongs to.
      summary: dict with a 'shards' list, one entry per shard.
      step_result: step result whose presentation is updated.

    Raises:
      recipe_api.InfraFailure: if any shard had an internal failure or expired.
    """
    # We store this now, and add links to all shards first, before failing the
    # build. Format is tuple of (index of the shard that failed, error message).
    infra_failures = []
    links = step_result.presentation.links
    for index, shard in enumerate(summary['shards']):
      url = task.get_shard_view_url(index)
      display_text = 'shard #%d' % index

      if not shard or shard.get('internal_failure'):  # pragma: no cover
        display_text = (
          'shard #%d had an internal swarming failure' % index)
        infra_failures.append((index, 'Internal swarming failure'))
      elif self._is_expired(shard):
        display_text = (
          'shard #%d expired, not enough capacity' % index)
        infra_failures.append((
            index, 'There isn\'t enough capacity to run your test'))
      elif self._is_timed_out(shard):
        display_text = (
          'shard #%d timed out, took too much time to complete' % index)
      elif self._get_exit_code(shard) != '0':  # pragma: no cover
        display_text = 'shard #%d (failed)' % index

      # |shard| may be falsy (e.g. None) for an internal failure, see the first
      # branch above; only read its isolated output when there is a shard.
      if shard and self.show_isolated_out_in_collect_step:
        isolated_out = shard.get('isolated_out')
        if isolated_out:
          link_name = 'shard #%d isolated out' % index
          links[link_name] = isolated_out['view_url']

      if url and self.show_shards_in_collect_step:
        links[display_text] = url

    self._display_pending(summary, step_result.presentation)

    if infra_failures:
      template = 'Shard #%s failed: %s'

      # Done so that raising an InfraFailure doesn't cause an error.
      # TODO(martiniss): Remove this hack. Requires recipe engine change
      step_result._retcode = 2
      step_result.presentation.status = self.m.step.EXCEPTION
      raise recipe_api.InfraFailure(
          '\n'.join(template % f for f in infra_failures), result=step_result)

  def get_collect_cmd_args(self, task):
    """SwarmingTask -> argument list for 'swarming.py' command."""
    args = [
      'collect',
      '--swarming', self.swarming_server,
      '--decorate',
      '--print-status-updates',
    ]
    if self.verbose:
      args.append('--verbose')
    # The trigger step's JSON output tells 'collect' which tasks to wait for.
    args.extend(('--json', self.m.json.input(task.trigger_output)))
    return args

  def _gen_trigger_step_test_data(self, task):
    """Generates an expected value of --dump-json in 'trigger' step.

    Used when running recipes to generate test expectations.
    """
    # Suffixes of shard subtask names.
    subtasks = []
    if task.shards == 1:
      subtasks = ['']
    else:
      subtasks = [':%d:%d' % (task.shards, i) for i in range(task.shards)]
    return self.m.json.test_api.output({
      'base_task_name': task.task_name,
      'tasks': {
        '%s%s' % (task.task_name, suffix): {
          'task_id': '1%02d00' % i,
          'shard_index': i,
          'view_url': '%s/user/task/1%02d00' % (self.swarming_server, i),
        } for i, suffix in enumerate(subtasks)
      },
    })


class SwarmingTask(object):
  """Definition of a task to run on swarming."""

  def __init__(self, title, isolated_hash, ignore_task_failure, dimensions,
               env, priority, shards, buildername, buildnumber, expiration,
               user, io_timeout, hard_timeout, idempotent, extra_args,
               collect_step, task_output_dir, cipd_packages=None,
               build_properties=None, merge=None):
    """Configuration of a swarming task.

    Args:
      title: display name of the task, hints to what task is doing. Usually
          corresponds to a name of a test executable. Doesn't have to be unique.
      isolated_hash: hash of isolated file that describes all files needed to
          run the task as well as command line to launch. See 'isolate' recipe
          module.
      ignore_task_failure: whether to ignore the test failure of swarming
        tasks.
      cipd_packages: list of 3-tuples corresponding to CIPD packages needed for
          the task: ('path', 'package_name', 'version'), defined as follows:
              path: Path relative to the Swarming root dir in which to install
                  the package.
              package_name: Name of the package to install,
                  eg. "infra/tools/authutil/${platform}"
              version: Version of the package, either a package instance ID,
                  ref, or tag key/value pair.
      collect_step: callback that will be called to collect and processes
          results of task execution, signature is collect_step(task, **kwargs).
      dimensions: key-value mapping with swarming dimensions that specify
          on what Swarming slaves task can run. One important dimension is 'os',
          which defines platform flavor to run the task on. See Swarming doc.
      env: key-value mapping with additional environment variables to add to
          environment before launching the task executable.
      priority: integer [0, 255] that defines how urgent the task is.
          Lower value corresponds to higher priority. Swarming service executes
          tasks with higher priority first.
      shards: how many concurrent shards to run, makes sense only for
          isolated tests based on gtest. Swarming uses GTEST_SHARD_INDEX
          and GTEST_TOTAL_SHARDS environment variables to tell the executable
          what shard to run.
      buildername: buildbot builder this task was triggered from.
      buildnumber: build number of a build this task was triggered from.
      expiration: number of seconds after which the task shouldn't even be run
          if it hadn't started yet.
      user: user that requested this task, if applicable.
      io_timeout: number of seconds that the task is allowed to not emit any
          stdout bytes, after which it is forcibly killed.
      hard_timeout: number of seconds for which the task is allowed to run,
          after which it is forcibly killed.
      idempotent: True if the results from a previous task can be reused. E.g.
          this task has no side-effects.
      extra_args: list of command line arguments to pass to isolated tasks.
      task_output_dir: if defined, the directory where task results are placed
          during the collect step.
      build_properties: An optional dict containing various build properties.
          These are typically but not necessarily the properties emitted by
          bot_update.
      merge: An optional dict containing:
          "script": path to a script to call to post process and merge the
              collected outputs from the tasks.
          "args": an optional list of additional arguments to pass to the
              above script.
    """
    # Set by SwarmingApi.trigger_task once the trigger step has run.
    self._trigger_output = None
    self.build_properties = build_properties
    self.buildername = buildername
    self.buildnumber = buildnumber
    self.cipd_packages = cipd_packages
    self.collect_step = collect_step
    # Copied so per-task tweaks don't modify the mappings passed in.
    self.dimensions = dimensions.copy()
    self.env = env.copy()
    self.expiration = expiration
    self.extra_args = tuple(extra_args or [])
    self.hard_timeout = hard_timeout
    self.idempotent = idempotent
    self.ignore_task_failure = ignore_task_failure
    self.io_timeout = io_timeout
    self.isolated_hash = isolated_hash
    self.merge = merge or {}
    self.priority = priority
    self.shards = shards
    self.tags = set()
    self.task_output_dir = task_output_dir
    self.title = title
    self.user = user

  @property
  def task_name(self):
    """Name of this task, derived from its other properties.

    The task name is purely to make sense of the task and is not used in any
    other way.
    """
    out = '%s/%s/%s' % (
        self.title, self.dimensions['os'], self.isolated_hash[:10])
    if self.buildername:  # pragma: no cover
      out += '/%s/%s' % (self.buildername, self.buildnumber or -1)
    return out

  @property
  def trigger_output(self):
    """JSON results of 'trigger' step or None if not triggered."""
    return self._trigger_output

  def get_shard_view_url(self, index):
    """Returns URL of HTML page with shard details or None if not available.

    Works only after the task has been successfully triggered.
    """
    if self._trigger_output and self._trigger_output.get('tasks'):
      for shard_dict in self._trigger_output['tasks'].itervalues():
        if shard_dict['shard_index'] == index:
          return shard_dict['view_url']