1# pylint: disable=missing-docstring 2 3"""\ 4Functions to expose over the RPC interface. 5 6For all modify* and delete* functions that ask for an 'id' parameter to 7identify the object to operate on, the id may be either 8 * the database row ID 9 * the name of the object (label name, hostname, user login, etc.) 10 * a dictionary containing uniquely identifying field (this option should seldom 11 be used) 12 13When specifying foreign key fields (i.e. adding hosts to a label, or adding 14users to an ACL group), the given value may be either the database row ID or the 15name of the object. 16 17All get* functions return lists of dictionaries. Each dictionary represents one 18object and maps field names to values. 19 20Some examples: 21modify_host(2, hostname='myhost') # modify hostname of host with database ID 2 22modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2' 23modify_test('sleeptest', test_type='Client', params=', seconds=60') 24delete_acl_group(1) # delete by ID 25delete_acl_group('Everyone') # delete by name 26acl_group_add_users('Everyone', ['mbligh', 'showard']) 27get_jobs(owner='showard', status='Queued') 28 29See doctests/001_rpc_test.txt for (lots) more examples. 30""" 31 32__author__ = 'showard@google.com (Steve Howard)' 33 34import ast 35import datetime 36import logging 37import os 38import sys 39 40from django.db import connection as db_connection 41from django.db import transaction 42from django.db.models import Count 43from django.db.utils import DatabaseError 44 45import common 46from autotest_lib.client.common_lib import control_data 47from autotest_lib.client.common_lib import error 48from autotest_lib.client.common_lib import global_config 49from autotest_lib.client.common_lib import priorities 50from autotest_lib.client.common_lib import time_utils 51from autotest_lib.client.common_lib.cros import dev_server 52from autotest_lib.frontend.afe import control_file as control_file_lib 53from autotest_lib.frontend.afe import model_attributes 54from autotest_lib.frontend.afe import model_logic 55from autotest_lib.frontend.afe import models 56from autotest_lib.frontend.afe import rpc_utils 57from autotest_lib.frontend.tko import models as tko_models 58from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface 59from autotest_lib.server import frontend 60from autotest_lib.server import utils 61from autotest_lib.server.cros import provision 62from autotest_lib.server.cros.dynamic_suite import constants 63from autotest_lib.server.cros.dynamic_suite import control_file_getter 64from autotest_lib.server.cros.dynamic_suite import suite as SuiteBase 65from autotest_lib.server.cros.dynamic_suite import tools 66from autotest_lib.server.cros.dynamic_suite.suite import Suite 67from autotest_lib.server.lib import status_history 68from autotest_lib.site_utils import job_history 69from autotest_lib.site_utils import server_manager_utils 70from autotest_lib.site_utils import stable_version_utils 71 72 73_CONFIG = global_config.global_config 74 75# Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py. 76 77# labels 78 79def modify_label(id, **data): 80 """Modify a label. 81 82 @param id: id or name of a label. More often a label name. 83 @param data: New data for a label. 84 """ 85 label_model = models.Label.smart_get(id) 86 label_model.update_object(data) 87 88 # Master forwards the RPC to shards 89 if not utils.is_shard(): 90 rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False, 91 id=id, **data) 92 93 94def delete_label(id): 95 """Delete a label. 96 97 @param id: id or name of a label. More often a label name. 98 """ 99 label_model = models.Label.smart_get(id) 100 # Hosts that have the label to be deleted. Save this info before 101 # the label is deleted to use it later. 102 hosts = [] 103 for h in label_model.host_set.all(): 104 hosts.append(models.Host.smart_get(h.id)) 105 label_model.delete() 106 107 # Master forwards the RPC to shards 108 if not utils.is_shard(): 109 rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id) 110 111 112def add_label(name, ignore_exception_if_exists=False, **kwargs): 113 """Adds a new label of a given name. 114 115 @param name: label name. 116 @param ignore_exception_if_exists: If True and the exception was 117 thrown due to the duplicated label name when adding a label, 118 then suppress the exception. Default is False. 119 @param kwargs: keyword args that store more info about a label 120 other than the name. 121 @return: int/long id of a new label. 122 """ 123 # models.Label.add_object() throws model_logic.ValidationError 124 # when it is given a label name that already exists. 125 # However, ValidationError can be thrown with different errors, 126 # and those errors should be thrown up to the call chain. 127 try: 128 label = models.Label.add_object(name=name, **kwargs) 129 except: 130 exc_info = sys.exc_info() 131 if ignore_exception_if_exists: 132 label = rpc_utils.get_label(name) 133 # If the exception is raised not because of duplicated 134 # "name", then raise the original exception. 135 if label is None: 136 raise exc_info[0], exc_info[1], exc_info[2] 137 else: 138 raise exc_info[0], exc_info[1], exc_info[2] 139 return label.id 140 141 142def add_label_to_hosts(id, hosts): 143 """Adds a label of the given id to the given hosts only in local DB. 144 145 @param id: id or name of a label. More often a label name. 146 @param hosts: The hostnames of hosts that need the label. 147 148 @raises models.Label.DoesNotExist: If the label with id doesn't exist. 149 """ 150 label = models.Label.smart_get(id) 151 host_objs = models.Host.smart_get_bulk(hosts) 152 if label.platform: 153 models.Host.check_no_platform(host_objs) 154 # Ensure a host has no more than one board label with it. 155 if label.name.startswith('board:'): 156 models.Host.check_board_labels_allowed(host_objs, [label.name]) 157 label.host_set.add(*host_objs) 158 159 160def _create_label_everywhere(id, hosts): 161 """ 162 Yet another method to create labels. 163 164 ALERT! This method should be run only on master not shards! 165 DO NOT RUN THIS ON A SHARD!!! Deputies will hate you if you do!!! 166 167 This method exists primarily to serve label_add_hosts() and 168 host_add_labels(). Basically it pulls out the label check/add logic 169 from label_add_hosts() into this nice method that not only creates 170 the label but also tells the shards that service the hosts to also 171 create the label. 172 173 @param id: id or name of a label. More often a label name. 174 @param hosts: A list of hostnames or ids. More often hostnames. 175 """ 176 try: 177 label = models.Label.smart_get(id) 178 except models.Label.DoesNotExist: 179 # This matches the type checks in smart_get, which is a hack 180 # in and off itself. The aim here is to create any non-existent 181 # label, which we cannot do if the 'id' specified isn't a label name. 182 if isinstance(id, basestring): 183 label = models.Label.smart_get(add_label(id)) 184 else: 185 raise ValueError('Label id (%s) does not exist. Please specify ' 186 'the argument, id, as a string (label name).' 187 % id) 188 189 # Make sure the label exists on the shard with the same id 190 # as it is on the master. 191 # It is possible that the label is already in a shard because 192 # we are adding a new label only to shards of hosts that the label 193 # is going to be attached. 194 # For example, we add a label L1 to a host in shard S1. 195 # Master and S1 will have L1 but other shards won't. 196 # Later, when we add the same label L1 to hosts in shards S1 and S2, 197 # S1 already has the label but S2 doesn't. 198 # S2 should have the new label without any problem. 199 # We ignore exception in such a case. 200 host_objs = models.Host.smart_get_bulk(hosts) 201 rpc_utils.fanout_rpc( 202 host_objs, 'add_label', include_hostnames=False, 203 name=label.name, ignore_exception_if_exists=True, 204 id=label.id, platform=label.platform) 205 206 207@rpc_utils.route_rpc_to_master 208def label_add_hosts(id, hosts): 209 """Adds a label with the given id to the given hosts. 210 211 This method should be run only on master not shards. 212 The given label will be created if it doesn't exist, provided the `id` 213 supplied is a label name not an int/long id. 214 215 @param id: id or name of a label. More often a label name. 216 @param hosts: A list of hostnames or ids. More often hostnames. 217 218 @raises ValueError: If the id specified is an int/long (label id) 219 while the label does not exist. 220 """ 221 # Create the label. 222 _create_label_everywhere(id, hosts) 223 224 # Add it to the master. 225 add_label_to_hosts(id, hosts) 226 227 # Add it to the shards. 228 host_objs = models.Host.smart_get_bulk(hosts) 229 rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id) 230 231 232def remove_label_from_hosts(id, hosts): 233 """Removes a label of the given id from the given hosts only in local DB. 234 235 @param id: id or name of a label. 236 @param hosts: The hostnames of hosts that need to remove the label from. 237 """ 238 host_objs = models.Host.smart_get_bulk(hosts) 239 models.Label.smart_get(id).host_set.remove(*host_objs) 240 241 242@rpc_utils.route_rpc_to_master 243def label_remove_hosts(id, hosts): 244 """Removes a label of the given id from the given hosts. 245 246 This method should be run only on master not shards. 247 248 @param id: id or name of a label. 249 @param hosts: A list of hostnames or ids. More often hostnames. 250 """ 251 host_objs = models.Host.smart_get_bulk(hosts) 252 remove_label_from_hosts(id, hosts) 253 254 rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id) 255 256 257def get_labels(exclude_filters=(), **filter_data): 258 """\ 259 @param exclude_filters: A sequence of dictionaries of filters. 260 261 @returns A sequence of nested dictionaries of label information. 262 """ 263 labels = models.Label.query_objects(filter_data) 264 for exclude_filter in exclude_filters: 265 labels = labels.exclude(**exclude_filter) 266 return rpc_utils.prepare_rows_as_nested_dicts(labels, ()) 267 268 269# hosts 270 271def add_host(hostname, status=None, locked=None, lock_reason='', protection=None): 272 if locked and not lock_reason: 273 raise model_logic.ValidationError( 274 {'locked': 'Please provide a reason for locking when adding host.'}) 275 276 return models.Host.add_object(hostname=hostname, status=status, 277 locked=locked, lock_reason=lock_reason, 278 protection=protection).id 279 280 281@rpc_utils.route_rpc_to_master 282def modify_host(id, **kwargs): 283 """Modify local attributes of a host. 284 285 If this is called on the master, but the host is assigned to a shard, this 286 will call `modify_host_local` RPC to the responsible shard. This means if 287 a host is being locked using this function, this change will also propagate 288 to shards. 289 When this is called on a shard, the shard just routes the RPC to the master 290 and does nothing. 291 292 @param id: id of the host to modify. 293 @param kwargs: key=value pairs of values to set on the host. 294 """ 295 rpc_utils.check_modify_host(kwargs) 296 host = models.Host.smart_get(id) 297 try: 298 rpc_utils.check_modify_host_locking(host, kwargs) 299 except model_logic.ValidationError as e: 300 if not kwargs.get('force_modify_locking', False): 301 raise 302 logging.exception('The following exception will be ignored and lock ' 303 'modification will be enforced. %s', e) 304 305 # This is required to make `lock_time` for a host be exactly same 306 # between the master and a shard. 307 if kwargs.get('locked', None) and 'lock_time' not in kwargs: 308 kwargs['lock_time'] = datetime.datetime.now() 309 host.update_object(kwargs) 310 311 # force_modifying_locking is not an internal field in database, remove. 312 kwargs.pop('force_modify_locking', None) 313 rpc_utils.fanout_rpc([host], 'modify_host_local', 314 include_hostnames=False, id=id, **kwargs) 315 316 317def modify_host_local(id, **kwargs): 318 """Modify host attributes in local DB. 319 320 @param id: Host id. 321 @param kwargs: key=value pairs of values to set on the host. 322 """ 323 models.Host.smart_get(id).update_object(kwargs) 324 325 326@rpc_utils.route_rpc_to_master 327def modify_hosts(host_filter_data, update_data): 328 """Modify local attributes of multiple hosts. 329 330 If this is called on the master, but one of the hosts in that match the 331 filters is assigned to a shard, this will call `modify_hosts_local` RPC 332 to the responsible shard. 333 When this is called on a shard, the shard just routes the RPC to the master 334 and does nothing. 335 336 The filters are always applied on the master, not on the shards. This means 337 if the states of a host differ on the master and a shard, the state on the 338 master will be used. I.e. this means: 339 A host was synced to Shard 1. On Shard 1 the status of the host was set to 340 'Repair Failed'. 341 - A call to modify_hosts with host_filter_data={'status': 'Ready'} will 342 update the host (both on the shard and on the master), because the state 343 of the host as the master knows it is still 'Ready'. 344 - A call to modify_hosts with host_filter_data={'status': 'Repair failed' 345 will not update the host, because the filter doesn't apply on the master. 346 347 @param host_filter_data: Filters out which hosts to modify. 348 @param update_data: A dictionary with the changes to make to the hosts. 349 """ 350 update_data = update_data.copy() 351 rpc_utils.check_modify_host(update_data) 352 hosts = models.Host.query_objects(host_filter_data) 353 354 affected_shard_hostnames = set() 355 affected_host_ids = [] 356 357 # Check all hosts before changing data for exception safety. 358 for host in hosts: 359 try: 360 rpc_utils.check_modify_host_locking(host, update_data) 361 except model_logic.ValidationError as e: 362 if not update_data.get('force_modify_locking', False): 363 raise 364 logging.exception('The following exception will be ignored and ' 365 'lock modification will be enforced. %s', e) 366 367 if host.shard: 368 affected_shard_hostnames.add(host.shard.rpc_hostname()) 369 affected_host_ids.append(host.id) 370 371 # This is required to make `lock_time` for a host be exactly same 372 # between the master and a shard. 373 if update_data.get('locked', None) and 'lock_time' not in update_data: 374 update_data['lock_time'] = datetime.datetime.now() 375 for host in hosts: 376 host.update_object(update_data) 377 378 update_data.pop('force_modify_locking', None) 379 # Caution: Changing the filter from the original here. See docstring. 380 rpc_utils.run_rpc_on_multiple_hostnames( 381 'modify_hosts_local', affected_shard_hostnames, 382 host_filter_data={'id__in': affected_host_ids}, 383 update_data=update_data) 384 385 386def modify_hosts_local(host_filter_data, update_data): 387 """Modify attributes of hosts in local DB. 388 389 @param host_filter_data: Filters out which hosts to modify. 390 @param update_data: A dictionary with the changes to make to the hosts. 391 """ 392 for host in models.Host.query_objects(host_filter_data): 393 host.update_object(update_data) 394 395 396def add_labels_to_host(id, labels): 397 """Adds labels to a given host only in local DB. 398 399 @param id: id or hostname for a host. 400 @param labels: ids or names for labels. 401 """ 402 label_objs = models.Label.smart_get_bulk(labels) 403 models.Host.smart_get(id).labels.add(*label_objs) 404 405 406@rpc_utils.route_rpc_to_master 407def host_add_labels(id, labels): 408 """Adds labels to a given host. 409 410 @param id: id or hostname for a host. 411 @param labels: ids or names for labels. 412 413 @raises ValidationError: If adding more than one platform/board label. 414 """ 415 # Create the labels on the master/shards. 416 for label in labels: 417 _create_label_everywhere(label, [id]) 418 419 label_objs = models.Label.smart_get_bulk(labels) 420 platforms = [label.name for label in label_objs if label.platform] 421 boards = [label.name for label in label_objs 422 if label.name.startswith('board:')] 423 if len(platforms) > 1 or not utils.board_labels_allowed(boards): 424 raise model_logic.ValidationError( 425 {'labels': ('Adding more than one platform label, or a list of ' 426 'non-compatible board labels.: %s %s' % 427 (', '.join(platforms), ', '.join(boards)))}) 428 429 host_obj = models.Host.smart_get(id) 430 if platforms: 431 models.Host.check_no_platform([host_obj]) 432 if boards: 433 models.Host.check_board_labels_allowed([host_obj], labels) 434 add_labels_to_host(id, labels) 435 436 rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False, 437 id=id, labels=labels) 438 439 440def remove_labels_from_host(id, labels): 441 """Removes labels from a given host only in local DB. 442 443 @param id: id or hostname for a host. 444 @param labels: ids or names for labels. 445 """ 446 label_objs = models.Label.smart_get_bulk(labels) 447 models.Host.smart_get(id).labels.remove(*label_objs) 448 449 450@rpc_utils.route_rpc_to_master 451def host_remove_labels(id, labels): 452 """Removes labels from a given host. 453 454 @param id: id or hostname for a host. 455 @param labels: ids or names for labels. 456 """ 457 remove_labels_from_host(id, labels) 458 459 host_obj = models.Host.smart_get(id) 460 rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False, 461 id=id, labels=labels) 462 463 464def get_host_attribute(attribute, **host_filter_data): 465 """ 466 @param attribute: string name of attribute 467 @param host_filter_data: filter data to apply to Hosts to choose hosts to 468 act upon 469 """ 470 hosts = rpc_utils.get_host_query((), False, True, host_filter_data) 471 hosts = list(hosts) 472 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 473 'attribute_list') 474 host_attr_dicts = [] 475 for host_obj in hosts: 476 for attr_obj in host_obj.attribute_list: 477 if attr_obj.attribute == attribute: 478 host_attr_dicts.append(attr_obj.get_object_dict()) 479 return rpc_utils.prepare_for_serialization(host_attr_dicts) 480 481 482def set_host_attribute(attribute, value, **host_filter_data): 483 """ 484 @param attribute: string name of attribute 485 @param value: string, or None to delete an attribute 486 @param host_filter_data: filter data to apply to Hosts to choose hosts to 487 act upon 488 """ 489 assert host_filter_data # disallow accidental actions on all hosts 490 hosts = models.Host.query_objects(host_filter_data) 491 models.AclGroup.check_for_acl_violation_hosts(hosts) 492 for host in hosts: 493 host.set_or_delete_attribute(attribute, value) 494 495 # Master forwards this RPC to shards. 496 if not utils.is_shard(): 497 rpc_utils.fanout_rpc(hosts, 'set_host_attribute', False, 498 attribute=attribute, value=value, **host_filter_data) 499 500 501@rpc_utils.forward_single_host_rpc_to_shard 502def delete_host(id): 503 models.Host.smart_get(id).delete() 504 505 506def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 507 valid_only=True, include_current_job=False, **filter_data): 508 """Get a list of dictionaries which contains the information of hosts. 509 510 @param multiple_labels: match hosts in all of the labels given. Should 511 be a list of label names. 512 @param exclude_only_if_needed_labels: Exclude hosts with at least one 513 "only_if_needed" label applied. 514 @param include_current_job: Set to True to include ids of currently running 515 job and special task. 516 """ 517 hosts = rpc_utils.get_host_query(multiple_labels, 518 exclude_only_if_needed_labels, 519 valid_only, filter_data) 520 hosts = list(hosts) 521 models.Host.objects.populate_relationships(hosts, models.Label, 522 'label_list') 523 models.Host.objects.populate_relationships(hosts, models.AclGroup, 524 'acl_list') 525 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 526 'attribute_list') 527 host_dicts = [] 528 for host_obj in hosts: 529 host_dict = host_obj.get_object_dict() 530 host_dict['labels'] = [label.name for label in host_obj.label_list] 531 host_dict['platform'] = rpc_utils.find_platform(host_obj) 532 host_dict['acls'] = [acl.name for acl in host_obj.acl_list] 533 host_dict['attributes'] = dict((attribute.attribute, attribute.value) 534 for attribute in host_obj.attribute_list) 535 if include_current_job: 536 host_dict['current_job'] = None 537 host_dict['current_special_task'] = None 538 entries = models.HostQueueEntry.objects.filter( 539 host_id=host_dict['id'], active=True, complete=False) 540 if entries: 541 host_dict['current_job'] = ( 542 entries[0].get_object_dict()['job']) 543 tasks = models.SpecialTask.objects.filter( 544 host_id=host_dict['id'], is_active=True, is_complete=False) 545 if tasks: 546 host_dict['current_special_task'] = ( 547 '%d-%s' % (tasks[0].get_object_dict()['id'], 548 tasks[0].get_object_dict()['task'].lower())) 549 host_dicts.append(host_dict) 550 return rpc_utils.prepare_for_serialization(host_dicts) 551 552 553def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 554 valid_only=True, **filter_data): 555 """ 556 Same parameters as get_hosts(). 557 558 @returns The number of matching hosts. 559 """ 560 hosts = rpc_utils.get_host_query(multiple_labels, 561 exclude_only_if_needed_labels, 562 valid_only, filter_data) 563 return hosts.count() 564 565 566# tests 567 568def get_tests(**filter_data): 569 return rpc_utils.prepare_for_serialization( 570 models.Test.list_objects(filter_data)) 571 572 573def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name): 574 """Gets the counts of all passed and failed tests from the matching jobs. 575 576 @param job_name_prefix: Name prefix of the jobs to get the summary 577 from, e.g., 'butterfly-release/r40-6457.21.0/bvt-cq/'. Prefix 578 matching is case insensitive. 579 @param label_name: Label that must be set in the jobs, e.g., 580 'cros-version:butterfly-release/R40-6457.21.0'. 581 582 @returns A summary of the counts of all the passed and failed tests. 583 """ 584 job_ids = list(models.Job.objects.filter( 585 name__istartswith=job_name_prefix, 586 dependency_labels__name=label_name).values_list( 587 'pk', flat=True)) 588 summary = {'passed': 0, 'failed': 0} 589 if not job_ids: 590 return summary 591 592 counts = (tko_models.TestView.objects.filter( 593 afe_job_id__in=job_ids).exclude( 594 test_name='SERVER_JOB').exclude( 595 test_name__startswith='CLIENT_JOB').values( 596 'status').annotate( 597 count=Count('status'))) 598 for status in counts: 599 if status['status'] == 'GOOD': 600 summary['passed'] += status['count'] 601 else: 602 summary['failed'] += status['count'] 603 return summary 604 605 606# profilers 607 608def add_profiler(name, description=None): 609 return models.Profiler.add_object(name=name, description=description).id 610 611 612def modify_profiler(id, **data): 613 models.Profiler.smart_get(id).update_object(data) 614 615 616def delete_profiler(id): 617 models.Profiler.smart_get(id).delete() 618 619 620def get_profilers(**filter_data): 621 return rpc_utils.prepare_for_serialization( 622 models.Profiler.list_objects(filter_data)) 623 624 625# users 626 627def get_users(**filter_data): 628 return rpc_utils.prepare_for_serialization( 629 models.User.list_objects(filter_data)) 630 631 632# acl groups 633 634def add_acl_group(name, description=None): 635 group = models.AclGroup.add_object(name=name, description=description) 636 group.users.add(models.User.current_user()) 637 return group.id 638 639 640def modify_acl_group(id, **data): 641 group = models.AclGroup.smart_get(id) 642 group.check_for_acl_violation_acl_group() 643 group.update_object(data) 644 group.add_current_user_if_empty() 645 646 647def acl_group_add_users(id, users): 648 group = models.AclGroup.smart_get(id) 649 group.check_for_acl_violation_acl_group() 650 users = models.User.smart_get_bulk(users) 651 group.users.add(*users) 652 653 654def acl_group_remove_users(id, users): 655 group = models.AclGroup.smart_get(id) 656 group.check_for_acl_violation_acl_group() 657 users = models.User.smart_get_bulk(users) 658 group.users.remove(*users) 659 group.add_current_user_if_empty() 660 661 662def acl_group_add_hosts(id, hosts): 663 group = models.AclGroup.smart_get(id) 664 group.check_for_acl_violation_acl_group() 665 hosts = models.Host.smart_get_bulk(hosts) 666 group.hosts.add(*hosts) 667 group.on_host_membership_change() 668 669 670def acl_group_remove_hosts(id, hosts): 671 group = models.AclGroup.smart_get(id) 672 group.check_for_acl_violation_acl_group() 673 hosts = models.Host.smart_get_bulk(hosts) 674 group.hosts.remove(*hosts) 675 group.on_host_membership_change() 676 677 678def delete_acl_group(id): 679 models.AclGroup.smart_get(id).delete() 680 681 682def get_acl_groups(**filter_data): 683 acl_groups = models.AclGroup.list_objects(filter_data) 684 for acl_group in acl_groups: 685 acl_group_obj = models.AclGroup.objects.get(id=acl_group['id']) 686 acl_group['users'] = [user.login 687 for user in acl_group_obj.users.all()] 688 acl_group['hosts'] = [host.hostname 689 for host in acl_group_obj.hosts.all()] 690 return rpc_utils.prepare_for_serialization(acl_groups) 691 692 693# jobs 694 695def generate_control_file(tests=(), profilers=(), 696 client_control_file='', use_container=False, 697 profile_only=None, db_tests=True, 698 test_source_build=None): 699 """ 700 Generates a client-side control file to run tests. 701 702 @param tests List of tests to run. See db_tests for more information. 703 @param profilers List of profilers to activate during the job. 704 @param client_control_file The contents of a client-side control file to 705 run at the end of all tests. If this is supplied, all tests must be 706 client side. 707 TODO: in the future we should support server control files directly 708 to wrap with a kernel. That'll require changing the parameter 709 name and adding a boolean to indicate if it is a client or server 710 control file. 711 @param use_container unused argument today. TODO: Enable containers 712 on the host during a client side test. 713 @param profile_only A boolean that indicates what default profile_only 714 mode to use in the control file. Passing None will generate a 715 control file that does not explcitly set the default mode at all. 716 @param db_tests: if True, the test object can be found in the database 717 backing the test model. In this case, tests is a tuple 718 of test IDs which are used to retrieve the test objects 719 from the database. If False, tests is a tuple of test 720 dictionaries stored client-side in the AFE. 721 @param test_source_build: Build to be used to retrieve test code. Default 722 to None. 723 724 @returns a dict with the following keys: 725 control_file: str, The control file text. 726 is_server: bool, is the control file a server-side control file? 727 synch_count: How many machines the job uses per autoserv execution. 728 synch_count == 1 means the job is asynchronous. 729 dependencies: A list of the names of labels on which the job depends. 730 """ 731 if not tests and not client_control_file: 732 return dict(control_file='', is_server=False, synch_count=1, 733 dependencies=[]) 734 735 cf_info, test_objects, profiler_objects = ( 736 rpc_utils.prepare_generate_control_file(tests, profilers, 737 db_tests)) 738 cf_info['control_file'] = control_file_lib.generate_control( 739 tests=test_objects, profilers=profiler_objects, 740 is_server=cf_info['is_server'], 741 client_control_file=client_control_file, profile_only=profile_only, 742 test_source_build=test_source_build) 743 return cf_info 744 745 746def create_job_page_handler(name, priority, control_file, control_type, 747 image=None, hostless=False, firmware_rw_build=None, 748 firmware_ro_build=None, test_source_build=None, 749 is_cloning=False, **kwargs): 750 """\ 751 Create and enqueue a job. 752 753 @param name name of this job 754 @param priority Integer priority of this job. Higher is more important. 755 @param control_file String contents of the control file. 756 @param control_type Type of control file, Client or Server. 757 @param image: ChromeOS build to be installed in the dut. Default to None. 758 @param firmware_rw_build: Firmware build to update RW firmware. Default to 759 None, i.e., RW firmware will not be updated. 760 @param firmware_ro_build: Firmware build to update RO firmware. Default to 761 None, i.e., RO firmware will not be updated. 762 @param test_source_build: Build to be used to retrieve test code. Default 763 to None. 764 @param is_cloning: True if creating a cloning job. 765 @param kwargs extra args that will be required by create_suite_job or 766 create_job. 767 768 @returns The created Job id number. 769 """ 770 if is_cloning: 771 logging.info('Start to clone a new job') 772 # When cloning a job, hosts and meta_hosts should not exist together, 773 # which would cause host-scheduler to schedule two hqe jobs to one host 774 # at the same time, and crash itself. Clear meta_hosts for this case. 775 if kwargs.get('hosts') and kwargs.get('meta_hosts'): 776 kwargs['meta_hosts'] = [] 777 else: 778 logging.info('Start to create a new job') 779 control_file = rpc_utils.encode_ascii(control_file) 780 if not control_file: 781 raise model_logic.ValidationError({ 782 'control_file' : "Control file cannot be empty"}) 783 784 if image and hostless: 785 builds = {} 786 builds[provision.CROS_VERSION_PREFIX] = image 787 if firmware_rw_build: 788 builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build 789 if firmware_ro_build: 790 builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build 791 return create_suite_job( 792 name=name, control_file=control_file, priority=priority, 793 builds=builds, test_source_build=test_source_build, 794 is_cloning=is_cloning, **kwargs) 795 return create_job(name, priority, control_file, control_type, image=image, 796 hostless=hostless, **kwargs) 797 798 799@rpc_utils.route_rpc_to_master 800def create_job( 801 name, 802 priority, 803 control_file, 804 control_type, 805 hosts=(), 806 meta_hosts=(), 807 one_time_hosts=(), 808 synch_count=None, 809 is_template=False, 810 timeout=None, 811 timeout_mins=None, 812 max_runtime_mins=None, 813 run_verify=False, 814 email_list='', 815 dependencies=(), 816 reboot_before=None, 817 reboot_after=None, 818 parse_failed_repair=None, 819 hostless=False, 820 keyvals=None, 821 drone_set=None, 822 image=None, 823 parent_job_id=None, 824 test_retry=0, 825 run_reset=True, 826 require_ssp=None, 827 args=(), 828 **kwargs): 829 """\ 830 Create and enqueue a job. 831 832 @param name name of this job 833 @param priority Integer priority of this job. Higher is more important. 834 @param control_file String contents of the control file. 835 @param control_type Type of control file, Client or Server. 836 @param synch_count How many machines the job uses per autoserv execution. 837 synch_count == 1 means the job is asynchronous. If an atomic group is 838 given this value is treated as a minimum. 839 @param is_template If true then create a template job. 840 @param timeout Hours after this call returns until the job times out. 841 @param timeout_mins Minutes after this call returns until the job times 842 out. 843 @param max_runtime_mins Minutes from job starting time until job times out 844 @param run_verify Should the host be verified before running the test? 845 @param email_list String containing emails to mail when the job is done 846 @param dependencies List of label names on which this job depends 847 @param reboot_before Never, If dirty, or Always 848 @param reboot_after Never, If all tests passed, or Always 849 @param parse_failed_repair if true, results of failed repairs launched by 850 this job will be parsed as part of the job. 851 @param hostless if true, create a hostless job 852 @param keyvals dict of keyvals to associate with the job 853 @param hosts List of hosts to run job on. 854 @param meta_hosts List where each entry is a label name, and for each entry 855 one host will be chosen from that label to run the job on. 856 @param one_time_hosts List of hosts not in the database to run the job on. 857 @param drone_set The name of the drone set to run this test on. 858 @param image OS image to install before running job. 859 @param parent_job_id id of a job considered to be parent of created job. 860 @param test_retry Number of times to retry test if the test did not 861 complete successfully. (optional, default: 0) 862 @param run_reset Should the host be reset before running the test? 863 @param require_ssp Set to True to require server-side packaging to run the 864 test. If it's set to None, drone will still try to run 865 the server side with server-side packaging. If the 866 autotest-server package doesn't exist for the build or 867 image is not set, drone will run the test without server- 868 side packaging. Default is None. 869 @param args A list of args to be injected into control file. 870 @param kwargs extra keyword args. NOT USED. 871 872 @returns The created Job id number. 873 """ 874 if args: 875 control_file = tools.inject_vars({'args': args}, control_file) 876 if image: 877 dependencies += (provision.image_version_to_label(image),) 878 return rpc_utils.create_job_common( 879 name=name, 880 priority=priority, 881 control_type=control_type, 882 control_file=control_file, 883 hosts=hosts, 884 meta_hosts=meta_hosts, 885 one_time_hosts=one_time_hosts, 886 synch_count=synch_count, 887 is_template=is_template, 888 timeout=timeout, 889 timeout_mins=timeout_mins, 890 max_runtime_mins=max_runtime_mins, 891 run_verify=run_verify, 892 email_list=email_list, 893 dependencies=dependencies, 894 reboot_before=reboot_before, 895 reboot_after=reboot_after, 896 parse_failed_repair=parse_failed_repair, 897 hostless=hostless, 898 keyvals=keyvals, 899 drone_set=drone_set, 900 parent_job_id=parent_job_id, 901 test_retry=test_retry, 902 run_reset=run_reset, 903 require_ssp=require_ssp) 904 905 906def abort_host_queue_entries(**filter_data): 907 """\ 908 Abort a set of host queue entries. 909 910 @return: A list of dictionaries, each contains information 911 about an aborted HQE. 912 """ 913 query = models.HostQueueEntry.query_objects(filter_data) 914 915 # Dont allow aborts on: 916 # 1. Jobs that have already completed (whether or not they were aborted) 917 # 2. Jobs that we have already been aborted (but may not have completed) 918 query = query.filter(complete=False).filter(aborted=False) 919 models.AclGroup.check_abort_permissions(query) 920 host_queue_entries = list(query.select_related()) 921 rpc_utils.check_abort_synchronous_jobs(host_queue_entries) 922 923 models.HostQueueEntry.abort_host_queue_entries(host_queue_entries) 924 hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id, 925 'Job name': hqe.job.name} for hqe in host_queue_entries] 926 return hqe_info 927 928 929def abort_special_tasks(**filter_data): 930 """\ 931 Abort the special task, or tasks, specified in the filter. 932 """ 933 query = models.SpecialTask.query_objects(filter_data) 934 special_tasks = query.filter(is_active=True) 935 for task in special_tasks: 936 task.abort() 937 938 939def _call_special_tasks_on_hosts(task, hosts): 940 """\ 941 Schedules a set of hosts for a special task. 942 943 @returns A list of hostnames that a special task was created for. 944 """ 945 models.AclGroup.check_for_acl_violation_hosts(hosts) 946 shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts) 947 if shard_host_map and not utils.is_shard(): 948 raise ValueError('The following hosts are on shards, please ' 949 'follow the link to the shards and create jobs ' 950 'there instead. %s.' % shard_host_map) 951 for host in hosts: 952 models.SpecialTask.schedule_special_task(host, task) 953 return list(sorted(host.hostname for host in hosts)) 954 955 956def _forward_special_tasks_on_hosts(task, rpc, **filter_data): 957 """Forward special tasks to corresponding shards. 958 959 For master, when special tasks are fired on hosts that are sharded, 960 forward the RPC to corresponding shards. 961 962 For shard, create special task records in local DB. 963 964 @param task: Enum value of frontend.afe.models.SpecialTask.Task 965 @param rpc: RPC name to forward. 966 @param filter_data: Filter keywords to be used for DB query. 967 968 @return: A list of hostnames that a special task was created for. 969 """ 970 hosts = models.Host.query_objects(filter_data) 971 shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts, rpc_hostnames=True) 972 973 # Filter out hosts on a shard from those on the master, forward 974 # rpcs to the shard with an additional hostname__in filter, and 975 # create a local SpecialTask for each remaining host. 976 if shard_host_map and not utils.is_shard(): 977 hosts = [h for h in hosts if h.shard is None] 978 for shard, hostnames in shard_host_map.iteritems(): 979 980 # The main client of this module is the frontend website, and 981 # it invokes it with an 'id' or an 'id__in' filter. Regardless, 982 # the 'hostname' filter should narrow down the list of hosts on 983 # each shard even though we supply all the ids in filter_data. 984 # This method uses hostname instead of id because it fits better 985 # with the overall architecture of redirection functions in 986 # rpc_utils. 987 shard_filter = filter_data.copy() 988 shard_filter['hostname__in'] = hostnames 989 rpc_utils.run_rpc_on_multiple_hostnames( 990 rpc, [shard], **shard_filter) 991 992 # There is a race condition here if someone assigns a shard to one of these 993 # hosts before we create the task. The host will stay on the master if: 994 # 1. The host is not Ready 995 # 2. The host is Ready but has a task 996 # But if the host is Ready and doesn't have a task yet, it will get sent 997 # to the shard as we're creating a task here. 998 999 # Given that we only rarely verify Ready hosts it isn't worth putting this 1000 # entire method in a transaction. The worst case scenario is that we have 1001 # a verify running on a Ready host while the shard is using it, if the 1002 # verify fails no subsequent tasks will be created against the host on the 1003 # master, and verifies are safe enough that this is OK. 1004 return _call_special_tasks_on_hosts(task, hosts) 1005 1006 1007def reverify_hosts(**filter_data): 1008 """\ 1009 Schedules a set of hosts for verify. 1010 1011 @returns A list of hostnames that a verify task was created for. 1012 """ 1013 return _forward_special_tasks_on_hosts( 1014 models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data) 1015 1016 1017def repair_hosts(**filter_data): 1018 """\ 1019 Schedules a set of hosts for repair. 1020 1021 @returns A list of hostnames that a repair task was created for. 1022 """ 1023 return _forward_special_tasks_on_hosts( 1024 models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data) 1025 1026 1027def get_jobs(not_yet_run=False, running=False, finished=False, 1028 suite=False, sub=False, standalone=False, **filter_data): 1029 """\ 1030 Extra status filter args for get_jobs: 1031 -not_yet_run: Include only jobs that have not yet started running. 1032 -running: Include only jobs that have start running but for which not 1033 all hosts have completed. 1034 -finished: Include only jobs for which all hosts have completed (or 1035 aborted). 1036 1037 Extra type filter args for get_jobs: 1038 -suite: Include only jobs with child jobs. 1039 -sub: Include only jobs with a parent job. 1040 -standalone: Inlcude only jobs with no child or parent jobs. 1041 At most one of these three fields should be specified. 1042 """ 1043 extra_args = rpc_utils.extra_job_status_filters(not_yet_run, 1044 running, 1045 finished) 1046 filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, 1047 suite, 1048 sub, 1049 standalone) 1050 job_dicts = [] 1051 jobs = list(models.Job.query_objects(filter_data)) 1052 models.Job.objects.populate_relationships(jobs, models.Label, 1053 'dependencies') 1054 models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals') 1055 for job in jobs: 1056 job_dict = job.get_object_dict() 1057 job_dict['dependencies'] = ','.join(label.name 1058 for label in job.dependencies) 1059 job_dict['keyvals'] = dict((keyval.key, keyval.value) 1060 for keyval in job.keyvals) 1061 job_dicts.append(job_dict) 1062 return rpc_utils.prepare_for_serialization(job_dicts) 1063 1064 1065def get_num_jobs(not_yet_run=False, running=False, finished=False, 1066 suite=False, sub=False, standalone=False, 1067 **filter_data): 1068 """\ 1069 See get_jobs() for documentation of extra filter parameters. 1070 """ 1071 extra_args = rpc_utils.extra_job_status_filters(not_yet_run, 1072 running, 1073 finished) 1074 filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, 1075 suite, 1076 sub, 1077 standalone) 1078 return models.Job.query_count(filter_data) 1079 1080 1081def get_jobs_summary(**filter_data): 1082 """\ 1083 Like get_jobs(), but adds 'status_counts' and 'result_counts' field. 1084 1085 'status_counts' filed is a dictionary mapping status strings to the number 1086 of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}. 1087 1088 'result_counts' field is piped to tko's rpc_interface and has the return 1089 format specified under get_group_counts. 1090 """ 1091 jobs = get_jobs(**filter_data) 1092 ids = [job['id'] for job in jobs] 1093 all_status_counts = models.Job.objects.get_status_counts(ids) 1094 for job in jobs: 1095 job['status_counts'] = all_status_counts[job['id']] 1096 job['result_counts'] = tko_rpc_interface.get_status_counts( 1097 ['afe_job_id', 'afe_job_id'], 1098 header_groups=[['afe_job_id'], ['afe_job_id']], 1099 **{'afe_job_id': job['id']}) 1100 return rpc_utils.prepare_for_serialization(jobs) 1101 1102 1103def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None): 1104 """\ 1105 Retrieves all the information needed to clone a job. 1106 """ 1107 job = models.Job.objects.get(id=id) 1108 job_info = rpc_utils.get_job_info(job, 1109 preserve_metahosts, 1110 queue_entry_filter_data) 1111 1112 host_dicts = [] 1113 for host in job_info['hosts']: 1114 host_dict = get_hosts(id=host.id)[0] 1115 other_labels = host_dict['labels'] 1116 if host_dict['platform']: 1117 other_labels.remove(host_dict['platform']) 1118 host_dict['other_labels'] = ', '.join(other_labels) 1119 host_dicts.append(host_dict) 1120 1121 for host in job_info['one_time_hosts']: 1122 host_dict = dict(hostname=host.hostname, 1123 id=host.id, 1124 platform='(one-time host)', 1125 locked_text='') 1126 host_dicts.append(host_dict) 1127 1128 # convert keys from Label objects to strings (names of labels) 1129 meta_host_counts = dict((meta_host.name, count) for meta_host, count 1130 in job_info['meta_host_counts'].iteritems()) 1131 1132 info = dict(job=job.get_object_dict(), 1133 meta_host_counts=meta_host_counts, 1134 hosts=host_dicts) 1135 info['job']['dependencies'] = job_info['dependencies'] 1136 info['hostless'] = job_info['hostless'] 1137 info['drone_set'] = job.drone_set and job.drone_set.name 1138 1139 image = _get_image_for_job(job, job_info['hostless']) 1140 if image: 1141 info['job']['image'] = image 1142 1143 return rpc_utils.prepare_for_serialization(info) 1144 1145 1146def _get_image_for_job(job, hostless): 1147 """Gets the image used for a job. 1148 1149 Gets the image used for an AFE job from the job's keyvals 'build' or 1150 'builds'. If that fails, and the job is a hostless job, tries to 1151 get the image from its control file attributes 'build' or 'builds'. 1152 1153 TODO(ntang): Needs to handle FAFT with two builds for ro/rw. 1154 1155 @param job An AFE job object. 1156 @param hostless Boolean indicating whether the job is hostless. 1157 1158 @returns The image build used for the job. 1159 """ 1160 keyvals = job.keyval_dict() 1161 image = keyvals.get('build') 1162 if not image: 1163 value = keyvals.get('builds') 1164 builds = None 1165 if isinstance(value, dict): 1166 builds = value 1167 elif isinstance(value, basestring): 1168 builds = ast.literal_eval(value) 1169 if builds: 1170 image = builds.get('cros-version') 1171 if not image and hostless and job.control_file: 1172 try: 1173 control_obj = control_data.parse_control_string( 1174 job.control_file) 1175 if hasattr(control_obj, 'build'): 1176 image = getattr(control_obj, 'build') 1177 if not image and hasattr(control_obj, 'builds'): 1178 builds = getattr(control_obj, 'builds') 1179 image = builds.get('cros-version') 1180 except: 1181 logging.warning('Failed to parse control file for job: %s', 1182 job.name) 1183 return image 1184 1185 1186def get_host_queue_entries_by_insert_time( 1187 insert_time_after=None, insert_time_before=None, **filter_data): 1188 """Like get_host_queue_entries, but using the insert index table. 1189 1190 @param insert_time_after: A lower bound on insert_time 1191 @param insert_time_before: An upper bound on insert_time 1192 @returns A sequence of nested dictionaries of host and job information. 1193 """ 1194 assert insert_time_after is not None or insert_time_before is not None, \ 1195 ('Caller to get_host_queue_entries_by_insert_time must provide either' 1196 ' insert_time_after or insert_time_before.') 1197 # Get insert bounds on the index of the host queue entries. 1198 if insert_time_after: 1199 query = models.HostQueueEntryStartTimes.objects.filter( 1200 # Note: '-insert_time' means descending. We want the largest 1201 # insert time smaller than the insert time. 1202 insert_time__lte=insert_time_after).order_by('-insert_time') 1203 try: 1204 constraint = query[0].highest_hqe_id 1205 if 'id__gte' in filter_data: 1206 constraint = max(constraint, filter_data['id__gte']) 1207 filter_data['id__gte'] = constraint 1208 except IndexError: 1209 pass 1210 1211 # Get end bounds. 1212 if insert_time_before: 1213 query = models.HostQueueEntryStartTimes.objects.filter( 1214 insert_time__gte=insert_time_before).order_by('insert_time') 1215 try: 1216 constraint = query[0].highest_hqe_id 1217 if 'id__lte' in filter_data: 1218 constraint = min(constraint, filter_data['id__lte']) 1219 filter_data['id__lte'] = constraint 1220 except IndexError: 1221 pass 1222 1223 return rpc_utils.prepare_rows_as_nested_dicts( 1224 models.HostQueueEntry.query_objects(filter_data), 1225 ('host', 'job')) 1226 1227 1228def get_host_queue_entries(start_time=None, end_time=None, **filter_data): 1229 """\ 1230 @returns A sequence of nested dictionaries of host and job information. 1231 """ 1232 filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 1233 'started_on__lte', 1234 start_time, 1235 end_time, 1236 **filter_data) 1237 return rpc_utils.prepare_rows_as_nested_dicts( 1238 models.HostQueueEntry.query_objects(filter_data), 1239 ('host', 'job')) 1240 1241 1242def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data): 1243 """\ 1244 Get the number of host queue entries associated with this job. 1245 """ 1246 filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 1247 'started_on__lte', 1248 start_time, 1249 end_time, 1250 **filter_data) 1251 return models.HostQueueEntry.query_count(filter_data) 1252 1253 1254def get_hqe_percentage_complete(**filter_data): 1255 """ 1256 Computes the fraction of host queue entries matching the given filter data 1257 that are complete. 1258 """ 1259 query = models.HostQueueEntry.query_objects(filter_data) 1260 complete_count = query.filter(complete=True).count() 1261 total_count = query.count() 1262 if total_count == 0: 1263 return 1 1264 return float(complete_count) / total_count 1265 1266 1267# special tasks 1268 1269def get_special_tasks(**filter_data): 1270 """Get special task entries from the local database. 1271 1272 Query the special tasks table for tasks matching the given 1273 `filter_data`, and return a list of the results. No attempt is 1274 made to forward the call to shards; the buck will stop here. 1275 The caller is expected to know the target shard for such reasons 1276 as: 1277 * The caller is a service (such as gs_offloader) configured 1278 to operate on behalf of one specific shard, and no other. 1279 * The caller has a host as a parameter, and knows that this is 1280 the shard assigned to that host. 1281 1282 @param filter_data Filter keywords to pass to the underlying 1283 database query. 1284 1285 """ 1286 return rpc_utils.prepare_rows_as_nested_dicts( 1287 models.SpecialTask.query_objects(filter_data), 1288 ('host', 'queue_entry')) 1289 1290 1291def get_host_special_tasks(host_id, **filter_data): 1292 """Get special task entries for a given host. 1293 1294 Query the special tasks table for tasks that ran on the host 1295 given by `host_id` and matching the given `filter_data`. 1296 Return a list of the results. If the host is assigned to a 1297 shard, forward this call to that shard. 1298 1299 @param host_id Id in the database of the target host. 1300 @param filter_data Filter keywords to pass to the underlying 1301 database query. 1302 1303 """ 1304 # Retrieve host data even if the host is in an invalid state. 1305 host = models.Host.smart_get(host_id, False) 1306 if not host.shard: 1307 return get_special_tasks(host_id=host_id, **filter_data) 1308 else: 1309 # The return values from AFE methods are post-processed 1310 # objects that aren't JSON-serializable. So, we have to 1311 # call AFE.run() to get the raw, serializable output from 1312 # the shard. 1313 shard_afe = frontend.AFE(server=host.shard.rpc_hostname()) 1314 return shard_afe.run('get_special_tasks', 1315 host_id=host_id, **filter_data) 1316 1317 1318def get_num_special_tasks(**kwargs): 1319 """Get the number of special task entries from the local database. 1320 1321 Query the special tasks table for tasks matching the given 'kwargs', 1322 and return the number of the results. No attempt is made to forward 1323 the call to shards; the buck will stop here. 1324 1325 @param kwargs Filter keywords to pass to the underlying database query. 1326 1327 """ 1328 return models.SpecialTask.query_count(kwargs) 1329 1330 1331def get_host_num_special_tasks(host, **kwargs): 1332 """Get special task entries for a given host. 1333 1334 Query the special tasks table for tasks that ran on the host 1335 given by 'host' and matching the given 'kwargs'. 1336 Return a list of the results. If the host is assigned to a 1337 shard, forward this call to that shard. 1338 1339 @param host id or name of a host. More often a hostname. 1340 @param kwargs Filter keywords to pass to the underlying database query. 1341 1342 """ 1343 # Retrieve host data even if the host is in an invalid state. 1344 host_model = models.Host.smart_get(host, False) 1345 if not host_model.shard: 1346 return get_num_special_tasks(host=host, **kwargs) 1347 else: 1348 shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname()) 1349 return shard_afe.run('get_num_special_tasks', host=host, **kwargs) 1350 1351 1352def get_status_task(host_id, end_time): 1353 """Get the "status task" for a host from the local shard. 1354 1355 Returns a single special task representing the given host's 1356 "status task". The status task is a completed special task that 1357 identifies whether the corresponding host was working or broken 1358 when it completed. A successful task indicates a working host; 1359 a failed task indicates broken. 1360 1361 This call will not be forward to a shard; the receiving server 1362 must be the shard that owns the host. 1363 1364 @param host_id Id in the database of the target host. 1365 @param end_time Time reference for the host's status. 1366 1367 @return A single task; its status (successful or not) 1368 corresponds to the status of the host (working or 1369 broken) at the given time. If no task is found, return 1370 `None`. 1371 1372 """ 1373 tasklist = rpc_utils.prepare_rows_as_nested_dicts( 1374 status_history.get_status_task(host_id, end_time), 1375 ('host', 'queue_entry')) 1376 return tasklist[0] if tasklist else None 1377 1378 1379def get_host_status_task(host_id, end_time): 1380 """Get the "status task" for a host from its owning shard. 1381 1382 Finds the given host's owning shard, and forwards to it a call 1383 to `get_status_task()` (see above). 1384 1385 @param host_id Id in the database of the target host. 1386 @param end_time Time reference for the host's status. 1387 1388 @return A single task; its status (successful or not) 1389 corresponds to the status of the host (working or 1390 broken) at the given time. If no task is found, return 1391 `None`. 1392 1393 """ 1394 host = models.Host.smart_get(host_id) 1395 if not host.shard: 1396 return get_status_task(host_id, end_time) 1397 else: 1398 # The return values from AFE methods are post-processed 1399 # objects that aren't JSON-serializable. So, we have to 1400 # call AFE.run() to get the raw, serializable output from 1401 # the shard. 1402 shard_afe = frontend.AFE(server=host.shard.rpc_hostname()) 1403 return shard_afe.run('get_status_task', 1404 host_id=host_id, end_time=end_time) 1405 1406 1407def get_host_diagnosis_interval(host_id, end_time, success): 1408 """Find a "diagnosis interval" for a given host. 1409 1410 A "diagnosis interval" identifies a start and end time where 1411 the host went from "working" to "broken", or vice versa. The 1412 interval's starting time is the starting time of the last status 1413 task with the old status; the end time is the finish time of the 1414 first status task with the new status. 1415 1416 This routine finds the most recent diagnosis interval for the 1417 given host prior to `end_time`, with a starting status matching 1418 `success`. If `success` is true, the interval will start with a 1419 successful status task; if false the interval will start with a 1420 failed status task. 1421 1422 @param host_id Id in the database of the target host. 1423 @param end_time Time reference for the diagnosis interval. 1424 @param success Whether the diagnosis interval should start 1425 with a successful or failed status task. 1426 1427 @return A list of two strings. The first is the timestamp for 1428 the beginning of the interval; the second is the 1429 timestamp for the end. If the host has never changed 1430 state, the list is empty. 1431 1432 """ 1433 host = models.Host.smart_get(host_id) 1434 if not host.shard or utils.is_shard(): 1435 return status_history.get_diagnosis_interval( 1436 host_id, end_time, success) 1437 else: 1438 shard_afe = frontend.AFE(server=host.shard.rpc_hostname()) 1439 return shard_afe.get_host_diagnosis_interval( 1440 host_id, end_time, success) 1441 1442 1443# support for host detail view 1444 1445def get_host_queue_entries_and_special_tasks(host, query_start=None, 1446 query_limit=None, start_time=None, 1447 end_time=None): 1448 """ 1449 @returns an interleaved list of HostQueueEntries and SpecialTasks, 1450 in approximate run order. each dict contains keys for type, host, 1451 job, status, started_on, execution_path, and ID. 1452 """ 1453 total_limit = None 1454 if query_limit is not None: 1455 total_limit = query_start + query_limit 1456 filter_data_common = {'host': host, 1457 'query_limit': total_limit, 1458 'sort_by': ['-id']} 1459 1460 filter_data_special_tasks = rpc_utils.inject_times_to_filter( 1461 'time_started__gte', 'time_started__lte', start_time, end_time, 1462 **filter_data_common) 1463 1464 queue_entries = get_host_queue_entries( 1465 start_time, end_time, **filter_data_common) 1466 special_tasks = get_host_special_tasks(host, **filter_data_special_tasks) 1467 1468 interleaved_entries = rpc_utils.interleave_entries(queue_entries, 1469 special_tasks) 1470 if query_start is not None: 1471 interleaved_entries = interleaved_entries[query_start:] 1472 if query_limit is not None: 1473 interleaved_entries = interleaved_entries[:query_limit] 1474 return rpc_utils.prepare_host_queue_entries_and_special_tasks( 1475 interleaved_entries, queue_entries) 1476 1477 1478def get_num_host_queue_entries_and_special_tasks(host, start_time=None, 1479 end_time=None): 1480 filter_data_common = {'host': host} 1481 1482 filter_data_queue_entries, filter_data_special_tasks = ( 1483 rpc_utils.inject_times_to_hqe_special_tasks_filters( 1484 filter_data_common, start_time, end_time)) 1485 1486 return (models.HostQueueEntry.query_count(filter_data_queue_entries) 1487 + get_host_num_special_tasks(**filter_data_special_tasks)) 1488 1489 1490# other 1491 1492def echo(data=""): 1493 """\ 1494 Returns a passed in string. For doing a basic test to see if RPC calls 1495 can successfully be made. 1496 """ 1497 return data 1498 1499 1500def get_motd(): 1501 """\ 1502 Returns the message of the day as a string. 1503 """ 1504 return rpc_utils.get_motd() 1505 1506 1507def get_static_data(): 1508 """\ 1509 Returns a dictionary containing a bunch of data that shouldn't change 1510 often and is otherwise inaccessible. This includes: 1511 1512 priorities: List of job priority choices. 1513 default_priority: Default priority value for new jobs. 1514 users: Sorted list of all users. 1515 labels: Sorted list of labels not start with 'cros-version' and 1516 'fw-version'. 1517 tests: Sorted list of all tests. 1518 profilers: Sorted list of all profilers. 1519 current_user: Logged-in username. 1520 host_statuses: Sorted list of possible Host statuses. 1521 job_statuses: Sorted list of possible HostQueueEntry statuses. 1522 job_timeout_default: The default job timeout length in minutes. 1523 parse_failed_repair_default: Default value for the parse_failed_repair job 1524 option. 1525 reboot_before_options: A list of valid RebootBefore string enums. 1526 reboot_after_options: A list of valid RebootAfter string enums. 1527 motd: Server's message of the day. 1528 status_dictionary: A mapping from one word job status names to a more 1529 informative description. 1530 """ 1531 1532 default_drone_set_name = models.DroneSet.default_drone_set_name() 1533 drone_sets = ([default_drone_set_name] + 1534 sorted(drone_set.name for drone_set in 1535 models.DroneSet.objects.exclude( 1536 name=default_drone_set_name))) 1537 1538 result = {} 1539 result['priorities'] = priorities.Priority.choices() 1540 result['default_priority'] = 'Default' 1541 result['max_schedulable_priority'] = priorities.Priority.DEFAULT 1542 result['users'] = get_users(sort_by=['login']) 1543 1544 label_exclude_filters = [{'name__startswith': 'cros-version'}, 1545 {'name__startswith': 'fw-version'}, 1546 {'name__startswith': 'fwrw-version'}, 1547 {'name__startswith': 'fwro-version'}, 1548 {'name__startswith': 'ab-version'}, 1549 {'name__startswith': 'testbed-version'}] 1550 result['labels'] = get_labels( 1551 label_exclude_filters, 1552 sort_by=['-platform', 'name']) 1553 1554 result['tests'] = get_tests(sort_by=['name']) 1555 result['profilers'] = get_profilers(sort_by=['name']) 1556 result['current_user'] = rpc_utils.prepare_for_serialization( 1557 models.User.current_user().get_object_dict()) 1558 result['host_statuses'] = sorted(models.Host.Status.names) 1559 result['job_statuses'] = sorted(models.HostQueueEntry.Status.names) 1560 result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS 1561 result['job_max_runtime_mins_default'] = ( 1562 models.Job.DEFAULT_MAX_RUNTIME_MINS) 1563 result['parse_failed_repair_default'] = bool( 1564 models.Job.DEFAULT_PARSE_FAILED_REPAIR) 1565 result['reboot_before_options'] = model_attributes.RebootBefore.names 1566 result['reboot_after_options'] = model_attributes.RebootAfter.names 1567 result['motd'] = rpc_utils.get_motd() 1568 result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled() 1569 result['drone_sets'] = drone_sets 1570 1571 result['status_dictionary'] = {"Aborted": "Aborted", 1572 "Verifying": "Verifying Host", 1573 "Provisioning": "Provisioning Host", 1574 "Pending": "Waiting on other hosts", 1575 "Running": "Running autoserv", 1576 "Completed": "Autoserv completed", 1577 "Failed": "Failed to complete", 1578 "Queued": "Queued", 1579 "Starting": "Next in host's queue", 1580 "Stopped": "Other host(s) failed verify", 1581 "Parsing": "Awaiting parse of final results", 1582 "Gathering": "Gathering log files", 1583 "Waiting": "Waiting for scheduler action", 1584 "Archiving": "Archiving results", 1585 "Resetting": "Resetting hosts"} 1586 1587 result['wmatrix_url'] = rpc_utils.get_wmatrix_url() 1588 result['is_moblab'] = bool(utils.is_moblab()) 1589 1590 return result 1591 1592 1593def get_server_time(): 1594 return datetime.datetime.now().strftime("%Y-%m-%d %H:%M") 1595 1596 1597def ping_db(): 1598 """Simple connection test to db""" 1599 try: 1600 db_connection.cursor() 1601 except DatabaseError: 1602 return [False] 1603 return [True] 1604 1605 1606def get_hosts_by_attribute(attribute, value): 1607 """ 1608 Get the list of valid hosts that share the same host attribute value. 1609 1610 @param attribute: String of the host attribute to check. 1611 @param value: String of the value that is shared between hosts. 1612 1613 @returns List of hostnames that all have the same host attribute and 1614 value. 1615 """ 1616 hosts = models.HostAttribute.query_objects({'attribute': attribute, 1617 'value': value}) 1618 return [row.host.hostname for row in hosts if row.host.invalid == 0] 1619 1620 1621def canonicalize_suite_name(suite_name): 1622 """Canonicalize the suite's name. 1623 1624 @param suite_name: the name of the suite. 1625 """ 1626 # Do not change this naming convention without updating 1627 # site_utils.parse_job_name. 1628 return 'test_suites/control.%s' % suite_name 1629 1630 1631def formatted_now(): 1632 """Format the current datetime.""" 1633 return datetime.datetime.now().strftime(time_utils.TIME_FMT) 1634 1635 1636def _get_control_file_by_build(build, ds, suite_name): 1637 """Return control file contents for |suite_name|. 1638 1639 Query the dev server at |ds| for the control file |suite_name|, included 1640 in |build| for |board|. 1641 1642 @param build: unique name by which to refer to the image from now on. 1643 @param ds: a dev_server.DevServer instance to fetch control file with. 1644 @param suite_name: canonicalized suite name, e.g. test_suites/control.bvt. 1645 @raises ControlFileNotFound if a unique suite control file doesn't exist. 1646 @raises NoControlFileList if we can't list the control files at all. 1647 @raises ControlFileEmpty if the control file exists on the server, but 1648 can't be read. 1649 1650 @return the contents of the desired control file. 1651 """ 1652 getter = control_file_getter.DevServerGetter.create(build, ds) 1653 devserver_name = ds.hostname 1654 # Get the control file for the suite. 1655 try: 1656 control_file_in = getter.get_control_file_contents_by_name(suite_name) 1657 except error.CrosDynamicSuiteException as e: 1658 raise type(e)('Failed to get control file for %s ' 1659 '(devserver: %s) (error: %s)' % 1660 (build, devserver_name, e)) 1661 if not control_file_in: 1662 raise error.ControlFileEmpty( 1663 "Fetching %s returned no data. (devserver: %s)" % 1664 (suite_name, devserver_name)) 1665 # Force control files to only contain ascii characters. 1666 try: 1667 control_file_in.encode('ascii') 1668 except UnicodeDecodeError as e: 1669 raise error.ControlFileMalformed(str(e)) 1670 1671 return control_file_in 1672 1673 1674def _get_control_file_by_suite(suite_name): 1675 """Get control file contents by suite name. 1676 1677 @param suite_name: Suite name as string. 1678 @returns: Control file contents as string. 1679 """ 1680 getter = control_file_getter.FileSystemGetter( 1681 [_CONFIG.get_config_value('SCHEDULER', 1682 'drone_installation_directory')]) 1683 return getter.get_control_file_contents_by_name(suite_name) 1684 1685 1686def _stage_build_artifacts(build, hostname=None): 1687 """ 1688 Ensure components of |build| necessary for installing images are staged. 1689 1690 @param build image we want to stage. 1691 @param hostname hostname of a dut may run test on. This is to help to locate 1692 a devserver closer to duts if needed. Default is None. 1693 1694 @raises StageControlFileFailure: if the dev server throws 500 while staging 1695 suite control files. 1696 1697 @return: dev_server.ImageServer instance to use with this build. 1698 @return: timings dictionary containing staging start/end times. 1699 """ 1700 timings = {} 1701 # Ensure components of |build| necessary for installing images are staged 1702 # on the dev server. However set synchronous to False to allow other 1703 # components to be downloaded in the background. 1704 ds = dev_server.resolve(build, hostname=hostname) 1705 ds_name = ds.hostname 1706 timings[constants.DOWNLOAD_STARTED_TIME] = formatted_now() 1707 try: 1708 ds.stage_artifacts(image=build, artifacts=['test_suites']) 1709 except dev_server.DevServerException as e: 1710 raise error.StageControlFileFailure( 1711 "Failed to stage %s on %s: %s" % (build, ds_name, e)) 1712 timings[constants.PAYLOAD_FINISHED_TIME] = formatted_now() 1713 return (ds, timings) 1714 1715 1716@rpc_utils.route_rpc_to_master 1717def create_suite_job( 1718 name='', 1719 board='', 1720 pool='', 1721 control_file='', 1722 check_hosts=True, 1723 num=None, 1724 file_bugs=False, 1725 timeout=24, 1726 timeout_mins=None, 1727 priority=priorities.Priority.DEFAULT, 1728 suite_args=None, 1729 wait_for_results=True, 1730 job_retry=False, 1731 max_retries=None, 1732 max_runtime_mins=None, 1733 suite_min_duts=0, 1734 offload_failures_only=False, 1735 builds=None, 1736 test_source_build=None, 1737 run_prod_code=False, 1738 delay_minutes=0, 1739 is_cloning=False, 1740 job_keyvals=None, 1741 test_args=None, 1742 **kwargs 1743): 1744 """ 1745 Create a job to run a test suite on the given device with the given image. 1746 1747 When the timeout specified in the control file is reached, the 1748 job is guaranteed to have completed and results will be available. 1749 1750 @param name: The test name if control_file is supplied, otherwise the name 1751 of the test suite to run, e.g. 'bvt'. 1752 @param board: the kind of device to run the tests on. 1753 @param builds: the builds to install e.g. 1754 {'cros-version:': 'x86-alex-release/R18-1655.0.0', 1755 'fwrw-version:': 'x86-alex-firmware/R36-5771.50.0', 1756 'fwro-version:': 'x86-alex-firmware/R36-5771.49.0'} 1757 If builds is given a value, it overrides argument build. 1758 @param test_source_build: Build that contains the server-side test code. 1759 @param pool: Specify the pool of machines to use for scheduling 1760 purposes. 1761 @param control_file: the control file of the job. 1762 @param check_hosts: require appropriate live hosts to exist in the lab. 1763 @param num: Specify the number of machines to schedule across (integer). 1764 Leave unspecified or use None to use default sharding factor. 1765 @param file_bugs: File a bug on each test failure in this suite. 1766 @param timeout: The max lifetime of this suite, in hours. 1767 @param timeout_mins: The max lifetime of this suite, in minutes. Takes 1768 priority over timeout. 1769 @param priority: Integer denoting priority. Higher is more important. 1770 @param suite_args: Optional arguments which will be parsed by the suite 1771 control file. Used by control.test_that_wrapper to 1772 determine which tests to run. 1773 @param wait_for_results: Set to False to run the suite job without waiting 1774 for test jobs to finish. Default is True. 1775 @param job_retry: Set to True to enable job-level retry. Default is False. 1776 @param max_retries: Integer, maximum job retries allowed at suite level. 1777 None for no max. 1778 @param max_runtime_mins: Maximum amount of time a job can be running in 1779 minutes. 1780 @param suite_min_duts: Integer. Scheduler will prioritize getting the 1781 minimum number of machines for the suite when it is 1782 competing with another suite that has a higher 1783 priority but already got minimum machines it needs. 1784 @param offload_failures_only: Only enable gs_offloading for failed jobs. 1785 @param run_prod_code: If True, the suite will run the test code that 1786 lives in prod aka the test code currently on the 1787 lab servers. If False, the control files and test 1788 code for this suite run will be retrieved from the 1789 build artifacts. 1790 @param delay_minutes: Delay the creation of test jobs for a given number of 1791 minutes. 1792 @param is_cloning: True if creating a cloning job. 1793 @param job_keyvals: A dict of job keyvals to be inject to control file. 1794 @param test_args: A dict of args passed all the way to each individual test 1795 that will be actually run. 1796 @param kwargs: extra keyword args. NOT USED. 1797 1798 @raises ControlFileNotFound: if a unique suite control file doesn't exist. 1799 @raises NoControlFileList: if we can't list the control files at all. 1800 @raises StageControlFileFailure: If the dev server throws 500 while 1801 staging test_suites. 1802 @raises ControlFileEmpty: if the control file exists on the server, but 1803 can't be read. 1804 1805 @return: the job ID of the suite; -1 on error. 1806 """ 1807 if type(num) is not int and num is not None: 1808 raise error.SuiteArgumentException('Ill specified num argument %r. ' 1809 'Must be an integer or None.' % num) 1810 if num == 0: 1811 logging.warning("Can't run on 0 hosts; using default.") 1812 num = None 1813 1814 if builds is None: 1815 builds = {} 1816 1817 # Default test source build to CrOS build if it's not specified and 1818 # run_prod_code is set to False. 1819 if not run_prod_code: 1820 test_source_build = Suite.get_test_source_build( 1821 builds, test_source_build=test_source_build) 1822 1823 sample_dut = rpc_utils.get_sample_dut(board, pool) 1824 1825 suite_name = canonicalize_suite_name(name) 1826 if run_prod_code: 1827 ds = dev_server.resolve(test_source_build, hostname=sample_dut) 1828 keyvals = {} 1829 else: 1830 (ds, keyvals) = _stage_build_artifacts( 1831 test_source_build, hostname=sample_dut) 1832 keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts 1833 1834 # Do not change this naming convention without updating 1835 # site_utils.parse_job_name. 1836 if run_prod_code: 1837 # If run_prod_code is True, test_source_build is not set, use the 1838 # first build in the builds list for the sutie job name. 1839 name = '%s-%s' % (builds.values()[0], suite_name) 1840 else: 1841 name = '%s-%s' % (test_source_build, suite_name) 1842 1843 timeout_mins = timeout_mins or timeout * 60 1844 max_runtime_mins = max_runtime_mins or timeout * 60 1845 1846 if not board: 1847 board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0] 1848 1849 if run_prod_code: 1850 control_file = _get_control_file_by_suite(suite_name) 1851 1852 if not control_file: 1853 # No control file was supplied so look it up from the build artifacts. 1854 control_file = _get_control_file_by_build( 1855 test_source_build, ds, suite_name) 1856 1857 # Prepend builds and board to the control file. 1858 if is_cloning: 1859 control_file = tools.remove_injection(control_file) 1860 1861 inject_dict = { 1862 'board': board, 1863 # `build` is needed for suites like AU to stage image inside suite 1864 # control file. 1865 'build': test_source_build, 1866 'builds': builds, 1867 'check_hosts': check_hosts, 1868 'pool': pool, 1869 'num': num, 1870 'file_bugs': file_bugs, 1871 'timeout': timeout, 1872 'timeout_mins': timeout_mins, 1873 'devserver_url': ds.url(), 1874 'priority': priority, 1875 'suite_args' : suite_args, 1876 'wait_for_results': wait_for_results, 1877 'job_retry': job_retry, 1878 'max_retries': max_retries, 1879 'max_runtime_mins': max_runtime_mins, 1880 'offload_failures_only': offload_failures_only, 1881 'test_source_build': test_source_build, 1882 'run_prod_code': run_prod_code, 1883 'delay_minutes': delay_minutes, 1884 'job_keyvals': job_keyvals, 1885 'test_args': test_args, 1886 } 1887 control_file = tools.inject_vars(inject_dict, control_file) 1888 1889 return rpc_utils.create_job_common(name, 1890 priority=priority, 1891 timeout_mins=timeout_mins, 1892 max_runtime_mins=max_runtime_mins, 1893 control_type='Server', 1894 control_file=control_file, 1895 hostless=True, 1896 keyvals=keyvals) 1897 1898 1899def get_job_history(**filter_data): 1900 """Get history of the job, including the special tasks executed for the job 1901 1902 @param filter_data: filter for the call, should at least include 1903 {'job_id': [job id]} 1904 @returns: JSON string of the job's history, including the information such 1905 as the hosts run the job and the special tasks executed before 1906 and after the job. 1907 """ 1908 job_id = filter_data['job_id'] 1909 job_info = job_history.get_job_info(job_id) 1910 return rpc_utils.prepare_for_serialization(job_info.get_history()) 1911 1912 1913def get_host_history(start_time, end_time, hosts=None, board=None, pool=None): 1914 """Deprecated.""" 1915 raise ValueError('get_host_history rpc is deprecated ' 1916 'and no longer implemented.') 1917 1918 1919def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(), 1920 known_host_ids=(), known_host_statuses=()): 1921 """Receive updates for job statuses from shards and assign hosts and jobs. 1922 1923 @param shard_hostname: Hostname of the calling shard 1924 @param jobs: Jobs in serialized form that should be updated with newer 1925 status from a shard. 1926 @param hqes: Hostqueueentries in serialized form that should be updated with 1927 newer status from a shard. Note that for every hostqueueentry 1928 the corresponding job must be in jobs. 1929 @param known_job_ids: List of ids of jobs the shard already has. 1930 @param known_host_ids: List of ids of hosts the shard already has. 1931 @param known_host_statuses: List of statuses of hosts the shard already has. 1932 1933 @returns: Serialized representations of hosts, jobs, suite job keyvals 1934 and their dependencies to be inserted into a shard's database. 1935 """ 1936 # The following alternatives to sending host and job ids in every heartbeat 1937 # have been considered: 1938 # 1. Sending the highest known job and host ids. This would work for jobs: 1939 # Newer jobs always have larger ids. Also, if a job is not assigned to a 1940 # particular shard during a heartbeat, it never will be assigned to this 1941 # shard later. 1942 # This is not true for hosts though: A host that is leased won't be sent 1943 # to the shard now, but might be sent in a future heartbeat. This means 1944 # sometimes hosts should be transfered that have a lower id than the 1945 # maximum host id the shard knows. 1946 # 2. Send the number of jobs/hosts the shard knows to the master in each 1947 # heartbeat. Compare these to the number of records that already have 1948 # the shard_id set to this shard. In the normal case, they should match. 1949 # In case they don't, resend all entities of that type. 1950 # This would work well for hosts, because there aren't that many. 1951 # Resending all jobs is quite a big overhead though. 1952 # Also, this approach might run into edge cases when entities are 1953 # ever deleted. 1954 # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts. 1955 # Using two different approaches isn't consistent and might cause 1956 # confusion. Also the issues with the case of deletions might still 1957 # occur. 1958 # 1959 # The overhead of sending all job and host ids in every heartbeat is low: 1960 # At peaks one board has about 1200 created but unfinished jobs. 1961 # See the numbers here: http://goo.gl/gQCGWH 1962 # Assuming that job id's have 6 digits and that json serialization takes a 1963 # comma and a space as overhead, the traffic per id sent is about 8 bytes. 1964 # If 5000 ids need to be sent, this means 40 kilobytes of traffic. 1965 # A NOT IN query with 5000 ids took about 30ms in tests made. 1966 # These numbers seem low enough to outweigh the disadvantages of the 1967 # solutions described above. 1968 shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname) 1969 rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes) 1970 assert len(known_host_ids) == len(known_host_statuses) 1971 for i in range(len(known_host_ids)): 1972 host_model = models.Host.objects.get(pk=known_host_ids[i]) 1973 if host_model.status != known_host_statuses[i]: 1974 host_model.status = known_host_statuses[i] 1975 host_model.save() 1976 1977 hosts, jobs, suite_keyvals, inc_ids = rpc_utils.find_records_for_shard( 1978 shard_obj, known_job_ids=known_job_ids, 1979 known_host_ids=known_host_ids) 1980 return { 1981 'hosts': [host.serialize() for host in hosts], 1982 'jobs': [job.serialize() for job in jobs], 1983 'suite_keyvals': [kv.serialize() for kv in suite_keyvals], 1984 'incorrect_host_ids': [int(i) for i in inc_ids], 1985 } 1986 1987 1988def get_shards(**filter_data): 1989 """Return a list of all shards. 1990 1991 @returns A sequence of nested dictionaries of shard information. 1992 """ 1993 shards = models.Shard.query_objects(filter_data) 1994 serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ()) 1995 for serialized, shard in zip(serialized_shards, shards): 1996 serialized['labels'] = [label.name for label in shard.labels.all()] 1997 1998 return serialized_shards 1999 2000 2001def _assign_board_to_shard_precheck(labels): 2002 """Verify whether board labels are valid to be added to a given shard. 2003 2004 First check whether board label is in correct format. Second, check whether 2005 the board label exist. Third, check whether the board has already been 2006 assigned to shard. 2007 2008 @param labels: Board labels separated by comma. 2009 2010 @raises error.RPCException: If label provided doesn't start with `board:` 2011 or board has been added to shard already. 2012 @raises models.Label.DoesNotExist: If the label specified doesn't exist. 2013 2014 @returns: A list of label models that ready to be added to shard. 2015 """ 2016 if not labels: 2017 # allow creation of label-less shards (labels='' would otherwise fail the 2018 # checks below) 2019 return [] 2020 labels = labels.split(',') 2021 label_models = [] 2022 for label in labels: 2023 # Check whether the board label is in correct format. 2024 if not label.startswith('board:'): 2025 raise error.RPCException('Sharding only supports `board:.*` label.') 2026 # Check whether the board label exist. If not, exception will be thrown 2027 # by smart_get function. 2028 label = models.Label.smart_get(label) 2029 # Check whether the board has been sharded already 2030 try: 2031 shard = models.Shard.objects.get(labels=label) 2032 raise error.RPCException( 2033 '%s is already on shard %s' % (label, shard.hostname)) 2034 except models.Shard.DoesNotExist: 2035 # board is not on any shard, so it's valid. 2036 label_models.append(label) 2037 return label_models 2038 2039 2040def add_shard(hostname, labels): 2041 """Add a shard and start running jobs on it. 2042 2043 @param hostname: The hostname of the shard to be added; needs to be unique. 2044 @param labels: Board labels separated by comma. Jobs of one of the labels 2045 will be assigned to the shard. 2046 2047 @raises error.RPCException: If label provided doesn't start with `board:` or 2048 board has been added to shard already. 2049 @raises model_logic.ValidationError: If a shard with the given hostname 2050 already exist. 2051 @raises models.Label.DoesNotExist: If the label specified doesn't exist. 2052 2053 @returns: The id of the added shard. 2054 """ 2055 labels = _assign_board_to_shard_precheck(labels) 2056 shard = models.Shard.add_object(hostname=hostname) 2057 for label in labels: 2058 shard.labels.add(label) 2059 return shard.id 2060 2061 2062def add_board_to_shard(hostname, labels): 2063 """Add boards to a given shard 2064 2065 @param hostname: The hostname of the shard to be changed. 2066 @param labels: Board labels separated by comma. 2067 2068 @raises error.RPCException: If label provided doesn't start with `board:` or 2069 board has been added to shard already. 2070 @raises models.Label.DoesNotExist: If the label specified doesn't exist. 2071 2072 @returns: The id of the changed shard. 2073 """ 2074 labels = _assign_board_to_shard_precheck(labels) 2075 shard = models.Shard.objects.get(hostname=hostname) 2076 for label in labels: 2077 shard.labels.add(label) 2078 return shard.id 2079 2080 2081# Remove board RPCs are rare, so we can afford to make them a bit more 2082# expensive (by performing in a transaction) in order to guarantee 2083# atomicity. 2084# TODO(akeshet): If we ever update to newer version of django, we need to 2085# migrate to transaction.atomic instead of commit_on_success 2086@transaction.commit_on_success 2087def remove_board_from_shard(hostname, label): 2088 """Remove board from the given shard. 2089 @param hostname: The hostname of the shard to be changed. 2090 @param labels: Board label. 2091 2092 @raises models.Label.DoesNotExist: If the label specified doesn't exist. 2093 2094 @returns: The id of the changed shard. 2095 """ 2096 shard = models.Shard.objects.get(hostname=hostname) 2097 label = models.Label.smart_get(label) 2098 if label not in shard.labels.all(): 2099 raise error.RPCException( 2100 'Cannot remove label from shard that does not belong to it.') 2101 shard.labels.remove(label) 2102 models.Host.objects.filter(labels__in=[label]).update(shard=None) 2103 2104 2105def delete_shard(hostname): 2106 """Delete a shard and reclaim all resources from it. 2107 2108 This claims back all assigned hosts from the shard. To ensure all DUTs are 2109 in a sane state, a Reboot task with highest priority is scheduled for them. 2110 This reboots the DUTs and then all left tasks continue to run in drone of 2111 the master. 2112 2113 The procedure for deleting a shard: 2114 * Lock all unlocked hosts on that shard. 2115 * Remove shard information . 2116 * Assign a reboot task with highest priority to these hosts. 2117 * Unlock these hosts, then, the reboot tasks run in front of all other 2118 tasks. 2119 2120 The status of jobs that haven't been reported to be finished yet, will be 2121 lost. The master scheduler will pick up the jobs and execute them. 2122 2123 @param hostname: Hostname of the shard to delete. 2124 """ 2125 shard = rpc_utils.retrieve_shard(shard_hostname=hostname) 2126 hostnames_to_lock = [h.hostname for h in 2127 models.Host.objects.filter(shard=shard, locked=False)] 2128 2129 # TODO(beeps): Power off shard 2130 # For ChromeOS hosts, a reboot test with the highest priority is added to 2131 # the DUT. After a reboot it should be ganranteed that no processes from 2132 # prior tests that were run by a shard are still running on. 2133 2134 # Lock all unlocked hosts. 2135 dicts = {'locked': True, 'lock_time': datetime.datetime.now()} 2136 models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts) 2137 2138 # Remove shard information. 2139 models.Host.objects.filter(shard=shard).update(shard=None) 2140 models.Job.objects.filter(shard=shard).update(shard=None) 2141 shard.labels.clear() 2142 shard.delete() 2143 2144 # Assign a reboot task with highest priority: Super. 2145 t = models.Test.objects.get(name='platform_BootPerfServer:shard') 2146 c = utils.read_file(os.path.join(common.autotest_dir, t.path)) 2147 if hostnames_to_lock: 2148 rpc_utils.create_job_common( 2149 'reboot_dut_for_shard_deletion', 2150 priority=priorities.Priority.SUPER, 2151 control_type='Server', 2152 control_file=c, hosts=hostnames_to_lock, 2153 timeout_mins=10, 2154 max_runtime_mins=10) 2155 2156 # Unlock these shard-related hosts. 2157 dicts = {'locked': False, 'lock_time': None} 2158 models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts) 2159 2160 2161def get_servers(hostname=None, role=None, status=None): 2162 """Get a list of servers with matching role and status. 2163 2164 @param hostname: FQDN of the server. 2165 @param role: Name of the server role, e.g., drone, scheduler. Default to 2166 None to match any role. 2167 @param status: Status of the server, e.g., primary, backup, repair_required. 2168 Default to None to match any server status. 2169 2170 @raises error.RPCException: If server database is not used. 2171 @return: A list of server names for servers with matching role and status. 2172 """ 2173 if not server_manager_utils.use_server_db(): 2174 raise error.RPCException('Server database is not enabled. Please try ' 2175 'retrieve servers from global config.') 2176 servers = server_manager_utils.get_servers(hostname=hostname, role=role, 2177 status=status) 2178 return [s.get_details() for s in servers] 2179 2180 2181@rpc_utils.route_rpc_to_master 2182def get_stable_version(board=stable_version_utils.DEFAULT, android=False): 2183 """Get stable version for the given board. 2184 2185 @param board: Name of the board. 2186 @param android: If True, the given board is an Android-based device. If 2187 False, assume its a Chrome OS-based device. 2188 2189 @return: Stable version of the given board. Return global configure value 2190 of CROS.stable_cros_version if stable_versinos table does not have 2191 entry of board DEFAULT. 2192 """ 2193 return stable_version_utils.get(board=board, android=android) 2194 2195 2196@rpc_utils.route_rpc_to_master 2197def get_all_stable_versions(): 2198 """Get stable versions for all boards. 2199 2200 @return: A dictionary of board:version. 2201 """ 2202 return stable_version_utils.get_all() 2203 2204 2205@rpc_utils.route_rpc_to_master 2206def set_stable_version(version, board=stable_version_utils.DEFAULT): 2207 """Modify stable version for the given board. 2208 2209 @param version: The new value of stable version for given board. 2210 @param board: Name of the board, default to value `DEFAULT`. 2211 """ 2212 stable_version_utils.set(version=version, board=board) 2213 2214 2215@rpc_utils.route_rpc_to_master 2216def delete_stable_version(board): 2217 """Modify stable version for the given board. 2218 2219 Delete a stable version entry in afe_stable_versions table for a given 2220 board, so default stable version will be used. 2221 2222 @param board: Name of the board. 2223 """ 2224 stable_version_utils.delete(board=board) 2225 2226 2227def get_tests_by_build(build, ignore_invalid_tests=True): 2228 """Get the tests that are available for the specified build. 2229 2230 @param build: unique name by which to refer to the image. 2231 @param ignore_invalid_tests: flag on if unparsable tests are ignored. 2232 2233 @return: A sorted list of all tests that are in the build specified. 2234 """ 2235 # Collect the control files specified in this build 2236 cfile_getter = control_file_lib._initialize_control_file_getter(build) 2237 if SuiteBase.ENABLE_CONTROLS_IN_BATCH: 2238 control_file_info_list = cfile_getter.get_suite_info() 2239 control_file_list = control_file_info_list.keys() 2240 else: 2241 control_file_list = cfile_getter.get_control_file_list() 2242 2243 test_objects = [] 2244 _id = 0 2245 for control_file_path in control_file_list: 2246 # Read and parse the control file 2247 if SuiteBase.ENABLE_CONTROLS_IN_BATCH: 2248 control_file = control_file_info_list[control_file_path] 2249 else: 2250 control_file = cfile_getter.get_control_file_contents( 2251 control_file_path) 2252 try: 2253 control_obj = control_data.parse_control_string(control_file) 2254 except: 2255 logging.info('Failed to parse control file: %s', control_file_path) 2256 if not ignore_invalid_tests: 2257 raise 2258 2259 # Extract the values needed for the AFE from the control_obj. 2260 # The keys list represents attributes in the control_obj that 2261 # are required by the AFE 2262 keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental', 2263 'test_category', 'test_class', 'dependencies', 'run_verify', 2264 'sync_count', 'job_retries', 'retries', 'path'] 2265 2266 test_object = {} 2267 for key in keys: 2268 test_object[key] = getattr(control_obj, key) if hasattr( 2269 control_obj, key) else '' 2270 2271 # Unfortunately, the AFE expects different key-names for certain 2272 # values, these must be corrected to avoid the risk of tests 2273 # being omitted by the AFE. 2274 # The 'id' is an additional value used in the AFE. 2275 # The control_data parsing does not reference 'run_reset', but it 2276 # is also used in the AFE and defaults to True. 2277 test_object['id'] = _id 2278 test_object['run_reset'] = True 2279 test_object['description'] = test_object.get('doc', '') 2280 test_object['test_time'] = test_object.get('time', 0) 2281 test_object['test_retry'] = test_object.get('retries', 0) 2282 2283 # Fix the test name to be consistent with the current presentation 2284 # of test names in the AFE. 2285 testpath, subname = os.path.split(control_file_path) 2286 testname = os.path.basename(testpath) 2287 subname = subname.split('.')[1:] 2288 if subname: 2289 testname = '%s:%s' % (testname, ':'.join(subname)) 2290 2291 test_object['name'] = testname 2292 2293 # Correct the test path as parse_control_string sets an empty string. 2294 test_object['path'] = control_file_path 2295 2296 _id += 1 2297 test_objects.append(test_object) 2298 2299 test_objects = sorted(test_objects, key=lambda x: x.get('name')) 2300 return rpc_utils.prepare_for_serialization(test_objects) 2301