• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# pylint: disable-msg=C0111
2
3"""\
4Functions to expose over the RPC interface.
5
6For all modify* and delete* functions that ask for an 'id' parameter to
7identify the object to operate on, the id may be either
8 * the database row ID
9 * the name of the object (label name, hostname, user login, etc.)
 * a dictionary containing uniquely identifying fields (this option should
   seldom be used)
12
13When specifying foreign key fields (i.e. adding hosts to a label, or adding
14users to an ACL group), the given value may be either the database row ID or the
15name of the object.
16
17All get* functions return lists of dictionaries.  Each dictionary represents one
18object and maps field names to values.
19
20Some examples:
21modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
22modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
23modify_test('sleeptest', test_type='Client', params=', seconds=60')
24delete_acl_group(1) # delete by ID
25delete_acl_group('Everyone') # delete by name
26acl_group_add_users('Everyone', ['mbligh', 'showard'])
27get_jobs(owner='showard', status='Queued')
28
29See doctests/001_rpc_test.txt for (lots) more examples.
30"""
31
32__author__ = 'showard@google.com (Steve Howard)'
33
34import sys
35import datetime
36import logging
37
38from django.db.models import Count
39import common
40from autotest_lib.client.common_lib import priorities
41from autotest_lib.client.common_lib.cros import dev_server
42from autotest_lib.client.common_lib.cros.graphite import autotest_stats
43from autotest_lib.frontend.afe import control_file, rpc_utils
44from autotest_lib.frontend.afe import models, model_logic, model_attributes
45from autotest_lib.frontend.afe import site_rpc_interface
46from autotest_lib.frontend.tko import models as tko_models
47from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
48from autotest_lib.server import frontend
49from autotest_lib.server import utils
50from autotest_lib.server.cros import provision
51from autotest_lib.server.cros.dynamic_suite import tools
52from autotest_lib.site_utils import status_history
53
54
55_timer = autotest_stats.Timer('rpc_interface')
56
def get_parameterized_autoupdate_image_url(job):
    """Look up the image url parameter of a parameterized autoupdate job.

    @param job: A job object whose `parameterized_job` carries the
            parameters of the job.

    @returns The value stored on the job for the 'image' parameter of the
            'autoupdate_ParameterizedJob' test.
    """
    autoupdate_test = models.Test.smart_get('autoupdate_ParameterizedJob')
    image_param = autoupdate_test.testparameter_set.get(
            test=autoupdate_test, name='image')
    job_params = job.parameterized_job.parameterizedjobparameter_set
    return job_params.get(test_parameter=image_param).parameter_value
65
66
67# labels
68
def modify_label(id, **data):
    """Update a label and, when running on the master, fan the RPC out.

    @param id: id or name of a label. More often a label name.
    @param data: New data for a label.
    """
    label = models.Label.smart_get(id)
    label.update_object(data)

    if utils.is_shard():
        # A shard only applies the change locally.
        return

    # Master forwards the RPC to shards
    rpc_utils.fanout_rpc(label.host_set.all(), 'modify_label', False,
                         id=id, **data)
82
83
def delete_label(id):
    """Delete a label and, when running on the master, fan the RPC out.

    @param id: id or name of a label. More often a label name.
    """
    label = models.Label.smart_get(id)
    # Collect the hosts that carry the label before it is deleted; they
    # are needed afterwards to forward the RPC.
    labeled_hosts = [models.Host.smart_get(host.id)
                     for host in label.host_set.all()]
    label.delete()

    if utils.is_shard():
        return

    # Master forwards the RPC to shards
    rpc_utils.fanout_rpc(labeled_hosts, 'delete_label', False, id=id)
100
101
102def add_label(name, ignore_exception_if_exists=False, **kwargs):
103    """Adds a new label of a given name.
104
105    @param name: label name.
106    @param ignore_exception_if_exists: If True and the exception was
107        thrown due to the duplicated label name when adding a label,
108        then suppress the exception. Default is False.
109    @param kwargs: keyword args that store more info about a label
110        other than the name.
111    @return: int/long id of a new label.
112    """
113    # models.Label.add_object() throws model_logic.ValidationError
114    # when it is given a label name that already exists.
115    # However, ValidationError can be thrown with different errors,
116    # and those errors should be thrown up to the call chain.
117    try:
118        label = models.Label.add_object(name=name, **kwargs)
119    except:
120        exc_info = sys.exc_info()
121        if ignore_exception_if_exists:
122            label = rpc_utils.get_label(name)
123            # If the exception is raised not because of duplicated
124            # "name", then raise the original exception.
125            if label is None:
126                raise exc_info[0], exc_info[1], exc_info[2]
127        else:
128            raise exc_info[0], exc_info[1], exc_info[2]
129    return label.id
130
131
def add_label_to_hosts(id, hosts):
    """Attaches a label to the given hosts, touching only the local DB.

    @param id: id or name of a label. More often a label name.
    @param hosts: The hostnames of hosts that need the label.

    @raises models.Label.DoesNotExist: If the label with id doesn't exist.
    """
    label_obj = models.Label.smart_get(id)
    target_hosts = models.Host.smart_get_bulk(hosts)
    # The platform check is only run when the label is a platform label.
    if label_obj.platform:
        models.Host.check_no_platform(target_hosts)
    label_obj.host_set.add(*target_hosts)
145
146
@rpc_utils.route_rpc_to_master
def label_add_hosts(id, hosts):
    """Attaches a label to the given hosts, creating the label if needed.

    This method should be run only on master not shards.
    A label that doesn't exist yet is created, provided the `id` supplied
    is a label name (a string) and not an int/long id.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.

    @raises ValueError: If the label does not exist and the id specified
                        is not a string (e.g. an int/long label id).
    """
    try:
        label = models.Label.smart_get(id)
    except models.Label.DoesNotExist:
        # This mirrors the type checks in smart_get, which is a hack in
        # and of itself. The aim is to create any non-existent label,
        # which is only possible when 'id' is a label name.
        if not isinstance(id, basestring):
            raise ValueError('Label id (%s) does not exist. Please specify '
                             'the argument, id, as a string (label name).'
                             % id)
        new_label_id = add_label(id)
        label = models.Label.smart_get(new_label_id)
    add_label_to_hosts(id, hosts)

    target_hosts = models.Host.smart_get_bulk(hosts)
    # Make sure the label exists on the shard with the same id as it has
    # on the master.
    # The label may already be on a shard, because a new label is only
    # added to the shards of the hosts it gets attached to. For example,
    # adding label L1 to a host in shard S1 gives L1 to the master and S1
    # but to no other shard. When L1 is later added to hosts in shards S1
    # and S2, S1 already has the label while S2 doesn't. S2 must get the
    # new label without any problem, so the "already exists" failure is
    # ignored in that case.
    label_fields = dict(name=label.name, ignore_exception_if_exists=True,
                        id=label.id, platform=label.platform)
    rpc_utils.fanout_rpc(target_hosts, 'add_label', include_hostnames=False,
                         **label_fields)
    rpc_utils.fanout_rpc(target_hosts, 'add_label_to_hosts', id=id)
192
193
def remove_label_from_hosts(id, hosts):
    """Detaches a label from the given hosts, touching only the local DB.

    @param id: id or name of a label.
    @param hosts: The hostnames of hosts that need to remove the label from.
    """
    hosts_to_strip = models.Host.smart_get_bulk(hosts)
    label = models.Label.smart_get(id)
    label.host_set.remove(*hosts_to_strip)
