• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# pylint: disable=missing-docstring
2
3"""\
4Functions to expose over the RPC interface.
5
6For all modify* and delete* functions that ask for an 'id' parameter to
7identify the object to operate on, the id may be either
8 * the database row ID
9 * the name of the object (label name, hostname, user login, etc.)
10 * a dictionary containing uniquely identifying field (this option should seldom
11   be used)
12
13When specifying foreign key fields (i.e. adding hosts to a label, or adding
14users to an ACL group), the given value may be either the database row ID or the
15name of the object.
16
17All get* functions return lists of dictionaries.  Each dictionary represents one
18object and maps field names to values.
19
20Some examples:
21modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
22modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
23modify_test('sleeptest', test_type='Client', params=', seconds=60')
24delete_acl_group(1) # delete by ID
25delete_acl_group('Everyone') # delete by name
26acl_group_add_users('Everyone', ['mbligh', 'showard'])
27get_jobs(owner='showard', status='Queued')
28
29See doctests/001_rpc_test.txt for (lots) more examples.
30"""
31
32__author__ = 'showard@google.com (Steve Howard)'
33
34import ast
35import collections
36import contextlib
37import datetime
38import logging
39import os
40import sys
41import warnings
42
43import six
44from autotest_lib.client.common_lib import (control_data, error, global_config,
45                                            priorities)
46from autotest_lib.client.common_lib.cros import dev_server
47from autotest_lib.frontend.afe import control_file as control_file_lib
48from autotest_lib.frontend.afe import (model_attributes, model_logic, models,
49                                       rpc_utils)
50from autotest_lib.frontend.tko import models as tko_models
51from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
52from autotest_lib.server import frontend, utils
53from autotest_lib.server.cros import provision
54from autotest_lib.server.cros.dynamic_suite import (constants,
55                                                    control_file_getter,
56                                                    suite_common, tools)
57from autotest_lib.server.cros.dynamic_suite.suite import Suite
58from autotest_lib.server.lib import status_history
59from autotest_lib.site_utils import job_history, stable_version_utils
60from django.db import connection as db_connection
61from django.db import transaction
62from django.db.models import Count
63from django.db.utils import DatabaseError
64
65import common
66
67_CONFIG = global_config.global_config
68
69# Definition of LabHealthIndicator
70LabHealthIndicator = collections.namedtuple(
71        'LabHealthIndicator',
72        [
73                'if_lab_close',
74                'available_duts',
75                'devserver_health',
76                'upcoming_builds',
77        ]
78)
79
80RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
81        'SKYLAB', 'respect_static_labels', type=bool, default=False)
82
83RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value(
84        'SKYLAB', 'respect_static_attributes', type=bool, default=False)
85
86# Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.
87
88# labels
89
def modify_label(id, **data):
    """Modify a label.

    @param id: id or name of a label. More often a label name.
    @param data: New data for a label.

    @raises error.UnmodifiableLabelException: If the label has been replaced
            by a static label and must be edited via the inventory tools.
    """
    label_model = models.Label.smart_get(id)
    if label_model.is_replaced_by_static():
        # Bug fix: this message previously said "delete" (copy-pasted from
        # delete_label) even though this RPC modifies the label.
        raise error.UnmodifiableLabelException(
                'Failed to modify label "%s" because it is a static label. '
                'Use go/chromeos-skylab-inventory-tools to modify this '
                'label.' % label_model.name)

    label_model.update_object(data)

    # Main forwards the RPC to shards; shards apply only locally.
    if not utils.is_shard():
        rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False,
                             id=id, **data)
109
110
def delete_label(id):
    """Delete a label.

    @param id: id or name of a label. More often a label name.

    @raises error.UnmodifiableLabelException: If the label has been replaced
            by a static label.
    """
    label_model = models.Label.smart_get(id)
    if label_model.is_replaced_by_static():
        raise error.UnmodifiableLabelException(
                'Failed to delete label "%s" because it is a static label. '
                'Use go/chromeos-skylab-inventory-tools to modify this '
                'label.' % label_model.name)

    # Snapshot the hosts carrying this label before deleting it: the shard
    # fanout below still needs them after the label row is gone.
    hosts = [models.Host.smart_get(h.id) for h in label_model.host_set.all()]
    label_model.delete()

    # Main forwards the RPC to shards; shards apply only locally.
    if not utils.is_shard():
        rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id)
133
134
def add_label(name, ignore_exception_if_exists=False, **kwargs):
    """Adds a new label of a given name.

    @param name: label name.
    @param ignore_exception_if_exists: If True and the exception was
        thrown due to the duplicated label name when adding a label,
        then suppress the exception. Default is False.
    @param kwargs: keyword args that store more info about a label
        other than the name.
    @return: int/long id of a new label.
    """
    # models.Label.add_object() throws model_logic.ValidationError
    # when it is given a label name that already exists.
    # However, ValidationError can be thrown with different errors,
    # and those errors should be thrown up to the call chain.
    try:
        label = models.Label.add_object(name=name, **kwargs)
    except Exception:
        # Catch Exception instead of a bare `except:` so that
        # KeyboardInterrupt / SystemExit are never swallowed. A bare
        # `raise` re-raises the active exception with its original
        # traceback, which makes six.reraise(*sys.exc_info()) redundant.
        if not ignore_exception_if_exists:
            raise
        label = rpc_utils.get_label(name)
        # If the exception was raised for a reason other than a
        # duplicated "name", re-raise the original exception.
        if label is None:
            raise
    return label.id
163
164
def add_label_to_hosts(id, hosts):
    """Adds a label of the given id to the given hosts only in local DB.

    @param id: id or name of a label. More often a label name.
    @param hosts: The hostnames of hosts that need the label.

    @raises models.Label.DoesNotExist: If the label with id doesn't exist.
    """
    label_obj = models.Label.smart_get(id)
    if label_obj.is_replaced_by_static():
        # A replaced label is served by its static counterpart.
        label_obj = models.StaticLabel.smart_get(label_obj.name)

    host_models = models.Host.smart_get_bulk(hosts)
    if label_obj.platform:
        models.Host.check_no_platform(host_models)
    # Enforce the at-most-one-board-label-per-host invariant.
    if label_obj.name.startswith('board:'):
        models.Host.check_board_labels_allowed(host_models, [label_obj.name])
    label_obj.host_set.add(*host_models)
184
185
def _create_label_everywhere(id, hosts):
    """Create a label on the main and on the shards serving the given hosts.

    ALERT! This method should be run only on main not shards!
    DO NOT RUN THIS ON A SHARD!!!  Deputies will hate you if you do!!!

    This method exists primarily to serve label_add_hosts() and
    host_add_labels().  It pulls the label check/add logic out of
    label_add_hosts() so that the label is created both locally and on
    every shard that services one of the given hosts.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.

    @raises ValueError: If `id` is a non-string (an int/long label id) and no
            such label exists — only a label *name* can be auto-created.
    """
    try:
        label = models.Label.smart_get(id)
    except models.Label.DoesNotExist:
        # This mirrors the type checks in smart_get: we can only create a
        # missing label when given its name; a numeric id must refer to an
        # existing row.
        if not isinstance(id, six.string_types):
            raise ValueError('Label id (%s) does not exist. Please specify '
                             'the argument, id, as a string (label name).'
                             % id)
        label = models.Label.smart_get(add_label(id))

    # Make sure the label exists on each shard with the same id as on the
    # main. A shard may already have the label (e.g. it was attached to one
    # of its hosts earlier), so the duplicate-name exception is deliberately
    # suppressed via ignore_exception_if_exists.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(
            host_objs, 'add_label', include_hostnames=False,
            name=label.name, ignore_exception_if_exists=True,
            id=label.id, platform=label.platform)
231
232
@rpc_utils.route_rpc_to_main
def label_add_hosts(id, hosts):
    """Adds a label with the given id to the given hosts.

    This method should be run only on main not shards.
    The given label will be created if it doesn't exist, provided the `id`
    supplied is a label name not an int/long id.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.

    @raises ValueError: If the id specified is an int/long (label id)
                        while the label does not exist.
    """
    # The label must exist everywhere (main + relevant shards) before any
    # host association is made.
    _create_label_everywhere(id, hosts)

    # Attach on the main first, then fan the attachment out to the shards.
    add_label_to_hosts(id, hosts)

    rpc_utils.fanout_rpc(models.Host.smart_get_bulk(hosts),
                         'add_label_to_hosts', id=id)
256
257
def remove_label_from_hosts(id, hosts):
    """Removes a label of the given id from the given hosts only in local DB.

    @param id: id or name of a label.
    @param hosts: The hostnames of hosts that need to remove the label from.

    @raises error.UnmodifiableLabelException: If the label has been replaced
            by a static label.
    """
    host_models = models.Host.smart_get_bulk(hosts)
    label_obj = models.Label.smart_get(id)
    if label_obj.is_replaced_by_static():
        raise error.UnmodifiableLabelException(
                'Failed to remove label "%s" for hosts "%r" because it is a '
                'static label. Use go/chromeos-skylab-inventory-tools to '
                'modify this label.' % (label_obj.name, hosts))

    label_obj.host_set.remove(*host_models)
