1# pylint: disable=missing-docstring 2 3"""\ 4Functions to expose over the RPC interface. 5 6For all modify* and delete* functions that ask for an 'id' parameter to 7identify the object to operate on, the id may be either 8 * the database row ID 9 * the name of the object (label name, hostname, user login, etc.) 10 * a dictionary containing uniquely identifying field (this option should seldom 11 be used) 12 13When specifying foreign key fields (i.e. adding hosts to a label, or adding 14users to an ACL group), the given value may be either the database row ID or the 15name of the object. 16 17All get* functions return lists of dictionaries. Each dictionary represents one 18object and maps field names to values. 19 20Some examples: 21modify_host(2, hostname='myhost') # modify hostname of host with database ID 2 22modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2' 23modify_test('sleeptest', test_type='Client', params=', seconds=60') 24delete_acl_group(1) # delete by ID 25delete_acl_group('Everyone') # delete by name 26acl_group_add_users('Everyone', ['mbligh', 'showard']) 27get_jobs(owner='showard', status='Queued') 28 29See doctests/001_rpc_test.txt for (lots) more examples. 30""" 31 32__author__ = 'showard@google.com (Steve Howard)' 33 34import ast 35import collections 36import contextlib 37import datetime 38import logging 39import os 40import sys 41import warnings 42 43from django.db import connection as db_connection 44from django.db import transaction 45from django.db.models import Count 46from django.db.utils import DatabaseError 47 48import common 49from autotest_lib.client.common_lib import control_data 50from autotest_lib.client.common_lib import error 51from autotest_lib.client.common_lib import global_config 52from autotest_lib.client.common_lib import priorities 53from autotest_lib.client.common_lib.cros import dev_server 54from autotest_lib.frontend.afe import control_file as control_file_lib 55from autotest_lib.frontend.afe import model_attributes 56from autotest_lib.frontend.afe import model_logic 57from autotest_lib.frontend.afe import models 58from autotest_lib.frontend.afe import rpc_utils 59from autotest_lib.frontend.tko import models as tko_models 60from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface 61from autotest_lib.server import frontend 62from autotest_lib.server import utils 63from autotest_lib.server.cros import provision 64from autotest_lib.server.cros.dynamic_suite import constants 65from autotest_lib.server.cros.dynamic_suite import control_file_getter 66from autotest_lib.server.cros.dynamic_suite import suite_common 67from autotest_lib.server.cros.dynamic_suite import tools 68from autotest_lib.server.cros.dynamic_suite.suite import Suite 69from autotest_lib.server.lib import status_history 70from autotest_lib.site_utils import job_history 71from autotest_lib.site_utils import server_manager_utils 72from autotest_lib.site_utils import stable_version_utils 73 74 75_CONFIG = global_config.global_config 76 77# Definition of LabHealthIndicator 78LabHealthIndicator = collections.namedtuple( 79 'LabHealthIndicator', 80 [ 81 'if_lab_close', 82 'available_duts', 83 'devserver_health', 84 'upcoming_builds', 85 ] 86) 87 88RESPECT_STATIC_LABELS = global_config.global_config.get_config_value( 89 'SKYLAB', 'respect_static_labels', type=bool, default=False) 90 91RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value( 92 'SKYLAB', 'respect_static_attributes', type=bool, default=False) 93 94# Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py. 95 96# labels 97 98def modify_label(id, **data): 99 """Modify a label. 100 101 @param id: id or name of a label. More often a label name. 102 @param data: New data for a label. 103 """ 104 label_model = models.Label.smart_get(id) 105 if label_model.is_replaced_by_static(): 106 raise error.UnmodifiableLabelException( 107 'Failed to delete label "%s" because it is a static label. ' 108 'Use go/chromeos-skylab-inventory-tools to modify this ' 109 'label.' % label_model.name) 110 111 label_model.update_object(data) 112 113 # Master forwards the RPC to shards 114 if not utils.is_shard(): 115 rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False, 116 id=id, **data) 117 118 119def delete_label(id): 120 """Delete a label. 121 122 @param id: id or name of a label. More often a label name. 123 """ 124 label_model = models.Label.smart_get(id) 125 if label_model.is_replaced_by_static(): 126 raise error.UnmodifiableLabelException( 127 'Failed to delete label "%s" because it is a static label. ' 128 'Use go/chromeos-skylab-inventory-tools to modify this ' 129 'label.' % label_model.name) 130 131 # Hosts that have the label to be deleted. Save this info before 132 # the label is deleted to use it later. 133 hosts = [] 134 for h in label_model.host_set.all(): 135 hosts.append(models.Host.smart_get(h.id)) 136 label_model.delete() 137 138 # Master forwards the RPC to shards 139 if not utils.is_shard(): 140 rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id) 141 142 143def add_label(name, ignore_exception_if_exists=False, **kwargs): 144 """Adds a new label of a given name. 145 146 @param name: label name. 147 @param ignore_exception_if_exists: If True and the exception was 148 thrown due to the duplicated label name when adding a label, 149 then suppress the exception. Default is False. 150 @param kwargs: keyword args that store more info about a label 151 other than the name. 152 @return: int/long id of a new label. 153 """ 154 # models.Label.add_object() throws model_logic.ValidationError 155 # when it is given a label name that already exists. 156 # However, ValidationError can be thrown with different errors, 157 # and those errors should be thrown up to the call chain. 158 try: 159 label = models.Label.add_object(name=name, **kwargs) 160 except: 161 exc_info = sys.exc_info() 162 if ignore_exception_if_exists: 163 label = rpc_utils.get_label(name) 164 # If the exception is raised not because of duplicated 165 # "name", then raise the original exception. 166 if label is None: 167 raise exc_info[0], exc_info[1], exc_info[2] 168 else: 169 raise exc_info[0], exc_info[1], exc_info[2] 170 return label.id 171 172 173def add_label_to_hosts(id, hosts): 174 """Adds a label of the given id to the given hosts only in local DB. 175 176 @param id: id or name of a label. More often a label name. 177 @param hosts: The hostnames of hosts that need the label. 178 179 @raises models.Label.DoesNotExist: If the label with id doesn't exist. 180 """ 181 label = models.Label.smart_get(id) 182 if label.is_replaced_by_static(): 183 label = models.StaticLabel.smart_get(label.name) 184 185 host_objs = models.Host.smart_get_bulk(hosts) 186 if label.platform: 187 models.Host.check_no_platform(host_objs) 188 # Ensure a host has no more than one board label with it. 189 if label.name.startswith('board:'): 190 models.Host.check_board_labels_allowed(host_objs, [label.name]) 191 label.host_set.add(*host_objs) 192 193 194def _create_label_everywhere(id, hosts): 195 """ 196 Yet another method to create labels. 197 198 ALERT! This method should be run only on master not shards! 199 DO NOT RUN THIS ON A SHARD!!! Deputies will hate you if you do!!! 200 201 This method exists primarily to serve label_add_hosts() and 202 host_add_labels(). Basically it pulls out the label check/add logic 203 from label_add_hosts() into this nice method that not only creates 204 the label but also tells the shards that service the hosts to also 205 create the label. 206 207 @param id: id or name of a label. More often a label name. 208 @param hosts: A list of hostnames or ids. More often hostnames. 209 """ 210 try: 211 label = models.Label.smart_get(id) 212 except models.Label.DoesNotExist: 213 # This matches the type checks in smart_get, which is a hack 214 # in and off itself. The aim here is to create any non-existent 215 # label, which we cannot do if the 'id' specified isn't a label name. 216 if isinstance(id, basestring): 217 label = models.Label.smart_get(add_label(id)) 218 else: 219 raise ValueError('Label id (%s) does not exist. Please specify ' 220 'the argument, id, as a string (label name).' 221 % id) 222 223 # Make sure the label exists on the shard with the same id 224 # as it is on the master. 225 # It is possible that the label is already in a shard because 226 # we are adding a new label only to shards of hosts that the label 227 # is going to be attached. 228 # For example, we add a label L1 to a host in shard S1. 229 # Master and S1 will have L1 but other shards won't. 230 # Later, when we add the same label L1 to hosts in shards S1 and S2, 231 # S1 already has the label but S2 doesn't. 232 # S2 should have the new label without any problem. 233 # We ignore exception in such a case. 234 host_objs = models.Host.smart_get_bulk(hosts) 235 rpc_utils.fanout_rpc( 236 host_objs, 'add_label', include_hostnames=False, 237 name=label.name, ignore_exception_if_exists=True, 238 id=label.id, platform=label.platform) 239 240 241@rpc_utils.route_rpc_to_master 242def label_add_hosts(id, hosts): 243 """Adds a label with the given id to the given hosts. 244 245 This method should be run only on master not shards. 246 The given label will be created if it doesn't exist, provided the `id` 247 supplied is a label name not an int/long id. 248 249 @param id: id or name of a label. More often a label name. 250 @param hosts: A list of hostnames or ids. More often hostnames. 251 252 @raises ValueError: If the id specified is an int/long (label id) 253 while the label does not exist. 254 """ 255 # Create the label. 256 _create_label_everywhere(id, hosts) 257 258 # Add it to the master. 259 add_label_to_hosts(id, hosts) 260 261 # Add it to the shards. 262 host_objs = models.Host.smart_get_bulk(hosts) 263 rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id) 264 265 266def remove_label_from_hosts(id, hosts): 267 """Removes a label of the given id from the given hosts only in local DB. 268 269 @param id: id or name of a label. 270 @param hosts: The hostnames of hosts that need to remove the label from. 271 """ 272 host_objs = models.Host.smart_get_bulk(hosts) 273 label = models.Label.smart_get(id) 274 if label.is_replaced_by_static(): 275 raise error.UnmodifiableLabelException( 276 'Failed to remove label "%s" for hosts "%r" because it is a ' 277 'static label. Use go/chromeos-skylab-inventory-tools to ' 278 'modify this label.' % (label.name, hosts)) 279 280 label.host_set.remove(*host_objs) 281 282 283@rpc_utils.route_rpc_to_master 284def label_remove_hosts(id, hosts): 285 """Removes a label of the given id from the given hosts. 286 287 This method should be run only on master not shards. 288 289 @param id: id or name of a label. 290 @param hosts: A list of hostnames or ids. More often hostnames. 291 """ 292 host_objs = models.Host.smart_get_bulk(hosts) 293 remove_label_from_hosts(id, hosts) 294 295 rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id) 296 297 298def get_labels(exclude_filters=(), **filter_data): 299 """\ 300 @param exclude_filters: A sequence of dictionaries of filters. 301 302 @returns A sequence of nested dictionaries of label information. 303 """ 304 labels = models.Label.query_objects(filter_data) 305 for exclude_filter in exclude_filters: 306 labels = labels.exclude(**exclude_filter) 307 308 if not RESPECT_STATIC_LABELS: 309 return rpc_utils.prepare_rows_as_nested_dicts(labels, ()) 310 311 static_labels = models.StaticLabel.query_objects(filter_data) 312 for exclude_filter in exclude_filters: 313 static_labels = static_labels.exclude(**exclude_filter) 314 315 non_static_lists = rpc_utils.prepare_rows_as_nested_dicts(labels, ()) 316 static_lists = rpc_utils.prepare_rows_as_nested_dicts(static_labels, ()) 317 318 label_ids = [label.id for label in labels] 319 replaced = models.ReplacedLabel.objects.filter(label__id__in=label_ids) 320 replaced_ids = {r.label_id for r in replaced} 321 replaced_label_names = {l.name for l in labels if l.id in replaced_ids} 322 323 return_lists = [] 324 for non_static_label in non_static_lists: 325 if non_static_label.get('id') not in replaced_ids: 326 return_lists.append(non_static_label) 327 328 for static_label in static_lists: 329 if static_label.get('name') in replaced_label_names: 330 return_lists.append(static_label) 331 332 return return_lists 333 334 335# hosts 336 337def add_host(hostname, status=None, locked=None, lock_reason='', protection=None): 338 if locked and not lock_reason: 339 raise model_logic.ValidationError( 340 {'locked': 'Please provide a reason for locking when adding host.'}) 341 342 return models.Host.add_object(hostname=hostname, status=status, 343 locked=locked, lock_reason=lock_reason, 344 protection=protection).id 345 346 347@rpc_utils.route_rpc_to_master 348def modify_host(id, **kwargs): 349 """Modify local attributes of a host. 350 351 If this is called on the master, but the host is assigned to a shard, this 352 will call `modify_host_local` RPC to the responsible shard. This means if 353 a host is being locked using this function, this change will also propagate 354 to shards. 355 When this is called on a shard, the shard just routes the RPC to the master 356 and does nothing. 357 358 @param id: id of the host to modify. 359 @param kwargs: key=value pairs of values to set on the host. 360 """ 361 rpc_utils.check_modify_host(kwargs) 362 host = models.Host.smart_get(id) 363 try: 364 rpc_utils.check_modify_host_locking(host, kwargs) 365 except model_logic.ValidationError as e: 366 if not kwargs.get('force_modify_locking', False): 367 raise 368 logging.exception('The following exception will be ignored and lock ' 369 'modification will be enforced. %s', e) 370 371 # This is required to make `lock_time` for a host be exactly same 372 # between the master and a shard. 373 if kwargs.get('locked', None) and 'lock_time' not in kwargs: 374 kwargs['lock_time'] = datetime.datetime.now() 375 376 # force_modifying_locking is not an internal field in database, remove. 377 shard_kwargs = dict(kwargs) 378 shard_kwargs.pop('force_modify_locking', None) 379 rpc_utils.fanout_rpc([host], 'modify_host_local', 380 include_hostnames=False, id=id, **shard_kwargs) 381 382 # Update the local DB **after** RPC fanout is complete. 383 # This guarantees that the master state is only updated if the shards were 384 # correctly updated. 385 # In case the shard update fails mid-flight and the master-shard desync, we 386 # always consider the master state to be the source-of-truth, and any 387 # (automated) corrective actions will revert the (partial) shard updates. 388 host.update_object(kwargs) 389 390 391def modify_host_local(id, **kwargs): 392 """Modify host attributes in local DB. 393 394 @param id: Host id. 395 @param kwargs: key=value pairs of values to set on the host. 396 """ 397 models.Host.smart_get(id).update_object(kwargs) 398 399 400@rpc_utils.route_rpc_to_master 401def modify_hosts(host_filter_data, update_data): 402 """Modify local attributes of multiple hosts. 403 404 If this is called on the master, but one of the hosts in that match the 405 filters is assigned to a shard, this will call `modify_hosts_local` RPC 406 to the responsible shard. 407 When this is called on a shard, the shard just routes the RPC to the master 408 and does nothing. 409 410 The filters are always applied on the master, not on the shards. This means 411 if the states of a host differ on the master and a shard, the state on the 412 master will be used. I.e. this means: 413 A host was synced to Shard 1. On Shard 1 the status of the host was set to 414 'Repair Failed'. 415 - A call to modify_hosts with host_filter_data={'status': 'Ready'} will 416 update the host (both on the shard and on the master), because the state 417 of the host as the master knows it is still 'Ready'. 418 - A call to modify_hosts with host_filter_data={'status': 'Repair failed' 419 will not update the host, because the filter doesn't apply on the master. 420 421 @param host_filter_data: Filters out which hosts to modify. 422 @param update_data: A dictionary with the changes to make to the hosts. 423 """ 424 update_data = update_data.copy() 425 rpc_utils.check_modify_host(update_data) 426 hosts = models.Host.query_objects(host_filter_data) 427 428 affected_shard_hostnames = set() 429 affected_host_ids = [] 430 431 # Check all hosts before changing data for exception safety. 432 for host in hosts: 433 try: 434 rpc_utils.check_modify_host_locking(host, update_data) 435 except model_logic.ValidationError as e: 436 if not update_data.get('force_modify_locking', False): 437 raise 438 logging.exception('The following exception will be ignored and ' 439 'lock modification will be enforced. %s', e) 440 441 if host.shard: 442 affected_shard_hostnames.add(host.shard.hostname) 443 affected_host_ids.append(host.id) 444 445 # This is required to make `lock_time` for a host be exactly same 446 # between the master and a shard. 447 if update_data.get('locked', None) and 'lock_time' not in update_data: 448 update_data['lock_time'] = datetime.datetime.now() 449 for host in hosts: 450 host.update_object(update_data) 451 452 update_data.pop('force_modify_locking', None) 453 # Caution: Changing the filter from the original here. See docstring. 454 rpc_utils.run_rpc_on_multiple_hostnames( 455 'modify_hosts_local', affected_shard_hostnames, 456 host_filter_data={'id__in': affected_host_ids}, 457 update_data=update_data) 458 459 460def modify_hosts_local(host_filter_data, update_data): 461 """Modify attributes of hosts in local DB. 462 463 @param host_filter_data: Filters out which hosts to modify. 464 @param update_data: A dictionary with the changes to make to the hosts. 465 """ 466 for host in models.Host.query_objects(host_filter_data): 467 host.update_object(update_data) 468 469 470def add_labels_to_host(id, labels): 471 """Adds labels to a given host only in local DB. 472 473 @param id: id or hostname for a host. 474 @param labels: ids or names for labels. 475 """ 476 label_objs = models.Label.smart_get_bulk(labels) 477 if not RESPECT_STATIC_LABELS: 478 models.Host.smart_get(id).labels.add(*label_objs) 479 else: 480 static_labels, non_static_labels = models.Host.classify_label_objects( 481 label_objs) 482 host = models.Host.smart_get(id) 483 host.static_labels.add(*static_labels) 484 host.labels.add(*non_static_labels) 485 486 487@rpc_utils.route_rpc_to_master 488def host_add_labels(id, labels): 489 """Adds labels to a given host. 490 491 @param id: id or hostname for a host. 492 @param labels: ids or names for labels. 493 494 @raises ValidationError: If adding more than one platform/board label. 495 """ 496 # Create the labels on the master/shards. 497 for label in labels: 498 _create_label_everywhere(label, [id]) 499 500 label_objs = models.Label.smart_get_bulk(labels) 501 502 platforms = [label.name for label in label_objs if label.platform] 503 if len(platforms) > 1: 504 raise model_logic.ValidationError( 505 {'labels': ('Adding more than one platform: %s' % 506 ', '.join(platforms))}) 507 508 host_obj = models.Host.smart_get(id) 509 if platforms: 510 models.Host.check_no_platform([host_obj]) 511 if any(label_name.startswith('board:') for label_name in labels): 512 models.Host.check_board_labels_allowed([host_obj], labels) 513 add_labels_to_host(id, labels) 514 515 rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False, 516 id=id, labels=labels) 517 518 519def remove_labels_from_host(id, labels): 520 """Removes labels from a given host only in local DB. 521 522 @param id: id or hostname for a host. 523 @param labels: ids or names for labels. 524 """ 525 label_objs = models.Label.smart_get_bulk(labels) 526 if not RESPECT_STATIC_LABELS: 527 models.Host.smart_get(id).labels.remove(*label_objs) 528 else: 529 static_labels, non_static_labels = models.Host.classify_label_objects( 530 label_objs) 531 host = models.Host.smart_get(id) 532 host.labels.remove(*non_static_labels) 533 if static_labels: 534 logging.info('Cannot remove labels "%r" for host "%r" due to they ' 535 'are static labels. Use ' 536 'go/chromeos-skylab-inventory-tools to modify these ' 537 'labels.', static_labels, id) 538 539 540@rpc_utils.route_rpc_to_master 541def host_remove_labels(id, labels): 542 """Removes labels from a given host. 543 544 @param id: id or hostname for a host. 545 @param labels: ids or names for labels. 546 """ 547 remove_labels_from_host(id, labels) 548 549 host_obj = models.Host.smart_get(id) 550 rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False, 551 id=id, labels=labels) 552 553 554def get_host_attribute(attribute, **host_filter_data): 555 """ 556 @param attribute: string name of attribute 557 @param host_filter_data: filter data to apply to Hosts to choose hosts to 558 act upon 559 """ 560 hosts = rpc_utils.get_host_query((), False, True, host_filter_data) 561 hosts = list(hosts) 562 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 563 'attribute_list') 564 host_attr_dicts = [] 565 host_objs = [] 566 for host_obj in hosts: 567 for attr_obj in host_obj.attribute_list: 568 if attr_obj.attribute == attribute: 569 host_attr_dicts.append(attr_obj.get_object_dict()) 570 host_objs.append(host_obj) 571 572 if RESPECT_STATIC_ATTRIBUTES: 573 for host_attr, host_obj in zip(host_attr_dicts, host_objs): 574 static_attrs = models.StaticHostAttribute.query_objects( 575 {'host_id': host_obj.id, 'attribute': attribute}) 576 if len(static_attrs) > 0: 577 host_attr['value'] = static_attrs[0].value 578 579 return rpc_utils.prepare_for_serialization(host_attr_dicts) 580 581 582@rpc_utils.route_rpc_to_master 583def set_host_attribute(attribute, value, **host_filter_data): 584 """Set an attribute on hosts. 585 586 This RPC is a shim that forwards calls to master to be handled there. 587 588 @param attribute: string name of attribute 589 @param value: string, or None to delete an attribute 590 @param host_filter_data: filter data to apply to Hosts to choose hosts to 591 act upon 592 """ 593 assert not utils.is_shard() 594 set_host_attribute_impl(attribute, value, **host_filter_data) 595 596 597def set_host_attribute_impl(attribute, value, **host_filter_data): 598 """Set an attribute on hosts. 599 600 *** DO NOT CALL THIS RPC from client code *** 601 This RPC exists for master-shard communication only. 602 Call set_host_attribute instead. 603 604 @param attribute: string name of attribute 605 @param value: string, or None to delete an attribute 606 @param host_filter_data: filter data to apply to Hosts to choose hosts to 607 act upon 608 """ 609 assert host_filter_data # disallow accidental actions on all hosts 610 hosts = models.Host.query_objects(host_filter_data) 611 models.AclGroup.check_for_acl_violation_hosts(hosts) 612 for host in hosts: 613 host.set_or_delete_attribute(attribute, value) 614 615 # Master forwards this RPC to shards. 616 if not utils.is_shard(): 617 rpc_utils.fanout_rpc(hosts, 'set_host_attribute_impl', False, 618 attribute=attribute, value=value, **host_filter_data) 619 620 621@rpc_utils.forward_single_host_rpc_to_shard 622def delete_host(id): 623 models.Host.smart_get(id).delete() 624 625 626def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 627 valid_only=True, include_current_job=False, **filter_data): 628 """Get a list of dictionaries which contains the information of hosts. 629 630 @param multiple_labels: match hosts in all of the labels given. Should 631 be a list of label names. 632 @param exclude_only_if_needed_labels: Deprecated. Raise error if it's True. 633 @param include_current_job: Set to True to include ids of currently running 634 job and special task. 635 """ 636 if exclude_only_if_needed_labels: 637 raise error.RPCException('exclude_only_if_needed_labels is deprecated') 638 639 hosts = rpc_utils.get_host_query(multiple_labels, 640 exclude_only_if_needed_labels, 641 valid_only, filter_data) 642 hosts = list(hosts) 643 models.Host.objects.populate_relationships(hosts, models.Label, 644 'label_list') 645 models.Host.objects.populate_relationships(hosts, models.AclGroup, 646 'acl_list') 647 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 648 'attribute_list') 649 models.Host.objects.populate_relationships(hosts, 650 models.StaticHostAttribute, 651 'staticattribute_list') 652 host_dicts = [] 653 for host_obj in hosts: 654 host_dict = host_obj.get_object_dict() 655 host_dict['acls'] = [acl.name for acl in host_obj.acl_list] 656 host_dict['attributes'] = dict((attribute.attribute, attribute.value) 657 for attribute in host_obj.attribute_list) 658 if RESPECT_STATIC_LABELS: 659 label_list = [] 660 # Only keep static labels which has a corresponding entries in 661 # afe_labels. 662 for label in host_obj.label_list: 663 if label.is_replaced_by_static(): 664 static_label = models.StaticLabel.smart_get(label.name) 665 label_list.append(static_label) 666 else: 667 label_list.append(label) 668 669 host_dict['labels'] = [label.name for label in label_list] 670 host_dict['platform'] = rpc_utils.find_platform( 671 host_obj.hostname, label_list) 672 else: 673 host_dict['labels'] = [label.name for label in host_obj.label_list] 674 host_dict['platform'] = rpc_utils.find_platform( 675 host_obj.hostname, host_obj.label_list) 676 677 if RESPECT_STATIC_ATTRIBUTES: 678 # Overwrite attribute with values in afe_static_host_attributes. 679 for attr in host_obj.staticattribute_list: 680 if attr.attribute in host_dict['attributes']: 681 host_dict['attributes'][attr.attribute] = attr.value 682 683 if include_current_job: 684 host_dict['current_job'] = None 685 host_dict['current_special_task'] = None 686 entries = models.HostQueueEntry.objects.filter( 687 host_id=host_dict['id'], active=True, complete=False) 688 if entries: 689 host_dict['current_job'] = ( 690 entries[0].get_object_dict()['job']) 691 tasks = models.SpecialTask.objects.filter( 692 host_id=host_dict['id'], is_active=True, is_complete=False) 693 if tasks: 694 host_dict['current_special_task'] = ( 695 '%d-%s' % (tasks[0].get_object_dict()['id'], 696 tasks[0].get_object_dict()['task'].lower())) 697 host_dicts.append(host_dict) 698 699 return rpc_utils.prepare_for_serialization(host_dicts) 700 701 702def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 703 valid_only=True, **filter_data): 704 """ 705 Same parameters as get_hosts(). 706 707 @returns The number of matching hosts. 708 """ 709 if exclude_only_if_needed_labels: 710 raise error.RPCException('exclude_only_if_needed_labels is deprecated') 711 712 hosts = rpc_utils.get_host_query(multiple_labels, 713 exclude_only_if_needed_labels, 714 valid_only, filter_data) 715 return len(hosts) 716 717 718# tests 719 720def get_tests(**filter_data): 721 return rpc_utils.prepare_for_serialization( 722 models.Test.list_objects(filter_data)) 723 724 725def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name): 726 """Gets the counts of all passed and failed tests from the matching jobs. 727 728 @param job_name_prefix: Name prefix of the jobs to get the summary 729 from, e.g., 'butterfly-release/r40-6457.21.0/bvt-cq/'. Prefix 730 matching is case insensitive. 731 @param label_name: Label that must be set in the jobs, e.g., 732 'cros-version:butterfly-release/R40-6457.21.0'. 733 734 @returns A summary of the counts of all the passed and failed tests. 735 """ 736 job_ids = list(models.Job.objects.filter( 737 name__istartswith=job_name_prefix, 738 dependency_labels__name=label_name).values_list( 739 'pk', flat=True)) 740 summary = {'passed': 0, 'failed': 0} 741 if not job_ids: 742 return summary 743 744 counts = (tko_models.TestView.objects.filter( 745 afe_job_id__in=job_ids).exclude( 746 test_name='SERVER_JOB').exclude( 747 test_name__startswith='CLIENT_JOB').values( 748 'status').annotate( 749 count=Count('status'))) 750 for status in counts: 751 if status['status'] == 'GOOD': 752 summary['passed'] += status['count'] 753 else: 754 summary['failed'] += status['count'] 755 return summary 756 757 758# profilers 759 760def add_profiler(name, description=None): 761 return models.Profiler.add_object(name=name, description=description).id 762 763 764def modify_profiler(id, **data): 765 models.Profiler.smart_get(id).update_object(data) 766 767 768def delete_profiler(id): 769 models.Profiler.smart_get(id).delete() 770 771 772def get_profilers(**filter_data): 773 return rpc_utils.prepare_for_serialization( 774 models.Profiler.list_objects(filter_data)) 775 776 777# users 778 779def get_users(**filter_data): 780 return rpc_utils.prepare_for_serialization( 781 models.User.list_objects(filter_data)) 782 783 784# acl groups 785 786def add_acl_group(name, description=None): 787 group = models.AclGroup.add_object(name=name, description=description) 788 group.users.add(models.User.current_user()) 789 return group.id 790 791 792def modify_acl_group(id, **data): 793 group = models.AclGroup.smart_get(id) 794 group.check_for_acl_violation_acl_group() 795 group.update_object(data) 796 group.add_current_user_if_empty() 797 798 799def acl_group_add_users(id, users): 800 group = models.AclGroup.smart_get(id) 801 group.check_for_acl_violation_acl_group() 802 users = models.User.smart_get_bulk(users) 803 group.users.add(*users) 804 805 806def acl_group_remove_users(id, users): 807 group = models.AclGroup.smart_get(id) 808 group.check_for_acl_violation_acl_group() 809 users = models.User.smart_get_bulk(users) 810 group.users.remove(*users) 811 group.add_current_user_if_empty() 812 813 814def acl_group_add_hosts(id, hosts): 815 group = models.AclGroup.smart_get(id) 816 group.check_for_acl_violation_acl_group() 817 hosts = models.Host.smart_get_bulk(hosts) 818 group.hosts.add(*hosts) 819 group.on_host_membership_change() 820 821 822def acl_group_remove_hosts(id, hosts): 823 group = models.AclGroup.smart_get(id) 824 group.check_for_acl_violation_acl_group() 825 hosts = models.Host.smart_get_bulk(hosts) 826 group.hosts.remove(*hosts) 827 group.on_host_membership_change() 828 829 830def delete_acl_group(id): 831 models.AclGroup.smart_get(id).delete() 832 833 834def get_acl_groups(**filter_data): 835 acl_groups = models.AclGroup.list_objects(filter_data) 836 for acl_group in acl_groups: 837 acl_group_obj = models.AclGroup.objects.get(id=acl_group['id']) 838 acl_group['users'] = [user.login 839 for user in acl_group_obj.users.all()] 840 acl_group['hosts'] = [host.hostname 841 for host in acl_group_obj.hosts.all()] 842 return rpc_utils.prepare_for_serialization(acl_groups) 843 844 845# jobs 846 847def generate_control_file(tests=(), profilers=(), 848 client_control_file='', use_container=False, 849 profile_only=None, db_tests=True, 850 test_source_build=None): 851 """ 852 Generates a client-side control file to run tests. 853 854 @param tests List of tests to run. See db_tests for more information. 855 @param profilers List of profilers to activate during the job. 856 @param client_control_file The contents of a client-side control file to 857 run at the end of all tests. If this is supplied, all tests must be 858 client side. 859 TODO: in the future we should support server control files directly 860 to wrap with a kernel. That'll require changing the parameter 861 name and adding a boolean to indicate if it is a client or server 862 control file. 863 @param use_container unused argument today. TODO: Enable containers 864 on the host during a client side test. 865 @param profile_only A boolean that indicates what default profile_only 866 mode to use in the control file. Passing None will generate a 867 control file that does not explcitly set the default mode at all. 868 @param db_tests: if True, the test object can be found in the database 869 backing the test model. In this case, tests is a tuple 870 of test IDs which are used to retrieve the test objects 871 from the database. If False, tests is a tuple of test 872 dictionaries stored client-side in the AFE. 873 @param test_source_build: Build to be used to retrieve test code. Default 874 to None. 875 876 @returns a dict with the following keys: 877 control_file: str, The control file text. 878 is_server: bool, is the control file a server-side control file? 879 synch_count: How many machines the job uses per autoserv execution. 880 synch_count == 1 means the job is asynchronous. 881 dependencies: A list of the names of labels on which the job depends. 882 """ 883 if not tests and not client_control_file: 884 return dict(control_file='', is_server=False, synch_count=1, 885 dependencies=[]) 886 887 cf_info, test_objects, profiler_objects = ( 888 rpc_utils.prepare_generate_control_file(tests, profilers, 889 db_tests)) 890 cf_info['control_file'] = control_file_lib.generate_control( 891 tests=test_objects, profilers=profiler_objects, 892 is_server=cf_info['is_server'], 893 client_control_file=client_control_file, profile_only=profile_only, 894 test_source_build=test_source_build) 895 return cf_info 896 897 898def create_job_page_handler(name, priority, control_file, control_type, 899 image=None, hostless=False, firmware_rw_build=None, 900 firmware_ro_build=None, test_source_build=None, 901 is_cloning=False, cheets_build=None, **kwargs): 902 """\ 903 Create and enqueue a job. 904 905 @param name name of this job 906 @param priority Integer priority of this job. Higher is more important. 907 @param control_file String contents of the control file. 908 @param control_type Type of control file, Client or Server. 909 @param image: ChromeOS build to be installed in the dut. Default to None. 910 @param firmware_rw_build: Firmware build to update RW firmware. Default to 911 None, i.e., RW firmware will not be updated. 912 @param firmware_ro_build: Firmware build to update RO firmware. Default to 913 None, i.e., RO firmware will not be updated. 914 @param test_source_build: Build to be used to retrieve test code. Default 915 to None. 916 @param is_cloning: True if creating a cloning job. 917 @param cheets_build: ChromeOS Android build to be installed in the dut. 918 Default to None. Cheets build will not be updated. 919 @param kwargs extra args that will be required by create_suite_job or 920 create_job. 921 922 @returns The created Job id number. 923 """ 924 test_args = {} 925 if kwargs.get('args'): 926 # args' format is: ['disable_sysinfo=False', 'fast=True', ...] 927 args = kwargs.get('args') 928 for arg in args: 929 k, v = arg.split('=')[0], arg.split('=')[1] 930 test_args[k] = v 931 932 if is_cloning: 933 logging.info('Start to clone a new job') 934 # When cloning a job, hosts and meta_hosts should not exist together, 935 # which would cause host-scheduler to schedule two hqe jobs to one host 936 # at the same time, and crash itself. Clear meta_hosts for this case. 937 if kwargs.get('hosts') and kwargs.get('meta_hosts'): 938 kwargs['meta_hosts'] = [] 939 else: 940 logging.info('Start to create a new job') 941 control_file = rpc_utils.encode_ascii(control_file) 942 if not control_file: 943 raise model_logic.ValidationError({ 944 'control_file' : "Control file cannot be empty"}) 945 946 if image and hostless: 947 builds = {} 948 builds[provision.CROS_VERSION_PREFIX] = image 949 if cheets_build: 950 builds[provision.CROS_ANDROID_VERSION_PREFIX] = cheets_build 951 if firmware_rw_build: 952 builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build 953 if firmware_ro_build: 954 builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build 955 return create_suite_job( 956 name=name, control_file=control_file, priority=priority, 957 builds=builds, test_source_build=test_source_build, 958 is_cloning=is_cloning, test_args=test_args, **kwargs) 959 960 return create_job(name, priority, control_file, control_type, image=image, 961 hostless=hostless, test_args=test_args, **kwargs) 962 963 964@rpc_utils.route_rpc_to_master 965def create_job( 966 name, 967 priority, 968 control_file, 969 control_type, 970 hosts=(), 971 meta_hosts=(), 972 one_time_hosts=(), 973 synch_count=None, 974 is_template=False, 975 timeout=None, 976 timeout_mins=None, 977 max_runtime_mins=None, 978 run_verify=False, 979 email_list='', 980 dependencies=(), 981 reboot_before=None, 982 reboot_after=None, 983 parse_failed_repair=None, 984 hostless=False, 985 keyvals=None, 986 drone_set=None, 987 image=None, 988 parent_job_id=None, 989 test_retry=0, 990 run_reset=True, 991 require_ssp=None, 992 test_args=None, 993 **kwargs): 994 """\ 995 Create and enqueue a job. 996 997 @param name name of this job 998 @param priority Integer priority of this job. Higher is more important. 999 @param control_file String contents of the control file. 1000 @param control_type Type of control file, Client or Server. 1001 @param synch_count How many machines the job uses per autoserv execution. 1002 synch_count == 1 means the job is asynchronous. If an atomic group is 1003 given this value is treated as a minimum. 1004 @param is_template If true then create a template job. 1005 @param timeout Hours after this call returns until the job times out. 1006 @param timeout_mins Minutes after this call returns until the job times 1007 out. 1008 @param max_runtime_mins Minutes from job starting time until job times out 1009 @param run_verify Should the host be verified before running the test? 1010 @param email_list String containing emails to mail when the job is done 1011 @param dependencies List of label names on which this job depends 1012 @param reboot_before Never, If dirty, or Always 1013 @param reboot_after Never, If all tests passed, or Always 1014 @param parse_failed_repair if true, results of failed repairs launched by 1015 this job will be parsed as part of the job. 1016 @param hostless if true, create a hostless job 1017 @param keyvals dict of keyvals to associate with the job 1018 @param hosts List of hosts to run job on. 1019 @param meta_hosts List where each entry is a label name, and for each entry 1020 one host will be chosen from that label to run the job on. 1021 @param one_time_hosts List of hosts not in the database to run the job on. 1022 @param drone_set The name of the drone set to run this test on. 1023 @param image OS image to install before running job. 1024 @param parent_job_id id of a job considered to be parent of created job. 1025 @param test_retry DEPRECATED 1026 @param run_reset Should the host be reset before running the test? 1027 @param require_ssp Set to True to require server-side packaging to run the 1028 test. If it's set to None, drone will still try to run 1029 the server side with server-side packaging. If the 1030 autotest-server package doesn't exist for the build or 1031 image is not set, drone will run the test without server- 1032 side packaging. Default is None. 1033 @param test_args A dict of args passed to be injected into control file. 1034 @param kwargs extra keyword args. NOT USED. 1035 1036 @returns The created Job id number. 1037 """ 1038 if test_args: 1039 control_file = tools.inject_vars(test_args, control_file) 1040 if image: 1041 dependencies += (provision.image_version_to_label(image),) 1042 return rpc_utils.create_job_common( 1043 name=name, 1044 priority=priority, 1045 control_type=control_type, 1046 control_file=control_file, 1047 hosts=hosts, 1048 meta_hosts=meta_hosts, 1049 one_time_hosts=one_time_hosts, 1050 synch_count=synch_count, 1051 is_template=is_template, 1052 timeout=timeout, 1053 timeout_mins=timeout_mins, 1054 max_runtime_mins=max_runtime_mins, 1055 run_verify=run_verify, 1056 email_list=email_list, 1057 dependencies=dependencies, 1058 reboot_before=reboot_before, 1059 reboot_after=reboot_after, 1060 parse_failed_repair=parse_failed_repair, 1061 hostless=hostless, 1062 keyvals=keyvals, 1063 drone_set=drone_set, 1064 parent_job_id=parent_job_id, 1065 run_reset=run_reset, 1066 require_ssp=require_ssp) 1067 1068 1069def abort_host_queue_entries(**filter_data): 1070 """\ 1071 Abort a set of host queue entries. 1072 1073 @return: A list of dictionaries, each contains information 1074 about an aborted HQE. 1075 """ 1076 query = models.HostQueueEntry.query_objects(filter_data) 1077 1078 # Dont allow aborts on: 1079 # 1. Jobs that have already completed (whether or not they were aborted) 1080 # 2. Jobs that we have already been aborted (but may not have completed) 1081 query = query.filter(complete=False).filter(aborted=False) 1082 models.AclGroup.check_abort_permissions(query) 1083 host_queue_entries = list(query.select_related()) 1084 rpc_utils.check_abort_synchronous_jobs(host_queue_entries) 1085 1086 models.HostQueueEntry.abort_host_queue_entries(host_queue_entries) 1087 hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id, 1088 'Job name': hqe.job.name} for hqe in host_queue_entries] 1089 return hqe_info 1090 1091 1092def abort_special_tasks(**filter_data): 1093 """\ 1094 Abort the special task, or tasks, specified in the filter. 1095 """ 1096 query = models.SpecialTask.query_objects(filter_data) 1097 special_tasks = query.filter(is_active=True) 1098 for task in special_tasks: 1099 task.abort() 1100 1101 1102def _call_special_tasks_on_hosts(task, hosts): 1103 """\ 1104 Schedules a set of hosts for a special task. 1105 1106 @returns A list of hostnames that a special task was created for. 1107 """ 1108 models.AclGroup.check_for_acl_violation_hosts(hosts) 1109 shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts) 1110 if shard_host_map and not utils.is_shard(): 1111 raise ValueError('The following hosts are on shards, please ' 1112 'follow the link to the shards and create jobs ' 1113 'there instead. %s.' % shard_host_map) 1114 for host in hosts: 1115 models.SpecialTask.schedule_special_task(host, task) 1116 return list(sorted(host.hostname for host in hosts)) 1117 1118 1119def _forward_special_tasks_on_hosts(task, rpc, **filter_data): 1120 """Forward special tasks to corresponding shards. 1121 1122 For master, when special tasks are fired on hosts that are sharded, 1123 forward the RPC to corresponding shards. 1124 1125 For shard, create special task records in local DB. 1126 1127 @param task: Enum value of frontend.afe.models.SpecialTask.Task 1128 @param rpc: RPC name to forward. 1129 @param filter_data: Filter keywords to be used for DB query. 1130 1131 @return: A list of hostnames that a special task was created for. 1132 """ 1133 hosts = models.Host.query_objects(filter_data) 1134 shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts) 1135 1136 # Filter out hosts on a shard from those on the master, forward 1137 # rpcs to the shard with an additional hostname__in filter, and 1138 # create a local SpecialTask for each remaining host. 1139 if shard_host_map and not utils.is_shard(): 1140 hosts = [h for h in hosts if h.shard is None] 1141 for shard, hostnames in shard_host_map.iteritems(): 1142 1143 # The main client of this module is the frontend website, and 1144 # it invokes it with an 'id' or an 'id__in' filter. Regardless, 1145 # the 'hostname' filter should narrow down the list of hosts on 1146 # each shard even though we supply all the ids in filter_data. 1147 # This method uses hostname instead of id because it fits better 1148 # with the overall architecture of redirection functions in 1149 # rpc_utils. 1150 shard_filter = filter_data.copy() 1151 shard_filter['hostname__in'] = hostnames 1152 rpc_utils.run_rpc_on_multiple_hostnames( 1153 rpc, [shard], **shard_filter) 1154 1155 # There is a race condition here if someone assigns a shard to one of these 1156 # hosts before we create the task. The host will stay on the master if: 1157 # 1. The host is not Ready 1158 # 2. The host is Ready but has a task 1159 # But if the host is Ready and doesn't have a task yet, it will get sent 1160 # to the shard as we're creating a task here. 1161 1162 # Given that we only rarely verify Ready hosts it isn't worth putting this 1163 # entire method in a transaction. The worst case scenario is that we have 1164 # a verify running on a Ready host while the shard is using it, if the 1165 # verify fails no subsequent tasks will be created against the host on the 1166 # master, and verifies are safe enough that this is OK. 1167 return _call_special_tasks_on_hosts(task, hosts) 1168 1169 1170def reverify_hosts(**filter_data): 1171 """\ 1172 Schedules a set of hosts for verify. 1173 1174 @returns A list of hostnames that a verify task was created for. 1175 """ 1176 return _forward_special_tasks_on_hosts( 1177 models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data) 1178 1179 1180def repair_hosts(**filter_data): 1181 """\ 1182 Schedules a set of hosts for repair. 1183 1184 @returns A list of hostnames that a repair task was created for. 1185 """ 1186 return _forward_special_tasks_on_hosts( 1187 models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data) 1188 1189 1190def get_jobs(not_yet_run=False, running=False, finished=False, 1191 suite=False, sub=False, standalone=False, **filter_data): 1192 """\ 1193 Extra status filter args for get_jobs: 1194 -not_yet_run: Include only jobs that have not yet started running. 1195 -running: Include only jobs that have start running but for which not 1196 all hosts have completed. 1197 -finished: Include only jobs for which all hosts have completed (or 1198 aborted). 1199 1200 Extra type filter args for get_jobs: 1201 -suite: Include only jobs with child jobs. 1202 -sub: Include only jobs with a parent job. 1203 -standalone: Inlcude only jobs with no child or parent jobs. 1204 At most one of these three fields should be specified. 1205 """ 1206 extra_args = rpc_utils.extra_job_status_filters(not_yet_run, 1207 running, 1208 finished) 1209 filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, 1210 suite, 1211 sub, 1212 standalone) 1213 job_dicts = [] 1214 jobs = list(models.Job.query_objects(filter_data)) 1215 models.Job.objects.populate_relationships(jobs, models.Label, 1216 'dependencies') 1217 models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals') 1218 for job in jobs: 1219 job_dict = job.get_object_dict() 1220 job_dict['dependencies'] = ','.join(label.name 1221 for label in job.dependencies) 1222 job_dict['keyvals'] = dict((keyval.key, keyval.value) 1223 for keyval in job.keyvals) 1224 job_dicts.append(job_dict) 1225 return rpc_utils.prepare_for_serialization(job_dicts) 1226 1227 1228def get_num_jobs(not_yet_run=False, running=False, finished=False, 1229 suite=False, sub=False, standalone=False, 1230 **filter_data): 1231 """\ 1232 See get_jobs() for documentation of extra filter parameters. 1233 """ 1234 extra_args = rpc_utils.extra_job_status_filters(not_yet_run, 1235 running, 1236 finished) 1237 filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, 1238 suite, 1239 sub, 1240 standalone) 1241 return models.Job.query_count(filter_data) 1242 1243 1244def get_jobs_summary(**filter_data): 1245 """\ 1246 Like get_jobs(), but adds 'status_counts' and 'result_counts' field. 1247 1248 'status_counts' filed is a dictionary mapping status strings to the number 1249 of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}. 1250 1251 'result_counts' field is piped to tko's rpc_interface and has the return 1252 format specified under get_group_counts. 1253 """ 1254 jobs = get_jobs(**filter_data) 1255 ids = [job['id'] for job in jobs] 1256 all_status_counts = models.Job.objects.get_status_counts(ids) 1257 for job in jobs: 1258 job['status_counts'] = all_status_counts[job['id']] 1259 job['result_counts'] = tko_rpc_interface.get_status_counts( 1260 ['afe_job_id', 'afe_job_id'], 1261 header_groups=[['afe_job_id'], ['afe_job_id']], 1262 **{'afe_job_id': job['id']}) 1263 return rpc_utils.prepare_for_serialization(jobs) 1264 1265 1266def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None): 1267 """\ 1268 Retrieves all the information needed to clone a job. 1269 """ 1270 job = models.Job.objects.get(id=id) 1271 job_info = rpc_utils.get_job_info(job, 1272 preserve_metahosts, 1273 queue_entry_filter_data) 1274 1275 host_dicts = [] 1276 for host in job_info['hosts']: 1277 host_dict = get_hosts(id=host.id)[0] 1278 other_labels = host_dict['labels'] 1279 if host_dict['platform']: 1280 other_labels.remove(host_dict['platform']) 1281 host_dict['other_labels'] = ', '.join(other_labels) 1282 host_dicts.append(host_dict) 1283 1284 for host in job_info['one_time_hosts']: 1285 host_dict = dict(hostname=host.hostname, 1286 id=host.id, 1287 platform='(one-time host)', 1288 locked_text='') 1289 host_dicts.append(host_dict) 1290 1291 # convert keys from Label objects to strings (names of labels) 1292 meta_host_counts = dict((meta_host.name, count) for meta_host, count 1293 in job_info['meta_host_counts'].iteritems()) 1294 1295 info = dict(job=job.get_object_dict(), 1296 meta_host_counts=meta_host_counts, 1297 hosts=host_dicts) 1298 info['job']['dependencies'] = job_info['dependencies'] 1299 info['hostless'] = job_info['hostless'] 1300 info['drone_set'] = job.drone_set and job.drone_set.name 1301 1302 image = _get_image_for_job(job, job_info['hostless']) 1303 if image: 1304 info['job']['image'] = image 1305 1306 return rpc_utils.prepare_for_serialization(info) 1307 1308 1309def _get_image_for_job(job, hostless): 1310 """Gets the image used for a job. 1311 1312 Gets the image used for an AFE job from the job's keyvals 'build' or 1313 'builds'. If that fails, and the job is a hostless job, tries to 1314 get the image from its control file attributes 'build' or 'builds'. 1315 1316 TODO(ntang): Needs to handle FAFT with two builds for ro/rw. 1317 1318 @param job An AFE job object. 1319 @param hostless Boolean indicating whether the job is hostless. 1320 1321 @returns The image build used for the job. 1322 """ 1323 keyvals = job.keyval_dict() 1324 image = keyvals.get('build') 1325 if not image: 1326 value = keyvals.get('builds') 1327 builds = None 1328 if isinstance(value, dict): 1329 builds = value 1330 elif isinstance(value, basestring): 1331 builds = ast.literal_eval(value) 1332 if builds: 1333 image = builds.get('cros-version') 1334 if not image and hostless and job.control_file: 1335 try: 1336 control_obj = control_data.parse_control_string( 1337 job.control_file) 1338 if hasattr(control_obj, 'build'): 1339 image = getattr(control_obj, 'build') 1340 if not image and hasattr(control_obj, 'builds'): 1341 builds = getattr(control_obj, 'builds') 1342 image = builds.get('cros-version') 1343 except: 1344 logging.warning('Failed to parse control file for job: %s', 1345 job.name) 1346 return image 1347 1348 1349def get_host_queue_entries_by_insert_time( 1350 insert_time_after=None, insert_time_before=None, **filter_data): 1351 """Like get_host_queue_entries, but using the insert index table. 1352 1353 @param insert_time_after: A lower bound on insert_time 1354 @param insert_time_before: An upper bound on insert_time 1355 @returns A sequence of nested dictionaries of host and job information. 1356 """ 1357 assert insert_time_after is not None or insert_time_before is not None, \ 1358 ('Caller to get_host_queue_entries_by_insert_time must provide either' 1359 ' insert_time_after or insert_time_before.') 1360 # Get insert bounds on the index of the host queue entries. 1361 if insert_time_after: 1362 query = models.HostQueueEntryStartTimes.objects.filter( 1363 # Note: '-insert_time' means descending. We want the largest 1364 # insert time smaller than the insert time. 1365 insert_time__lte=insert_time_after).order_by('-insert_time') 1366 try: 1367 constraint = query[0].highest_hqe_id 1368 if 'id__gte' in filter_data: 1369 constraint = max(constraint, filter_data['id__gte']) 1370 filter_data['id__gte'] = constraint 1371 except IndexError: 1372 pass 1373 1374 # Get end bounds. 1375 if insert_time_before: 1376 query = models.HostQueueEntryStartTimes.objects.filter( 1377 insert_time__gte=insert_time_before).order_by('insert_time') 1378 try: 1379 constraint = query[0].highest_hqe_id 1380 if 'id__lte' in filter_data: 1381 constraint = min(constraint, filter_data['id__lte']) 1382 filter_data['id__lte'] = constraint 1383 except IndexError: 1384 pass 1385 1386 return rpc_utils.prepare_rows_as_nested_dicts( 1387 models.HostQueueEntry.query_objects(filter_data), 1388 ('host', 'job')) 1389 1390 1391def get_host_queue_entries(start_time=None, end_time=None, **filter_data): 1392 """\ 1393 @returns A sequence of nested dictionaries of host and job information. 1394 """ 1395 filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 1396 'started_on__lte', 1397 start_time, 1398 end_time, 1399 **filter_data) 1400 return rpc_utils.prepare_rows_as_nested_dicts( 1401 models.HostQueueEntry.query_objects(filter_data), 1402 ('host', 'job')) 1403 1404 1405def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data): 1406 """\ 1407 Get the number of host queue entries associated with this job. 1408 """ 1409 filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 1410 'started_on__lte', 1411 start_time, 1412 end_time, 1413 **filter_data) 1414 return models.HostQueueEntry.query_count(filter_data) 1415 1416 1417def get_hqe_percentage_complete(**filter_data): 1418 """ 1419 Computes the fraction of host queue entries matching the given filter data 1420 that are complete. 1421 """ 1422 query = models.HostQueueEntry.query_objects(filter_data) 1423 complete_count = query.filter(complete=True).count() 1424 total_count = query.count() 1425 if total_count == 0: 1426 return 1 1427 return float(complete_count) / total_count 1428 1429 1430# special tasks 1431 1432def get_special_tasks(**filter_data): 1433 """Get special task entries from the local database. 1434 1435 Query the special tasks table for tasks matching the given 1436 `filter_data`, and return a list of the results. No attempt is 1437 made to forward the call to shards; the buck will stop here. 1438 The caller is expected to know the target shard for such reasons 1439 as: 1440 * The caller is a service (such as gs_offloader) configured 1441 to operate on behalf of one specific shard, and no other. 1442 * The caller has a host as a parameter, and knows that this is 1443 the shard assigned to that host. 1444 1445 @param filter_data Filter keywords to pass to the underlying 1446 database query. 1447 1448 """ 1449 return rpc_utils.prepare_rows_as_nested_dicts( 1450 models.SpecialTask.query_objects(filter_data), 1451 ('host', 'queue_entry')) 1452 1453 1454def get_host_special_tasks(host_id, **filter_data): 1455 """Get special task entries for a given host. 1456 1457 Query the special tasks table for tasks that ran on the host 1458 given by `host_id` and matching the given `filter_data`. 1459 Return a list of the results. If the host is assigned to a 1460 shard, forward this call to that shard. 1461 1462 @param host_id Id in the database of the target host. 1463 @param filter_data Filter keywords to pass to the underlying 1464 database query. 1465 1466 """ 1467 # Retrieve host data even if the host is in an invalid state. 1468 host = models.Host.smart_get(host_id, False) 1469 if not host.shard: 1470 return get_special_tasks(host_id=host_id, **filter_data) 1471 else: 1472 # The return values from AFE methods are post-processed 1473 # objects that aren't JSON-serializable. So, we have to 1474 # call AFE.run() to get the raw, serializable output from 1475 # the shard. 1476 shard_afe = frontend.AFE(server=host.shard.hostname) 1477 return shard_afe.run('get_special_tasks', 1478 host_id=host_id, **filter_data) 1479 1480 1481def get_num_special_tasks(**kwargs): 1482 """Get the number of special task entries from the local database. 1483 1484 Query the special tasks table for tasks matching the given 'kwargs', 1485 and return the number of the results. No attempt is made to forward 1486 the call to shards; the buck will stop here. 1487 1488 @param kwargs Filter keywords to pass to the underlying database query. 1489 1490 """ 1491 return models.SpecialTask.query_count(kwargs) 1492 1493 1494def get_host_num_special_tasks(host, **kwargs): 1495 """Get special task entries for a given host. 1496 1497 Query the special tasks table for tasks that ran on the host 1498 given by 'host' and matching the given 'kwargs'. 1499 Return a list of the results. If the host is assigned to a 1500 shard, forward this call to that shard. 1501 1502 @param host id or name of a host. More often a hostname. 1503 @param kwargs Filter keywords to pass to the underlying database query. 1504 1505 """ 1506 # Retrieve host data even if the host is in an invalid state. 1507 host_model = models.Host.smart_get(host, False) 1508 if not host_model.shard: 1509 return get_num_special_tasks(host=host, **kwargs) 1510 else: 1511 shard_afe = frontend.AFE(server=host_model.shard.hostname) 1512 return shard_afe.run('get_num_special_tasks', host=host, **kwargs) 1513 1514 1515def get_status_task(host_id, end_time): 1516 """Get the "status task" for a host from the local shard. 1517 1518 Returns a single special task representing the given host's 1519 "status task". The status task is a completed special task that 1520 identifies whether the corresponding host was working or broken 1521 when it completed. A successful task indicates a working host; 1522 a failed task indicates broken. 1523 1524 This call will not be forward to a shard; the receiving server 1525 must be the shard that owns the host. 1526 1527 @param host_id Id in the database of the target host. 1528 @param end_time Time reference for the host's status. 1529 1530 @return A single task; its status (successful or not) 1531 corresponds to the status of the host (working or 1532 broken) at the given time. If no task is found, return 1533 `None`. 1534 1535 """ 1536 tasklist = rpc_utils.prepare_rows_as_nested_dicts( 1537 status_history.get_status_task(host_id, end_time), 1538 ('host', 'queue_entry')) 1539 return tasklist[0] if tasklist else None 1540 1541 1542def get_host_status_task(host_id, end_time): 1543 """Get the "status task" for a host from its owning shard. 1544 1545 Finds the given host's owning shard, and forwards to it a call 1546 to `get_status_task()` (see above). 1547 1548 @param host_id Id in the database of the target host. 1549 @param end_time Time reference for the host's status. 1550 1551 @return A single task; its status (successful or not) 1552 corresponds to the status of the host (working or 1553 broken) at the given time. If no task is found, return 1554 `None`. 1555 1556 """ 1557 host = models.Host.smart_get(host_id) 1558 if not host.shard: 1559 return get_status_task(host_id, end_time) 1560 else: 1561 # The return values from AFE methods are post-processed 1562 # objects that aren't JSON-serializable. So, we have to 1563 # call AFE.run() to get the raw, serializable output from 1564 # the shard. 1565 shard_afe = frontend.AFE(server=host.shard.hostname) 1566 return shard_afe.run('get_status_task', 1567 host_id=host_id, end_time=end_time) 1568 1569 1570def get_host_diagnosis_interval(host_id, end_time, success): 1571 """Find a "diagnosis interval" for a given host. 1572 1573 A "diagnosis interval" identifies a start and end time where 1574 the host went from "working" to "broken", or vice versa. The 1575 interval's starting time is the starting time of the last status 1576 task with the old status; the end time is the finish time of the 1577 first status task with the new status. 1578 1579 This routine finds the most recent diagnosis interval for the 1580 given host prior to `end_time`, with a starting status matching 1581 `success`. If `success` is true, the interval will start with a 1582 successful status task; if false the interval will start with a 1583 failed status task. 1584 1585 @param host_id Id in the database of the target host. 1586 @param end_time Time reference for the diagnosis interval. 1587 @param success Whether the diagnosis interval should start 1588 with a successful or failed status task. 1589 1590 @return A list of two strings. The first is the timestamp for 1591 the beginning of the interval; the second is the 1592 timestamp for the end. If the host has never changed 1593 state, the list is empty. 1594 1595 """ 1596 host = models.Host.smart_get(host_id) 1597 if not host.shard or utils.is_shard(): 1598 return status_history.get_diagnosis_interval( 1599 host_id, end_time, success) 1600 else: 1601 shard_afe = frontend.AFE(server=host.shard.hostname) 1602 return shard_afe.get_host_diagnosis_interval( 1603 host_id, end_time, success) 1604 1605 1606# support for host detail view 1607 1608def get_host_queue_entries_and_special_tasks(host, query_start=None, 1609 query_limit=None, start_time=None, 1610 end_time=None): 1611 """ 1612 @returns an interleaved list of HostQueueEntries and SpecialTasks, 1613 in approximate run order. each dict contains keys for type, host, 1614 job, status, started_on, execution_path, and ID. 1615 """ 1616 total_limit = None 1617 if query_limit is not None: 1618 total_limit = query_start + query_limit 1619 filter_data_common = {'host': host, 1620 'query_limit': total_limit, 1621 'sort_by': ['-id']} 1622 1623 filter_data_special_tasks = rpc_utils.inject_times_to_filter( 1624 'time_started__gte', 'time_started__lte', start_time, end_time, 1625 **filter_data_common) 1626 1627 queue_entries = get_host_queue_entries( 1628 start_time, end_time, **filter_data_common) 1629 special_tasks = get_host_special_tasks(host, **filter_data_special_tasks) 1630 1631 interleaved_entries = rpc_utils.interleave_entries(queue_entries, 1632 special_tasks) 1633 if query_start is not None: 1634 interleaved_entries = interleaved_entries[query_start:] 1635 if query_limit is not None: 1636 interleaved_entries = interleaved_entries[:query_limit] 1637 return rpc_utils.prepare_host_queue_entries_and_special_tasks( 1638 interleaved_entries, queue_entries) 1639 1640 1641def get_num_host_queue_entries_and_special_tasks(host, start_time=None, 1642 end_time=None): 1643 filter_data_common = {'host': host} 1644 1645 filter_data_queue_entries, filter_data_special_tasks = ( 1646 rpc_utils.inject_times_to_hqe_special_tasks_filters( 1647 filter_data_common, start_time, end_time)) 1648 1649 return (models.HostQueueEntry.query_count(filter_data_queue_entries) 1650 + get_host_num_special_tasks(**filter_data_special_tasks)) 1651 1652 1653# other 1654 1655def echo(data=""): 1656 """\ 1657 Returns a passed in string. For doing a basic test to see if RPC calls 1658 can successfully be made. 1659 """ 1660 return data 1661 1662 1663def get_motd(): 1664 """\ 1665 Returns the message of the day as a string. 1666 """ 1667 return rpc_utils.get_motd() 1668 1669 1670def get_static_data(): 1671 """\ 1672 Returns a dictionary containing a bunch of data that shouldn't change 1673 often and is otherwise inaccessible. This includes: 1674 1675 priorities: List of job priority choices. 1676 default_priority: Default priority value for new jobs. 1677 users: Sorted list of all users. 1678 labels: Sorted list of labels not start with 'cros-version' and 1679 'fw-version'. 1680 tests: Sorted list of all tests. 1681 profilers: Sorted list of all profilers. 1682 current_user: Logged-in username. 1683 host_statuses: Sorted list of possible Host statuses. 1684 job_statuses: Sorted list of possible HostQueueEntry statuses. 1685 job_timeout_default: The default job timeout length in minutes. 1686 parse_failed_repair_default: Default value for the parse_failed_repair job 1687 option. 1688 reboot_before_options: A list of valid RebootBefore string enums. 1689 reboot_after_options: A list of valid RebootAfter string enums. 1690 motd: Server's message of the day. 1691 status_dictionary: A mapping from one word job status names to a more 1692 informative description. 1693 """ 1694 1695 default_drone_set_name = models.DroneSet.default_drone_set_name() 1696 drone_sets = ([default_drone_set_name] + 1697 sorted(drone_set.name for drone_set in 1698 models.DroneSet.objects.exclude( 1699 name=default_drone_set_name))) 1700 1701 result = {} 1702 result['priorities'] = priorities.Priority.choices() 1703 result['default_priority'] = 'Default' 1704 result['max_schedulable_priority'] = priorities.Priority.DEFAULT 1705 result['users'] = get_users(sort_by=['login']) 1706 1707 label_exclude_filters = [{'name__startswith': 'cros-version'}, 1708 {'name__startswith': 'fw-version'}, 1709 {'name__startswith': 'fwrw-version'}, 1710 {'name__startswith': 'fwro-version'}] 1711 result['labels'] = get_labels( 1712 label_exclude_filters, 1713 sort_by=['-platform', 'name']) 1714 1715 result['tests'] = get_tests(sort_by=['name']) 1716 result['profilers'] = get_profilers(sort_by=['name']) 1717 result['current_user'] = rpc_utils.prepare_for_serialization( 1718 models.User.current_user().get_object_dict()) 1719 result['host_statuses'] = sorted(models.Host.Status.names) 1720 result['job_statuses'] = sorted(models.HostQueueEntry.Status.names) 1721 result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS 1722 result['job_max_runtime_mins_default'] = ( 1723 models.Job.DEFAULT_MAX_RUNTIME_MINS) 1724 result['parse_failed_repair_default'] = bool( 1725 models.Job.DEFAULT_PARSE_FAILED_REPAIR) 1726 result['reboot_before_options'] = model_attributes.RebootBefore.names 1727 result['reboot_after_options'] = model_attributes.RebootAfter.names 1728 result['motd'] = rpc_utils.get_motd() 1729 result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled() 1730 result['drone_sets'] = drone_sets 1731 1732 result['status_dictionary'] = {"Aborted": "Aborted", 1733 "Verifying": "Verifying Host", 1734 "Provisioning": "Provisioning Host", 1735 "Pending": "Waiting on other hosts", 1736 "Running": "Running autoserv", 1737 "Completed": "Autoserv completed", 1738 "Failed": "Failed to complete", 1739 "Queued": "Queued", 1740 "Starting": "Next in host's queue", 1741 "Stopped": "Other host(s) failed verify", 1742 "Parsing": "Awaiting parse of final results", 1743 "Gathering": "Gathering log files", 1744 "Waiting": "Waiting for scheduler action", 1745 "Archiving": "Archiving results", 1746 "Resetting": "Resetting hosts"} 1747 1748 result['wmatrix_url'] = rpc_utils.get_wmatrix_url() 1749 result['stainless_url'] = rpc_utils.get_stainless_url() 1750 result['is_moblab'] = bool(utils.is_moblab()) 1751 1752 return result 1753 1754 1755def get_server_time(): 1756 return datetime.datetime.now().strftime("%Y-%m-%d %H:%M") 1757 1758 1759def ping_db(): 1760 """Simple connection test to db""" 1761 try: 1762 db_connection.cursor() 1763 except DatabaseError: 1764 return [False] 1765 return [True] 1766 1767 1768def get_hosts_by_attribute(attribute, value): 1769 """ 1770 Get the list of valid hosts that share the same host attribute value. 1771 1772 @param attribute: String of the host attribute to check. 1773 @param value: String of the value that is shared between hosts. 1774 1775 @returns List of hostnames that all have the same host attribute and 1776 value. 1777 """ 1778 rows = models.HostAttribute.query_objects({'attribute': attribute, 1779 'value': value}) 1780 if RESPECT_STATIC_ATTRIBUTES: 1781 returned_hosts = set() 1782 # Add hosts: 1783 # * Non-valid 1784 # * Exist in afe_host_attribute with given attribute. 1785 # * Don't exist in afe_static_host_attribute OR exist in 1786 # afe_static_host_attribute with the same given value. 1787 for row in rows: 1788 if row.host.invalid != 0: 1789 continue 1790 1791 static_hosts = models.StaticHostAttribute.query_objects( 1792 {'host_id': row.host.id, 'attribute': attribute}) 1793 values = [static_host.value for static_host in static_hosts] 1794 if len(values) == 0 or values[0] == value: 1795 returned_hosts.add(row.host.hostname) 1796 1797 # Add hosts: 1798 # * Non-valid 1799 # * Exist in afe_static_host_attribute with given attribute 1800 # and value 1801 # * No need to check whether each static attribute has its 1802 # corresponding entry in afe_host_attribute since it is ensured 1803 # in inventory sync. 1804 static_rows = models.StaticHostAttribute.query_objects( 1805 {'attribute': attribute, 'value': value}) 1806 for row in static_rows: 1807 if row.host.invalid != 0: 1808 continue 1809 1810 returned_hosts.add(row.host.hostname) 1811 1812 return list(returned_hosts) 1813 else: 1814 return [row.host.hostname for row in rows if row.host.invalid == 0] 1815 1816 1817def _get_control_file_by_suite(suite_name): 1818 """Get control file contents by suite name. 1819 1820 @param suite_name: Suite name as string. 1821 @returns: Control file contents as string. 1822 """ 1823 getter = control_file_getter.FileSystemGetter( 1824 [_CONFIG.get_config_value('SCHEDULER', 1825 'drone_installation_directory')]) 1826 return getter.get_control_file_contents_by_name(suite_name) 1827 1828 1829@rpc_utils.route_rpc_to_master 1830def create_suite_job( 1831 name='', 1832 board='', 1833 pool='', 1834 child_dependencies=(), 1835 control_file='', 1836 check_hosts=True, 1837 num=None, 1838 file_bugs=False, 1839 timeout=24, 1840 timeout_mins=None, 1841 priority=priorities.Priority.DEFAULT, 1842 suite_args=None, 1843 wait_for_results=True, 1844 job_retry=False, 1845 max_retries=None, 1846 max_runtime_mins=None, 1847 suite_min_duts=0, 1848 offload_failures_only=False, 1849 builds=None, 1850 test_source_build=None, 1851 run_prod_code=False, 1852 delay_minutes=0, 1853 is_cloning=False, 1854 job_keyvals=None, 1855 test_args=None, 1856 **kwargs): 1857 """ 1858 Create a job to run a test suite on the given device with the given image. 1859 1860 When the timeout specified in the control file is reached, the 1861 job is guaranteed to have completed and results will be available. 1862 1863 @param name: The test name if control_file is supplied, otherwise the name 1864 of the test suite to run, e.g. 'bvt'. 1865 @param board: the kind of device to run the tests on. 1866 @param builds: the builds to install e.g. 1867 {'cros-version:': 'x86-alex-release/R18-1655.0.0', 1868 'fwrw-version:': 'x86-alex-firmware/R36-5771.50.0', 1869 'fwro-version:': 'x86-alex-firmware/R36-5771.49.0'} 1870 If builds is given a value, it overrides argument build. 1871 @param test_source_build: Build that contains the server-side test code. 1872 @param pool: Specify the pool of machines to use for scheduling 1873 purposes. 1874 @param child_dependencies: (optional) list of additional dependency labels 1875 (strings) that will be added as dependency labels to child jobs. 1876 @param control_file: the control file of the job. 1877 @param check_hosts: require appropriate live hosts to exist in the lab. 1878 @param num: Specify the number of machines to schedule across (integer). 1879 Leave unspecified or use None to use default sharding factor. 1880 @param file_bugs: File a bug on each test failure in this suite. 1881 @param timeout: The max lifetime of this suite, in hours. 1882 @param timeout_mins: The max lifetime of this suite, in minutes. Takes 1883 priority over timeout. 1884 @param priority: Integer denoting priority. Higher is more important. 1885 @param suite_args: Optional arguments which will be parsed by the suite 1886 control file. Used by control.test_that_wrapper to 1887 determine which tests to run. 1888 @param wait_for_results: Set to False to run the suite job without waiting 1889 for test jobs to finish. Default is True. 1890 @param job_retry: Set to True to enable job-level retry. Default is False. 1891 @param max_retries: Integer, maximum job retries allowed at suite level. 1892 None for no max. 1893 @param max_runtime_mins: Maximum amount of time a job can be running in 1894 minutes. 1895 @param suite_min_duts: Integer. Scheduler will prioritize getting the 1896 minimum number of machines for the suite when it is 1897 competing with another suite that has a higher 1898 priority but already got minimum machines it needs. 1899 @param offload_failures_only: Only enable gs_offloading for failed jobs. 1900 @param run_prod_code: If True, the suite will run the test code that 1901 lives in prod aka the test code currently on the 1902 lab servers. If False, the control files and test 1903 code for this suite run will be retrieved from the 1904 build artifacts. 1905 @param delay_minutes: Delay the creation of test jobs for a given number of 1906 minutes. 1907 @param is_cloning: True if creating a cloning job. 1908 @param job_keyvals: A dict of job keyvals to be inject to control file. 1909 @param test_args: A dict of args passed all the way to each individual test 1910 that will be actually run. 1911 @param kwargs: extra keyword args. NOT USED. 1912 1913 @raises ControlFileNotFound: if a unique suite control file doesn't exist. 1914 @raises NoControlFileList: if we can't list the control files at all. 1915 @raises StageControlFileFailure: If the dev server throws 500 while 1916 staging test_suites. 1917 @raises ControlFileEmpty: if the control file exists on the server, but 1918 can't be read. 1919 1920 @return: the job ID of the suite; -1 on error. 1921 """ 1922 if num is not None: 1923 warnings.warn('num is deprecated for create_suite_job') 1924 del num 1925 1926 if builds is None: 1927 builds = {} 1928 1929 # Default test source build to CrOS build if it's not specified and 1930 # run_prod_code is set to False. 1931 if not run_prod_code: 1932 test_source_build = Suite.get_test_source_build( 1933 builds, test_source_build=test_source_build) 1934 1935 sample_dut = rpc_utils.get_sample_dut(board, pool) 1936 1937 suite_name = suite_common.canonicalize_suite_name(name) 1938 if run_prod_code: 1939 ds = dev_server.resolve(test_source_build, hostname=sample_dut) 1940 keyvals = {} 1941 else: 1942 ds, keyvals = suite_common.stage_build_artifacts( 1943 test_source_build, hostname=sample_dut) 1944 keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts 1945 1946 # Do not change this naming convention without updating 1947 # site_utils.parse_job_name. 1948 if run_prod_code: 1949 # If run_prod_code is True, test_source_build is not set, use the 1950 # first build in the builds list for the sutie job name. 1951 name = '%s-%s' % (builds.values()[0], suite_name) 1952 else: 1953 name = '%s-%s' % (test_source_build, suite_name) 1954 1955 timeout_mins = timeout_mins or timeout * 60 1956 max_runtime_mins = max_runtime_mins or timeout * 60 1957 1958 if not board: 1959 board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0] 1960 1961 if run_prod_code: 1962 control_file = _get_control_file_by_suite(suite_name) 1963 1964 if not control_file: 1965 # No control file was supplied so look it up from the build artifacts. 1966 control_file = suite_common.get_control_file_by_build( 1967 test_source_build, ds, suite_name) 1968 1969 # Prepend builds and board to the control file. 1970 if is_cloning: 1971 control_file = tools.remove_injection(control_file) 1972 1973 if suite_args is None: 1974 suite_args = dict() 1975 1976 inject_dict = { 1977 'board': board, 1978 # `build` is needed for suites like AU to stage image inside suite 1979 # control file. 1980 'build': test_source_build, 1981 'builds': builds, 1982 'check_hosts': check_hosts, 1983 'pool': pool, 1984 'child_dependencies': child_dependencies, 1985 'file_bugs': file_bugs, 1986 'timeout': timeout, 1987 'timeout_mins': timeout_mins, 1988 'devserver_url': ds.url(), 1989 'priority': priority, 1990 'wait_for_results': wait_for_results, 1991 'job_retry': job_retry, 1992 'max_retries': max_retries, 1993 'max_runtime_mins': max_runtime_mins, 1994 'offload_failures_only': offload_failures_only, 1995 'test_source_build': test_source_build, 1996 'run_prod_code': run_prod_code, 1997 'delay_minutes': delay_minutes, 1998 'job_keyvals': job_keyvals, 1999 'test_args': test_args, 2000 } 2001 inject_dict.update(suite_args) 2002 control_file = tools.inject_vars(inject_dict, control_file) 2003 2004 return rpc_utils.create_job_common(name, 2005 priority=priority, 2006 timeout_mins=timeout_mins, 2007 max_runtime_mins=max_runtime_mins, 2008 control_type='Server', 2009 control_file=control_file, 2010 hostless=True, 2011 keyvals=keyvals) 2012 2013 2014def get_job_history(**filter_data): 2015 """Get history of the job, including the special tasks executed for the job 2016 2017 @param filter_data: filter for the call, should at least include 2018 {'job_id': [job id]} 2019 @returns: JSON string of the job's history, including the information such 2020 as the hosts run the job and the special tasks executed before 2021 and after the job. 2022 """ 2023 job_id = filter_data['job_id'] 2024 job_info = job_history.get_job_info(job_id) 2025 return rpc_utils.prepare_for_serialization(job_info.get_history()) 2026 2027 2028def get_host_history(start_time, end_time, hosts=None, board=None, pool=None): 2029 """Deprecated.""" 2030 raise ValueError('get_host_history rpc is deprecated ' 2031 'and no longer implemented.') 2032 2033 2034def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(), 2035 known_host_ids=(), known_host_statuses=()): 2036 """Receive updates for job statuses from shards and assign hosts and jobs. 2037 2038 @param shard_hostname: Hostname of the calling shard 2039 @param jobs: Jobs in serialized form that should be updated with newer 2040 status from a shard. 2041 @param hqes: Hostqueueentries in serialized form that should be updated with 2042 newer status from a shard. Note that for every hostqueueentry 2043 the corresponding job must be in jobs. 2044 @param known_job_ids: List of ids of jobs the shard already has. 2045 @param known_host_ids: List of ids of hosts the shard already has. 2046 @param known_host_statuses: List of statuses of hosts the shard already has. 2047 2048 @returns: Serialized representations of hosts, jobs, suite job keyvals 2049 and their dependencies to be inserted into a shard's database. 2050 """ 2051 # The following alternatives to sending host and job ids in every heartbeat 2052 # have been considered: 2053 # 1. Sending the highest known job and host ids. This would work for jobs: 2054 # Newer jobs always have larger ids. Also, if a job is not assigned to a 2055 # particular shard during a heartbeat, it never will be assigned to this 2056 # shard later. 2057 # This is not true for hosts though: A host that is leased won't be sent 2058 # to the shard now, but might be sent in a future heartbeat. This means 2059 # sometimes hosts should be transfered that have a lower id than the 2060 # maximum host id the shard knows. 2061 # 2. Send the number of jobs/hosts the shard knows to the master in each 2062 # heartbeat. Compare these to the number of records that already have 2063 # the shard_id set to this shard. In the normal case, they should match. 2064 # In case they don't, resend all entities of that type. 2065 # This would work well for hosts, because there aren't that many. 2066 # Resending all jobs is quite a big overhead though. 2067 # Also, this approach might run into edge cases when entities are 2068 # ever deleted. 2069 # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts. 2070 # Using two different approaches isn't consistent and might cause 2071 # confusion. Also the issues with the case of deletions might still 2072 # occur. 2073 # 2074 # The overhead of sending all job and host ids in every heartbeat is low: 2075 # At peaks one board has about 1200 created but unfinished jobs. 2076 # See the numbers here: http://goo.gl/gQCGWH 2077 # Assuming that job id's have 6 digits and that json serialization takes a 2078 # comma and a space as overhead, the traffic per id sent is about 8 bytes. 2079 # If 5000 ids need to be sent, this means 40 kilobytes of traffic. 2080 # A NOT IN query with 5000 ids took about 30ms in tests made. 2081 # These numbers seem low enough to outweigh the disadvantages of the 2082 # solutions described above. 2083 shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname) 2084 rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes) 2085 assert len(known_host_ids) == len(known_host_statuses) 2086 for i in range(len(known_host_ids)): 2087 host_model = models.Host.objects.get(pk=known_host_ids[i]) 2088 if host_model.status != known_host_statuses[i]: 2089 host_model.status = known_host_statuses[i] 2090 host_model.save() 2091 2092 hosts, jobs, suite_keyvals, inc_ids = rpc_utils.find_records_for_shard( 2093 shard_obj, known_job_ids=known_job_ids, 2094 known_host_ids=known_host_ids) 2095 return { 2096 'hosts': [host.serialize() for host in hosts], 2097 'jobs': [job.serialize() for job in jobs], 2098 'suite_keyvals': [kv.serialize() for kv in suite_keyvals], 2099 'incorrect_host_ids': [int(i) for i in inc_ids], 2100 } 2101 2102 2103def get_shards(**filter_data): 2104 """Return a list of all shards. 2105 2106 @returns A sequence of nested dictionaries of shard information. 2107 """ 2108 shards = models.Shard.query_objects(filter_data) 2109 serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ()) 2110 for serialized, shard in zip(serialized_shards, shards): 2111 serialized['labels'] = [label.name for label in shard.labels.all()] 2112 2113 return serialized_shards 2114 2115 2116def _assign_board_to_shard_precheck(labels): 2117 """Verify whether board labels are valid to be added to a given shard. 2118 2119 First check whether board label is in correct format. Second, check whether 2120 the board label exist. Third, check whether the board has already been 2121 assigned to shard. 2122 2123 @param labels: Board labels separated by comma. 2124 2125 @raises error.RPCException: If label provided doesn't start with `board:` 2126 or board has been added to shard already. 2127 @raises models.Label.DoesNotExist: If the label specified doesn't exist. 2128 2129 @returns: A list of label models that ready to be added to shard. 2130 """ 2131 if not labels: 2132 # allow creation of label-less shards (labels='' would otherwise fail the 2133 # checks below) 2134 return [] 2135 labels = labels.split(',') 2136 label_models = [] 2137 for label in labels: 2138 # Check whether the board label is in correct format. 2139 if not label.startswith('board:'): 2140 raise error.RPCException('Sharding only supports `board:.*` label.') 2141 # Check whether the board label exist. If not, exception will be thrown 2142 # by smart_get function. 2143 label = models.Label.smart_get(label) 2144 # Check whether the board has been sharded already 2145 try: 2146 shard = models.Shard.objects.get(labels=label) 2147 raise error.RPCException( 2148 '%s is already on shard %s' % (label, shard.hostname)) 2149 except models.Shard.DoesNotExist: 2150 # board is not on any shard, so it's valid. 2151 label_models.append(label) 2152 return label_models 2153 2154 2155def add_shard(hostname, labels): 2156 """Add a shard and start running jobs on it. 2157 2158 @param hostname: The hostname of the shard to be added; needs to be unique. 2159 @param labels: Board labels separated by comma. Jobs of one of the labels 2160 will be assigned to the shard. 2161 2162 @raises error.RPCException: If label provided doesn't start with `board:` or 2163 board has been added to shard already. 2164 @raises model_logic.ValidationError: If a shard with the given hostname 2165 already exist. 2166 @raises models.Label.DoesNotExist: If the label specified doesn't exist. 2167 2168 @returns: The id of the added shard. 2169 """ 2170 labels = _assign_board_to_shard_precheck(labels) 2171 shard = models.Shard.add_object(hostname=hostname) 2172 for label in labels: 2173 shard.labels.add(label) 2174 return shard.id 2175 2176 2177def add_board_to_shard(hostname, labels): 2178 """Add boards to a given shard 2179 2180 @param hostname: The hostname of the shard to be changed. 2181 @param labels: Board labels separated by comma. 2182 2183 @raises error.RPCException: If label provided doesn't start with `board:` or 2184 board has been added to shard already. 2185 @raises models.Label.DoesNotExist: If the label specified doesn't exist. 2186 2187 @returns: The id of the changed shard. 2188 """ 2189 labels = _assign_board_to_shard_precheck(labels) 2190 shard = models.Shard.objects.get(hostname=hostname) 2191 for label in labels: 2192 shard.labels.add(label) 2193 return shard.id 2194 2195 2196# Remove board RPCs are rare, so we can afford to make them a bit more 2197# expensive (by performing in a transaction) in order to guarantee 2198# atomicity. 2199# TODO(akeshet): If we ever update to newer version of django, we need to 2200# migrate to transaction.atomic instead of commit_on_success 2201@transaction.commit_on_success 2202def remove_board_from_shard(hostname, label): 2203 """Remove board from the given shard. 2204 @param hostname: The hostname of the shard to be changed. 2205 @param labels: Board label. 2206 2207 @raises models.Label.DoesNotExist: If the label specified doesn't exist. 2208 2209 @returns: The id of the changed shard. 2210 """ 2211 shard = models.Shard.objects.get(hostname=hostname) 2212 label = models.Label.smart_get(label) 2213 if label not in shard.labels.all(): 2214 raise error.RPCException( 2215 'Cannot remove label from shard that does not belong to it.') 2216 2217 shard.labels.remove(label) 2218 if label.is_replaced_by_static(): 2219 static_label = models.StaticLabel.smart_get(label.name) 2220 models.Host.objects.filter( 2221 static_labels__in=[static_label]).update(shard=None) 2222 else: 2223 models.Host.objects.filter(labels__in=[label]).update(shard=None) 2224 2225 2226def delete_shard(hostname): 2227 """Delete a shard and reclaim all resources from it. 2228 2229 This claims back all assigned hosts from the shard. 2230 """ 2231 shard = rpc_utils.retrieve_shard(shard_hostname=hostname) 2232 2233 # Remove shard information. 2234 models.Host.objects.filter(shard=shard).update(shard=None) 2235 2236 # Note: The original job-cleanup query was performed with django call 2237 # models.Job.objects.filter(shard=shard).update(shard=None) 2238 # 2239 # But that started becoming unreliable due to the large size of afe_jobs. 2240 # 2241 # We don't need atomicity here, so the new cleanup method is iterative, in 2242 # chunks of 100k jobs. 2243 QUERY = ('UPDATE afe_jobs SET shard_id = NULL WHERE shard_id = %s ' 2244 'LIMIT 100000') 2245 try: 2246 with contextlib.closing(db_connection.cursor()) as cursor: 2247 clear_jobs = True 2248 assert shard.id is not None 2249 while clear_jobs: 2250 cursor.execute(QUERY % shard.id) 2251 clear_jobs = bool(cursor.fetchone()) 2252 # Unit tests use sqlite backend instead of MySQL. sqlite does not support 2253 # UPDATE ... LIMIT, so fall back to the old behavior. 2254 except DatabaseError as e: 2255 if 'syntax error' in str(e): 2256 models.Job.objects.filter(shard=shard).update(shard=None) 2257 else: 2258 raise 2259 2260 shard.labels.clear() 2261 shard.delete() 2262 2263 2264def get_servers(hostname=None, role=None, status=None): 2265 """Get a list of servers with matching role and status. 2266 2267 @param hostname: FQDN of the server. 2268 @param role: Name of the server role, e.g., drone, scheduler. Default to 2269 None to match any role. 2270 @param status: Status of the server, e.g., primary, backup, repair_required. 2271 Default to None to match any server status. 2272 2273 @raises error.RPCException: If server database is not used. 2274 @return: A list of server names for servers with matching role and status. 2275 """ 2276 if not server_manager_utils.use_server_db(): 2277 raise error.RPCException('Server database is not enabled. Please try ' 2278 'retrieve servers from global config.') 2279 servers = server_manager_utils.get_servers(hostname=hostname, role=role, 2280 status=status) 2281 return [s.get_details() for s in servers] 2282 2283 2284@rpc_utils.route_rpc_to_master 2285def get_stable_version(board=stable_version_utils.DEFAULT, android=False): 2286 """Get stable version for the given board. 2287 2288 @param board: Name of the board. 2289 @param android: Unused legacy parameter. This is maintained for the 2290 sake of clients on old branches that still pass the 2291 parameter. TODO(jrbarnette) Remove this completely once R68 2292 drops off stable. 2293 2294 @return: Stable version of the given board. Return global configure value 2295 of CROS.stable_cros_version if stable_versinos table does not have 2296 entry of board DEFAULT. 2297 """ 2298 assert not android, 'get_stable_version no longer supports `android`.' 2299 return stable_version_utils.get(board=board) 2300 2301 2302@rpc_utils.route_rpc_to_master 2303def get_all_stable_versions(): 2304 """Get stable versions for all boards. 2305 2306 @return: A dictionary of board:version. 2307 """ 2308 return stable_version_utils.get_all() 2309 2310 2311@rpc_utils.route_rpc_to_master 2312def set_stable_version(version, board=stable_version_utils.DEFAULT): 2313 """Modify stable version for the given board. 2314 2315 @param version: The new value of stable version for given board. 2316 @param board: Name of the board, default to value `DEFAULT`. 2317 """ 2318 stable_version_utils.set(version=version, board=board) 2319 2320 2321@rpc_utils.route_rpc_to_master 2322def delete_stable_version(board): 2323 """Modify stable version for the given board. 2324 2325 Delete a stable version entry in afe_stable_versions table for a given 2326 board, so default stable version will be used. 2327 2328 @param board: Name of the board. 2329 """ 2330 stable_version_utils.delete(board=board) 2331 2332 2333def get_tests_by_build(build, ignore_invalid_tests=True): 2334 """Get the tests that are available for the specified build. 2335 2336 @param build: unique name by which to refer to the image. 2337 @param ignore_invalid_tests: flag on if unparsable tests are ignored. 2338 2339 @return: A sorted list of all tests that are in the build specified. 2340 """ 2341 # Collect the control files specified in this build 2342 cfile_getter = control_file_lib._initialize_control_file_getter(build) 2343 if suite_common.ENABLE_CONTROLS_IN_BATCH: 2344 control_file_info_list = cfile_getter.get_suite_info() 2345 control_file_list = control_file_info_list.keys() 2346 else: 2347 control_file_list = cfile_getter.get_control_file_list() 2348 2349 test_objects = [] 2350 _id = 0 2351 for control_file_path in control_file_list: 2352 # Read and parse the control file 2353 if suite_common.ENABLE_CONTROLS_IN_BATCH: 2354 control_file = control_file_info_list[control_file_path] 2355 else: 2356 control_file = cfile_getter.get_control_file_contents( 2357 control_file_path) 2358 try: 2359 control_obj = control_data.parse_control_string(control_file) 2360 except: 2361 logging.info('Failed to parse control file: %s', control_file_path) 2362 if not ignore_invalid_tests: 2363 raise 2364 2365 # Extract the values needed for the AFE from the control_obj. 2366 # The keys list represents attributes in the control_obj that 2367 # are required by the AFE 2368 keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental', 2369 'test_category', 'test_class', 'dependencies', 'run_verify', 2370 'sync_count', 'job_retries', 'path'] 2371 2372 test_object = {} 2373 for key in keys: 2374 test_object[key] = getattr(control_obj, key) if hasattr( 2375 control_obj, key) else '' 2376 2377 # Unfortunately, the AFE expects different key-names for certain 2378 # values, these must be corrected to avoid the risk of tests 2379 # being omitted by the AFE. 2380 # The 'id' is an additional value used in the AFE. 2381 # The control_data parsing does not reference 'run_reset', but it 2382 # is also used in the AFE and defaults to True. 2383 test_object['id'] = _id 2384 test_object['run_reset'] = True 2385 test_object['description'] = test_object.get('doc', '') 2386 test_object['test_time'] = test_object.get('time', 0) 2387 2388 # TODO(crbug.com/873716) DEPRECATED. Remove entirely. 2389 test_object['test_retry'] = 0 2390 2391 # Fix the test name to be consistent with the current presentation 2392 # of test names in the AFE. 2393 testpath, subname = os.path.split(control_file_path) 2394 testname = os.path.basename(testpath) 2395 subname = subname.split('.')[1:] 2396 if subname: 2397 testname = '%s:%s' % (testname, ':'.join(subname)) 2398 2399 test_object['name'] = testname 2400 2401 # Correct the test path as parse_control_string sets an empty string. 2402 test_object['path'] = control_file_path 2403 2404 _id += 1 2405 test_objects.append(test_object) 2406 2407 test_objects = sorted(test_objects, key=lambda x: x.get('name')) 2408 return rpc_utils.prepare_for_serialization(test_objects) 2409 2410 2411@rpc_utils.route_rpc_to_master 2412def get_lab_health_indicators(board=None): 2413 """Get the healthy indicators for whole lab. 2414 2415 The indicators now includes: 2416 1. lab is closed or not. 2417 2. Available DUTs list for a given board. 2418 3. Devserver capacity. 2419 4. When is the next major DUT utilization (e.g. CQ is coming in 3 minutes). 2420 2421 @param board: if board is specified, a list of available DUTs will be 2422 returned for it. Otherwise, skip this indicator. 2423 2424 @returns: A healthy indicator object including health info. 2425 """ 2426 return LabHealthIndicator(None, None, None, None) 2427