1# pylint: disable-msg=C0111 2 3"""\ 4Functions to expose over the RPC interface. 5 6For all modify* and delete* functions that ask for an 'id' parameter to 7identify the object to operate on, the id may be either 8 * the database row ID 9 * the name of the object (label name, hostname, user login, etc.) 10 * a dictionary containing uniquely identifying field (this option should seldom 11 be used) 12 13When specifying foreign key fields (i.e. adding hosts to a label, or adding 14users to an ACL group), the given value may be either the database row ID or the 15name of the object. 16 17All get* functions return lists of dictionaries. Each dictionary represents one 18object and maps field names to values. 19 20Some examples: 21modify_host(2, hostname='myhost') # modify hostname of host with database ID 2 22modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2' 23modify_test('sleeptest', test_type='Client', params=', seconds=60') 24delete_acl_group(1) # delete by ID 25delete_acl_group('Everyone') # delete by name 26acl_group_add_users('Everyone', ['mbligh', 'showard']) 27get_jobs(owner='showard', status='Queued') 28 29See doctests/001_rpc_test.txt for (lots) more examples. 30""" 31 32__author__ = 'showard@google.com (Steve Howard)' 33 34import sys 35import datetime 36import logging 37 38from django.db.models import Count 39import common 40from autotest_lib.client.common_lib import priorities 41from autotest_lib.client.common_lib.cros import dev_server 42from autotest_lib.client.common_lib.cros.graphite import autotest_stats 43from autotest_lib.frontend.afe import control_file, rpc_utils 44from autotest_lib.frontend.afe import models, model_logic, model_attributes 45from autotest_lib.frontend.afe import site_rpc_interface 46from autotest_lib.frontend.tko import models as tko_models 47from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface 48from autotest_lib.server import frontend 49from autotest_lib.server import utils 50from autotest_lib.server.cros import provision 51from autotest_lib.server.cros.dynamic_suite import tools 52from autotest_lib.site_utils import status_history 53 54 55_timer = autotest_stats.Timer('rpc_interface') 56 57def get_parameterized_autoupdate_image_url(job): 58 """Get the parameterized autoupdate image url from a parameterized job.""" 59 known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob') 60 image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj, 61 name='image') 62 para_set = job.parameterized_job.parameterizedjobparameter_set 63 job_test_para = para_set.get(test_parameter=image_parameter) 64 return job_test_para.parameter_value 65 66 67# labels 68 69def modify_label(id, **data): 70 """Modify a label. 71 72 @param id: id or name of a label. More often a label name. 73 @param data: New data for a label. 74 """ 75 label_model = models.Label.smart_get(id) 76 label_model.update_object(data) 77 78 # Master forwards the RPC to shards 79 if not utils.is_shard(): 80 rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False, 81 id=id, **data) 82 83 84def delete_label(id): 85 """Delete a label. 86 87 @param id: id or name of a label. More often a label name. 88 """ 89 label_model = models.Label.smart_get(id) 90 # Hosts that have the label to be deleted. Save this info before 91 # the label is deleted to use it later. 92 hosts = [] 93 for h in label_model.host_set.all(): 94 hosts.append(models.Host.smart_get(h.id)) 95 label_model.delete() 96 97 # Master forwards the RPC to shards 98 if not utils.is_shard(): 99 rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id) 100 101 102def add_label(name, ignore_exception_if_exists=False, **kwargs): 103 """Adds a new label of a given name. 104 105 @param name: label name. 106 @param ignore_exception_if_exists: If True and the exception was 107 thrown due to the duplicated label name when adding a label, 108 then suppress the exception. Default is False. 109 @param kwargs: keyword args that store more info about a label 110 other than the name. 111 @return: int/long id of a new label. 112 """ 113 # models.Label.add_object() throws model_logic.ValidationError 114 # when it is given a label name that already exists. 115 # However, ValidationError can be thrown with different errors, 116 # and those errors should be thrown up to the call chain. 117 try: 118 label = models.Label.add_object(name=name, **kwargs) 119 except: 120 exc_info = sys.exc_info() 121 if ignore_exception_if_exists: 122 label = rpc_utils.get_label(name) 123 # If the exception is raised not because of duplicated 124 # "name", then raise the original exception. 125 if label is None: 126 raise exc_info[0], exc_info[1], exc_info[2] 127 else: 128 raise exc_info[0], exc_info[1], exc_info[2] 129 return label.id 130 131 132def add_label_to_hosts(id, hosts): 133 """Adds a label of the given id to the given hosts only in local DB. 134 135 @param id: id or name of a label. More often a label name. 136 @param hosts: The hostnames of hosts that need the label. 137 138 @raises models.Label.DoesNotExist: If the label with id doesn't exist. 139 """ 140 label = models.Label.smart_get(id) 141 host_objs = models.Host.smart_get_bulk(hosts) 142 if label.platform: 143 models.Host.check_no_platform(host_objs) 144 label.host_set.add(*host_objs) 145 146 147@rpc_utils.route_rpc_to_master 148def label_add_hosts(id, hosts): 149 """Adds a label with the given id to the given hosts. 150 151 This method should be run only on master not shards. 152 The given label will be created if it doesn't exist, provided the `id` 153 supplied is a label name not an int/long id. 154 155 @param id: id or name of a label. More often a label name. 156 @param hosts: A list of hostnames or ids. More often hostnames. 157 158 @raises ValueError: If the id specified is an int/long (label id) 159 while the label does not exist. 160 """ 161 try: 162 label = models.Label.smart_get(id) 163 except models.Label.DoesNotExist: 164 # This matches the type checks in smart_get, which is a hack 165 # in and off itself. The aim here is to create any non-existent 166 # label, which we cannot do if the 'id' specified isn't a label name. 167 if isinstance(id, basestring): 168 label = models.Label.smart_get(add_label(id)) 169 else: 170 raise ValueError('Label id (%s) does not exist. Please specify ' 171 'the argument, id, as a string (label name).' 172 % id) 173 add_label_to_hosts(id, hosts) 174 175 host_objs = models.Host.smart_get_bulk(hosts) 176 # Make sure the label exists on the shard with the same id 177 # as it is on the master. 178 # It is possible that the label is already in a shard because 179 # we are adding a new label only to shards of hosts that the label 180 # is going to be attached. 181 # For example, we add a label L1 to a host in shard S1. 182 # Master and S1 will have L1 but other shards won't. 183 # Later, when we add the same label L1 to hosts in shards S1 and S2, 184 # S1 already has the label but S2 doesn't. 185 # S2 should have the new label without any problem. 186 # We ignore exception in such a case. 187 rpc_utils.fanout_rpc( 188 host_objs, 'add_label', include_hostnames=False, 189 name=label.name, ignore_exception_if_exists=True, 190 id=label.id, platform=label.platform) 191 rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id) 192 193 194def remove_label_from_hosts(id, hosts): 195 """Removes a label of the given id from the given hosts only in local DB. 196 197 @param id: id or name of a label. 198 @param hosts: The hostnames of hosts that need to remove the label from. 199 """ 200 host_objs = models.Host.smart_get_bulk(hosts) 201 models.Label.smart_get(id).host_set.remove(*host_objs) 202 203 204@rpc_utils.route_rpc_to_master 205def label_remove_hosts(id, hosts): 206 """Removes a label of the given id from the given hosts. 207 208 This method should be run only on master not shards. 209 210 @param id: id or name of a label. 211 @param hosts: A list of hostnames or ids. More often hostnames. 212 """ 213 host_objs = models.Host.smart_get_bulk(hosts) 214 remove_label_from_hosts(id, hosts) 215 216 rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id) 217 218 219def get_labels(exclude_filters=(), **filter_data): 220 """\ 221 @param exclude_filters: A sequence of dictionaries of filters. 222 223 @returns A sequence of nested dictionaries of label information. 224 """ 225 labels = models.Label.query_objects(filter_data) 226 for exclude_filter in exclude_filters: 227 labels = labels.exclude(**exclude_filter) 228 return rpc_utils.prepare_rows_as_nested_dicts(labels, ('atomic_group',)) 229 230 231# atomic groups 232 233def add_atomic_group(name, max_number_of_machines=None, description=None): 234 return models.AtomicGroup.add_object( 235 name=name, max_number_of_machines=max_number_of_machines, 236 description=description).id 237 238 239def modify_atomic_group(id, **data): 240 models.AtomicGroup.smart_get(id).update_object(data) 241 242 243def delete_atomic_group(id): 244 models.AtomicGroup.smart_get(id).delete() 245 246 247def atomic_group_add_labels(id, labels): 248 label_objs = models.Label.smart_get_bulk(labels) 249 models.AtomicGroup.smart_get(id).label_set.add(*label_objs) 250 251 252def atomic_group_remove_labels(id, labels): 253 label_objs = models.Label.smart_get_bulk(labels) 254 models.AtomicGroup.smart_get(id).label_set.remove(*label_objs) 255 256 257def get_atomic_groups(**filter_data): 258 return rpc_utils.prepare_for_serialization( 259 models.AtomicGroup.list_objects(filter_data)) 260 261 262# hosts 263 264def add_host(hostname, status=None, locked=None, lock_reason='', protection=None): 265 if locked and not lock_reason: 266 raise model_logic.ValidationError( 267 {'locked': 'Please provide a reason for locking when adding host.'}) 268 269 return models.Host.add_object(hostname=hostname, status=status, 270 locked=locked, lock_reason=lock_reason, 271 protection=protection).id 272 273 274@rpc_utils.route_rpc_to_master 275def modify_host(id, **kwargs): 276 """Modify local attributes of a host. 277 278 If this is called on the master, but the host is assigned to a shard, this 279 will call `modify_host_local` RPC to the responsible shard. This means if 280 a host is being locked using this function, this change will also propagate 281 to shards. 282 When this is called on a shard, the shard just routes the RPC to the master 283 and does nothing. 284 285 @param id: id of the host to modify. 286 @param kwargs: key=value pairs of values to set on the host. 287 """ 288 rpc_utils.check_modify_host(kwargs) 289 host = models.Host.smart_get(id) 290 try: 291 rpc_utils.check_modify_host_locking(host, kwargs) 292 except model_logic.ValidationError as e: 293 if not kwargs.get('force_modify_locking', False): 294 raise 295 logging.exception('The following exception will be ignored and lock ' 296 'modification will be enforced. %s', e) 297 298 # This is required to make `lock_time` for a host be exactly same 299 # between the master and a shard. 300 if kwargs.get('locked', None) and 'lock_time' not in kwargs: 301 kwargs['lock_time'] = datetime.datetime.now() 302 host.update_object(kwargs) 303 304 # force_modifying_locking is not an internal field in database, remove. 305 kwargs.pop('force_modify_locking', None) 306 rpc_utils.fanout_rpc([host], 'modify_host_local', 307 include_hostnames=False, id=id, **kwargs) 308 309 310def modify_host_local(id, **kwargs): 311 """Modify host attributes in local DB. 312 313 @param id: Host id. 314 @param kwargs: key=value pairs of values to set on the host. 315 """ 316 models.Host.smart_get(id).update_object(kwargs) 317 318 319@rpc_utils.route_rpc_to_master 320def modify_hosts(host_filter_data, update_data): 321 """Modify local attributes of multiple hosts. 322 323 If this is called on the master, but one of the hosts in that match the 324 filters is assigned to a shard, this will call `modify_hosts_local` RPC 325 to the responsible shard. 326 When this is called on a shard, the shard just routes the RPC to the master 327 and does nothing. 328 329 The filters are always applied on the master, not on the shards. This means 330 if the states of a host differ on the master and a shard, the state on the 331 master will be used. I.e. this means: 332 A host was synced to Shard 1. On Shard 1 the status of the host was set to 333 'Repair Failed'. 334 - A call to modify_hosts with host_filter_data={'status': 'Ready'} will 335 update the host (both on the shard and on the master), because the state 336 of the host as the master knows it is still 'Ready'. 337 - A call to modify_hosts with host_filter_data={'status': 'Repair failed' 338 will not update the host, because the filter doesn't apply on the master. 339 340 @param host_filter_data: Filters out which hosts to modify. 341 @param update_data: A dictionary with the changes to make to the hosts. 342 """ 343 update_data = update_data.copy() 344 rpc_utils.check_modify_host(update_data) 345 hosts = models.Host.query_objects(host_filter_data) 346 347 affected_shard_hostnames = set() 348 affected_host_ids = [] 349 350 # Check all hosts before changing data for exception safety. 351 for host in hosts: 352 try: 353 rpc_utils.check_modify_host_locking(host, update_data) 354 except model_logic.ValidationError as e: 355 if not update_data.get('force_modify_locking', False): 356 raise 357 logging.exception('The following exception will be ignored and ' 358 'lock modification will be enforced. %s', e) 359 360 if host.shard: 361 affected_shard_hostnames.add(host.shard.rpc_hostname()) 362 affected_host_ids.append(host.id) 363 364 # This is required to make `lock_time` for a host be exactly same 365 # between the master and a shard. 366 if update_data.get('locked', None) and 'lock_time' not in update_data: 367 update_data['lock_time'] = datetime.datetime.now() 368 for host in hosts: 369 host.update_object(update_data) 370 371 update_data.pop('force_modify_locking', None) 372 # Caution: Changing the filter from the original here. See docstring. 373 rpc_utils.run_rpc_on_multiple_hostnames( 374 'modify_hosts_local', affected_shard_hostnames, 375 host_filter_data={'id__in': affected_host_ids}, 376 update_data=update_data) 377 378 379def modify_hosts_local(host_filter_data, update_data): 380 """Modify attributes of hosts in local DB. 381 382 @param host_filter_data: Filters out which hosts to modify. 383 @param update_data: A dictionary with the changes to make to the hosts. 384 """ 385 for host in models.Host.query_objects(host_filter_data): 386 host.update_object(update_data) 387 388 389def add_labels_to_host(id, labels): 390 """Adds labels to a given host only in local DB. 391 392 @param id: id or hostname for a host. 393 @param labels: ids or names for labels. 394 """ 395 label_objs = models.Label.smart_get_bulk(labels) 396 models.Host.smart_get(id).labels.add(*label_objs) 397 398 399@rpc_utils.route_rpc_to_master 400def host_add_labels(id, labels): 401 """Adds labels to a given host. 402 403 @param id: id or hostname for a host. 404 @param labels: ids or names for labels. 405 406 @raises ValidationError: If adding more than one platform label. 407 """ 408 label_objs = models.Label.smart_get_bulk(labels) 409 platforms = [label.name for label in label_objs if label.platform] 410 if len(platforms) > 1: 411 raise model_logic.ValidationError( 412 {'labels': 'Adding more than one platform label: %s' % 413 ', '.join(platforms)}) 414 415 host_obj = models.Host.smart_get(id) 416 if len(platforms) == 1: 417 models.Host.check_no_platform([host_obj]) 418 add_labels_to_host(id, labels) 419 420 rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False, 421 id=id, labels=labels) 422 423 424def remove_labels_from_host(id, labels): 425 """Removes labels from a given host only in local DB. 426 427 @param id: id or hostname for a host. 428 @param labels: ids or names for labels. 429 """ 430 label_objs = models.Label.smart_get_bulk(labels) 431 models.Host.smart_get(id).labels.remove(*label_objs) 432 433 434@rpc_utils.route_rpc_to_master 435def host_remove_labels(id, labels): 436 """Removes labels from a given host. 437 438 @param id: id or hostname for a host. 439 @param labels: ids or names for labels. 440 """ 441 remove_labels_from_host(id, labels) 442 443 host_obj = models.Host.smart_get(id) 444 rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False, 445 id=id, labels=labels) 446 447 448def get_host_attribute(attribute, **host_filter_data): 449 """ 450 @param attribute: string name of attribute 451 @param host_filter_data: filter data to apply to Hosts to choose hosts to 452 act upon 453 """ 454 hosts = rpc_utils.get_host_query((), False, False, True, host_filter_data) 455 hosts = list(hosts) 456 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 457 'attribute_list') 458 host_attr_dicts = [] 459 for host_obj in hosts: 460 for attr_obj in host_obj.attribute_list: 461 if attr_obj.attribute == attribute: 462 host_attr_dicts.append(attr_obj.get_object_dict()) 463 return rpc_utils.prepare_for_serialization(host_attr_dicts) 464 465 466def set_host_attribute(attribute, value, **host_filter_data): 467 """ 468 @param attribute: string name of attribute 469 @param value: string, or None to delete an attribute 470 @param host_filter_data: filter data to apply to Hosts to choose hosts to 471 act upon 472 """ 473 assert host_filter_data # disallow accidental actions on all hosts 474 hosts = models.Host.query_objects(host_filter_data) 475 models.AclGroup.check_for_acl_violation_hosts(hosts) 476 for host in hosts: 477 host.set_or_delete_attribute(attribute, value) 478 479 # Master forwards this RPC to shards. 480 if not utils.is_shard(): 481 rpc_utils.fanout_rpc(hosts, 'set_host_attribute', False, 482 attribute=attribute, value=value, **host_filter_data) 483 484 485@rpc_utils.forward_single_host_rpc_to_shard 486def delete_host(id): 487 models.Host.smart_get(id).delete() 488 489 490def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 491 exclude_atomic_group_hosts=False, valid_only=True, 492 include_current_job=False, **filter_data): 493 """Get a list of dictionaries which contains the information of hosts. 494 495 @param multiple_labels: match hosts in all of the labels given. Should 496 be a list of label names. 497 @param exclude_only_if_needed_labels: Exclude hosts with at least one 498 "only_if_needed" label applied. 499 @param exclude_atomic_group_hosts: Exclude hosts that have one or more 500 atomic group labels associated with them. 501 @param include_current_job: Set to True to include ids of currently running 502 job and special task. 503 """ 504 hosts = rpc_utils.get_host_query(multiple_labels, 505 exclude_only_if_needed_labels, 506 exclude_atomic_group_hosts, 507 valid_only, filter_data) 508 hosts = list(hosts) 509 models.Host.objects.populate_relationships(hosts, models.Label, 510 'label_list') 511 models.Host.objects.populate_relationships(hosts, models.AclGroup, 512 'acl_list') 513 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 514 'attribute_list') 515 host_dicts = [] 516 for host_obj in hosts: 517 host_dict = host_obj.get_object_dict() 518 host_dict['labels'] = [label.name for label in host_obj.label_list] 519 host_dict['platform'], host_dict['atomic_group'] = (rpc_utils. 520 find_platform_and_atomic_group(host_obj)) 521 host_dict['acls'] = [acl.name for acl in host_obj.acl_list] 522 host_dict['attributes'] = dict((attribute.attribute, attribute.value) 523 for attribute in host_obj.attribute_list) 524 if include_current_job: 525 host_dict['current_job'] = None 526 host_dict['current_special_task'] = None 527 entries = models.HostQueueEntry.objects.filter( 528 host_id=host_dict['id'], active=True, complete=False) 529 if entries: 530 host_dict['current_job'] = ( 531 entries[0].get_object_dict()['job']) 532 tasks = models.SpecialTask.objects.filter( 533 host_id=host_dict['id'], is_active=True, is_complete=False) 534 if tasks: 535 host_dict['current_special_task'] = ( 536 '%d-%s' % (tasks[0].get_object_dict()['id'], 537 tasks[0].get_object_dict()['task'].lower())) 538 host_dicts.append(host_dict) 539 return rpc_utils.prepare_for_serialization(host_dicts) 540 541 542def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 543 exclude_atomic_group_hosts=False, valid_only=True, 544 **filter_data): 545 """ 546 Same parameters as get_hosts(). 547 548 @returns The number of matching hosts. 549 """ 550 hosts = rpc_utils.get_host_query(multiple_labels, 551 exclude_only_if_needed_labels, 552 exclude_atomic_group_hosts, 553 valid_only, filter_data) 554 return hosts.count() 555 556 557# tests 558 559def add_test(name, test_type, path, author=None, dependencies=None, 560 experimental=True, run_verify=None, test_class=None, 561 test_time=None, test_category=None, description=None, 562 sync_count=1): 563 return models.Test.add_object(name=name, test_type=test_type, path=path, 564 author=author, dependencies=dependencies, 565 experimental=experimental, 566 run_verify=run_verify, test_time=test_time, 567 test_category=test_category, 568 sync_count=sync_count, 569 test_class=test_class, 570 description=description).id 571 572 573def modify_test(id, **data): 574 models.Test.smart_get(id).update_object(data) 575 576 577def delete_test(id): 578 models.Test.smart_get(id).delete() 579 580 581def get_tests(**filter_data): 582 return rpc_utils.prepare_for_serialization( 583 models.Test.list_objects(filter_data)) 584 585 586@_timer.decorate 587def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name): 588 """Gets the counts of all passed and failed tests from the matching jobs. 589 590 @param job_name_prefix: Name prefix of the jobs to get the summary from, e.g., 591 'butterfly-release/R40-6457.21.0/bvt-cq/'. 592 @param label_name: Label that must be set in the jobs, e.g., 593 'cros-version:butterfly-release/R40-6457.21.0'. 594 595 @returns A summary of the counts of all the passed and failed tests. 596 """ 597 job_ids = list(models.Job.objects.filter( 598 name__startswith=job_name_prefix, 599 dependency_labels__name=label_name).values_list( 600 'pk', flat=True)) 601 summary = {'passed': 0, 'failed': 0} 602 if not job_ids: 603 return summary 604 605 counts = (tko_models.TestView.objects.filter( 606 afe_job_id__in=job_ids).exclude( 607 test_name='SERVER_JOB').exclude( 608 test_name__startswith='CLIENT_JOB').values( 609 'status').annotate( 610 count=Count('status'))) 611 for status in counts: 612 if status['status'] == 'GOOD': 613 summary['passed'] += status['count'] 614 else: 615 summary['failed'] += status['count'] 616 return summary 617 618 619# profilers 620 621def add_profiler(name, description=None): 622 return models.Profiler.add_object(name=name, description=description).id 623 624 625def modify_profiler(id, **data): 626 models.Profiler.smart_get(id).update_object(data) 627 628 629def delete_profiler(id): 630 models.Profiler.smart_get(id).delete() 631 632 633def get_profilers(**filter_data): 634 return rpc_utils.prepare_for_serialization( 635 models.Profiler.list_objects(filter_data)) 636 637 638# users 639 640def add_user(login, access_level=None): 641 return models.User.add_object(login=login, access_level=access_level).id 642 643 644def modify_user(id, **data): 645 models.User.smart_get(id).update_object(data) 646 647 648def delete_user(id): 649 models.User.smart_get(id).delete() 650 651 652def get_users(**filter_data): 653 return rpc_utils.prepare_for_serialization( 654 models.User.list_objects(filter_data)) 655 656 657# acl groups 658 659def add_acl_group(name, description=None): 660 group = models.AclGroup.add_object(name=name, description=description) 661 group.users.add(models.User.current_user()) 662 return group.id 663 664 665def modify_acl_group(id, **data): 666 group = models.AclGroup.smart_get(id) 667 group.check_for_acl_violation_acl_group() 668 group.update_object(data) 669 group.add_current_user_if_empty() 670 671 672def acl_group_add_users(id, users): 673 group = models.AclGroup.smart_get(id) 674 group.check_for_acl_violation_acl_group() 675 users = models.User.smart_get_bulk(users) 676 group.users.add(*users) 677 678 679def acl_group_remove_users(id, users): 680 group = models.AclGroup.smart_get(id) 681 group.check_for_acl_violation_acl_group() 682 users = models.User.smart_get_bulk(users) 683 group.users.remove(*users) 684 group.add_current_user_if_empty() 685 686 687def acl_group_add_hosts(id, hosts): 688 group = models.AclGroup.smart_get(id) 689 group.check_for_acl_violation_acl_group() 690 hosts = models.Host.smart_get_bulk(hosts) 691 group.hosts.add(*hosts) 692 group.on_host_membership_change() 693 694 695def acl_group_remove_hosts(id, hosts): 696 group = models.AclGroup.smart_get(id) 697 group.check_for_acl_violation_acl_group() 698 hosts = models.Host.smart_get_bulk(hosts) 699 group.hosts.remove(*hosts) 700 group.on_host_membership_change() 701 702 703def delete_acl_group(id): 704 models.AclGroup.smart_get(id).delete() 705 706 707def get_acl_groups(**filter_data): 708 acl_groups = models.AclGroup.list_objects(filter_data) 709 for acl_group in acl_groups: 710 acl_group_obj = models.AclGroup.objects.get(id=acl_group['id']) 711 acl_group['users'] = [user.login 712 for user in acl_group_obj.users.all()] 713 acl_group['hosts'] = [host.hostname 714 for host in acl_group_obj.hosts.all()] 715 return rpc_utils.prepare_for_serialization(acl_groups) 716 717 718# jobs 719 720def generate_control_file(tests=(), kernel=None, label=None, profilers=(), 721 client_control_file='', use_container=False, 722 profile_only=None, upload_kernel_config=False, 723 db_tests=True): 724 """ 725 Generates a client-side control file to load a kernel and run tests. 726 727 @param tests List of tests to run. See db_tests for more information. 728 @param kernel A list of kernel info dictionaries configuring which kernels 729 to boot for this job and other options for them 730 @param label Name of label to grab kernel config from. 731 @param profilers List of profilers to activate during the job. 732 @param client_control_file The contents of a client-side control file to 733 run at the end of all tests. If this is supplied, all tests must be 734 client side. 735 TODO: in the future we should support server control files directly 736 to wrap with a kernel. That'll require changing the parameter 737 name and adding a boolean to indicate if it is a client or server 738 control file. 739 @param use_container unused argument today. TODO: Enable containers 740 on the host during a client side test. 741 @param profile_only A boolean that indicates what default profile_only 742 mode to use in the control file. Passing None will generate a 743 control file that does not explcitly set the default mode at all. 744 @param upload_kernel_config: if enabled it will generate server control 745 file code that uploads the kernel config file to the client and 746 tells the client of the new (local) path when compiling the kernel; 747 the tests must be server side tests 748 @param db_tests: if True, the test object can be found in the database 749 backing the test model. In this case, tests is a tuple 750 of test IDs which are used to retrieve the test objects 751 from the database. If False, tests is a tuple of test 752 dictionaries stored client-side in the AFE. 753 754 @returns a dict with the following keys: 755 control_file: str, The control file text. 756 is_server: bool, is the control file a server-side control file? 757 synch_count: How many machines the job uses per autoserv execution. 758 synch_count == 1 means the job is asynchronous. 759 dependencies: A list of the names of labels on which the job depends. 760 """ 761 if not tests and not client_control_file: 762 return dict(control_file='', is_server=False, synch_count=1, 763 dependencies=[]) 764 765 cf_info, test_objects, profiler_objects, label = ( 766 rpc_utils.prepare_generate_control_file(tests, kernel, label, 767 profilers, db_tests)) 768 cf_info['control_file'] = control_file.generate_control( 769 tests=test_objects, kernels=kernel, platform=label, 770 profilers=profiler_objects, is_server=cf_info['is_server'], 771 client_control_file=client_control_file, profile_only=profile_only, 772 upload_kernel_config=upload_kernel_config) 773 return cf_info 774 775 776def create_parameterized_job(name, priority, test, parameters, kernel=None, 777 label=None, profilers=(), profiler_parameters=None, 778 use_container=False, profile_only=None, 779 upload_kernel_config=False, hosts=(), 780 meta_hosts=(), one_time_hosts=(), 781 atomic_group_name=None, synch_count=None, 782 is_template=False, timeout=None, 783 timeout_mins=None, max_runtime_mins=None, 784 run_verify=False, email_list='', dependencies=(), 785 reboot_before=None, reboot_after=None, 786 parse_failed_repair=None, hostless=False, 787 keyvals=None, drone_set=None, run_reset=True, 788 require_ssp=None): 789 """ 790 Creates and enqueues a parameterized job. 791 792 Most parameters a combination of the parameters for generate_control_file() 793 and create_job(), with the exception of: 794 795 @param test name or ID of the test to run 796 @param parameters a map of parameter name -> 797 tuple of (param value, param type) 798 @param profiler_parameters a dictionary of parameters for the profilers: 799 key: profiler name 800 value: dict of param name -> tuple of 801 (param value, 802 param type) 803 """ 804 # Save the values of the passed arguments here. What we're going to do with 805 # them is pass them all to rpc_utils.get_create_job_common_args(), which 806 # will extract the subset of these arguments that apply for 807 # rpc_utils.create_job_common(), which we then pass in to that function. 808 args = locals() 809 810 # Set up the parameterized job configs 811 test_obj = models.Test.smart_get(test) 812 control_type = test_obj.test_type 813 814 try: 815 label = models.Label.smart_get(label) 816 except models.Label.DoesNotExist: 817 label = None 818 819 kernel_objs = models.Kernel.create_kernels(kernel) 820 profiler_objs = [models.Profiler.smart_get(profiler) 821 for profiler in profilers] 822 823 parameterized_job = models.ParameterizedJob.objects.create( 824 test=test_obj, label=label, use_container=use_container, 825 profile_only=profile_only, 826 upload_kernel_config=upload_kernel_config) 827 parameterized_job.kernels.add(*kernel_objs) 828 829 for profiler in profiler_objs: 830 parameterized_profiler = models.ParameterizedJobProfiler.objects.create( 831 parameterized_job=parameterized_job, 832 profiler=profiler) 833 profiler_params = profiler_parameters.get(profiler.name, {}) 834 for name, (value, param_type) in profiler_params.iteritems(): 835 models.ParameterizedJobProfilerParameter.objects.create( 836 parameterized_job_profiler=parameterized_profiler, 837 parameter_name=name, 838 parameter_value=value, 839 parameter_type=param_type) 840 841 try: 842 for parameter in test_obj.testparameter_set.all(): 843 if parameter.name in parameters: 844 param_value, param_type = parameters.pop(parameter.name) 845 parameterized_job.parameterizedjobparameter_set.create( 846 test_parameter=parameter, parameter_value=param_value, 847 parameter_type=param_type) 848 849 if parameters: 850 raise Exception('Extra parameters remain: %r' % parameters) 851 852 return rpc_utils.create_job_common( 853 parameterized_job=parameterized_job.id, 854 control_type=control_type, 855 **rpc_utils.get_create_job_common_args(args)) 856 except: 857 parameterized_job.delete() 858 raise 859 860 861def create_job_page_handler(name, priority, control_file, control_type, 862 image=None, hostless=False, firmware_rw_build=None, 863 firmware_ro_build=None, test_source_build=None, 864 **kwargs): 865 """\ 866 Create and enqueue a job. 867 868 @param name name of this job 869 @param priority Integer priority of this job. Higher is more important. 870 @param control_file String contents of the control file. 871 @param control_type Type of control file, Client or Server. 872 @param image: ChromeOS build to be installed in the dut. Default to None. 873 @param firmware_rw_build: Firmware build to update RW firmware. Default to 874 None, i.e., RW firmware will not be updated. 875 @param firmware_ro_build: Firmware build to update RO firmware. Default to 876 None, i.e., RO firmware will not be updated. 877 @param test_source_build: Build to be used to retrieve test code. Default 878 to None. 879 @param kwargs extra args that will be required by create_suite_job or 880 create_job. 881 882 @returns The created Job id number. 883 """ 884 control_file = rpc_utils.encode_ascii(control_file) 885 if not control_file: 886 raise model_logic.ValidationError({ 887 'control_file' : "Control file cannot be empty"}) 888 889 if image and hostless: 890 builds = {} 891 builds[provision.CROS_VERSION_PREFIX] = image 892 if firmware_rw_build: 893 builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build 894 if firmware_ro_build: 895 builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build 896 return site_rpc_interface.create_suite_job( 897 name=name, control_file=control_file, priority=priority, 898 builds=builds, test_source_build=test_source_build, **kwargs) 899 return create_job(name, priority, control_file, control_type, image=image, 900 hostless=hostless, **kwargs) 901 902 903@rpc_utils.route_rpc_to_master 904def create_job(name, priority, control_file, control_type, 905 hosts=(), meta_hosts=(), one_time_hosts=(), 906 atomic_group_name=None, synch_count=None, is_template=False, 907 timeout=None, timeout_mins=None, max_runtime_mins=None, 908 run_verify=False, email_list='', dependencies=(), 909 reboot_before=None, reboot_after=None, parse_failed_repair=None, 910 hostless=False, keyvals=None, drone_set=None, image=None, 911 parent_job_id=None, test_retry=0, run_reset=True, 912 require_ssp=None, args=(), **kwargs): 913 """\ 914 Create and enqueue a job. 915 916 @param name name of this job 917 @param priority Integer priority of this job. Higher is more important. 918 @param control_file String contents of the control file. 919 @param control_type Type of control file, Client or Server. 920 @param synch_count How many machines the job uses per autoserv execution. 921 synch_count == 1 means the job is asynchronous. If an atomic group is 922 given this value is treated as a minimum. 923 @param is_template If true then create a template job. 924 @param timeout Hours after this call returns until the job times out. 925 @param timeout_mins Minutes after this call returns until the job times 926 out. 927 @param max_runtime_mins Minutes from job starting time until job times out 928 @param run_verify Should the host be verified before running the test? 929 @param email_list String containing emails to mail when the job is done 930 @param dependencies List of label names on which this job depends 931 @param reboot_before Never, If dirty, or Always 932 @param reboot_after Never, If all tests passed, or Always 933 @param parse_failed_repair if true, results of failed repairs launched by 934 this job will be parsed as part of the job. 935 @param hostless if true, create a hostless job 936 @param keyvals dict of keyvals to associate with the job 937 @param hosts List of hosts to run job on. 938 @param meta_hosts List where each entry is a label name, and for each entry 939 one host will be chosen from that label to run the job on. 940 @param one_time_hosts List of hosts not in the database to run the job on. 941 @param atomic_group_name The name of an atomic group to schedule the job on. 942 @param drone_set The name of the drone set to run this test on. 943 @param image OS image to install before running job. 944 @param parent_job_id id of a job considered to be parent of created job. 945 @param test_retry Number of times to retry test if the test did not 946 complete successfully. (optional, default: 0) 947 @param run_reset Should the host be reset before running the test? 948 @param require_ssp Set to True to require server-side packaging to run the 949 test. If it's set to None, drone will still try to run 950 the server side with server-side packaging. If the 951 autotest-server package doesn't exist for the build or 952 image is not set, drone will run the test without server- 953 side packaging. Default is None. 954 @param args A list of args to be injected into control file. 955 @param kwargs extra keyword args. NOT USED. 956 957 @returns The created Job id number. 958 """ 959 if args: 960 control_file = tools.inject_vars({'args': args}, control_file) 961 962 if image is None: 963 return rpc_utils.create_job_common( 964 **rpc_utils.get_create_job_common_args(locals())) 965 966 # Translate the image name, in case its a relative build name. 967 ds = dev_server.ImageServer.resolve(image) 968 image = ds.translate(image) 969 970 # When image is supplied use a known parameterized test already in the 971 # database to pass the OS image path from the front end, through the 972 # scheduler, and finally to autoserv as the --image parameter. 973 974 # The test autoupdate_ParameterizedJob is in afe_autotests and used to 975 # instantiate a Test object and from there a ParameterizedJob. 976 known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob') 977 known_parameterized_job = models.ParameterizedJob.objects.create( 978 test=known_test_obj) 979 980 # autoupdate_ParameterizedJob has a single parameter, the image parameter, 981 # stored in the table afe_test_parameters. We retrieve and set this 982 # instance of the parameter to the OS image path. 983 image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj, 984 name='image') 985 known_parameterized_job.parameterizedjobparameter_set.create( 986 test_parameter=image_parameter, parameter_value=image, 987 parameter_type='string') 988 989 # TODO(crbug.com/502638): save firmware build etc to parameterized_job. 990 991 # By passing a parameterized_job to create_job_common the job entry in 992 # the afe_jobs table will have the field parameterized_job_id set. 993 # The scheduler uses this id in the afe_parameterized_jobs table to 994 # match this job to our known test, and then with the 995 # afe_parameterized_job_parameters table to get the actual image path. 996 return rpc_utils.create_job_common( 997 parameterized_job=known_parameterized_job.id, 998 **rpc_utils.get_create_job_common_args(locals())) 999 1000 1001def abort_host_queue_entries(**filter_data): 1002 """\ 1003 Abort a set of host queue entries. 1004 1005 @return: A list of dictionaries, each contains information 1006 about an aborted HQE. 1007 """ 1008 query = models.HostQueueEntry.query_objects(filter_data) 1009 1010 # Dont allow aborts on: 1011 # 1. Jobs that have already completed (whether or not they were aborted) 1012 # 2. Jobs that we have already been aborted (but may not have completed) 1013 query = query.filter(complete=False).filter(aborted=False) 1014 models.AclGroup.check_abort_permissions(query) 1015 host_queue_entries = list(query.select_related()) 1016 rpc_utils.check_abort_synchronous_jobs(host_queue_entries) 1017 1018 models.HostQueueEntry.abort_host_queue_entries(host_queue_entries) 1019 hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id, 1020 'Job name': hqe.job.name} for hqe in host_queue_entries] 1021 return hqe_info 1022 1023 1024def abort_special_tasks(**filter_data): 1025 """\ 1026 Abort the special task, or tasks, specified in the filter. 1027 """ 1028 query = models.SpecialTask.query_objects(filter_data) 1029 special_tasks = query.filter(is_active=True) 1030 for task in special_tasks: 1031 task.abort() 1032 1033 1034def _call_special_tasks_on_hosts(task, hosts): 1035 """\ 1036 Schedules a set of hosts for a special task. 1037 1038 @returns A list of hostnames that a special task was created for. 1039 """ 1040 models.AclGroup.check_for_acl_violation_hosts(hosts) 1041 shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts) 1042 if shard_host_map and not utils.is_shard(): 1043 raise ValueError('The following hosts are on shards, please ' 1044 'follow the link to the shards and create jobs ' 1045 'there instead. %s.' % shard_host_map) 1046 for host in hosts: 1047 models.SpecialTask.schedule_special_task(host, task) 1048 return list(sorted(host.hostname for host in hosts)) 1049 1050 1051def _forward_special_tasks_on_hosts(task, rpc, **filter_data): 1052 """Forward special tasks to corresponding shards. 1053 1054 For master, when special tasks are fired on hosts that are sharded, 1055 forward the RPC to corresponding shards. 1056 1057 For shard, create special task records in local DB. 1058 1059 @param task: Enum value of frontend.afe.models.SpecialTask.Task 1060 @param rpc: RPC name to forward. 1061 @param filter_data: Filter keywords to be used for DB query. 1062 1063 @return: A list of hostnames that a special task was created for. 1064 """ 1065 hosts = models.Host.query_objects(filter_data) 1066 shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts, rpc_hostnames=True) 1067 1068 # Filter out hosts on a shard from those on the master, forward 1069 # rpcs to the shard with an additional hostname__in filter, and 1070 # create a local SpecialTask for each remaining host. 1071 if shard_host_map and not utils.is_shard(): 1072 hosts = [h for h in hosts if h.shard is None] 1073 for shard, hostnames in shard_host_map.iteritems(): 1074 1075 # The main client of this module is the frontend website, and 1076 # it invokes it with an 'id' or an 'id__in' filter. Regardless, 1077 # the 'hostname' filter should narrow down the list of hosts on 1078 # each shard even though we supply all the ids in filter_data. 1079 # This method uses hostname instead of id because it fits better 1080 # with the overall architecture of redirection functions in 1081 # rpc_utils. 1082 shard_filter = filter_data.copy() 1083 shard_filter['hostname__in'] = hostnames 1084 rpc_utils.run_rpc_on_multiple_hostnames( 1085 rpc, [shard], **shard_filter) 1086 1087 # There is a race condition here if someone assigns a shard to one of these 1088 # hosts before we create the task. The host will stay on the master if: 1089 # 1. The host is not Ready 1090 # 2. The host is Ready but has a task 1091 # But if the host is Ready and doesn't have a task yet, it will get sent 1092 # to the shard as we're creating a task here. 1093 1094 # Given that we only rarely verify Ready hosts it isn't worth putting this 1095 # entire method in a transaction. The worst case scenario is that we have 1096 # a verify running on a Ready host while the shard is using it, if the 1097 # verify fails no subsequent tasks will be created against the host on the 1098 # master, and verifies are safe enough that this is OK. 1099 return _call_special_tasks_on_hosts(task, hosts) 1100 1101 1102def reverify_hosts(**filter_data): 1103 """\ 1104 Schedules a set of hosts for verify. 1105 1106 @returns A list of hostnames that a verify task was created for. 1107 """ 1108 return _forward_special_tasks_on_hosts( 1109 models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data) 1110 1111 1112def repair_hosts(**filter_data): 1113 """\ 1114 Schedules a set of hosts for repair. 1115 1116 @returns A list of hostnames that a repair task was created for. 1117 """ 1118 return _forward_special_tasks_on_hosts( 1119 models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data) 1120 1121 1122def get_jobs(not_yet_run=False, running=False, finished=False, 1123 suite=False, sub=False, standalone=False, **filter_data): 1124 """\ 1125 Extra status filter args for get_jobs: 1126 -not_yet_run: Include only jobs that have not yet started running. 1127 -running: Include only jobs that have start running but for which not 1128 all hosts have completed. 1129 -finished: Include only jobs for which all hosts have completed (or 1130 aborted). 1131 1132 Extra type filter args for get_jobs: 1133 -suite: Include only jobs with child jobs. 1134 -sub: Include only jobs with a parent job. 1135 -standalone: Inlcude only jobs with no child or parent jobs. 1136 At most one of these three fields should be specified. 1137 """ 1138 extra_args = rpc_utils.extra_job_status_filters(not_yet_run, 1139 running, 1140 finished) 1141 filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, 1142 suite, 1143 sub, 1144 standalone) 1145 job_dicts = [] 1146 jobs = list(models.Job.query_objects(filter_data)) 1147 models.Job.objects.populate_relationships(jobs, models.Label, 1148 'dependencies') 1149 models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals') 1150 for job in jobs: 1151 job_dict = job.get_object_dict() 1152 job_dict['dependencies'] = ','.join(label.name 1153 for label in job.dependencies) 1154 job_dict['keyvals'] = dict((keyval.key, keyval.value) 1155 for keyval in job.keyvals) 1156 if job.parameterized_job: 1157 job_dict['image'] = get_parameterized_autoupdate_image_url(job) 1158 job_dicts.append(job_dict) 1159 return rpc_utils.prepare_for_serialization(job_dicts) 1160 1161 1162def get_num_jobs(not_yet_run=False, running=False, finished=False, 1163 suite=False, sub=False, standalone=False, 1164 **filter_data): 1165 """\ 1166 See get_jobs() for documentation of extra filter parameters. 1167 """ 1168 extra_args = rpc_utils.extra_job_status_filters(not_yet_run, 1169 running, 1170 finished) 1171 filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, 1172 suite, 1173 sub, 1174 standalone) 1175 return models.Job.query_count(filter_data) 1176 1177 1178def get_jobs_summary(**filter_data): 1179 """\ 1180 Like get_jobs(), but adds 'status_counts' and 'result_counts' field. 1181 1182 'status_counts' filed is a dictionary mapping status strings to the number 1183 of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}. 1184 1185 'result_counts' field is piped to tko's rpc_interface and has the return 1186 format specified under get_group_counts. 1187 """ 1188 jobs = get_jobs(**filter_data) 1189 ids = [job['id'] for job in jobs] 1190 all_status_counts = models.Job.objects.get_status_counts(ids) 1191 for job in jobs: 1192 job['status_counts'] = all_status_counts[job['id']] 1193 job['result_counts'] = tko_rpc_interface.get_status_counts( 1194 ['afe_job_id', 'afe_job_id'], 1195 header_groups=[['afe_job_id'], ['afe_job_id']], 1196 **{'afe_job_id': job['id']}) 1197 return rpc_utils.prepare_for_serialization(jobs) 1198 1199 1200def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None): 1201 """\ 1202 Retrieves all the information needed to clone a job. 1203 """ 1204 job = models.Job.objects.get(id=id) 1205 job_info = rpc_utils.get_job_info(job, 1206 preserve_metahosts, 1207 queue_entry_filter_data) 1208 1209 host_dicts = [] 1210 for host in job_info['hosts']: 1211 host_dict = get_hosts(id=host.id)[0] 1212 other_labels = host_dict['labels'] 1213 if host_dict['platform']: 1214 other_labels.remove(host_dict['platform']) 1215 host_dict['other_labels'] = ', '.join(other_labels) 1216 host_dicts.append(host_dict) 1217 1218 for host in job_info['one_time_hosts']: 1219 host_dict = dict(hostname=host.hostname, 1220 id=host.id, 1221 platform='(one-time host)', 1222 locked_text='') 1223 host_dicts.append(host_dict) 1224 1225 # convert keys from Label objects to strings (names of labels) 1226 meta_host_counts = dict((meta_host.name, count) for meta_host, count 1227 in job_info['meta_host_counts'].iteritems()) 1228 1229 info = dict(job=job.get_object_dict(), 1230 meta_host_counts=meta_host_counts, 1231 hosts=host_dicts) 1232 info['job']['dependencies'] = job_info['dependencies'] 1233 if job_info['atomic_group']: 1234 info['atomic_group_name'] = (job_info['atomic_group']).name 1235 else: 1236 info['atomic_group_name'] = None 1237 info['hostless'] = job_info['hostless'] 1238 info['drone_set'] = job.drone_set and job.drone_set.name 1239 1240 if job.parameterized_job: 1241 info['job']['image'] = get_parameterized_autoupdate_image_url(job) 1242 1243 return rpc_utils.prepare_for_serialization(info) 1244 1245 1246# host queue entries 1247 1248def get_host_queue_entries(start_time=None, end_time=None, **filter_data): 1249 """\ 1250 @returns A sequence of nested dictionaries of host and job information. 1251 """ 1252 filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 1253 'started_on__lte', 1254 start_time, 1255 end_time, 1256 **filter_data) 1257 return rpc_utils.prepare_rows_as_nested_dicts( 1258 models.HostQueueEntry.query_objects(filter_data), 1259 ('host', 'atomic_group', 'job')) 1260 1261 1262def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data): 1263 """\ 1264 Get the number of host queue entries associated with this job. 1265 """ 1266 filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 1267 'started_on__lte', 1268 start_time, 1269 end_time, 1270 **filter_data) 1271 return models.HostQueueEntry.query_count(filter_data) 1272 1273 1274def get_hqe_percentage_complete(**filter_data): 1275 """ 1276 Computes the fraction of host queue entries matching the given filter data 1277 that are complete. 1278 """ 1279 query = models.HostQueueEntry.query_objects(filter_data) 1280 complete_count = query.filter(complete=True).count() 1281 total_count = query.count() 1282 if total_count == 0: 1283 return 1 1284 return float(complete_count) / total_count 1285 1286 1287# special tasks 1288 1289def get_special_tasks(**filter_data): 1290 """Get special task entries from the local database. 1291 1292 Query the special tasks table for tasks matching the given 1293 `filter_data`, and return a list of the results. No attempt is 1294 made to forward the call to shards; the buck will stop here. 1295 The caller is expected to know the target shard for such reasons 1296 as: 1297 * The caller is a service (such as gs_offloader) configured 1298 to operate on behalf of one specific shard, and no other. 1299 * The caller has a host as a parameter, and knows that this is 1300 the shard assigned to that host. 1301 1302 @param filter_data Filter keywords to pass to the underlying 1303 database query. 1304 1305 """ 1306 return rpc_utils.prepare_rows_as_nested_dicts( 1307 models.SpecialTask.query_objects(filter_data), 1308 ('host', 'queue_entry')) 1309 1310 1311def get_host_special_tasks(host_id, **filter_data): 1312 """Get special task entries for a given host. 1313 1314 Query the special tasks table for tasks that ran on the host 1315 given by `host_id` and matching the given `filter_data`. 1316 Return a list of the results. If the host is assigned to a 1317 shard, forward this call to that shard. 1318 1319 @param host_id Id in the database of the target host. 1320 @param filter_data Filter keywords to pass to the underlying 1321 database query. 1322 1323 """ 1324 # Retrieve host data even if the host is in an invalid state. 1325 host = models.Host.smart_get(host_id, False) 1326 if not host.shard: 1327 return get_special_tasks(host_id=host_id, **filter_data) 1328 else: 1329 # The return values from AFE methods are post-processed 1330 # objects that aren't JSON-serializable. So, we have to 1331 # call AFE.run() to get the raw, serializable output from 1332 # the shard. 1333 shard_afe = frontend.AFE(server=host.shard.rpc_hostname()) 1334 return shard_afe.run('get_special_tasks', 1335 host_id=host_id, **filter_data) 1336 1337 1338def get_num_special_tasks(**kwargs): 1339 """Get the number of special task entries from the local database. 1340 1341 Query the special tasks table for tasks matching the given 'kwargs', 1342 and return the number of the results. No attempt is made to forward 1343 the call to shards; the buck will stop here. 1344 1345 @param kwargs Filter keywords to pass to the underlying database query. 1346 1347 """ 1348 return models.SpecialTask.query_count(kwargs) 1349 1350 1351def get_host_num_special_tasks(host, **kwargs): 1352 """Get special task entries for a given host. 1353 1354 Query the special tasks table for tasks that ran on the host 1355 given by 'host' and matching the given 'kwargs'. 1356 Return a list of the results. If the host is assigned to a 1357 shard, forward this call to that shard. 1358 1359 @param host id or name of a host. More often a hostname. 1360 @param kwargs Filter keywords to pass to the underlying database query. 1361 1362 """ 1363 # Retrieve host data even if the host is in an invalid state. 1364 host_model = models.Host.smart_get(host, False) 1365 if not host_model.shard: 1366 return get_num_special_tasks(host=host, **kwargs) 1367 else: 1368 shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname()) 1369 return shard_afe.run('get_num_special_tasks', host=host, **kwargs) 1370 1371 1372def get_status_task(host_id, end_time): 1373 """Get the "status task" for a host from the local shard. 1374 1375 Returns a single special task representing the given host's 1376 "status task". The status task is a completed special task that 1377 identifies whether the corresponding host was working or broken 1378 when it completed. A successful task indicates a working host; 1379 a failed task indicates broken. 1380 1381 This call will not be forward to a shard; the receiving server 1382 must be the shard that owns the host. 1383 1384 @param host_id Id in the database of the target host. 1385 @param end_time Time reference for the host's status. 1386 1387 @return A single task; its status (successful or not) 1388 corresponds to the status of the host (working or 1389 broken) at the given time. If no task is found, return 1390 `None`. 1391 1392 """ 1393 tasklist = rpc_utils.prepare_rows_as_nested_dicts( 1394 status_history.get_status_task(host_id, end_time), 1395 ('host', 'queue_entry')) 1396 return tasklist[0] if tasklist else None 1397 1398 1399def get_host_status_task(host_id, end_time): 1400 """Get the "status task" for a host from its owning shard. 1401 1402 Finds the given host's owning shard, and forwards to it a call 1403 to `get_status_task()` (see above). 1404 1405 @param host_id Id in the database of the target host. 1406 @param end_time Time reference for the host's status. 1407 1408 @return A single task; its status (successful or not) 1409 corresponds to the status of the host (working or 1410 broken) at the given time. If no task is found, return 1411 `None`. 1412 1413 """ 1414 host = models.Host.smart_get(host_id) 1415 if not host.shard: 1416 return get_status_task(host_id, end_time) 1417 else: 1418 # The return values from AFE methods are post-processed 1419 # objects that aren't JSON-serializable. So, we have to 1420 # call AFE.run() to get the raw, serializable output from 1421 # the shard. 1422 shard_afe = frontend.AFE(server=host.shard.rpc_hostname()) 1423 return shard_afe.run('get_status_task', 1424 host_id=host_id, end_time=end_time) 1425 1426 1427def get_host_diagnosis_interval(host_id, end_time, success): 1428 """Find a "diagnosis interval" for a given host. 1429 1430 A "diagnosis interval" identifies a start and end time where 1431 the host went from "working" to "broken", or vice versa. The 1432 interval's starting time is the starting time of the last status 1433 task with the old status; the end time is the finish time of the 1434 first status task with the new status. 1435 1436 This routine finds the most recent diagnosis interval for the 1437 given host prior to `end_time`, with a starting status matching 1438 `success`. If `success` is true, the interval will start with a 1439 successful status task; if false the interval will start with a 1440 failed status task. 1441 1442 @param host_id Id in the database of the target host. 1443 @param end_time Time reference for the diagnosis interval. 1444 @param success Whether the diagnosis interval should start 1445 with a successful or failed status task. 1446 1447 @return A list of two strings. The first is the timestamp for 1448 the beginning of the interval; the second is the 1449 timestamp for the end. If the host has never changed 1450 state, the list is empty. 1451 1452 """ 1453 host = models.Host.smart_get(host_id) 1454 if not host.shard or utils.is_shard(): 1455 return status_history.get_diagnosis_interval( 1456 host_id, end_time, success) 1457 else: 1458 shard_afe = frontend.AFE(server=host.shard.rpc_hostname()) 1459 return shard_afe.get_host_diagnosis_interval( 1460 host_id, end_time, success) 1461 1462 1463# support for host detail view 1464 1465def get_host_queue_entries_and_special_tasks(host, query_start=None, 1466 query_limit=None, start_time=None, 1467 end_time=None): 1468 """ 1469 @returns an interleaved list of HostQueueEntries and SpecialTasks, 1470 in approximate run order. each dict contains keys for type, host, 1471 job, status, started_on, execution_path, and ID. 1472 """ 1473 total_limit = None 1474 if query_limit is not None: 1475 total_limit = query_start + query_limit 1476 filter_data_common = {'host': host, 1477 'query_limit': total_limit, 1478 'sort_by': ['-id']} 1479 1480 filter_data_special_tasks = rpc_utils.inject_times_to_filter( 1481 'time_started__gte', 'time_started__lte', start_time, end_time, 1482 **filter_data_common) 1483 1484 queue_entries = get_host_queue_entries( 1485 start_time, end_time, **filter_data_common) 1486 special_tasks = get_host_special_tasks(host, **filter_data_special_tasks) 1487 1488 interleaved_entries = rpc_utils.interleave_entries(queue_entries, 1489 special_tasks) 1490 if query_start is not None: 1491 interleaved_entries = interleaved_entries[query_start:] 1492 if query_limit is not None: 1493 interleaved_entries = interleaved_entries[:query_limit] 1494 return rpc_utils.prepare_host_queue_entries_and_special_tasks( 1495 interleaved_entries, queue_entries) 1496 1497 1498def get_num_host_queue_entries_and_special_tasks(host, start_time=None, 1499 end_time=None): 1500 filter_data_common = {'host': host} 1501 1502 filter_data_queue_entries, filter_data_special_tasks = ( 1503 rpc_utils.inject_times_to_hqe_special_tasks_filters( 1504 filter_data_common, start_time, end_time)) 1505 1506 return (models.HostQueueEntry.query_count(filter_data_queue_entries) 1507 + get_host_num_special_tasks(**filter_data_special_tasks)) 1508 1509 1510# recurring run 1511 1512def get_recurring(**filter_data): 1513 return rpc_utils.prepare_rows_as_nested_dicts( 1514 models.RecurringRun.query_objects(filter_data), 1515 ('job', 'owner')) 1516 1517 1518def get_num_recurring(**filter_data): 1519 return models.RecurringRun.query_count(filter_data) 1520 1521 1522def delete_recurring_runs(**filter_data): 1523 to_delete = models.RecurringRun.query_objects(filter_data) 1524 to_delete.delete() 1525 1526 1527def create_recurring_run(job_id, start_date, loop_period, loop_count): 1528 owner = models.User.current_user().login 1529 job = models.Job.objects.get(id=job_id) 1530 return job.create_recurring_job(start_date=start_date, 1531 loop_period=loop_period, 1532 loop_count=loop_count, 1533 owner=owner) 1534 1535 1536# other 1537 1538def echo(data=""): 1539 """\ 1540 Returns a passed in string. For doing a basic test to see if RPC calls 1541 can successfully be made. 1542 """ 1543 return data 1544 1545 1546def get_motd(): 1547 """\ 1548 Returns the message of the day as a string. 1549 """ 1550 return rpc_utils.get_motd() 1551 1552 1553def get_static_data(): 1554 """\ 1555 Returns a dictionary containing a bunch of data that shouldn't change 1556 often and is otherwise inaccessible. This includes: 1557 1558 priorities: List of job priority choices. 1559 default_priority: Default priority value for new jobs. 1560 users: Sorted list of all users. 1561 labels: Sorted list of labels not start with 'cros-version' and 1562 'fw-version'. 1563 atomic_groups: Sorted list of all atomic groups. 1564 tests: Sorted list of all tests. 1565 profilers: Sorted list of all profilers. 1566 current_user: Logged-in username. 1567 host_statuses: Sorted list of possible Host statuses. 1568 job_statuses: Sorted list of possible HostQueueEntry statuses. 1569 job_timeout_default: The default job timeout length in minutes. 1570 parse_failed_repair_default: Default value for the parse_failed_repair job 1571 option. 1572 reboot_before_options: A list of valid RebootBefore string enums. 1573 reboot_after_options: A list of valid RebootAfter string enums. 1574 motd: Server's message of the day. 1575 status_dictionary: A mapping from one word job status names to a more 1576 informative description. 1577 """ 1578 1579 job_fields = models.Job.get_field_dict() 1580 default_drone_set_name = models.DroneSet.default_drone_set_name() 1581 drone_sets = ([default_drone_set_name] + 1582 sorted(drone_set.name for drone_set in 1583 models.DroneSet.objects.exclude( 1584 name=default_drone_set_name))) 1585 1586 result = {} 1587 result['priorities'] = priorities.Priority.choices() 1588 default_priority = priorities.Priority.DEFAULT 1589 result['default_priority'] = 'Default' 1590 result['max_schedulable_priority'] = priorities.Priority.DEFAULT 1591 result['users'] = get_users(sort_by=['login']) 1592 1593 label_exclude_filters = [{'name__startswith': 'cros-version'}, 1594 {'name__startswith': 'fw-version'}, 1595 {'name__startswith': 'fwrw-version'}, 1596 {'name__startswith': 'fwro-version'}] 1597 result['labels'] = get_labels( 1598 label_exclude_filters, 1599 sort_by=['-platform', 'name']) 1600 1601 result['atomic_groups'] = get_atomic_groups(sort_by=['name']) 1602 result['tests'] = get_tests(sort_by=['name']) 1603 result['profilers'] = get_profilers(sort_by=['name']) 1604 result['current_user'] = rpc_utils.prepare_for_serialization( 1605 models.User.current_user().get_object_dict()) 1606 result['host_statuses'] = sorted(models.Host.Status.names) 1607 result['job_statuses'] = sorted(models.HostQueueEntry.Status.names) 1608 result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS 1609 result['job_max_runtime_mins_default'] = ( 1610 models.Job.DEFAULT_MAX_RUNTIME_MINS) 1611 result['parse_failed_repair_default'] = bool( 1612 models.Job.DEFAULT_PARSE_FAILED_REPAIR) 1613 result['reboot_before_options'] = model_attributes.RebootBefore.names 1614 result['reboot_after_options'] = model_attributes.RebootAfter.names 1615 result['motd'] = rpc_utils.get_motd() 1616 result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled() 1617 result['drone_sets'] = drone_sets 1618 result['parameterized_jobs'] = models.Job.parameterized_jobs_enabled() 1619 1620 result['status_dictionary'] = {"Aborted": "Aborted", 1621 "Verifying": "Verifying Host", 1622 "Provisioning": "Provisioning Host", 1623 "Pending": "Waiting on other hosts", 1624 "Running": "Running autoserv", 1625 "Completed": "Autoserv completed", 1626 "Failed": "Failed to complete", 1627 "Queued": "Queued", 1628 "Starting": "Next in host's queue", 1629 "Stopped": "Other host(s) failed verify", 1630 "Parsing": "Awaiting parse of final results", 1631 "Gathering": "Gathering log files", 1632 "Template": "Template job for recurring run", 1633 "Waiting": "Waiting for scheduler action", 1634 "Archiving": "Archiving results", 1635 "Resetting": "Resetting hosts"} 1636 1637 result['wmatrix_url'] = rpc_utils.get_wmatrix_url() 1638 result['is_moblab'] = bool(utils.is_moblab()) 1639 1640 return result 1641 1642 1643def get_server_time(): 1644 return datetime.datetime.now().strftime("%Y-%m-%d %H:%M") 1645