273
274
@rpc_utils.route_rpc_to_main
def label_remove_hosts(id, hosts):
    """Removes a label of the given id from the given hosts.

    This method should be run only on main not shards.

    @param id: id or name of a label.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    # Resolve the host objects up front; they are needed for the shard
    # fanout after the local removal.
    host_models = models.Host.smart_get_bulk(hosts)
    remove_label_from_hosts(id, hosts)

    rpc_utils.fanout_rpc(host_models, 'remove_label_from_hosts', id=id)
288
289
def get_labels(exclude_filters=(), **filter_data):
    """Fetch labels as nested dictionaries.

    @param exclude_filters: A sequence of dictionaries of filters.

    @returns A sequence of nested dictionaries of label information.
    """
    labels = models.Label.query_objects(filter_data)
    for exclude_filter in exclude_filters:
        labels = labels.exclude(**exclude_filter)

    if not RESPECT_STATIC_LABELS:
        return rpc_utils.prepare_rows_as_nested_dicts(labels, ())

    static_labels = models.StaticLabel.query_objects(filter_data)
    for exclude_filter in exclude_filters:
        static_labels = static_labels.exclude(**exclude_filter)

    non_static_lists = rpc_utils.prepare_rows_as_nested_dicts(labels, ())
    static_lists = rpc_utils.prepare_rows_as_nested_dicts(static_labels, ())

    # Labels listed in afe_replaced_labels are served from their static
    # counterparts instead of the mutable rows.
    label_ids = [label.id for label in labels]
    replaced = models.ReplacedLabel.objects.filter(label__id__in=label_ids)
    replaced_ids = {r.label_id for r in replaced}
    replaced_label_names = {l.name for l in labels if l.id in replaced_ids}

    # Non-replaced mutable labels first, then the static replacements.
    results = [d for d in non_static_lists
               if d.get('id') not in replaced_ids]
    results.extend(d for d in static_lists
                   if d.get('name') in replaced_label_names)
    return results
325
326
327# hosts
328
def add_host(hostname, status=None, locked=None, lock_reason='', protection=None):
    """Create a host row and return its id.

    @param hostname: name of the new host.
    @param status: initial status, or None for the model default.
    @param locked: whether the host starts locked; requires lock_reason.
    @param lock_reason: mandatory human-readable reason when locked is set.
    @param protection: protection level, or None for the model default.

    @raises model_logic.ValidationError: If locked without a lock_reason.
    """
    if locked and not lock_reason:
        raise model_logic.ValidationError(
            {'locked': 'Please provide a reason for locking when adding host.'})

    host = models.Host.add_object(hostname=hostname, status=status,
                                  locked=locked, lock_reason=lock_reason,
                                  protection=protection)
    return host.id
337
338
@rpc_utils.route_rpc_to_main
def modify_host(id, **kwargs):
    """Modify local attributes of a host.

    If this is called on the main, but the host is assigned to a shard, this
    will call `modify_host_local` RPC to the responsible shard. This means if
    a host is being locked using this function, this change will also propagate
    to shards.
    When this is called on a shard, the shard just routes the RPC to the main
    and does nothing.

    @param id: id of the host to modify.
    @param kwargs: key=value pairs of values to set on the host.
    """
    rpc_utils.check_modify_host(kwargs)
    host = models.Host.smart_get(id)
    try:
        rpc_utils.check_modify_host_locking(host, kwargs)
    except model_logic.ValidationError as e:
        # Locking violations are fatal unless the caller explicitly forces
        # the change; then the error is logged and ignored.
        if not kwargs.get('force_modify_locking', False):
            raise
        logging.exception('The following exception will be ignored and lock '
                          'modification will be enforced. %s', e)

    # This is required to make `lock_time` for a host be exactly same
    # between the main and a shard.
    if kwargs.get('locked', None) and 'lock_time' not in kwargs:
        kwargs['lock_time'] = datetime.datetime.now()

    # force_modify_locking is not an actual field in the database; strip it
    # from the kwargs forwarded to the shard.
    shard_kwargs = dict(kwargs)
    shard_kwargs.pop('force_modify_locking', None)
    rpc_utils.fanout_rpc([host], 'modify_host_local',
                         include_hostnames=False, id=id, **shard_kwargs)

    # Update the local DB **after** RPC fanout is complete.
    # This guarantees that the main state is only updated if the shards were
    # correctly updated.
    # In case the shard update fails mid-flight and the main-shard desync, we
    # always consider the main state to be the source-of-truth, and any
    # (automated) corrective actions will revert the (partial) shard updates.
    host.update_object(kwargs)
381
382
def modify_host_local(id, **kwargs):
    """Modify host attributes in local DB.

    @param id: Host id.
    @param kwargs: key=value pairs of values to set on the host.
    """
    host = models.Host.smart_get(id)
    host.update_object(kwargs)
390
391
@rpc_utils.route_rpc_to_main
def modify_hosts(host_filter_data, update_data):
    """Modify local attributes of multiple hosts.

    If this is called on the main, but one of the hosts in that match the
    filters is assigned to a shard, this will call `modify_hosts_local` RPC
    to the responsible shard.
    When this is called on a shard, the shard just routes the RPC to the main
    and does nothing.

    The filters are always applied on the main, not on the shards. This means
    if the states of a host differ on the main and a shard, the state on the
    main will be used. I.e. this means:
    A host was synced to Shard 1. On Shard 1 the status of the host was set to
    'Repair Failed'.
    - A call to modify_hosts with host_filter_data={'status': 'Ready'} will
    update the host (both on the shard and on the main), because the state
    of the host as the main knows it is still 'Ready'.
    - A call to modify_hosts with host_filter_data={'status': 'Repair failed'
    will not update the host, because the filter doesn't apply on the main.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    # Copy so that the lock_time/force_modify_locking mutations below never
    # leak into the caller's dictionary.
    update_data = update_data.copy()
    rpc_utils.check_modify_host(update_data)
    hosts = models.Host.query_objects(host_filter_data)

    affected_shard_hostnames = set()
    affected_host_ids = []

    # Check all hosts before changing data for exception safety.
    for host in hosts:
        try:
            rpc_utils.check_modify_host_locking(host, update_data)
        except model_logic.ValidationError as e:
            if not update_data.get('force_modify_locking', False):
                raise
            logging.exception('The following exception will be ignored and '
                              'lock modification will be enforced. %s', e)

        if host.shard:
            affected_shard_hostnames.add(host.shard.hostname)
            affected_host_ids.append(host.id)

    # This is required to make `lock_time` for a host be exactly same
    # between the main and a shard.
    if update_data.get('locked', None) and 'lock_time' not in update_data:
        update_data['lock_time'] = datetime.datetime.now()
    for host in hosts:
        host.update_object(update_data)

    # force_modify_locking is not a database field; strip it before fanout.
    update_data.pop('force_modify_locking', None)
    # Caution: Changing the filter from the original here. See docstring.
    rpc_utils.run_rpc_on_multiple_hostnames(
            'modify_hosts_local', affected_shard_hostnames,
            host_filter_data={'id__in': affected_host_ids},
            update_data=update_data)
450
451
def modify_hosts_local(host_filter_data, update_data):
    """Modify attributes of hosts in local DB.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    matched_hosts = models.Host.query_objects(host_filter_data)
    for host in matched_hosts:
        host.update_object(update_data)
460
461
def add_labels_to_host(id, labels):
    """Adds labels to a given host only in local DB.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    label_objs = models.Label.smart_get_bulk(labels)
    if not RESPECT_STATIC_LABELS:
        models.Host.smart_get(id).labels.add(*label_objs)
        return

    # Static labels go to the dedicated relation; the rest stay mutable.
    static, non_static = models.Host.classify_label_objects(label_objs)
    host = models.Host.smart_get(id)
    host.static_labels.add(*static)
    host.labels.add(*non_static)
477
478
@rpc_utils.route_rpc_to_main
def host_add_labels(id, labels):
    """Adds labels to a given host.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.

    @raises ValidationError: If adding more than one platform/board label.
    """
    # Make sure every label exists on the main and the relevant shards.
    for label in labels:
        _create_label_everywhere(label, [id])

    label_objs = models.Label.smart_get_bulk(labels)

    platforms = [label.name for label in label_objs if label.platform]
    if len(platforms) > 1:
        raise model_logic.ValidationError(
            {'labels': ('Adding more than one platform: %s' %
                        ', '.join(platforms))})

    host_obj = models.Host.smart_get(id)
    if platforms:
        # A host may have only one platform label in total.
        models.Host.check_no_platform([host_obj])
    has_board_label = any(label_name.startswith('board:')
                          for label_name in labels)
    if has_board_label:
        models.Host.check_board_labels_allowed([host_obj], labels)
    add_labels_to_host(id, labels)

    rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False,
                         id=id, labels=labels)
509
510
def remove_labels_from_host(id, labels):
    """Removes labels from a given host only in local DB.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    label_objs = models.Label.smart_get_bulk(labels)
    if not RESPECT_STATIC_LABELS:
        models.Host.smart_get(id).labels.remove(*label_objs)
        return

    static_labels, non_static_labels = models.Host.classify_label_objects(
            label_objs)
    host = models.Host.smart_get(id)
    host.labels.remove(*non_static_labels)
    if static_labels:
        # Static labels cannot be removed here; only log the refusal.
        logging.info('Cannot remove labels "%r" for host "%r" due to they '
                     'are static labels. Use '
                     'go/chromeos-skylab-inventory-tools to modify these '
                     'labels.', static_labels, id)
530
531
@rpc_utils.route_rpc_to_main
def host_remove_labels(id, labels):
    """Removes labels from a given host.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    # Remove on the main first, then fan the removal out to the host's shard.
    remove_labels_from_host(id, labels)

    rpc_utils.fanout_rpc([models.Host.smart_get(id)],
                         'remove_labels_from_host', False,
                         id=id, labels=labels)
544
545
def get_host_attribute(attribute, **host_filter_data):
    """Return serialized attribute dicts for matching hosts.

    @param attribute: string name of attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    matched = list(rpc_utils.get_host_query((), False, True,
                                            host_filter_data))
    models.Host.objects.populate_relationships(matched, models.HostAttribute,
                                               'attribute_list')
    attr_dicts = []
    attr_hosts = []
    for host in matched:
        for attr in host.attribute_list:
            if attr.attribute == attribute:
                attr_dicts.append(attr.get_object_dict())
                attr_hosts.append(host)

    if RESPECT_STATIC_ATTRIBUTES:
        # A static attribute value, when present, wins over the mutable one.
        for attr_dict, host in zip(attr_dicts, attr_hosts):
            statics = models.StaticHostAttribute.query_objects(
                    {'host_id': host.id, 'attribute': attribute})
            if len(statics) > 0:
                attr_dict['value'] = statics[0].value

    return rpc_utils.prepare_for_serialization(attr_dicts)
572
573
@rpc_utils.route_rpc_to_main
def set_host_attribute(attribute, value, **host_filter_data):
    """Set an attribute on hosts.

    This RPC is a shim that forwards calls to main to be handled there.

    @param attribute: string name of attribute
    @param value: string, or None to delete an attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    # route_rpc_to_main should guarantee we are on the main by the time the
    # body runs; this assert is a sanity check of that invariant.
    assert not utils.is_shard()
    set_host_attribute_impl(attribute, value, **host_filter_data)
