• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import datetime
6import functools
7import hashlib
8import logging
9import os.path
10
11from recipe_engine import config_types
12from recipe_engine import recipe_api
13from recipe_engine import util as recipe_util
14
15import state
16
17
18# TODO(borenet): This module was copied from build.git and heavily modified to
19# remove dependencies on other modules in build.git.  It belongs in a different
20# repo. Remove this once it has been moved.
21
22
23# Minimally supported version of swarming.py script (reported by --version).
24MINIMAL_SWARMING_VERSION = (0, 8, 6)
25
26
def text_for_task(task):
  """Builds the step text describing where |task| is requested to run.

  Args:
    task: object exposing a 'dimensions' mapping.

  Returns:
    A '<br/>'-joined string with one entry for each of the 'id' and 'os'
    dimensions that has a truthy value; '' when neither is set.
  """
  # (dimension key, line template), in the order the lines are displayed.
  described_dimensions = (
      ('id', 'Bot id: %r'),
      ('os', 'Run on OS: %r'),
  )
  return '<br/>'.join(
      template % task.dimensions[key]
      for key, template in described_dimensions
      if task.dimensions.get(key))
36
37
def parse_time(value):
  """Converts serialized time from the API to datetime.datetime.

  The known formats are tried in a fixed order; the first one that parses
  |value| wins.

  Raises:
    ValueError: if |value| matches none of the known formats.
  """
  # The serializer drops the '.123456' suffix whenever microseconds are 0, so
  # the same field can show up both with and without it.
  # TODO(maruel): Remove third format once we enforce version >=0.8.2.
  known_formats = (
      '%Y-%m-%dT%H:%M:%S.%f',
      '%Y-%m-%dT%H:%M:%S',
      '%Y-%m-%d %H:%M:%S',
  )
  strptime = datetime.datetime.strptime
  for candidate in known_formats:
    try:
      parsed = strptime(value, candidate)
    except ValueError:  # pragma: no cover
      continue
    return parsed
  raise ValueError('Failed to parse %s' % value)  # pragma: no cover
49
50
class ReadOnlyDict(dict):
  """dict subclass that rejects in-place modification.

  Instances are filled through the constructor. Afterwards every mutating
  dict operation raises TypeError, while reads behave like a plain dict.
  """

  def _reject_mutation(self, *args, **kwargs):
    """Shared implementation for all blocked mutators."""
    raise TypeError('ReadOnlyDict is immutable')

  def __setitem__(self, key, value):
    raise TypeError('ReadOnlyDict is immutable')

  # Blocking item assignment alone leaves the other mutators open, which
  # contradicts the error message above. Block them all.
  __delitem__ = _reject_mutation
  __ior__ = _reject_mutation
  clear = _reject_mutation
  pop = _reject_mutation
  popitem = _reject_mutation
  setdefault = _reject_mutation
  update = _reject_mutation
