1# pylint: disable=missing-docstring 2 3import logging 4from datetime import datetime 5import django.core 6try: 7 from django.db import models as dbmodels, connection 8except django.core.exceptions.ImproperlyConfigured: 9 raise ImportError('Django database not yet configured. Import either ' 10 'setup_django_environment or ' 11 'setup_django_lite_environment from ' 12 'autotest_lib.frontend before any imports that ' 13 'depend on django models.') 14from xml.sax import saxutils 15import common 16from autotest_lib.frontend.afe import model_logic, model_attributes 17from autotest_lib.frontend.afe import rdb_model_extensions 18from autotest_lib.frontend import settings, thread_local 19from autotest_lib.client.common_lib import enum, error, host_protections 20from autotest_lib.client.common_lib import global_config 21from autotest_lib.client.common_lib import host_queue_entry_states 22from autotest_lib.client.common_lib import control_data, priorities, decorators 23from autotest_lib.client.common_lib import utils 24from autotest_lib.server import utils as server_utils 25 26# job options and user preferences 27DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY 28DEFAULT_REBOOT_AFTER = model_attributes.RebootBefore.NEVER 29 30 31class AclAccessViolation(Exception): 32 """\ 33 Raised when an operation is attempted with proper permissions as 34 dictated by ACLs. 35 """ 36 37 38class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model): 39 """\ 40 An atomic group defines a collection of hosts which must only be scheduled 41 all at once. Any host with a label having an atomic group will only be 42 scheduled for a job at the same time as other hosts sharing that label. 43 44 Required: 45 name: A name for this atomic group, e.g. 'rack23' or 'funky_net'. 46 max_number_of_machines: The maximum number of machines that will be 47 scheduled at once when scheduling jobs to this atomic group. 48 The job.synch_count is considered the minimum. 
49 50 Optional: 51 description: Arbitrary text description of this group's purpose. 52 """ 53 name = dbmodels.CharField(max_length=255, unique=True) 54 description = dbmodels.TextField(blank=True) 55 # This magic value is the default to simplify the scheduler logic. 56 # It must be "large". The common use of atomic groups is to want all 57 # machines in the group to be used, limits on which subset used are 58 # often chosen via dependency labels. 59 # TODO(dennisjeffrey): Revisit this so we don't have to assume that 60 # "infinity" is around 3.3 million. 61 INFINITE_MACHINES = 333333333 62 max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES) 63 invalid = dbmodels.BooleanField(default=False, 64 editable=settings.FULL_ADMIN) 65 66 name_field = 'name' 67 objects = model_logic.ModelWithInvalidManager() 68 valid_objects = model_logic.ValidObjectsManager() 69 70 71 def enqueue_job(self, job, is_template=False): 72 """Enqueue a job on an associated atomic group of hosts. 73 74 @param job: A job to enqueue. 75 @param is_template: Whether the status should be "Template". 76 """ 77 queue_entry = HostQueueEntry.create(atomic_group=self, job=job, 78 is_template=is_template) 79 queue_entry.save() 80 81 82 def clean_object(self): 83 self.label_set.clear() 84 85 86 class Meta: 87 """Metadata for class AtomicGroup.""" 88 db_table = 'afe_atomic_groups' 89 90 91 def __unicode__(self): 92 return unicode(self.name) 93 94 95class Label(model_logic.ModelWithInvalid, dbmodels.Model): 96 """\ 97 Required: 98 name: label name 99 100 Optional: 101 kernel_config: URL/path to kernel config for jobs run on this label. 102 platform: If True, this is a platform label (defaults to False). 103 only_if_needed: If True, a Host with this label can only be used if that 104 label is requested by the job/test (either as the meta_host or 105 in the job_dependencies). 106 atomic_group: The atomic group associated with this label. 
107 """ 108 name = dbmodels.CharField(max_length=255, unique=True) 109 kernel_config = dbmodels.CharField(max_length=255, blank=True) 110 platform = dbmodels.BooleanField(default=False) 111 invalid = dbmodels.BooleanField(default=False, 112 editable=settings.FULL_ADMIN) 113 only_if_needed = dbmodels.BooleanField(default=False) 114 115 name_field = 'name' 116 objects = model_logic.ModelWithInvalidManager() 117 valid_objects = model_logic.ValidObjectsManager() 118 atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) 119 120 121 def clean_object(self): 122 self.host_set.clear() 123 self.test_set.clear() 124 125 126 def enqueue_job(self, job, is_template=False): 127 """Enqueue a job on any host of this label. 128 129 @param job: A job to enqueue. 130 @param is_template: Whether the status should be "Template". 131 """ 132 queue_entry = HostQueueEntry.create(meta_host=self, job=job, 133 is_template=is_template) 134 queue_entry.save() 135 136 137 138 class Meta: 139 """Metadata for class Label.""" 140 db_table = 'afe_labels' 141 142 143 def __unicode__(self): 144 return unicode(self.name) 145 146 147class Shard(dbmodels.Model, model_logic.ModelExtensions): 148 149 hostname = dbmodels.CharField(max_length=255, unique=True) 150 151 name_field = 'hostname' 152 153 labels = dbmodels.ManyToManyField(Label, blank=True, 154 db_table='afe_shards_labels') 155 156 class Meta: 157 """Metadata for class ParameterizedJob.""" 158 db_table = 'afe_shards' 159 160 161 def rpc_hostname(self): 162 """Get the rpc hostname of the shard. 163 164 @return: Just the shard hostname for all non-testing environments. 165 The address of the default gateway for vm testing environments. 166 """ 167 # TODO: Figure out a better solution for testing. Since no 2 shards 168 # can run on the same host, if the shard hostname is localhost we 169 # conclude that it must be a vm in a test cluster. 
In such situations 170 # a name of localhost:<port> is necessary to achieve the correct 171 # afe links/redirection from the frontend (this happens through the 172 # host), but for rpcs that are performed *on* the shard, they need to 173 # use the address of the gateway. 174 # In the virtual machine testing environment (i.e., puppylab), each 175 # shard VM has a hostname like localhost:<port>. In the real cluster 176 # environment, a shard node does not have 'localhost' for its hostname. 177 # The following hostname substitution is needed only for the VM 178 # in puppylab. 179 # The 'hostname' should not be replaced in the case of real cluster. 180 if utils.is_puppylab_vm(self.hostname): 181 hostname = self.hostname.split(':')[0] 182 return self.hostname.replace( 183 hostname, utils.DEFAULT_VM_GATEWAY) 184 return self.hostname 185 186 187class Drone(dbmodels.Model, model_logic.ModelExtensions): 188 """ 189 A scheduler drone 190 191 hostname: the drone's hostname 192 """ 193 hostname = dbmodels.CharField(max_length=255, unique=True) 194 195 name_field = 'hostname' 196 objects = model_logic.ExtendedManager() 197 198 199 def save(self, *args, **kwargs): 200 if not User.current_user().is_superuser(): 201 raise Exception('Only superusers may edit drones') 202 super(Drone, self).save(*args, **kwargs) 203 204 205 def delete(self): 206 if not User.current_user().is_superuser(): 207 raise Exception('Only superusers may delete drones') 208 super(Drone, self).delete() 209 210 211 class Meta: 212 """Metadata for class Drone.""" 213 db_table = 'afe_drones' 214 215 def __unicode__(self): 216 return unicode(self.hostname) 217 218 219class DroneSet(dbmodels.Model, model_logic.ModelExtensions): 220 """ 221 A set of scheduler drones 222 223 These will be used by the scheduler to decide what drones a job is allowed 224 to run on. 
225 226 name: the drone set's name 227 drones: the drones that are part of the set 228 """ 229 DRONE_SETS_ENABLED = global_config.global_config.get_config_value( 230 'SCHEDULER', 'drone_sets_enabled', type=bool, default=False) 231 DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value( 232 'SCHEDULER', 'default_drone_set_name', default=None) 233 234 name = dbmodels.CharField(max_length=255, unique=True) 235 drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones') 236 237 name_field = 'name' 238 objects = model_logic.ExtendedManager() 239 240 241 def save(self, *args, **kwargs): 242 if not User.current_user().is_superuser(): 243 raise Exception('Only superusers may edit drone sets') 244 super(DroneSet, self).save(*args, **kwargs) 245 246 247 def delete(self): 248 if not User.current_user().is_superuser(): 249 raise Exception('Only superusers may delete drone sets') 250 super(DroneSet, self).delete() 251 252 253 @classmethod 254 def drone_sets_enabled(cls): 255 """Returns whether drone sets are enabled. 256 257 @param cls: Implicit class object. 258 """ 259 return cls.DRONE_SETS_ENABLED 260 261 262 @classmethod 263 def default_drone_set_name(cls): 264 """Returns the default drone set name. 265 266 @param cls: Implicit class object. 267 """ 268 return cls.DEFAULT_DRONE_SET_NAME 269 270 271 @classmethod 272 def get_default(cls): 273 """Gets the default drone set name, compatible with Job.add_object. 274 275 @param cls: Implicit class object. 276 """ 277 return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME) 278 279 280 @classmethod 281 def resolve_name(cls, drone_set_name): 282 """ 283 Returns the name of one of these, if not None, in order of preference: 284 1) the drone set given, 285 2) the current user's default drone set, or 286 3) the global default drone set 287 288 or returns None if drone sets are disabled 289 290 @param cls: Implicit class object. 291 @param drone_set_name: A drone set name. 
292 """ 293 if not cls.drone_sets_enabled(): 294 return None 295 296 user = User.current_user() 297 user_drone_set_name = user.drone_set and user.drone_set.name 298 299 return drone_set_name or user_drone_set_name or cls.get_default().name 300 301 302 def get_drone_hostnames(self): 303 """ 304 Gets the hostnames of all drones in this drone set 305 """ 306 return set(self.drones.all().values_list('hostname', flat=True)) 307 308 309 class Meta: 310 """Metadata for class DroneSet.""" 311 db_table = 'afe_drone_sets' 312 313 def __unicode__(self): 314 return unicode(self.name) 315 316 317class User(dbmodels.Model, model_logic.ModelExtensions): 318 """\ 319 Required: 320 login :user login name 321 322 Optional: 323 access_level: 0=User (default), 1=Admin, 100=Root 324 """ 325 ACCESS_ROOT = 100 326 ACCESS_ADMIN = 1 327 ACCESS_USER = 0 328 329 AUTOTEST_SYSTEM = 'autotest_system' 330 331 login = dbmodels.CharField(max_length=255, unique=True) 332 access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True) 333 334 # user preferences 335 reboot_before = dbmodels.SmallIntegerField( 336 choices=model_attributes.RebootBefore.choices(), blank=True, 337 default=DEFAULT_REBOOT_BEFORE) 338 reboot_after = dbmodels.SmallIntegerField( 339 choices=model_attributes.RebootAfter.choices(), blank=True, 340 default=DEFAULT_REBOOT_AFTER) 341 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 342 show_experimental = dbmodels.BooleanField(default=False) 343 344 name_field = 'login' 345 objects = model_logic.ExtendedManager() 346 347 348 def save(self, *args, **kwargs): 349 # is this a new object being saved for the first time? 
350 first_time = (self.id is None) 351 user = thread_local.get_user() 352 if user and not user.is_superuser() and user.login != self.login: 353 raise AclAccessViolation("You cannot modify user " + self.login) 354 super(User, self).save(*args, **kwargs) 355 if first_time: 356 everyone = AclGroup.objects.get(name='Everyone') 357 everyone.users.add(self) 358 359 360 def is_superuser(self): 361 """Returns whether the user has superuser access.""" 362 return self.access_level >= self.ACCESS_ROOT 363 364 365 @classmethod 366 def current_user(cls): 367 """Returns the current user. 368 369 @param cls: Implicit class object. 370 """ 371 user = thread_local.get_user() 372 if user is None: 373 user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM) 374 user.access_level = cls.ACCESS_ROOT 375 user.save() 376 return user 377 378 379 @classmethod 380 def get_record(cls, data): 381 """Check the database for an identical record. 382 383 Check for a record with matching id and login. If one exists, 384 return it. If one does not exist there is a possibility that 385 the following cases have happened: 386 1. Same id, different login 387 We received: "1 chromeos-test" 388 And we have: "1 debug-user" 389 In this case we need to delete "1 debug_user" and insert 390 "1 chromeos-test". 391 392 2. Same login, different id: 393 We received: "1 chromeos-test" 394 And we have: "2 chromeos-test" 395 In this case we need to delete "2 chromeos-test" and insert 396 "1 chromeos-test". 397 398 As long as this method deletes bad records and raises the 399 DoesNotExist exception the caller will handle creating the 400 new record. 401 402 @raises: DoesNotExist, if a record with the matching login and id 403 does not exist. 404 """ 405 406 # Both the id and login should be uniqe but there are cases when 407 # we might already have a user with the same login/id because 408 # current_user will proactively create a user record if it doesn't 409 # exist. 
Since we want to avoid conflict between the master and 410 # shard, just delete any existing user records that don't match 411 # what we're about to deserialize from the master. 412 try: 413 return cls.objects.get(login=data['login'], id=data['id']) 414 except cls.DoesNotExist: 415 cls.delete_matching_record(login=data['login']) 416 cls.delete_matching_record(id=data['id']) 417 raise 418 419 420 class Meta: 421 """Metadata for class User.""" 422 db_table = 'afe_users' 423 424 def __unicode__(self): 425 return unicode(self.login) 426 427 428class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel, 429 model_logic.ModelWithAttributes): 430 """\ 431 Required: 432 hostname 433 434 optional: 435 locked: if true, host is locked and will not be queued 436 437 Internal: 438 From AbstractHostModel: 439 status: string describing status of host 440 invalid: true if the host has been deleted 441 protection: indicates what can be done to this host during repair 442 lock_time: DateTime at which the host was locked 443 dirty: true if the host has been used without being rebooted 444 Local: 445 locked_by: user that locked the host, or null if the host is unlocked 446 """ 447 448 SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set', 449 'hostattribute_set', 450 'labels', 451 'shard']) 452 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid']) 453 454 455 def custom_deserialize_relation(self, link, data): 456 assert link == 'shard', 'Link %s should not be deserialized' % link 457 self.shard = Shard.deserialize(data) 458 459 460 # Note: Only specify foreign keys here, specify all native host columns in 461 # rdb_model_extensions instead. 
462 Protection = host_protections.Protection 463 labels = dbmodels.ManyToManyField(Label, blank=True, 464 db_table='afe_hosts_labels') 465 locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False) 466 name_field = 'hostname' 467 objects = model_logic.ModelWithInvalidManager() 468 valid_objects = model_logic.ValidObjectsManager() 469 leased_objects = model_logic.LeasedHostManager() 470 471 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 472 473 def __init__(self, *args, **kwargs): 474 super(Host, self).__init__(*args, **kwargs) 475 self._record_attributes(['status']) 476 477 478 @staticmethod 479 def create_one_time_host(hostname): 480 """Creates a one-time host. 481 482 @param hostname: The name for the host. 483 """ 484 query = Host.objects.filter(hostname=hostname) 485 if query.count() == 0: 486 host = Host(hostname=hostname, invalid=True) 487 host.do_validate() 488 else: 489 host = query[0] 490 if not host.invalid: 491 raise model_logic.ValidationError({ 492 'hostname' : '%s already exists in the autotest DB. ' 493 'Select it rather than entering it as a one time ' 494 'host.' % hostname 495 }) 496 host.protection = host_protections.Protection.DO_NOT_REPAIR 497 host.locked = False 498 host.save() 499 host.clean_object() 500 return host 501 502 503 @classmethod 504 def _assign_to_shard_nothing_helper(cls): 505 """Does nothing. 506 507 This method is called in the middle of assign_to_shard, and does 508 nothing. It exists to allow integration tests to simulate a race 509 condition.""" 510 511 512 @classmethod 513 def assign_to_shard(cls, shard, known_ids): 514 """Assigns hosts to a shard. 515 516 For all labels that have been assigned to a shard, all hosts that 517 have at least one of the shard's labels are assigned to the shard. 518 Hosts that are assigned to the shard but aren't already present on the 519 shard are returned. 
520 521 Any boards that are in |known_ids| but that do not belong to the shard 522 are incorrect ids, which are also returned so that the shard can remove 523 them locally. 524 525 Board to shard mapping is many-to-one. Many different boards can be 526 hosted in a shard. However, DUTs of a single board cannot be distributed 527 into more than one shard. 528 529 @param shard: The shard object to assign labels/hosts for. 530 @param known_ids: List of all host-ids the shard already knows. 531 This is used to figure out which hosts should be sent 532 to the shard. If shard_ids were used instead, hosts 533 would only be transferred once, even if the client 534 failed persisting them. 535 The number of hosts usually lies in O(100), so the 536 overhead is acceptable. 537 538 @returns a tuple of (hosts objects that should be sent to the shard, 539 incorrect host ids that should not belong to] 540 shard) 541 """ 542 # Disclaimer: concurrent heartbeats should theoretically not occur in 543 # the current setup. As they may be introduced in the near future, 544 # this comment will be left here. 545 546 # Sending stuff twice is acceptable, but forgetting something isn't. 547 # Detecting duplicates on the client is easy, but here it's harder. The 548 # following options were considered: 549 # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more 550 # than select returned, as concurrently more hosts might have been 551 # inserted 552 # - UPDATE and then SELECT WHERE shard=shard: select always returns all 553 # hosts for the shard, this is overhead 554 # - SELECT and then UPDATE only selected without requerying afterwards: 555 # returns the old state of the records. 556 new_hosts = [] 557 558 possible_new_host_ids = set(Host.objects.filter( 559 labels__in=shard.labels.all(), 560 leased=False 561 ).exclude( 562 id__in=known_ids, 563 ).values_list('pk', flat=True)) 564 565 # No-op in production, used to simulate race condition in tests. 
566 cls._assign_to_shard_nothing_helper() 567 568 if possible_new_host_ids: 569 Host.objects.filter( 570 pk__in=possible_new_host_ids, 571 labels__in=shard.labels.all(), 572 leased=False 573 ).update(shard=shard) 574 new_hosts = list(Host.objects.filter( 575 pk__in=possible_new_host_ids, 576 shard=shard 577 ).all()) 578 579 invalid_host_ids = list(Host.objects.filter( 580 id__in=known_ids 581 ).exclude( 582 shard=shard 583 ).values_list('pk', flat=True)) 584 585 return new_hosts, invalid_host_ids 586 587 def resurrect_object(self, old_object): 588 super(Host, self).resurrect_object(old_object) 589 # invalid hosts can be in use by the scheduler (as one-time hosts), so 590 # don't change the status 591 self.status = old_object.status 592 593 594 def clean_object(self): 595 self.aclgroup_set.clear() 596 self.labels.clear() 597 598 599 def save(self, *args, **kwargs): 600 # extra spaces in the hostname can be a sneaky source of errors 601 self.hostname = self.hostname.strip() 602 # is this a new object being saved for the first time? 603 first_time = (self.id is None) 604 if not first_time: 605 AclGroup.check_for_acl_violation_hosts([self]) 606 # If locked is changed, send its status and user made the change to 607 # metaDB. Locks are important in host history because if a device is 608 # locked then we don't really care what state it is in. 
609 if self.locked and not self.locked_by: 610 self.locked_by = User.current_user() 611 if not self.lock_time: 612 self.lock_time = datetime.now() 613 self.dirty = True 614 elif not self.locked and self.locked_by: 615 self.locked_by = None 616 self.lock_time = None 617 super(Host, self).save(*args, **kwargs) 618 if first_time: 619 everyone = AclGroup.objects.get(name='Everyone') 620 everyone.hosts.add(self) 621 self._check_for_updated_attributes() 622 623 624 def delete(self): 625 AclGroup.check_for_acl_violation_hosts([self]) 626 for queue_entry in self.hostqueueentry_set.all(): 627 queue_entry.deleted = True 628 queue_entry.abort() 629 super(Host, self).delete() 630 631 632 def on_attribute_changed(self, attribute, old_value): 633 assert attribute == 'status' 634 logging.info(self.hostname + ' -> ' + self.status) 635 636 637 def enqueue_job(self, job, is_template=False): 638 """Enqueue a job on this host. 639 640 @param job: A job to enqueue. 641 @param is_template: Whther the status should be "Template". 642 """ 643 queue_entry = HostQueueEntry.create(host=self, job=job, 644 is_template=is_template) 645 # allow recovery of dead hosts from the frontend 646 if not self.active_queue_entry() and self.is_dead(): 647 self.status = Host.Status.READY 648 self.save() 649 queue_entry.save() 650 651 block = IneligibleHostQueue(job=job, host=self) 652 block.save() 653 654 655 def platform(self): 656 """The platform of the host.""" 657 # TODO(showard): slighly hacky? 658 platforms = self.labels.filter(platform=True) 659 if len(platforms) == 0: 660 return None 661 return platforms[0] 662 platform.short_description = 'Platform' 663 664 665 @classmethod 666 def check_no_platform(cls, hosts): 667 """Verify the specified hosts have no associated platforms. 668 669 @param cls: Implicit class object. 670 @param hosts: The hosts to verify. 671 @raises model_logic.ValidationError if any hosts already have a 672 platform. 
673 """ 674 Host.objects.populate_relationships(hosts, Label, 'label_list') 675 errors = [] 676 for host in hosts: 677 platforms = [label.name for label in host.label_list 678 if label.platform] 679 if platforms: 680 # do a join, just in case this host has multiple platforms, 681 # we'll be able to see it 682 errors.append('Host %s already has a platform: %s' % ( 683 host.hostname, ', '.join(platforms))) 684 if errors: 685 raise model_logic.ValidationError({'labels': '; '.join(errors)}) 686 687 688 @classmethod 689 def check_board_labels_allowed(cls, hosts, new_labels=[]): 690 """Verify the specified hosts have valid board labels and the given 691 new board labels can be added. 692 693 @param cls: Implicit class object. 694 @param hosts: The hosts to verify. 695 @param new_labels: A list of labels to be added to the hosts. 696 697 @raises model_logic.ValidationError if any host has invalid board labels 698 or the given board labels cannot be added to the hsots. 699 """ 700 Host.objects.populate_relationships(hosts, Label, 'label_list') 701 errors = [] 702 for host in hosts: 703 boards = [label.name for label in host.label_list 704 if label.name.startswith('board:')] 705 if not server_utils.board_labels_allowed(boards + new_labels): 706 # do a join, just in case this host has multiple boards, 707 # we'll be able to see it 708 errors.append('Host %s already has board labels: %s' % ( 709 host.hostname, ', '.join(boards))) 710 if errors: 711 raise model_logic.ValidationError({'labels': '; '.join(errors)}) 712 713 714 def is_dead(self): 715 """Returns whether the host is dead (has status repair failed).""" 716 return self.status == Host.Status.REPAIR_FAILED 717 718 719 def active_queue_entry(self): 720 """Returns the active queue entry for this host, or None if none.""" 721 active = list(self.hostqueueentry_set.filter(active=True)) 722 if not active: 723 return None 724 assert len(active) == 1, ('More than one active entry for ' 725 'host ' + self.hostname) 726 return 
active[0] 727 728 729 def _get_attribute_model_and_args(self, attribute): 730 return HostAttribute, dict(host=self, attribute=attribute) 731 732 733 @classmethod 734 def get_attribute_model(cls): 735 """Return the attribute model. 736 737 Override method in parent class. See ModelExtensions for details. 738 @returns: The attribute model of Host. 739 """ 740 return HostAttribute 741 742 743 class Meta: 744 """Metadata for the Host class.""" 745 db_table = 'afe_hosts' 746 747 748 def __unicode__(self): 749 return unicode(self.hostname) 750 751 752class HostAttribute(dbmodels.Model, model_logic.ModelExtensions): 753 """Arbitrary keyvals associated with hosts.""" 754 755 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 756 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 757 host = dbmodels.ForeignKey(Host) 758 attribute = dbmodels.CharField(max_length=90) 759 value = dbmodels.CharField(max_length=300) 760 761 objects = model_logic.ExtendedManager() 762 763 class Meta: 764 """Metadata for the HostAttribute class.""" 765 db_table = 'afe_host_attributes' 766 767 768 @classmethod 769 def get_record(cls, data): 770 """Check the database for an identical record. 771 772 Use host_id and attribute to search for a existing record. 773 774 @raises: DoesNotExist, if no record found 775 @raises: MultipleObjectsReturned if multiple records found. 776 """ 777 # TODO(fdeng): We should use host_id and attribute together as 778 # a primary key in the db. 779 return cls.objects.get(host_id=data['host_id'], 780 attribute=data['attribute']) 781 782 783 @classmethod 784 def deserialize(cls, data): 785 """Override deserialize in parent class. 786 787 Do not deserialize id as id is not kept consistent on master and shards. 788 789 @param data: A dictionary of data to deserialize. 790 791 @returns: A HostAttribute object. 
792 """ 793 if data: 794 data.pop('id') 795 return super(HostAttribute, cls).deserialize(data) 796 797 798class Test(dbmodels.Model, model_logic.ModelExtensions): 799 """\ 800 Required: 801 author: author name 802 description: description of the test 803 name: test name 804 time: short, medium, long 805 test_class: This describes the class for your the test belongs in. 806 test_category: This describes the category for your tests 807 test_type: Client or Server 808 path: path to pass to run_test() 809 sync_count: is a number >=1 (1 being the default). If it's 1, then it's an 810 async job. If it's >1 it's sync job for that number of machines 811 i.e. if sync_count = 2 it is a sync job that requires two 812 machines. 813 Optional: 814 dependencies: What the test requires to run. Comma deliminated list 815 dependency_labels: many-to-many relationship with labels corresponding to 816 test dependencies. 817 experimental: If this is set to True production servers will ignore the test 818 run_verify: Whether or not the scheduler should run the verify stage 819 run_reset: Whether or not the scheduler should run the reset stage 820 test_retry: Number of times to retry test if the test did not complete 821 successfully. 
(optional, default: 0) 822 """ 823 TestTime = enum.Enum('SHORT', 'MEDIUM', 'LONG', start_value=1) 824 825 name = dbmodels.CharField(max_length=255, unique=True) 826 author = dbmodels.CharField(max_length=255) 827 test_class = dbmodels.CharField(max_length=255) 828 test_category = dbmodels.CharField(max_length=255) 829 dependencies = dbmodels.CharField(max_length=255, blank=True) 830 description = dbmodels.TextField(blank=True) 831 experimental = dbmodels.BooleanField(default=True) 832 run_verify = dbmodels.BooleanField(default=False) 833 test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(), 834 default=TestTime.MEDIUM) 835 test_type = dbmodels.SmallIntegerField( 836 choices=control_data.CONTROL_TYPE.choices()) 837 sync_count = dbmodels.IntegerField(default=1) 838 path = dbmodels.CharField(max_length=255, unique=True) 839 test_retry = dbmodels.IntegerField(blank=True, default=0) 840 run_reset = dbmodels.BooleanField(default=True) 841 842 dependency_labels = ( 843 dbmodels.ManyToManyField(Label, blank=True, 844 db_table='afe_autotests_dependency_labels')) 845 name_field = 'name' 846 objects = model_logic.ExtendedManager() 847 848 849 def admin_description(self): 850 """Returns a string representing the admin description.""" 851 escaped_description = saxutils.escape(self.description) 852 return '<span style="white-space:pre">%s</span>' % escaped_description 853 admin_description.allow_tags = True 854 admin_description.short_description = 'Description' 855 856 857 class Meta: 858 """Metadata for class Test.""" 859 db_table = 'afe_autotests' 860 861 def __unicode__(self): 862 return unicode(self.name) 863 864 865class TestParameter(dbmodels.Model): 866 """ 867 A declared parameter of a test 868 """ 869 test = dbmodels.ForeignKey(Test) 870 name = dbmodels.CharField(max_length=255) 871 872 class Meta: 873 """Metadata for class TestParameter.""" 874 db_table = 'afe_test_parameters' 875 unique_together = ('test', 'name') 876 877 def __unicode__(self): 878 
return u'%s (%s)' % (self.name, self.test.name) 879 880 881class Profiler(dbmodels.Model, model_logic.ModelExtensions): 882 """\ 883 Required: 884 name: profiler name 885 test_type: Client or Server 886 887 Optional: 888 description: arbirary text description 889 """ 890 name = dbmodels.CharField(max_length=255, unique=True) 891 description = dbmodels.TextField(blank=True) 892 893 name_field = 'name' 894 objects = model_logic.ExtendedManager() 895 896 897 class Meta: 898 """Metadata for class Profiler.""" 899 db_table = 'afe_profilers' 900 901 def __unicode__(self): 902 return unicode(self.name) 903 904 905class AclGroup(dbmodels.Model, model_logic.ModelExtensions): 906 """\ 907 Required: 908 name: name of ACL group 909 910 Optional: 911 description: arbitrary description of group 912 """ 913 914 SERIALIZATION_LINKS_TO_FOLLOW = set(['users']) 915 916 name = dbmodels.CharField(max_length=255, unique=True) 917 description = dbmodels.CharField(max_length=255, blank=True) 918 users = dbmodels.ManyToManyField(User, blank=False, 919 db_table='afe_acl_groups_users') 920 hosts = dbmodels.ManyToManyField(Host, blank=True, 921 db_table='afe_acl_groups_hosts') 922 923 name_field = 'name' 924 objects = model_logic.ExtendedManager() 925 926 @staticmethod 927 def check_for_acl_violation_hosts(hosts): 928 """Verify the current user has access to the specified hosts. 929 930 @param hosts: The hosts to verify against. 931 @raises AclAccessViolation if the current user doesn't have access 932 to a host. 933 """ 934 user = User.current_user() 935 if user.is_superuser(): 936 return 937 accessible_host_ids = set( 938 host.id for host in Host.objects.filter(aclgroup__users=user)) 939 for host in hosts: 940 # Check if the user has access to this host, 941 # but only if it is not a metahost or a one-time-host. 
942 no_access = (isinstance(host, Host) 943 and not host.invalid 944 and int(host.id) not in accessible_host_ids) 945 if no_access: 946 raise AclAccessViolation("%s does not have access to %s" % 947 (str(user), str(host))) 948 949 950 @staticmethod 951 def check_abort_permissions(queue_entries): 952 """Look for queue entries that aren't abortable by the current user. 953 954 An entry is not abortable if: 955 * the job isn't owned by this user, and 956 * the machine isn't ACL-accessible, or 957 * the machine is in the "Everyone" ACL 958 959 @param queue_entries: The queue entries to check. 960 @raises AclAccessViolation if a queue entry is not abortable by the 961 current user. 962 """ 963 user = User.current_user() 964 if user.is_superuser(): 965 return 966 not_owned = queue_entries.exclude(job__owner=user.login) 967 # I do this using ID sets instead of just Django filters because 968 # filtering on M2M dbmodels is broken in Django 0.96. It's better in 969 # 1.0. 970 # TODO: Use Django filters, now that we're using 1.0. 971 accessible_ids = set( 972 entry.id for entry 973 in not_owned.filter(host__aclgroup__users__login=user.login)) 974 public_ids = set(entry.id for entry 975 in not_owned.filter(host__aclgroup__name='Everyone')) 976 cannot_abort = [entry for entry in not_owned.select_related() 977 if entry.id not in accessible_ids 978 or entry.id in public_ids] 979 if len(cannot_abort) == 0: 980 return 981 entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner, 982 entry.host_or_metahost_name()) 983 for entry in cannot_abort) 984 raise AclAccessViolation('You cannot abort the following job entries: ' 985 + entry_names) 986 987 988 def check_for_acl_violation_acl_group(self): 989 """Verifies the current user has acces to this ACL group. 990 991 @raises AclAccessViolation if the current user doesn't have access to 992 this ACL group. 
@staticmethod
def on_host_membership_change():
    """Re-balance the 'Everyone' ACL group after host membership changes.

    Valid hosts that belong to no ACL group are added to 'Everyone';
    valid hosts that belong to 'Everyone' and at least one other ACL group
    are removed from 'Everyone'.
    """
    everyone_group = AclGroup.objects.get(name='Everyone')

    # Step 1: adopt hosts that are in no ACL group at all.
    # TODO(showard): this is a bit of a hack, since the fact that this query
    # works is kind of a coincidence of Django internals.  This trick
    # doesn't work in general (on all foreign key relationships).  I'll
    # replace it with a better technique when the need arises.
    unassigned = Host.valid_objects.filter(aclgroup__id__isnull=True)
    everyone_group.hosts.add(*unassigned.distinct())

    # Step 2: drop hosts from Everyone once they also sit in another group
    # (more than one ACL group means "Everyone plus something else").
    members = Host.valid_objects.filter(aclgroup__name='Everyone')
    redundant = set(member for member in members
                    if member.aclgroup_set.count() > 1)
    everyone_group.hosts.remove(*redundant)