587
588
def set_host_attribute_impl(attribute, value, **host_filter_data):
    """Set an attribute on hosts.

    *** DO NOT CALL THIS RPC from client code ***
    This RPC exists for main-shard communication only.
    Call set_host_attribute instead.

    @param attribute: string name of attribute
    @param value: string, or None to delete an attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    # Refuse an empty filter: it would silently act on every host in the DB.
    assert host_filter_data
    matched_hosts = models.Host.query_objects(host_filter_data)
    models.AclGroup.check_for_acl_violation_hosts(matched_hosts)
    for host in matched_hosts:
        host.set_or_delete_attribute(attribute, value)

    # Only the main fans this out; shards stop after the local update.
    if not utils.is_shard():
        rpc_utils.fanout_rpc(matched_hosts, 'set_host_attribute_impl', False,
                             attribute=attribute, value=value,
                             **host_filter_data)
611
612
@rpc_utils.forward_single_host_rpc_to_shard
def delete_host(id):
    """Delete a host (forwarded to its shard by the decorator when needed).

    @param id: id or hostname of the host to delete.
    """
    models.Host.smart_get(id).delete()
616
617
def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
              valid_only=True, include_current_job=False, **filter_data):
    """Get a list of dictionaries which contains the information of hosts.

    @param multiple_labels: match hosts in all of the labels given.  Should
            be a list of label names.
    @param exclude_only_if_needed_labels: Deprecated. Raise error if it's True.
    @param valid_only: passed through to the host query; presumably filters
            out invalid/deleted hosts — confirm in rpc_utils.get_host_query.
    @param include_current_job: Set to True to include ids of currently running
            job and special task.
    @param filter_data: additional filters applied to the Host query.

    @returns A serialized list of host dicts, each augmented with 'acls',
            'attributes', 'labels' and 'platform' keys (plus 'current_job' /
            'current_special_task' when include_current_job is True).
    """
    if exclude_only_if_needed_labels:
        raise error.RPCException('exclude_only_if_needed_labels is deprecated')

    hosts = rpc_utils.get_host_query(multiple_labels,
                                     exclude_only_if_needed_labels,
                                     valid_only, filter_data)
    hosts = list(hosts)
    # Prefetch the related rows in bulk to avoid per-host queries below.
    models.Host.objects.populate_relationships(hosts, models.Label,
                                               'label_list')
    models.Host.objects.populate_relationships(hosts, models.AclGroup,
                                               'acl_list')
    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
                                               'attribute_list')
    models.Host.objects.populate_relationships(hosts,
                                               models.StaticHostAttribute,
                                               'staticattribute_list')
    host_dicts = []
    for host_obj in hosts:
        host_dict = host_obj.get_object_dict()
        host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
        host_dict['attributes'] = dict((attribute.attribute, attribute.value)
                                       for attribute in host_obj.attribute_list)
        if RESPECT_STATIC_LABELS:
            label_list = []
            # Only keep static labels which has a corresponding entries in
            # afe_labels.
            for label in host_obj.label_list:
                if label.is_replaced_by_static():
                    static_label = models.StaticLabel.smart_get(label.name)
                    label_list.append(static_label)
                else:
                    label_list.append(label)

            host_dict['labels'] = [label.name for label in label_list]
            host_dict['platform'] = rpc_utils.find_platform(
                    host_obj.hostname, label_list)
        else:
            host_dict['labels'] = [label.name for label in host_obj.label_list]
            host_dict['platform'] = rpc_utils.find_platform(
                    host_obj.hostname, host_obj.label_list)

        if RESPECT_STATIC_ATTRIBUTES:
            # Overwrite attribute with values in afe_static_host_attributes.
            for attr in host_obj.staticattribute_list:
                if attr.attribute in host_dict['attributes']:
                    host_dict['attributes'][attr.attribute] = attr.value

        if include_current_job:
            host_dict['current_job'] = None
            host_dict['current_special_task'] = None
            # Active, incomplete queue entries identify the running job.
            entries = models.HostQueueEntry.objects.filter(
                    host_id=host_dict['id'], active=True, complete=False)
            if entries:
                host_dict['current_job'] = (
                        entries[0].get_object_dict()['job'])
            tasks = models.SpecialTask.objects.filter(
                    host_id=host_dict['id'], is_active=True, is_complete=False)
            if tasks:
                host_dict['current_special_task'] = (
                        '%d-%s' % (tasks[0].get_object_dict()['id'],
                                   tasks[0].get_object_dict()['task'].lower()))
        host_dicts.append(host_dict)

    return rpc_utils.prepare_for_serialization(host_dicts)
692
693
def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
                  valid_only=True, **filter_data):
    """Count the hosts matching the same parameters as get_hosts().

    @returns The number of matching hosts.
    """
    if exclude_only_if_needed_labels:
        raise error.RPCException('exclude_only_if_needed_labels is deprecated')

    query = rpc_utils.get_host_query(multiple_labels,
                                     exclude_only_if_needed_labels,
                                     valid_only, filter_data)
    return len(query)
708
709
710# tests
711
def get_tests(**filter_data):
    """Return serialized Test rows matching the given filters."""
    tests = models.Test.list_objects(filter_data)
    return rpc_utils.prepare_for_serialization(tests)
715
716
def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
    """Gets the counts of all passed and failed tests from the matching jobs.

    @param job_name_prefix: Name prefix of the jobs to get the summary
           from, e.g., 'butterfly-release/r40-6457.21.0/bvt-cq/'. Prefix
           matching is case insensitive.
    @param label_name: Label that must be set in the jobs, e.g.,
            'cros-version:butterfly-release/R40-6457.21.0'.

    @returns A summary of the counts of all the passed and failed tests.
    """
    summary = {'passed': 0, 'failed': 0}
    matching_jobs = models.Job.objects.filter(
            name__istartswith=job_name_prefix,
            dependency_labels__name=label_name)
    job_ids = list(matching_jobs.values_list('pk', flat=True))
    if not job_ids:
        return summary

    # Aggregate per-status counts, skipping the synthetic *_JOB entries.
    counts = (tko_models.TestView.objects
              .filter(afe_job_id__in=job_ids)
              .exclude(test_name='SERVER_JOB')
              .exclude(test_name__startswith='CLIENT_JOB')
              .values('status')
              .annotate(count=Count('status')))
    for row in counts:
        # Only GOOD counts as a pass; every other status is a failure.
        bucket = 'passed' if row['status'] == 'GOOD' else 'failed'
        summary[bucket] += row['count']
    return summary
748
749
750# profilers
751
def add_profiler(name, description=None):
    """Create a profiler row and return its id."""
    profiler = models.Profiler.add_object(name=name, description=description)
    return profiler.id
754
755
def modify_profiler(id, **data):
    """Update fields of the profiler identified by id (row id or name)."""
    profiler = models.Profiler.smart_get(id)
    profiler.update_object(data)
758
759
def delete_profiler(id):
    """Delete the profiler identified by id (row id or name)."""
    profiler = models.Profiler.smart_get(id)
    profiler.delete()
762
763
def get_profilers(**filter_data):
    """Return serialized Profiler rows matching the filters."""
    profilers = models.Profiler.list_objects(filter_data)
    return rpc_utils.prepare_for_serialization(profilers)
767
768
769# users
770
def get_users(**filter_data):
    """Return serialized user records matching `filter_data`."""
    users = models.User.list_objects(filter_data)
    return rpc_utils.prepare_for_serialization(users)
774
775
776# acl groups
777
def add_acl_group(name, description=None):
    """Create a new ACL group and return its database id.

    The current user is added as the group's first member so the group
    never starts out empty.
    """
    new_group = models.AclGroup.add_object(name=name, description=description)
    new_group.users.add(models.User.current_user())
    return new_group.id
782
783
def modify_acl_group(id, **data):
    """Update an ACL group after verifying the caller may modify it."""
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    acl_group.update_object(data)
    # Groups must never be left without members.
    acl_group.add_current_user_if_empty()
789
790
def acl_group_add_users(id, users):
    """Add the given users to an ACL group."""
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    user_objects = models.User.smart_get_bulk(users)
    acl_group.users.add(*user_objects)
796
797
def acl_group_remove_users(id, users):
    """Remove the given users from an ACL group."""
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    user_objects = models.User.smart_get_bulk(users)
    acl_group.users.remove(*user_objects)
    # Groups must never be left without members.
    acl_group.add_current_user_if_empty()
804
805
def acl_group_add_hosts(id, hosts):
    """Add the given hosts to an ACL group."""
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    host_objects = models.Host.smart_get_bulk(hosts)
    acl_group.hosts.add(*host_objects)
    acl_group.on_host_membership_change()
812
813
def acl_group_remove_hosts(id, hosts):
    """Remove the given hosts from an ACL group."""
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    host_objects = models.Host.smart_get_bulk(hosts)
    acl_group.hosts.remove(*host_objects)
    acl_group.on_host_membership_change()
820
821
def delete_acl_group(id):
    """Delete the ACL group identified by `id`."""
    acl_group = models.AclGroup.smart_get(id)
    acl_group.delete()
824
825
def get_acl_groups(**filter_data):
    """Return ACL groups matching `filter_data`.

    Each returned dictionary additionally carries 'users' (list of member
    logins) and 'hosts' (list of member hostnames).
    """
    acl_group_dicts = models.AclGroup.list_objects(filter_data)
    for group_dict in acl_group_dicts:
        group = models.AclGroup.objects.get(id=group_dict['id'])
        group_dict['users'] = [member.login for member in group.users.all()]
        group_dict['hosts'] = [member.hostname for member in group.hosts.all()]
    return rpc_utils.prepare_for_serialization(acl_group_dicts)
835
836
837# jobs
838
def generate_control_file(tests=(), profilers=(),
                          client_control_file='', use_container=False,
                          profile_only=None, db_tests=True,
                          test_source_build=None):
    """
    Generates a client-side control file to run tests.

    @param tests List of tests to run. See db_tests for more information.
    @param profilers List of profilers to activate during the job.
    @param client_control_file The contents of a client-side control file to
        run at the end of all tests.  If this is supplied, all tests must be
        client side.
        TODO: in the future we should support server control files directly
        to wrap with a kernel.  That'll require changing the parameter
        name and adding a boolean to indicate if it is a client or server
        control file.
    @param use_container unused argument today.  TODO: Enable containers
        on the host during a client side test.
    @param profile_only A boolean that indicates what default profile_only
        mode to use in the control file. Passing None will generate a
        control file that does not explicitly set the default mode at all.
    @param db_tests: if True, the test object can be found in the database
                     backing the test model. In this case, tests is a tuple
                     of test IDs which are used to retrieve the test objects
                     from the database. If False, tests is a tuple of test
                     dictionaries stored client-side in the AFE.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.

    @returns a dict with the following keys:
        control_file: str, The control file text.
        is_server: bool, is the control file a server-side control file?
        synch_count: How many machines the job uses per autoserv execution.
            synch_count == 1 means the job is asynchronous.
        dependencies: A list of the names of labels on which the job depends.
    """
    # Nothing to generate: return an empty, asynchronous client control file.
    if not tests and not client_control_file:
        return dict(control_file='', is_server=False, synch_count=1,
                    dependencies=[])

    cf_info, test_objects, profiler_objects = (
            rpc_utils.prepare_generate_control_file(tests, profilers,
                                                    db_tests))
    cf_info['control_file'] = control_file_lib.generate_control(
            tests=test_objects,
            profilers=profiler_objects,
            is_server=cf_info['is_server'],
            client_control_file=client_control_file,
            profile_only=profile_only,
            test_source_build=test_source_build)
    return cf_info
888
889
def create_job_page_handler(name, priority, control_file, control_type,
                            image=None, hostless=False, firmware_rw_build=None,
                            firmware_ro_build=None, test_source_build=None,
                            is_cloning=False, cheets_build=None, **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param image: ChromeOS build to be installed in the dut. Default to None.
    @param firmware_rw_build: Firmware build to update RW firmware. Default to
                              None, i.e., RW firmware will not be updated.
    @param firmware_ro_build: Firmware build to update RO firmware. Default to
                              None, i.e., RO firmware will not be updated.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.
    @param is_cloning: True if creating a cloning job.
    @param cheets_build: ChromeOS Android build  to be installed in the dut.
                         Default to None. Cheets build will not be updated.
    @param kwargs extra args that will be required by create_suite_job or
                  create_job.

    @returns The created Job id number.
    """
    test_args = {}
    if kwargs.get('args'):
        # args' format is: ['disable_sysinfo=False', 'fast=True', ...]
        # Split on the first '=' only, so values that themselves contain '='
        # (e.g. URLs or key=value payloads) are preserved intact.
        for arg in kwargs.get('args'):
            key, value = arg.split('=', 1)
            test_args[key] = value

    if is_cloning:
        logging.info('Start to clone a new job')
        # When cloning a job, hosts and meta_hosts should not exist together,
        # which would cause host-scheduler to schedule two hqe jobs to one host
        # at the same time, and crash itself. Clear meta_hosts for this case.
        if kwargs.get('hosts') and kwargs.get('meta_hosts'):
            kwargs['meta_hosts'] = []
    else:
        logging.info('Start to create a new job')
    control_file = rpc_utils.encode_ascii(control_file)
    if not control_file:
        raise model_logic.ValidationError({
                'control_file' : "Control file cannot be empty"})

    if image and hostless:
        # Hostless + image means a suite job: collect all requested builds
        # and delegate to create_suite_job.
        builds = {}
        builds[provision.CROS_VERSION_PREFIX] = image
        if cheets_build:
            builds[provision.CROS_ANDROID_VERSION_PREFIX] = cheets_build
        if firmware_rw_build:
            builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
        if firmware_ro_build:
            builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
        return create_suite_job(
                name=name, control_file=control_file, priority=priority,
                builds=builds, test_source_build=test_source_build,
                is_cloning=is_cloning, test_args=test_args, **kwargs)

    return create_job(name, priority, control_file, control_type, image=image,
                      hostless=hostless, test_args=test_args, **kwargs)