202
203
@rpc_utils.route_rpc_to_master
def label_remove_hosts(id, hosts):
    """Detaches a label from the given hosts, locally and via fanout RPC.

    This method should be run only on master not shards.

    @param id: id or name of a label.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    # Resolve the hosts first; they are also the targets of the fanout.
    affected_hosts = models.Host.smart_get_bulk(hosts)
    remove_label_from_hosts(id, hosts)
    rpc_utils.fanout_rpc(affected_hosts, 'remove_label_from_hosts', id=id)
217
218
def get_labels(exclude_filters=(), **filter_data):
    """Get the labels matching filter_data, minus the excluded ones.

    @param exclude_filters: A sequence of dictionaries of filters. Each
            dictionary is applied as one exclude() on the label query.
    @param filter_data: Filters selecting the labels to return.

    @returns A sequence of nested dictionaries of label information.
    """
    label_query = models.Label.query_objects(filter_data)
    for excluded in exclude_filters:
        label_query = label_query.exclude(**excluded)
    nested_fields = ('atomic_group',)
    return rpc_utils.prepare_rows_as_nested_dicts(label_query, nested_fields)
229
230
231# atomic groups
232
def add_atomic_group(name, max_number_of_machines=None, description=None):
    """Create a new atomic group.

    @param name: Name of the atomic group.
    @param max_number_of_machines: Value stored in the field of the same
            name on the new atomic group.
    @param description: Description of the atomic group.

    @returns The id of the new atomic group.
    """
    new_group = models.AtomicGroup.add_object(
            name=name,
            max_number_of_machines=max_number_of_machines,
            description=description)
    return new_group.id
237
238
def modify_atomic_group(id, **data):
    """Modify an atomic group.

    @param id: id or name of the atomic group.
    @param data: New data for the atomic group.
    """
    atomic_group = models.AtomicGroup.smart_get(id)
    atomic_group.update_object(data)
241
242
def delete_atomic_group(id):
    """Delete an atomic group.

    @param id: id or name of the atomic group.
    """
    atomic_group = models.AtomicGroup.smart_get(id)
    atomic_group.delete()
245
246
def atomic_group_add_labels(id, labels):
    """Add labels to an atomic group.

    @param id: id or name of the atomic group.
    @param labels: ids or names of the labels to add.
    """
    labels_to_add = models.Label.smart_get_bulk(labels)
    atomic_group = models.AtomicGroup.smart_get(id)
    atomic_group.label_set.add(*labels_to_add)
250
251
def atomic_group_remove_labels(id, labels):
    """Remove labels from an atomic group.

    @param id: id or name of the atomic group.
    @param labels: ids or names of the labels to remove.
    """
    labels_to_drop = models.Label.smart_get_bulk(labels)
    atomic_group = models.AtomicGroup.smart_get(id)
    atomic_group.label_set.remove(*labels_to_drop)
255
256
def get_atomic_groups(**filter_data):
    """Get the atomic groups matching filter_data.

    @param filter_data: Filters selecting the atomic groups to return.

    @returns The matching atomic groups, prepared for serialization.
    """
    atomic_groups = models.AtomicGroup.list_objects(filter_data)
    return rpc_utils.prepare_for_serialization(atomic_groups)
260
261
262# hosts
263
def add_host(hostname, status=None, locked=None, lock_reason='', protection=None):
    """Create a new host.

    @param hostname: Hostname of the new host.
    @param status: Status to store on the host.
    @param locked: Whether the host is created locked.
    @param lock_reason: Reason for locking; required when locked is set.
    @param protection: Protection value to store on the host.

    @returns The id of the new host.

    @raises model_logic.ValidationError: If the host is to be locked but
            no lock reason is given.
    """
    locking_without_reason = locked and not lock_reason
    if locking_without_reason:
        raise model_logic.ValidationError(
            {'locked': 'Please provide a reason for locking when adding host.'})

    new_host = models.Host.add_object(hostname=hostname, status=status,
                                      locked=locked, lock_reason=lock_reason,
                                      protection=protection)
    return new_host.id
272
273
@rpc_utils.route_rpc_to_master
def modify_host(id, **kwargs):
    """Modify local attributes of a host.

    If this is called on the master, but the host is assigned to a shard, this
    will call `modify_host_local` RPC to the responsible shard. This means if
    a host is being locked using this function, this change will also propagate
    to shards.
    When this is called on a shard, the shard just routes the RPC to the master
    and does nothing.

    A failed locking check is logged and ignored when the caller passes
    force_modify_locking=True; otherwise it is raised.

    @param id: id of the host to modify.
    @param kwargs: key=value pairs of values to set on the host.

    @raises model_logic.ValidationError: If the locking check fails and
            force_modify_locking is not set.
    """
    rpc_utils.check_modify_host(kwargs)
    host = models.Host.smart_get(id)
    try:
        rpc_utils.check_modify_host_locking(host, kwargs)
    except model_logic.ValidationError as e:
        forced = kwargs.get('force_modify_locking', False)
        if not forced:
            raise
        logging.exception('The following exception will be ignored and '
                          'lock modification will be enforced. %s', e)

    # Pin `lock_time` here so that it is exactly the same on the master
    # and on a shard. A caller-supplied lock_time is left untouched.
    if kwargs.get('locked'):
        kwargs.setdefault('lock_time', datetime.datetime.now())
    host.update_object(kwargs)

    # force_modify_locking is not an internal field in database, remove
    # it before forwarding the update.
    kwargs.pop('force_modify_locking', None)
    rpc_utils.fanout_rpc([host], 'modify_host_local',
                         include_hostnames=False, id=id, **kwargs)
308
309
def modify_host_local(id, **kwargs):
    """Modify attributes of a host, touching only the local DB.

    @param id: Host id.
    @param kwargs: key=value pairs of values to set on the host.
    """
    host = models.Host.smart_get(id)
    host.update_object(kwargs)
317
318
@rpc_utils.route_rpc_to_master
def modify_hosts(host_filter_data, update_data):
    """Modify local attributes of multiple hosts.

    If this is called on the master, but one of the hosts that match the
    filters is assigned to a shard, this will call `modify_hosts_local` RPC
    to the responsible shard.
    When this is called on a shard, the shard just routes the RPC to the master
    and does nothing.

    The filters are always applied on the master, not on the shards. This means
    if the states of a host differ on the master and a shard, the state on the
    master will be used. I.e. this means:
    A host was synced to Shard 1. On Shard 1 the status of the host was set to
    'Repair Failed'.
    - A call to modify_hosts with host_filter_data={'status': 'Ready'} will
    update the host (both on the shard and on the master), because the state
    of the host as the master knows it is still 'Ready'.
    - A call to modify_hosts with host_filter_data={'status': 'Repair failed'}
    will not update the host, because the filter doesn't apply on the master.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
            The caller's dictionary is not modified; a copy is used.

    @raises model_logic.ValidationError: If the locking check fails for a
            host and update_data does not set force_modify_locking.
    """
    # Work on a copy: lock_time may be added and force_modify_locking is
    # removed below.
    update_data = update_data.copy()
    rpc_utils.check_modify_host(update_data)
    hosts = models.Host.query_objects(host_filter_data)

    shard_hostnames = set()
    sharded_host_ids = []

    # Check all hosts before changing data for exception safety.
    for host in hosts:
        try:
            rpc_utils.check_modify_host_locking(host, update_data)
        except model_logic.ValidationError as e:
            forced = update_data.get('force_modify_locking', False)
            if not forced:
                raise
            logging.exception('The following exception will be ignored and '
                              'lock modification will be enforced. %s', e)

        shard = host.shard
        if shard:
            shard_hostnames.add(shard.rpc_hostname())
            sharded_host_ids.append(host.id)

    # Pin `lock_time` here so that it is exactly the same on the master
    # and on a shard. A caller-supplied lock_time is left untouched.
    if update_data.get('locked'):
        update_data.setdefault('lock_time', datetime.datetime.now())
    for host in hosts:
        host.update_object(update_data)

    update_data.pop('force_modify_locking', None)
    # Caution: Changing the filter from the original here. See docstring.
    rpc_utils.run_rpc_on_multiple_hostnames(
            'modify_hosts_local', shard_hostnames,
            host_filter_data={'id__in': sharded_host_ids},
            update_data=update_data)
377
378
def modify_hosts_local(host_filter_data, update_data):
    """Modify attributes of multiple hosts, touching only the local DB.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    matching_hosts = models.Host.query_objects(host_filter_data)
    for host in matching_hosts:
        host.update_object(update_data)
387
388
def add_labels_to_host(id, labels):
    """Attaches labels to a given host, touching only the local DB.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    labels_to_add = models.Label.smart_get_bulk(labels)
    host = models.Host.smart_get(id)
    host.labels.add(*labels_to_add)
397
398
@rpc_utils.route_rpc_to_master
def host_add_labels(id, labels):
    """Attaches labels to a given host, locally and via fanout RPC.

    At most one of the given labels may be a platform label. When one is
    given, models.Host.check_no_platform() is run on the host first.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.

    @raises ValidationError: If adding more than one platform label.
    """
    requested_labels = models.Label.smart_get_bulk(labels)
    platform_names = [label.name
                      for label in requested_labels if label.platform]
    if len(platform_names) > 1:
        raise model_logic.ValidationError(
            {'labels': 'Adding more than one platform label: %s' %
                       ', '.join(platform_names)})

    host = models.Host.smart_get(id)
    # At this point the list holds either no platform or exactly one.
    if platform_names:
        models.Host.check_no_platform([host])
    add_labels_to_host(id, labels)

    rpc_utils.fanout_rpc([host], 'add_labels_to_host', False,
                         id=id, labels=labels)
422
423
def remove_labels_from_host(id, labels):
    """Detaches labels from a given host, touching only the local DB.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    labels_to_drop = models.Label.smart_get_bulk(labels)
    host = models.Host.smart_get(id)
    host.labels.remove(*labels_to_drop)
