1# pylint: disable=missing-docstring
2
3"""\
4Functions to expose over the RPC interface.
5
6For all modify* and delete* functions that ask for an 'id' parameter to
7identify the object to operate on, the id may be either
8 * the database row ID
9 * the name of the object (label name, hostname, user login, etc.)
10 * a dictionary containing uniquely identifying fields (this option should seldom
11   be used)
12
13When specifying foreign key fields (i.e. adding hosts to a label, or adding
14users to an ACL group), the given value may be either the database row ID or the
15name of the object.
16
17All get* functions return lists of dictionaries.  Each dictionary represents one
18object and maps field names to values.
19
20Some examples:
21modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
22modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
23modify_test('sleeptest', test_type='Client', params=', seconds=60')
24delete_acl_group(1) # delete by ID
25delete_acl_group('Everyone') # delete by name
26acl_group_add_users('Everyone', ['mbligh', 'showard'])
27get_jobs(owner='showard', status='Queued')
28
29See doctests/001_rpc_test.txt for (lots) more examples.
30"""
31
32__author__ = 'showard@google.com (Steve Howard)'
33
34import ast
35import datetime
36import logging
37import os
38import sys
39
40from django.db import connection as db_connection
41from django.db import transaction
42from django.db.models import Count
43from django.db.utils import DatabaseError
44
45import common
46from autotest_lib.client.common_lib import control_data
47from autotest_lib.client.common_lib import error
48from autotest_lib.client.common_lib import global_config
49from autotest_lib.client.common_lib import priorities
50from autotest_lib.client.common_lib import time_utils
51from autotest_lib.client.common_lib.cros import dev_server
52from autotest_lib.frontend.afe import control_file as control_file_lib
53from autotest_lib.frontend.afe import model_attributes
54from autotest_lib.frontend.afe import model_logic
55from autotest_lib.frontend.afe import models
56from autotest_lib.frontend.afe import rpc_utils
57from autotest_lib.frontend.tko import models as tko_models
58from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
59from autotest_lib.server import frontend
60from autotest_lib.server import utils
61from autotest_lib.server.cros import provision
62from autotest_lib.server.cros.dynamic_suite import constants
63from autotest_lib.server.cros.dynamic_suite import control_file_getter
64from autotest_lib.server.cros.dynamic_suite import suite as SuiteBase
65from autotest_lib.server.cros.dynamic_suite import tools
66from autotest_lib.server.cros.dynamic_suite.suite import Suite
67from autotest_lib.server.lib import status_history
68from autotest_lib.site_utils import job_history
69from autotest_lib.site_utils import server_manager_utils
70from autotest_lib.site_utils import stable_version_utils
71
72
73_CONFIG = global_config.global_config
74
75# Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.
76
77# labels
78
79def modify_label(id, **data):
80    """Modify a label.
81
82    @param id: id or name of a label. More often a label name.
83    @param data: New data for a label.
84    """
85    label_model = models.Label.smart_get(id)
86    label_model.update_object(data)
87
88    # Master forwards the RPC to shards
89    if not utils.is_shard():
90        rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False,
91                             id=id, **data)
92
93
94def delete_label(id):
95    """Delete a label.
96
97    @param id: id or name of a label. More often a label name.
98    """
99    label_model = models.Label.smart_get(id)
100    # Hosts that have the label to be deleted. Save this info before
101    # the label is deleted to use it later.
102    hosts = []
103    for h in label_model.host_set.all():
104        hosts.append(models.Host.smart_get(h.id))
105    label_model.delete()
106
107    # Master forwards the RPC to shards
108    if not utils.is_shard():
109        rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id)
110
111
112def add_label(name, ignore_exception_if_exists=False, **kwargs):
113    """Adds a new label of a given name.
114
115    @param name: label name.
116    @param ignore_exception_if_exists: If True and the exception is
117        raised because the label name already exists, suppress the
118        exception. Default is False.
119    @param kwargs: keyword args that store more info about a label
120        other than the name.
121    @return: int/long id of a new label.
122    """
123    # models.Label.add_object() raises model_logic.ValidationError
124    # when it is given a label name that already exists.
125    # However, ValidationError can be raised for other reasons as well,
126    # and those errors should be propagated up the call chain.
127    try:
128        label = models.Label.add_object(name=name, **kwargs)
129    except:
130        exc_info = sys.exc_info()
131        if ignore_exception_if_exists:
132            label = rpc_utils.get_label(name)
133            # If the exception was not caused by a duplicate
134            # "name", re-raise the original exception.
135            if label is None:
136                raise exc_info[0], exc_info[1], exc_info[2]
137        else:
138            raise exc_info[0], exc_info[1], exc_info[2]
139    return label.id
140
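# A minimal usage sketch for add_label (hypothetical label name; any extra
# keyword attributes would be passed through to models.Label.add_object):
#
#     label_id = add_label('pool:suites', ignore_exception_if_exists=True)
#     # Calling it again with the same name returns the existing label's id
#     # instead of propagating the duplicate-name ValidationError.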
141
142def add_label_to_hosts(id, hosts):
143    """Adds a label of the given id to the given hosts only in local DB.
144
145    @param id: id or name of a label. More often a label name.
146    @param hosts: The hostnames of hosts that need the label.
147
148    @raises models.Label.DoesNotExist: If the label with id doesn't exist.
149    """
150    label = models.Label.smart_get(id)
151    host_objs = models.Host.smart_get_bulk(hosts)
152    if label.platform:
153        models.Host.check_no_platform(host_objs)
154    # Ensure a host has no more than one board label with it.
155    if label.name.startswith('board:'):
156        models.Host.check_board_labels_allowed(host_objs, [label.name])
157    label.host_set.add(*host_objs)
158
159
160def _create_label_everywhere(id, hosts):
161    """
162    Yet another method to create labels.
163
164    ALERT! This method should be run only on the master, not on shards!
165    DO NOT RUN THIS ON A SHARD!!!  Deputies will hate you if you do!!!
166
167    This method exists primarily to serve label_add_hosts() and
168    host_add_labels().  Basically it pulls out the label check/add logic
169    from label_add_hosts() into this nice method that not only creates
170    the label but also tells the shards that service the hosts to also
171    create the label.
172
173    @param id: id or name of a label. More often a label name.
174    @param hosts: A list of hostnames or ids. More often hostnames.
175    """
176    try:
177        label = models.Label.smart_get(id)
178    except models.Label.DoesNotExist:
179        # This matches the type checks in smart_get, which is a hack
180        # in and of itself. The aim here is to create any non-existent
181        # label, which we cannot do if the 'id' specified isn't a label name.
182        if isinstance(id, basestring):
183            label = models.Label.smart_get(add_label(id))
184        else:
185            raise ValueError('Label id (%s) does not exist. Please specify '
186                             'the argument, id, as a string (label name).'
187                             % id)
188
189    # Make sure the label exists on the shard with the same id
190    # as it is on the master.
191    # It is possible that the label is already in a shard because
192    # we are adding a new label only to shards of hosts that the label
193    # is going to be attached to.
194    # For example, we add a label L1 to a host in shard S1.
195    # Master and S1 will have L1 but other shards won't.
196    # Later, when we add the same label L1 to hosts in shards S1 and S2,
197    # S1 already has the label but S2 doesn't.
198    # S2 should have the new label without any problem.
199    # We ignore exception in such a case.
200    host_objs = models.Host.smart_get_bulk(hosts)
201    rpc_utils.fanout_rpc(
202            host_objs, 'add_label', include_hostnames=False,
203            name=label.name, ignore_exception_if_exists=True,
204            id=label.id, platform=label.platform)
205
206
207@rpc_utils.route_rpc_to_master
208def label_add_hosts(id, hosts):
209    """Adds a label with the given id to the given hosts.
210
211    This method should be run only on the master, not on shards.
212    The given label will be created if it doesn't exist, provided the `id`
213    supplied is a label name, not an int/long id.
214
215    @param id: id or name of a label. More often a label name.
216    @param hosts: A list of hostnames or ids. More often hostnames.
217
218    @raises ValueError: If the id specified is an int/long (label id)
219                        while the label does not exist.
220    """
221    # Create the label.
222    _create_label_everywhere(id, hosts)
223
224    # Add it to the master.
225    add_label_to_hosts(id, hosts)
226
227    # Add it to the shards.
228    host_objs = models.Host.smart_get_bulk(hosts)
229    rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id)
230
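# A minimal usage sketch for label_add_hosts (hypothetical label and hostnames;
# the label is created on the master and the relevant shards if it is missing):
#
#     label_add_hosts(id='pool:suites', hosts=['host1.cros', 'host2.cros'])
#     # Passing an int/long id for a label that does not exist raises ValueError.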
231
232def remove_label_from_hosts(id, hosts):
233    """Removes a label of the given id from the given hosts only in local DB.
234
235    @param id: id or name of a label.
236    @param hosts: The hostnames of hosts from which to remove the label.
237    """
238    host_objs = models.Host.smart_get_bulk(hosts)
239    models.Label.smart_get(id).host_set.remove(*host_objs)
240
241
242@rpc_utils.route_rpc_to_master
243def label_remove_hosts(id, hosts):
244    """Removes a label of the given id from the given hosts.
245
246    This method should be run only on the master, not on shards.
247
248    @param id: id or name of a label.
249    @param hosts: A list of hostnames or ids. More often hostnames.
250    """
251    host_objs = models.Host.smart_get_bulk(hosts)
252    remove_label_from_hosts(id, hosts)
253
254    rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id)
255
256
257def get_labels(exclude_filters=(), **filter_data):
258    """\
259    @param exclude_filters: A sequence of dictionaries of filters.
260
261    @returns A sequence of nested dictionaries of label information.
262    """
263    labels = models.Label.query_objects(filter_data)
264    for exclude_filter in exclude_filters:
265        labels = labels.exclude(**exclude_filter)
266    return rpc_utils.prepare_rows_as_nested_dicts(labels, ())
267
268
269# hosts
270
271def add_host(hostname, status=None, locked=None, lock_reason='', protection=None):
272    if locked and not lock_reason:
273        raise model_logic.ValidationError(
274            {'locked': 'Please provide a reason for locking when adding host.'})
275
276    return models.Host.add_object(hostname=hostname, status=status,
277                                  locked=locked, lock_reason=lock_reason,
278                                  protection=protection).id
279
280
281@rpc_utils.route_rpc_to_master
282def modify_host(id, **kwargs):
283    """Modify local attributes of a host.
284
285    If this is called on the master, but the host is assigned to a shard, this
286    will call `modify_host_local` RPC to the responsible shard. This means if
287    a host is being locked using this function, this change will also propagate
288    to shards.
289    When this is called on a shard, the shard just routes the RPC to the master
290    and does nothing.
291
292    @param id: id of the host to modify.
293    @param kwargs: key=value pairs of values to set on the host.
294    """
295    rpc_utils.check_modify_host(kwargs)
296    host = models.Host.smart_get(id)
297    try:
298        rpc_utils.check_modify_host_locking(host, kwargs)
299    except model_logic.ValidationError as e:
300        if not kwargs.get('force_modify_locking', False):
301            raise
302        logging.exception('The following exception will be ignored and lock '
303                          'modification will be enforced. %s', e)
304
305    # This is required to make `lock_time` for a host be exactly the same
306    # between the master and a shard.
307    if kwargs.get('locked', None) and 'lock_time' not in kwargs:
308        kwargs['lock_time'] = datetime.datetime.now()
309    host.update_object(kwargs)
310
311    # force_modify_locking is not a field in the database; remove it before fanout.
312    kwargs.pop('force_modify_locking', None)
313    rpc_utils.fanout_rpc([host], 'modify_host_local',
314                         include_hostnames=False, id=id, **kwargs)
315
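# A minimal usage sketch for modify_host (hypothetical hostname and lock
# reason; the lock is applied locally and then fanned out to the owning shard):
#
#     modify_host('host1.cros', locked=True, lock_reason='bad DUT battery')
#     # lock_time is filled in here so the master and the shard record the
#     # same timestamp.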
316
317def modify_host_local(id, **kwargs):
318    """Modify host attributes in local DB.
319
320    @param id: Host id.
321    @param kwargs: key=value pairs of values to set on the host.
322    """
323    models.Host.smart_get(id).update_object(kwargs)
324
325
326@rpc_utils.route_rpc_to_master
327def modify_hosts(host_filter_data, update_data):
328    """Modify local attributes of multiple hosts.
329
330    If this is called on the master, but one of the hosts that match the
331    filters is assigned to a shard, this will call `modify_hosts_local` RPC
332    to the responsible shard.
333    When this is called on a shard, the shard just routes the RPC to the master
334    and does nothing.
335
336    The filters are always applied on the master, not on the shards. This means
337    if the states of a host differ on the master and a shard, the state on the
338    master will be used. I.e. this means:
339    A host was synced to Shard 1. On Shard 1 the status of the host was set to
340    'Repair Failed'.
341    - A call to modify_hosts with host_filter_data={'status': 'Ready'} will
342    update the host (both on the shard and on the master), because the state
343    of the host as the master knows it is still 'Ready'.
344    - A call to modify_hosts with host_filter_data={'status': 'Repair Failed'}
345    will not update the host, because the filter doesn't apply on the master.
346
347    @param host_filter_data: Filters out which hosts to modify.
348    @param update_data: A dictionary with the changes to make to the hosts.
349    """
350    update_data = update_data.copy()
351    rpc_utils.check_modify_host(update_data)
352    hosts = models.Host.query_objects(host_filter_data)
353
354    affected_shard_hostnames = set()
355    affected_host_ids = []
356
357    # Check all hosts before changing data for exception safety.
358    for host in hosts:
359        try:
360            rpc_utils.check_modify_host_locking(host, update_data)
361        except model_logic.ValidationError as e:
362            if not update_data.get('force_modify_locking', False):
363                raise
364            logging.exception('The following exception will be ignored and '
365                              'lock modification will be enforced. %s', e)
366
367        if host.shard:
368            affected_shard_hostnames.add(host.shard.rpc_hostname())
369            affected_host_ids.append(host.id)
370
371    # This is required to make `lock_time` for a host be exactly the same
372    # between the master and a shard.
373    if update_data.get('locked', None) and 'lock_time' not in update_data:
374        update_data['lock_time'] = datetime.datetime.now()
375    for host in hosts:
376        host.update_object(update_data)
377
378    update_data.pop('force_modify_locking', None)
379    # Caution: Changing the filter from the original here. See docstring.
380    rpc_utils.run_rpc_on_multiple_hostnames(
381            'modify_hosts_local', affected_shard_hostnames,
382            host_filter_data={'id__in': affected_host_ids},
383            update_data=update_data)
384
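# A minimal usage sketch for modify_hosts (hypothetical filter and update data;
# remember that the filter is evaluated against the master's view of the hosts):
#
#     modify_hosts(host_filter_data={'status': 'Ready', 'locked': False},
#                  update_data={'locked': True, 'lock_reason': 'fleet audit'})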
385
386def modify_hosts_local(host_filter_data, update_data):
387    """Modify attributes of hosts in local DB.
388
389    @param host_filter_data: Filters out which hosts to modify.
390    @param update_data: A dictionary with the changes to make to the hosts.
391    """
392    for host in models.Host.query_objects(host_filter_data):
393        host.update_object(update_data)
394
395
396def add_labels_to_host(id, labels):
397    """Adds labels to a given host only in local DB.
398
399    @param id: id or hostname for a host.
400    @param labels: ids or names for labels.
401    """
402    label_objs = models.Label.smart_get_bulk(labels)
403    models.Host.smart_get(id).labels.add(*label_objs)
404
405
406@rpc_utils.route_rpc_to_master
407def host_add_labels(id, labels):
408    """Adds labels to a given host.
409
410    @param id: id or hostname for a host.
411    @param labels: ids or names for labels.
412
413    @raises ValidationError: If adding more than one platform/board label.
414    """
415    # Create the labels on the master/shards.
416    for label in labels:
417        _create_label_everywhere(label, [id])
418
419    label_objs = models.Label.smart_get_bulk(labels)
420    platforms = [label.name for label in label_objs if label.platform]
421    boards = [label.name for label in label_objs
422              if label.name.startswith('board:')]
423    if len(platforms) > 1 or not utils.board_labels_allowed(boards):
424        raise model_logic.ValidationError(
425            {'labels': ('Adding more than one platform label, or a list of '
426                        'non-compatible board labels.: %s %s' %
427                        (', '.join(platforms), ', '.join(boards)))})
428
429    host_obj = models.Host.smart_get(id)
430    if platforms:
431        models.Host.check_no_platform([host_obj])
432    if boards:
433        models.Host.check_board_labels_allowed([host_obj], labels)
434    add_labels_to_host(id, labels)
435
436    rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False,
437                         id=id, labels=labels)
438
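# A minimal usage sketch for host_add_labels (hypothetical hostname and labels;
# labels that do not exist yet are first created via _create_label_everywhere):
#
#     host_add_labels('host1.cros', ['pool:suites', 'board:eve'])
#     # Adding a second platform label, or an incompatible set of board:
#     # labels, raises model_logic.ValidationError.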
439
440def remove_labels_from_host(id, labels):
441    """Removes labels from a given host only in local DB.
442
443    @param id: id or hostname for a host.
444    @param labels: ids or names for labels.
445    """
446    label_objs = models.Label.smart_get_bulk(labels)
447    models.Host.smart_get(id).labels.remove(*label_objs)
448
449
450@rpc_utils.route_rpc_to_master
451def host_remove_labels(id, labels):
452    """Removes labels from a given host.
453
454    @param id: id or hostname for a host.
455    @param labels: ids or names for labels.
456    """
457    remove_labels_from_host(id, labels)
458
459    host_obj = models.Host.smart_get(id)
460    rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False,
461                         id=id, labels=labels)
462
463
464def get_host_attribute(attribute, **host_filter_data):
465    """
466    @param attribute: string name of attribute
467    @param host_filter_data: filter data to apply to Hosts to choose hosts to
468                             act upon
469    """
470    hosts = rpc_utils.get_host_query((), False, True, host_filter_data)
471    hosts = list(hosts)
472    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
473                                               'attribute_list')
474    host_attr_dicts = []
475    for host_obj in hosts:
476        for attr_obj in host_obj.attribute_list:
477            if attr_obj.attribute == attribute:
478                host_attr_dicts.append(attr_obj.get_object_dict())
479    return rpc_utils.prepare_for_serialization(host_attr_dicts)
480
481
482def set_host_attribute(attribute, value, **host_filter_data):
483    """
484    @param attribute: string name of attribute
485    @param value: string, or None to delete an attribute
486    @param host_filter_data: filter data to apply to Hosts to choose hosts to
487                             act upon
488    """
489    assert host_filter_data # disallow accidental actions on all hosts
490    hosts = models.Host.query_objects(host_filter_data)
491    models.AclGroup.check_for_acl_violation_hosts(hosts)
492    for host in hosts:
493        host.set_or_delete_attribute(attribute, value)
494
495    # Master forwards this RPC to shards.
496    if not utils.is_shard():
497        rpc_utils.fanout_rpc(hosts, 'set_host_attribute', False,
498                attribute=attribute, value=value, **host_filter_data)
499
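# A minimal usage sketch for set_host_attribute (hypothetical attribute and
# filter; passing value=None deletes the attribute, and an empty filter is
# rejected by the assert above to avoid touching every host):
#
#     set_host_attribute('job_repo_url', None, hostname='host1.cros')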
500
501@rpc_utils.forward_single_host_rpc_to_shard
502def delete_host(id):
503    models.Host.smart_get(id).delete()
504
505
506def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
507              valid_only=True, include_current_job=False, **filter_data):
508    """Get a list of dictionaries which contains the information of hosts.
509
510    @param multiple_labels: match hosts in all of the labels given.  Should
511            be a list of label names.
512    @param exclude_only_if_needed_labels: Exclude hosts with at least one
513            "only_if_needed" label applied.
514    @param include_current_job: Set to True to include ids of currently running
515            job and special task.
516    """
517    hosts = rpc_utils.get_host_query(multiple_labels,
518                                     exclude_only_if_needed_labels,
519                                     valid_only, filter_data)
520    hosts = list(hosts)
521    models.Host.objects.populate_relationships(hosts, models.Label,
522                                               'label_list')
523    models.Host.objects.populate_relationships(hosts, models.AclGroup,
524                                               'acl_list')
525    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
526                                               'attribute_list')
527    host_dicts = []
528    for host_obj in hosts:
529        host_dict = host_obj.get_object_dict()
530        host_dict['labels'] = [label.name for label in host_obj.label_list]
531        host_dict['platform'] = rpc_utils.find_platform(host_obj)
532        host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
533        host_dict['attributes'] = dict((attribute.attribute, attribute.value)
534                                       for attribute in host_obj.attribute_list)
535        if include_current_job:
536            host_dict['current_job'] = None
537            host_dict['current_special_task'] = None
538            entries = models.HostQueueEntry.objects.filter(
539                    host_id=host_dict['id'], active=True, complete=False)
540            if entries:
541                host_dict['current_job'] = (
542                        entries[0].get_object_dict()['job'])
543            tasks = models.SpecialTask.objects.filter(
544                    host_id=host_dict['id'], is_active=True, is_complete=False)
545            if tasks:
546                host_dict['current_special_task'] = (
547                        '%d-%s' % (tasks[0].get_object_dict()['id'],
548                                   tasks[0].get_object_dict()['task'].lower()))
549        host_dicts.append(host_dict)
550    return rpc_utils.prepare_for_serialization(host_dicts)
551
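# A minimal usage sketch for get_hosts (hypothetical filter values; extra
# keyword arguments are passed through as Django-style query filters):
#
#     hosts = get_hosts(multiple_labels=['board:eve', 'pool:bvt'],
#                       include_current_job=True, status='Ready')
#     hostnames = [h['hostname'] for h in hosts]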
552
553def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
554                  valid_only=True, **filter_data):
555    """
556    Same parameters as get_hosts().
557
558    @returns The number of matching hosts.
559    """
560    hosts = rpc_utils.get_host_query(multiple_labels,
561                                     exclude_only_if_needed_labels,
562                                     valid_only, filter_data)
563    return hosts.count()
564
565
566# tests
567
568def get_tests(**filter_data):
569    return rpc_utils.prepare_for_serialization(
570        models.Test.list_objects(filter_data))
571
572
573def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
574    """Gets the counts of all passed and failed tests from the matching jobs.
575
576    @param job_name_prefix: Name prefix of the jobs to get the summary
577           from, e.g., 'butterfly-release/r40-6457.21.0/bvt-cq/'. Prefix
578           matching is case insensitive.
579    @param label_name: Label that must be set in the jobs, e.g.,
580            'cros-version:butterfly-release/R40-6457.21.0'.
581
582    @returns A summary of the counts of all the passed and failed tests.
583    """
584    job_ids = list(models.Job.objects.filter(
585            name__istartswith=job_name_prefix,
586            dependency_labels__name=label_name).values_list(
587                'pk', flat=True))
588    summary = {'passed': 0, 'failed': 0}
589    if not job_ids:
590        return summary
591
592    counts = (tko_models.TestView.objects.filter(
593            afe_job_id__in=job_ids).exclude(
594                test_name='SERVER_JOB').exclude(
595                    test_name__startswith='CLIENT_JOB').values(
596                        'status').annotate(
597                            count=Count('status')))
598    for status in counts:
599        if status['status'] == 'GOOD':
600            summary['passed'] += status['count']
601        else:
602            summary['failed'] += status['count']
603    return summary
604
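# A minimal usage sketch, reusing the example values from the docstring above:
#
#     counts = get_tests_status_counts_by_job_name_label(
#             'butterfly-release/r40-6457.21.0/bvt-cq/',
#             'cros-version:butterfly-release/R40-6457.21.0')
#     # counts == {'passed': <int>, 'failed': <int>}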
605
606# profilers
607
608def add_profiler(name, description=None):
609    return models.Profiler.add_object(name=name, description=description).id
610
611
612def modify_profiler(id, **data):
613    models.Profiler.smart_get(id).update_object(data)
614
615
616def delete_profiler(id):
617    models.Profiler.smart_get(id).delete()
618
619
620def get_profilers(**filter_data):
621    return rpc_utils.prepare_for_serialization(
622        models.Profiler.list_objects(filter_data))
623
624
625# users
626
627def get_users(**filter_data):
628    return rpc_utils.prepare_for_serialization(
629        models.User.list_objects(filter_data))
630
631
632# acl groups
633
634def add_acl_group(name, description=None):
635    group = models.AclGroup.add_object(name=name, description=description)
636    group.users.add(models.User.current_user())
637    return group.id
638
639
640def modify_acl_group(id, **data):
641    group = models.AclGroup.smart_get(id)
642    group.check_for_acl_violation_acl_group()
643    group.update_object(data)
644    group.add_current_user_if_empty()
645
646
647def acl_group_add_users(id, users):
648    group = models.AclGroup.smart_get(id)
649    group.check_for_acl_violation_acl_group()
650    users = models.User.smart_get_bulk(users)
651    group.users.add(*users)
652
653
654def acl_group_remove_users(id, users):
655    group = models.AclGroup.smart_get(id)
656    group.check_for_acl_violation_acl_group()
657    users = models.User.smart_get_bulk(users)
658    group.users.remove(*users)
659    group.add_current_user_if_empty()
660
661
662def acl_group_add_hosts(id, hosts):
663    group = models.AclGroup.smart_get(id)
664    group.check_for_acl_violation_acl_group()
665    hosts = models.Host.smart_get_bulk(hosts)
666    group.hosts.add(*hosts)
667    group.on_host_membership_change()
668
669
670def acl_group_remove_hosts(id, hosts):
671    group = models.AclGroup.smart_get(id)
672    group.check_for_acl_violation_acl_group()
673    hosts = models.Host.smart_get_bulk(hosts)
674    group.hosts.remove(*hosts)
675    group.on_host_membership_change()
676
677
678def delete_acl_group(id):
679    models.AclGroup.smart_get(id).delete()
680
681
682def get_acl_groups(**filter_data):
683    acl_groups = models.AclGroup.list_objects(filter_data)
684    for acl_group in acl_groups:
685        acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])
686        acl_group['users'] = [user.login
687                              for user in acl_group_obj.users.all()]
688        acl_group['hosts'] = [host.hostname
689                              for host in acl_group_obj.hosts.all()]
690    return rpc_utils.prepare_for_serialization(acl_groups)
691
692
693# jobs
694
695def generate_control_file(tests=(), profilers=(),
696                          client_control_file='', use_container=False,
697                          profile_only=None, db_tests=True,
698                          test_source_build=None):
699    """
700    Generates a client-side control file to run tests.
701
702    @param tests List of tests to run. See db_tests for more information.
703    @param profilers List of profilers to activate during the job.
704    @param client_control_file The contents of a client-side control file to
705        run at the end of all tests.  If this is supplied, all tests must be
706        client side.
707        TODO: in the future we should support server control files directly
708        to wrap with a kernel.  That'll require changing the parameter
709        name and adding a boolean to indicate if it is a client or server
710        control file.
711    @param use_container unused argument today.  TODO: Enable containers
712        on the host during a client side test.
713    @param profile_only A boolean that indicates what default profile_only
714        mode to use in the control file. Passing None will generate a
715        control file that does not explicitly set the default mode at all.
716    @param db_tests: if True, the test object can be found in the database
717                     backing the test model. In this case, tests is a tuple
718                     of test IDs which are used to retrieve the test objects
719                     from the database. If False, tests is a tuple of test
720                     dictionaries stored client-side in the AFE.
721    @param test_source_build: Build to be used to retrieve test code. Default
722                              to None.
723
724    @returns a dict with the following keys:
725        control_file: str, The control file text.
726        is_server: bool, is the control file a server-side control file?
727        synch_count: How many machines the job uses per autoserv execution.
728            synch_count == 1 means the job is asynchronous.
729        dependencies: A list of the names of labels on which the job depends.
730    """
731    if not tests and not client_control_file:
732        return dict(control_file='', is_server=False, synch_count=1,
733                    dependencies=[])
734
735    cf_info, test_objects, profiler_objects = (
736        rpc_utils.prepare_generate_control_file(tests, profilers,
737                                                db_tests))
738    cf_info['control_file'] = control_file_lib.generate_control(
739        tests=test_objects, profilers=profiler_objects,
740        is_server=cf_info['is_server'],
741        client_control_file=client_control_file, profile_only=profile_only,
742        test_source_build=test_source_build)
743    return cf_info
744
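# A minimal usage sketch for generate_control_file (hypothetical test IDs and
# profiler name; with the default db_tests=True the tests argument holds the
# database IDs of the test objects):
#
#     cf_info = generate_control_file(tests=(42, 43), profilers=('cpu_usage',))
#     control_text = cf_info['control_file']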
745
746def create_job_page_handler(name, priority, control_file, control_type,
747                            image=None, hostless=False, firmware_rw_build=None,
748                            firmware_ro_build=None, test_source_build=None,
749                            is_cloning=False, **kwargs):
750    """\
751    Create and enqueue a job.
752
753    @param name name of this job
754    @param priority Integer priority of this job.  Higher is more important.
755    @param control_file String contents of the control file.
756    @param control_type Type of control file, Client or Server.
757    @param image: ChromeOS build to be installed in the dut. Default to None.
758    @param firmware_rw_build: Firmware build to update RW firmware. Default to
759                              None, i.e., RW firmware will not be updated.
760    @param firmware_ro_build: Firmware build to update RO firmware. Default to
761                              None, i.e., RO firmware will not be updated.
762    @param test_source_build: Build to be used to retrieve test code. Default
763                              to None.
764    @param is_cloning: True if creating a cloning job.
765    @param kwargs extra args that will be required by create_suite_job or
766                  create_job.
767
768    @returns The created Job id number.
769    """
770    if is_cloning:
771        logging.info('Start to clone a new job')
772        # When cloning a job, hosts and meta_hosts should not be set together,
773        # as that would cause host-scheduler to schedule two HQE jobs on one
774        # host at the same time and crash itself. Clear meta_hosts in this case.
775        if kwargs.get('hosts') and kwargs.get('meta_hosts'):
776            kwargs['meta_hosts'] = []
777    else:
778        logging.info('Start to create a new job')
779    control_file = rpc_utils.encode_ascii(control_file)
780    if not control_file:
781        raise model_logic.ValidationError({
782                'control_file' : "Control file cannot be empty"})
783
784    if image and hostless:
785        builds = {}
786        builds[provision.CROS_VERSION_PREFIX] = image
787        if firmware_rw_build:
788            builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
789        if firmware_ro_build:
790            builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
791        return create_suite_job(
792                name=name, control_file=control_file, priority=priority,
793                builds=builds, test_source_build=test_source_build,
794                is_cloning=is_cloning, **kwargs)
795    return create_job(name, priority, control_file, control_type, image=image,
796                      hostless=hostless, **kwargs)
797
798
799@rpc_utils.route_rpc_to_master
800def create_job(
801        name,
802        priority,
803        control_file,
804        control_type,
805        hosts=(),
806        meta_hosts=(),
807        one_time_hosts=(),
808        synch_count=None,
809        is_template=False,
810        timeout=None,
811        timeout_mins=None,
812        max_runtime_mins=None,
813        run_verify=False,
814        email_list='',
815        dependencies=(),
816        reboot_before=None,
817        reboot_after=None,
818        parse_failed_repair=None,
819        hostless=False,
820        keyvals=None,
821        drone_set=None,
822        image=None,
823        parent_job_id=None,
824        test_retry=0,
825        run_reset=True,
826        require_ssp=None,
827        args=(),
828        **kwargs):
829    """\
830    Create and enqueue a job.
831
832    @param name name of this job
833    @param priority Integer priority of this job.  Higher is more important.
834    @param control_file String contents of the control file.
835    @param control_type Type of control file, Client or Server.
836    @param synch_count How many machines the job uses per autoserv execution.
837        synch_count == 1 means the job is asynchronous.  If an atomic group is
838        given this value is treated as a minimum.
839    @param is_template If true then create a template job.
840    @param timeout Hours after this call returns until the job times out.
841    @param timeout_mins Minutes after this call returns until the job times
842        out.
843    @param max_runtime_mins Minutes from job starting time until job times out
844    @param run_verify Should the host be verified before running the test?
845    @param email_list String containing emails to mail when the job is done
846    @param dependencies List of label names on which this job depends
847    @param reboot_before Never, If dirty, or Always
848    @param reboot_after Never, If all tests passed, or Always
849    @param parse_failed_repair if true, results of failed repairs launched by
850        this job will be parsed as part of the job.
851    @param hostless if true, create a hostless job
852    @param keyvals dict of keyvals to associate with the job
853    @param hosts List of hosts to run job on.
854    @param meta_hosts List where each entry is a label name, and for each entry
855        one host will be chosen from that label to run the job on.
856    @param one_time_hosts List of hosts not in the database to run the job on.
857    @param drone_set The name of the drone set to run this test on.
858    @param image OS image to install before running job.
859    @param parent_job_id id of a job considered to be parent of created job.
860    @param test_retry Number of times to retry test if the test did not
861        complete successfully. (optional, default: 0)
862    @param run_reset Should the host be reset before running the test?
863    @param require_ssp Set to True to require server-side packaging to run the
864                       test. If it's set to None, drone will still try to run
865                       the server side with server-side packaging. If the
866                       autotest-server package doesn't exist for the build or
867                       image is not set, drone will run the test without server-
868                       side packaging. Default is None.
869    @param args A list of args to be injected into control file.
870    @param kwargs extra keyword args. NOT USED.
871
872    @returns The created Job id number.
873    """
874    if args:
875        control_file = tools.inject_vars({'args': args}, control_file)
876    if image:
877        dependencies += (provision.image_version_to_label(image),)
878    return rpc_utils.create_job_common(
879            name=name,
880            priority=priority,
881            control_type=control_type,
882            control_file=control_file,
883            hosts=hosts,
884            meta_hosts=meta_hosts,
885            one_time_hosts=one_time_hosts,
886            synch_count=synch_count,
887            is_template=is_template,
888            timeout=timeout,
889            timeout_mins=timeout_mins,
890            max_runtime_mins=max_runtime_mins,
891            run_verify=run_verify,
892            email_list=email_list,
893            dependencies=dependencies,
894            reboot_before=reboot_before,
895            reboot_after=reboot_after,
896            parse_failed_repair=parse_failed_repair,
897            hostless=hostless,
898            keyvals=keyvals,
899            drone_set=drone_set,
900            parent_job_id=parent_job_id,
901            test_retry=test_retry,
902            run_reset=run_reset,
903            require_ssp=require_ssp)
904
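# A minimal usage sketch for create_job (hypothetical values; assumes the
# priorities module imported above exposes Priority.DEFAULT, and meta_hosts
# picks one host per listed label):
#
#     job_id = create_job(name='dummy_Pass-smoke',
#                         priority=priorities.Priority.DEFAULT,
#                         control_file=control_text,  # e.g. generate_control_file()
#                         control_type='Client',
#                         meta_hosts=['board:eve'],
#                         dependencies=('pool:suites',))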
905
906def abort_host_queue_entries(**filter_data):
907    """\
908    Abort a set of host queue entries.
909
910    @return: A list of dictionaries, each containing information
911             about an aborted HQE.
912    """
913    query = models.HostQueueEntry.query_objects(filter_data)
914
915    # Don't allow aborts on:
916    #   1. Jobs that have already completed (whether or not they were aborted)
917    #   2. Jobs that have already been aborted (but may not have completed)
918    query = query.filter(complete=False).filter(aborted=False)
919    models.AclGroup.check_abort_permissions(query)
920    host_queue_entries = list(query.select_related())
921    rpc_utils.check_abort_synchronous_jobs(host_queue_entries)
922
923    models.HostQueueEntry.abort_host_queue_entries(host_queue_entries)
924    hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id,
925                 'Job name': hqe.job.name} for hqe in host_queue_entries]
926    return hqe_info
927
928
929def abort_special_tasks(**filter_data):
930    """\
931    Abort the special task, or tasks, specified in the filter.
932    """
933    query = models.SpecialTask.query_objects(filter_data)
934    special_tasks = query.filter(is_active=True)
935    for task in special_tasks:
936        task.abort()
937
938
939def _call_special_tasks_on_hosts(task, hosts):
940    """\
941    Schedules a set of hosts for a special task.
942
943    @returns A list of hostnames that a special task was created for.
944    """
945    models.AclGroup.check_for_acl_violation_hosts(hosts)
946    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
947    if shard_host_map and not utils.is_shard():
948        raise ValueError('The following hosts are on shards, please '
949                         'follow the link to the shards and create jobs '
950                         'there instead. %s.' % shard_host_map)
951    for host in hosts:
952        models.SpecialTask.schedule_special_task(host, task)
953    return list(sorted(host.hostname for host in hosts))
954
955
956def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
957    """Forward special tasks to corresponding shards.
958
959    For master, when special tasks are fired on hosts that are sharded,
960    forward the RPC to corresponding shards.
961
962    For shard, create special task records in local DB.
963
964    @param task: Enum value of frontend.afe.models.SpecialTask.Task
965    @param rpc: RPC name to forward.
966    @param filter_data: Filter keywords to be used for DB query.
967
968    @return: A list of hostnames that a special task was created for.
969    """
970    hosts = models.Host.query_objects(filter_data)
971    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts, rpc_hostnames=True)
972
973    # Filter out hosts on a shard from those on the master, forward
974    # rpcs to the shard with an additional hostname__in filter, and
975    # create a local SpecialTask for each remaining host.
976    if shard_host_map and not utils.is_shard():
977        hosts = [h for h in hosts if h.shard is None]
978        for shard, hostnames in shard_host_map.iteritems():
979
980            # The main client of this module is the frontend website, and
981            # it invokes it with an 'id' or an 'id__in' filter. Regardless,
982            # the 'hostname' filter should narrow down the list of hosts on
983            # each shard even though we supply all the ids in filter_data.
984            # This method uses hostname instead of id because it fits better
985            # with the overall architecture of redirection functions in
986            # rpc_utils.
987            shard_filter = filter_data.copy()
988            shard_filter['hostname__in'] = hostnames
989            rpc_utils.run_rpc_on_multiple_hostnames(
990                    rpc, [shard], **shard_filter)
991
992    # There is a race condition here if someone assigns a shard to one of these
993    # hosts before we create the task. The host will stay on the master if:
994    # 1. The host is not Ready
995    # 2. The host is Ready but has a task
996    # But if the host is Ready and doesn't have a task yet, it will get sent
997    # to the shard as we're creating a task here.
998
999    # Given that we only rarely verify Ready hosts it isn't worth putting this
1000    # entire method in a transaction. The worst case scenario is that we have
1001    # a verify running on a Ready host while the shard is using it, if the
1002    # verify fails no subsequent tasks will be created against the host on the
1003    # master, and verifies are safe enough that this is OK.
1004    return _call_special_tasks_on_hosts(task, hosts)
1005
1006
1007def reverify_hosts(**filter_data):
1008    """\
1009    Schedules a set of hosts for verify.
1010
1011    @returns A list of hostnames that a verify task was created for.
1012    """
1013    return _forward_special_tasks_on_hosts(
1014            models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)
1015
1016
1017def repair_hosts(**filter_data):
1018    """\
1019    Schedules a set of hosts for repair.
1020
1021    @returns A list of hostnames that a repair task was created for.
1022    """
1023    return _forward_special_tasks_on_hosts(
1024            models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data)
1025
1026
1027def get_jobs(not_yet_run=False, running=False, finished=False,
1028             suite=False, sub=False, standalone=False, **filter_data):
1029    """\
1030    Extra status filter args for get_jobs:
1031    -not_yet_run: Include only jobs that have not yet started running.
1032    -running: Include only jobs that have started running but for which not
1033    all hosts have completed.
1034    -finished: Include only jobs for which all hosts have completed (or
1035    aborted).
1036
1037    Extra type filter args for get_jobs:
1038    -suite: Include only jobs with child jobs.
1039    -sub: Include only jobs with a parent job.
1040    -standalone: Include only jobs with no child or parent jobs.
1041    At most one of these three fields should be specified.
1042    """
1043    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
1044                                                    running,
1045                                                    finished)
1046    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
1047                                                                 suite,
1048                                                                 sub,
1049                                                                 standalone)
1050    job_dicts = []
1051    jobs = list(models.Job.query_objects(filter_data))
1052    models.Job.objects.populate_relationships(jobs, models.Label,
1053                                              'dependencies')
1054    models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
1055    for job in jobs:
1056        job_dict = job.get_object_dict()
1057        job_dict['dependencies'] = ','.join(label.name
1058                                            for label in job.dependencies)
1059        job_dict['keyvals'] = dict((keyval.key, keyval.value)
1060                                   for keyval in job.keyvals)
1061        job_dicts.append(job_dict)
1062    return rpc_utils.prepare_for_serialization(job_dicts)
1063
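# A minimal usage sketch for get_jobs (hypothetical owner; the status and type
# flags are translated into extra query filters by rpc_utils):
#
#     queued_suites = get_jobs(owner='showard', not_yet_run=True, suite=True)
#     names = [job['name'] for job in queued_suites]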
1064
1065def get_num_jobs(not_yet_run=False, running=False, finished=False,
1066                 suite=False, sub=False, standalone=False,
1067                 **filter_data):
1068    """\
1069    See get_jobs() for documentation of extra filter parameters.
1070    """
1071    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
1072                                                    running,
1073                                                    finished)
1074    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
1075                                                                 suite,
1076                                                                 sub,
1077                                                                 standalone)
1078    return models.Job.query_count(filter_data)
1079
1080
1081def get_jobs_summary(**filter_data):
1082    """\
1083    Like get_jobs(), but adds 'status_counts' and 'result_counts' fields.
1084
1085    'status_counts' field is a dictionary mapping status strings to the number
1086    of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}.
1087
1088    'result_counts' field is piped to tko's rpc_interface and has the return
1089    format specified under get_group_counts.
1090    """
1091    jobs = get_jobs(**filter_data)
1092    ids = [job['id'] for job in jobs]
1093    all_status_counts = models.Job.objects.get_status_counts(ids)
1094    for job in jobs:
1095        job['status_counts'] = all_status_counts[job['id']]
1096        job['result_counts'] = tko_rpc_interface.get_status_counts(
1097                ['afe_job_id', 'afe_job_id'],
1098                header_groups=[['afe_job_id'], ['afe_job_id']],
1099                **{'afe_job_id': job['id']})
1100    return rpc_utils.prepare_for_serialization(jobs)
1101
1102
1103def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
1104    """\
1105    Retrieves all the information needed to clone a job.
1106    """
1107    job = models.Job.objects.get(id=id)
1108    job_info = rpc_utils.get_job_info(job,
1109                                      preserve_metahosts,
1110                                      queue_entry_filter_data)
1111
1112    host_dicts = []
1113    for host in job_info['hosts']:
1114        host_dict = get_hosts(id=host.id)[0]
1115        other_labels = host_dict['labels']
1116        if host_dict['platform']:
1117            other_labels.remove(host_dict['platform'])
1118        host_dict['other_labels'] = ', '.join(other_labels)
1119        host_dicts.append(host_dict)
1120
1121    for host in job_info['one_time_hosts']:
1122        host_dict = dict(hostname=host.hostname,
1123                         id=host.id,
1124                         platform='(one-time host)',
1125                         locked_text='')
1126        host_dicts.append(host_dict)
1127
1128    # convert keys from Label objects to strings (names of labels)
1129    meta_host_counts = dict((meta_host.name, count) for meta_host, count
1130                            in job_info['meta_host_counts'].iteritems())
1131
1132    info = dict(job=job.get_object_dict(),
1133                meta_host_counts=meta_host_counts,
1134                hosts=host_dicts)
1135    info['job']['dependencies'] = job_info['dependencies']
1136    info['hostless'] = job_info['hostless']
1137    info['drone_set'] = job.drone_set and job.drone_set.name
1138
1139    image = _get_image_for_job(job, job_info['hostless'])
1140    if image:
1141        info['job']['image'] = image
1142
1143    return rpc_utils.prepare_for_serialization(info)
1144
1145
1146def _get_image_for_job(job, hostless):
1147    """Gets the image used for a job.
1148
1149    Gets the image used for an AFE job from the job's keyvals 'build' or
1150    'builds'. If that fails, and the job is a hostless job, tries to
1151    get the image from its control file attributes 'build' or 'builds'.
1152
1153    TODO(ntang): Needs to handle FAFT with two builds for ro/rw.
1154
1155    @param job      An AFE job object.
1156    @param hostless Boolean indicating whether the job is hostless.
1157
1158    @returns The image build used for the job.
1159    """
1160    keyvals = job.keyval_dict()
1161    image = keyvals.get('build')
1162    if not image:
1163        value = keyvals.get('builds')
1164        builds = None
1165        if isinstance(value, dict):
1166            builds = value
1167        elif isinstance(value, basestring):
1168            builds = ast.literal_eval(value)
1169        if builds:
1170            image = builds.get('cros-version')
1171    if not image and hostless and job.control_file:
1172        try:
1173            control_obj = control_data.parse_control_string(
1174                    job.control_file)
1175            if hasattr(control_obj, 'build'):
1176                image = getattr(control_obj, 'build')
1177            if not image and hasattr(control_obj, 'builds'):
1178                builds = getattr(control_obj, 'builds')
1179                image = builds.get('cros-version')
1180        except:
1181            logging.warning('Failed to parse control file for job: %s',
1182                            job.name)
1183    return image
1184
1185
1186def get_host_queue_entries_by_insert_time(
1187    insert_time_after=None, insert_time_before=None, **filter_data):
1188    """Like get_host_queue_entries, but using the insert index table.
1189
1190    @param insert_time_after: A lower bound on insert_time
1191    @param insert_time_before: An upper bound on insert_time
1192    @returns A sequence of nested dictionaries of host and job information.
1193    """
1194    assert insert_time_after is not None or insert_time_before is not None, \
1195      ('Caller to get_host_queue_entries_by_insert_time must provide either'
1196       ' insert_time_after or insert_time_before.')
1197    # Get insert bounds on the index of the host queue entries.
1198    if insert_time_after:
1199        query = models.HostQueueEntryStartTimes.objects.filter(
1200            # Note: '-insert_time' means descending. We want the largest
1201            # insert time that is no greater than insert_time_after.
1202            insert_time__lte=insert_time_after).order_by('-insert_time')
1203        try:
1204            constraint = query[0].highest_hqe_id
1205            if 'id__gte' in filter_data:
1206                constraint = max(constraint, filter_data['id__gte'])
1207            filter_data['id__gte'] = constraint
1208        except IndexError:
1209            pass
1210
1211    # Get end bounds.
1212    if insert_time_before:
1213        query = models.HostQueueEntryStartTimes.objects.filter(
1214            insert_time__gte=insert_time_before).order_by('insert_time')
1215        try:
1216            constraint = query[0].highest_hqe_id
1217            if 'id__lte' in filter_data:
1218                constraint = min(constraint, filter_data['id__lte'])
1219            filter_data['id__lte'] = constraint
1220        except IndexError:
1221            pass
1222
1223    return rpc_utils.prepare_rows_as_nested_dicts(
1224            models.HostQueueEntry.query_objects(filter_data),
1225            ('host', 'job'))
1226
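# A minimal usage sketch (hypothetical time bounds and owner filter; at least
# one of the two insert_time arguments is required, and each one tightens the
# id range that is scanned):
#
#     entries = get_host_queue_entries_by_insert_time(
#             insert_time_after='2014-05-01 00:00:00',
#             insert_time_before='2014-05-02 00:00:00',
#             job__owner='showard')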
1227
1228def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
1229    """\
1230    @returns A sequence of nested dictionaries of host and job information.
1231    """
1232    filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
1233                                                   'started_on__lte',
1234                                                   start_time,
1235                                                   end_time,
1236                                                   **filter_data)
1237    return rpc_utils.prepare_rows_as_nested_dicts(
1238            models.HostQueueEntry.query_objects(filter_data),
1239            ('host', 'job'))
1240
1241
1242def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
1243    """\
1244    Get the number of host queue entries matching the given filters.
1245    """
1246    filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
1247                                                   'started_on__lte',
1248                                                   start_time,
1249                                                   end_time,
1250                                                   **filter_data)
1251    return models.HostQueueEntry.query_count(filter_data)
1252
1253
1254def get_hqe_percentage_complete(**filter_data):
1255    """
1256    Computes the fraction of host queue entries matching the given filter data
1257    that are complete.
1258    """
1259    query = models.HostQueueEntry.query_objects(filter_data)
1260    complete_count = query.filter(complete=True).count()
1261    total_count = query.count()
1262    if total_count == 0:
1263        return 1
1264    return float(complete_count) / total_count
1265
1266
1267# special tasks
1268
1269def get_special_tasks(**filter_data):
1270    """Get special task entries from the local database.
1271
1272    Query the special tasks table for tasks matching the given
1273    `filter_data`, and return a list of the results.  No attempt is
1274    made to forward the call to shards; the buck will stop here.
1275    The caller is expected to know the target shard for such reasons
1276    as:
1277      * The caller is a service (such as gs_offloader) configured
1278        to operate on behalf of one specific shard, and no other.
1279      * The caller has a host as a parameter, and knows that this is
1280        the shard assigned to that host.
1281
1282    @param filter_data  Filter keywords to pass to the underlying
1283                        database query.
1284
1285    """
1286    return rpc_utils.prepare_rows_as_nested_dicts(
1287            models.SpecialTask.query_objects(filter_data),
1288            ('host', 'queue_entry'))
1289
1290
1291def get_host_special_tasks(host_id, **filter_data):
1292    """Get special task entries for a given host.
1293
1294    Query the special tasks table for tasks that ran on the host
1295    given by `host_id` and matching the given `filter_data`.
1296    Return a list of the results.  If the host is assigned to a
1297    shard, forward this call to that shard.
1298
1299    @param host_id      Id in the database of the target host.
1300    @param filter_data  Filter keywords to pass to the underlying
1301                        database query.
1302
1303    """
1304    # Retrieve host data even if the host is in an invalid state.
1305    host = models.Host.smart_get(host_id, False)
1306    if not host.shard:
1307        return get_special_tasks(host_id=host_id, **filter_data)
1308    else:
1309        # The return values from AFE methods are post-processed
1310        # objects that aren't JSON-serializable.  So, we have to
1311        # call AFE.run() to get the raw, serializable output from
1312        # the shard.
1313        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
1314        return shard_afe.run('get_special_tasks',
1315                             host_id=host_id, **filter_data)
1316
1317
1318def get_num_special_tasks(**kwargs):
1319    """Get the number of special task entries from the local database.
1320
1321    Query the special tasks table for tasks matching the given 'kwargs',
1322    and return the number of the results. No attempt is made to forward
1323    the call to shards; the buck will stop here.
1324
1325    @param kwargs    Filter keywords to pass to the underlying database query.
1326
1327    """
1328    return models.SpecialTask.query_count(kwargs)
1329
1330
1331def get_host_num_special_tasks(host, **kwargs):
1332    """Get special task entries for a given host.
1333
1334    Query the special tasks table for tasks that ran on the host
1335    given by 'host' and matching the given 'kwargs'.
1336    Return a list of the results.  If the host is assigned to a
1337    shard, forward this call to that shard.
1338
1339    @param host      id or name of a host. More often a hostname.
1340    @param kwargs    Filter keywords to pass to the underlying database query.
1341
1342    """
1343    # Retrieve host data even if the host is in an invalid state.
1344    host_model = models.Host.smart_get(host, False)
1345    if not host_model.shard:
1346        return get_num_special_tasks(host=host, **kwargs)
1347    else:
1348        shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname())
1349        return shard_afe.run('get_num_special_tasks', host=host, **kwargs)
1350
1351
1352def get_status_task(host_id, end_time):
1353    """Get the "status task" for a host from the local shard.
1354
1355    Returns a single special task representing the given host's
1356    "status task".  The status task is a completed special task that
1357    identifies whether the corresponding host was working or broken
1358    when it completed.  A successful task indicates a working host;
1359    a failed task indicates broken.
1360
1361    This call will not be forwarded to a shard; the receiving server
1362    must be the shard that owns the host.
1363
1364    @param host_id      Id in the database of the target host.
1365    @param end_time     Time reference for the host's status.
1366
1367    @return A single task; its status (successful or not)
1368            corresponds to the status of the host (working or
1369            broken) at the given time.  If no task is found, return
1370            `None`.
1371
1372    """
1373    tasklist = rpc_utils.prepare_rows_as_nested_dicts(
1374            status_history.get_status_task(host_id, end_time),
1375            ('host', 'queue_entry'))
1376    return tasklist[0] if tasklist else None
1377
1378
1379def get_host_status_task(host_id, end_time):
1380    """Get the "status task" for a host from its owning shard.
1381
1382    Finds the given host's owning shard, and forwards to it a call
1383    to `get_status_task()` (see above).
1384
1385    @param host_id      Id in the database of the target host.
1386    @param end_time     Time reference for the host's status.
1387
1388    @return A single task; its status (successful or not)
1389            corresponds to the status of the host (working or
1390            broken) at the given time.  If no task is found, return
1391            `None`.
1392
1393    """
1394    host = models.Host.smart_get(host_id)
1395    if not host.shard:
1396        return get_status_task(host_id, end_time)
1397    else:
1398        # The return values from AFE methods are post-processed
1399        # objects that aren't JSON-serializable.  So, we have to
1400        # call AFE.run() to get the raw, serializable output from
1401        # the shard.
1402        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
1403        return shard_afe.run('get_status_task',
1404                             host_id=host_id, end_time=end_time)
1405
1406
1407def get_host_diagnosis_interval(host_id, end_time, success):
1408    """Find a "diagnosis interval" for a given host.
1409
1410    A "diagnosis interval" identifies a start and end time where
1411    the host went from "working" to "broken", or vice versa.  The
1412    interval's starting time is the starting time of the last status
1413    task with the old status; the end time is the finish time of the
1414    first status task with the new status.
1415
1416    This routine finds the most recent diagnosis interval for the
1417    given host prior to `end_time`, with a starting status matching
1418    `success`.  If `success` is true, the interval will start with a
1419    successful status task; if false the interval will start with a
1420    failed status task.
1421
1422    @param host_id      Id in the database of the target host.
1423    @param end_time     Time reference for the diagnosis interval.
1424    @param success      Whether the diagnosis interval should start
1425                        with a successful or failed status task.
1426
1427    @return A list of two strings.  The first is the timestamp for
1428            the beginning of the interval; the second is the
1429            timestamp for the end.  If the host has never changed
1430            state, the list is empty.
1431
1432    """
1433    host = models.Host.smart_get(host_id)
1434    if not host.shard or utils.is_shard():
1435        return status_history.get_diagnosis_interval(
1436                host_id, end_time, success)
1437    else:
1438        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
1439        return shard_afe.get_host_diagnosis_interval(
1440                host_id, end_time, success)
1441
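# Illustrative sketch of consuming a diagnosis interval (the host id and
# timestamp are hypothetical; an empty list means the host never changed
# state):
#
#     interval = get_host_diagnosis_interval(42, '2016-01-01 00:00:00', True)
#     if interval:
#         start, end = interval
#         logging.info('Host went from working to broken between %s and %s',
#                      start, end)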
1442
1443# support for host detail view
1444
1445def get_host_queue_entries_and_special_tasks(host, query_start=None,
1446                                             query_limit=None, start_time=None,
1447                                             end_time=None):
1448    """
1449    @returns an interleaved list of HostQueueEntries and SpecialTasks,
1450            in approximate run order.  Each dict contains keys for type, host,
1451            job, status, started_on, execution_path, and ID.
1452    """
1453    total_limit = None
1454    if query_limit is not None:
1455        total_limit = (query_start or 0) + query_limit
1456    filter_data_common = {'host': host,
1457                          'query_limit': total_limit,
1458                          'sort_by': ['-id']}
1459
1460    filter_data_special_tasks = rpc_utils.inject_times_to_filter(
1461            'time_started__gte', 'time_started__lte', start_time, end_time,
1462            **filter_data_common)
1463
1464    queue_entries = get_host_queue_entries(
1465            start_time, end_time, **filter_data_common)
1466    special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)
1467
1468    interleaved_entries = rpc_utils.interleave_entries(queue_entries,
1469                                                       special_tasks)
1470    if query_start is not None:
1471        interleaved_entries = interleaved_entries[query_start:]
1472    if query_limit is not None:
1473        interleaved_entries = interleaved_entries[:query_limit]
1474    return rpc_utils.prepare_host_queue_entries_and_special_tasks(
1475            interleaved_entries, queue_entries)
1476
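# Pagination sketch for the host detail view (the hostname and page size are
# hypothetical): fetch the second page of 20 interleaved entries.
#
#     entries = get_host_queue_entries_and_special_tasks(
#             'chromeos1-row1-rack1-host1', query_start=20, query_limit=20)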
1477
1478def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
1479                                                 end_time=None):
1480    filter_data_common = {'host': host}
1481
1482    filter_data_queue_entries, filter_data_special_tasks = (
1483            rpc_utils.inject_times_to_hqe_special_tasks_filters(
1484                    filter_data_common, start_time, end_time))
1485
1486    return (models.HostQueueEntry.query_count(filter_data_queue_entries)
1487            + get_host_num_special_tasks(**filter_data_special_tasks))
1488
1489
1490# other
1491
1492def echo(data=""):
1493    """\
1494    Returns the string passed in.  Useful as a basic test to verify that RPC
1495    calls can be made successfully.
1496    """
1497    return data
1498
1499
1500def get_motd():
1501    """\
1502    Returns the message of the day as a string.
1503    """
1504    return rpc_utils.get_motd()
1505
1506
1507def get_static_data():
1508    """\
1509    Returns a dictionary containing a bunch of data that shouldn't change
1510    often and is otherwise inaccessible.  This includes:
1511
1512    priorities: List of job priority choices.
1513    default_priority: Default priority value for new jobs.
1514    users: Sorted list of all users.
1515    labels: Sorted list of labels that do not start with 'cros-version' or
1516            'fw-version'.
1517    tests: Sorted list of all tests.
1518    profilers: Sorted list of all profilers.
1519    current_user: Logged-in username.
1520    host_statuses: Sorted list of possible Host statuses.
1521    job_statuses: Sorted list of possible HostQueueEntry statuses.
1522    job_timeout_default: The default job timeout length in minutes.
1523    parse_failed_repair_default: Default value for the parse_failed_repair job
1524            option.
1525    reboot_before_options: A list of valid RebootBefore string enums.
1526    reboot_after_options: A list of valid RebootAfter string enums.
1527    motd: Server's message of the day.
1528    status_dictionary: A mapping from one-word job status names to more
1529            informative descriptions.
1530    """
1531
1532    default_drone_set_name = models.DroneSet.default_drone_set_name()
1533    drone_sets = ([default_drone_set_name] +
1534                  sorted(drone_set.name for drone_set in
1535                         models.DroneSet.objects.exclude(
1536                                 name=default_drone_set_name)))
1537
1538    result = {}
1539    result['priorities'] = priorities.Priority.choices()
1540    result['default_priority'] = 'Default'
1541    result['max_schedulable_priority'] = priorities.Priority.DEFAULT
1542    result['users'] = get_users(sort_by=['login'])
1543
1544    label_exclude_filters = [{'name__startswith': 'cros-version'},
1545                             {'name__startswith': 'fw-version'},
1546                             {'name__startswith': 'fwrw-version'},
1547                             {'name__startswith': 'fwro-version'},
1548                             {'name__startswith': 'ab-version'},
1549                             {'name__startswith': 'testbed-version'}]
1550    result['labels'] = get_labels(
1551        label_exclude_filters,
1552        sort_by=['-platform', 'name'])
1553
1554    result['tests'] = get_tests(sort_by=['name'])
1555    result['profilers'] = get_profilers(sort_by=['name'])
1556    result['current_user'] = rpc_utils.prepare_for_serialization(
1557        models.User.current_user().get_object_dict())
1558    result['host_statuses'] = sorted(models.Host.Status.names)
1559    result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
1560    result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
1561    result['job_max_runtime_mins_default'] = (
1562        models.Job.DEFAULT_MAX_RUNTIME_MINS)
1563    result['parse_failed_repair_default'] = bool(
1564        models.Job.DEFAULT_PARSE_FAILED_REPAIR)
1565    result['reboot_before_options'] = model_attributes.RebootBefore.names
1566    result['reboot_after_options'] = model_attributes.RebootAfter.names
1567    result['motd'] = rpc_utils.get_motd()
1568    result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
1569    result['drone_sets'] = drone_sets
1570
1571    result['status_dictionary'] = {"Aborted": "Aborted",
1572                                   "Verifying": "Verifying Host",
1573                                   "Provisioning": "Provisioning Host",
1574                                   "Pending": "Waiting on other hosts",
1575                                   "Running": "Running autoserv",
1576                                   "Completed": "Autoserv completed",
1577                                   "Failed": "Failed to complete",
1578                                   "Queued": "Queued",
1579                                   "Starting": "Next in host's queue",
1580                                   "Stopped": "Other host(s) failed verify",
1581                                   "Parsing": "Awaiting parse of final results",
1582                                   "Gathering": "Gathering log files",
1583                                   "Waiting": "Waiting for scheduler action",
1584                                   "Archiving": "Archiving results",
1585                                   "Resetting": "Resetting hosts"}
1586
1587    result['wmatrix_url'] = rpc_utils.get_wmatrix_url()
1588    result['is_moblab'] = bool(utils.is_moblab())
1589
1590    return result
1591
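# Illustrative consumer sketch for get_static_data; the keys referenced below
# are populated in the result dict above:
#
#     static = get_static_data()
#     priority_choices = static['priorities']
#     motd = static['motd']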
1592
1593def get_server_time():
1594    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
1595
1596
1597def ping_db():
1598    """Simple connection test to db"""
1599    try:
1600        db_connection.cursor()
1601    except DatabaseError:
1602        return [False]
1603    return [True]
1604
1605
1606def get_hosts_by_attribute(attribute, value):
1607    """
1608    Get the list of valid hosts that share the same host attribute value.
1609
1610    @param attribute: String of the host attribute to check.
1611    @param value: String of the value that is shared between hosts.
1612
1613    @returns List of hostnames that all have the same host attribute and
1614             value.
1615    """
1616    hosts = models.HostAttribute.query_objects({'attribute': attribute,
1617                                                'value': value})
1618    return [row.host.hostname for row in hosts if row.host.invalid == 0]
1619
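# Usage sketch for get_hosts_by_attribute (the attribute name and value below
# are hypothetical):
#
#     hostnames = get_hosts_by_attribute('serial_number', '1234567890')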
1620
1621def canonicalize_suite_name(suite_name):
1622    """Canonicalize the suite's name.
1623
1624    @param suite_name: the name of the suite.
1625    """
1626    # Do not change this naming convention without updating
1627    # site_utils.parse_job_name.
1628    return 'test_suites/control.%s' % suite_name
1629
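# Example: canonicalize_suite_name('bvt') returns 'test_suites/control.bvt'.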
1630
1631def formatted_now():
1632    """Format the current datetime."""
1633    return datetime.datetime.now().strftime(time_utils.TIME_FMT)
1634
1635
1636def _get_control_file_by_build(build, ds, suite_name):
1637    """Return control file contents for |suite_name|.
1638
1639    Query the dev server at |ds| for the control file |suite_name| included
1640    in |build|.
1641
1642    @param build: unique name by which to refer to the image from now on.
1643    @param ds: a dev_server.DevServer instance to fetch control file with.
1644    @param suite_name: canonicalized suite name, e.g. test_suites/control.bvt.
1645    @raises ControlFileNotFound if a unique suite control file doesn't exist.
1646    @raises NoControlFileList if we can't list the control files at all.
1647    @raises ControlFileEmpty if the control file exists on the server, but
1648                             can't be read.
1649
1650    @return the contents of the desired control file.
1651    """
1652    getter = control_file_getter.DevServerGetter.create(build, ds)
1653    devserver_name = ds.hostname
1654    # Get the control file for the suite.
1655    try:
1656        control_file_in = getter.get_control_file_contents_by_name(suite_name)
1657    except error.CrosDynamicSuiteException as e:
1658        raise type(e)('Failed to get control file for %s '
1659                      '(devserver: %s) (error: %s)' %
1660                      (build, devserver_name, e))
1661    if not control_file_in:
1662        raise error.ControlFileEmpty(
1663            "Fetching %s returned no data. (devserver: %s)" %
1664            (suite_name, devserver_name))
1665    # Force control files to contain only ASCII characters.
1666    try:
1667        control_file_in.encode('ascii')
1668    except UnicodeDecodeError as e:
1669        raise error.ControlFileMalformed(str(e))
1670
1671    return control_file_in
1672
1673
1674def _get_control_file_by_suite(suite_name):
1675    """Get control file contents by suite name.
1676
1677    @param suite_name: Suite name as string.
1678    @returns: Control file contents as string.
1679    """
1680    getter = control_file_getter.FileSystemGetter(
1681            [_CONFIG.get_config_value('SCHEDULER',
1682                                      'drone_installation_directory')])
1683    return getter.get_control_file_contents_by_name(suite_name)
1684
1685
1686def _stage_build_artifacts(build, hostname=None):
1687    """
1688    Ensure components of |build| necessary for installing images are staged.
1689
1690    @param build: image we want to stage.
1691    @param hostname: hostname of a DUT that may run the test. This helps to
1692        locate a devserver closer to the DUTs if needed. Default is None.
1693
1694    @raises StageControlFileFailure: if the dev server throws 500 while staging
1695        suite control files.
1696
1697    @return: dev_server.ImageServer instance to use with this build.
1698    @return: timings dictionary containing staging start/end times.
1699    """
1700    timings = {}
1701    # Ensure components of |build| necessary for installing images are staged
1702    # on the dev server. However set synchronous to False to allow other
1703    # components to be downloaded in the background.
1704    ds = dev_server.resolve(build, hostname=hostname)
1705    ds_name = ds.hostname
1706    timings[constants.DOWNLOAD_STARTED_TIME] = formatted_now()
1707    try:
1708        ds.stage_artifacts(image=build, artifacts=['test_suites'])
1709    except dev_server.DevServerException as e:
1710        raise error.StageControlFileFailure(
1711                "Failed to stage %s on %s: %s" % (build, ds_name, e))
1712    timings[constants.PAYLOAD_FINISHED_TIME] = formatted_now()
1713    return (ds, timings)
1714
1715
1716@rpc_utils.route_rpc_to_master
1717def create_suite_job(
1718        name='',
1719        board='',
1720        pool='',
1721        control_file='',
1722        check_hosts=True,
1723        num=None,
1724        file_bugs=False,
1725        timeout=24,
1726        timeout_mins=None,
1727        priority=priorities.Priority.DEFAULT,
1728        suite_args=None,
1729        wait_for_results=True,
1730        job_retry=False,
1731        max_retries=None,
1732        max_runtime_mins=None,
1733        suite_min_duts=0,
1734        offload_failures_only=False,
1735        builds=None,
1736        test_source_build=None,
1737        run_prod_code=False,
1738        delay_minutes=0,
1739        is_cloning=False,
1740        job_keyvals=None,
1741        test_args=None,
1742        **kwargs
1743):
1744    """
1745    Create a job to run a test suite on the given device with the given image.
1746
1747    When the timeout specified in the control file is reached, the
1748    job is guaranteed to have completed and results will be available.
1749
1750    @param name: The test name if control_file is supplied, otherwise the name
1751                 of the test suite to run, e.g. 'bvt'.
1752    @param board: the kind of device to run the tests on.
1753    @param builds: the builds to install e.g.
1754                   {'cros-version:': 'x86-alex-release/R18-1655.0.0',
1755                    'fwrw-version:':  'x86-alex-firmware/R36-5771.50.0',
1756                    'fwro-version:':  'x86-alex-firmware/R36-5771.49.0'}
1757                   If builds is given a value, it overrides argument build.
1758    @param test_source_build: Build that contains the server-side test code.
1759    @param pool: Specify the pool of machines to use for scheduling
1760            purposes.
1761    @param control_file: the control file of the job.
1762    @param check_hosts: require appropriate live hosts to exist in the lab.
1763    @param num: Specify the number of machines to schedule across (integer).
1764                Leave unspecified or use None to use default sharding factor.
1765    @param file_bugs: File a bug on each test failure in this suite.
1766    @param timeout: The max lifetime of this suite, in hours.
1767    @param timeout_mins: The max lifetime of this suite, in minutes. Takes
1768                         priority over timeout.
1769    @param priority: Integer denoting priority. Higher is more important.
1770    @param suite_args: Optional arguments which will be parsed by the suite
1771                       control file. Used by control.test_that_wrapper to
1772                       determine which tests to run.
1773    @param wait_for_results: Set to False to run the suite job without waiting
1774                             for test jobs to finish. Default is True.
1775    @param job_retry: Set to True to enable job-level retry. Default is False.
1776    @param max_retries: Integer, maximum job retries allowed at suite level.
1777                        None for no max.
1778    @param max_runtime_mins: Maximum amount of time a job can be running in
1779                             minutes.
1780    @param suite_min_duts: Integer. The scheduler will prioritize getting the
1781                           minimum number of machines for the suite when it
1782                           competes with a higher-priority suite that has
1783                           already obtained the minimum machines it needs.
1784    @param offload_failures_only: Only enable gs_offloading for failed jobs.
1785    @param run_prod_code: If True, the suite will run the test code that
1786                          lives in prod aka the test code currently on the
1787                          lab servers. If False, the control files and test
1788                          code for this suite run will be retrieved from the
1789                          build artifacts.
1790    @param delay_minutes: Delay the creation of test jobs for a given number of
1791                          minutes.
1792    @param is_cloning: True if creating a cloning job.
1793    @param job_keyvals: A dict of job keyvals injected into the control file.
1794    @param test_args: A dict of args passed all the way to each individual test
1795                      that will actually be run.
1796    @param kwargs: extra keyword args. NOT USED.
1797
1798    @raises ControlFileNotFound: if a unique suite control file doesn't exist.
1799    @raises NoControlFileList: if we can't list the control files at all.
1800    @raises StageControlFileFailure: If the dev server throws 500 while
1801                                     staging test_suites.
1802    @raises ControlFileEmpty: if the control file exists on the server, but
1803                              can't be read.
1804
1805    @return: the job ID of the suite; -1 on error.
1806    """
1807    if type(num) is not int and num is not None:
1808        raise error.SuiteArgumentException('Ill specified num argument %r. '
1809                                           'Must be an integer or None.' % num)
1810    if num == 0:
1811        logging.warning("Can't run on 0 hosts; using default.")
1812        num = None
1813
1814    if builds is None:
1815        builds = {}
1816
1817    # Default test source build to CrOS build if it's not specified and
1818    # run_prod_code is set to False.
1819    if not run_prod_code:
1820        test_source_build = Suite.get_test_source_build(
1821                builds, test_source_build=test_source_build)
1822
1823    sample_dut = rpc_utils.get_sample_dut(board, pool)
1824
1825    suite_name = canonicalize_suite_name(name)
1826    if run_prod_code:
1827        ds = dev_server.resolve(test_source_build, hostname=sample_dut)
1828        keyvals = {}
1829    else:
1830        (ds, keyvals) = _stage_build_artifacts(
1831                test_source_build, hostname=sample_dut)
1832    keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts
1833
1834    # Do not change this naming convention without updating
1835    # site_utils.parse_job_name.
1836    if run_prod_code:
1837        # If run_prod_code is True, test_source_build is not set; use the
1838        # first build in the builds list for the suite job name.
1839        name = '%s-%s' % (builds.values()[0], suite_name)
1840    else:
1841        name = '%s-%s' % (test_source_build, suite_name)
1842
1843    timeout_mins = timeout_mins or timeout * 60
1844    max_runtime_mins = max_runtime_mins or timeout * 60
1845
1846    if not board:
1847        board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]
1848
1849    if run_prod_code:
1850        control_file = _get_control_file_by_suite(suite_name)
1851
1852    if not control_file:
1853        # No control file was supplied so look it up from the build artifacts.
1854        control_file = _get_control_file_by_build(
1855                test_source_build, ds, suite_name)
1856
1857    # Prepend builds and board to the control file.
1858    if is_cloning:
1859        control_file = tools.remove_injection(control_file)
1860
1861    inject_dict = {
1862        'board': board,
1863        # `build` is needed for suites like AU to stage image inside suite
1864        # control file.
1865        'build': test_source_build,
1866        'builds': builds,
1867        'check_hosts': check_hosts,
1868        'pool': pool,
1869        'num': num,
1870        'file_bugs': file_bugs,
1871        'timeout': timeout,
1872        'timeout_mins': timeout_mins,
1873        'devserver_url': ds.url(),
1874        'priority': priority,
1875        'suite_args' : suite_args,
1876        'wait_for_results': wait_for_results,
1877        'job_retry': job_retry,
1878        'max_retries': max_retries,
1879        'max_runtime_mins': max_runtime_mins,
1880        'offload_failures_only': offload_failures_only,
1881        'test_source_build': test_source_build,
1882        'run_prod_code': run_prod_code,
1883        'delay_minutes': delay_minutes,
1884        'job_keyvals': job_keyvals,
1885        'test_args': test_args,
1886    }
1887    control_file = tools.inject_vars(inject_dict, control_file)
1888
1889    return rpc_utils.create_job_common(name,
1890                                       priority=priority,
1891                                       timeout_mins=timeout_mins,
1892                                       max_runtime_mins=max_runtime_mins,
1893                                       control_type='Server',
1894                                       control_file=control_file,
1895                                       hostless=True,
1896                                       keyvals=keyvals)
1897
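# Minimal invocation sketch for create_suite_job (the pool name is
# hypothetical; the build string reuses the example from the docstring above):
#
#     job_id = create_suite_job(
#             name='bvt',
#             board='x86-alex',
#             builds={provision.CROS_VERSION_PREFIX:
#                     'x86-alex-release/R18-1655.0.0'},
#             pool='suites')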
1898
1899def get_job_history(**filter_data):
1900    """Get history of the job, including the special tasks executed for the job
1901
1902    @param filter_data: filter for the call, should at least include
1903                        {'job_id': [job id]}
1904    @returns: JSON string of the job's history, including information such
1905              as the hosts that ran the job and the special tasks executed
1906              before and after the job.
1907    """
1908    job_id = filter_data['job_id']
1909    job_info = job_history.get_job_info(job_id)
1910    return rpc_utils.prepare_for_serialization(job_info.get_history())
1911
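# Usage sketch for get_job_history (the job id below is hypothetical):
#
#     history = get_job_history(job_id=12345)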
1912
1913def get_host_history(start_time, end_time, hosts=None, board=None, pool=None):
1914    """Deprecated."""
1915    raise ValueError('get_host_history rpc is deprecated '
1916                     'and no longer implemented.')
1917
1918
1919def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
1920                    known_host_ids=(), known_host_statuses=()):
1921    """Receive updates for job statuses from shards and assign hosts and jobs.
1922
1923    @param shard_hostname: Hostname of the calling shard
1924    @param jobs: Jobs in serialized form that should be updated with newer
1925                 status from a shard.
1926    @param hqes: Hostqueueentries in serialized form that should be updated with
1927                 newer status from a shard. Note that for every hostqueueentry
1928                 the corresponding job must be in jobs.
1929    @param known_job_ids: List of ids of jobs the shard already has.
1930    @param known_host_ids: List of ids of hosts the shard already has.
1931    @param known_host_statuses: List of statuses of hosts the shard already has.
1932
1933    @returns: Serialized representations of hosts, jobs, suite job keyvals
1934              and their dependencies to be inserted into a shard's database.
1935    """
1936    # The following alternatives to sending host and job ids in every heartbeat
1937    # have been considered:
1938    # 1. Sending the highest known job and host ids. This would work for jobs:
1939    #    Newer jobs always have larger ids. Also, if a job is not assigned to a
1940    #    particular shard during a heartbeat, it never will be assigned to this
1941    #    shard later.
1942    #    This is not true for hosts though: A host that is leased won't be sent
1943    #    to the shard now, but might be sent in a future heartbeat. This means
1944    #    sometimes hosts should be transferred that have a lower id than the
1945    #    maximum host id the shard knows.
1946    # 2. Send the number of jobs/hosts the shard knows to the master in each
1947    #    heartbeat. Compare these to the number of records that already have
1948    #    the shard_id set to this shard. In the normal case, they should match.
1949    #    In case they don't, resend all entities of that type.
1950    #    This would work well for hosts, because there aren't that many.
1951    #    Resending all jobs is quite a big overhead though.
1952    #    Also, this approach might run into edge cases when entities are
1953    #    ever deleted.
1954    # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
1955    #    Using two different approaches isn't consistent and might cause
1956    #    confusion. Also the issues with the case of deletions might still
1957    #    occur.
1958    #
1959    # The overhead of sending all job and host ids in every heartbeat is low:
1960    # At peaks one board has about 1200 created but unfinished jobs.
1961    # See the numbers here: http://goo.gl/gQCGWH
1962    # Assuming that job ids have 6 digits and that json serialization takes a
1963    # comma and a space as overhead, the traffic per id sent is about 8 bytes.
1964    # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
1965    # A NOT IN query with 5000 ids took about 30 ms in testing.
1966    # These numbers seem low enough to outweigh the disadvantages of the
1967    # solutions described above.
1968    shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
1969    rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
1970    assert len(known_host_ids) == len(known_host_statuses)
1971    for i in range(len(known_host_ids)):
1972        host_model = models.Host.objects.get(pk=known_host_ids[i])
1973        if host_model.status != known_host_statuses[i]:
1974            host_model.status = known_host_statuses[i]
1975            host_model.save()
1976
1977    hosts, jobs, suite_keyvals, inc_ids = rpc_utils.find_records_for_shard(
1978            shard_obj, known_job_ids=known_job_ids,
1979            known_host_ids=known_host_ids)
1980    return {
1981        'hosts': [host.serialize() for host in hosts],
1982        'jobs': [job.serialize() for job in jobs],
1983        'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
1984        'incorrect_host_ids': [int(i) for i in inc_ids],
1985    }
1986
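# Heartbeat sketch from a shard's point of view (the hostname, ids and
# statuses are hypothetical; the response keys match the dict returned above):
#
#     response = shard_heartbeat(
#             shard_hostname='chromeos-shard1',
#             known_job_ids=[1001, 1002],
#             known_host_ids=[7, 8],
#             known_host_statuses=['Ready', 'Running'])
#     new_hosts = response['hosts']
#     new_jobs = response['jobs']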
1987
1988def get_shards(**filter_data):
1989    """Return a list of all shards.
1990
1991    @returns A sequence of nested dictionaries of shard information.
1992    """
1993    shards = models.Shard.query_objects(filter_data)
1994    serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ())
1995    for serialized, shard in zip(serialized_shards, shards):
1996        serialized['labels'] = [label.name for label in shard.labels.all()]
1997
1998    return serialized_shards
1999
2000
2001def _assign_board_to_shard_precheck(labels):
2002    """Verify whether board labels are valid to be added to a given shard.
2003
2004    First, check whether the board label is in the correct format. Second,
2005    check whether the board label exists. Third, check whether the board has
2006    already been assigned to a shard.
2007
2008    @param labels: Board labels separated by comma.
2009
2010    @raises error.RPCException: If label provided doesn't start with `board:`
2011            or board has been added to shard already.
2012    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
2013
2014    @returns: A list of label models that are ready to be added to the shard.
2015    """
2016    if not labels:
2017        # Allow creation of label-less shards (labels='' would otherwise fail
2018        # the checks below).
2019        return []
2020    labels = labels.split(',')
2021    label_models = []
2022    for label in labels:
2023        # Check whether the board label is in correct format.
2024        if not label.startswith('board:'):
2025            raise error.RPCException('Sharding only supports `board:.*` label.')
2026        # Check whether the board label exists. If not, an exception will be
2027        # thrown by the smart_get function.
2028        label = models.Label.smart_get(label)
2029        # Check whether the board has been sharded already
2030        try:
2031            shard = models.Shard.objects.get(labels=label)
2032            raise error.RPCException(
2033                    '%s is already on shard %s' % (label, shard.hostname))
2034        except models.Shard.DoesNotExist:
2035            # board is not on any shard, so it's valid.
2036            label_models.append(label)
2037    return label_models
2038
2039
2040def add_shard(hostname, labels):
2041    """Add a shard and start running jobs on it.
2042
2043    @param hostname: The hostname of the shard to be added; needs to be unique.
2044    @param labels: Board labels separated by comma. Jobs of one of the labels
2045                   will be assigned to the shard.
2046
2047    @raises error.RPCException: If label provided doesn't start with `board:` or
2048            board has been added to shard already.
2049    @raises model_logic.ValidationError: If a shard with the given hostname
2050            already exists.
2051    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
2052
2053    @returns: The id of the added shard.
2054    """
2055    labels = _assign_board_to_shard_precheck(labels)
2056    shard = models.Shard.add_object(hostname=hostname)
2057    for label in labels:
2058        shard.labels.add(label)
2059    return shard.id
2060
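# Example call for add_shard (the hostname and board labels below are
# hypothetical):
#
#     shard_id = add_shard('chromeos-shard1.example.com',
#                          'board:link,board:daisy')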
2061
2062def add_board_to_shard(hostname, labels):
2063    """Add boards to a given shard
2064
2065    @param hostname: The hostname of the shard to be changed.
2066    @param labels: Board labels separated by comma.
2067
2068    @raises error.RPCException: If label provided doesn't start with `board:` or
2069            board has been added to shard already.
2070    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
2071
2072    @returns: The id of the changed shard.
2073    """
2074    labels = _assign_board_to_shard_precheck(labels)
2075    shard = models.Shard.objects.get(hostname=hostname)
2076    for label in labels:
2077        shard.labels.add(label)
2078    return shard.id
2079
2080
2081# Remove board RPCs are rare, so we can afford to make them a bit more
2082# expensive (by performing in a transaction) in order to guarantee
2083# atomicity.
2084# TODO(akeshet): If we ever update to newer version of django, we need to
2085# migrate to transaction.atomic instead of commit_on_success
2086@transaction.commit_on_success
2087def remove_board_from_shard(hostname, label):
2088    """Remove board from the given shard.
2089    @param hostname: The hostname of the shard to be changed.
2090    @param label: Board label.
2091
2092    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
2093
2094    @returns: The id of the changed shard.
2095    """
2096    shard = models.Shard.objects.get(hostname=hostname)
2097    label = models.Label.smart_get(label)
2098    if label not in shard.labels.all():
2099        raise error.RPCException(
2100          'Cannot remove label from shard that does not belong to it.')
2101    shard.labels.remove(label)
2102    models.Host.objects.filter(labels__in=[label]).update(shard=None)
2103
2104
2105def delete_shard(hostname):
2106    """Delete a shard and reclaim all resources from it.
2107
2108    This claims back all assigned hosts from the shard. To ensure all DUTs are
2109    in a sane state, a reboot task with the highest priority is scheduled for
2110    them. This reboots the DUTs, and all remaining tasks then continue to run
2111    on the master's drone.
2112
2113    The procedure for deleting a shard:
2114        * Lock all unlocked hosts on that shard.
2115        * Remove shard information.
2116        * Assign a reboot task with the highest priority to these hosts.
2117        * Unlock these hosts; the reboot tasks then run ahead of all other
2118          tasks.
2119
2120    The status of jobs that haven't yet been reported as finished will be
2121    lost. The master scheduler will pick up the jobs and execute them.
2122
2123    @param hostname: Hostname of the shard to delete.
2124    """
2125    shard = rpc_utils.retrieve_shard(shard_hostname=hostname)
2126    hostnames_to_lock = [h.hostname for h in
2127                         models.Host.objects.filter(shard=shard, locked=False)]
2128
2129    # TODO(beeps): Power off shard
2130    # For ChromeOS hosts, a reboot test with the highest priority is added to
2131    # the DUT. After a reboot it should be guaranteed that no processes from
2132    # prior tests that were run by a shard are still running on it.
2133
2134    # Lock all unlocked hosts.
2135    dicts = {'locked': True, 'lock_time': datetime.datetime.now()}
2136    models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts)
2137
2138    # Remove shard information.
2139    models.Host.objects.filter(shard=shard).update(shard=None)
2140    models.Job.objects.filter(shard=shard).update(shard=None)
2141    shard.labels.clear()
2142    shard.delete()
2143
2144    # Assign a reboot task with highest priority: Super.
2145    t = models.Test.objects.get(name='platform_BootPerfServer:shard')
2146    c = utils.read_file(os.path.join(common.autotest_dir, t.path))
2147    if hostnames_to_lock:
2148        rpc_utils.create_job_common(
2149                'reboot_dut_for_shard_deletion',
2150                priority=priorities.Priority.SUPER,
2151                control_type='Server',
2152                control_file=c, hosts=hostnames_to_lock,
2153                timeout_mins=10,
2154                max_runtime_mins=10)
2155
2156    # Unlock these shard-related hosts.
2157    dicts = {'locked': False, 'lock_time': None}
2158    models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts)
2159
2160
2161def get_servers(hostname=None, role=None, status=None):
2162    """Get a list of servers with matching role and status.
2163
2164    @param hostname: FQDN of the server.
2165    @param role: Name of the server role, e.g., drone, scheduler. Default to
2166                 None to match any role.
2167    @param status: Status of the server, e.g., primary, backup, repair_required.
2168                   Default to None to match any server status.
2169
2170    @raises error.RPCException: If server database is not used.
2171    @return: A list of server names for servers with matching role and status.
2172    """
2173    if not server_manager_utils.use_server_db():
2174        raise error.RPCException('Server database is not enabled. Please try '
2175                                 'retrieving servers from the global config.')
2176    servers = server_manager_utils.get_servers(hostname=hostname, role=role,
2177                                               status=status)
2178    return [s.get_details() for s in servers]
2179
2180
2181@rpc_utils.route_rpc_to_master
2182def get_stable_version(board=stable_version_utils.DEFAULT, android=False):
2183    """Get stable version for the given board.
2184
2185    @param board: Name of the board.
2186    @param android: If True, the given board is an Android-based device. If
2187                    False, assume it's a Chrome OS-based device.
2188
2189    @return: Stable version of the given board. Return the global config value
2190             of CROS.stable_cros_version if the stable_versions table does not
2191             have an entry for board DEFAULT.
2192    """
2193    return stable_version_utils.get(board=board, android=android)
2194
2195
2196@rpc_utils.route_rpc_to_master
2197def get_all_stable_versions():
2198    """Get stable versions for all boards.
2199
2200    @return: A dictionary of board:version.
2201    """
2202    return stable_version_utils.get_all()
2203
2204
2205@rpc_utils.route_rpc_to_master
2206def set_stable_version(version, board=stable_version_utils.DEFAULT):
2207    """Modify stable version for the given board.
2208
2209    @param version: The new value of stable version for given board.
2210    @param board: Name of the board, default to value `DEFAULT`.
2211    """
2212    stable_version_utils.set(version=version, board=board)
2213
2214
2215@rpc_utils.route_rpc_to_master
2216def delete_stable_version(board):
2217    """Modify stable version for the given board.
2218
2219    Delete a stable version entry in afe_stable_versions table for a given
2220    board, so default stable version will be used.
2221
2222    @param board: Name of the board.
2223    """
2224    stable_version_utils.delete(board=board)
2225
2226
2227def get_tests_by_build(build, ignore_invalid_tests=True):
2228    """Get the tests that are available for the specified build.
2229
2230    @param build: unique name by which to refer to the image.
2231    @param ignore_invalid_tests: if True, unparsable tests are ignored.
2232
2233    @return: A sorted list of all tests that are in the build specified.
2234    """
2235    # Collect the control files specified in this build
2236    cfile_getter = control_file_lib._initialize_control_file_getter(build)
2237    if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
2238        control_file_info_list = cfile_getter.get_suite_info()
2239        control_file_list = control_file_info_list.keys()
2240    else:
2241        control_file_list = cfile_getter.get_control_file_list()
2242
2243    test_objects = []
2244    _id = 0
2245    for control_file_path in control_file_list:
2246        # Read and parse the control file
2247        if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
2248            control_file = control_file_info_list[control_file_path]
2249        else:
2250            control_file = cfile_getter.get_control_file_contents(
2251                    control_file_path)
2252        try:
2253            control_obj = control_data.parse_control_string(control_file)
2254        except:
2255            logging.info('Failed to parse control file: %s', control_file_path)
2256            if not ignore_invalid_tests:
2257                raise
                # Skip this control file; otherwise the code below would reuse
                # a stale (or undefined) control_obj from a previous iteration.
                continue
2258
2259        # Extract the values needed for the AFE from the control_obj.
2260        # The keys list represents attributes in the control_obj that
2261        # are required by the AFE
2262        keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental',
2263                'test_category', 'test_class', 'dependencies', 'run_verify',
2264                'sync_count', 'job_retries', 'retries', 'path']
2265
2266        test_object = {}
2267        for key in keys:
2268            test_object[key] = getattr(control_obj, key) if hasattr(
2269                    control_obj, key) else ''
2270
2271        # Unfortunately, the AFE expects different key names for certain
2272        # values; these must be corrected to avoid the risk of tests
2273        # being omitted by the AFE.
2274        # The 'id' is an additional value used in the AFE.
2275        # The control_data parsing does not reference 'run_reset', but it
2276        # is also used in the AFE and defaults to True.
2277        test_object['id'] = _id
2278        test_object['run_reset'] = True
2279        test_object['description'] = test_object.get('doc', '')
2280        test_object['test_time'] = test_object.get('time', 0)
2281        test_object['test_retry'] = test_object.get('retries', 0)
2282
2283        # Fix the test name to be consistent with the current presentation
2284        # of test names in the AFE.
2285        testpath, subname = os.path.split(control_file_path)
2286        testname = os.path.basename(testpath)
2287        subname = subname.split('.')[1:]
2288        if subname:
2289            testname = '%s:%s' % (testname, ':'.join(subname))
2290
2291        test_object['name'] = testname
2292
2293        # Correct the test path as parse_control_string sets an empty string.
2294        test_object['path'] = control_file_path
2295
2296        _id += 1
2297        test_objects.append(test_object)
2298
2299    test_objects = sorted(test_objects, key=lambda x: x.get('name'))
2300    return rpc_utils.prepare_for_serialization(test_objects)
2301