954
955
@rpc_utils.route_rpc_to_main
def create_job(
        name,
        priority,
        control_file,
        control_type,
        hosts=(),
        meta_hosts=(),
        one_time_hosts=(),
        synch_count=None,
        is_template=False,
        timeout=None,
        timeout_mins=None,
        max_runtime_mins=None,
        run_verify=False,
        email_list='',
        dependencies=(),
        reboot_before=None,
        reboot_after=None,
        parse_failed_repair=None,
        hostless=False,
        keyvals=None,
        drone_set=None,
        image=None,
        parent_job_id=None,
        test_retry=0,
        run_reset=True,
        require_ssp=None,
        test_args=None,
        **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param synch_count How many machines the job uses per autoserv execution.
        synch_count == 1 means the job is asynchronous.  If an atomic group is
        given this value is treated as a minimum.
    @param is_template If true then create a template job.
    @param timeout Hours after this call returns until the job times out.
    @param timeout_mins Minutes after this call returns until the job times
        out.
    @param max_runtime_mins Minutes from job starting time until job times out
    @param run_verify Should the host be verified before running the test?
    @param email_list String containing emails to mail when the job is done
    @param dependencies List of label names on which this job depends
    @param reboot_before Never, If dirty, or Always
    @param reboot_after Never, If all tests passed, or Always
    @param parse_failed_repair if true, results of failed repairs launched by
        this job will be parsed as part of the job.
    @param hostless if true, create a hostless job
    @param keyvals dict of keyvals to associate with the job
    @param hosts List of hosts to run job on.
    @param meta_hosts List where each entry is a label name, and for each entry
        one host will be chosen from that label to run the job on.
    @param one_time_hosts List of hosts not in the database to run the job on.
    @param drone_set The name of the drone set to run this test on.
    @param image OS image to install before running job.
    @param parent_job_id id of a job considered to be parent of created job.
    @param test_retry DEPRECATED
    @param run_reset Should the host be reset before running the test?
    @param require_ssp Set to True to require server-side packaging to run the
                       test. If it's set to None, drone will still try to run
                       the server side with server-side packaging. If the
                       autotest-server package doesn't exist for the build or
                       image is not set, drone will run the test without server-
                       side packaging. Default is None.
    @param test_args A dict of args passed to be injected into control file.
    @param kwargs extra keyword args. NOT USED.

    @returns The created Job id number.
    """
    if test_args:
        control_file = tools.inject_vars(test_args, control_file)
    if image:
        # Build a new tuple instead of using `+=`: augmented assignment would
        # mutate a caller-supplied list of dependencies in place.
        dependencies = tuple(dependencies) + (
                provision.image_version_to_label(image),)
    return rpc_utils.create_job_common(
            name=name,
            priority=priority,
            control_type=control_type,
            control_file=control_file,
            hosts=hosts,
            meta_hosts=meta_hosts,
            one_time_hosts=one_time_hosts,
            synch_count=synch_count,
            is_template=is_template,
            timeout=timeout,
            timeout_mins=timeout_mins,
            max_runtime_mins=max_runtime_mins,
            run_verify=run_verify,
            email_list=email_list,
            dependencies=dependencies,
            reboot_before=reboot_before,
            reboot_after=reboot_after,
            parse_failed_repair=parse_failed_repair,
            hostless=hostless,
            keyvals=keyvals,
            drone_set=drone_set,
            parent_job_id=parent_job_id,
            run_reset=run_reset,
            require_ssp=require_ssp)
1059
1060
def abort_host_queue_entries(**filter_data):
    """\
    Abort a set of host queue entries.

    @return: A list of dictionaries, each contains information
             about an aborted HQE.
    """
    query = models.HostQueueEntry.query_objects(filter_data)

    # Aborting is only permitted for entries that have neither completed
    # (aborted or not) nor already been aborted (even if still running).
    query = query.filter(complete=False).filter(aborted=False)
    models.AclGroup.check_abort_permissions(query)
    entries = list(query.select_related())
    rpc_utils.check_abort_synchronous_jobs(entries)

    models.HostQueueEntry.abort_host_queue_entries(entries)
    return [{'HostQueueEntry': entry.id,
             'Job': entry.job_id,
             'Job name': entry.job.name} for entry in entries]
1082
1083
def abort_special_tasks(**filter_data):
    """\
    Abort the special task, or tasks, specified in the filter.
    """
    active_tasks = models.SpecialTask.query_objects(filter_data).filter(
            is_active=True)
    for active_task in active_tasks:
        active_task.abort()
1092
1093
def _call_special_tasks_on_hosts(task, hosts):
    """\
    Schedules a set of hosts for a special task.

    @param task: Enum value of frontend.afe.models.SpecialTask.Task
    @param hosts: Iterable of host model objects to schedule the task on.

    @returns A sorted list of hostnames that a special task was created for.

    @raises ValueError if any host lives on a shard while this server is the
            main: such tasks must be created on the owning shard instead.
    """
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
    if shard_host_map and not utils.is_shard():
        raise ValueError('The following hosts are on shards, please '
                         'follow the link to the shards and create jobs '
                         'there instead. %s.' % shard_host_map)
    for host in hosts:
        models.SpecialTask.schedule_special_task(host, task)
    # sorted() already returns a list; no extra list() wrapper needed.
    return sorted(host.hostname for host in hosts)
1109
1110
def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
    """Forward special tasks to corresponding shards.

    For main, when special tasks are fired on hosts that are sharded,
    forward the RPC to corresponding shards.

    For shard, create special task records in local DB.

    @param task: Enum value of frontend.afe.models.SpecialTask.Task
    @param rpc: RPC name to forward.
    @param filter_data: Filter keywords to be used for DB query.

    @return: A list of hostnames that a special task was created for.
    """
    hosts = models.Host.query_objects(filter_data)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)

    # Filter out hosts on a shard from those on the main, forward
    # rpcs to the shard with an additional hostname__in filter, and
    # create a local SpecialTask for each remaining host.
    if shard_host_map and not utils.is_shard():
        hosts = [h for h in hosts if h.shard is None]
        # Use items() rather than the Python-2-only iteritems(), which
        # raises AttributeError under Python 3.
        for shard, hostnames in shard_host_map.items():

            # The main client of this module is the frontend website, and
            # it invokes it with an 'id' or an 'id__in' filter. Regardless,
            # the 'hostname' filter should narrow down the list of hosts on
            # each shard even though we supply all the ids in filter_data.
            # This method uses hostname instead of id because it fits better
            # with the overall architecture of redirection functions in
            # rpc_utils.
            shard_filter = filter_data.copy()
            shard_filter['hostname__in'] = hostnames
            rpc_utils.run_rpc_on_multiple_hostnames(
                    rpc, [shard], **shard_filter)

    # There is a race condition here if someone assigns a shard to one of these
    # hosts before we create the task. The host will stay on the main if:
    # 1. The host is not Ready
    # 2. The host is Ready but has a task
    # But if the host is Ready and doesn't have a task yet, it will get sent
    # to the shard as we're creating a task here.

    # Given that we only rarely verify Ready hosts it isn't worth putting this
    # entire method in a transaction. The worst case scenario is that we have
    # a verify running on a Ready host while the shard is using it, if the
    # verify fails no subsequent tasks will be created against the host on the
    # main, and verifies are safe enough that this is OK.
    return _call_special_tasks_on_hosts(task, hosts)
1160
1161
def reverify_hosts(**filter_data):
    """\
    Schedules a set of hosts for verify.

    @returns A list of hostnames that a verify task was created for.
    """
    verify_task = models.SpecialTask.Task.VERIFY
    return _forward_special_tasks_on_hosts(verify_task, 'reverify_hosts',
                                           **filter_data)
1170
1171
def repair_hosts(**filter_data):
    """\
    Schedules a set of hosts for repair.

    @returns A list of hostnames that a repair task was created for.
    """
    repair_task = models.SpecialTask.Task.REPAIR
    return _forward_special_tasks_on_hosts(repair_task, 'repair_hosts',
                                           **filter_data)
1180
1181
def get_jobs(not_yet_run=False, running=False, finished=False,
             suite=False, sub=False, standalone=False, **filter_data):
    """\
    Extra status filter args for get_jobs:
    -not_yet_run: Include only jobs that have not yet started running.
    -running: Include only jobs that have start running but for which not
    all hosts have completed.
    -finished: Include only jobs for which all hosts have completed (or
    aborted).

    Extra type filter args for get_jobs:
    -suite: Include only jobs with child jobs.
    -sub: Include only jobs with a parent job.
    -standalone: Include only jobs with no child or parent jobs.
    At most one of these three fields should be specified.
    """
    status_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                     running,
                                                     finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(status_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    jobs = list(models.Job.query_objects(filter_data))
    # Prefetch related labels and keyvals to avoid per-job queries.
    models.Job.objects.populate_relationships(jobs, models.Label,
                                              'dependencies')
    models.Job.objects.populate_relationships(jobs, models.JobKeyval,
                                              'keyvals')
    job_dicts = []
    for job in jobs:
        job_dict = job.get_object_dict()
        job_dict['dependencies'] = ','.join(label.name
                                            for label in job.dependencies)
        job_dict['keyvals'] = {keyval.key: keyval.value
                               for keyval in job.keyvals}
        job_dicts.append(job_dict)
    return rpc_utils.prepare_for_serialization(job_dicts)
1218
1219
def get_num_jobs(not_yet_run=False, running=False, finished=False,
                 suite=False, sub=False, standalone=False,
                 **filter_data):
    """\
    See get_jobs() for documentation of extra filter parameters.
    """
    status_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                     running,
                                                     finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(status_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    return models.Job.query_count(filter_data)
1234
1235
def get_jobs_summary(**filter_data):
    """\
    Like get_jobs(), but adds 'status_counts' and 'result_counts' field.

    'status_counts' field is a dictionary mapping status strings to the number
    of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}.

    'result_counts' field is piped to tko's rpc_interface and has the return
    format specified under get_group_counts.
    """
    jobs = get_jobs(**filter_data)
    job_ids = [job['id'] for job in jobs]
    status_counts_by_id = models.Job.objects.get_status_counts(job_ids)
    for job in jobs:
        job['status_counts'] = status_counts_by_id[job['id']]
        job['result_counts'] = tko_rpc_interface.get_status_counts(
                ['afe_job_id', 'afe_job_id'],
                header_groups=[['afe_job_id'], ['afe_job_id']],
                **{'afe_job_id': job['id']})
    return rpc_utils.prepare_for_serialization(jobs)
1256
1257
def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
    """\
    Retrieves all the information needed to clone a job.

    @param id: Database id of the job to clone.
    @param preserve_metahosts: Whether to keep metahost entries as metahosts
        (passed through to rpc_utils.get_job_info).
    @param queue_entry_filter_data: Optional filter applied to the job's
        queue entries.

    @returns A serialized dict with 'job', 'meta_host_counts', 'hosts',
        'hostless' and 'drone_set' keys (plus job 'image' when detectable).
    """
    job = models.Job.objects.get(id=id)
    job_info = rpc_utils.get_job_info(job,
                                      preserve_metahosts,
                                      queue_entry_filter_data)

    host_dicts = []
    for host in job_info['hosts']:
        host_dict = get_hosts(id=host.id)[0]
        other_labels = host_dict['labels']
        # The platform label is reported separately; drop it from the
        # generic label list.
        if host_dict['platform']:
            other_labels.remove(host_dict['platform'])
        host_dict['other_labels'] = ', '.join(other_labels)
        host_dicts.append(host_dict)

    for host in job_info['one_time_hosts']:
        host_dict = dict(hostname=host.hostname,
                         id=host.id,
                         platform='(one-time host)',
                         locked_text='')
        host_dicts.append(host_dict)

    # Convert keys from Label objects to strings (names of labels).  Use
    # items() rather than the Python-2-only iteritems(), which raises
    # AttributeError under Python 3.
    meta_host_counts = dict((meta_host.name, count) for meta_host, count
                            in job_info['meta_host_counts'].items())

    info = dict(job=job.get_object_dict(),
                meta_host_counts=meta_host_counts,
                hosts=host_dicts)
    info['job']['dependencies'] = job_info['dependencies']
    info['hostless'] = job_info['hostless']
    info['drone_set'] = job.drone_set and job.drone_set.name

    image = _get_image_for_job(job, job_info['hostless'])
    if image:
        info['job']['image'] = image

    return rpc_utils.prepare_for_serialization(info)
1299
1300
def _get_image_for_job(job, hostless):
    """Gets the image used for a job.

    Gets the image used for an AFE job from the job's keyvals 'build' or
    'builds'. If that fails, and the job is a hostless job, tries to
    get the image from its control file attributes 'build' or 'builds'.

    TODO(ntang): Needs to handle FAFT with two builds for ro/rw.

    @param job      An AFE job object.
    @param hostless Boolean indicating whether the job is hostless.

    @returns The image build used for the job, or None if not found.
    """
    keyvals = job.keyval_dict()
    image = keyvals.get('build')
    if not image:
        value = keyvals.get('builds')
        builds = None
        if isinstance(value, dict):
            builds = value
        elif isinstance(value, six.string_types):
            # 'builds' may be stored as a stringified dict.
            builds = ast.literal_eval(value)
        if builds:
            image = builds.get('cros-version')
    if not image and hostless and job.control_file:
        try:
            control_obj = control_data.parse_control_string(
                    job.control_file)
            if hasattr(control_obj, 'build'):
                image = getattr(control_obj, 'build')
            if not image and hasattr(control_obj, 'builds'):
                builds = getattr(control_obj, 'builds')
                image = builds.get('cros-version')
        # Catch Exception rather than using a bare except, which would also
        # swallow SystemExit/KeyboardInterrupt.  Missing image info is
        # non-fatal, so log and fall through.
        except Exception:
            logging.warning('Failed to parse control file for job: %s',
                            job.name)
    return image
1339
1340
def get_host_queue_entries_by_insert_time(
    insert_time_after=None, insert_time_before=None, **filter_data):
    """Like get_host_queue_entries, but using the insert index table.

    @param insert_time_after: A lower bound on insert_time
    @param insert_time_before: An upper bound on insert_time
    @returns A sequence of nested dictionaries of host and job information.
    """
    assert insert_time_after is not None or insert_time_before is not None, \
      ('Caller to get_host_queue_entries_by_insert_time must provide either'
       ' insert_time_after or insert_time_before.')
    # Translate the time bounds into id bounds using the insert index table,
    # then tighten any id bounds already present in filter_data.
    if insert_time_after:
        # Descending order: the first row is the largest insert time that is
        # still <= the requested lower time bound.
        index_query = models.HostQueueEntryStartTimes.objects.filter(
            insert_time__lte=insert_time_after).order_by('-insert_time')
        try:
            lower_bound = index_query[0].highest_hqe_id
            if 'id__gte' in filter_data:
                lower_bound = max(lower_bound, filter_data['id__gte'])
            filter_data['id__gte'] = lower_bound
        except IndexError:
            # No index row: leave the lower bound unconstrained.
            pass

    if insert_time_before:
        index_query = models.HostQueueEntryStartTimes.objects.filter(
            insert_time__gte=insert_time_before).order_by('insert_time')
        try:
            upper_bound = index_query[0].highest_hqe_id
            if 'id__lte' in filter_data:
                upper_bound = min(upper_bound, filter_data['id__lte'])
            filter_data['id__lte'] = upper_bound
        except IndexError:
            pass

    return rpc_utils.prepare_rows_as_nested_dicts(
            models.HostQueueEntry.query_objects(filter_data),
            ('host', 'job'))
1381
1382
def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    @returns A sequence of nested dictionaries of host and job information.
    """
    filter_data = rpc_utils.inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data)
    entries = models.HostQueueEntry.query_objects(filter_data)
    return rpc_utils.prepare_rows_as_nested_dicts(entries, ('host', 'job'))