432
433
@rpc_utils.route_rpc_to_master
def host_remove_labels(id, labels):
    """Detaches labels from a given host, locally and via fanout RPC.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    # Update the local DB first, then forward the same removal.
    remove_labels_from_host(id, labels)

    host = models.Host.smart_get(id)
    rpc_utils.fanout_rpc([host], 'remove_labels_from_host', False,
                         id=id, labels=labels)
446
447
def get_host_attribute(attribute, **host_filter_data):
    """Get the entries of one attribute for the selected hosts.

    @param attribute: string name of attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon

    @returns The object dictionaries of all attribute entries named
            `attribute` on the matching hosts, prepared for serialization.
    """
    host_query = rpc_utils.get_host_query((), False, False, True,
                                          host_filter_data)
    hosts = list(host_query)
    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
                                               'attribute_list')
    # Keep only the entries whose name matches the requested attribute.
    matching_attrs = [attr_obj.get_object_dict()
                      for host_obj in hosts
                      for attr_obj in host_obj.attribute_list
                      if attr_obj.attribute == attribute]
    return rpc_utils.prepare_for_serialization(matching_attrs)
464
465
def set_host_attribute(attribute, value, **host_filter_data):
    """Set or delete an attribute on the selected hosts.

    On the master the RPC is additionally forwarded to shards.

    @param attribute: string name of attribute
    @param value: string, or None to delete an attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon. Must not be empty.

    @raises AssertionError: If no host_filter_data is given.
    """
    # Disallow accidental actions on all hosts. This is an explicit raise
    # rather than an `assert` statement so that the guard stays active
    # when Python runs with -O (which strips assert statements).
    if not host_filter_data:
        raise AssertionError('host_filter_data must be given; refusing to '
                             'modify the attribute on all hosts.')
    hosts = models.Host.query_objects(host_filter_data)
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    for host in hosts:
        host.set_or_delete_attribute(attribute, value)

    # Master forwards this RPC to shards.
    if not utils.is_shard():
        rpc_utils.fanout_rpc(hosts, 'set_host_attribute', False,
                attribute=attribute, value=value, **host_filter_data)
483
484
@rpc_utils.forward_single_host_rpc_to_shard
def delete_host(id):
    """Delete a host.

    @param id: id or hostname of the host.
    """
    host = models.Host.smart_get(id)
    host.delete()
488
489
def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
              exclude_atomic_group_hosts=False, valid_only=True,
              include_current_job=False, **filter_data):
    """Get a list of dictionaries which contains the information of hosts.

    Besides the host's own fields, each dictionary carries 'labels',
    'platform', 'atomic_group', 'acls' and 'attributes'; with
    include_current_job it also carries 'current_job' and
    'current_special_task' (None when there is no active one).

    @param multiple_labels: match hosts in all of the labels given.  Should
            be a list of label names.
    @param exclude_only_if_needed_labels: Exclude hosts with at least one
            "only_if_needed" label applied.
    @param exclude_atomic_group_hosts: Exclude hosts that have one or more
            atomic group labels associated with them.
    @param valid_only: Passed through to rpc_utils.get_host_query().
    @param include_current_job: Set to True to include ids of currently running
            job and special task.
    @param filter_data: Filters selecting the hosts to return.

    @returns The host dictionaries, prepared for serialization.
    """
    host_query = rpc_utils.get_host_query(multiple_labels,
                                          exclude_only_if_needed_labels,
                                          exclude_atomic_group_hosts,
                                          valid_only, filter_data)
    hosts = list(host_query)
    # Attach the related objects to every host under these attribute names.
    relationships = ((models.Label, 'label_list'),
                     (models.AclGroup, 'acl_list'),
                     (models.HostAttribute, 'attribute_list'))
    for related_model, list_name in relationships:
        models.Host.objects.populate_relationships(hosts, related_model,
                                                   list_name)

    host_dicts = []
    for host_obj in hosts:
        host_dict = host_obj.get_object_dict()
        host_dict['labels'] = [label.name for label in host_obj.label_list]
        platform, atomic_group = rpc_utils.find_platform_and_atomic_group(
                host_obj)
        host_dict['platform'] = platform
        host_dict['atomic_group'] = atomic_group
        host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
        host_dict['attributes'] = dict(
                (attr.attribute, attr.value)
                for attr in host_obj.attribute_list)
        if include_current_job:
            host_dict['current_job'] = None
            host_dict['current_special_task'] = None
            active_entries = models.HostQueueEntry.objects.filter(
                    host_id=host_dict['id'], active=True, complete=False)
            if active_entries:
                entry_dict = active_entries[0].get_object_dict()
                host_dict['current_job'] = entry_dict['job']
            active_tasks = models.SpecialTask.objects.filter(
                    host_id=host_dict['id'], is_active=True, is_complete=False)
            if active_tasks:
                task_dict = active_tasks[0].get_object_dict()
                # Formatted as "<task id>-<lowercased task name>".
                host_dict['current_special_task'] = (
                        '%d-%s' % (task_dict['id'], task_dict['task'].lower()))
        host_dicts.append(host_dict)
    return rpc_utils.prepare_for_serialization(host_dicts)
540
541
def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
                  exclude_atomic_group_hosts=False, valid_only=True,
                  **filter_data):
    """Count the hosts that get_hosts() would select.

    Same parameters as get_hosts(), except for include_current_job.

    @returns The number of matching hosts.
    """
    matching_hosts = rpc_utils.get_host_query(multiple_labels,
                                              exclude_only_if_needed_labels,
                                              exclude_atomic_group_hosts,
                                              valid_only, filter_data)
    return matching_hosts.count()
555
556
557# tests
558
def add_test(name, test_type, path, author=None, dependencies=None,
             experimental=True, run_verify=None, test_class=None,
             test_time=None, test_category=None, description=None,
             sync_count=1):
    """Create a new test.

    Every argument is stored in the field of the same name on the new
    test object.

    @returns The id of the new test.
    """
    new_test = models.Test.add_object(
            name=name, test_type=test_type, path=path,
            author=author, dependencies=dependencies,
            experimental=experimental, run_verify=run_verify,
            test_class=test_class, test_time=test_time,
            test_category=test_category, description=description,
            sync_count=sync_count)
    return new_test.id
571
572
def modify_test(id, **data):
    """Modify a test.

    @param id: id or name of the test.
    @param data: New data for the test.
    """
    test = models.Test.smart_get(id)
    test.update_object(data)
575
576
def delete_test(id):
    """Delete a test.

    @param id: id or name of the test.
    """
    test = models.Test.smart_get(id)
    test.delete()
579
580
def get_tests(**filter_data):
    """Get the tests matching filter_data.

    @param filter_data: Filters selecting the tests to return.

    @returns The matching tests, prepared for serialization.
    """
    tests = models.Test.list_objects(filter_data)
    return rpc_utils.prepare_for_serialization(tests)
584
585
@_timer.decorate
def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
    """Gets the counts of all passed and failed tests from the matching jobs.

    Tests named 'SERVER_JOB' or starting with 'CLIENT_JOB' are left out.
    A test counts as passed when its status is 'GOOD'; every other status
    counts as failed.

    @param job_name_prefix: Name prefix of the jobs to get the summary from, e.g.,
            'butterfly-release/R40-6457.21.0/bvt-cq/'.
    @param label_name: Label that must be set in the jobs, e.g.,
            'cros-version:butterfly-release/R40-6457.21.0'.

    @returns A summary of the counts of all the passed and failed tests, as
            a dictionary with the keys 'passed' and 'failed'.
    """
    matching_jobs = models.Job.objects.filter(
            name__startswith=job_name_prefix,
            dependency_labels__name=label_name)
    job_ids = list(matching_jobs.values_list('pk', flat=True))
    summary = {'passed': 0, 'failed': 0}
    if not job_ids:
        return summary

    test_views = tko_models.TestView.objects.filter(afe_job_id__in=job_ids)
    test_views = test_views.exclude(test_name='SERVER_JOB')
    test_views = test_views.exclude(test_name__startswith='CLIENT_JOB')
    # One row per distinct status, with the number of tests in that status.
    status_counts = test_views.values('status').annotate(
            count=Count('status'))
    for row in status_counts:
        bucket = 'passed' if row['status'] == 'GOOD' else 'failed'
        summary[bucket] += row['count']
    return summary
617
618
619# profilers
620
def add_profiler(name, description=None):
    """Create a new profiler.

    @param name: Name of the profiler.
    @param description: Description of the profiler.

    @returns The id of the new profiler.
    """
    new_profiler = models.Profiler.add_object(name=name,
                                              description=description)
    return new_profiler.id
623
624
def modify_profiler(id, **data):
    """Modify a profiler.

    @param id: id or name of the profiler.
    @param data: New data for the profiler.
    """
    profiler = models.Profiler.smart_get(id)
    profiler.update_object(data)
627
628
def delete_profiler(id):
    """Delete a profiler.

    @param id: id or name of the profiler.
    """
    profiler = models.Profiler.smart_get(id)
    profiler.delete()
631
632
def get_profilers(**filter_data):
    """Get the profilers matching filter_data.

    @param filter_data: Filters selecting the profilers to return.

    @returns The matching profilers, prepared for serialization.
    """
    profilers = models.Profiler.list_objects(filter_data)
    return rpc_utils.prepare_for_serialization(profilers)
636
637
638# users
639
def add_user(login, access_level=None):
    """Create a new user.

    @param login: Login of the user.
    @param access_level: Access level to store on the user.

    @returns The id of the new user.
    """
    new_user = models.User.add_object(login=login, access_level=access_level)
    return new_user.id
642
643
def modify_user(id, **data):
    """Modify a user.

    @param id: id or login of the user.
    @param data: New data for the user.
    """
    user = models.User.smart_get(id)
    user.update_object(data)
646
647
def delete_user(id):
    """Delete a user.

    @param id: id or login of the user.
    """
    user = models.User.smart_get(id)
    user.delete()
650
651
def get_users(**filter_data):
    """Get the users matching filter_data.

    @param filter_data: Filters selecting the users to return.

    @returns The matching users, prepared for serialization.
    """
    users = models.User.list_objects(filter_data)
    return rpc_utils.prepare_for_serialization(users)
655
656
657# acl groups
658
def add_acl_group(name, description=None):
    """Create a new ACL group with the current user as a member.

    @param name: Name of the ACL group.
    @param description: Description of the ACL group.

    @returns The id of the new ACL group.
    """
    new_group = models.AclGroup.add_object(name=name,
                                           description=description)
    creator = models.User.current_user()
    new_group.users.add(creator)
    return new_group.id
663
664
def modify_acl_group(id, **data):
    """Modify an ACL group.

    The ACL violation check of the group is run before the update, and
    add_current_user_if_empty() is called on the group afterwards.

    @param id: id or name of the ACL group.
    @param data: New data for the ACL group.
    """
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    acl_group.update_object(data)
    acl_group.add_current_user_if_empty()
670
671
def acl_group_add_users(id, users):
    """Add users to an ACL group.

    The ACL violation check of the group is run before the change.

    @param id: id or name of the ACL group.
    @param users: ids or logins of the users to add.
    """
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    users_to_add = models.User.smart_get_bulk(users)
    acl_group.users.add(*users_to_add)
677
678
def acl_group_remove_users(id, users):
    """Remove users from an ACL group.

    The ACL violation check of the group is run before the change, and
    add_current_user_if_empty() is called on the group afterwards.

    @param id: id or name of the ACL group.
    @param users: ids or logins of the users to remove.
    """
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    users_to_drop = models.User.smart_get_bulk(users)
    acl_group.users.remove(*users_to_drop)
    acl_group.add_current_user_if_empty()
685
686
def acl_group_add_hosts(id, hosts):
    """Add hosts to an ACL group.

    The ACL violation check of the group is run before the change, and
    on_host_membership_change() is called on the group afterwards.

    @param id: id or name of the ACL group.
    @param hosts: ids or hostnames of the hosts to add.
    """
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    hosts_to_add = models.Host.smart_get_bulk(hosts)
    acl_group.hosts.add(*hosts_to_add)
    acl_group.on_host_membership_change()
693
694
def acl_group_remove_hosts(id, hosts):
    """Remove hosts from an ACL group.

    The ACL violation check of the group is run before the change, and
    on_host_membership_change() is called on the group afterwards.

    @param id: id or name of the ACL group.
    @param hosts: ids or hostnames of the hosts to remove.
    """
    acl_group = models.AclGroup.smart_get(id)
    acl_group.check_for_acl_violation_acl_group()
    hosts_to_drop = models.Host.smart_get_bulk(hosts)
    acl_group.hosts.remove(*hosts_to_drop)
    acl_group.on_host_membership_change()
701
702
def delete_acl_group(id):
    """Delete an ACL group.

    @param id: id or name of the ACL group.
    """
    acl_group = models.AclGroup.smart_get(id)
    acl_group.delete()
705
706
def get_acl_groups(**filter_data):
    """Get the ACL groups matching filter_data, with their members.

    Each returned dictionary is extended with 'users' (the logins of the
    group's users) and 'hosts' (the hostnames of the group's hosts).

    @param filter_data: Filters selecting the ACL groups to return.

    @returns The matching ACL groups, prepared for serialization.
    """
    group_dicts = models.AclGroup.list_objects(filter_data)
    for group_dict in group_dicts:
        # Fetch the model object to reach the group's members.
        group_obj = models.AclGroup.objects.get(id=group_dict['id'])
        member_logins = [user.login for user in group_obj.users.all()]
        group_dict['users'] = member_logins
        member_hostnames = [host.hostname for host in group_obj.hosts.all()]
        group_dict['hosts'] = member_hostnames
    return rpc_utils.prepare_for_serialization(group_dicts)
716
717
718# jobs
719
def generate_control_file(tests=(), kernel=None, label=None, profilers=(),
                          client_control_file='', use_container=False,
                          profile_only=None, upload_kernel_config=False,
                          db_tests=True):
    """
    Generates a client-side control file to load a kernel and run tests.

    When neither tests nor a client control file are given, an empty
    result is returned right away.

    @param tests: List of tests to run. See db_tests for more information.
    @param kernel: A list of kernel info dictionaries configuring which
        kernels to boot for this job and other options for them.
    @param label: Name of label to grab kernel config from.
    @param profilers: List of profilers to activate during the job.
    @param client_control_file: The contents of a client-side control file
        to run at the end of all tests.  If this is supplied, all tests
        must be client side.
        TODO: in the future we should support server control files directly
        to wrap with a kernel.  That'll require changing the parameter
        name and adding a boolean to indicate if it is a client or server
        control file.
    @param use_container: unused argument today.  TODO: Enable containers
        on the host during a client side test.
    @param profile_only: A boolean that indicates what default profile_only
        mode to use in the control file. Passing None will generate a
        control file that does not explicitly set the default mode at all.
    @param upload_kernel_config: if enabled it will generate server control
        file code that uploads the kernel config file to the client and
        tells the client of the new (local) path when compiling the kernel;
        the tests must be server side tests.
    @param db_tests: if True, the test object can be found in the database
        backing the test model. In this case, tests is a tuple of test IDs
        which are used to retrieve the test objects from the database. If
        False, tests is a tuple of test dictionaries stored client-side in
        the AFE.

    @returns a dict with the following keys:
        control_file: str, The control file text.
        is_server: bool, is the control file a server-side control file?
        synch_count: How many machines the job uses per autoserv execution.
            synch_count == 1 means the job is asynchronous.
        dependencies: A list of the names of labels on which the job depends.
    """
    nothing_to_run = not (tests or client_control_file)
    if nothing_to_run:
        return {'control_file': '', 'is_server': False, 'synch_count': 1,
                'dependencies': []}

    prepared = rpc_utils.prepare_generate_control_file(
            tests, kernel, label, profilers, db_tests)
    cf_info, test_objects, profiler_objects, label = prepared
    # The prepared info dictionary is completed with the generated text.
    control_text = control_file.generate_control(
            tests=test_objects, kernels=kernel, platform=label,
            profilers=profiler_objects, is_server=cf_info['is_server'],
            client_control_file=client_control_file,
            profile_only=profile_only,
            upload_kernel_config=upload_kernel_config)
    cf_info['control_file'] = control_text
    return cf_info
774
775
def create_parameterized_job(name, priority, test, parameters, kernel=None,
                             label=None, profilers=(), profiler_parameters=None,
                             use_container=False, profile_only=None,
                             upload_kernel_config=False, hosts=(),
                             meta_hosts=(), one_time_hosts=(),
                             atomic_group_name=None, synch_count=None,
                             is_template=False, timeout=None,
                             timeout_mins=None, max_runtime_mins=None,
                             run_verify=False, email_list='', dependencies=(),
                             reboot_before=None, reboot_after=None,
                             parse_failed_repair=None, hostless=False,
                             keyvals=None, drone_set=None, run_reset=True,
                             require_ssp=None):
    """
    Creates and enqueues a parameterized job.

    Most parameters are a combination of the parameters for
    generate_control_file() and create_job(), with the exception of:

    @param test name or ID of the test to run
    @param parameters a map of parameter name ->
                          tuple of (param value, param type).  Entries that
                          match one of the test's parameters are popped from
                          this map while the job is being built.
    @param profiler_parameters a dictionary of parameters for the profilers:
                                   key: profiler name
                                   value: dict of param name -> tuple of
                                                                (param value,
                                                                 param type)
                               May be None (the default), which is treated as
                               "no parameters for any profiler".

    @returns whatever rpc_utils.create_job_common() returns for the new job.

    @raises Exception if `parameters` contains names that are not parameters
            of the given test.  On any failure after the ParameterizedJob row
            has been created, that row is deleted before the exception is
            re-raised.
    """
    # Save the values of the passed arguments here. What we're going to do with
    # them is pass them all to rpc_utils.get_create_job_common_args(), which
    # will extract the subset of these arguments that apply for
    # rpc_utils.create_job_common(), which we then pass in to that function.
    # This must remain the first statement so that only the arguments (with
    # their original values) are captured.
    args = locals()

    # profiler_parameters defaults to None; fall back to an empty mapping so
    # that requesting profilers without parameters does not blow up on .get().
    profiler_params_by_name = profiler_parameters or {}

    # Set up the parameterized job configs
    test_obj = models.Test.smart_get(test)
    control_type = test_obj.test_type

    try:
        label = models.Label.smart_get(label)
    except models.Label.DoesNotExist:
        label = None

    kernel_objs = models.Kernel.create_kernels(kernel)
    profiler_objs = [models.Profiler.smart_get(profiler)
                     for profiler in profilers]

    parameterized_job = models.ParameterizedJob.objects.create(
            test=test_obj, label=label, use_container=use_container,
            profile_only=profile_only,
            upload_kernel_config=upload_kernel_config)

    # Everything from here on hangs off the row created above, so any failure
    # must remove that row again instead of leaving it orphaned.
    try:
        parameterized_job.kernels.add(*kernel_objs)

        for profiler in profiler_objs:
            parameterized_profiler = (
                    models.ParameterizedJobProfiler.objects.create(
                            parameterized_job=parameterized_job,
                            profiler=profiler))
            profiler_params = profiler_params_by_name.get(profiler.name, {})
            # Use a dedicated loop variable; the job's `name` argument must
            # not be rebound here.
            for param_name, (value, param_type) in profiler_params.iteritems():
                models.ParameterizedJobProfilerParameter.objects.create(
                        parameterized_job_profiler=parameterized_profiler,
                        parameter_name=param_name,
                        parameter_value=value,
                        parameter_type=param_type)

        for parameter in test_obj.testparameter_set.all():
            if parameter.name in parameters:
                param_value, param_type = parameters.pop(parameter.name)
                parameterized_job.parameterizedjobparameter_set.create(
                        test_parameter=parameter, parameter_value=param_value,
                        parameter_type=param_type)

        # Anything still left was not a parameter of this test.
        if parameters:
            raise Exception('Extra parameters remain: %r' % parameters)

        return rpc_utils.create_job_common(
                parameterized_job=parameterized_job.id,
                control_type=control_type,
                **rpc_utils.get_create_job_common_args(args))
    except:
        # Deliberately catches everything: this is cleanup only, and the
        # original exception is re-raised untouched.
        parameterized_job.delete()
        raise
859
860
def create_job_page_handler(name, priority, control_file, control_type,
                            image=None, hostless=False, firmware_rw_build=None,
                            firmware_ro_build=None, test_source_build=None,
                            **kwargs):
    """\
    Create and enqueue a job on behalf of the create-job page.

    When both an image is given and the job is hostless, the request is
    handed to site_rpc_interface.create_suite_job(); otherwise it is handed
    to create_job().

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param image: ChromeOS build to be installed in the dut. Default to None.
    @param hostless: if true, create a hostless job. Default to False.
    @param firmware_rw_build: Firmware build to update RW firmware. Default to
                              None, i.e., RW firmware will not be updated.
    @param firmware_ro_build: Firmware build to update RO firmware. Default to
                              None, i.e., RO firmware will not be updated.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.
    @param kwargs extra args that will be required by create_suite_job or
                  create_job.

    @returns The created Job id number.

    @raises model_logic.ValidationError if the control file is empty.
    """
    control_file = rpc_utils.encode_ascii(control_file)
    if not control_file:
        raise model_logic.ValidationError({
                'control_file' : "Control file cannot be empty"})

    # Plain job: no image to provision, or the job is bound to hosts.
    if not (image and hostless):
        return create_job(name, priority, control_file, control_type,
                          image=image, hostless=hostless, **kwargs)

    # Suite job: collect the builds, keyed by provision version prefix.
    builds = {provision.CROS_VERSION_PREFIX: image}
    if firmware_rw_build:
        builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
    if firmware_ro_build:
        builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build

    return site_rpc_interface.create_suite_job(
            name=name, control_file=control_file, priority=priority,
            builds=builds, test_source_build=test_source_build, **kwargs)
901
902
@rpc_utils.route_rpc_to_master
def create_job(name, priority, control_file, control_type,
               hosts=(), meta_hosts=(), one_time_hosts=(),
               atomic_group_name=None, synch_count=None, is_template=False,
               timeout=None, timeout_mins=None, max_runtime_mins=None,
               run_verify=False, email_list='', dependencies=(),
               reboot_before=None, reboot_after=None, parse_failed_repair=None,
               hostless=False, keyvals=None, drone_set=None, image=None,
               parent_job_id=None, test_retry=0, run_reset=True,
               require_ssp=None, args=(), **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param synch_count How many machines the job uses per autoserv execution.
        synch_count == 1 means the job is asynchronous.  If an atomic group is
        given this value is treated as a minimum.
    @param is_template If true then create a template job.
    @param timeout Hours after this call returns until the job times out.
    @param timeout_mins Minutes after this call returns until the job times
        out.
    @param max_runtime_mins Minutes from job starting time until job times out
    @param run_verify Should the host be verified before running the test?
    @param email_list String containing emails to mail when the job is done
    @param dependencies List of label names on which this job depends
    @param reboot_before Never, If dirty, or Always
    @param reboot_after Never, If all tests passed, or Always
    @param parse_failed_repair if true, results of failed repairs launched by
        this job will be parsed as part of the job.
    @param hostless if true, create a hostless job
    @param keyvals dict of keyvals to associate with the job
    @param hosts List of hosts to run job on.
    @param meta_hosts List where each entry is a label name, and for each entry
        one host will be chosen from that label to run the job on.
    @param one_time_hosts List of hosts not in the database to run the job on.
    @param atomic_group_name The name of an atomic group to schedule the job on.
    @param drone_set The name of the drone set to run this test on.
    @param image OS image to install before running job.
    @param parent_job_id id of a job considered to be parent of created job.
    @param test_retry Number of times to retry test if the test did not
        complete successfully. (optional, default: 0)
    @param run_reset Should the host be reset before running the test?
    @param require_ssp Set to True to require server-side packaging to run the
                       test. If it's set to None, drone will still try to run
                       the server side with server-side packaging. If the
                       autotest-server package doesn't exist for the build or
                       image is not set, drone will run the test without server-
                       side packaging. Default is None.
    @param args A list of args to be injected into control file.
    @param kwargs extra keyword args. NOT USED.

    @returns The created Job id number.
    """
    # NOTE: locals() is handed to rpc_utils.get_create_job_common_args()
    # below, so the names of the local variables in this function are part of
    # its behaviour.  Do not rename or add locals without checking them against
    # the arguments of rpc_utils.create_job_common().
    if args:
        control_file = tools.inject_vars({'args': args}, control_file)

    if image is None:
        return rpc_utils.create_job_common(
                **rpc_utils.get_create_job_common_args(locals()))

    # Translate the image name, in case its a relative build name.
    ds = dev_server.ImageServer.resolve(image)
    image = ds.translate(image)

    # When image is supplied use a known parameterized test already in the
    # database to pass the OS image path from the front end, through the
    # scheduler, and finally to autoserv as the --image parameter.

    # The test autoupdate_ParameterizedJob is in afe_autotests and used to
    # instantiate a Test object and from there a ParameterizedJob.
    known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob')
    known_parameterized_job = models.ParameterizedJob.objects.create(
            test=known_test_obj)

    # From here on a failure would leave the row created above orphaned, so
    # remove it again before propagating the error (the same clean-up that
    # create_parameterized_job() performs).
    try:
        # autoupdate_ParameterizedJob has a single parameter, the image
        # parameter, stored in the table afe_test_parameters.  We retrieve and
        # set this instance of the parameter to the OS image path.
        image_parameter = known_test_obj.testparameter_set.get(
                test=known_test_obj, name='image')
        known_parameterized_job.parameterizedjobparameter_set.create(
                test_parameter=image_parameter, parameter_value=image,
                parameter_type='string')

        # TODO(crbug.com/502638): save firmware build etc to parameterized_job.

        # By passing a parameterized_job to create_job_common the job entry in
        # the afe_jobs table will have the field parameterized_job_id set.
        # The scheduler uses this id in the afe_parameterized_jobs table to
        # match this job to our known test, and then with the
        # afe_parameterized_job_parameters table to get the actual image path.
        return rpc_utils.create_job_common(
                parameterized_job=known_parameterized_job.id,
                **rpc_utils.get_create_job_common_args(locals()))
    except:
        # Deliberately catches everything: this is cleanup only, and the
        # original exception is re-raised untouched.
        known_parameterized_job.delete()
        raise
