# pylint: disable=missing-docstring

"""\
Functions to expose over the RPC interface.

For all modify* and delete* functions that ask for an 'id' parameter to
identify the object to operate on, the id may be either
 * the database row ID
 * the name of the object (label name, hostname, user login, etc.)
 * a dictionary of uniquely identifying fields (this option should seldom
   be used)

When specifying foreign key fields (i.e. adding hosts to a label, or adding
users to an ACL group), the given value may be either the database row ID or
the name of the object.

All get* functions return lists of dictionaries. Each dictionary represents
one object and maps field names to values.

Some examples:
modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
modify_test('sleeptest', test_type='Client', params=', seconds=60')
delete_acl_group(1) # delete by ID
delete_acl_group('Everyone') # delete by name
acl_group_add_users('Everyone', ['mbligh', 'showard'])
get_jobs(owner='showard', status='Queued')

See doctests/001_rpc_test.txt for (lots) more examples.
"""

__author__ = 'showard@google.com (Steve Howard)'

import ast
import collections
import contextlib
import datetime
import logging
import os
import sys
import warnings

import six

# 'common' sets up the autotest_lib import path, so it must be imported
# before any autotest_lib modules.
import common
from autotest_lib.client.common_lib import (control_data, error, global_config,
                                            priorities)
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.frontend.afe import control_file as control_file_lib
from autotest_lib.frontend.afe import (model_attributes, model_logic, models,
                                       rpc_utils)
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
from autotest_lib.server import frontend, utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import (constants,
                                                    control_file_getter,
                                                    suite_common, tools)
from autotest_lib.server.cros.dynamic_suite.suite import Suite
from autotest_lib.server.lib import status_history
from autotest_lib.site_utils import job_history, stable_version_utils
from django.db import connection as db_connection
from django.db import transaction
from django.db.models import Count
from django.db.utils import DatabaseError

_CONFIG = global_config.global_config

# Definition of LabHealthIndicator
LabHealthIndicator = collections.namedtuple(
        'LabHealthIndicator',
        [
                'if_lab_close',
                'available_duts',
                'devserver_health',
                'upcoming_builds',
        ]
)

RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_labels', type=bool, default=False)

RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_attributes', type=bool, default=False)

# Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.

# labels

def modify_label(id, **data):
    """Modify a label.

    @param id: id or name of a label. More often a label name.
    @param data: New data for a label.
    """
    label_model = models.Label.smart_get(id)
    if label_model.is_replaced_by_static():
        raise error.UnmodifiableLabelException(
                'Failed to modify label "%s" because it is a static label. '
                'Use go/chromeos-skylab-inventory-tools to modify this '
                'label.' % label_model.name)

    label_model.update_object(data)

    # Main forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False,
                             id=id, **data)
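

# Illustrative only (not part of the RPC surface): a minimal sketch of label
# modification, assuming a hypothetical label named 'bluetooth'. Static labels
# raise error.UnmodifiableLabelException and must be changed via
# go/chromeos-skylab-inventory-tools instead.
def _example_modify_label():
    # Rename a (non-static) label, looking it up by name.
    modify_label('bluetooth', name='bluetooth-capable')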


def delete_label(id):
    """Delete a label.

    @param id: id or name of a label. More often a label name.
    """
    label_model = models.Label.smart_get(id)
    if label_model.is_replaced_by_static():
        raise error.UnmodifiableLabelException(
                'Failed to delete label "%s" because it is a static label. '
                'Use go/chromeos-skylab-inventory-tools to modify this '
                'label.' % label_model.name)

    # Hosts that have the label to be deleted. Save this info before
    # the label is deleted to use it later.
    hosts = []
    for h in label_model.host_set.all():
        hosts.append(models.Host.smart_get(h.id))
    label_model.delete()

    # Main forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id)


def add_label(name, ignore_exception_if_exists=False, **kwargs):
    """Adds a new label of a given name.

    @param name: label name.
    @param ignore_exception_if_exists: If True and the exception was
        thrown due to the duplicated label name when adding a label,
        then suppress the exception. Default is False.
    @param kwargs: keyword args that store more info about a label
        other than the name.
    @return: int/long id of a new label.
    """
    # models.Label.add_object() throws model_logic.ValidationError
    # when it is given a label name that already exists.
    # However, ValidationError can be thrown with different errors,
    # and those errors should be thrown up to the call chain.
    try:
        label = models.Label.add_object(name=name, **kwargs)
    except Exception:
        exc_info = sys.exc_info()
        if ignore_exception_if_exists:
            label = rpc_utils.get_label(name)
            # If the exception was raised for a reason other than a
            # duplicated "name", then raise the original exception.
            if label is None:
                six.reraise(exc_info[0], exc_info[1], exc_info[2])
        else:
            six.reraise(exc_info[0], exc_info[1], exc_info[2])
    return label.id


def add_label_to_hosts(id, hosts):
    """Adds a label of the given id to the given hosts only in local DB.

    @param id: id or name of a label. More often a label name.
    @param hosts: The hostnames of hosts that need the label.

    @raises models.Label.DoesNotExist: If the label with id doesn't exist.
    """
    label = models.Label.smart_get(id)
    if label.is_replaced_by_static():
        label = models.StaticLabel.smart_get(label.name)

    host_objs = models.Host.smart_get_bulk(hosts)
    if label.platform:
        models.Host.check_no_platform(host_objs)
    # Ensure a host has no more than one board label with it.
    if label.name.startswith('board:'):
        models.Host.check_board_labels_allowed(host_objs, [label.name])
    label.host_set.add(*host_objs)
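

# Illustrative only: a hedged sketch of idempotent label creation followed by
# attachment. The label name and hostnames are hypothetical.
def _example_add_label_to_hosts():
    label_id = add_label('pool:example', ignore_exception_if_exists=True)
    add_label_to_hosts(label_id, ['sample-dut-1', 'sample-dut-2'])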


def _create_label_everywhere(id, hosts):
    """
    Yet another method to create labels.

    ALERT! This method should be run only on the main, not on shards!
    DO NOT RUN THIS ON A SHARD!!! Deputies will hate you if you do!!!

    This method exists primarily to serve label_add_hosts() and
    host_add_labels(). Basically it pulls out the label check/add logic
    from label_add_hosts() into this nice method that not only creates
    the label but also tells the shards that service the hosts to also
    create the label.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    try:
        label = models.Label.smart_get(id)
    except models.Label.DoesNotExist:
        # This matches the type checks in smart_get, which is a hack
        # in and of itself. The aim here is to create any non-existent
        # label, which we cannot do if the 'id' specified isn't a label name.
        if isinstance(id, six.string_types):
            label = models.Label.smart_get(add_label(id))
        else:
            raise ValueError('Label id (%s) does not exist. Please specify '
                             'the argument, id, as a string (label name).'
                             % id)

    # Make sure the label exists on the shard with the same id
    # as it is on the main.
    # It is possible that the label is already on a shard, because
    # we add a new label only to the shards of the hosts to which the
    # label is going to be attached.
    # For example, we add a label L1 to a host in shard S1.
    # Main and S1 will have L1 but other shards won't.
    # Later, when we add the same label L1 to hosts in shards S1 and S2,
    # S1 already has the label but S2 doesn't.
    # S2 should get the new label without any problem.
    # We ignore the exception in such a case.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(
            host_objs, 'add_label', include_hostnames=False,
            name=label.name, ignore_exception_if_exists=True,
            id=label.id, platform=label.platform)


@rpc_utils.route_rpc_to_main
def label_add_hosts(id, hosts):
    """Adds a label with the given id to the given hosts.

    This method should be run only on the main, not on shards.
    The given label will be created if it doesn't exist, provided the `id`
    supplied is a label name, not an int/long id.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.

    @raises ValueError: If the id specified is an int/long (label id)
                        while the label does not exist.
    """
    # Create the label.
    _create_label_everywhere(id, hosts)

    # Add it to the main.
    add_label_to_hosts(id, hosts)

    # Add it to the shards.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id)


def remove_label_from_hosts(id, hosts):
    """Removes a label of the given id from the given hosts only in local DB.

    @param id: id or name of a label.
    @param hosts: The hostnames of hosts from which to remove the label.
    """
    host_objs = models.Host.smart_get_bulk(hosts)
    label = models.Label.smart_get(id)
    if label.is_replaced_by_static():
        raise error.UnmodifiableLabelException(
                'Failed to remove label "%s" for hosts "%r" because it is a '
                'static label. Use go/chromeos-skylab-inventory-tools to '
                'modify this label.' % (label.name, hosts))

    label.host_set.remove(*host_objs)
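

# Illustrative only: a hedged sketch of attaching a label to hosts from the
# main, creating it on demand. The label and hostnames are hypothetical;
# passing a name (not an int id) is what allows on-the-fly creation.
def _example_label_add_hosts():
    label_add_hosts('power:battery', ['sample-dut-1', 'sample-dut-2'])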
283 """ 284 host_objs = models.Host.smart_get_bulk(hosts) 285 remove_label_from_hosts(id, hosts) 286 287 rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id) 288 289 290def get_labels(exclude_filters=(), **filter_data): 291 """\ 292 @param exclude_filters: A sequence of dictionaries of filters. 293 294 @returns A sequence of nested dictionaries of label information. 295 """ 296 labels = models.Label.query_objects(filter_data) 297 for exclude_filter in exclude_filters: 298 labels = labels.exclude(**exclude_filter) 299 300 if not RESPECT_STATIC_LABELS: 301 return rpc_utils.prepare_rows_as_nested_dicts(labels, ()) 302 303 static_labels = models.StaticLabel.query_objects(filter_data) 304 for exclude_filter in exclude_filters: 305 static_labels = static_labels.exclude(**exclude_filter) 306 307 non_static_lists = rpc_utils.prepare_rows_as_nested_dicts(labels, ()) 308 static_lists = rpc_utils.prepare_rows_as_nested_dicts(static_labels, ()) 309 310 label_ids = [label.id for label in labels] 311 replaced = models.ReplacedLabel.objects.filter(label__id__in=label_ids) 312 replaced_ids = {r.label_id for r in replaced} 313 replaced_label_names = {l.name for l in labels if l.id in replaced_ids} 314 315 return_lists = [] 316 for non_static_label in non_static_lists: 317 if non_static_label.get('id') not in replaced_ids: 318 return_lists.append(non_static_label) 319 320 for static_label in static_lists: 321 if static_label.get('name') in replaced_label_names: 322 return_lists.append(static_label) 323 324 return return_lists 325 326 327# hosts 328 329def add_host(hostname, status=None, locked=None, lock_reason='', protection=None): 330 if locked and not lock_reason: 331 raise model_logic.ValidationError( 332 {'locked': 'Please provide a reason for locking when adding host.'}) 333 334 return models.Host.add_object(hostname=hostname, status=status, 335 locked=locked, lock_reason=lock_reason, 336 protection=protection).id 337 338 339@rpc_utils.route_rpc_to_main 340def modify_host(id, **kwargs): 341 """Modify local attributes of a host. 342 343 If this is called on the main, but the host is assigned to a shard, this 344 will call `modify_host_local` RPC to the responsible shard. This means if 345 a host is being locked using this function, this change will also propagate 346 to shards. 347 When this is called on a shard, the shard just routes the RPC to the main 348 and does nothing. 349 350 @param id: id of the host to modify. 351 @param kwargs: key=value pairs of values to set on the host. 352 """ 353 rpc_utils.check_modify_host(kwargs) 354 host = models.Host.smart_get(id) 355 try: 356 rpc_utils.check_modify_host_locking(host, kwargs) 357 except model_logic.ValidationError as e: 358 if not kwargs.get('force_modify_locking', False): 359 raise 360 logging.exception('The following exception will be ignored and lock ' 361 'modification will be enforced. %s', e) 362 363 # This is required to make `lock_time` for a host be exactly same 364 # between the main and a shard. 365 if kwargs.get('locked', None) and 'lock_time' not in kwargs: 366 kwargs['lock_time'] = datetime.datetime.now() 367 368 # force_modifying_locking is not an internal field in database, remove. 369 shard_kwargs = dict(kwargs) 370 shard_kwargs.pop('force_modify_locking', None) 371 rpc_utils.fanout_rpc([host], 'modify_host_local', 372 include_hostnames=False, id=id, **shard_kwargs) 373 374 # Update the local DB **after** RPC fanout is complete. 


@rpc_utils.route_rpc_to_main
def modify_host(id, **kwargs):
    """Modify local attributes of a host.

    If this is called on the main, but the host is assigned to a shard, this
    will call the `modify_host_local` RPC on the responsible shard. This means
    that if a host is being locked using this function, the change will also
    propagate to shards.
    When this is called on a shard, the shard just routes the RPC to the main
    and does nothing.

    @param id: id of the host to modify.
    @param kwargs: key=value pairs of values to set on the host.
    """
    rpc_utils.check_modify_host(kwargs)
    host = models.Host.smart_get(id)
    try:
        rpc_utils.check_modify_host_locking(host, kwargs)
    except model_logic.ValidationError as e:
        if not kwargs.get('force_modify_locking', False):
            raise
        logging.exception('The following exception will be ignored and lock '
                          'modification will be enforced. %s', e)

    # This is required to make `lock_time` for a host be exactly the same
    # between the main and a shard.
    if kwargs.get('locked', None) and 'lock_time' not in kwargs:
        kwargs['lock_time'] = datetime.datetime.now()

    # force_modify_locking is not a field in the database; remove it before
    # forwarding the update.
    shard_kwargs = dict(kwargs)
    shard_kwargs.pop('force_modify_locking', None)
    rpc_utils.fanout_rpc([host], 'modify_host_local',
                         include_hostnames=False, id=id, **shard_kwargs)

    # Update the local DB **after** RPC fanout is complete.
    # This guarantees that the main state is only updated if the shards were
    # correctly updated.
    # In case the shard update fails mid-flight and the main and shards fall
    # out of sync, we always consider the main state to be the
    # source-of-truth, and any (automated) corrective actions will revert the
    # (partial) shard updates.
    host.update_object(kwargs)


def modify_host_local(id, **kwargs):
    """Modify host attributes in local DB.

    @param id: Host id.
    @param kwargs: key=value pairs of values to set on the host.
    """
    models.Host.smart_get(id).update_object(kwargs)


@rpc_utils.route_rpc_to_main
def modify_hosts(host_filter_data, update_data):
    """Modify local attributes of multiple hosts.

    If this is called on the main, but one of the hosts that match the
    filters is assigned to a shard, this will call the `modify_hosts_local`
    RPC on the responsible shard.
    When this is called on a shard, the shard just routes the RPC to the main
    and does nothing.

    The filters are always applied on the main, not on the shards. This means
    that if the states of a host differ on the main and a shard, the state on
    the main will be used. For example:
    A host was synced to Shard 1. On Shard 1 the status of the host was set to
    'Repair Failed'.
    - A call to modify_hosts with host_filter_data={'status': 'Ready'} will
      update the host (both on the shard and on the main), because the state
      of the host as the main knows it is still 'Ready'.
    - A call to modify_hosts with host_filter_data={'status': 'Repair Failed'}
      will not update the host, because the filter doesn't apply on the main.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    update_data = update_data.copy()
    rpc_utils.check_modify_host(update_data)
    hosts = models.Host.query_objects(host_filter_data)

    affected_shard_hostnames = set()
    affected_host_ids = []

    # Check all hosts before changing data for exception safety.
    for host in hosts:
        try:
            rpc_utils.check_modify_host_locking(host, update_data)
        except model_logic.ValidationError as e:
            if not update_data.get('force_modify_locking', False):
                raise
            logging.exception('The following exception will be ignored and '
                              'lock modification will be enforced. %s', e)

        if host.shard:
            affected_shard_hostnames.add(host.shard.hostname)
            affected_host_ids.append(host.id)

    # This is required to make `lock_time` for a host be exactly the same
    # between the main and a shard.
    if update_data.get('locked', None) and 'lock_time' not in update_data:
        update_data['lock_time'] = datetime.datetime.now()
    for host in hosts:
        host.update_object(update_data)

    update_data.pop('force_modify_locking', None)
    # Caution: Changing the filter from the original here. See docstring.
    rpc_utils.run_rpc_on_multiple_hostnames(
            'modify_hosts_local', affected_shard_hostnames,
            host_filter_data={'id__in': affected_host_ids},
            update_data=update_data)
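

# Illustrative only: a hedged sketch of bulk-locking every Ready host in a
# hypothetical pool label. Remember that the filter is evaluated on the main,
# not on the shards (see modify_hosts above).
def _example_modify_hosts():
    modify_hosts(
            host_filter_data={'status': 'Ready',
                              'labels__name': 'pool:example'},
            update_data={'locked': True,
                         'lock_reason': 'example maintenance window'})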
457 """ 458 for host in models.Host.query_objects(host_filter_data): 459 host.update_object(update_data) 460 461 462def add_labels_to_host(id, labels): 463 """Adds labels to a given host only in local DB. 464 465 @param id: id or hostname for a host. 466 @param labels: ids or names for labels. 467 """ 468 label_objs = models.Label.smart_get_bulk(labels) 469 if not RESPECT_STATIC_LABELS: 470 models.Host.smart_get(id).labels.add(*label_objs) 471 else: 472 static_labels, non_static_labels = models.Host.classify_label_objects( 473 label_objs) 474 host = models.Host.smart_get(id) 475 host.static_labels.add(*static_labels) 476 host.labels.add(*non_static_labels) 477 478 479@rpc_utils.route_rpc_to_main 480def host_add_labels(id, labels): 481 """Adds labels to a given host. 482 483 @param id: id or hostname for a host. 484 @param labels: ids or names for labels. 485 486 @raises ValidationError: If adding more than one platform/board label. 487 """ 488 # Create the labels on the main/shards. 489 for label in labels: 490 _create_label_everywhere(label, [id]) 491 492 label_objs = models.Label.smart_get_bulk(labels) 493 494 platforms = [label.name for label in label_objs if label.platform] 495 if len(platforms) > 1: 496 raise model_logic.ValidationError( 497 {'labels': ('Adding more than one platform: %s' % 498 ', '.join(platforms))}) 499 500 host_obj = models.Host.smart_get(id) 501 if platforms: 502 models.Host.check_no_platform([host_obj]) 503 if any(label_name.startswith('board:') for label_name in labels): 504 models.Host.check_board_labels_allowed([host_obj], labels) 505 add_labels_to_host(id, labels) 506 507 rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False, 508 id=id, labels=labels) 509 510 511def remove_labels_from_host(id, labels): 512 """Removes labels from a given host only in local DB. 513 514 @param id: id or hostname for a host. 515 @param labels: ids or names for labels. 516 """ 517 label_objs = models.Label.smart_get_bulk(labels) 518 if not RESPECT_STATIC_LABELS: 519 models.Host.smart_get(id).labels.remove(*label_objs) 520 else: 521 static_labels, non_static_labels = models.Host.classify_label_objects( 522 label_objs) 523 host = models.Host.smart_get(id) 524 host.labels.remove(*non_static_labels) 525 if static_labels: 526 logging.info('Cannot remove labels "%r" for host "%r" due to they ' 527 'are static labels. Use ' 528 'go/chromeos-skylab-inventory-tools to modify these ' 529 'labels.', static_labels, id) 530 531 532@rpc_utils.route_rpc_to_main 533def host_remove_labels(id, labels): 534 """Removes labels from a given host. 535 536 @param id: id or hostname for a host. 537 @param labels: ids or names for labels. 
538 """ 539 remove_labels_from_host(id, labels) 540 541 host_obj = models.Host.smart_get(id) 542 rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False, 543 id=id, labels=labels) 544 545 546def get_host_attribute(attribute, **host_filter_data): 547 """ 548 @param attribute: string name of attribute 549 @param host_filter_data: filter data to apply to Hosts to choose hosts to 550 act upon 551 """ 552 hosts = rpc_utils.get_host_query((), False, True, host_filter_data) 553 hosts = list(hosts) 554 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 555 'attribute_list') 556 host_attr_dicts = [] 557 host_objs = [] 558 for host_obj in hosts: 559 for attr_obj in host_obj.attribute_list: 560 if attr_obj.attribute == attribute: 561 host_attr_dicts.append(attr_obj.get_object_dict()) 562 host_objs.append(host_obj) 563 564 if RESPECT_STATIC_ATTRIBUTES: 565 for host_attr, host_obj in zip(host_attr_dicts, host_objs): 566 static_attrs = models.StaticHostAttribute.query_objects( 567 {'host_id': host_obj.id, 'attribute': attribute}) 568 if len(static_attrs) > 0: 569 host_attr['value'] = static_attrs[0].value 570 571 return rpc_utils.prepare_for_serialization(host_attr_dicts) 572 573 574@rpc_utils.route_rpc_to_main 575def set_host_attribute(attribute, value, **host_filter_data): 576 """Set an attribute on hosts. 577 578 This RPC is a shim that forwards calls to main to be handled there. 579 580 @param attribute: string name of attribute 581 @param value: string, or None to delete an attribute 582 @param host_filter_data: filter data to apply to Hosts to choose hosts to 583 act upon 584 """ 585 assert not utils.is_shard() 586 set_host_attribute_impl(attribute, value, **host_filter_data) 587 588 589def set_host_attribute_impl(attribute, value, **host_filter_data): 590 """Set an attribute on hosts. 591 592 *** DO NOT CALL THIS RPC from client code *** 593 This RPC exists for main-shard communication only. 594 Call set_host_attribute instead. 595 596 @param attribute: string name of attribute 597 @param value: string, or None to delete an attribute 598 @param host_filter_data: filter data to apply to Hosts to choose hosts to 599 act upon 600 """ 601 assert host_filter_data # disallow accidental actions on all hosts 602 hosts = models.Host.query_objects(host_filter_data) 603 models.AclGroup.check_for_acl_violation_hosts(hosts) 604 for host in hosts: 605 host.set_or_delete_attribute(attribute, value) 606 607 # Main forwards this RPC to shards. 608 if not utils.is_shard(): 609 rpc_utils.fanout_rpc(hosts, 'set_host_attribute_impl', False, 610 attribute=attribute, value=value, **host_filter_data) 611 612 613@rpc_utils.forward_single_host_rpc_to_shard 614def delete_host(id): 615 models.Host.smart_get(id).delete() 616 617 618def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 619 valid_only=True, include_current_job=False, **filter_data): 620 """Get a list of dictionaries which contains the information of hosts. 621 622 @param multiple_labels: match hosts in all of the labels given. Should 623 be a list of label names. 624 @param exclude_only_if_needed_labels: Deprecated. Raise error if it's True. 625 @param include_current_job: Set to True to include ids of currently running 626 job and special task. 
627 """ 628 if exclude_only_if_needed_labels: 629 raise error.RPCException('exclude_only_if_needed_labels is deprecated') 630 631 hosts = rpc_utils.get_host_query(multiple_labels, 632 exclude_only_if_needed_labels, 633 valid_only, filter_data) 634 hosts = list(hosts) 635 models.Host.objects.populate_relationships(hosts, models.Label, 636 'label_list') 637 models.Host.objects.populate_relationships(hosts, models.AclGroup, 638 'acl_list') 639 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 640 'attribute_list') 641 models.Host.objects.populate_relationships(hosts, 642 models.StaticHostAttribute, 643 'staticattribute_list') 644 host_dicts = [] 645 for host_obj in hosts: 646 host_dict = host_obj.get_object_dict() 647 host_dict['acls'] = [acl.name for acl in host_obj.acl_list] 648 host_dict['attributes'] = dict((attribute.attribute, attribute.value) 649 for attribute in host_obj.attribute_list) 650 if RESPECT_STATIC_LABELS: 651 label_list = [] 652 # Only keep static labels which has a corresponding entries in 653 # afe_labels. 654 for label in host_obj.label_list: 655 if label.is_replaced_by_static(): 656 static_label = models.StaticLabel.smart_get(label.name) 657 label_list.append(static_label) 658 else: 659 label_list.append(label) 660 661 host_dict['labels'] = [label.name for label in label_list] 662 host_dict['platform'] = rpc_utils.find_platform( 663 host_obj.hostname, label_list) 664 else: 665 host_dict['labels'] = [label.name for label in host_obj.label_list] 666 host_dict['platform'] = rpc_utils.find_platform( 667 host_obj.hostname, host_obj.label_list) 668 669 if RESPECT_STATIC_ATTRIBUTES: 670 # Overwrite attribute with values in afe_static_host_attributes. 671 for attr in host_obj.staticattribute_list: 672 if attr.attribute in host_dict['attributes']: 673 host_dict['attributes'][attr.attribute] = attr.value 674 675 if include_current_job: 676 host_dict['current_job'] = None 677 host_dict['current_special_task'] = None 678 entries = models.HostQueueEntry.objects.filter( 679 host_id=host_dict['id'], active=True, complete=False) 680 if entries: 681 host_dict['current_job'] = ( 682 entries[0].get_object_dict()['job']) 683 tasks = models.SpecialTask.objects.filter( 684 host_id=host_dict['id'], is_active=True, is_complete=False) 685 if tasks: 686 host_dict['current_special_task'] = ( 687 '%d-%s' % (tasks[0].get_object_dict()['id'], 688 tasks[0].get_object_dict()['task'].lower())) 689 host_dicts.append(host_dict) 690 691 return rpc_utils.prepare_for_serialization(host_dicts) 692 693 694def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 695 valid_only=True, **filter_data): 696 """ 697 Same parameters as get_hosts(). 698 699 @returns The number of matching hosts. 700 """ 701 if exclude_only_if_needed_labels: 702 raise error.RPCException('exclude_only_if_needed_labels is deprecated') 703 704 hosts = rpc_utils.get_host_query(multiple_labels, 705 exclude_only_if_needed_labels, 706 valid_only, filter_data) 707 return len(hosts) 708 709 710# tests 711 712def get_tests(**filter_data): 713 return rpc_utils.prepare_for_serialization( 714 models.Test.list_objects(filter_data)) 715 716 717def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name): 718 """Gets the counts of all passed and failed tests from the matching jobs. 719 720 @param job_name_prefix: Name prefix of the jobs to get the summary 721 from, e.g., 'butterfly-release/r40-6457.21.0/bvt-cq/'. Prefix 722 matching is case insensitive. 


def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
    """Gets the counts of all passed and failed tests from the matching jobs.

    @param job_name_prefix: Name prefix of the jobs to get the summary
           from, e.g., 'butterfly-release/r40-6457.21.0/bvt-cq/'. Prefix
           matching is case insensitive.
    @param label_name: Label that must be set in the jobs, e.g.,
           'cros-version:butterfly-release/R40-6457.21.0'.

    @returns A summary of the counts of all the passed and failed tests.
    """
    job_ids = list(models.Job.objects.filter(
            name__istartswith=job_name_prefix,
            dependency_labels__name=label_name).values_list('pk', flat=True))
    summary = {'passed': 0, 'failed': 0}
    if not job_ids:
        return summary

    counts = (tko_models.TestView.objects.filter(
            afe_job_id__in=job_ids).exclude(
            test_name='SERVER_JOB').exclude(
            test_name__startswith='CLIENT_JOB').values(
            'status').annotate(
            count=Count('status')))
    for status in counts:
        if status['status'] == 'GOOD':
            summary['passed'] += status['count']
        else:
            summary['failed'] += status['count']
    return summary


# profilers

def add_profiler(name, description=None):
    return models.Profiler.add_object(name=name, description=description).id


def modify_profiler(id, **data):
    models.Profiler.smart_get(id).update_object(data)


def delete_profiler(id):
    models.Profiler.smart_get(id).delete()


def get_profilers(**filter_data):
    return rpc_utils.prepare_for_serialization(
            models.Profiler.list_objects(filter_data))


# users

def get_users(**filter_data):
    return rpc_utils.prepare_for_serialization(
            models.User.list_objects(filter_data))


# acl groups

def add_acl_group(name, description=None):
    group = models.AclGroup.add_object(name=name, description=description)
    group.users.add(models.User.current_user())
    return group.id


def modify_acl_group(id, **data):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    group.update_object(data)
    group.add_current_user_if_empty()


def acl_group_add_users(id, users):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    users = models.User.smart_get_bulk(users)
    group.users.add(*users)


def acl_group_remove_users(id, users):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    users = models.User.smart_get_bulk(users)
    group.users.remove(*users)
    group.add_current_user_if_empty()


def acl_group_add_hosts(id, hosts):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    hosts = models.Host.smart_get_bulk(hosts)
    group.hosts.add(*hosts)
    group.on_host_membership_change()


def acl_group_remove_hosts(id, hosts):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    hosts = models.Host.smart_get_bulk(hosts)
    group.hosts.remove(*hosts)
    group.on_host_membership_change()


def delete_acl_group(id):
    models.AclGroup.smart_get(id).delete()


def get_acl_groups(**filter_data):
    acl_groups = models.AclGroup.list_objects(filter_data)
    for acl_group in acl_groups:
        acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])
        acl_group['users'] = [user.login
                              for user in acl_group_obj.users.all()]
        acl_group['hosts'] = [host.hostname
                              for host in acl_group_obj.hosts.all()]
    return rpc_utils.prepare_for_serialization(acl_groups)
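

# Illustrative only: a hedged sketch of ACL group management, mirroring the
# examples in the module docstring. Group name, users, and hosts are
# hypothetical.
def _example_acl_group_setup():
    group_id = add_acl_group('example-group', description='demo group')
    acl_group_add_users(group_id, ['mbligh', 'showard'])
    acl_group_add_hosts(group_id, ['sample-dut-1'])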


# jobs

def generate_control_file(tests=(), profilers=(),
                          client_control_file='', use_container=False,
                          profile_only=None, db_tests=True,
                          test_source_build=None):
    """
    Generates a client-side control file to run tests.

    @param tests List of tests to run. See db_tests for more information.
    @param profilers List of profilers to activate during the job.
    @param client_control_file The contents of a client-side control file to
        run at the end of all tests. If this is supplied, all tests must be
        client side.
        TODO: in the future we should support server control files directly
        to wrap with a kernel. That'll require changing the parameter
        name and adding a boolean to indicate if it is a client or server
        control file.
    @param use_container unused argument today. TODO: Enable containers
        on the host during a client side test.
    @param profile_only A boolean that indicates what default profile_only
        mode to use in the control file. Passing None will generate a
        control file that does not explicitly set the default mode at all.
    @param db_tests: if True, the test object can be found in the database
                     backing the test model. In this case, tests is a tuple
                     of test IDs which are used to retrieve the test objects
                     from the database. If False, tests is a tuple of test
                     dictionaries stored client-side in the AFE.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.

    @returns a dict with the following keys:
        control_file: str, The control file text.
        is_server: bool, is the control file a server-side control file?
        synch_count: How many machines the job uses per autoserv execution.
                     synch_count == 1 means the job is asynchronous.
        dependencies: A list of the names of labels on which the job depends.
    """
    if not tests and not client_control_file:
        return dict(control_file='', is_server=False, synch_count=1,
                    dependencies=[])

    cf_info, test_objects, profiler_objects = (
            rpc_utils.prepare_generate_control_file(tests, profilers,
                                                    db_tests))
    cf_info['control_file'] = control_file_lib.generate_control(
            tests=test_objects, profilers=profiler_objects,
            is_server=cf_info['is_server'],
            client_control_file=client_control_file, profile_only=profile_only,
            test_source_build=test_source_build)
    return cf_info
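

# Illustrative only: a hedged sketch of generating a control file for tests
# already registered in the AFE database. The test IDs are hypothetical;
# with db_tests=True (the default) they are looked up in the test model.
def _example_generate_control_file():
    result = generate_control_file(tests=(12, 34), profile_only=False)
    return result['control_file'], result['is_server']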
915 """ 916 test_args = {} 917 if kwargs.get('args'): 918 # args' format is: ['disable_sysinfo=False', 'fast=True', ...] 919 args = kwargs.get('args') 920 for arg in args: 921 k, v = arg.split('=')[0], arg.split('=')[1] 922 test_args[k] = v 923 924 if is_cloning: 925 logging.info('Start to clone a new job') 926 # When cloning a job, hosts and meta_hosts should not exist together, 927 # which would cause host-scheduler to schedule two hqe jobs to one host 928 # at the same time, and crash itself. Clear meta_hosts for this case. 929 if kwargs.get('hosts') and kwargs.get('meta_hosts'): 930 kwargs['meta_hosts'] = [] 931 else: 932 logging.info('Start to create a new job') 933 control_file = rpc_utils.encode_ascii(control_file) 934 if not control_file: 935 raise model_logic.ValidationError({ 936 'control_file' : "Control file cannot be empty"}) 937 938 if image and hostless: 939 builds = {} 940 builds[provision.CROS_VERSION_PREFIX] = image 941 if cheets_build: 942 builds[provision.CROS_ANDROID_VERSION_PREFIX] = cheets_build 943 if firmware_rw_build: 944 builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build 945 if firmware_ro_build: 946 builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build 947 return create_suite_job( 948 name=name, control_file=control_file, priority=priority, 949 builds=builds, test_source_build=test_source_build, 950 is_cloning=is_cloning, test_args=test_args, **kwargs) 951 952 return create_job(name, priority, control_file, control_type, image=image, 953 hostless=hostless, test_args=test_args, **kwargs) 954 955 956@rpc_utils.route_rpc_to_main 957def create_job( 958 name, 959 priority, 960 control_file, 961 control_type, 962 hosts=(), 963 meta_hosts=(), 964 one_time_hosts=(), 965 synch_count=None, 966 is_template=False, 967 timeout=None, 968 timeout_mins=None, 969 max_runtime_mins=None, 970 run_verify=False, 971 email_list='', 972 dependencies=(), 973 reboot_before=None, 974 reboot_after=None, 975 parse_failed_repair=None, 976 hostless=False, 977 keyvals=None, 978 drone_set=None, 979 image=None, 980 parent_job_id=None, 981 test_retry=0, 982 run_reset=True, 983 require_ssp=None, 984 test_args=None, 985 **kwargs): 986 """\ 987 Create and enqueue a job. 988 989 @param name name of this job 990 @param priority Integer priority of this job. Higher is more important. 991 @param control_file String contents of the control file. 992 @param control_type Type of control file, Client or Server. 993 @param synch_count How many machines the job uses per autoserv execution. 994 synch_count == 1 means the job is asynchronous. If an atomic group is 995 given this value is treated as a minimum. 996 @param is_template If true then create a template job. 997 @param timeout Hours after this call returns until the job times out. 998 @param timeout_mins Minutes after this call returns until the job times 999 out. 1000 @param max_runtime_mins Minutes from job starting time until job times out 1001 @param run_verify Should the host be verified before running the test? 1002 @param email_list String containing emails to mail when the job is done 1003 @param dependencies List of label names on which this job depends 1004 @param reboot_before Never, If dirty, or Always 1005 @param reboot_after Never, If all tests passed, or Always 1006 @param parse_failed_repair if true, results of failed repairs launched by 1007 this job will be parsed as part of the job. 


@rpc_utils.route_rpc_to_main
def create_job(
        name,
        priority,
        control_file,
        control_type,
        hosts=(),
        meta_hosts=(),
        one_time_hosts=(),
        synch_count=None,
        is_template=False,
        timeout=None,
        timeout_mins=None,
        max_runtime_mins=None,
        run_verify=False,
        email_list='',
        dependencies=(),
        reboot_before=None,
        reboot_after=None,
        parse_failed_repair=None,
        hostless=False,
        keyvals=None,
        drone_set=None,
        image=None,
        parent_job_id=None,
        test_retry=0,
        run_reset=True,
        require_ssp=None,
        test_args=None,
        **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job. Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param synch_count How many machines the job uses per autoserv execution.
        synch_count == 1 means the job is asynchronous. If an atomic group is
        given this value is treated as a minimum.
    @param is_template If true then create a template job.
    @param timeout Hours after this call returns until the job times out.
    @param timeout_mins Minutes after this call returns until the job times
        out.
    @param max_runtime_mins Minutes from job starting time until job times out
    @param run_verify Should the host be verified before running the test?
    @param email_list String containing emails to mail when the job is done
    @param dependencies List of label names on which this job depends
    @param reboot_before Never, If dirty, or Always
    @param reboot_after Never, If all tests passed, or Always
    @param parse_failed_repair if true, results of failed repairs launched by
        this job will be parsed as part of the job.
    @param hostless if true, create a hostless job
    @param keyvals dict of keyvals to associate with the job
    @param hosts List of hosts to run job on.
    @param meta_hosts List where each entry is a label name, and for each
        entry one host will be chosen from that label to run the job on.
    @param one_time_hosts List of hosts not in the database to run the job on.
    @param drone_set The name of the drone set to run this test on.
    @param image OS image to install before running job.
    @param parent_job_id id of a job considered to be parent of created job.
    @param test_retry DEPRECATED
    @param run_reset Should the host be reset before running the test?
    @param require_ssp Set to True to require server-side packaging to run the
                       test. If it's set to None, drone will still try to run
                       the server side with server-side packaging. If the
                       autotest-server package doesn't exist for the build or
                       image is not set, drone will run the test without
                       server-side packaging. Default is None.
    @param test_args A dict of args to be injected into the control file.
    @param kwargs extra keyword args. NOT USED.

    @returns The created Job id number.
    """
    if test_args:
        control_file = tools.inject_vars(test_args, control_file)
    if image:
        dependencies += (provision.image_version_to_label(image),)
    return rpc_utils.create_job_common(
            name=name,
            priority=priority,
            control_type=control_type,
            control_file=control_file,
            hosts=hosts,
            meta_hosts=meta_hosts,
            one_time_hosts=one_time_hosts,
            synch_count=synch_count,
            is_template=is_template,
            timeout=timeout,
            timeout_mins=timeout_mins,
            max_runtime_mins=max_runtime_mins,
            run_verify=run_verify,
            email_list=email_list,
            dependencies=dependencies,
            reboot_before=reboot_before,
            reboot_after=reboot_after,
            parse_failed_repair=parse_failed_repair,
            hostless=hostless,
            keyvals=keyvals,
            drone_set=drone_set,
            parent_job_id=parent_job_id,
            run_reset=run_reset,
            require_ssp=require_ssp)


def abort_host_queue_entries(**filter_data):
    """\
    Abort a set of host queue entries.

    @return: A list of dictionaries, each containing information
             about an aborted HQE.
    """
    query = models.HostQueueEntry.query_objects(filter_data)

    # Don't allow aborts on:
    # 1. Jobs that have already completed (whether or not they were aborted)
    # 2. Jobs that have already been aborted (but may not have completed)
    query = query.filter(complete=False).filter(aborted=False)
    models.AclGroup.check_abort_permissions(query)
    host_queue_entries = list(query.select_related())
    rpc_utils.check_abort_synchronous_jobs(host_queue_entries)

    models.HostQueueEntry.abort_host_queue_entries(host_queue_entries)
    hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id,
                 'Job name': hqe.job.name} for hqe in host_queue_entries]
    return hqe_info


def abort_special_tasks(**filter_data):
    """\
    Abort the special task, or tasks, specified in the filter.
    """
    query = models.SpecialTask.query_objects(filter_data)
    special_tasks = query.filter(is_active=True)
    for task in special_tasks:
        task.abort()
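

# Illustrative only: a hedged sketch of aborting the queue entries of a
# single job by id. The job id is hypothetical; completed or already-aborted
# entries are filtered out by abort_host_queue_entries itself.
def _example_abort_job():
    return abort_host_queue_entries(job__id=42)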


def _call_special_tasks_on_hosts(task, hosts):
    """\
    Schedules a set of hosts for a special task.

    @returns A list of hostnames that a special task was created for.
    """
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
    if shard_host_map and not utils.is_shard():
        raise ValueError('The following hosts are on shards, please '
                         'follow the links to the shards and create jobs '
                         'there instead. %s.' % shard_host_map)
    for host in hosts:
        models.SpecialTask.schedule_special_task(host, task)
    return list(sorted(host.hostname for host in hosts))


def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
    """Forward special tasks to the corresponding shards.

    On the main, when special tasks are fired on hosts that are sharded,
    forward the RPC to the corresponding shards.

    On a shard, create special task records in the local DB.

    @param task: Enum value of frontend.afe.models.SpecialTask.Task
    @param rpc: RPC name to forward.
    @param filter_data: Filter keywords to be used for DB query.

    @return: A list of hostnames that a special task was created for.
    """
    hosts = models.Host.query_objects(filter_data)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)

    # Filter out hosts on a shard from those on the main, forward
    # rpcs to the shard with an additional hostname__in filter, and
    # create a local SpecialTask for each remaining host.
    if shard_host_map and not utils.is_shard():
        hosts = [h for h in hosts if h.shard is None]
        for shard, hostnames in six.iteritems(shard_host_map):

            # The main client of this module is the frontend website, and
            # it invokes this RPC with an 'id' or an 'id__in' filter.
            # Regardless, the 'hostname' filter should narrow down the list
            # of hosts on each shard even though we supply all the ids in
            # filter_data. This method uses hostname instead of id because
            # it fits better with the overall architecture of redirection
            # functions in rpc_utils.
            shard_filter = filter_data.copy()
            shard_filter['hostname__in'] = hostnames
            rpc_utils.run_rpc_on_multiple_hostnames(
                    rpc, [shard], **shard_filter)

    # There is a race condition here if someone assigns a shard to one of
    # these hosts before we create the task. The host will stay on the main
    # if:
    # 1. The host is not Ready
    # 2. The host is Ready but has a task
    # But if the host is Ready and doesn't have a task yet, it will get sent
    # to the shard as we're creating a task here.

    # Given that we only rarely verify Ready hosts it isn't worth putting
    # this entire method in a transaction. The worst case scenario is that
    # we have a verify running on a Ready host while the shard is using it;
    # if the verify fails, no subsequent tasks will be created against the
    # host on the main, and verifies are safe enough that this is OK.
    return _call_special_tasks_on_hosts(task, hosts)


def reverify_hosts(**filter_data):
    """\
    Schedules a set of hosts for verify.

    @returns A list of hostnames that a verify task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)
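

# Illustrative only: a hedged sketch of scheduling verify tasks for a few
# hosts by hostname. The hostnames are hypothetical; on the main, sharded
# hosts are forwarded automatically via _forward_special_tasks_on_hosts.
def _example_reverify_hosts():
    return reverify_hosts(hostname__in=['sample-dut-1', 'sample-dut-2'])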
1177 """ 1178 return _forward_special_tasks_on_hosts( 1179 models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data) 1180 1181 1182def get_jobs(not_yet_run=False, running=False, finished=False, 1183 suite=False, sub=False, standalone=False, **filter_data): 1184 """\ 1185 Extra status filter args for get_jobs: 1186 -not_yet_run: Include only jobs that have not yet started running. 1187 -running: Include only jobs that have start running but for which not 1188 all hosts have completed. 1189 -finished: Include only jobs for which all hosts have completed (or 1190 aborted). 1191 1192 Extra type filter args for get_jobs: 1193 -suite: Include only jobs with child jobs. 1194 -sub: Include only jobs with a parent job. 1195 -standalone: Inlcude only jobs with no child or parent jobs. 1196 At most one of these three fields should be specified. 1197 """ 1198 extra_args = rpc_utils.extra_job_status_filters(not_yet_run, 1199 running, 1200 finished) 1201 filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, 1202 suite, 1203 sub, 1204 standalone) 1205 job_dicts = [] 1206 jobs = list(models.Job.query_objects(filter_data)) 1207 models.Job.objects.populate_relationships(jobs, models.Label, 1208 'dependencies') 1209 models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals') 1210 for job in jobs: 1211 job_dict = job.get_object_dict() 1212 job_dict['dependencies'] = ','.join(label.name 1213 for label in job.dependencies) 1214 job_dict['keyvals'] = dict((keyval.key, keyval.value) 1215 for keyval in job.keyvals) 1216 job_dicts.append(job_dict) 1217 return rpc_utils.prepare_for_serialization(job_dicts) 1218 1219 1220def get_num_jobs(not_yet_run=False, running=False, finished=False, 1221 suite=False, sub=False, standalone=False, 1222 **filter_data): 1223 """\ 1224 See get_jobs() for documentation of extra filter parameters. 1225 """ 1226 extra_args = rpc_utils.extra_job_status_filters(not_yet_run, 1227 running, 1228 finished) 1229 filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, 1230 suite, 1231 sub, 1232 standalone) 1233 return models.Job.query_count(filter_data) 1234 1235 1236def get_jobs_summary(**filter_data): 1237 """\ 1238 Like get_jobs(), but adds 'status_counts' and 'result_counts' field. 1239 1240 'status_counts' filed is a dictionary mapping status strings to the number 1241 of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}. 1242 1243 'result_counts' field is piped to tko's rpc_interface and has the return 1244 format specified under get_group_counts. 1245 """ 1246 jobs = get_jobs(**filter_data) 1247 ids = [job['id'] for job in jobs] 1248 all_status_counts = models.Job.objects.get_status_counts(ids) 1249 for job in jobs: 1250 job['status_counts'] = all_status_counts[job['id']] 1251 job['result_counts'] = tko_rpc_interface.get_status_counts( 1252 ['afe_job_id', 'afe_job_id'], 1253 header_groups=[['afe_job_id'], ['afe_job_id']], 1254 **{'afe_job_id': job['id']}) 1255 return rpc_utils.prepare_for_serialization(jobs) 1256 1257 1258def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None): 1259 """\ 1260 Retrieves all the information needed to clone a job. 
1261 """ 1262 job = models.Job.objects.get(id=id) 1263 job_info = rpc_utils.get_job_info(job, 1264 preserve_metahosts, 1265 queue_entry_filter_data) 1266 1267 host_dicts = [] 1268 for host in job_info['hosts']: 1269 host_dict = get_hosts(id=host.id)[0] 1270 other_labels = host_dict['labels'] 1271 if host_dict['platform']: 1272 other_labels.remove(host_dict['platform']) 1273 host_dict['other_labels'] = ', '.join(other_labels) 1274 host_dicts.append(host_dict) 1275 1276 for host in job_info['one_time_hosts']: 1277 host_dict = dict(hostname=host.hostname, 1278 id=host.id, 1279 platform='(one-time host)', 1280 locked_text='') 1281 host_dicts.append(host_dict) 1282 1283 # convert keys from Label objects to strings (names of labels) 1284 meta_host_counts = dict((meta_host.name, count) for meta_host, count 1285 in job_info['meta_host_counts'].iteritems()) 1286 1287 info = dict(job=job.get_object_dict(), 1288 meta_host_counts=meta_host_counts, 1289 hosts=host_dicts) 1290 info['job']['dependencies'] = job_info['dependencies'] 1291 info['hostless'] = job_info['hostless'] 1292 info['drone_set'] = job.drone_set and job.drone_set.name 1293 1294 image = _get_image_for_job(job, job_info['hostless']) 1295 if image: 1296 info['job']['image'] = image 1297 1298 return rpc_utils.prepare_for_serialization(info) 1299 1300 1301def _get_image_for_job(job, hostless): 1302 """Gets the image used for a job. 1303 1304 Gets the image used for an AFE job from the job's keyvals 'build' or 1305 'builds'. If that fails, and the job is a hostless job, tries to 1306 get the image from its control file attributes 'build' or 'builds'. 1307 1308 TODO(ntang): Needs to handle FAFT with two builds for ro/rw. 1309 1310 @param job An AFE job object. 1311 @param hostless Boolean indicating whether the job is hostless. 1312 1313 @returns The image build used for the job. 1314 """ 1315 keyvals = job.keyval_dict() 1316 image = keyvals.get('build') 1317 if not image: 1318 value = keyvals.get('builds') 1319 builds = None 1320 if isinstance(value, dict): 1321 builds = value 1322 elif isinstance(value, six.string_types): 1323 builds = ast.literal_eval(value) 1324 if builds: 1325 image = builds.get('cros-version') 1326 if not image and hostless and job.control_file: 1327 try: 1328 control_obj = control_data.parse_control_string( 1329 job.control_file) 1330 if hasattr(control_obj, 'build'): 1331 image = getattr(control_obj, 'build') 1332 if not image and hasattr(control_obj, 'builds'): 1333 builds = getattr(control_obj, 'builds') 1334 image = builds.get('cros-version') 1335 except: 1336 logging.warning('Failed to parse control file for job: %s', 1337 job.name) 1338 return image 1339 1340 1341def get_host_queue_entries_by_insert_time( 1342 insert_time_after=None, insert_time_before=None, **filter_data): 1343 """Like get_host_queue_entries, but using the insert index table. 1344 1345 @param insert_time_after: A lower bound on insert_time 1346 @param insert_time_before: An upper bound on insert_time 1347 @returns A sequence of nested dictionaries of host and job information. 1348 """ 1349 assert insert_time_after is not None or insert_time_before is not None, \ 1350 ('Caller to get_host_queue_entries_by_insert_time must provide either' 1351 ' insert_time_after or insert_time_before.') 1352 # Get insert bounds on the index of the host queue entries. 1353 if insert_time_after: 1354 query = models.HostQueueEntryStartTimes.objects.filter( 1355 # Note: '-insert_time' means descending. 


def get_host_queue_entries_by_insert_time(
        insert_time_after=None, insert_time_before=None, **filter_data):
    """Like get_host_queue_entries, but using the insert index table.

    @param insert_time_after: A lower bound on insert_time
    @param insert_time_before: An upper bound on insert_time
    @returns A sequence of nested dictionaries of host and job information.
    """
    assert insert_time_after is not None or insert_time_before is not None, \
        ('Caller to get_host_queue_entries_by_insert_time must provide '
         'either insert_time_after or insert_time_before.')
    # Get insert bounds on the index of the host queue entries.
    if insert_time_after:
        query = models.HostQueueEntryStartTimes.objects.filter(
                # Note: '-insert_time' means descending. We want the largest
                # insert time that is still <= insert_time_after.
                insert_time__lte=insert_time_after).order_by('-insert_time')
        try:
            constraint = query[0].highest_hqe_id
            if 'id__gte' in filter_data:
                constraint = max(constraint, filter_data['id__gte'])
            filter_data['id__gte'] = constraint
        except IndexError:
            pass

    # Get end bounds.
    if insert_time_before:
        query = models.HostQueueEntryStartTimes.objects.filter(
                insert_time__gte=insert_time_before).order_by('insert_time')
        try:
            constraint = query[0].highest_hqe_id
            if 'id__lte' in filter_data:
                constraint = min(constraint, filter_data['id__lte'])
            filter_data['id__lte'] = constraint
        except IndexError:
            pass

    return rpc_utils.prepare_rows_as_nested_dicts(
            models.HostQueueEntry.query_objects(filter_data),
            ('host', 'job'))


def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    @returns A sequence of nested dictionaries of host and job information.
    """
    filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
                                                   'started_on__lte',
                                                   start_time,
                                                   end_time,
                                                   **filter_data)
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.HostQueueEntry.query_objects(filter_data),
            ('host', 'job'))


def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    Get the number of host queue entries matching the given filters.
    """
    filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
                                                   'started_on__lte',
                                                   start_time,
                                                   end_time,
                                                   **filter_data)
    return models.HostQueueEntry.query_count(filter_data)


def get_hqe_percentage_complete(**filter_data):
    """
    Computes the fraction of host queue entries matching the given filter
    data that are complete.
    """
    query = models.HostQueueEntry.query_objects(filter_data)
    complete_count = query.filter(complete=True).count()
    total_count = query.count()
    if total_count == 0:
        return 1
    return float(complete_count) / total_count


# special tasks

def get_special_tasks(**filter_data):
    """Get special task entries from the local database.

    Query the special tasks table for tasks matching the given
    `filter_data`, and return a list of the results. No attempt is
    made to forward the call to shards; the buck will stop here.
    The caller is expected to know the target shard for such reasons
    as:
      * The caller is a service (such as gs_offloader) configured
        to operate on behalf of one specific shard, and no other.
      * The caller has a host as a parameter, and knows that this is
        the shard assigned to that host.

    @param filter_data Filter keywords to pass to the underlying
                       database query.

    """
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.SpecialTask.query_objects(filter_data),
            ('host', 'queue_entry'))
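

# Illustrative only: a hedged sketch of a windowed HQE query. The window
# bounds are hypothetical; the insert-time index is used to convert the
# bounds into id__gte/id__lte constraints before the real query runs.
def _example_recent_hqes():
    now = datetime.datetime.now()
    return get_host_queue_entries_by_insert_time(
            insert_time_after=now - datetime.timedelta(hours=1),
            insert_time_before=now)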


def get_host_special_tasks(host_id, **filter_data):
    """Get special task entries for a given host.

    Query the special tasks table for tasks that ran on the host
    given by `host_id` and matching the given `filter_data`.
    Return a list of the results. If the host is assigned to a
    shard, forward this call to that shard.

    @param host_id Id in the database of the target host.
    @param filter_data Filter keywords to pass to the underlying
                       database query.

    """
    # Retrieve host data even if the host is in an invalid state.
    host = models.Host.smart_get(host_id, False)
    if not host.shard:
        return get_special_tasks(host_id=host_id, **filter_data)
    else:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable. So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.hostname)
        return shard_afe.run('get_special_tasks',
                             host_id=host_id, **filter_data)


def get_num_special_tasks(**kwargs):
    """Get the number of special task entries from the local database.

    Query the special tasks table for tasks matching the given 'kwargs',
    and return the number of the results. No attempt is made to forward
    the call to shards; the buck will stop here.

    @param kwargs Filter keywords to pass to the underlying database query.

    """
    return models.SpecialTask.query_count(kwargs)


def get_host_num_special_tasks(host, **kwargs):
    """Get the number of special task entries for a given host.

    Query the special tasks table for tasks that ran on the host
    given by 'host' and matching the given 'kwargs'.
    Return the number of the results. If the host is assigned to a
    shard, forward this call to that shard.

    @param host id or name of a host. More often a hostname.
    @param kwargs Filter keywords to pass to the underlying database query.

    """
    # Retrieve host data even if the host is in an invalid state.
    host_model = models.Host.smart_get(host, False)
    if not host_model.shard:
        return get_num_special_tasks(host=host, **kwargs)
    else:
        shard_afe = frontend.AFE(server=host_model.shard.hostname)
        return shard_afe.run('get_num_special_tasks', host=host, **kwargs)


def get_status_task(host_id, end_time):
    """Get the "status task" for a host from the local shard.

    Returns a single special task representing the given host's
    "status task". The status task is a completed special task that
    identifies whether the corresponding host was working or broken
    when it completed. A successful task indicates a working host;
    a failed task indicates a broken one.

    This call will not be forwarded to a shard; the receiving server
    must be the shard that owns the host.

    @param host_id Id in the database of the target host.
    @param end_time Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time. If no task is found, return
            `None`.

    """
    tasklist = rpc_utils.prepare_rows_as_nested_dicts(
            status_history.get_status_task(host_id, end_time),
            ('host', 'queue_entry'))
    return tasklist[0] if tasklist else None


def get_status_task(host_id, end_time):
    """Get the "status task" for a host from the local shard.

    Returns a single special task representing the given host's
    "status task". The status task is a completed special task that
    identifies whether the corresponding host was working or broken
    when it completed. A successful task indicates a working host;
    a failed task indicates broken.

    This call will not be forwarded to a shard; the receiving server
    must be the shard that owns the host.

    @param host_id Id in the database of the target host.
    @param end_time Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time. If no task is found, return
            `None`.

    """
    tasklist = rpc_utils.prepare_rows_as_nested_dicts(
            status_history.get_status_task(host_id, end_time),
            ('host', 'queue_entry'))
    return tasklist[0] if tasklist else None


def get_host_status_task(host_id, end_time):
    """Get the "status task" for a host from its owning shard.

    Finds the given host's owning shard, and forwards to it a call
    to `get_status_task()` (see above).

    @param host_id Id in the database of the target host.
    @param end_time Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time. If no task is found, return
            `None`.

    """
    host = models.Host.smart_get(host_id)
    if not host.shard:
        return get_status_task(host_id, end_time)
    else:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable. So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.hostname)
        return shard_afe.run('get_status_task',
                             host_id=host_id, end_time=end_time)


def get_host_diagnosis_interval(host_id, end_time, success):
    """Find a "diagnosis interval" for a given host.

    A "diagnosis interval" identifies a start and end time where
    the host went from "working" to "broken", or vice versa. The
    interval's starting time is the starting time of the last status
    task with the old status; the end time is the finish time of the
    first status task with the new status.

    This routine finds the most recent diagnosis interval for the
    given host prior to `end_time`, with a starting status matching
    `success`. If `success` is true, the interval will start with a
    successful status task; if false, the interval will start with a
    failed status task.

    @param host_id Id in the database of the target host.
    @param end_time Time reference for the diagnosis interval.
    @param success Whether the diagnosis interval should start
                   with a successful or failed status task.

    @return A list of two strings. The first is the timestamp for
            the beginning of the interval; the second is the
            timestamp for the end. If the host has never changed
            state, the list is empty.

    """
    host = models.Host.smart_get(host_id)
    if not host.shard or utils.is_shard():
        return status_history.get_diagnosis_interval(
                host_id, end_time, success)
    else:
        shard_afe = frontend.AFE(server=host.shard.hostname)
        return shard_afe.get_host_diagnosis_interval(
                host_id, end_time, success)
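

# Illustrative sketch only; never called. Asks when a host most recently
# went from working to broken. The time-string format is an assumption that
# mirrors get_server_time() below; success=True requests an interval that
# starts with a successful status task, i.e. a working-to-broken transition.
def _example_when_host_broke(host_id):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
    return get_host_diagnosis_interval(host_id, now, True)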


# support for host detail view

def get_host_queue_entries_and_special_tasks(host, query_start=None,
                                             query_limit=None, start_time=None,
                                             end_time=None):
    """
    @returns An interleaved list of HostQueueEntries and SpecialTasks,
    in approximate run order. Each dict contains keys for type, host,
    job, status, started_on, execution_path, and ID.
    """
    total_limit = None
    if query_limit is not None:
        # query_start may legitimately be None when only a limit is given.
        total_limit = (query_start or 0) + query_limit
    filter_data_common = {'host': host,
                          'query_limit': total_limit,
                          'sort_by': ['-id']}

    filter_data_special_tasks = rpc_utils.inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **filter_data_common)

    queue_entries = get_host_queue_entries(
            start_time, end_time, **filter_data_common)
    special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)

    interleaved_entries = rpc_utils.interleave_entries(queue_entries,
                                                       special_tasks)
    if query_start is not None:
        interleaved_entries = interleaved_entries[query_start:]
    if query_limit is not None:
        interleaved_entries = interleaved_entries[:query_limit]
    return rpc_utils.prepare_host_queue_entries_and_special_tasks(
            interleaved_entries, queue_entries)


def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
                                                 end_time=None):
    filter_data_common = {'host': host}

    filter_data_queue_entries, filter_data_special_tasks = (
            rpc_utils.inject_times_to_hqe_special_tasks_filters(
                    filter_data_common, start_time, end_time))

    return (models.HostQueueEntry.query_count(filter_data_queue_entries)
            + get_host_num_special_tasks(**filter_data_special_tasks))
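

# Illustrative sketch only; never called. Pages through a host's detail
# view, newest entries first (sort_by=['-id'] above). The hostname and page
# size are placeholders.
def _example_host_detail_page(page=0, page_size=20):
    return get_host_queue_entries_and_special_tasks(
            'chromeos1-row1-rack1-host1',
            query_start=page * page_size,
            query_limit=page_size)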


# other

def echo(data=""):
    """\
    Returns a passed in string. For doing a basic test to see if RPC calls
    can successfully be made.
    """
    return data


def get_motd():
    """\
    Returns the message of the day as a string.
    """
    return rpc_utils.get_motd()


def get_static_data():
    """\
    Returns a dictionary containing a bunch of data that shouldn't change
    often and is otherwise inaccessible. This includes:

    priorities: List of job priority choices.
    default_priority: Default priority value for new jobs.
    users: Sorted list of all users.
    labels: Sorted list of labels, excluding those whose names start with
            'cros-version' or a firmware-version prefix.
    tests: Sorted list of all tests.
    profilers: Sorted list of all profilers.
    current_user: Logged-in username.
    host_statuses: Sorted list of possible Host statuses.
    job_statuses: Sorted list of possible HostQueueEntry statuses.
    job_timeout_mins_default: The default job timeout length in minutes.
    parse_failed_repair_default: Default value for the parse_failed_repair job
                                 option.
    reboot_before_options: A list of valid RebootBefore string enums.
    reboot_after_options: A list of valid RebootAfter string enums.
    motd: Server's message of the day.
    status_dictionary: A mapping from one-word job status names to a more
                       informative description.
    """

    default_drone_set_name = models.DroneSet.default_drone_set_name()
    drone_sets = ([default_drone_set_name] +
                  sorted(drone_set.name for drone_set in
                         models.DroneSet.objects.exclude(
                                 name=default_drone_set_name)))

    result = {}
    result['priorities'] = priorities.Priority.choices()
    result['default_priority'] = 'Default'
    result['max_schedulable_priority'] = priorities.Priority.DEFAULT
    result['users'] = get_users(sort_by=['login'])

    label_exclude_filters = [{'name__startswith': 'cros-version'},
                             {'name__startswith': 'fw-version'},
                             {'name__startswith': 'fwrw-version'},
                             {'name__startswith': 'fwro-version'}]
    result['labels'] = get_labels(
            label_exclude_filters,
            sort_by=['-platform', 'name'])

    result['tests'] = get_tests(sort_by=['name'])
    result['profilers'] = get_profilers(sort_by=['name'])
    result['current_user'] = rpc_utils.prepare_for_serialization(
            models.User.current_user().get_object_dict())
    result['host_statuses'] = sorted(models.Host.Status.names)
    result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
    result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
    result['job_max_runtime_mins_default'] = (
            models.Job.DEFAULT_MAX_RUNTIME_MINS)
    result['parse_failed_repair_default'] = bool(
            models.Job.DEFAULT_PARSE_FAILED_REPAIR)
    result['reboot_before_options'] = model_attributes.RebootBefore.names
    result['reboot_after_options'] = model_attributes.RebootAfter.names
    result['motd'] = rpc_utils.get_motd()
    result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
    result['drone_sets'] = drone_sets

    result['status_dictionary'] = {"Aborted": "Aborted",
                                   "Verifying": "Verifying Host",
                                   "Provisioning": "Provisioning Host",
                                   "Pending": "Waiting on other hosts",
                                   "Running": "Running autoserv",
                                   "Completed": "Autoserv completed",
                                   "Failed": "Failed to complete",
                                   "Queued": "Queued",
                                   "Starting": "Next in host's queue",
                                   "Stopped": "Other host(s) failed verify",
                                   "Parsing": "Awaiting parse of final results",
                                   "Gathering": "Gathering log files",
                                   "Waiting": "Waiting for scheduler action",
                                   "Archiving": "Archiving results",
                                   "Resetting": "Resetting hosts"}

    result['wmatrix_url'] = rpc_utils.get_wmatrix_url()
    result['stainless_url'] = rpc_utils.get_stainless_url()
    result['is_moblab'] = bool(utils.is_moblab())

    return result


def get_server_time():
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M")


def ping_db():
    """Simple connection test to the database."""
    try:
        db_connection.cursor()
    except DatabaseError:
        return [False]
    return [True]


def get_hosts_by_attribute(attribute, value):
    """
    Get the list of valid hosts that share the same host attribute value.

    @param attribute: String of the host attribute to check.
    @param value: String of the value that is shared between hosts.

    @returns List of hostnames that all have the same host attribute and
             value.
    """
    rows = models.HostAttribute.query_objects({'attribute': attribute,
                                               'value': value})
    if RESPECT_STATIC_ATTRIBUTES:
        returned_hosts = set()
        # Add hosts that:
        #     * are valid (not marked invalid), and
        #     * exist in afe_host_attribute with the given attribute, and
        #     * either don't exist in afe_static_host_attribute, or exist
        #       there with the same given value.
        for row in rows:
            if row.host.invalid != 0:
                continue

            static_hosts = models.StaticHostAttribute.query_objects(
                    {'host_id': row.host.id, 'attribute': attribute})
            values = [static_host.value for static_host in static_hosts]
            if len(values) == 0 or values[0] == value:
                returned_hosts.add(row.host.hostname)

        # Add hosts that:
        #     * are valid (not marked invalid), and
        #     * exist in afe_static_host_attribute with the given attribute
        #       and value.
        # There's no need to check whether each static attribute has a
        # corresponding entry in afe_host_attribute, since that is ensured
        # by inventory sync.
        static_rows = models.StaticHostAttribute.query_objects(
                {'attribute': attribute, 'value': value})
        for row in static_rows:
            if row.host.invalid != 0:
                continue

            returned_hosts.add(row.host.hostname)

        return list(returned_hosts)
    else:
        return [row.host.hostname for row in rows if row.host.invalid == 0]


def _get_control_file_by_suite(suite_name):
    """Get control file contents by suite name.

    @param suite_name: Suite name as string.
    @returns: Control file contents as string.
    """
    getter = control_file_getter.FileSystemGetter(
            [_CONFIG.get_config_value('SCHEDULER',
                                      'drone_installation_directory')])
    return getter.get_control_file_contents_by_name(suite_name)


@rpc_utils.route_rpc_to_main
def create_suite_job(
        name='',
        board='',
        pool='',
        child_dependencies=(),
        control_file='',
        check_hosts=True,
        num=None,
        file_bugs=False,
        timeout=24,
        timeout_mins=None,
        priority=priorities.Priority.DEFAULT,
        suite_args=None,
        wait_for_results=True,
        job_retry=False,
        max_retries=None,
        max_runtime_mins=None,
        suite_min_duts=0,
        offload_failures_only=False,
        builds=None,
        test_source_build=None,
        run_prod_code=False,
        delay_minutes=0,
        is_cloning=False,
        job_keyvals=None,
        test_args=None,
        **kwargs):
    """
    Create a job to run a test suite on the given device with the given image.

    When the timeout specified in the control file is reached, the
    job is guaranteed to have completed and results will be available.

    @param name: The test name if control_file is supplied, otherwise the name
                 of the test suite to run, e.g. 'bvt'.
    @param board: The kind of device to run the tests on.
    @param builds: The builds to install, e.g.
                   {'cros-version:': 'x86-alex-release/R18-1655.0.0',
                    'fwrw-version:': 'x86-alex-firmware/R36-5771.50.0',
                    'fwro-version:': 'x86-alex-firmware/R36-5771.49.0'}
    @param test_source_build: Build that contains the server-side test code.
    @param pool: Specify the pool of machines to use for scheduling
                 purposes.
    @param child_dependencies: (optional) list of additional dependency labels
                               (strings) that will be added as dependency
                               labels to child jobs.
    @param control_file: The control file of the job.
    @param check_hosts: Require appropriate live hosts to exist in the lab.
    @param num: Deprecated and ignored; formerly the number of machines to
                schedule across.
    @param file_bugs: File a bug on each test failure in this suite.
    @param timeout: The max lifetime of this suite, in hours.
    @param timeout_mins: The max lifetime of this suite, in minutes. Takes
                         priority over timeout.
    @param priority: Integer denoting priority. Higher is more important.
    @param suite_args: Optional arguments which will be parsed by the suite
                       control file. Used by control.test_that_wrapper to
                       determine which tests to run.
    @param wait_for_results: Set to False to run the suite job without waiting
                             for test jobs to finish. Default is True.
    @param job_retry: Set to True to enable job-level retry. Default is False.
    @param max_retries: Integer, maximum job retries allowed at suite level.
                        None for no max.
    @param max_runtime_mins: Maximum amount of time a job can be running in
                             minutes.
    @param suite_min_duts: Integer. The scheduler will prioritize getting the
                           minimum number of machines for the suite when it is
                           competing with another suite that has a higher
                           priority but already got the minimum machines it
                           needs.
    @param offload_failures_only: Only enable gs_offloading for failed jobs.
    @param run_prod_code: If True, the suite will run the test code that
                          lives in prod, i.e. the test code currently on the
                          lab servers. If False, the control files and test
                          code for this suite run will be retrieved from the
                          build artifacts.
    @param delay_minutes: Delay the creation of test jobs for a given number
                          of minutes.
    @param is_cloning: True if creating a cloning job.
    @param job_keyvals: A dict of job keyvals to be injected into the control
                        file.
    @param test_args: A dict of args passed all the way to each individual
                      test that will be actually run.
    @param kwargs: Extra keyword args. NOT USED.

    @raises ControlFileNotFound: If a unique suite control file doesn't exist.
    @raises NoControlFileList: If we can't list the control files at all.
    @raises StageControlFileFailure: If the dev server throws 500 while
                                     staging test_suites.
    @raises ControlFileEmpty: If the control file exists on the server, but
                              can't be read.

    @return: The job ID of the suite; -1 on error.
    """
    if num is not None:
        warnings.warn('num is deprecated for create_suite_job')
    del num

    if builds is None:
        builds = {}

    # Default the test source build to the CrOS build if it's not specified
    # and run_prod_code is set to False.
    if not run_prod_code:
        test_source_build = Suite.get_test_source_build(
                builds, test_source_build=test_source_build)

    sample_dut = rpc_utils.get_sample_dut(board, pool)

    suite_name = suite_common.canonicalize_suite_name(name)
    if run_prod_code:
        ds = dev_server.resolve(test_source_build, hostname=sample_dut)
        keyvals = {}
    else:
        ds, keyvals = suite_common.stage_build_artifacts(
                test_source_build, hostname=sample_dut)
    keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts

    # Do not change this naming convention without updating
    # site_utils.parse_job_name.
    if run_prod_code:
        # If run_prod_code is True, test_source_build is not set; use the
        # first build in the builds list for the suite job name.
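        # For example (hypothetical values): suite 'bvt' with first build
        # 'eve-release/R70-11011.0.0' yields the job name
        # 'eve-release/R70-11011.0.0-bvt', which site_utils.parse_job_name
        # can split back apart.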
        # list() is needed under Python 3, where dict.values() is a view.
        name = '%s-%s' % (list(builds.values())[0], suite_name)
    else:
        name = '%s-%s' % (test_source_build, suite_name)

    timeout_mins = timeout_mins or timeout * 60
    max_runtime_mins = max_runtime_mins or timeout * 60

    if not board:
        board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]

    if run_prod_code:
        control_file = _get_control_file_by_suite(suite_name)

    if not control_file:
        # No control file was supplied, so look it up from the build
        # artifacts.
        control_file = suite_common.get_control_file_by_build(
                test_source_build, ds, suite_name)

    # Prepend builds and board to the control file.
    if is_cloning:
        control_file = tools.remove_injection(control_file)

    if suite_args is None:
        suite_args = dict()

    inject_dict = {
        'board': board,
        # `build` is needed for suites like AU to stage images inside the
        # suite control file.
        'build': test_source_build,
        'builds': builds,
        'check_hosts': check_hosts,
        'pool': pool,
        'child_dependencies': child_dependencies,
        'file_bugs': file_bugs,
        'timeout': timeout,
        'timeout_mins': timeout_mins,
        'devserver_url': ds.url(),
        'priority': priority,
        'wait_for_results': wait_for_results,
        'job_retry': job_retry,
        'max_retries': max_retries,
        'max_runtime_mins': max_runtime_mins,
        'offload_failures_only': offload_failures_only,
        'test_source_build': test_source_build,
        'run_prod_code': run_prod_code,
        'delay_minutes': delay_minutes,
        'job_keyvals': job_keyvals,
        'test_args': test_args,
    }
    inject_dict.update(suite_args)
    control_file = tools.inject_vars(inject_dict, control_file)

    return rpc_utils.create_job_common(name,
                                       priority=priority,
                                       timeout_mins=timeout_mins,
                                       max_runtime_mins=max_runtime_mins,
                                       control_type='Server',
                                       control_file=control_file,
                                       hostless=True,
                                       keyvals=keyvals)
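

# Illustrative sketch only; never called. A minimal create_suite_job call.
# The suite, board, pool and build strings are hypothetical placeholders,
# not known-good artifacts.
def _example_schedule_bvt_suite():
    return create_suite_job(
            name='bvt-inline',
            board='eve',
            pool='suites',
            builds={'cros-version:': 'eve-release/R70-11011.0.0'},
            timeout_mins=120)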


def get_job_history(**filter_data):
    """Get the history of a job, including its special tasks.

    @param filter_data: Filter for the call; it should at least include
                        {'job_id': [job id]}.
    @returns: JSON string of the job's history, including information such
              as the hosts that ran the job and the special tasks executed
              before and after the job.
    """
    job_id = filter_data['job_id']
    job_info = job_history.get_job_info(job_id)
    return rpc_utils.prepare_for_serialization(job_info.get_history())


def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
                    known_host_ids=(), known_host_statuses=()):
    """Receive updates for job statuses from shards and assign hosts and jobs.

    @param shard_hostname: Hostname of the calling shard.
    @param jobs: Jobs in serialized form that should be updated with newer
                 status from a shard.
    @param hqes: HostQueueEntries in serialized form that should be updated
                 with newer status from a shard. Note that for every
                 HostQueueEntry the corresponding job must be in jobs.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.
    @param known_host_statuses: List of statuses of hosts the shard already
                                has.

    @returns: Serialized representations of hosts, jobs, suite job keyvals
              and their dependencies to be inserted into a shard's database.
    """
    # The following alternatives to sending host and job ids in every
    # heartbeat have been considered:
    # 1. Sending the highest known job and host ids. This would work for
    #    jobs: Newer jobs always have larger ids. Also, if a job is not
    #    assigned to a particular shard during a heartbeat, it never will be
    #    assigned to this shard later.
    #    This is not true for hosts though: A host that is leased won't be
    #    sent to the shard now, but might be sent in a future heartbeat.
    #    This means hosts with a lower id than the maximum host id the shard
    #    knows must sometimes still be transferred.
    # 2. Send the number of jobs/hosts the shard knows to the main in each
    #    heartbeat. Compare these to the number of records that already have
    #    the shard_id set to this shard. In the normal case, they should
    #    match. In case they don't, resend all entities of that type.
    #    This would work well for hosts, because there aren't that many.
    #    Resending all jobs is quite a big overhead though.
    #    Also, this approach might run into edge cases when entities are
    #    ever deleted.
    # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
    #    Using two different approaches isn't consistent and might cause
    #    confusion. Also the issues with the case of deletions might still
    #    occur.
    #
    # The overhead of sending all job and host ids in every heartbeat is low:
    # At peaks one board has about 1200 created but unfinished jobs.
    # See the numbers here: http://goo.gl/gQCGWH
    # Assuming that job ids have 6 digits and that json serialization takes a
    # comma and a space as overhead, the traffic per id sent is about 8 bytes.
    # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
    # A NOT IN query with 5000 ids took about 30ms in tests made.
    # These numbers seem low enough to outweigh the disadvantages of the
    # solutions described above.
    shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
    rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
    assert len(known_host_ids) == len(known_host_statuses)
    for host_id, host_status in zip(known_host_ids, known_host_statuses):
        host_model = models.Host.objects.get(pk=host_id)
        if host_model.status != host_status:
            host_model.status = host_status
            host_model.save()

    hosts, jobs, suite_keyvals, inc_ids = rpc_utils.find_records_for_shard(
            shard_obj, known_job_ids=known_job_ids,
            known_host_ids=known_host_ids)
    return {
        'hosts': [host.serialize() for host in hosts],
        'jobs': [job.serialize() for job in jobs],
        'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
        'incorrect_host_ids': [int(i) for i in inc_ids],
    }


def get_shards(**filter_data):
    """Return a list of all shards.

    @returns A sequence of nested dictionaries of shard information.
    """
    shards = models.Shard.query_objects(filter_data)
    serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ())
    for serialized, shard in zip(serialized_shards, shards):
        serialized['labels'] = [label.name for label in shard.labels.all()]

    return serialized_shards
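

# Illustrative sketch only; never called. What a shard's first heartbeat
# looks like: with empty known-id lists, the main responds with every host
# and job serialization it wants the shard to have. The hostname is a
# hypothetical placeholder.
def _example_first_heartbeat():
    response = shard_heartbeat('shard1.example.com')
    return (response['hosts'], response['jobs'],
            response['suite_keyvals'], response['incorrect_host_ids'])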


def _assign_board_to_shard_precheck(labels):
    """Verify whether board labels are valid to be added to a given shard.

    First, check that each board label is in the correct format. Second,
    check that the board label exists. Third, check that the board has not
    already been assigned to a shard.

    @param labels: Board labels separated by commas.

    @raises error.RPCException: If a label provided doesn't start with
            `board:` or the board has been added to a shard already.
    @raises models.Label.DoesNotExist: If a label specified doesn't exist.

    @returns: A list of label models that are ready to be added to a shard.
    """
    if not labels:
        # Allow the creation of label-less shards (labels='' would otherwise
        # fail the checks below).
        return []
    labels = labels.split(',')
    label_models = []
    for label in labels:
        # Check whether the board label is in the correct format.
        if not label.startswith('board:'):
            raise error.RPCException('Sharding only supports `board:.*` '
                                     'labels.')
        # Check whether the board label exists. If not, an exception will be
        # thrown by the smart_get function.
        label = models.Label.smart_get(label)
        # Check whether the board has been sharded already.
        try:
            shard = models.Shard.objects.get(labels=label)
            raise error.RPCException(
                    '%s is already on shard %s' % (label, shard.hostname))
        except models.Shard.DoesNotExist:
            # The board is not on any shard, so it's valid.
            label_models.append(label)
    return label_models


def add_shard(hostname, labels):
    """Add a shard and start running jobs on it.

    @param hostname: The hostname of the shard to be added; needs to be
                     unique.
    @param labels: Board labels separated by commas. Jobs of one of the
                   labels will be assigned to the shard.

    @raises error.RPCException: If a label provided doesn't start with
            `board:` or the board has been added to a shard already.
    @raises model_logic.ValidationError: If a shard with the given hostname
            already exists.
    @raises models.Label.DoesNotExist: If a label specified doesn't exist.

    @returns: The id of the added shard.
    """
    labels = _assign_board_to_shard_precheck(labels)
    shard = models.Shard.add_object(hostname=hostname)
    for label in labels:
        shard.labels.add(label)
    return shard.id


def add_board_to_shard(hostname, labels):
    """Add boards to a given shard.

    @param hostname: The hostname of the shard to be changed.
    @param labels: Board labels separated by commas.

    @raises error.RPCException: If a label provided doesn't start with
            `board:` or the board has been added to a shard already.
    @raises models.Label.DoesNotExist: If a label specified doesn't exist.

    @returns: The id of the changed shard.
    """
    labels = _assign_board_to_shard_precheck(labels)
    shard = models.Shard.objects.get(hostname=hostname)
    for label in labels:
        shard.labels.add(label)
    return shard.id


# Remove-board RPCs are rare, so we can afford to make them a bit more
# expensive (by performing them in a transaction) in order to guarantee
# atomicity.
@transaction.commit_on_success
def remove_board_from_shard(hostname, label):
    """Remove a board from the given shard.

    @param hostname: The hostname of the shard to be changed.
    @param label: Board label.

    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
    """
    shard = models.Shard.objects.get(hostname=hostname)
    label = models.Label.smart_get(label)
    if label not in shard.labels.all():
        raise error.RPCException(
                'Cannot remove a label that does not belong to this shard.')

    shard.labels.remove(label)
    if label.is_replaced_by_static():
        static_label = models.StaticLabel.smart_get(label.name)
        models.Host.objects.filter(
                static_labels__in=[static_label]).update(shard=None)
    else:
        models.Host.objects.filter(labels__in=[label]).update(shard=None)
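

# Illustrative sketch only; never called. Registers a shard and grows it by
# one board; the hostname and board names are hypothetical placeholders.
# Labels must use the `board:` prefix or _assign_board_to_shard_precheck()
# raises error.RPCException.
def _example_register_shard():
    shard_id = add_shard('shard1.example.com', 'board:eve')
    add_board_to_shard('shard1.example.com', 'board:kevin')
    return shard_id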


def delete_shard(hostname):
    """Delete a shard and reclaim all resources from it.

    This claims back all assigned hosts from the shard.
    """
    shard = rpc_utils.retrieve_shard(shard_hostname=hostname)

    # Remove shard information.
    models.Host.objects.filter(shard=shard).update(shard=None)

    # Note: The original job-cleanup query was performed with the django call
    #   models.Job.objects.filter(shard=shard).update(shard=None)
    # but that started becoming unreliable due to the large size of afe_jobs.
    #
    # We don't need atomicity here, so the new cleanup method is iterative,
    # in chunks of 100k jobs.
    QUERY = ('UPDATE afe_jobs SET shard_id = NULL WHERE shard_id = %s '
             'LIMIT 100000')
    try:
        with contextlib.closing(db_connection.cursor()) as cursor:
            clear_jobs = True
            assert shard.id is not None
            while clear_jobs:
                cursor.execute(QUERY % shard.id)
                clear_jobs = bool(cursor.fetchone())
    except DatabaseError as e:
        # Unit tests use a sqlite backend instead of MySQL. sqlite does not
        # support UPDATE ... LIMIT, so fall back to the old behavior.
        if 'syntax error' in str(e):
            models.Job.objects.filter(shard=shard).update(shard=None)
        else:
            raise

    shard.labels.clear()
    shard.delete()


def get_servers(hostname=None, role=None, status=None):
    """Get a list of servers with matching role and status.

    @param hostname: FQDN of the server.
    @param role: Name of the server role, e.g., drone, scheduler. Defaults
                 to None to match any role.
    @param status: Status of the server, e.g., primary, backup,
                   repair_required. Defaults to None to match any status.

    @raises DeprecationWarning: Always; server_manager_utils has been
            removed, so this RPC can no longer be served.
    """
    raise DeprecationWarning("server_manager_utils has been removed.")


@rpc_utils.route_rpc_to_main
def get_stable_version(board=stable_version_utils.DEFAULT, android=False):
    """Get the stable version for the given board.

    @param board: Name of the board.
    @param android: Unused legacy parameter. This is maintained for the
                    sake of clients on old branches that still pass the
                    parameter. TODO(jrbarnette) Remove this completely once
                    R68 drops off stable.

    @return: Stable version of the given board. Returns the global config
             value of CROS.stable_cros_version if the stable_versions table
             does not have an entry for the DEFAULT board.
    """
    assert not android, 'get_stable_version no longer supports `android`.'
    return stable_version_utils.get(board=board)


@rpc_utils.route_rpc_to_main
def get_all_stable_versions():
    """Get stable versions for all boards.

    @return: A dictionary of board:version.
    """
    return stable_version_utils.get_all()
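

# Illustrative sketch only; never called. Reads stable versions: with no
# explicit entry, the per-board lookup falls back to the DEFAULT row (or
# the global config value). The board name is a hypothetical placeholder.
def _example_stable_versions():
    one_board = get_stable_version(board='eve')
    every_board = get_all_stable_versions()
    return one_board, every_board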


@rpc_utils.route_rpc_to_main
def set_stable_version(version, board=stable_version_utils.DEFAULT):
    """Modify the stable version for the given board.

    Note: Setting the stable version over RPC is no longer permitted; the
    attempt is logged and ignored.

    @param version: The new value of stable version for the given board.
    @param board: Name of the board; defaults to the value `DEFAULT`.
    """
    logging.warning("rpc_interface::set_stable_version: attempted to set "
                    "stable version. setting the stable version is not "
                    "permitted")
    return None


@rpc_utils.route_rpc_to_main
def delete_stable_version(board):
    """Delete the stable version for the given board.

    Delete the stable version entry in the afe_stable_versions table for a
    given board, so that the default stable version will be used.

    @param board: Name of the board.
    """
    stable_version_utils.delete(board=board)


def get_tests_by_build(build, ignore_invalid_tests=True):
    """Get the tests that are available for the specified build.

    @param build: Unique name by which to refer to the image.
    @param ignore_invalid_tests: If True, skip control files that fail to
                                 parse instead of raising.

    @return: A sorted list of all tests that are in the build specified.
    """
    # Collect the control files specified in this build.
    cfile_getter = control_file_lib._initialize_control_file_getter(build)
    if suite_common.ENABLE_CONTROLS_IN_BATCH:
        control_file_info_list = cfile_getter.get_suite_info()
        control_file_list = control_file_info_list.keys()
    else:
        control_file_list = cfile_getter.get_control_file_list()

    test_objects = []
    _id = 0
    for control_file_path in control_file_list:
        # Read and parse the control file.
        if suite_common.ENABLE_CONTROLS_IN_BATCH:
            control_file = control_file_info_list[control_file_path]
        else:
            control_file = cfile_getter.get_control_file_contents(
                    control_file_path)
        try:
            control_obj = control_data.parse_control_string(control_file)
        except:
            logging.info('Failed to parse control file: %s',
                         control_file_path)
            if not ignore_invalid_tests:
                raise
            # Skip this control file; otherwise a stale control_obj from a
            # previous iteration would be reused below.
            continue

        # Extract the values needed for the AFE from the control_obj.
        # The keys list represents attributes in the control_obj that
        # are required by the AFE.
        keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental',
                'test_category', 'test_class', 'dependencies', 'run_verify',
                'sync_count', 'job_retries', 'path']

        test_object = {}
        for key in keys:
            test_object[key] = getattr(control_obj, key) if hasattr(
                    control_obj, key) else ''

        # Unfortunately, the AFE expects different key names for certain
        # values; these must be corrected to avoid the risk of tests
        # being omitted by the AFE.
        # The 'id' is an additional value used in the AFE.
        # The control_data parsing does not reference 'run_reset', but it
        # is also used in the AFE and defaults to True.
        test_object['id'] = _id
        test_object['run_reset'] = True
        test_object['description'] = test_object.get('doc', '')
        test_object['test_time'] = test_object.get('time', 0)

        # TODO(crbug.com/873716) DEPRECATED. Remove entirely.
        test_object['test_retry'] = 0

        # Fix the test name to be consistent with the current presentation
        # of test names in the AFE.
        testpath, subname = os.path.split(control_file_path)
        testname = os.path.basename(testpath)
        subname = subname.split('.')[1:]
        if subname:
            testname = '%s:%s' % (testname, ':'.join(subname))

        test_object['name'] = testname

        # Correct the test path, as parse_control_string sets it to an empty
        # string.
        test_object['path'] = control_file_path

        _id += 1
        test_objects.append(test_object)

    test_objects = sorted(test_objects, key=lambda x: x.get('name'))
    return rpc_utils.prepare_for_serialization(test_objects)


@rpc_utils.route_rpc_to_main
def get_lab_health_indicators(board=None):
    """Get the health indicators for the whole lab.

    The indicators currently include:
    1. Whether the lab is closed.
    2. The list of available DUTs for a given board.
    3. Devserver capacity.
    4. The time of the next major DUT utilization event (e.g. a CQ run
       coming in 3 minutes).

    @param board: If a board is specified, a list of available DUTs will be
                  returned for it. Otherwise, this indicator is skipped.

    @returns: A LabHealthIndicator namedtuple with the health info.
    """
    return LabHealthIndicator(None, None, None, None)
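

# Illustrative sketch only; never called. Enumerates the tests available in
# a build; the build string is a hypothetical placeholder.
def _example_tests_in_build():
    return get_tests_by_build('eve-release/R70-11011.0.0')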