1395
1396
def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    Get the number of host queue entries associated with this job.
    """
    filter_data = rpc_utils.inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data)
    return models.HostQueueEntry.query_count(filter_data)
1407
1408
def get_hqe_percentage_complete(**filter_data):
    """
    Computes the fraction of host queue entries matching the given filter data
    that are complete.
    """
    entries = models.HostQueueEntry.query_objects(filter_data)
    num_complete = entries.filter(complete=True).count()
    num_total = entries.count()
    # An empty result set counts as fully complete.
    return 1 if num_total == 0 else float(num_complete) / num_total
1420
1421
1422# special tasks
1423
def get_special_tasks(**filter_data):
    """Get special task entries from the local database.

    Query the special tasks table for tasks matching the given
    `filter_data`, and return a list of the results.  No attempt is
    made to forward the call to shards; the buck will stop here.
    The caller is expected to know the target shard for such reasons
    as:
      * The caller is a service (such as gs_offloader) configured
        to operate on behalf of one specific shard, and no other.
      * The caller has a host as a parameter, and knows that this is
        the shard assigned to that host.

    @param filter_data  Filter keywords to pass to the underlying
                        database query.

    """
    tasks = models.SpecialTask.query_objects(filter_data)
    return rpc_utils.prepare_rows_as_nested_dicts(tasks,
                                                  ('host', 'queue_entry'))
1444
1445
def get_host_special_tasks(host_id, **filter_data):
    """Get special task entries for a given host.

    Query the special tasks table for tasks that ran on the host
    given by `host_id` and matching the given `filter_data`.
    Return a list of the results.  If the host is assigned to a
    shard, forward this call to that shard.

    @param host_id      Id in the database of the target host.
    @param filter_data  Filter keywords to pass to the underlying
                        database query.

    """
    # Retrieve host data even if the host is in an invalid state.
    host = models.Host.smart_get(host_id, False)
    if host.shard:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable.  So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.hostname)
        return shard_afe.run('get_special_tasks',
                             host_id=host_id, **filter_data)
    return get_special_tasks(host_id=host_id, **filter_data)
1471
1472
def get_num_special_tasks(**kwargs):
    """Get the number of special task entries from the local database.

    Query the special tasks table for tasks matching the given 'kwargs',
    and return the number of the results. No attempt is made to forward
    the call to shards; the buck will stop here.

    @param kwargs    Filter keywords to pass to the underlying database query.

    """
    task_count = models.SpecialTask.query_count(kwargs)
    return task_count
1484
1485
def get_host_num_special_tasks(host, **kwargs):
    """Count special task entries for a given host.

    Query the special tasks table for tasks that ran on the host
    given by 'host' and matching the given 'kwargs', and return the
    number found.  If the host is assigned to a shard, forward this
    call to that shard.

    @param host      id or name of a host. More often a hostname.
    @param kwargs    Filter keywords to pass to the underlying database query.

    """
    # Look the host up even if it is marked invalid.
    target = models.Host.smart_get(host, False)
    if target.shard:
        afe = frontend.AFE(server=target.shard.hostname)
        return afe.run('get_num_special_tasks', host=host, **kwargs)
    return get_num_special_tasks(host=host, **kwargs)
1505
1506
def get_status_task(host_id, end_time):
    """Get the "status task" for a host from the local shard.

    Returns a single special task representing the given host's
    "status task".  The status task is a completed special task that
    identifies whether the corresponding host was working or broken
    when it completed.  A successful task indicates a working host;
    a failed task indicates broken.

    This call will not be forward to a shard; the receiving server
    must be the shard that owns the host.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    rows = rpc_utils.prepare_rows_as_nested_dicts(
            status_history.get_status_task(host_id, end_time),
            ('host', 'queue_entry'))
    if not rows:
        return None
    return rows[0]