999
1000
def abort_host_queue_entries(**filter_data):
    """\
    Abort the host queue entries selected by the given filters.

    Entries that are already complete or already aborted are left alone.

    @param filter_data: Filter keywords used to select host queue entries.

    @return: A list of dictionaries, one per aborted HQE, with the keys
             'HostQueueEntry', 'Job' and 'Job name'.
    """
    # Dont allow aborts on:
    #   1. Jobs that have already completed (whether or not they were aborted)
    #   2. Jobs that have already been aborted (but may not have completed)
    abortable = models.HostQueueEntry.query_objects(filter_data)
    abortable = abortable.filter(complete=False).filter(aborted=False)

    models.AclGroup.check_abort_permissions(abortable)
    entries = list(abortable.select_related())
    rpc_utils.check_abort_synchronous_jobs(entries)
    models.HostQueueEntry.abort_host_queue_entries(entries)

    aborted_info = []
    for entry in entries:
        aborted_info.append({'HostQueueEntry': entry.id,
                             'Job': entry.job_id,
                             'Job name': entry.job.name})
    return aborted_info
1022
1023
def abort_special_tasks(**filter_data):
    """\
    Abort every active special task that matches the given filters.

    @param filter_data: Filter keywords used to select special tasks.
    """
    matching_tasks = models.SpecialTask.query_objects(filter_data)
    for active_task in matching_tasks.filter(is_active=True):
        active_task.abort()