def save(self, *args, **kwargs):
    """Save this ACL group, enforcing ACL checks on updates.

    When the group already has an id, the stored copy is checked with
    check_for_acl_violation_acl_group() before anything is written. After
    the write, perform_after_save() is told whether this was an update.

    @param args: Positional arguments forwarded to the parent save().
    @param kwargs: Keyword arguments forwarded to the parent save().
    """
    already_stored = bool(self.id)
    if already_stored:
        # Check the original object, not the (possibly renamed) in-memory
        # one, for an ACL violation.
        stored_group = AclGroup.objects.get(id=self.id)
        stored_group.check_for_acl_violation_acl_group()
    super(AclGroup, self).save(*args, **kwargs)
    self.perform_after_save(already_stored)
class JobManager(model_logic.ExtendedManager):
    """Custom manager to provide efficient status counts querying."""

    def get_status_counts(self, job_ids):
        """Returns a dict mapping the given job IDs to their status count dicts.

        Each inner dict maps a full status string (as produced by
        HostQueueEntry.compute_full_status) to the number of host queue
        entries of that job with that status. Jobs without any host queue
        entry map to an empty dict.

        @param job_ids: A list of job IDs.

        @returns A dict {job_id: {full_status: count}}; empty when job_ids
                 is empty.
        """
        if not job_ids:
            return {}
        job_ids = list(job_ids)
        # The IDs are handed to the database driver as bound parameters
        # instead of being formatted into the statement text; only the
        # '%s' placeholders themselves are spliced in here.
        placeholders = ','.join(['%s'] * len(job_ids))
        cursor = connection.cursor()
        cursor.execute("""
            SELECT job_id, status, aborted, complete, COUNT(*)
            FROM afe_host_queue_entries
            WHERE job_id IN (%s)
            GROUP BY job_id, status, aborted, complete
            """ % placeholders, job_ids)
        all_job_counts = dict((job_id, {}) for job_id in job_ids)
        for job_id, status, aborted, complete, count in cursor.fetchall():
            job_dict = all_job_counts[job_id]
            full_status = HostQueueEntry.compute_full_status(status, aborted,
                                                             complete)
            # Several (status, aborted, complete) rows can collapse to the
            # same full status, so accumulate rather than assign.
            job_dict[full_status] = job_dict.get(full_status, 0) + count
        return all_job_counts