54
55
56class SwarmingApi(recipe_api.RecipeApi):
57  """Recipe module to use swarming.py tool to run tasks on Swarming.
58
59  General usage:
60    1. Tweak default task parameters applied to all swarming tasks (such as
61       default_dimensions and default_priority).
62    2. Isolate some test using 'isolate' recipe module. Get isolated hash as
63       a result of that process.
64    3. Create a task configuration using 'task(...)' method, providing
65       isolated hash obtained previously.
66    4. Tweak the task parameters. This step is optional.
67    5. Launch the task on swarming by calling 'trigger_task(...)'.
68    6. Continue doing useful work locally while the task is running concurrently
69       on swarming.
70    7. Wait for task to finish and collect its result (exit code, logs)
71       by calling 'collect_task(...)'.
72
73  See also example.py for concrete code.
74  """
75
76  State = state.State
77
78  #############################################################################
79  # The below are helper functions to help transition between the old and new #
80  # swarming result formats. TODO(martiniss): remove these                    #
81  #############################################################################
82
83  def _is_expired(self, shard):
84    # FIXME: We really should only have one format for enums. We want to move to
85    # strings, currently have numbers.
86    return (
87        shard.get('state') == self.State.EXPIRED or
88        shard.get('state') == 'EXPIRED')
89
90  def _is_timed_out(self, shard):
91    # FIXME: We really should only have one format for enums. We want to move to
92    # strings, currently have numbers.
93    return (
94        shard.get('state') == self.State.TIMED_OUT or
95        shard.get('state') == 'TIMED_OUT')
96
97  def _get_exit_code(self, shard):
98    if shard.get('exit_code'):
99      return shard.get('exit_code')  # pragma: no cover
100    lst = shard.get('exit_codes', [])
101    return str(lst[0]) if lst else None
102
103  def __init__(self, **kwargs):
104    super(SwarmingApi, self).__init__(**kwargs)
105    # All tests default to a x86-64 bot running with no GPU. This simplifies
106    # management so that new tests are not executed on exotic bots by accidents
107    # even if misconfigured.
108    self._default_dimensions = {
109      'cpu': 'x86-64',
110      'gpu': 'none',
111    }
112    # Expirations are set to mildly good values and will be tightened soon.
113    self._default_expiration = 60*60
114    self._default_env = {}
115    self._default_hard_timeout = 60*60
116    self._default_idempotent = False
117    self._default_io_timeout = 20*60
118    # The default priority is extremely low and should be increased dependending
119    # on the type of task.
120    self._default_priority = 200
121    self._default_tags = set()
122    self._default_user = None
123    self._pending_tasks = set()
124    self._show_isolated_out_in_collect_step = True
125    self._show_shards_in_collect_step = False
126    self._swarming_server = 'https://chromium-swarm.appspot.com'
127    self._verbose = False
128
129  @recipe_util.returns_placeholder
130  def summary(self):
131    return self.m.json.output()
132
133  @property
134  def swarming_server(self):
135    """URL of Swarming server to use, default is a production one."""
136    return self._swarming_server
137
138  @swarming_server.setter
139  def swarming_server(self, value):
140    """Changes URL of Swarming server to use."""
141    self._swarming_server = value
142
143  @property
144  def verbose(self):
145    """True to run swarming scripts with verbose output."""
146    return self._verbose
147
148  @verbose.setter
149  def verbose(self, value):
150    """Enables or disables verbose output in swarming scripts."""
151    assert isinstance(value, bool), value
152    self._verbose = value
153
154  @property
155  def default_expiration(self):
156    """Number of seconds that the server will wait to find a bot able to run the
157    task.
158
159    If not bot runs the task by this number of seconds, the task is canceled as
160    EXPIRED.
161
162    This value can be changed per individual task.
163    """
164    return self._default_expiration
165
166  @default_expiration.setter
167  def default_expiration(self, value):
168    assert 30 <= value <= 24*60*60, value
169    self._default_expiration = value
170
171  @property
172  def default_hard_timeout(self):
173    """Number of seconds in which the task must complete.
174
175    If the task takes more than this amount of time, the process is assumed to
176    be hung. It forcibly killed via SIGTERM then SIGKILL after a grace period
177    (default: 30s). Then the task is marked as TIMED_OUT.
178
179    This value can be changed per individual task.
180    """
181    return self._default_hard_timeout
182
183  @default_hard_timeout.setter
184  def default_hard_timeout(self, value):
185    assert 30 <= value <= 6*60*60, value
186    self._default_hard_timeout = value
187
188  @property
189  def default_io_timeout(self):
190    """Number of seconds at which interval the task must write to stdout or
191    stderr.
192
193    If the task takes more than this amount of time between writes to stdout or
194    stderr, the process is assumed to be hung. It forcibly killed via SIGTERM
195    then SIGKILL after a grace period (default: 30s). Then the task is marked as
196    TIMED_OUT.
197
198    This value can be changed per individual task.
199    """
200    return self._default_io_timeout
201
202  @default_io_timeout.setter
203  def default_io_timeout(self, value):
204    assert 30 <= value <= 6*60*60, value
205    self._default_io_timeout = value
206
207  @property
208  def default_idempotent(self):
209    """Bool to specify if task deduplication can be done.
210
211    When set, the server will search for another task that ran in the last days
212    that had the exact same properties. If it finds one, the task will not be
213    run at all, the previous results will be returned as-is.
214
215    For more infos, see:
216    https://github.com/luci/luci-py/blob/master/appengine/swarming/doc/User-Guide.md#task-idempotency
217
218    This value can be changed per individual task.
219    """
220    return self._default_idempotent
221
222  @default_idempotent.setter
223  def default_idempotent(self, value):
224    assert isinstance(value, bool), value
225    self._default_idempotent = value
226
227  @property
228  def default_user(self):
229    """String to represent who triggered the task.
230
231    The user should be an email address when someone requested testing via
232    pre-commit or manual testing.
233
234    This value can be changed per individual task.
235    """
236    return self._default_user
237
238  @default_user.setter
239  def default_user(self, value):
240    assert value is None or isinstance(value, basestring), value
241    self._default_user = value
242
243  @property
244  def default_dimensions(self):
245    """Returns a copy of the default Swarming dimensions to run task on.
246
247    The dimensions are what is used to filter which bots are able to run the
248    task successfully. This is particularly useful to discern between OS
249    versions, type of CPU, GPU card or VM, or preallocated pool.
250
251    Example:
252      {'cpu': 'x86-64', 'os': 'Windows-XP-SP3'}
253
254    This value can be changed per individual task.
255    """
256    return ReadOnlyDict(self._default_dimensions)
257
258  def set_default_dimension(self, key, value):
259    assert isinstance(key, basestring), key
260    assert isinstance(value, basestring) or value is None, value
261    if value is None:
262      self._default_dimensions.pop(key, None)
263    else:
264      self._default_dimensions[key] = value  # pragma: no cover
265
266  @property
267  def default_env(self):
268    """Returns a copy of the default environment variable to run tasks with.
269
270    By default the environment variable is not modified. Additional environment
271    variables can be specified for each task.
272
273    This value can be changed per individual task.
274    """
275    return ReadOnlyDict(self._default_env)
276
277  def set_default_env(self, key, value):
278    assert isinstance(key, basestring), key
279    assert isinstance(value, basestring), value
280    self._default_env[key] = value
281
282  @property
283  def default_priority(self):
284    """Swarming task priority for tasks triggered from the recipe.
285
286    Priority ranges from 1 to 255. The lower the value, the most important the
287    task is and will preempty any task with a lower priority.
288
289    This value can be changed per individual task.
290    """
291    return self._default_priority
292
293  @default_priority.setter
294  def default_priority(self, value):
295    assert 1 <= value <= 255
296    self._default_priority = value
297
298  def add_default_tag(self, tag):
299    """Adds a tag to the Swarming tasks triggered.
300
301    Tags are used for maintenance, they can be used to calculate the number of
302    tasks run for a day to calculate the cost of a type of type (CQ, ASAN, etc).
303
304    Tags can be added per individual task.
305    """
306    assert ':' in tag, tag
307    self._default_tags.add(tag)
308
309  @property
310  def show_isolated_out_in_collect_step(self):
311    """Show the shard's isolated out link in each collect step."""
312    return self._show_isolated_out_in_collect_step
313
314  @show_isolated_out_in_collect_step.setter
315  def show_isolated_out_in_collect_step(self, value):
316    self._show_isolated_out_in_collect_step = value
317
318  @property
319  def show_shards_in_collect_step(self):
320    """Show the shard link in each collect step."""
321    return self._show_shards_in_collect_step
322
323  @show_shards_in_collect_step.setter
324  def show_shards_in_collect_step(self, value):
325    self._show_shards_in_collect_step = value
326
327  @staticmethod
328  def prefered_os_dimension(platform):
329    """Given a platform name returns the prefered Swarming OS dimension.
330
331    Platform name is usually provided by 'platform' recipe module, it's one
332    of 'win', 'linux', 'mac'. This function returns more concrete Swarming OS
333    dimension that represent this platform on Swarming by default.
334
335    Recipes are free to use other OS dimension if there's a need for it. For
336    example WinXP try bot recipe may explicitly specify 'Windows-XP-SP3'
337    dimension.
338    """
339    return {
340      'linux': 'Ubuntu-14.04',
341      'mac': 'Mac-10.9',
342      'win': 'Windows-7-SP1',
343    }[platform]
344
345  def task(self, title, isolated_hash, ignore_task_failure=False, shards=1,
346           task_output_dir=None, extra_args=None, idempotent=None,
347           cipd_packages=None, build_properties=None, merge=None):
348    """Returns a new SwarmingTask instance to run an isolated executable on
349    Swarming.
350
351    For google test executables, use gtest_task() instead.
352
353    At the time of this writting, this code is used by V8, Skia and iOS.
354
355    The return value can be customized if necessary (see SwarmingTask class
356    below). Pass it to 'trigger_task' to launch it on swarming. Later pass the
357    same instance to 'collect_task' to wait for the task to finish and fetch its
358    results.
359
360    Args:
361      title: name of the test, used as part of a task ID.
362      isolated_hash: hash of isolated test on isolate server, the test should
363          be already isolated there, see 'isolate' recipe module.
364      ignore_task_failure: whether to ignore the test failure of swarming
365        tasks. By default, this is set to False.
366      shards: if defined, the number of shards to use for the task. By default
367          this value is either 1 or based on the title.
368      task_output_dir: if defined, the directory where task results are placed.
369          The caller is responsible for removing this folder when finished.
370      extra_args: list of command line arguments to pass to isolated tasks.
371      idempotent: whether this task is considered idempotent. Defaults
372          to self.default_idempotent if not specified.
373      cipd_packages: list of 3-tuples corresponding to CIPD packages needed for
374          the task: ('path', 'package_name', 'version'), defined as follows:
375              path: Path relative to the Swarming root dir in which to install
376                  the package.
377              package_name: Name of the package to install,
378                  eg. "infra/tools/authutil/${platform}"
379              version: Version of the package, either a package instance ID,
380                  ref, or tag key/value pair.
381      build_properties: An optional dict containing various build properties.
382          These are typically but not necessarily the properties emitted by
383          bot_update.
384      merge: An optional dict containing:
385          "script": path to a script to call to post process and merge the
386              collected outputs from the tasks. The script should take one
387              named (but required) parameter, '-o' (for output), that represents
388              the path that the merged results should be written to, and accept
389              N additional paths to result files to merge. The merged results
390              should be in the JSON Results File Format
391              (https://www.chromium.org/developers/the-json-test-results-format)
392              and may optionally contain a top level "links" field that
393              may contain a dict mapping link text to URLs, for a set of
394              links that will be included in the buildbot output.
395          "args": an optional list of additional arguments to pass to the
396              above script.
397    """
398    if idempotent is None:
399      idempotent = self.default_idempotent
400    return SwarmingTask(
401        title=title,
402        isolated_hash=isolated_hash,
403        dimensions=self._default_dimensions,
404        env=self._default_env,
405        priority=self.default_priority,
406        shards=shards,
407        buildername=self.m.properties.get('buildername'),
408        buildnumber=self.m.properties.get('buildnumber'),
409        user=self.default_user,
410        expiration=self.default_expiration,
411        io_timeout=self.default_io_timeout,
412        hard_timeout=self.default_hard_timeout,
413        idempotent=idempotent,
414        ignore_task_failure=ignore_task_failure,
415        extra_args=extra_args,
416        collect_step=self._default_collect_step,
417        task_output_dir=task_output_dir,
418        cipd_packages=cipd_packages,
419        build_properties=build_properties,
420        merge=merge)
421
422  def check_client_version(self, step_test_data=None):
423    """Yields steps to verify compatibility with swarming_client version."""
424    return self.m.swarming_client.ensure_script_version(
425        'swarming.py', MINIMAL_SWARMING_VERSION, step_test_data)
426
  def trigger_task(self, task, **kwargs):
    """Triggers one task.

    If the task is sharded, will trigger all shards. This step just posts
    the task and immediately returns. Use 'collect_task' to wait for a task to
    finish and grab its result.

    Behaves as a regular recipe step: returns StepData with step results
    on success or raises StepFailure if step fails.

    The task's name is recorded as pending, so the same task cannot be
    triggered twice, and the task must carry an 'os' dimension. The step
    result gets the task attached as 'swarming_task'.

    Args:
      task: SwarmingTask instance.
      kwargs: passed to recipe step constructor as-is.
    """
    assert isinstance(task, SwarmingTask)
    assert task.task_name not in self._pending_tasks, (
        'Triggered same task twice: %s' % task.task_name)
    # get_step_name() below reads task.dimensions['os'] unconditionally.
    assert 'os' in task.dimensions, task.dimensions
    self._pending_tasks.add(task.task_name)

    # Trigger parameters.
    args = [
      'trigger',
      '--swarming', self.swarming_server,
      '--isolate-server', self.m.isolate.isolate_server,
      '--priority', str(task.priority),
      '--shards', str(task.shards),
      '--task-name', task.task_name,
      '--dump-json', self.m.json.output(),
      '--expiration', str(task.expiration),
      '--io-timeout', str(task.io_timeout),
      '--hard-timeout', str(task.hard_timeout),
    ]
    # Dimensions and env variables are sorted so that the command line does
    # not depend on dict iteration order.
    for name, value in sorted(task.dimensions.iteritems()):
      assert isinstance(value, basestring), value
      args.extend(['--dimension', name, value])
    for name, value in sorted(task.env.iteritems()):
      assert isinstance(value, basestring), value
      args.extend(['--env', name, value])

    # Tags: the task's own tags, the module's default tags, and tags derived
    # from the task and the build properties.
    tags = set(task.tags)
    tags.update(self._default_tags)
    tags.add('data:' + task.isolated_hash)
    # Only the first space-separated word of the title is used as the name.
    tags.add('name:' + task.title.split(' ')[0])
    mastername = self.m.properties.get('mastername')
    if mastername:  # pragma: no cover
      tags.add('master:' + mastername)
    if task.buildername:  # pragma: no cover
      tags.add('buildername:' + task.buildername)
    if task.buildnumber:  # pragma: no cover
      tags.add('buildnumber:%s' % task.buildnumber)
    if task.dimensions.get('os'):
      tags.add('os:' + task.dimensions['os'])
    if self.m.properties.get('bot_id'):  # pragma: no cover
      tags.add('slavename:%s' % self.m.properties['bot_id'])
    tags.add('stepname:%s' % self.get_step_name('', task))
    rietveld = self.m.properties.get('rietveld')
    issue = self.m.properties.get('issue')
    patchset = self.m.properties.get('patchset')
    if rietveld and issue and patchset:
      # The expected format is strict to the usage of buildbot properties on the
      # Chromium Try Server. Fix if necessary.
      tags.add('rietveld:%s/%s/#ps%s' % (rietveld, issue, patchset))
    for tag in sorted(tags):
      assert ':' in tag, tag
      args.extend(['--tag', tag])

    if self.verbose:
      args.append('--verbose')
    if task.idempotent:
      args.append('--idempotent')
    if task.user:
      args.extend(['--user', task.user])

    if task.cipd_packages:
      for path, pkg, version in task.cipd_packages:
        args.extend(['--cipd-package', '%s:%s:%s' % (path, pkg, version)])

    # What isolated command to trigger.
    args.extend(('--isolated', task.isolated_hash))

    # Additional command line args for isolated command.
    if task.extra_args:  # pragma: no cover
      args.append('--')
      args.extend(task.extra_args)

    # The step can fail only on infra failures, so mark it as 'infra_step'.
    try:
      return self.m.python(
          name=self.get_step_name('trigger', task),
          script=self.m.swarming_client.path.join('swarming.py'),
          args=args,
          step_test_data=functools.partial(
              self._gen_trigger_step_test_data, task),
          infra_step=True,
          **kwargs)
    finally:
      # Runs whether the step succeeded or raised.
      # Store trigger output with the |task|, print links to triggered shards.
      step_result = self.m.step.active_result
      step_result.presentation.step_text += text_for_task(task)

      # NOTE(review): this compares the presentation object itself with the
      # FAILURE constant, not its status, so the condition presumably always
      # holds. 'presentation.status' looks like what was meant -- confirm.
      if step_result.presentation != self.m.step.FAILURE:
        task._trigger_output = step_result.json.output
        links = step_result.presentation.links
        for index in xrange(task.shards):
          url = task.get_shard_view_url(index)
          if url:
            links['shard #%d' % index] = url
      assert not hasattr(step_result, 'swarming_task')
      step_result.swarming_task = task