1032
1033
def _call_special_tasks_on_hosts(task, hosts):
    """\
    Schedule the given special task on each of the given hosts.

    @param task: The special task to schedule.
    @param hosts: The hosts to schedule the task on.

    @returns A sorted list of the hostnames a special task was created for.

    @raises ValueError if any of the hosts is on a shard and this server is
            not itself a shard.
    """
    models.AclGroup.check_for_acl_violation_hosts(hosts)

    hosts_by_shard = rpc_utils.bucket_hosts_by_shard(hosts)
    if hosts_by_shard and not utils.is_shard():
        raise ValueError('The following hosts are on shards, please '
                         'follow the link to the shards and create jobs '
                         'there instead. %s.' % hosts_by_shard)

    for target in hosts:
        models.SpecialTask.schedule_special_task(target, task)

    hostnames = [target.hostname for target in hosts]
    hostnames.sort()
    return hostnames
1049
1050
def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
    """Create special tasks locally, forwarding sharded hosts to their shard.

    On the master, hosts that belong to a shard are handled by re-issuing
    the RPC on that shard; the remaining hosts get a special task created
    in the local DB.

    On a shard, special task records are created in the local DB.

    @param task: Enum value of frontend.afe.models.SpecialTask.Task
    @param rpc: RPC name to forward.
    @param filter_data: Filter keywords to be used for DB query.

    @return: A list of hostnames that a special task was created for.
    """
    hosts = models.Host.query_objects(filter_data)
    hostnames_by_shard = rpc_utils.bucket_hosts_by_shard(
            hosts, rpc_hostnames=True)

    # Filter out hosts on a shard from those on the master, forward
    # rpcs to the shard with an additional hostname__in filter, and
    # create a local SpecialTask for each remaining host.
    if hostnames_by_shard and not utils.is_shard():
        hosts = [candidate for candidate in hosts if candidate.shard is None]
        for shard, shard_hostnames in hostnames_by_shard.iteritems():
            # The main client of this module is the frontend website, and
            # it invokes it with an 'id' or an 'id__in' filter. Regardless,
            # the 'hostname' filter should narrow down the list of hosts on
            # each shard even though we supply all the ids in filter_data.
            # This method uses hostname instead of id because it fits better
            # with the overall architecture of redirection functions in
            # rpc_utils.
            forwarded_filter = dict(filter_data, hostname__in=shard_hostnames)
            rpc_utils.run_rpc_on_multiple_hostnames(
                    rpc, [shard], **forwarded_filter)

    # There is a race condition here if someone assigns a shard to one of these
    # hosts before we create the task. The host will stay on the master if:
    # 1. The host is not Ready
    # 2. The host is Ready but has a task
    # But if the host is Ready and doesn't have a task yet, it will get sent
    # to the shard as we're creating a task here.

    # Given that we only rarely verify Ready hosts it isn't worth putting this
    # entire method in a transaction. The worst case scenario is that we have
    # a verify running on a Ready host while the shard is using it, if the
    # verify fails no subsequent tasks will be created against the host on the
    # master, and verifies are safe enough that this is OK.
    return _call_special_tasks_on_hosts(task, hosts)