1532
1533
def get_host_status_task(host_id, end_time):
    """Get the "status task" for a host from its owning shard.

    Finds the given host's owning shard, and forwards to it a call
    to `get_status_task()` (see above).

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    target = models.Host.smart_get(host_id)
    if target.shard:
        # AFE methods return post-processed objects that aren't
        # JSON-serializable, so use AFE.run() to get the raw,
        # serializable output from the shard.
        afe = frontend.AFE(server=target.shard.hostname)
        return afe.run('get_status_task',
                       host_id=host_id, end_time=end_time)
    return get_status_task(host_id, end_time)
1560
1561
def get_host_diagnosis_interval(host_id, end_time, success):
    """Find a "diagnosis interval" for a given host.

    A "diagnosis interval" identifies a start and end time where
    the host went from "working" to "broken", or vice versa.  The
    interval's starting time is the starting time of the last status
    task with the old status; the end time is the finish time of the
    first status task with the new status.

    This routine finds the most recent diagnosis interval for the
    given host prior to `end_time`, with a starting status matching
    `success`.  If `success` is true, the interval will start with a
    successful status task; if false the interval will start with a
    failed status task.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the diagnosis interval.
    @param success      Whether the diagnosis interval should start
                        with a successful or failed status task.

    @return A list of two strings.  The first is the timestamp for
            the beginning of the interval; the second is the
            timestamp for the end.  If the host has never changed
            state, the list is empty.

    """
    target = models.Host.smart_get(host_id)
    # Answer locally when the host has no shard, or when we *are* a shard.
    answer_locally = not target.shard or utils.is_shard()
    if answer_locally:
        return status_history.get_diagnosis_interval(
                host_id, end_time, success)
    afe = frontend.AFE(server=target.shard.hostname)
    return afe.get_host_diagnosis_interval(host_id, end_time, success)
1596
1597
1598# support for host detail view
1599
def get_host_queue_entries_and_special_tasks(host, query_start=None,
                                             query_limit=None, start_time=None,
                                             end_time=None):
    """Get an interleaved history of queue entries and special tasks.

    @param host         Id or name of the host.
    @param query_start  Optional offset into the interleaved results.
    @param query_limit  Optional maximum number of entries to return.
    @param start_time   Optional lower bound on special task start time.
    @param end_time     Optional upper bound on special task start time.

    @returns an interleaved list of HostQueueEntries and SpecialTasks,
            in approximate run order.  each dict contains keys for type, host,
            job, status, started_on, execution_path, and ID.
    """
    total_limit = None
    if query_limit is not None:
        # `query_start` may legitimately be None while `query_limit` is
        # set; the old code crashed with TypeError in that case.  Treat a
        # missing offset as 0.
        total_limit = (query_start or 0) + query_limit
    filter_data_common = {'host': host,
                          'query_limit': total_limit,
                          'sort_by': ['-id']}

    filter_data_special_tasks = rpc_utils.inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **filter_data_common)

    queue_entries = get_host_queue_entries(
            start_time, end_time, **filter_data_common)
    special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)

    interleaved_entries = rpc_utils.interleave_entries(queue_entries,
                                                       special_tasks)
    # Window the interleaved list down to the requested page.
    if query_start is not None:
        interleaved_entries = interleaved_entries[query_start:]
    if query_limit is not None:
        interleaved_entries = interleaved_entries[:query_limit]
    return rpc_utils.prepare_host_queue_entries_and_special_tasks(
            interleaved_entries, queue_entries)
1631
1632
def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
                                                 end_time=None):
    """Count queue entries plus special tasks for a host.

    @param host        Id or name of the host.
    @param start_time  Optional lower bound on entry times.
    @param end_time    Optional upper bound on entry times.

    @returns The combined count of matching HostQueueEntries and
             special tasks.
    """
    base_filter = {'host': host}

    hqe_filter, task_filter = (
            rpc_utils.inject_times_to_hqe_special_tasks_filters(
                    base_filter, start_time, end_time))

    hqe_count = models.HostQueueEntry.query_count(hqe_filter)
    task_count = get_host_num_special_tasks(**task_filter)
    return hqe_count + task_count
1643
1644
1645# other
1646
def echo(data=""):
    """Return the given string unchanged.

    Provides a basic sanity check that RPC calls can successfully
    be made.
    """
    return data
1653
1654
def get_motd():
    """Return the message of the day as a string."""
    motd = rpc_utils.get_motd()
    return motd
1660
1661
def get_static_data():
    """Return a dictionary of rarely-changing, otherwise inaccessible data.

    The returned dictionary includes:

    priorities: List of job priority choices.
    default_priority: Default priority value for new jobs.
    users: Sorted list of all users.
    labels: Sorted list of labels not start with 'cros-version' and
            'fw-version'.
    tests: Sorted list of all tests.
    profilers: Sorted list of all profilers.
    current_user: Logged-in username.
    host_statuses: Sorted list of possible Host statuses.
    job_statuses: Sorted list of possible HostQueueEntry statuses.
    job_timeout_default: The default job timeout length in minutes.
    parse_failed_repair_default: Default value for the parse_failed_repair job
            option.
    reboot_before_options: A list of valid RebootBefore string enums.
    reboot_after_options: A list of valid RebootAfter string enums.
    motd: Server's message of the day.
    status_dictionary: A mapping from one word job status names to a more
            informative description.
    """
    # The default drone set goes first; all others follow, sorted by name.
    default_drone_set_name = models.DroneSet.default_drone_set_name()
    other_drone_sets = sorted(
            drone_set.name for drone_set in models.DroneSet.objects.exclude(
                    name=default_drone_set_name))
    drone_sets = [default_drone_set_name] + other_drone_sets

    # Hide provisioning labels from the label list.
    label_exclude_filters = [{'name__startswith': 'cros-version'},
                             {'name__startswith': 'fw-version'},
                             {'name__startswith': 'fwrw-version'},
                             {'name__startswith': 'fwro-version'}]

    status_dictionary = {"Aborted": "Aborted",
                         "Verifying": "Verifying Host",
                         "Provisioning": "Provisioning Host",
                         "Pending": "Waiting on other hosts",
                         "Running": "Running autoserv",
                         "Completed": "Autoserv completed",
                         "Failed": "Failed to complete",
                         "Queued": "Queued",
                         "Starting": "Next in host's queue",
                         "Stopped": "Other host(s) failed verify",
                         "Parsing": "Awaiting parse of final results",
                         "Gathering": "Gathering log files",
                         "Waiting": "Waiting for scheduler action",
                         "Archiving": "Archiving results",
                         "Resetting": "Resetting hosts"}

    return {
        'priorities': priorities.Priority.choices(),
        'default_priority': 'Default',
        'max_schedulable_priority': priorities.Priority.DEFAULT,
        'users': get_users(sort_by=['login']),
        'labels': get_labels(label_exclude_filters,
                             sort_by=['-platform', 'name']),
        'tests': get_tests(sort_by=['name']),
        'profilers': get_profilers(sort_by=['name']),
        'current_user': rpc_utils.prepare_for_serialization(
                models.User.current_user().get_object_dict()),
        'host_statuses': sorted(models.Host.Status.names),
        'job_statuses': sorted(models.HostQueueEntry.Status.names),
        'job_timeout_mins_default': models.Job.DEFAULT_TIMEOUT_MINS,
        'job_max_runtime_mins_default': models.Job.DEFAULT_MAX_RUNTIME_MINS,
        'parse_failed_repair_default': bool(
                models.Job.DEFAULT_PARSE_FAILED_REPAIR),
        'reboot_before_options': model_attributes.RebootBefore.names,
        'reboot_after_options': model_attributes.RebootAfter.names,
        'motd': rpc_utils.get_motd(),
        'drone_sets_enabled': models.DroneSet.drone_sets_enabled(),
        'drone_sets': drone_sets,
        'status_dictionary': status_dictionary,
        'wmatrix_url': rpc_utils.get_wmatrix_url(),
        'stainless_url': rpc_utils.get_stainless_url(),
        'is_moblab': bool(utils.is_moblab()),
    }
1745
1746
def get_server_time():
    """Return the server's current local time as 'YYYY-MM-DD HH:MM'."""
    now = datetime.datetime.now()
    return now.strftime("%Y-%m-%d %H:%M")
1749
1750
def ping_db():
    """Simple connection test to db.

    @returns [True] if a cursor could be opened, [False] on DatabaseError.
    """
    alive = True
    try:
        db_connection.cursor()
    except DatabaseError:
        alive = False
    return [alive]
1758
1759
def get_hosts_by_attribute(attribute, value):
    """
    Get the list of valid hosts that share the same host attribute value.

    @param attribute: String of the host attribute to check.
    @param value: String of the value that is shared between hosts.

    @returns List of hostnames that all have the same host attribute and
             value.
    """
    matches = models.HostAttribute.query_objects({'attribute': attribute,
                                                  'value': value})
    if not RESPECT_STATIC_ATTRIBUTES:
        return [m.host.hostname for m in matches if m.host.invalid == 0]

    hostnames = set()
    # Include hosts that:
    #     * Are valid.
    #     * Exist in afe_host_attribute with the given attribute.
    #     * Either have no afe_static_host_attribute row, or have one
    #       carrying the same value.
    for match in matches:
        if match.host.invalid != 0:
            continue
        statics = models.StaticHostAttribute.query_objects(
                {'host_id': match.host.id, 'attribute': attribute})
        static_values = [s.value for s in statics]
        if not static_values or static_values[0] == value:
            hostnames.add(match.host.hostname)

    # Include hosts that:
    #     * Are valid.
    #     * Exist in afe_static_host_attribute with the given attribute
    #       and value.
    #     * No need to check whether each static attribute has its
    #       corresponding entry in afe_host_attribute since it is ensured
    #       in inventory sync.
    static_matches = models.StaticHostAttribute.query_objects(
            {'attribute': attribute, 'value': value})
    for match in static_matches:
        if match.host.invalid == 0:
            hostnames.add(match.host.hostname)

    return list(hostnames)
1807
1808
def _get_control_file_by_suite(suite_name):
    """Get control file contents by suite name.

    @param suite_name: Suite name as string.
    @returns: Control file contents as string.
    """
    drone_dir = _CONFIG.get_config_value('SCHEDULER',
                                         'drone_installation_directory')
    getter = control_file_getter.FileSystemGetter([drone_dir])
    return getter.get_control_file_contents_by_name(suite_name)