538
539  def collect_task(self, task, **kwargs):
540    """Waits for a single triggered task to finish.
541
542    If the task is sharded, will wait for all shards to finish. Behaves as
543    a regular recipe step: returns StepData with step results on success or
544    raises StepFailure if task fails.
545
546    Args:
547      task: SwarmingTask instance, previously triggered with 'trigger' method.
548      kwargs: passed to recipe step constructor as-is.
549    """
550    # TODO(vadimsh): Raise InfraFailure on Swarming failures.
551    assert isinstance(task, SwarmingTask)
552    assert task.task_name in self._pending_tasks, (
553        'Trying to collect a task that was not triggered: %s' %
554        task.task_name)
555    self._pending_tasks.remove(task.task_name)
556
557    try:
558      return task.collect_step(task, **kwargs)
559    finally:
560      try:
561        self.m.step.active_result.swarming_task = task
562      except Exception:  # pragma: no cover
563        # If we don't have an active_result, something failed very early,
564        # so we eat this exception and let that one propagate.
565        pass
566
567  def trigger(self, tasks, **kwargs):  # pragma: no cover
568    """Batch version of 'trigger_task'.
569
570    Deprecated, to be removed soon. Use 'trigger_task' in a loop instead,
571    properly handling exceptions. This method doesn't handle trigger failures
572    well (it aborts on a first failure).
573    """
574    return [self.trigger_task(t, **kwargs) for t in tasks]
575
576  def collect(self, tasks, **kwargs):  # pragma: no cover
577    """Batch version of 'collect_task'.
578
579    Deprecated, to be removed soon. Use 'collect_task' in a loop instead,
580    properly handling exceptions. This method doesn't handle collect failures
581    well (it aborts on a first failure).
582    """
583    return [self.collect_task(t, **kwargs) for t in tasks]
584
585  # To keep compatibility with some build_internal code. To be removed as well.
586  collect_each = collect
587
588  @staticmethod
589  def _display_pending(summary_json, step_presentation):
590    """Shows max pending time in seconds across all shards if it exceeds 10s."""
591    pending_times = [
592      (parse_time(shard['started_ts']) -
593        parse_time(shard['created_ts'])).total_seconds()
594      for shard in summary_json.get('shards', []) if shard.get('started_ts')
595    ]
596    max_pending = max(pending_times) if pending_times else 0
597
598    # Only display annotation when pending more than 10 seconds to reduce noise.
599    if max_pending > 10:
600      step_presentation.step_text += '<br>swarming pending %ds' % max_pending
601
  def _default_collect_step(
      self, task, merged_test_output=None,
      step_test_data=None,
      **kwargs):
    """Produces a step that collects a result of an arbitrary task.

    The step runs the 'collect_task.py' resource, which is given a merge
    script plus the 'swarming.py collect' command line to wrap. Afterwards
    the step result is annotated from the swarming summary and with the links
    found in the step's JSON output, if any.

    Args:
      task: SwarmingTask instance to collect.
      merged_test_output: value for the '-o' argument of collect_task.py. A
          JSON output placeholder is used when not given.
      step_test_data: test data for the step. Defaults to a canned summary
          for task.shards shards plus an empty JSON output.
      kwargs: passed to the python step as-is.

    Returns:
      The result of the python step.
    """
    task_output_dir = task.task_output_dir or self.m.raw_io.output_dir()

    # If we don't already have a Placeholder, wrap the task_output_dir in one
    # so we can read out of it later w/ step_result.raw_io.output_dir.
    if not isinstance(task_output_dir, recipe_util.Placeholder):
      task_output_dir = self.m.raw_io.output_dir(leak_to=task_output_dir)

    # Arguments for collect_task.py itself; the wrapped collect command is
    # appended after a '--' separator further down.
    task_args = [
      '-o', merged_test_output or self.m.json.output(),
      '--task-output-dir', task_output_dir,
    ]

    # NOTE(review): assumes task.merge is always a dict (never None) --
    # confirm against SwarmingTask.
    merge_script = (task.merge.get('script')
                    or self.resource('noop_merge.py'))
    merge_args = (task.merge.get('args') or [])

    task_args.extend([
      '--merge-script', merge_script,
      '--merge-additional-args', self.m.json.dumps(merge_args),
    ])

    if task.build_properties:  # pragma: no cover
      # Recipe properties are applied last, so they win over the task's
      # build properties when both define the same key.
      properties = dict(task.build_properties)
      properties.update(self.m.properties)
      task_args.extend([
          '--build-properties', self.m.json.dumps(properties),
      ])

    task_args.append('--')
    # Arguments for the actual 'collect' command.
    collect_cmd = [
      'python',
      '-u',
      self.m.swarming_client.path.join('swarming.py'),
    ]
    collect_cmd.extend(self.get_collect_cmd_args(task))
    collect_cmd.extend([
      '--task-summary-json', self.summary(),
    ])

    task_args.extend(collect_cmd)

    # Only a zero return code is accepted, unless the task asks for its
    # failures to be ignored.
    allowed_return_codes = {0}
    if task.ignore_task_failure:  # pragma: no cover
      allowed_return_codes = 'any'

    # The call to collect_task emits two JSON files:
    #  1) a task summary JSON emitted by swarming
    #  2) a gtest results JSON emitted by the task
    # This builds an instance of StepTestData that covers both.
    step_test_data = step_test_data or (
      self.test_api.canned_summary_output(task.shards) +
      self.m.json.test_api.output({}))

    try:
      with self.m.context(cwd=self.m.path['start_dir']):
        return self.m.python(
            name=self.get_step_name('', task),
            script=self.resource('collect_task.py'),
            args=task_args,
            ok_ret=allowed_return_codes,
            step_test_data=lambda: step_test_data,
            **kwargs)
    finally:
      # Runs whether the step succeeded or raised. Annotating the result is
      # best effort: any error is recorded in the 'no_results_exc' log.
      step_result = None
      try:
        step_result = self.m.step.active_result
        step_result.presentation.step_text = text_for_task(task)
        summary_json = step_result.swarming.summary
        self._handle_summary_json(task, summary_json, step_result)

        links = {}
        if hasattr(step_result, 'json') and hasattr(step_result.json, 'output'):
          links = step_result.json.output.get('links', {})
        for k, v in links.iteritems():  # pragma: no cover
          step_result.presentation.links[k] = v
      # NOTE(review): this also catches whatever _handle_summary_json raises,
      # including its InfraFailure if that derives from Exception -- confirm
      # that swallowing it here is intended.
      except Exception as e:
        if step_result:
          step_result.presentation.logs['no_results_exc'] = [str(e)]