1100
1101
def reverify_hosts(**filter_data):
    """\
    Schedule a verify task for every host matching the given filters.

    @param filter_data: Filter keywords used to select hosts.

    @returns A list of hostnames that a verify task was created for.
    """
    verify_task = models.SpecialTask.Task.VERIFY
    return _forward_special_tasks_on_hosts(verify_task, 'reverify_hosts',
                                           **filter_data)
1110
1111
def repair_hosts(**filter_data):
    """\
    Schedule a repair task for every host matching the given filters.

    @param filter_data: Filter keywords used to select hosts.

    @returns A list of hostnames that a repair task was created for.
    """
    repair_task = models.SpecialTask.Task.REPAIR
    return _forward_special_tasks_on_hosts(repair_task, 'repair_hosts',
                                           **filter_data)
1120
1121
def get_jobs(not_yet_run=False, running=False, finished=False,
             suite=False, sub=False, standalone=False, **filter_data):
    """\
    Return the jobs matching the given filters as a list of dictionaries.

    Extra status filter args for get_jobs:
    -not_yet_run: Include only jobs that have not yet started running.
    -running: Include only jobs that have started running but for which not
    all hosts have completed.
    -finished: Include only jobs for which all hosts have completed (or
    aborted).

    Extra type filter args for get_jobs:
    -suite: Include only jobs with child jobs.
    -sub: Include only jobs with a parent job.
    -standalone: Include only jobs with no child or parent jobs.
    At most one of these three fields should be specified.

    Each job dictionary additionally carries 'dependencies' (comma-separated
    label names), 'keyvals' (a dict) and, for jobs with a parameterized job,
    'image'.
    """
    status_filters = rpc_utils.extra_job_status_filters(
            not_yet_run, running, finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(
            status_filters, suite, sub, standalone)

    jobs = list(models.Job.query_objects(filter_data))
    # Attach the related labels and keyvals to every job up front.
    for related_model, attribute in ((models.Label, 'dependencies'),
                                     (models.JobKeyval, 'keyvals')):
        models.Job.objects.populate_relationships(jobs, related_model,
                                                  attribute)

    job_dicts = []
    for job in jobs:
        job_dict = job.get_object_dict()
        dependency_names = [label.name for label in job.dependencies]
        job_dict['dependencies'] = ','.join(dependency_names)
        job_dict['keyvals'] = dict([(keyval.key, keyval.value)
                                    for keyval in job.keyvals])
        if job.parameterized_job:
            job_dict['image'] = get_parameterized_autoupdate_image_url(job)
        job_dicts.append(job_dict)

    return rpc_utils.prepare_for_serialization(job_dicts)
1160
1161
def get_num_jobs(not_yet_run=False, running=False, finished=False,
                 suite=False, sub=False, standalone=False,
                 **filter_data):
    """\
    Return the number of jobs matching the given filters.

    See get_jobs() for documentation of extra filter parameters.
    """
    status_filters = rpc_utils.extra_job_status_filters(
            not_yet_run, running, finished)
    type_filters = rpc_utils.extra_job_type_filters(
            status_filters, suite, sub, standalone)
    filter_data['extra_args'] = type_filters
    return models.Job.query_count(filter_data)
1176
1177
def get_jobs_summary(**filter_data):
    """\
    Like get_jobs(), but adds 'status_counts' and 'result_counts' fields.

    The 'status_counts' field is a dictionary mapping status strings to the
    number of hosts currently with that status, i.e.
    {'Queued' : 4, 'Running' : 2}.

    The 'result_counts' field is piped to tko's rpc_interface and has the
    return format specified under get_group_counts.
    """
    jobs = get_jobs(**filter_data)
    job_ids = [job['id'] for job in jobs]
    status_counts_by_id = models.Job.objects.get_status_counts(job_ids)

    for job in jobs:
        job_id = job['id']
        job['status_counts'] = status_counts_by_id[job_id]
        job['result_counts'] = tko_rpc_interface.get_status_counts(
                ['afe_job_id', 'afe_job_id'],
                header_groups=[['afe_job_id'], ['afe_job_id']],
                afe_job_id=job_id)

    return rpc_utils.prepare_for_serialization(jobs)
1198
1199
def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
    """\
    Gather everything needed to clone the job with the given id.

    @param id: Database id of the job to clone.
    @param preserve_metahosts: Passed through to rpc_utils.get_job_info().
    @param queue_entry_filter_data: Passed through to
                                    rpc_utils.get_job_info().

    @returns A serializable dict with the keys 'job', 'meta_host_counts',
             'hosts', 'atomic_group_name', 'hostless' and 'drone_set'.
    """
    job = models.Job.objects.get(id=id)
    job_info = rpc_utils.get_job_info(job,
                                      preserve_metahosts,
                                      queue_entry_filter_data)

    host_dicts = []
    for host in job_info['hosts']:
        host_dict = get_hosts(id=host.id)[0]
        # This is the very list stored under 'labels'; the platform label is
        # removed from it in place.
        other_labels = host_dict['labels']
        platform = host_dict['platform']
        if platform:
            other_labels.remove(platform)
        host_dict['other_labels'] = ', '.join(other_labels)
        host_dicts.append(host_dict)

    for host in job_info['one_time_hosts']:
        host_dicts.append({'hostname': host.hostname,
                           'id': host.id,
                           'platform': '(one-time host)',
                           'locked_text': ''})

    # convert keys from Label objects to strings (names of labels)
    meta_host_counts = {}
    for meta_host, count in job_info['meta_host_counts'].iteritems():
        meta_host_counts[meta_host.name] = count

    info = {'job': job.get_object_dict(),
            'meta_host_counts': meta_host_counts,
            'hosts': host_dicts}
    info['job']['dependencies'] = job_info['dependencies']

    atomic_group = job_info['atomic_group']
    info['atomic_group_name'] = atomic_group.name if atomic_group else None
    info['hostless'] = job_info['hostless']
    info['drone_set'] = job.drone_set and job.drone_set.name

    if job.parameterized_job:
        info['job']['image'] = get_parameterized_autoupdate_image_url(job)

    return rpc_utils.prepare_for_serialization(info)
1244
1245
1246# host queue entries
1247
def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    Return the host queue entries matching the given filters.

    @param start_time: Injected into the filters as 'started_on__gte'.
    @param end_time: Injected into the filters as 'started_on__lte'.
    @param filter_data: Filter keywords to pass to the underlying
                        database query.

    @returns A sequence of nested dictionaries of host and job information.
    """
    filter_data = rpc_utils.inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data)
    entries = models.HostQueueEntry.query_objects(filter_data)
    nested_fields = ('host', 'atomic_group', 'job')
    return rpc_utils.prepare_rows_as_nested_dicts(entries, nested_fields)
1260
1261
def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    Get the number of host queue entries matching the given filters.

    @param start_time: Injected into the filters as 'started_on__gte'.
    @param end_time: Injected into the filters as 'started_on__lte'.
    @param filter_data: Filter keywords to pass to the underlying
                        database query.
    """
    time_bounded_filter = rpc_utils.inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data)
    return models.HostQueueEntry.query_count(time_bounded_filter)