1819
1820
@rpc_utils.route_rpc_to_main
def create_suite_job(
        name='',
        board='',
        pool='',
        child_dependencies=(),
        control_file='',
        check_hosts=True,
        num=None,
        file_bugs=False,
        timeout=24,
        timeout_mins=None,
        priority=priorities.Priority.DEFAULT,
        suite_args=None,
        wait_for_results=True,
        job_retry=False,
        max_retries=None,
        max_runtime_mins=None,
        suite_min_duts=0,
        offload_failures_only=False,
        builds=None,
        test_source_build=None,
        run_prod_code=False,
        delay_minutes=0,
        is_cloning=False,
        job_keyvals=None,
        test_args=None,
        **kwargs):
    """
    Create a job to run a test suite on the given device with the given image.

    When the timeout specified in the control file is reached, the
    job is guaranteed to have completed and results will be available.

    @param name: The test name if control_file is supplied, otherwise the name
                 of the test suite to run, e.g. 'bvt'.
    @param board: the kind of device to run the tests on.
    @param builds: the builds to install e.g.
                   {'cros-version:': 'x86-alex-release/R18-1655.0.0',
                    'fwrw-version:':  'x86-alex-firmware/R36-5771.50.0',
                    'fwro-version:':  'x86-alex-firmware/R36-5771.49.0'}
                   If builds is given a value, it overrides argument build.
    @param test_source_build: Build that contains the server-side test code.
    @param pool: Specify the pool of machines to use for scheduling
            purposes.
    @param child_dependencies: (optional) list of additional dependency labels
            (strings) that will be added as dependency labels to child jobs.
    @param control_file: the control file of the job.
    @param check_hosts: require appropriate live hosts to exist in the lab.
    @param num: Specify the number of machines to schedule across (integer).
                Leave unspecified or use None to use default sharding factor.
    @param file_bugs: File a bug on each test failure in this suite.
    @param timeout: The max lifetime of this suite, in hours.
    @param timeout_mins: The max lifetime of this suite, in minutes. Takes
                         priority over timeout.
    @param priority: Integer denoting priority. Higher is more important.
    @param suite_args: Optional arguments which will be parsed by the suite
                       control file. Used by control.test_that_wrapper to
                       determine which tests to run.
    @param wait_for_results: Set to False to run the suite job without waiting
                             for test jobs to finish. Default is True.
    @param job_retry: Set to True to enable job-level retry. Default is False.
    @param max_retries: Integer, maximum job retries allowed at suite level.
                        None for no max.
    @param max_runtime_mins: Maximum amount of time a job can be running in
                             minutes.
    @param suite_min_duts: Integer. Scheduler will prioritize getting the
                           minimum number of machines for the suite when it is
                           competing with another suite that has a higher
                           priority but already got minimum machines it needs.
    @param offload_failures_only: Only enable gs_offloading for failed jobs.
    @param run_prod_code: If True, the suite will run the test code that
                          lives in prod aka the test code currently on the
                          lab servers. If False, the control files and test
                          code for this suite run will be retrieved from the
                          build artifacts.
    @param delay_minutes: Delay the creation of test jobs for a given number of
                          minutes.
    @param is_cloning: True if creating a cloning job.
    @param job_keyvals: A dict of job keyvals to be inject to control file.
    @param test_args: A dict of args passed all the way to each individual test
                      that will be actually run.
    @param kwargs: extra keyword args. NOT USED.

    @raises ControlFileNotFound: if a unique suite control file doesn't exist.
    @raises NoControlFileList: if we can't list the control files at all.
    @raises StageControlFileFailure: If the dev server throws 500 while
                                     staging test_suites.
    @raises ControlFileEmpty: if the control file exists on the server, but
                              can't be read.

    @return: the job ID of the suite; -1 on error.
    """
    if num is not None:
        warnings.warn('num is deprecated for create_suite_job')
    del num

    if builds is None:
        builds = {}

    # Default test source build to CrOS build if it's not specified and
    # run_prod_code is set to False.
    if not run_prod_code:
        test_source_build = Suite.get_test_source_build(
                builds, test_source_build=test_source_build)

    sample_dut = rpc_utils.get_sample_dut(board, pool)

    suite_name = suite_common.canonicalize_suite_name(name)
    if run_prod_code:
        ds = dev_server.resolve(test_source_build, hostname=sample_dut)
        keyvals = {}
    else:
        ds, keyvals = suite_common.stage_build_artifacts(
                test_source_build, hostname=sample_dut)
    keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts

    # Do not change this naming convention without updating
    # site_utils.parse_job_name.
    if run_prod_code:
        # If run_prod_code is True, test_source_build is not set, use the
        # first build in the builds list for the suite job name.
        # NOTE: wrap dict.values() in list() -- under Python 3 it is a
        # view object and does not support indexing.
        name = '%s-%s' % (list(builds.values())[0], suite_name)
    else:
        name = '%s-%s' % (test_source_build, suite_name)

    timeout_mins = timeout_mins or timeout * 60
    max_runtime_mins = max_runtime_mins or timeout * 60

    if not board:
        board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]

    if run_prod_code:
        control_file = _get_control_file_by_suite(suite_name)

    if not control_file:
        # No control file was supplied so look it up from the build artifacts.
        control_file = suite_common.get_control_file_by_build(
                test_source_build, ds, suite_name)

    # Prepend builds and board to the control file.
    if is_cloning:
        control_file = tools.remove_injection(control_file)

    if suite_args is None:
        suite_args = dict()

    inject_dict = {
        'board': board,
        # `build` is needed for suites like AU to stage image inside suite
        # control file.
        'build': test_source_build,
        'builds': builds,
        'check_hosts': check_hosts,
        'pool': pool,
        'child_dependencies': child_dependencies,
        'file_bugs': file_bugs,
        'timeout': timeout,
        'timeout_mins': timeout_mins,
        'devserver_url': ds.url(),
        'priority': priority,
        'wait_for_results': wait_for_results,
        'job_retry': job_retry,
        'max_retries': max_retries,
        'max_runtime_mins': max_runtime_mins,
        'offload_failures_only': offload_failures_only,
        'test_source_build': test_source_build,
        'run_prod_code': run_prod_code,
        'delay_minutes': delay_minutes,
        'job_keyvals': job_keyvals,
        'test_args': test_args,
    }
    inject_dict.update(suite_args)
    control_file = tools.inject_vars(inject_dict, control_file)

    return rpc_utils.create_job_common(name,
                                       priority=priority,
                                       timeout_mins=timeout_mins,
                                       max_runtime_mins=max_runtime_mins,
                                       control_type='Server',
                                       control_file=control_file,
                                       hostless=True,
                                       keyvals=keyvals)
2004
2005
def get_job_history(**filter_data):
    """Get history of the job, including the special tasks executed for the job

    @param filter_data: filter for the call, should at least include
                        {'job_id': [job id]}
    @returns: JSON string of the job's history, including the information such
              as the hosts run the job and the special tasks executed before
              and after the job.
    """
    info = job_history.get_job_info(filter_data['job_id'])
    history = info.get_history()
    return rpc_utils.prepare_for_serialization(history)
2018
2019
def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
                    known_host_ids=(), known_host_statuses=()):
    """Receive updates for job statuses from shards and assign hosts and jobs.

    @param shard_hostname: Hostname of the calling shard
    @param jobs: Jobs in serialized form that should be updated with newer
                 status from a shard.
    @param hqes: Hostqueueentries in serialized form that should be updated with
                 newer status from a shard. Note that for every hostqueueentry
                 the corresponding job must be in jobs.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.
    @param known_host_statuses: List of statuses of hosts the shard already has.

    @returns: Serialized representations of hosts, jobs, suite job keyvals
              and their dependencies to be inserted into a shard's database.
    """
    # The following alternatives to sending host and job ids in every heartbeat
    # have been considered:
    # 1. Sending the highest known job and host ids. This would work for jobs:
    #    Newer jobs always have larger ids. Also, if a job is not assigned to a
    #    particular shard during a heartbeat, it never will be assigned to this
    #    shard later.
    #    This is not true for hosts though: A host that is leased won't be sent
    #    to the shard now, but might be sent in a future heartbeat. This means
    #    sometimes hosts should be transfered that have a lower id than the
    #    maximum host id the shard knows.
    # 2. Send the number of jobs/hosts the shard knows to the main in each
    #    heartbeat. Compare these to the number of records that already have
    #    the shard_id set to this shard. In the normal case, they should match.
    #    In case they don't, resend all entities of that type.
    #    This would work well for hosts, because there aren't that many.
    #    Resending all jobs is quite a big overhead though.
    #    Also, this approach might run into edge cases when entities are
    #    ever deleted.
    # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
    #    Using two different approaches isn't consistent and might cause
    #    confusion. Also the issues with the case of deletions might still
    #    occur.
    #
    # The overhead of sending all job and host ids in every heartbeat is low:
    # At peaks one board has about 1200 created but unfinished jobs.
    # See the numbers here: http://goo.gl/gQCGWH
    # Assuming that job id's have 6 digits and that json serialization takes a
    # comma and a space as overhead, the traffic per id sent is about 8 bytes.
    # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
    # A NOT IN query with 5000 ids took about 30ms in tests made.
    # These numbers seem low enough to outweigh the disadvantages of the
    # solutions described above.
    shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
    rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
    assert len(known_host_ids) == len(known_host_statuses)
    # Sync the main's view of each known host's status with the shard's.
    for host_pk, shard_status in zip(known_host_ids, known_host_statuses):
        host_model = models.Host.objects.get(pk=host_pk)
        if host_model.status != shard_status:
            host_model.status = shard_status
            host_model.save()

    hosts, jobs, suite_keyvals, inc_ids = rpc_utils.find_records_for_shard(
            shard_obj, known_job_ids=known_job_ids,
            known_host_ids=known_host_ids)
    return {
        'hosts': [host.serialize() for host in hosts],
        'jobs': [job.serialize() for job in jobs],
        'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
        'incorrect_host_ids': [int(i) for i in inc_ids],
    }
2087
2088
def get_shards(**filter_data):
    """Return a list of all shards.

    @returns A sequence of nested dictionaries of shard information.
    """
    shard_rows = models.Shard.query_objects(filter_data)
    shard_dicts = rpc_utils.prepare_rows_as_nested_dicts(shard_rows, ())
    # Attach each shard's label names, which the nested-dict helper omits.
    for shard_dict, shard_row in zip(shard_dicts, shard_rows):
        shard_dict['labels'] = [l.name for l in shard_row.labels.all()]
    return shard_dicts
2100
2101
def _assign_board_to_shard_precheck(labels):
    """Verify whether board labels are valid to be added to a given shard.

    First check whether board label is in correct format. Second, check whether
    the board label exist. Third, check whether the board has already been
    assigned to shard.

    @param labels: Board labels separated by comma.

    @raises error.RPCException: If label provided doesn't start with `board:`
            or board has been added to shard already.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: A list of label models that ready to be added to shard.
    """
    if not labels:
        # allow creation of label-less shards (labels='' would otherwise fail the
        # checks below)
        return []
    validated = []
    for raw_label in labels.split(','):
        # Board labels must be in `board:<name>` form.
        if not raw_label.startswith('board:'):
            raise error.RPCException('Sharding only supports `board:.*` label.')
        # smart_get raises models.Label.DoesNotExist if the label is unknown.
        label_obj = models.Label.smart_get(raw_label)
        # Reject boards that already belong to a shard.
        try:
            owner = models.Shard.objects.get(labels=label_obj)
        except models.Shard.DoesNotExist:
            # board is not on any shard, so it's valid.
            validated.append(label_obj)
        else:
            raise error.RPCException(
                    '%s is already on shard %s' % (label_obj, owner.hostname))
    return validated