686
687  def get_step_name(self, prefix, task):
688    """SwarmingTask -> name of a step of a waterfall.
689
690    Will take a task title (+ step name prefix) and append OS dimension to it.
691
692    Args:
693      prefix: prefix to append to task title, like 'trigger'.
694      task: SwarmingTask instance.
695
696    Returns:
697      '[<prefix>] <task title> on <OS>'
698    """
699    prefix = '[%s] ' % prefix if prefix else ''
700    task_os = task.dimensions['os']
701
702    bot_os = self.prefered_os_dimension(self.m.platform.name)
703    suffix = ('' if (
704        task_os == bot_os or task_os.lower() == self.m.platform.name.lower())
705              else ' on %s' % task_os)
706    # Note: properly detecting dimensions of the bot the recipe is running
707    # on is somewhat non-trivial. It is not safe to assume it uses default
708    # or preferred dimensions for its OS. For example, the version of the OS
709    # can differ.
710    return ''.join((prefix, task.title, suffix))
711
712  def _handle_summary_json(self, task, summary, step_result):
713    # We store this now, and add links to all shards first, before failing the
714    # build. Format is tuple of (error message, shard that failed)
715    infra_failures = []
716    links = step_result.presentation.links
717    for index, shard in enumerate(summary['shards']):
718      url = task.get_shard_view_url(index)
719      display_text = 'shard #%d' % index
720
721      if not shard or shard.get('internal_failure'):  # pragma: no cover
722        display_text = (
723          'shard #%d had an internal swarming failure' % index)
724        infra_failures.append((index, 'Internal swarming failure'))
725      elif self._is_expired(shard):
726        display_text = (
727          'shard #%d expired, not enough capacity' % index)
728        infra_failures.append((
729            index, 'There isn\'t enough capacity to run your test'))
730      elif self._is_timed_out(shard):
731        display_text = (
732          'shard #%d timed out, took too much time to complete' % index)
733      elif self._get_exit_code(shard) != '0':  # pragma: no cover
734        display_text = 'shard #%d (failed)' % index
735
736      if self.show_isolated_out_in_collect_step:
737        isolated_out = shard.get('isolated_out')
738        if isolated_out:
739          link_name = 'shard #%d isolated out' % index
740          links[link_name] = isolated_out['view_url']
741
742      if url and self.show_shards_in_collect_step:
743        links[display_text] = url
744
745    self._display_pending(summary, step_result.presentation)
746
747    if infra_failures:
748      template = 'Shard #%s failed: %s'
749
750      # Done so that raising an InfraFailure doesn't cause an error.
751      # TODO(martiniss): Remove this hack. Requires recipe engine change
752      step_result._retcode = 2
753      step_result.presentation.status = self.m.step.EXCEPTION
754      raise recipe_api.InfraFailure(
755          '\n'.join(template % f for f in infra_failures), result=step_result)
756
757  def get_collect_cmd_args(self, task):
758    """SwarmingTask -> argument list for 'swarming.py' command."""
759    args = [
760      'collect',
761      '--swarming', self.swarming_server,
762      '--decorate',
763      '--print-status-updates',
764    ]
765    if self.verbose:
766      args.append('--verbose')
767    args.extend(('--json', self.m.json.input(task.trigger_output)))
768    return args
769
770  def _gen_trigger_step_test_data(self, task):
771    """Generates an expected value of --dump-json in 'trigger' step.
772
773    Used when running recipes to generate test expectations.
774    """
775    # Suffixes of shard subtask names.
776    subtasks = []
777    if task.shards == 1:
778      subtasks = ['']
779    else:
780      subtasks = [':%d:%d' % (task.shards, i) for i in range(task.shards)]
781    return self.m.json.test_api.output({
782      'base_task_name': task.task_name,
783      'tasks': {
784        '%s%s' % (task.task_name, suffix): {
785          'task_id': '1%02d00' % i,
786          'shard_index': i,
787          'view_url': '%s/user/task/1%02d00' % (self.swarming_server, i),
788        } for i, suffix in enumerate(subtasks)
789      },
790    })
791
792
class SwarmingTask(object):
  """Definition of a task to run on swarming."""

  def __init__(self, title, isolated_hash, ignore_task_failure, dimensions,
               env, priority, shards, buildername, buildnumber, expiration,
               user, io_timeout, hard_timeout, idempotent, extra_args,
               collect_step, task_output_dir, cipd_packages=None,
               build_properties=None, merge=None):
    """Configuration of a swarming task.

    Args:
      title: display name of the task, hints to what task is doing. Usually
          corresponds to a name of a test executable. Doesn't have to be unique.
      isolated_hash: hash of isolated file that describes all files needed to
          run the task as well as command line to launch. See 'isolate' recipe
          module.
      ignore_task_failure: whether to ignore the test failure of swarming
          tasks.
      dimensions: key-value mapping with swarming dimensions that specify
          on what Swarming slaves task can run. One important dimension is 'os',
          which defines platform flavor to run the task on. See Swarming doc.
          The mapping is copied.
      env: key-value mapping with additional environment variables to add to
          environment before launching the task executable. The mapping is
          copied.
      priority: integer [0, 255] that defines how urgent the task is.
          Lower value corresponds to higher priority. Swarming service executes
          tasks with higher priority first.
      shards: how many concurrent shards to run, makes sense only for
          isolated tests based on gtest. Swarming uses GTEST_SHARD_INDEX
          and GTEST_TOTAL_SHARDS environment variables to tell the executable
          what shard to run.
      buildername: buildbot builder this task was triggered from.
      buildnumber: build number of a build this task was triggered from.
      expiration: number of seconds the task may wait to be scheduled; if it
          hasn't started by then, it is not run at all.
      user: user that requested this task, if applicable.
      io_timeout: number of seconds that the task is allowed to not emit any
          stdout bytes, after which it is forcibly killed.
      hard_timeout: number of seconds for which the task is allowed to run,
          after which it is forcibly killed.
      idempotent: True if the results from a previous task can be reused. E.g.
          this task has no side-effects.
      extra_args: list of command line arguments to pass to isolated tasks.
          Stored as a tuple; None is treated as no arguments.
      collect_step: callback that will be called to collect and process
          results of task execution, signature is collect_step(task, **kwargs).
      task_output_dir: if defined, the directory where task results are placed
          during the collect step.
      cipd_packages: list of 3-tuples corresponding to CIPD packages needed for
          the task: ('path', 'package_name', 'version'), defined as follows:
              path: Path relative to the Swarming root dir in which to install
                  the package.
              package_name: Name of the package to install,
                  eg. "infra/tools/authutil/${platform}"
              version: Version of the package, either a package instance ID,
                  ref, or tag key/value pair.
      build_properties: An optional dict containing various build properties.
          These are typically but not necessarily the properties emitted by
          bot_update.
      merge: An optional dict containing:
          "script": path to a script to call to post process and merge the
              collected outputs from the tasks.
          "args": an optional list of additional arguments to pass to the
              above script.
          Stored as an empty dict when omitted.
    """
    # Not set by the constructor; exposed read-only through trigger_output.
    self._trigger_output = None
    self.build_properties = build_properties
    self.buildername = buildername
    self.buildnumber = buildnumber
    self.cipd_packages = cipd_packages
    self.collect_step = collect_step
    # Copy the mappings so later changes to the caller's dicts don't leak in.
    self.dimensions = dimensions.copy()
    self.env = env.copy()
    self.expiration = expiration
    self.extra_args = tuple(extra_args or [])
    self.hard_timeout = hard_timeout
    self.idempotent = idempotent
    self.ignore_task_failure = ignore_task_failure
    self.io_timeout = io_timeout
    self.isolated_hash = isolated_hash
    self.merge = merge or {}
    self.priority = priority
    self.shards = shards
    # Starts out empty; there is no constructor argument for tags.
    self.tags = set()
    self.task_output_dir = task_output_dir
    self.title = title
    self.user = user

  @property
  def task_name(self):
    """Name of this task, derived from its other properties.

    The task name is purely to make sense of the task and is not used in any
    other way.

    Format is '<title>/<os dimension>/<first 10 chars of isolated hash>', with
    '/<buildername>/<buildnumber>' appended when a builder name is set.
    Requires the 'os' dimension to be present.
    """
    out = '%s/%s/%s' % (
        self.title, self.dimensions['os'], self.isolated_hash[:10])
    if self.buildername:  # pragma: no cover
      # Any falsy build number (None, 0, ...) is rendered as -1.
      out += '/%s/%s' % (self.buildername, self.buildnumber or -1)
    return out

  @property
  def trigger_output(self):
    """JSON results of 'trigger' step or None if not triggered."""
    return self._trigger_output

  def get_shard_view_url(self, index):
    """Returns URL of HTML page with shard details or None if not available.

    Works only after the task has been successfully triggered.

    Args:
      index: shard index to look up, compared against each entry's
          'shard_index'.

    Returns:
      The 'view_url' of the matching shard, or None when there is no trigger
      output, it has no tasks, or no shard has that index.
    """
    if self._trigger_output and self._trigger_output.get('tasks'):
      # values() instead of the Python-2-only itervalues(), so the lookup
      # works on both Python 2 and Python 3.
      for shard_dict in self._trigger_output['tasks'].values():
        if shard_dict['shard_index'] == index:
          return shard_dict['view_url']
    return None
906