1272
1273
def get_hqe_percentage_complete(**filter_data):
    """
    Compute the fraction of matching host queue entries that are complete.

    @param filter_data: Filter keywords used to select host queue entries.

    @returns complete / total as a float, or 1 when nothing matches.
    """
    entries = models.HostQueueEntry.query_objects(filter_data)
    num_complete = entries.filter(complete=True).count()
    num_total = entries.count()
    if num_total != 0:
        return float(num_complete) / num_total
    # No matching entries at all counts as fully complete.
    return 1
1285
1286
1287# special tasks
1288
def get_special_tasks(**filter_data):
    """Get special task entries from the local database.

    Looks up the special tasks matching `filter_data` in the special
    tasks table and returns them as a list.  The call is never
    forwarded to a shard; the caller is expected to already be talking
    to the right server, for instance because:
      * The caller is a service (such as gs_offloader) configured
        to operate on behalf of one specific shard, and no other.
      * The caller has a host as a parameter, and knows that this is
        the shard assigned to that host.

    @param filter_data  Filter keywords to pass to the underlying
                        database query.

    """
    tasks = models.SpecialTask.query_objects(filter_data)
    nested_fields = ('host', 'queue_entry')
    return rpc_utils.prepare_rows_as_nested_dicts(tasks, nested_fields)
1309
1310
def get_host_special_tasks(host_id, **filter_data):
    """Get special task entries for a given host.

    Returns the special tasks that ran on the host given by `host_id`
    and that match `filter_data`.  When the host is assigned to a
    shard, the query is forwarded to that shard.

    @param host_id      Id in the database of the target host.
    @param filter_data  Filter keywords to pass to the underlying
                        database query.

    """
    # Retrieve host data even if the host is in an invalid state.
    host = models.Host.smart_get(host_id, False)
    if host.shard:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable.  So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
        return shard_afe.run('get_special_tasks',
                             host_id=host_id, **filter_data)
    return get_special_tasks(host_id=host_id, **filter_data)
1336
1337
def get_num_special_tasks(**kwargs):
    """Get the number of special task entries from the local database.

    Counts the special tasks in the special tasks table that match
    'kwargs'.  The call is never forwarded to a shard.

    @param kwargs    Filter keywords to pass to the underlying database query.

    """
    task_filter = kwargs
    return models.SpecialTask.query_count(task_filter)
1349
1350
def get_host_num_special_tasks(host, **kwargs):
    """Get the number of special task entries for a given host.

    Counts the special tasks that ran on the host given by 'host' and
    that match 'kwargs'.  When the host is assigned to a shard, the
    query is forwarded to that shard.

    @param host      id or name of a host. More often a hostname.
    @param kwargs    Filter keywords to pass to the underlying database query.

    """
    # Retrieve host data even if the host is in an invalid state.
    host_model = models.Host.smart_get(host, False)
    if host_model.shard:
        shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname())
        return shard_afe.run('get_num_special_tasks', host=host, **kwargs)
    return get_num_special_tasks(host=host, **kwargs)
1370
1371
def get_status_task(host_id, end_time):
    """Get the "status task" for a host from the local shard.

    The "status task" is a completed special task that tells whether
    the host was working or broken when the task completed: a
    successful task indicates a working host, a failed task a broken
    one.

    This call is not forwarded to a shard; the receiving server
    must be the shard that owns the host.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    task_rows = status_history.get_status_task(host_id, end_time)
    tasklist = rpc_utils.prepare_rows_as_nested_dicts(
            task_rows, ('host', 'queue_entry'))
    if not tasklist:
        return None
    return tasklist[0]
1397
1398
def get_host_status_task(host_id, end_time):
    """Get the "status task" for a host from its owning shard.

    Looks up the host; if it is assigned to a shard, the call is
    forwarded to that shard's `get_status_task()`, otherwise
    `get_status_task()` is called locally.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    host = models.Host.smart_get(host_id)
    if host.shard:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable.  So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
        return shard_afe.run('get_status_task',
                             host_id=host_id, end_time=end_time)
    return get_status_task(host_id, end_time)
1425
1426
def get_host_diagnosis_interval(host_id, end_time, success):
    """Find a "diagnosis interval" for a given host.

    A "diagnosis interval" identifies a start and end time where
    the host went from "working" to "broken", or vice versa.  The
    interval's starting time is the starting time of the last status
    task with the old status; the end time is the finish time of the
    first status task with the new status.

    This routine finds the most recent diagnosis interval for the
    given host prior to `end_time`, with a starting status matching
    `success`.  If `success` is true, the interval will start with a
    successful status task; if false the interval will start with a
    failed status task.

    The lookup runs locally when the host has no shard or when this
    server is itself a shard; otherwise it is forwarded to the host's
    shard.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the diagnosis interval.
    @param success      Whether the diagnosis interval should start
                        with a successful or failed status task.

    @return A list of two strings.  The first is the timestamp for
            the beginning of the interval; the second is the
            timestamp for the end.  If the host has never changed
            state, the list is empty.

    """
    host = models.Host.smart_get(host_id)
    if host.shard and not utils.is_shard():
        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
        return shard_afe.get_host_diagnosis_interval(
                host_id, end_time, success)
    return status_history.get_diagnosis_interval(
            host_id, end_time, success)
1461
1462
1463# support for host detail view
1464
def get_host_queue_entries_and_special_tasks(host, query_start=None,
                                             query_limit=None, start_time=None,
                                             end_time=None):
    """
    Get a host's queue entries and special tasks merged into one list.

    @param host         Host to report on; used as the 'host' filter for
                        both queries.
    @param query_start  Number of leading interleaved entries to skip.
                        `None` skips nothing.
    @param query_limit  Maximum number of interleaved entries to return.
                        `None` means no limit.
    @param start_time   Passed to get_host_queue_entries() and used as the
                        'time_started__gte' filter value for special tasks.
    @param end_time     Passed to get_host_queue_entries() and used as the
                        'time_started__lte' filter value for special tasks.

    @returns an interleaved list of HostQueueEntries and SpecialTasks,
            in approximate run order.  each dict contains keys for type, host,
            job, status, started_on, execution_path, and ID.
    """
    # Each underlying query has to fetch enough rows to cover the skipped
    # prefix plus the requested page, since slicing happens only after the
    # two result sets are interleaved.  A missing query_start counts as 0
    # so that passing only query_limit does not fail on `None + int`.
    total_limit = None
    if query_limit is not None:
        total_limit = (query_start or 0) + query_limit
    filter_data_common = {'host': host,
                          'query_limit': total_limit,
                          'sort_by': ['-id']}

    filter_data_special_tasks = rpc_utils.inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **filter_data_common)

    queue_entries = get_host_queue_entries(
            start_time, end_time, **filter_data_common)
    special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)

    interleaved_entries = rpc_utils.interleave_entries(queue_entries,
                                                       special_tasks)
    if query_start is not None:
        interleaved_entries = interleaved_entries[query_start:]
    if query_limit is not None:
        interleaved_entries = interleaved_entries[:query_limit]
    return rpc_utils.prepare_host_queue_entries_and_special_tasks(
            interleaved_entries, queue_entries)