2139
2140
def add_shard(hostname, labels):
    """Add a shard and start running jobs on it.

    @param hostname: The hostname of the shard to be added; needs to be unique.
    @param labels: Board labels separated by comma. Jobs of one of the labels
                   will be assigned to the shard.

    @raises error.RPCException: If label provided doesn't start with `board:` or
            board has been added to shard already.
    @raises model_logic.ValidationError: If a shard with the given hostname
            already exist.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: The id of the added shard.
    """
    label_models = _assign_board_to_shard_precheck(labels)
    new_shard = models.Shard.add_object(hostname=hostname)
    for label_model in label_models:
        new_shard.labels.add(label_model)
    return new_shard.id
2161
2162
def add_board_to_shard(hostname, labels):
    """Attach additional boards to an existing shard.

    @param hostname: Hostname of the shard to modify.
    @param labels: Comma-separated board labels to attach.

    @raises error.RPCException: If a label doesn't start with `board:` or
            the board is already assigned to a shard.
    @raises models.Label.DoesNotExist: If a specified label doesn't exist.

    @returns: The database id of the modified shard.
    """
    board_labels = _assign_board_to_shard_precheck(labels)
    shard = models.Shard.objects.get(hostname=hostname)
    for board_label in board_labels:
        shard.labels.add(board_label)
    return shard.id
2180
2181
# Remove board RPCs are rare, so we can afford to make them a bit more
# expensive (by performing in a transaction) in order to guarantee
# atomicity.
@transaction.commit_on_success
def remove_board_from_shard(hostname, label):
    """Remove a board label from the given shard.

    Detaches the label from the shard and returns the hosts carrying that
    board to the main AFE by clearing their shard assignment.

    @param hostname: The hostname of the shard to be changed.
    @param label: Board label (name or id) to detach from the shard.

    @raises models.Shard.DoesNotExist: If no shard has the given hostname.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
    @raises error.RPCException: If the label is not currently assigned to
            the shard.
    """
    shard = models.Shard.objects.get(hostname=hostname)
    label = models.Label.smart_get(label)
    if label not in shard.labels.all():
        raise error.RPCException(
          'Cannot remove label from shard that does not belong to it.')

    shard.labels.remove(label)
    # Hosts with this board no longer belong to the shard; clear their shard
    # assignment.  Static labels replace regular labels when enabled, so the
    # host lookup must go through whichever label kind is authoritative.
    if label.is_replaced_by_static():
        static_label = models.StaticLabel.smart_get(label.name)
        models.Host.objects.filter(
                static_labels__in=[static_label]).update(shard=None)
    else:
        models.Host.objects.filter(labels__in=[label]).update(shard=None)
2208
2209
def delete_shard(hostname):
    """Delete a shard and reclaim all resources from it.

    This claims back all assigned hosts from the shard, detaches all jobs
    from it, clears its labels and finally deletes the shard record.

    @param hostname: Hostname of the shard to delete.
    """
    shard = rpc_utils.retrieve_shard(shard_hostname=hostname)

    # Remove shard information from all hosts assigned to it.
    models.Host.objects.filter(shard=shard).update(shard=None)

    # Note: The original job-cleanup query was performed with django call
    #   models.Job.objects.filter(shard=shard).update(shard=None)
    #
    # But that started becoming unreliable due to the large size of afe_jobs.
    #
    # We don't need atomicity here, so the cleanup method is iterative, in
    # chunks of 100k jobs.
    QUERY = ('UPDATE afe_jobs SET shard_id = NULL WHERE shard_id = %s '
             'LIMIT 100000')
    try:
        with contextlib.closing(db_connection.cursor()) as cursor:
            assert shard.id is not None
            while True:
                # Pass shard.id as a DB-API query parameter rather than
                # splicing it into the SQL string; the driver handles
                # quoting/escaping.
                cursor.execute(QUERY, [shard.id])
                # UPDATE produces no result set, so the previous
                # `cursor.fetchone()` check always returned None and the
                # loop stopped after a single 100k chunk, leaving the rest
                # of the jobs still pointing at the deleted shard.
                # `rowcount` reports the rows actually modified, which is
                # the correct condition for "keep chunking".
                if not cursor.rowcount:
                    break
    # Unit tests use sqlite backend instead of MySQL. sqlite does not support
    # UPDATE ... LIMIT, so fall back to the old behavior.
    except DatabaseError as e:
        if 'syntax error' in str(e):
            models.Job.objects.filter(shard=shard).update(shard=None)
        else:
            raise

    shard.labels.clear()
    shard.delete()
2246
2247
def get_servers(hostname=None, role=None, status=None):
    """Get a list of servers with matching role and status.

    This RPC is deprecated: the backing server database was removed, so
    calling it always raises.

    @param hostname: FQDN of the server.
    @param role: Name of the server role, e.g., drone, scheduler. Default to
                 None to match any role.
    @param status: Status of the server, e.g., primary, backup, repair_required.
                   Default to None to match any server status.

    @raises DeprecationWarning: Always; server_manager_utils is gone.
    """
    raise DeprecationWarning("server_manager_utils has been removed.")
2261
2262
@rpc_utils.route_rpc_to_main
def get_stable_version(board=stable_version_utils.DEFAULT, android=False):
    """Look up the stable version configured for the given board.

    @param board: Name of the board to query.
    @param android: Unused legacy parameter, kept only so clients on old
            branches that still pass it don't break.
            TODO(jrbarnette): remove once R68 drops off stable.

    @return: The stable version for the board.  Falls back to the global
             config value CROS.stable_cros_version when the
             stable_versions table has no entry for board DEFAULT.
    """
    assert not android, 'get_stable_version no longer supports `android`.'
    return stable_version_utils.get(board=board)
2279
2280
@rpc_utils.route_rpc_to_main
def get_all_stable_versions():
    """Fetch the stable version of every board.

    @return: A dictionary mapping board name to stable version.
    """
    return stable_version_utils.get_all()
2288
2289
@rpc_utils.route_rpc_to_main
def set_stable_version(version, board=stable_version_utils.DEFAULT):
    """Modify stable version for the given board.

    This operation is no longer permitted; the call is logged and ignored.

    @param version: The new value of stable version for given board (unused).
    @param board: Name of the board, default to value `DEFAULT` (unused).

    @return: Always None.
    """
    # Intentionally a no-op: setting stable versions via RPC is disabled.
    logging.warning("rpc_interface::set_stable_version: attempted to set stable version. setting the stable version is not permitted")
    return None
2299
2300
@rpc_utils.route_rpc_to_main
def delete_stable_version(board):
    """Remove the stable-version override for the given board.

    Deletes the board's entry from the afe_stable_versions table so the
    default stable version applies to it again.

    @param board: Name of the board.
    """
    stable_version_utils.delete(board=board)
2311
2312
def get_tests_by_build(build, ignore_invalid_tests=True):
    """Get the tests that are available for the specified build.

    @param build: unique name by which to refer to the image.
    @param ignore_invalid_tests: flag on if unparsable tests are ignored.
            When False, a parse failure is re-raised to the caller.

    @return: A sorted list of all tests that are in the build specified,
             serialized for the RPC layer.
    """
    # Collect the control files specified in this build
    cfile_getter = control_file_lib._initialize_control_file_getter(build)
    if suite_common.ENABLE_CONTROLS_IN_BATCH:
        control_file_info_list = cfile_getter.get_suite_info()
        control_file_list = control_file_info_list.keys()
    else:
        control_file_list = cfile_getter.get_control_file_list()

    test_objects = []
    _id = 0
    for control_file_path in control_file_list:
        # Read and parse the control file
        if suite_common.ENABLE_CONTROLS_IN_BATCH:
            control_file = control_file_info_list[control_file_path]
        else:
            control_file = cfile_getter.get_control_file_contents(
                    control_file_path)
        try:
            control_obj = control_data.parse_control_string(control_file)
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # still propagate.
        except Exception:
            logging.info('Failed to parse control file: %s', control_file_path)
            if not ignore_invalid_tests:
                raise
            # Bug fix: execution previously fell through here and reused
            # `control_obj` from the prior iteration (or raised NameError on
            # the first one), emitting a duplicate entry with the wrong path
            # for every unparsable test. Skip the invalid test instead.
            continue

        # Extract the values needed for the AFE from the control_obj.
        # The keys list represents attributes in the control_obj that
        # are required by the AFE
        keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental',
                'test_category', 'test_class', 'dependencies', 'run_verify',
                'sync_count', 'job_retries', 'path']

        test_object = {}
        for key in keys:
            test_object[key] = getattr(control_obj, key) if hasattr(
                    control_obj, key) else ''

        # Unfortunately, the AFE expects different key-names for certain
        # values, these must be corrected to avoid the risk of tests
        # being omitted by the AFE.
        # The 'id' is an additional value used in the AFE.
        # The control_data parsing does not reference 'run_reset', but it
        # is also used in the AFE and defaults to True.
        test_object['id'] = _id
        test_object['run_reset'] = True
        test_object['description'] = test_object.get('doc', '')
        test_object['test_time'] = test_object.get('time', 0)

        # TODO(crbug.com/873716) DEPRECATED. Remove entirely.
        test_object['test_retry'] = 0

        # Fix the test name to be consistent with the current presentation
        # of test names in the AFE.
        testpath, subname = os.path.split(control_file_path)
        testname = os.path.basename(testpath)
        subname = subname.split('.')[1:]
        if subname:
            testname = '%s:%s' % (testname, ':'.join(subname))

        test_object['name'] = testname

        # Correct the test path as parse_control_string sets an empty string.
        test_object['path'] = control_file_path

        _id += 1
        test_objects.append(test_object)

    test_objects = sorted(test_objects, key=lambda x: x.get('name'))
    return rpc_utils.prepare_for_serialization(test_objects)
2389
2390
@rpc_utils.route_rpc_to_main
def get_lab_health_indicators(board=None):
    """Collect lab-wide health indicators.

    Indicators currently covered:
      1. Whether the lab is closed.
      2. Available DUTs for a given board.
      3. Devserver capacity.
      4. Time of the next major DUT utilization (e.g. CQ in 3 minutes).

    @param board: Optional board name; when given, a list of available DUTs
        for that board is included. Otherwise this indicator is skipped.

    @returns: A healthy indicator object including health info.
    """
    indicators = LabHealthIndicator(None, None, None, None)
    return indicators
2407