1142 control_file: contents of control file 1143 control_type: Client or Server 1144 created_on: date of job creation 1145 submitted_on: date of job submission 1146 synch_count: how many hosts should be used per autoserv execution 1147 run_verify: Whether or not to run the verify phase 1148 run_reset: Whether or not to run the reset phase 1149 timeout: DEPRECATED - hours from queuing time until job times out 1150 timeout_mins: minutes from job queuing time until the job times out 1151 max_runtime_hrs: DEPRECATED - hours from job starting time until job 1152 times out 1153 max_runtime_mins: minutes from job starting time until job times out 1154 email_list: list of people to email on completion delimited by any of: 1155 white space, ',', ':', ';' 1156 dependency_labels: many-to-many relationship with labels corresponding to 1157 job dependencies 1158 reboot_before: Never, If dirty, or Always 1159 reboot_after: Never, If all tests passed, or Always 1160 parse_failed_repair: if True, a failed repair launched by this job will have 1161 its results parsed as part of the job. 1162 drone_set: The set of drones to run this job on 1163 parent_job: Parent job (optional) 1164 test_retry: Number of times to retry test if the test did not complete 1165 successfully. (optional, default: 0) 1166 require_ssp: Require server-side packaging unless require_ssp is set to 1167 False. (optional, default: None) 1168 """ 1169 1170 # TODO: Investigate, if jobkeyval_set is really needed. 1171 # dynamic_suite will write them into an attached file for the drone, but 1172 # it doesn't seem like they are actually used. If they aren't used, remove 1173 # jobkeyval_set here. 1174 SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels', 1175 'hostqueueentry_set', 1176 'jobkeyval_set', 1177 'shard']) 1178 1179 # SQL for selecting jobs that should be sent to shard. 1180 # We use raw sql as django filters were not optimized. 1181 # The following jobs are excluded by the SQL. 
1182 # - Non-aborted jobs known to shard as specified in |known_ids|. 1183 # Note for jobs aborted on master, even if already known to shard, 1184 # will be sent to shard again so that shard can abort them. 1185 # - Completed jobs 1186 # - Active jobs 1187 # - Jobs without host_queue_entries 1188 NON_ABORTED_KNOWN_JOBS = '(t2.aborted = 0 AND t1.id IN (%(known_ids)s))' 1189 1190 SQL_SHARD_JOBS = ( 1191 'SELECT DISTINCT(t1.id) FROM afe_jobs t1 ' 1192 'INNER JOIN afe_host_queue_entries t2 ON ' 1193 ' (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 ' 1194 ' %(check_known_jobs)s) ' 1195 'LEFT OUTER JOIN afe_jobs_dependency_labels t3 ON (t1.id = t3.job_id) ' 1196 'JOIN afe_shards_labels t4 ' 1197 ' ON (t4.label_id = t3.label_id OR t4.label_id = t2.meta_host) ' 1198 'WHERE t4.shard_id = %(shard_id)s' 1199 ) 1200 1201 # Jobs can be created with assigned hosts and have no dependency 1202 # labels nor meta_host. 1203 # We are looking for: 1204 # - a job whose hqe's meta_host is null 1205 # - a job whose hqe has a host 1206 # - one of the host's labels matches the shard's label. 1207 # Non-aborted known jobs, completed jobs, active jobs, jobs 1208 # without hqe are exluded as we do with SQL_SHARD_JOBS. 1209 SQL_SHARD_JOBS_WITH_HOSTS = ( 1210 'SELECT DISTINCT(t1.id) FROM afe_jobs t1 ' 1211 'INNER JOIN afe_host_queue_entries t2 ON ' 1212 ' (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 ' 1213 ' AND t2.meta_host IS NULL AND t2.host_id IS NOT NULL ' 1214 ' %(check_known_jobs)s) ' 1215 'LEFT OUTER JOIN afe_hosts_labels t3 ON (t2.host_id = t3.host_id) ' 1216 'WHERE (t3.label_id IN ' 1217 ' (SELECT label_id FROM afe_shards_labels ' 1218 ' WHERE shard_id = %(shard_id)s))' 1219 ) 1220 1221 # Even if we had filters about complete, active and aborted 1222 # bits in the above two SQLs, there is a chance that 1223 # the result may still contain a job with an hqe with 'complete=1' 1224 # or 'active=1' or 'aborted=0 and afe_job.id in known jobs.' 
def sanity_check_update_from_shard(self, shard, updated_serialized):
    """Refuse an update pushed by a shard this job is not assigned to.

    A job without a shard_id is accepted from any shard: if the job was
    aborted on the master after the shard fetched it, no shard_id is set,
    yet the shard may still push updates because the job can complete
    before the abort bit syncs to the shard. The alternative considered
    was to change the master scheduler to not mark sharded-out aborted
    jobs as completed, but that needed extra database queries and seemed
    more complicated. Accepting the update is considered safe on the
    assumption that shards are powered off and wiped when they are removed
    from the master, so no wrong shard pushes updates.

    @param shard: The shard that sent the update.
    @param updated_serialized: The serialized update, used only in the
                               error message.
    @raises error.IgnorableUnallowedRecordsSentToMaster if the job is
            assigned to a different shard.
    """
    assigned_shard_id = self.shard_id
    if not assigned_shard_id or assigned_shard_id == shard.id:
        return
    details = (self.id, assigned_shard_id, updated_serialized, shard.id)
    raise error.IgnorableUnallowedRecordsSentToMaster(
        'Job id=%s is assigned to shard (%s). Cannot update it with %s '
        'from shard %s.' % details)
1268 DEFAULT_TIMEOUT = global_config.global_config.get_config_value( 1269 'AUTOTEST_WEB', 'job_timeout_default', default=24) 1270 DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value( 1271 'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60) 1272 # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is 1273 # completed. 1274 DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value( 1275 'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72) 1276 DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value( 1277 'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60) 1278 DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value( 1279 'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool, 1280 default=False) 1281 1282 owner = dbmodels.CharField(max_length=255) 1283 name = dbmodels.CharField(max_length=255) 1284 priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT) 1285 control_file = dbmodels.TextField(null=True, blank=True) 1286 control_type = dbmodels.SmallIntegerField( 1287 choices=control_data.CONTROL_TYPE.choices(), 1288 blank=True, # to allow 0 1289 default=control_data.CONTROL_TYPE.CLIENT) 1290 created_on = dbmodels.DateTimeField() 1291 synch_count = dbmodels.IntegerField(blank=True, default=0) 1292 timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT) 1293 run_verify = dbmodels.BooleanField(default=False) 1294 email_list = dbmodels.CharField(max_length=250, blank=True) 1295 dependency_labels = ( 1296 dbmodels.ManyToManyField(Label, blank=True, 1297 db_table='afe_jobs_dependency_labels')) 1298 reboot_before = dbmodels.SmallIntegerField( 1299 choices=model_attributes.RebootBefore.choices(), blank=True, 1300 default=DEFAULT_REBOOT_BEFORE) 1301 reboot_after = dbmodels.SmallIntegerField( 1302 choices=model_attributes.RebootAfter.choices(), blank=True, 1303 default=DEFAULT_REBOOT_AFTER) 1304 parse_failed_repair = dbmodels.BooleanField( 1305 
default=DEFAULT_PARSE_FAILED_REPAIR) 1306 # max_runtime_hrs is deprecated. Will be removed after switch to mins is 1307 # completed. 1308 max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS) 1309 max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS) 1310 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 1311 1312 # TODO(jrbarnette) We have to keep `parameterized_job` around or it 1313 # breaks the scheduler_models unit tests (and fixing the unit tests 1314 # will break the scheduler, so don't do that). 1315 # 1316 # The ultimate fix is to delete the column from the database table 1317 # at which point, you _must_ delete this. Until you're ready to do 1318 # that, DON'T MUCK WITH IT. 1319 parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True, 1320 blank=True) 1321 1322 parent_job = dbmodels.ForeignKey('self', blank=True, null=True) 1323 1324 test_retry = dbmodels.IntegerField(blank=True, default=0) 1325 1326 run_reset = dbmodels.BooleanField(default=True) 1327 1328 timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS) 1329 1330 # If this is None on the master, a slave should be found. 1331 # If this is None on a slave, it should be synced back to the master 1332 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 1333 1334 # If this is None, server-side packaging will be used for server side test, 1335 # unless it's disabled in global config AUTOSERV/enable_ssp_container. 1336 require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True) 1337 1338 # custom manager 1339 objects = JobManager() 1340 1341 1342 @decorators.cached_property 1343 def labels(self): 1344 """All the labels of this job""" 1345 # We need to convert dependency_labels to a list, because all() gives us 1346 # back an iterator, and storing/caching an iterator means we'd only be 1347 # able to read from it once. 
def is_server_job(self):
    """Tell whether this job's control type is the server type.

    @returns True when control_type equals control_data.CONTROL_TYPE.SERVER.
    """
    server_type = control_data.CONTROL_TYPE.SERVER
    return self.control_type == server_type
@classmethod
def assign_to_shard(cls, shard, known_ids):
    """Assigns unassigned jobs to a shard.

    For all labels that have been assigned to this shard, all jobs that
    have this label, are assigned to this shard.

    Jobs that are assigned to the shard but aren't already present on the
    shard are returned.

    @param shard: The shard to assign jobs to.
    @param known_ids: List of all ids of incomplete jobs, the shard already
                      knows about.
                      This is used to figure out which jobs should be sent
                      to the shard. If shard_ids were used instead, jobs
                      would only be transferred once, even if the client
                      failed persisting them.
                      The number of unfinished jobs usually lies in O(1000).
                      Assuming one id takes 8 chars in the json, this means
                      overhead that lies in the lower kilobyte range.
                      A not in query with 5000 id's takes about 30ms.

    @returns The job objects that should be sent to the shard.
    @raises ValueError or TypeError if an entry of known_ids is not
            convertible to an integer.
    """
    # Disclaimer: Concurrent heartbeats should not occur in today's setup.
    # If this changes or they are triggered manually, this applies:
    # Jobs may be returned more than once by concurrent calls of this
    # function, as there is a race condition between SELECT and UPDATE.
    exclude_known = ''
    include_known = ''
    if known_ids:
        # known_ids arrive from the shard and are spliced into raw SQL
        # below; forcing each one through int() guarantees that nothing
        # but integer literals can reach the statement text.
        known_csv = ','.join(str(int(known_id)) for known_id in known_ids)
        known_clause = cls.NON_ABORTED_KNOWN_JOBS % {'known_ids': known_csv}
        exclude_known = 'AND NOT ' + known_clause
        include_known = 'OR ' + known_clause

    # First pass: collect candidate jobs from both selection queries.
    job_ids = set()
    for sql in (cls.SQL_SHARD_JOBS, cls.SQL_SHARD_JOBS_WITH_HOSTS):
        candidates = Job.objects.raw(sql % {
                'check_known_jobs': exclude_known,
                'shard_id': shard.id})
        job_ids.update(job.id for job in candidates)

    # Second pass: a job with several host queue entries can slip through
    # the first pass if only some of its entries match the filters, so
    # drop every candidate that has any disqualifying entry.
    if job_ids:
        rejected = Job.objects.raw(
                cls.SQL_JOBS_TO_EXCLUDE %
                {'check_known_jobs': include_known,
                 'candidates': ','.join(str(job_id) for job_id in job_ids)})
        job_ids.difference_update(job.id for job in rejected)

    if not job_ids:
        return []
    Job.objects.filter(pk__in=job_ids).update(shard=shard)
    return list(Job.objects.filter(pk__in=job_ids).all())
def user(self):
    """Gets the user of this job, or None if it doesn't exist.

    The user is looked up by matching User.login against this job's
    owner string.

    @returns The matching User, or None when no user has that login.
    """
    try:
        return User.objects.get(login=self.owner)
    # The lookup is on User, so the exception raised for a missing row is
    # User.DoesNotExist; Job's own DoesNotExist would never match it.
    except User.DoesNotExist:
        return None
@classmethod
def deserialize(cls, data):
    """Override deserialize in parent class.

    Do not deserialize id as id is not kept consistent on master and shards.
    The 'id' key, when present, is removed from the passed dictionary in
    place before delegating to the parent implementation.

    @param data: A dictionary of data to deserialize.

    @returns: A JobKeyval object.
    """
    if data:
        # Tolerate records that carry no 'id' at all instead of failing
        # with a KeyError before the parent class ever sees the data.
        data.pop('id', None)
    return super(JobKeyval, cls).deserialize(data)
@classmethod
def create(cls, job, host=None, meta_host=None,
           is_template=False, atomic_group=None):
    """Creates a new host queue entry.

    The entry is only instantiated, not saved; callers are expected to
    call save() on the result.

    @param cls: Implicit class object.
    @param job: The associated job.
    @param host: The associated host.
    @param meta_host: The associated meta host.
    @param is_template: Whether the status should be "Template".
    @param atomic_group: The associated atomic group, if any. Accepted so
                         that AtomicGroup.enqueue_job(), which passes
                         atomic_group=self, can use this constructor.

    @returns The new, unsaved host queue entry.
    """
    status = cls.Status.TEMPLATE if is_template else cls.Status.QUEUED
    return cls(job=job, host=host, meta_host=meta_host,
               atomic_group=atomic_group, status=status)
# This code is shared between rpc_interface and models.HostQueueEntry.
# Because of circular imports between the two (crbug.com/230100), keeping it
# as a class method here was the best way to refactor it: a separate utils
# module would have to import models.py while models.py imports the utils
# module, which creates a cycle.
@classmethod
def abort_host_queue_entries(cls, host_queue_entries):
    """Aborts a collection of host_queue_entries.

    Marks the given entries as aborted, together with every incomplete,
    not-yet-aborted entry of jobs that descend (via parent_job) from the
    jobs of those entries. An AbortedHostQueueEntry row recording the
    current user and time is created for each aborted entry.

    @param cls: Implicit class object.
    @param host_queue_entries: List of host queue entries we want to abort.
    """
    # This isn't completely immune to race conditions since it's not atomic,
    # but it should be safe given the scheduler's behavior.

    # TODO(milleral): crbug.com/230100
    # The |abort_host_queue_entries| rpc does nearly exactly this,
    # however, trying to re-use the code generates some horrible
    # circular import error.  It'd be nice to refactor things around
    # sometime so the code could be reused.

    # Walk the job tree one generation per query until no new entries
    # turn up (fixpoint), which keeps the number of queries small.
    to_abort = set()
    frontier = set(host_queue_entries)
    while frontier:
        to_abort |= frontier
        parent_job_ids = [entry.job_id for entry in frontier]
        descendants = HostQueueEntry.objects.filter(
                job__parent_job__in=parent_job_ids,
                complete=False, aborted=False).all()
        # Subtracting what was already seen guards against circular
        # parental relationships.
        frontier = set(descendants).difference(to_abort)

    # Record who aborted each entry so the abort can be attributed later.
    timestamp = datetime.now()
    aborting_user = User.current_user()
    abort_records = []
    for entry in to_abort:
        abort_records.append(AbortedHostQueueEntry(
                queue_entry=entry, aborted_by=aborting_user,
                aborted_on=timestamp))
    AbortedHostQueueEntry.objects.bulk_create(abort_records)

    # Set the abort bit on all collected entries with one bulk update.
    entry_ids = [entry.id for entry in to_abort]
    HostQueueEntry.objects.filter(id__in=entry_ids).update(aborted=True)
def full_status(self):
    """Returns the full status of this host queue entry, as a string.

    Delegates to compute_full_status() with this entry's status, aborted
    and complete values, in that order.
    """
    status, aborted, complete = self.status, self.aborted, self.complete
    return self.compute_full_status(status, aborted, complete)
def save(self, *args, **kwargs):
    """Save this special task, deriving requested_by from the queue entry.

    When the task is attached to a host queue entry, requested_by is
    overwritten with the user whose login equals the owner of that
    entry's job before the row is written.

    @param args: Positional arguments forwarded to the parent save(), so
                 callers may use the same calling convention as for the
                 other models in this file.
    @param kwargs: Keyword arguments forwarded to the parent save().
    @raises User.DoesNotExist (from the lookup) if no user has the job
            owner's login.
    """
    if self.queue_entry:
        self.requested_by = User.objects.get(
                login=self.queue_entry.job.owner)
    super(SpecialTask, self).save(*args, **kwargs)
"""Returns a host queue entry status appropriate for a speical task.""" 1892 return server_utils.get_special_task_status( 1893 self.is_complete, self.success, self.is_active) 1894 1895 1896 # property to emulate HostQueueEntry.started_on 1897 @property 1898 def started_on(self): 1899 """Returns the time at which this special task started.""" 1900 return self.time_started 1901 1902 1903 @classmethod 1904 def schedule_special_task(cls, host, task): 1905 """Schedules a special task on a host if not already scheduled. 1906 1907 @param cls: Implicit class object. 1908 @param host: The host to use. 1909 @param task: The task to schedule. 1910 """ 1911 existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task, 1912 is_active=False, 1913 is_complete=False) 1914 if existing_tasks: 1915 return existing_tasks[0] 1916 1917 special_task = SpecialTask(host=host, task=task, 1918 requested_by=User.current_user()) 1919 special_task.save() 1920 return special_task 1921 1922 1923 def abort(self): 1924 """ Abort this special task.""" 1925 self.is_aborted = True 1926 self.save() 1927 1928 1929 def activate(self): 1930 """ 1931 Sets a task as active and sets the time started to the current time. 1932 """ 1933 logging.info('Starting: %s', self) 1934 self.is_active = True 1935 self.time_started = datetime.now() 1936 self.save() 1937 1938 1939 def finish(self, success): 1940 """Sets a task as completed. 1941 1942 @param success: Whether or not the task was successful. 
1943 """ 1944 logging.info('Finished: %s', self) 1945 self.is_active = False 1946 self.is_complete = True 1947 self.success = success 1948 if self.time_started: 1949 self.time_finished = datetime.now() 1950 self.save() 1951 1952 1953 class Meta: 1954 """Metadata for class SpecialTask.""" 1955 db_table = 'afe_special_tasks' 1956 1957 1958 def __unicode__(self): 1959 result = u'Special Task %s (host %s, task %s, time %s)' % ( 1960 self.id, self.host, self.task, self.time_requested) 1961 if self.is_complete: 1962 result += u' (completed)' 1963 elif self.is_active: 1964 result += u' (active)' 1965 1966 return result 1967 1968 1969class StableVersion(dbmodels.Model, model_logic.ModelExtensions): 1970 1971 board = dbmodels.CharField(max_length=255, unique=True) 1972 version = dbmodels.CharField(max_length=255) 1973 1974 class Meta: 1975 """Metadata for class StableVersion.""" 1976 db_table = 'afe_stable_versions' 1977