1496
1497
def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
                                                 end_time=None):
    """
    Count a host's queue entries plus its special tasks.

    @param host         Host to report on; used as the 'host' filter.
    @param start_time   Passed to
                        rpc_utils.inject_times_to_hqe_special_tasks_filters().
    @param end_time     Passed to
                        rpc_utils.inject_times_to_hqe_special_tasks_filters().

    @returns the HostQueueEntry count plus the special task count.
    """
    hqe_filters, special_task_filters = (
            rpc_utils.inject_times_to_hqe_special_tasks_filters(
                    {'host': host}, start_time, end_time))

    num_queue_entries = models.HostQueueEntry.query_count(hqe_filters)
    num_special_tasks = get_host_num_special_tasks(**special_task_filters)
    return num_queue_entries + num_special_tasks
1508
1509
1510# recurring run
1511
def get_recurring(**filter_data):
    """
    Query RecurringRun objects matching the given filters.

    @param filter_data  Filter keyword arguments handed to
                        models.RecurringRun.query_objects().

    @returns the matching rows as nested dicts, with the 'job' and 'owner'
            fields nested.
    """
    matching_runs = models.RecurringRun.query_objects(filter_data)
    nested_fields = ('job', 'owner')
    return rpc_utils.prepare_rows_as_nested_dicts(matching_runs,
                                                  nested_fields)
1516
1517
def get_num_recurring(**filter_data):
    """
    Count RecurringRun objects matching the given filters.

    @param filter_data  Filter keyword arguments handed to
                        models.RecurringRun.query_count().

    @returns the result of models.RecurringRun.query_count().
    """
    num_matching = models.RecurringRun.query_count(filter_data)
    return num_matching
1520
1521
def delete_recurring_runs(**filter_data):
    """
    Delete every RecurringRun object matching the given filters.

    @param filter_data  Filter keyword arguments handed to
                        models.RecurringRun.query_objects().
    """
    models.RecurringRun.query_objects(filter_data).delete()
1525
1526
def create_recurring_run(job_id, start_date, loop_period, loop_count):
    """
    Create a recurring run of an existing job, owned by the current user.

    @param job_id       Database id of the job to recur.
    @param start_date   Forwarded to Job.create_recurring_job().
    @param loop_period  Forwarded to Job.create_recurring_job().
    @param loop_count   Forwarded to Job.create_recurring_job().

    @returns whatever Job.create_recurring_job() returns.
    """
    # Resolve the owner before the job lookup, as the original code did.
    current_login = models.User.current_user().login
    template_job = models.Job.objects.get(id=job_id)
    recurring_args = {'start_date': start_date,
                      'loop_period': loop_period,
                      'loop_count': loop_count,
                      'owner': current_login}
    return template_job.create_recurring_job(**recurring_args)
1534
1535
1536# other
1537
def echo(data=""):
    """\
    Hand the given argument straight back to the caller.  Serves as a
    minimal smoke test that RPC calls can be made successfully.

    @param data     Value to send back; defaults to the empty string.

    @return `data`, unchanged.
    """
    reply = data
    return reply
1544
1545
def get_motd():
    """\
    Fetch the server's message of the day.

    @return the value produced by rpc_utils.get_motd().
    """
    message_of_the_day = rpc_utils.get_motd()
    return message_of_the_day
1551
1552
def get_static_data():
    """\
    Returns a dictionary containing a bunch of data that shouldn't change
    often and is otherwise inaccessible.  This includes:

    priorities: List of job priority choices.
    default_priority: Name of the default priority for new jobs (the
            string 'Default').
    max_schedulable_priority: The value of priorities.Priority.DEFAULT.
    users: List of all users, sorted by login.
    labels: List of labels whose names do not start with 'cros-version',
            'fw-version', 'fwrw-version' or 'fwro-version', sorted by
            '-platform', then 'name'.
    atomic_groups: Sorted list of all atomic groups.
    tests: Sorted list of all tests.
    profilers: Sorted list of all profilers.
    current_user: Object dict of the logged-in user, prepared for
            serialization.
    host_statuses: Sorted list of possible Host statuses.
    job_statuses: Sorted list of possible HostQueueEntry statuses.
    job_timeout_mins_default: The default job timeout length in minutes.
    job_max_runtime_mins_default: The value of
            models.Job.DEFAULT_MAX_RUNTIME_MINS.
    parse_failed_repair_default: Default value for the parse_failed_repair job
            option.
    reboot_before_options: A list of valid RebootBefore string enums.
    reboot_after_options: A list of valid RebootAfter string enums.
    motd: Server's message of the day.
    drone_sets_enabled: Result of models.DroneSet.drone_sets_enabled().
    drone_sets: List of drone set names; the default drone set name comes
            first, followed by the remaining names in sorted order.
    parameterized_jobs: Result of models.Job.parameterized_jobs_enabled().
    status_dictionary: A mapping from one word job status names to a more
            informative description.
    wmatrix_url: Result of rpc_utils.get_wmatrix_url().
    is_moblab: Boolean form of utils.is_moblab().
    """

    # The default drone set is listed first; every other set follows in
    # alphabetical order.
    default_drone_set_name = models.DroneSet.default_drone_set_name()
    drone_sets = ([default_drone_set_name] +
                  sorted(drone_set.name for drone_set in
                         models.DroneSet.objects.exclude(
                                 name=default_drone_set_name)))

    result = {}
    result['priorities'] = priorities.Priority.choices()
    result['default_priority'] = 'Default'
    result['max_schedulable_priority'] = priorities.Priority.DEFAULT
    result['users'] = get_users(sort_by=['login'])

    # Labels with these name prefixes are left out of the static data.
    label_exclude_filters = [{'name__startswith': 'cros-version'},
                             {'name__startswith': 'fw-version'},
                             {'name__startswith': 'fwrw-version'},
                             {'name__startswith': 'fwro-version'}]
    result['labels'] = get_labels(
        label_exclude_filters,
        sort_by=['-platform', 'name'])

    result['atomic_groups'] = get_atomic_groups(sort_by=['name'])
    result['tests'] = get_tests(sort_by=['name'])
    result['profilers'] = get_profilers(sort_by=['name'])
    result['current_user'] = rpc_utils.prepare_for_serialization(
        models.User.current_user().get_object_dict())
    result['host_statuses'] = sorted(models.Host.Status.names)
    result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
    result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
    result['job_max_runtime_mins_default'] = (
        models.Job.DEFAULT_MAX_RUNTIME_MINS)
    result['parse_failed_repair_default'] = bool(
        models.Job.DEFAULT_PARSE_FAILED_REPAIR)
    result['reboot_before_options'] = model_attributes.RebootBefore.names
    result['reboot_after_options'] = model_attributes.RebootAfter.names
    result['motd'] = rpc_utils.get_motd()
    result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
    result['drone_sets'] = drone_sets
    result['parameterized_jobs'] = models.Job.parameterized_jobs_enabled()

    result['status_dictionary'] = {"Aborted": "Aborted",
                                   "Verifying": "Verifying Host",
                                   "Provisioning": "Provisioning Host",
                                   "Pending": "Waiting on other hosts",
                                   "Running": "Running autoserv",
                                   "Completed": "Autoserv completed",
                                   "Failed": "Failed to complete",
                                   "Queued": "Queued",
                                   "Starting": "Next in host's queue",
                                   "Stopped": "Other host(s) failed verify",
                                   "Parsing": "Awaiting parse of final results",
                                   "Gathering": "Gathering log files",
                                   "Template": "Template job for recurring run",
                                   "Waiting": "Waiting for scheduler action",
                                   "Archiving": "Archiving results",
                                   "Resetting": "Resetting hosts"}

    result['wmatrix_url'] = rpc_utils.get_wmatrix_url()
    result['is_moblab'] = bool(utils.is_moblab())

    return result
1641
1642
def get_server_time():
    """\
    Report the server's current local time.

    @return the current time formatted as 'YYYY-MM-DD HH:MM'.
    """
    current_time = datetime.datetime.now()
    return current_time.strftime("%Y-%m-%d %H:%M")
1645