1# pylint: disable=missing-docstring 2 3import logging 4from datetime import datetime 5import django.core 6try: 7 from django.db import models as dbmodels, connection 8except django.core.exceptions.ImproperlyConfigured: 9 raise ImportError('Django database not yet configured. Import either ' 10 'setup_django_environment or ' 11 'setup_django_lite_environment from ' 12 'autotest_lib.frontend before any imports that ' 13 'depend on django models.') 14from django.db import utils as django_utils 15from xml.sax import saxutils 16import common 17from autotest_lib.frontend.afe import model_logic, model_attributes 18from autotest_lib.frontend.afe import rdb_model_extensions 19from autotest_lib.frontend import settings, thread_local 20from autotest_lib.client.common_lib import enum, error, host_protections 21from autotest_lib.client.common_lib import global_config 22from autotest_lib.client.common_lib import host_queue_entry_states 23from autotest_lib.client.common_lib import control_data, priorities, decorators 24from autotest_lib.server import utils as server_utils 25 26# job options and user preferences 27DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY 28DEFAULT_REBOOT_AFTER = model_attributes.RebootBefore.NEVER 29 30RESPECT_STATIC_LABELS = global_config.global_config.get_config_value( 31 'SKYLAB', 'respect_static_labels', type=bool, default=False) 32 33RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value( 34 'SKYLAB', 'respect_static_attributes', type=bool, default=False) 35 36 37class AclAccessViolation(Exception): 38 """\ 39 Raised when an operation is attempted with proper permissions as 40 dictated by ACLs. 41 """ 42 43 44class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model): 45 """\ 46 An atomic group defines a collection of hosts which must only be scheduled 47 all at once. Any host with a label having an atomic group will only be 48 scheduled for a job at the same time as other hosts sharing that label. 49 50 Required: 51 name: A name for this atomic group, e.g. 'rack23' or 'funky_net'. 52 max_number_of_machines: The maximum number of machines that will be 53 scheduled at once when scheduling jobs to this atomic group. 54 The job.synch_count is considered the minimum. 55 56 Optional: 57 description: Arbitrary text description of this group's purpose. 58 """ 59 name = dbmodels.CharField(max_length=255, unique=True) 60 description = dbmodels.TextField(blank=True) 61 # This magic value is the default to simplify the scheduler logic. 62 # It must be "large". The common use of atomic groups is to want all 63 # machines in the group to be used, limits on which subset used are 64 # often chosen via dependency labels. 65 # TODO(dennisjeffrey): Revisit this so we don't have to assume that 66 # "infinity" is around 3.3 million. 67 INFINITE_MACHINES = 333333333 68 max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES) 69 invalid = dbmodels.BooleanField(default=False, 70 editable=settings.FULL_ADMIN) 71 72 name_field = 'name' 73 objects = model_logic.ModelWithInvalidManager() 74 valid_objects = model_logic.ValidObjectsManager() 75 76 77 def enqueue_job(self, job, is_template=False): 78 """Enqueue a job on an associated atomic group of hosts. 79 80 @param job: A job to enqueue. 81 @param is_template: Whether the status should be "Template". 82 """ 83 queue_entry = HostQueueEntry.create(atomic_group=self, job=job, 84 is_template=is_template) 85 queue_entry.save() 86 87 88 def clean_object(self): 89 self.label_set.clear() 90 91 92 class Meta: 93 """Metadata for class AtomicGroup.""" 94 db_table = 'afe_atomic_groups' 95 96 97 def __unicode__(self): 98 return unicode(self.name) 99 100 101class Label(model_logic.ModelWithInvalid, dbmodels.Model): 102 """\ 103 Required: 104 name: label name 105 106 Optional: 107 kernel_config: URL/path to kernel config for jobs run on this label. 108 platform: If True, this is a platform label (defaults to False). 109 only_if_needed: If True, a Host with this label can only be used if that 110 label is requested by the job/test (either as the meta_host or 111 in the job_dependencies). 112 atomic_group: The atomic group associated with this label. 113 """ 114 name = dbmodels.CharField(max_length=255, unique=True) 115 kernel_config = dbmodels.CharField(max_length=255, blank=True) 116 platform = dbmodels.BooleanField(default=False) 117 invalid = dbmodels.BooleanField(default=False, 118 editable=settings.FULL_ADMIN) 119 only_if_needed = dbmodels.BooleanField(default=False) 120 121 name_field = 'name' 122 objects = model_logic.ModelWithInvalidManager() 123 valid_objects = model_logic.ValidObjectsManager() 124 atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) 125 126 127 def clean_object(self): 128 self.host_set.clear() 129 self.test_set.clear() 130 131 132 def enqueue_job(self, job, is_template=False): 133 """Enqueue a job on any host of this label. 134 135 @param job: A job to enqueue. 136 @param is_template: Whether the status should be "Template". 137 """ 138 queue_entry = HostQueueEntry.create(meta_host=self, job=job, 139 is_template=is_template) 140 queue_entry.save() 141 142 143 144 class Meta: 145 """Metadata for class Label.""" 146 db_table = 'afe_labels' 147 148 149 def __unicode__(self): 150 return unicode(self.name) 151 152 153 def is_replaced_by_static(self): 154 """Detect whether a label is replaced by a static label. 155 156 'Static' means it can only be modified by skylab inventory tools. 157 """ 158 if RESPECT_STATIC_LABELS: 159 replaced = ReplacedLabel.objects.filter(label__id=self.id) 160 if len(replaced) > 0: 161 return True 162 163 return False 164 165 166class StaticLabel(model_logic.ModelWithInvalid, dbmodels.Model): 167 """\ 168 Required: 169 name: label name 170 171 Optional: 172 kernel_config: URL/path to kernel config for jobs run on this label. 173 platform: If True, this is a platform label (defaults to False). 174 only_if_needed: Deprecated. This is always False. 175 atomic_group: Deprecated. This is always NULL. 176 """ 177 name = dbmodels.CharField(max_length=255, unique=True) 178 kernel_config = dbmodels.CharField(max_length=255, blank=True) 179 platform = dbmodels.BooleanField(default=False) 180 invalid = dbmodels.BooleanField(default=False, 181 editable=settings.FULL_ADMIN) 182 only_if_needed = dbmodels.BooleanField(default=False) 183 184 name_field = 'name' 185 objects = model_logic.ModelWithInvalidManager() 186 valid_objects = model_logic.ValidObjectsManager() 187 atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) 188 189 def clean_object(self): 190 self.host_set.clear() 191 self.test_set.clear() 192 193 194 class Meta: 195 """Metadata for class StaticLabel.""" 196 db_table = 'afe_static_labels' 197 198 199 def __unicode__(self): 200 return unicode(self.name) 201 202 203class ReplacedLabel(dbmodels.Model, model_logic.ModelExtensions): 204 """The tag to indicate Whether to replace labels with static labels.""" 205 label = dbmodels.ForeignKey(Label) 206 objects = model_logic.ExtendedManager() 207 208 209 class Meta: 210 """Metadata for class ReplacedLabel.""" 211 db_table = 'afe_replaced_labels' 212 213 214 def __unicode__(self): 215 return unicode(self.label) 216 217 218class Shard(dbmodels.Model, model_logic.ModelExtensions): 219 220 hostname = dbmodels.CharField(max_length=255, unique=True) 221 222 name_field = 'hostname' 223 224 labels = dbmodels.ManyToManyField(Label, blank=True, 225 db_table='afe_shards_labels') 226 227 class Meta: 228 """Metadata for class ParameterizedJob.""" 229 db_table = 'afe_shards' 230 231 232class Drone(dbmodels.Model, model_logic.ModelExtensions): 233 """ 234 A scheduler drone 235 236 hostname: the drone's hostname 237 """ 238 hostname = dbmodels.CharField(max_length=255, unique=True) 239 240 name_field = 'hostname' 241 objects = model_logic.ExtendedManager() 242 243 244 def save(self, *args, **kwargs): 245 if not User.current_user().is_superuser(): 246 raise Exception('Only superusers may edit drones') 247 super(Drone, self).save(*args, **kwargs) 248 249 250 def delete(self): 251 if not User.current_user().is_superuser(): 252 raise Exception('Only superusers may delete drones') 253 super(Drone, self).delete() 254 255 256 class Meta: 257 """Metadata for class Drone.""" 258 db_table = 'afe_drones' 259 260 def __unicode__(self): 261 return unicode(self.hostname) 262 263 264class DroneSet(dbmodels.Model, model_logic.ModelExtensions): 265 """ 266 A set of scheduler drones 267 268 These will be used by the scheduler to decide what drones a job is allowed 269 to run on. 270 271 name: the drone set's name 272 drones: the drones that are part of the set 273 """ 274 DRONE_SETS_ENABLED = global_config.global_config.get_config_value( 275 'SCHEDULER', 'drone_sets_enabled', type=bool, default=False) 276 DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value( 277 'SCHEDULER', 'default_drone_set_name', default=None) 278 279 name = dbmodels.CharField(max_length=255, unique=True) 280 drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones') 281 282 name_field = 'name' 283 objects = model_logic.ExtendedManager() 284 285 286 def save(self, *args, **kwargs): 287 if not User.current_user().is_superuser(): 288 raise Exception('Only superusers may edit drone sets') 289 super(DroneSet, self).save(*args, **kwargs) 290 291 292 def delete(self): 293 if not User.current_user().is_superuser(): 294 raise Exception('Only superusers may delete drone sets') 295 super(DroneSet, self).delete() 296 297 298 @classmethod 299 def drone_sets_enabled(cls): 300 """Returns whether drone sets are enabled. 301 302 @param cls: Implicit class object. 303 """ 304 return cls.DRONE_SETS_ENABLED 305 306 307 @classmethod 308 def default_drone_set_name(cls): 309 """Returns the default drone set name. 310 311 @param cls: Implicit class object. 312 """ 313 return cls.DEFAULT_DRONE_SET_NAME 314 315 316 @classmethod 317 def get_default(cls): 318 """Gets the default drone set name, compatible with Job.add_object. 319 320 @param cls: Implicit class object. 321 """ 322 return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME) 323 324 325 @classmethod 326 def resolve_name(cls, drone_set_name): 327 """ 328 Returns the name of one of these, if not None, in order of preference: 329 1) the drone set given, 330 2) the current user's default drone set, or 331 3) the global default drone set 332 333 or returns None if drone sets are disabled 334 335 @param cls: Implicit class object. 336 @param drone_set_name: A drone set name. 337 """ 338 if not cls.drone_sets_enabled(): 339 return None 340 341 user = User.current_user() 342 user_drone_set_name = user.drone_set and user.drone_set.name 343 344 return drone_set_name or user_drone_set_name or cls.get_default().name 345 346 347 def get_drone_hostnames(self): 348 """ 349 Gets the hostnames of all drones in this drone set 350 """ 351 return set(self.drones.all().values_list('hostname', flat=True)) 352 353 354 class Meta: 355 """Metadata for class DroneSet.""" 356 db_table = 'afe_drone_sets' 357 358 def __unicode__(self): 359 return unicode(self.name) 360 361 362class User(dbmodels.Model, model_logic.ModelExtensions): 363 """\ 364 Required: 365 login :user login name 366 367 Optional: 368 access_level: 0=User (default), 1=Admin, 100=Root 369 """ 370 ACCESS_ROOT = 100 371 ACCESS_ADMIN = 1 372 ACCESS_USER = 0 373 374 AUTOTEST_SYSTEM = 'autotest_system' 375 376 login = dbmodels.CharField(max_length=255, unique=True) 377 access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True) 378 379 # user preferences 380 reboot_before = dbmodels.SmallIntegerField( 381 choices=model_attributes.RebootBefore.choices(), blank=True, 382 default=DEFAULT_REBOOT_BEFORE) 383 reboot_after = dbmodels.SmallIntegerField( 384 choices=model_attributes.RebootAfter.choices(), blank=True, 385 default=DEFAULT_REBOOT_AFTER) 386 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 387 show_experimental = dbmodels.BooleanField(default=False) 388 389 name_field = 'login' 390 objects = model_logic.ExtendedManager() 391 392 393 def save(self, *args, **kwargs): 394 # is this a new object being saved for the first time? 395 first_time = (self.id is None) 396 user = thread_local.get_user() 397 if user and not user.is_superuser() and user.login != self.login: 398 raise AclAccessViolation("You cannot modify user " + self.login) 399 super(User, self).save(*args, **kwargs) 400 if first_time: 401 everyone = AclGroup.objects.get(name='Everyone') 402 everyone.users.add(self) 403 404 405 def is_superuser(self): 406 """Returns whether the user has superuser access.""" 407 return self.access_level >= self.ACCESS_ROOT 408 409 410 @classmethod 411 def current_user(cls): 412 """Returns the current user. 413 414 @param cls: Implicit class object. 415 """ 416 user = thread_local.get_user() 417 if user is None: 418 user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM) 419 user.access_level = cls.ACCESS_ROOT 420 user.save() 421 return user 422 423 424 @classmethod 425 def get_record(cls, data): 426 """Check the database for an identical record. 427 428 Check for a record with matching id and login. If one exists, 429 return it. If one does not exist there is a possibility that 430 the following cases have happened: 431 1. Same id, different login 432 We received: "1 chromeos-test" 433 And we have: "1 debug-user" 434 In this case we need to delete "1 debug_user" and insert 435 "1 chromeos-test". 436 437 2. Same login, different id: 438 We received: "1 chromeos-test" 439 And we have: "2 chromeos-test" 440 In this case we need to delete "2 chromeos-test" and insert 441 "1 chromeos-test". 442 443 As long as this method deletes bad records and raises the 444 DoesNotExist exception the caller will handle creating the 445 new record. 446 447 @raises: DoesNotExist, if a record with the matching login and id 448 does not exist. 449 """ 450 451 # Both the id and login should be uniqe but there are cases when 452 # we might already have a user with the same login/id because 453 # current_user will proactively create a user record if it doesn't 454 # exist. Since we want to avoid conflict between the master and 455 # shard, just delete any existing user records that don't match 456 # what we're about to deserialize from the master. 457 try: 458 return cls.objects.get(login=data['login'], id=data['id']) 459 except cls.DoesNotExist: 460 cls.delete_matching_record(login=data['login']) 461 cls.delete_matching_record(id=data['id']) 462 raise 463 464 465 class Meta: 466 """Metadata for class User.""" 467 db_table = 'afe_users' 468 469 def __unicode__(self): 470 return unicode(self.login) 471 472 473class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel, 474 model_logic.ModelWithAttributes): 475 """\ 476 Required: 477 hostname 478 479 optional: 480 locked: if true, host is locked and will not be queued 481 482 Internal: 483 From AbstractHostModel: 484 status: string describing status of host 485 invalid: true if the host has been deleted 486 protection: indicates what can be done to this host during repair 487 lock_time: DateTime at which the host was locked 488 dirty: true if the host has been used without being rebooted 489 Local: 490 locked_by: user that locked the host, or null if the host is unlocked 491 """ 492 493 SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set', 494 'hostattribute_set', 495 'labels', 496 'shard']) 497 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid']) 498 499 500 def custom_deserialize_relation(self, link, data): 501 assert link == 'shard', 'Link %s should not be deserialized' % link 502 self.shard = Shard.deserialize(data) 503 504 505 # Note: Only specify foreign keys here, specify all native host columns in 506 # rdb_model_extensions instead. 507 Protection = host_protections.Protection 508 labels = dbmodels.ManyToManyField(Label, blank=True, 509 db_table='afe_hosts_labels') 510 static_labels = dbmodels.ManyToManyField( 511 StaticLabel, blank=True, db_table='afe_static_hosts_labels') 512 locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False) 513 name_field = 'hostname' 514 objects = model_logic.ModelWithInvalidManager() 515 valid_objects = model_logic.ValidObjectsManager() 516 leased_objects = model_logic.LeasedHostManager() 517 518 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 519 520 def __init__(self, *args, **kwargs): 521 super(Host, self).__init__(*args, **kwargs) 522 self._record_attributes(['status']) 523 524 525 @classmethod 526 def classify_labels(cls, label_names): 527 """Split labels to static & non-static. 528 529 @label_names: a list of labels (string). 530 531 @returns: a list of StaticLabel objects & a list of 532 (non-static) Label objects. 533 """ 534 if not label_names: 535 return [], [] 536 537 labels = Label.objects.filter(name__in=label_names) 538 539 if not RESPECT_STATIC_LABELS: 540 return [], labels 541 542 return cls.classify_label_objects(labels) 543 544 545 @classmethod 546 def classify_label_objects(cls, label_objects): 547 if not RESPECT_STATIC_LABELS: 548 return [], label_objects 549 550 replaced_labels = ReplacedLabel.objects.filter(label__in=label_objects) 551 replaced_ids = [l.label.id for l in replaced_labels] 552 non_static_labels = [ 553 l for l in label_objects if not l.id in replaced_ids] 554 static_label_names = [ 555 l.name for l in label_objects if l.id in replaced_ids] 556 static_labels = StaticLabel.objects.filter(name__in=static_label_names) 557 return static_labels, non_static_labels 558 559 560 @classmethod 561 def get_hosts_with_labels(cls, label_names, initial_query): 562 """Get hosts by label filters. 563 564 @param label_names: label (string) lists for fetching hosts. 565 @param initial_query: a model_logic.QuerySet of Host object, e.g. 566 567 Host.objects.all(), Host.valid_objects.all(). 568 569 This initial_query cannot be a sliced QuerySet, e.g. 570 571 Host.objects.all().filter(query_limit=10) 572 """ 573 if not label_names: 574 return initial_query 575 576 static_labels, non_static_labels = cls.classify_labels(label_names) 577 if len(static_labels) + len(non_static_labels) != len(label_names): 578 # Some labels don't exist in afe db, which means no hosts 579 # should be matched. 580 return set() 581 582 for l in static_labels: 583 initial_query = initial_query.filter(static_labels=l) 584 585 for l in non_static_labels: 586 initial_query = initial_query.filter(labels=l) 587 588 return initial_query 589 590 591 @classmethod 592 def get_hosts_with_label_ids(cls, label_ids, initial_query): 593 """Get hosts by label_id filters. 594 595 @param label_ids: label id (int) lists for fetching hosts. 596 @param initial_query: a list of Host object, e.g. 597 [<Host: 100.107.151.253>, <Host: 100.107.151.251>, ...] 598 """ 599 labels = Label.objects.filter(id__in=label_ids) 600 label_names = [l.name for l in labels] 601 return cls.get_hosts_with_labels(label_names, initial_query) 602 603 604 @staticmethod 605 def create_one_time_host(hostname): 606 """Creates a one-time host. 607 608 @param hostname: The name for the host. 609 """ 610 query = Host.objects.filter(hostname=hostname) 611 if query.count() == 0: 612 host = Host(hostname=hostname, invalid=True) 613 host.do_validate() 614 else: 615 host = query[0] 616 if not host.invalid: 617 raise model_logic.ValidationError({ 618 'hostname' : '%s already exists in the autotest DB. ' 619 'Select it rather than entering it as a one time ' 620 'host.' % hostname 621 }) 622 host.protection = host_protections.Protection.DO_NOT_REPAIR 623 host.locked = False 624 host.save() 625 host.clean_object() 626 return host 627 628 629 @classmethod 630 def _assign_to_shard_nothing_helper(cls): 631 """Does nothing. 632 633 This method is called in the middle of assign_to_shard, and does 634 nothing. It exists to allow integration tests to simulate a race 635 condition.""" 636 637 638 @classmethod 639 def assign_to_shard(cls, shard, known_ids): 640 """Assigns hosts to a shard. 641 642 For all labels that have been assigned to a shard, all hosts that 643 have at least one of the shard's labels are assigned to the shard. 644 Hosts that are assigned to the shard but aren't already present on the 645 shard are returned. 646 647 Any boards that are in |known_ids| but that do not belong to the shard 648 are incorrect ids, which are also returned so that the shard can remove 649 them locally. 650 651 Board to shard mapping is many-to-one. Many different boards can be 652 hosted in a shard. However, DUTs of a single board cannot be distributed 653 into more than one shard. 654 655 @param shard: The shard object to assign labels/hosts for. 656 @param known_ids: List of all host-ids the shard already knows. 657 This is used to figure out which hosts should be sent 658 to the shard. If shard_ids were used instead, hosts 659 would only be transferred once, even if the client 660 failed persisting them. 661 The number of hosts usually lies in O(100), so the 662 overhead is acceptable. 663 664 @returns a tuple of (hosts objects that should be sent to the shard, 665 incorrect host ids that should not belong to] 666 shard) 667 """ 668 # Disclaimer: concurrent heartbeats should theoretically not occur in 669 # the current setup. As they may be introduced in the near future, 670 # this comment will be left here. 671 672 # Sending stuff twice is acceptable, but forgetting something isn't. 673 # Detecting duplicates on the client is easy, but here it's harder. The 674 # following options were considered: 675 # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more 676 # than select returned, as concurrently more hosts might have been 677 # inserted 678 # - UPDATE and then SELECT WHERE shard=shard: select always returns all 679 # hosts for the shard, this is overhead 680 # - SELECT and then UPDATE only selected without requerying afterwards: 681 # returns the old state of the records. 682 new_hosts = [] 683 684 possible_new_host_ids = set(Host.objects.filter( 685 labels__in=shard.labels.all(), 686 leased=False 687 ).exclude( 688 id__in=known_ids, 689 ).values_list('pk', flat=True)) 690 691 # No-op in production, used to simulate race condition in tests. 692 cls._assign_to_shard_nothing_helper() 693 694 if possible_new_host_ids: 695 Host.objects.filter( 696 pk__in=possible_new_host_ids, 697 labels__in=shard.labels.all(), 698 leased=False 699 ).update(shard=shard) 700 new_hosts = list(Host.objects.filter( 701 pk__in=possible_new_host_ids, 702 shard=shard 703 ).all()) 704 705 invalid_host_ids = list(Host.objects.filter( 706 id__in=known_ids 707 ).exclude( 708 shard=shard 709 ).values_list('pk', flat=True)) 710 711 return new_hosts, invalid_host_ids 712 713 def resurrect_object(self, old_object): 714 super(Host, self).resurrect_object(old_object) 715 # invalid hosts can be in use by the scheduler (as one-time hosts), so 716 # don't change the status 717 self.status = old_object.status 718 719 720 def clean_object(self): 721 self.aclgroup_set.clear() 722 self.labels.clear() 723 self.static_labels.clear() 724 725 726 def save(self, *args, **kwargs): 727 # extra spaces in the hostname can be a sneaky source of errors 728 self.hostname = self.hostname.strip() 729 # is this a new object being saved for the first time? 730 first_time = (self.id is None) 731 if not first_time: 732 AclGroup.check_for_acl_violation_hosts([self]) 733 # If locked is changed, send its status and user made the change to 734 # metaDB. Locks are important in host history because if a device is 735 # locked then we don't really care what state it is in. 736 if self.locked and not self.locked_by: 737 self.locked_by = User.current_user() 738 if not self.lock_time: 739 self.lock_time = datetime.now() 740 self.dirty = True 741 elif not self.locked and self.locked_by: 742 self.locked_by = None 743 self.lock_time = None 744 super(Host, self).save(*args, **kwargs) 745 if first_time: 746 everyone = AclGroup.objects.get(name='Everyone') 747 everyone.hosts.add(self) 748 # remove attributes that may have lingered from an old host and 749 # should not be associated with a new host 750 for host_attribute in self.hostattribute_set.all(): 751 self.delete_attribute(host_attribute.attribute) 752 self._check_for_updated_attributes() 753 754 755 def delete(self): 756 AclGroup.check_for_acl_violation_hosts([self]) 757 logging.info('Preconditions for deleting host %s...', self.hostname) 758 for queue_entry in self.hostqueueentry_set.all(): 759 logging.info(' Deleting and aborting hqe %s...', queue_entry) 760 queue_entry.deleted = True 761 queue_entry.abort() 762 logging.info(' ... done with hqe %s.', queue_entry) 763 for host_attribute in self.hostattribute_set.all(): 764 logging.info(' Deleting attribute %s...', host_attribute) 765 self.delete_attribute(host_attribute.attribute) 766 logging.info(' ... done with attribute %s.', host_attribute) 767 logging.info('... preconditions done for host %s.', self.hostname) 768 logging.info('Deleting host %s...', self.hostname) 769 super(Host, self).delete() 770 logging.info('... done.') 771 772 773 def on_attribute_changed(self, attribute, old_value): 774 assert attribute == 'status' 775 logging.info('%s -> %s', self.hostname, self.status) 776 777 778 def enqueue_job(self, job, is_template=False): 779 """Enqueue a job on this host. 780 781 @param job: A job to enqueue. 782 @param is_template: Whther the status should be "Template". 783 """ 784 queue_entry = HostQueueEntry.create(host=self, job=job, 785 is_template=is_template) 786 # allow recovery of dead hosts from the frontend 787 if not self.active_queue_entry() and self.is_dead(): 788 self.status = Host.Status.READY 789 self.save() 790 queue_entry.save() 791 792 block = IneligibleHostQueue(job=job, host=self) 793 block.save() 794 795 796 def platform(self): 797 """The platform of the host.""" 798 # TODO(showard): slighly hacky? 799 platforms = self.labels.filter(platform=True) 800 if len(platforms) == 0: 801 return None 802 return platforms[0] 803 platform.short_description = 'Platform' 804 805 806 @classmethod 807 def check_no_platform(cls, hosts): 808 """Verify the specified hosts have no associated platforms. 809 810 @param cls: Implicit class object. 811 @param hosts: The hosts to verify. 812 @raises model_logic.ValidationError if any hosts already have a 813 platform. 814 """ 815 Host.objects.populate_relationships(hosts, Label, 'label_list') 816 Host.objects.populate_relationships(hosts, StaticLabel, 817 'staticlabel_list') 818 errors = [] 819 for host in hosts: 820 platforms = [label.name for label in host.label_list 821 if label.platform] 822 if RESPECT_STATIC_LABELS: 823 platforms += [label.name for label in host.staticlabel_list 824 if label.platform] 825 826 if platforms: 827 # do a join, just in case this host has multiple platforms, 828 # we'll be able to see it 829 errors.append('Host %s already has a platform: %s' % ( 830 host.hostname, ', '.join(platforms))) 831 if errors: 832 raise model_logic.ValidationError({'labels': '; '.join(errors)}) 833 834 835 @classmethod 836 def check_board_labels_allowed(cls, hosts, new_labels=[]): 837 """Verify the specified hosts have valid board labels and the given 838 new board labels can be added. 839 840 @param cls: Implicit class object. 841 @param hosts: The hosts to verify. 842 @param new_labels: A list of labels to be added to the hosts. 843 844 @raises model_logic.ValidationError if any host has invalid board labels 845 or the given board labels cannot be added to the hsots. 846 """ 847 Host.objects.populate_relationships(hosts, Label, 'label_list') 848 Host.objects.populate_relationships(hosts, StaticLabel, 849 'staticlabel_list') 850 errors = [] 851 for host in hosts: 852 boards = [label.name for label in host.label_list 853 if label.name.startswith('board:')] 854 if RESPECT_STATIC_LABELS: 855 boards += [label.name for label in host.staticlabel_list 856 if label.name.startswith('board:')] 857 858 new_boards = [name for name in new_labels 859 if name.startswith('board:')] 860 if len(boards) + len(new_boards) > 1: 861 # do a join, just in case this host has multiple boards, 862 # we'll be able to see it 863 errors.append('Host %s already has board labels: %s' % ( 864 host.hostname, ', '.join(boards))) 865 if errors: 866 raise model_logic.ValidationError({'labels': '; '.join(errors)}) 867 868 869 def is_dead(self): 870 """Returns whether the host is dead (has status repair failed).""" 871 return self.status == Host.Status.REPAIR_FAILED 872 873 874 def active_queue_entry(self): 875 """Returns the active queue entry for this host, or None if none.""" 876 active = list(self.hostqueueentry_set.filter(active=True)) 877 if not active: 878 return None 879 assert len(active) == 1, ('More than one active entry for ' 880 'host ' + self.hostname) 881 return active[0] 882 883 884 def _get_attribute_model_and_args(self, attribute): 885 return HostAttribute, dict(host=self, attribute=attribute) 886 887 888 def _get_static_attribute_model_and_args(self, attribute): 889 return StaticHostAttribute, dict(host=self, attribute=attribute) 890 891 892 def _is_replaced_by_static_attribute(self, attribute): 893 if RESPECT_STATIC_ATTRIBUTES: 894 model, args = self._get_static_attribute_model_and_args(attribute) 895 try: 896 static_attr = model.objects.get(**args) 897 return True 898 except StaticHostAttribute.DoesNotExist: 899 return False 900 901 return False 902 903 904 @classmethod 905 def get_attribute_model(cls): 906 """Return the attribute model. 907 908 Override method in parent class. See ModelExtensions for details. 909 @returns: The attribute model of Host. 910 """ 911 return HostAttribute 912 913 914 class Meta: 915 """Metadata for the Host class.""" 916 db_table = 'afe_hosts' 917 918 919 def __unicode__(self): 920 return unicode(self.hostname) 921 922 923class HostAttribute(dbmodels.Model, model_logic.ModelExtensions): 924 """Arbitrary keyvals associated with hosts.""" 925 926 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 927 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 928 host = dbmodels.ForeignKey(Host) 929 attribute = dbmodels.CharField(max_length=90) 930 value = dbmodels.CharField(max_length=300) 931 932 objects = model_logic.ExtendedManager() 933 934 class Meta: 935 """Metadata for the HostAttribute class.""" 936 db_table = 'afe_host_attributes' 937 938 939 @classmethod 940 def get_record(cls, data): 941 """Check the database for an identical record. 942 943 Use host_id and attribute to search for a existing record. 944 945 @raises: DoesNotExist, if no record found 946 @raises: MultipleObjectsReturned if multiple records found. 947 """ 948 # TODO(fdeng): We should use host_id and attribute together as 949 # a primary key in the db. 950 return cls.objects.get(host_id=data['host_id'], 951 attribute=data['attribute']) 952 953 954 @classmethod 955 def deserialize(cls, data): 956 """Override deserialize in parent class. 957 958 Do not deserialize id as id is not kept consistent on master and shards. 959 960 @param data: A dictionary of data to deserialize. 961 962 @returns: A HostAttribute object. 963 """ 964 if data: 965 data.pop('id') 966 return super(HostAttribute, cls).deserialize(data) 967 968 969class StaticHostAttribute(dbmodels.Model, model_logic.ModelExtensions): 970 """Static arbitrary keyvals associated with hosts.""" 971 972 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 973 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 974 host = dbmodels.ForeignKey(Host) 975 attribute = dbmodels.CharField(max_length=90) 976 value = dbmodels.CharField(max_length=300) 977 978 objects = model_logic.ExtendedManager() 979 980 class Meta: 981 """Metadata for the StaticHostAttribute class.""" 982 db_table = 'afe_static_host_attributes' 983 984 985 @classmethod 986 def get_record(cls, data): 987 """Check the database for an identical record. 988 989 Use host_id and attribute to search for a existing record. 990 991 @raises: DoesNotExist, if no record found 992 @raises: MultipleObjectsReturned if multiple records found. 993 """ 994 return cls.objects.get(host_id=data['host_id'], 995 attribute=data['attribute']) 996 997 998 @classmethod 999 def deserialize(cls, data): 1000 """Override deserialize in parent class. 1001 1002 Do not deserialize id as id is not kept consistent on master and shards. 1003 1004 @param data: A dictionary of data to deserialize. 1005 1006 @returns: A StaticHostAttribute object. 1007 """ 1008 if data: 1009 data.pop('id') 1010 return super(StaticHostAttribute, cls).deserialize(data) 1011 1012 1013class Test(dbmodels.Model, model_logic.ModelExtensions): 1014 """\ 1015 Required: 1016 author: author name 1017 description: description of the test 1018 name: test name 1019 time: short, medium, long 1020 test_class: This describes the class for your the test belongs in. 1021 test_category: This describes the category for your tests 1022 test_type: Client or Server 1023 path: path to pass to run_test() 1024 sync_count: is a number >=1 (1 being the default). If it's 1, then it's an 1025 async job. If it's >1 it's sync job for that number of machines 1026 i.e. if sync_count = 2 it is a sync job that requires two 1027 machines. 1028 Optional: 1029 dependencies: What the test requires to run. Comma deliminated list 1030 dependency_labels: many-to-many relationship with labels corresponding to 1031 test dependencies. 1032 experimental: If this is set to True production servers will ignore the test 1033 run_verify: Whether or not the scheduler should run the verify stage 1034 run_reset: Whether or not the scheduler should run the reset stage 1035 test_retry: Number of times to retry test if the test did not complete 1036 successfully. (optional, default: 0) 1037 """ 1038 TestTime = enum.Enum('SHORT', 'MEDIUM', 'LONG', start_value=1) 1039 1040 name = dbmodels.CharField(max_length=255, unique=True) 1041 author = dbmodels.CharField(max_length=255) 1042 test_class = dbmodels.CharField(max_length=255) 1043 test_category = dbmodels.CharField(max_length=255) 1044 dependencies = dbmodels.CharField(max_length=255, blank=True) 1045 description = dbmodels.TextField(blank=True) 1046 experimental = dbmodels.BooleanField(default=True) 1047 run_verify = dbmodels.BooleanField(default=False) 1048 test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(), 1049 default=TestTime.MEDIUM) 1050 test_type = dbmodels.SmallIntegerField( 1051 choices=control_data.CONTROL_TYPE.choices()) 1052 sync_count = dbmodels.IntegerField(default=1) 1053 path = dbmodels.CharField(max_length=255, unique=True) 1054 test_retry = dbmodels.IntegerField(blank=True, default=0) 1055 run_reset = dbmodels.BooleanField(default=True) 1056 1057 dependency_labels = ( 1058 dbmodels.ManyToManyField(Label, blank=True, 1059 db_table='afe_autotests_dependency_labels')) 1060 name_field = 'name' 1061 objects = model_logic.ExtendedManager() 1062 1063 1064 def admin_description(self): 1065 """Returns a string representing the admin description.""" 1066 escaped_description = saxutils.escape(self.description) 1067 return '<span style="white-space:pre">%s</span>' % escaped_description 1068 admin_description.allow_tags = True 1069 admin_description.short_description = 'Description' 1070 1071 1072 class Meta: 1073 """Metadata for class Test.""" 1074 db_table = 'afe_autotests' 1075 1076 def __unicode__(self): 1077 return unicode(self.name) 1078 1079 1080class TestParameter(dbmodels.Model): 1081 """ 1082 A declared parameter of a test 1083 """ 1084 test = dbmodels.ForeignKey(Test) 1085 name = dbmodels.CharField(max_length=255) 1086 1087 class Meta: 1088 """Metadata for class TestParameter.""" 1089 db_table = 'afe_test_parameters' 1090 unique_together = ('test', 'name') 1091 1092 def __unicode__(self): 1093 return u'%s (%s)' % (self.name, self.test.name) 1094 1095 1096class Profiler(dbmodels.Model, model_logic.ModelExtensions): 1097 """\ 1098 Required: 1099 name: profiler name 1100 test_type: Client or Server 1101 1102 Optional: 1103 description: arbirary text description 1104 """ 1105 name = dbmodels.CharField(max_length=255, unique=True) 1106 description = dbmodels.TextField(blank=True) 1107 1108 name_field = 'name' 1109 objects = model_logic.ExtendedManager() 1110 1111 1112 class Meta: 1113 """Metadata for class Profiler.""" 1114 db_table = 'afe_profilers' 1115 1116 def __unicode__(self): 1117 return unicode(self.name) 1118 1119 1120class AclGroup(dbmodels.Model, model_logic.ModelExtensions): 1121 """\ 1122 Required: 1123 name: name of ACL group 1124 1125 Optional: 1126 description: arbitrary description of group 1127 """ 1128 1129 SERIALIZATION_LINKS_TO_FOLLOW = set(['users']) 1130 1131 name = dbmodels.CharField(max_length=255, unique=True) 1132 description = dbmodels.CharField(max_length=255, blank=True) 1133 users = dbmodels.ManyToManyField(User, blank=False, 1134 db_table='afe_acl_groups_users') 1135 hosts = dbmodels.ManyToManyField(Host, blank=True, 1136 db_table='afe_acl_groups_hosts') 1137 1138 name_field = 'name' 1139 objects = model_logic.ExtendedManager() 1140 1141 @staticmethod 1142 def check_for_acl_violation_hosts(hosts): 1143 """Verify the current user has access to the specified hosts. 1144 1145 @param hosts: The hosts to verify against. 1146 @raises AclAccessViolation if the current user doesn't have access 1147 to a host. 1148 """ 1149 user = User.current_user() 1150 if user.is_superuser(): 1151 return 1152 accessible_host_ids = set( 1153 host.id for host in Host.objects.filter(aclgroup__users=user)) 1154 for host in hosts: 1155 # Check if the user has access to this host, 1156 # but only if it is not a metahost or a one-time-host. 1157 no_access = (isinstance(host, Host) 1158 and not host.invalid 1159 and int(host.id) not in accessible_host_ids) 1160 if no_access: 1161 raise AclAccessViolation("%s does not have access to %s" % 1162 (str(user), str(host))) 1163 1164 1165 @staticmethod 1166 def check_abort_permissions(queue_entries): 1167 """Look for queue entries that aren't abortable by the current user. 1168 1169 An entry is not abortable if: 1170 * the job isn't owned by this user, and 1171 * the machine isn't ACL-accessible, or 1172 * the machine is in the "Everyone" ACL 1173 1174 @param queue_entries: The queue entries to check. 1175 @raises AclAccessViolation if a queue entry is not abortable by the 1176 current user. 1177 """ 1178 user = User.current_user() 1179 if user.is_superuser(): 1180 return 1181 not_owned = queue_entries.exclude(job__owner=user.login) 1182 # I do this using ID sets instead of just Django filters because 1183 # filtering on M2M dbmodels is broken in Django 0.96. It's better in 1184 # 1.0. 1185 # TODO: Use Django filters, now that we're using 1.0. 1186 accessible_ids = set( 1187 entry.id for entry 1188 in not_owned.filter(host__aclgroup__users__login=user.login)) 1189 public_ids = set(entry.id for entry 1190 in not_owned.filter(host__aclgroup__name='Everyone')) 1191 cannot_abort = [entry for entry in not_owned.select_related() 1192 if entry.id not in accessible_ids 1193 or entry.id in public_ids] 1194 if len(cannot_abort) == 0: 1195 return 1196 entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner, 1197 entry.host_or_metahost_name()) 1198 for entry in cannot_abort) 1199 raise AclAccessViolation('You cannot abort the following job entries: ' 1200 + entry_names) 1201 1202 1203 def check_for_acl_violation_acl_group(self): 1204 """Verifies the current user has acces to this ACL group. 1205 1206 @raises AclAccessViolation if the current user doesn't have access to 1207 this ACL group. 1208 """ 1209 user = User.current_user() 1210 if user.is_superuser(): 1211 return 1212 if self.name == 'Everyone': 1213 raise AclAccessViolation("You cannot modify 'Everyone'!") 1214 if not user in self.users.all(): 1215 raise AclAccessViolation("You do not have access to %s" 1216 % self.name) 1217 1218 @staticmethod 1219 def on_host_membership_change(): 1220 """Invoked when host membership changes.""" 1221 everyone = AclGroup.objects.get(name='Everyone') 1222 1223 # find hosts that aren't in any ACL group and add them to Everyone 1224 # TODO(showard): this is a bit of a hack, since the fact that this query 1225 # works is kind of a coincidence of Django internals. This trick 1226 # doesn't work in general (on all foreign key relationships). I'll 1227 # replace it with a better technique when the need arises. 1228 orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True) 1229 everyone.hosts.add(*orphaned_hosts.distinct()) 1230 1231 # find hosts in both Everyone and another ACL group, and remove them 1232 # from Everyone 1233 hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone') 1234 acled_hosts = set() 1235 for host in hosts_in_everyone: 1236 # Has an ACL group other than Everyone 1237 if host.aclgroup_set.count() > 1: 1238 acled_hosts.add(host) 1239 everyone.hosts.remove(*acled_hosts) 1240 1241 1242 def delete(self): 1243 if (self.name == 'Everyone'): 1244 raise AclAccessViolation("You cannot delete 'Everyone'!") 1245 self.check_for_acl_violation_acl_group() 1246 super(AclGroup, self).delete() 1247 self.on_host_membership_change() 1248 1249 1250 def add_current_user_if_empty(self): 1251 """Adds the current user if the set of users is empty.""" 1252 if not self.users.count(): 1253 self.users.add(User.current_user()) 1254 1255 1256 def perform_after_save(self, change): 1257 """Called after a save. 1258 1259 @param change: Whether there was a change. 1260 """ 1261 if not change: 1262 self.users.add(User.current_user()) 1263 self.add_current_user_if_empty() 1264 self.on_host_membership_change() 1265 1266 1267 def save(self, *args, **kwargs): 1268 change = bool(self.id) 1269 if change: 1270 # Check the original object for an ACL violation 1271 AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group() 1272 super(AclGroup, self).save(*args, **kwargs) 1273 self.perform_after_save(change) 1274 1275 1276 class Meta: 1277 """Metadata for class AclGroup.""" 1278 db_table = 'afe_acl_groups' 1279 1280 def __unicode__(self): 1281 return unicode(self.name) 1282 1283 1284class ParameterizedJob(dbmodels.Model): 1285 """ 1286 Auxiliary configuration for a parameterized job. 1287 1288 This class is obsolete, and ought to be dead. Due to a series of 1289 unfortunate events, it can't be deleted: 1290 * In `class Job` we're required to keep a reference to this class 1291 for the sake of the scheduler unit tests. 1292 * The existence of the reference in `Job` means that certain 1293 methods here will get called from the `get_jobs` RPC. 1294 So, the definitions below seem to be the minimum stub we can support 1295 unless/until we change the database schema. 1296 """ 1297 1298 @classmethod 1299 def smart_get(cls, id_or_name, *args, **kwargs): 1300 """For compatibility with Job.add_object. 1301 1302 @param cls: Implicit class object. 1303 @param id_or_name: The ID or name to get. 1304 @param args: Non-keyword arguments. 1305 @param kwargs: Keyword arguments. 1306 """ 1307 return cls.objects.get(pk=id_or_name) 1308 1309 1310 def job(self): 1311 """Returns the job if it exists, or else None.""" 1312 jobs = self.job_set.all() 1313 assert jobs.count() <= 1 1314 return jobs and jobs[0] or None 1315 1316 1317 class Meta: 1318 """Metadata for class ParameterizedJob.""" 1319 db_table = 'afe_parameterized_jobs' 1320 1321 def __unicode__(self): 1322 return u'%s (parameterized) - %s' % (self.test.name, self.job()) 1323 1324 1325class JobManager(model_logic.ExtendedManager): 1326 'Custom manager to provide efficient status counts querying.' 1327 def get_status_counts(self, job_ids): 1328 """Returns a dict mapping the given job IDs to their status count dicts. 1329 1330 @param job_ids: A list of job IDs. 1331 """ 1332 if not job_ids: 1333 return {} 1334 id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids) 1335 cursor = connection.cursor() 1336 cursor.execute(""" 1337 SELECT job_id, status, aborted, complete, COUNT(*) 1338 FROM afe_host_queue_entries 1339 WHERE job_id IN %s 1340 GROUP BY job_id, status, aborted, complete 1341 """ % id_list) 1342 all_job_counts = dict((job_id, {}) for job_id in job_ids) 1343 for job_id, status, aborted, complete, count in cursor.fetchall(): 1344 job_dict = all_job_counts[job_id] 1345 full_status = HostQueueEntry.compute_full_status(status, aborted, 1346 complete) 1347 job_dict.setdefault(full_status, 0) 1348 job_dict[full_status] += count 1349 return all_job_counts 1350 1351 1352class Job(dbmodels.Model, model_logic.ModelExtensions): 1353 """\ 1354 owner: username of job owner 1355 name: job name (does not have to be unique) 1356 priority: Integer priority value. Higher is more important. 1357 control_file: contents of control file 1358 control_type: Client or Server 1359 created_on: date of job creation 1360 submitted_on: date of job submission 1361 synch_count: how many hosts should be used per autoserv execution 1362 run_verify: Whether or not to run the verify phase 1363 run_reset: Whether or not to run the reset phase 1364 timeout: DEPRECATED - hours from queuing time until job times out 1365 timeout_mins: minutes from job queuing time until the job times out 1366 max_runtime_hrs: DEPRECATED - hours from job starting time until job 1367 times out 1368 max_runtime_mins: minutes from job starting time until job times out 1369 email_list: list of people to email on completion delimited by any of: 1370 white space, ',', ':', ';' 1371 dependency_labels: many-to-many relationship with labels corresponding to 1372 job dependencies 1373 reboot_before: Never, If dirty, or Always 1374 reboot_after: Never, If all tests passed, or Always 1375 parse_failed_repair: if True, a failed repair launched by this job will have 1376 its results parsed as part of the job. 1377 drone_set: The set of drones to run this job on 1378 parent_job: Parent job (optional) 1379 test_retry: Number of times to retry test if the test did not complete 1380 successfully. (optional, default: 0) 1381 require_ssp: Require server-side packaging unless require_ssp is set to 1382 False. (optional, default: None) 1383 """ 1384 1385 # TODO: Investigate, if jobkeyval_set is really needed. 1386 # dynamic_suite will write them into an attached file for the drone, but 1387 # it doesn't seem like they are actually used. If they aren't used, remove 1388 # jobkeyval_set here. 1389 SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels', 1390 'hostqueueentry_set', 1391 'jobkeyval_set', 1392 'shard']) 1393 1394 # SQL for selecting jobs that should be sent to shard. 1395 # We use raw sql as django filters were not optimized. 1396 # The following jobs are excluded by the SQL. 1397 # - Non-aborted jobs known to shard as specified in |known_ids|. 1398 # Note for jobs aborted on master, even if already known to shard, 1399 # will be sent to shard again so that shard can abort them. 1400 # - Completed jobs 1401 # - Active jobs 1402 # - Jobs without host_queue_entries 1403 NON_ABORTED_KNOWN_JOBS = '(t2.aborted = 0 AND t1.id IN (%(known_ids)s))' 1404 1405 SQL_SHARD_JOBS = ( 1406 'SELECT DISTINCT(t1.id) FROM afe_jobs t1 ' 1407 'INNER JOIN afe_host_queue_entries t2 ON ' 1408 ' (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 ' 1409 ' %(check_known_jobs)s) ' 1410 'LEFT OUTER JOIN afe_jobs_dependency_labels t3 ON (t1.id = t3.job_id) ' 1411 'JOIN afe_shards_labels t4 ' 1412 ' ON (t4.label_id = t3.label_id OR t4.label_id = t2.meta_host) ' 1413 'WHERE t4.shard_id = %(shard_id)s' 1414 ) 1415 1416 # Jobs can be created with assigned hosts and have no dependency 1417 # labels nor meta_host. 1418 # We are looking for: 1419 # - a job whose hqe's meta_host is null 1420 # - a job whose hqe has a host 1421 # - one of the host's labels matches the shard's label. 1422 # Non-aborted known jobs, completed jobs, active jobs, jobs 1423 # without hqe are exluded as we do with SQL_SHARD_JOBS. 1424 SQL_SHARD_JOBS_WITH_HOSTS = ( 1425 'SELECT DISTINCT(t1.id) FROM afe_jobs t1 ' 1426 'INNER JOIN afe_host_queue_entries t2 ON ' 1427 ' (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 ' 1428 ' AND t2.meta_host IS NULL AND t2.host_id IS NOT NULL ' 1429 ' %(check_known_jobs)s) ' 1430 'LEFT OUTER JOIN %(host_label_table)s t3 ON (t2.host_id = t3.host_id) ' 1431 'WHERE (t3.%(host_label_column)s IN %(label_ids)s)' 1432 ) 1433 1434 # Even if we had filters about complete, active and aborted 1435 # bits in the above two SQLs, there is a chance that 1436 # the result may still contain a job with an hqe with 'complete=1' 1437 # or 'active=1' or 'aborted=0 and afe_job.id in known jobs.' 1438 # This happens when a job has two (or more) hqes and at least 1439 # one hqe has different bits than others. 1440 # We use a second sql to ensure we exclude all un-desired jobs. 1441 SQL_JOBS_TO_EXCLUDE =( 1442 'SELECT t1.id FROM afe_jobs t1 ' 1443 'INNER JOIN afe_host_queue_entries t2 ON ' 1444 ' (t1.id = t2.job_id) ' 1445 'WHERE (t1.id in (%(candidates)s) ' 1446 ' AND (t2.complete=1 OR t2.active=1 ' 1447 ' %(check_known_jobs)s))' 1448 ) 1449 1450 def _deserialize_relation(self, link, data): 1451 if link in ['hostqueueentry_set', 'jobkeyval_set']: 1452 for obj in data: 1453 obj['job_id'] = self.id 1454 1455 super(Job, self)._deserialize_relation(link, data) 1456 1457 1458 def custom_deserialize_relation(self, link, data): 1459 assert link == 'shard', 'Link %s should not be deserialized' % link 1460 self.shard = Shard.deserialize(data) 1461 1462 1463 def sanity_check_update_from_shard(self, shard, updated_serialized): 1464 # If the job got aborted on the master after the client fetched it 1465 # no shard_id will be set. The shard might still push updates though, 1466 # as the job might complete before the abort bit syncs to the shard. 1467 # Alternative considered: The master scheduler could be changed to not 1468 # set aborted jobs to completed that are sharded out. But that would 1469 # require database queries and seemed more complicated to implement. 1470 # This seems safe to do, as there won't be updates pushed from the wrong 1471 # shards should be powered off and wiped hen they are removed from the 1472 # master. 1473 if self.shard_id and self.shard_id != shard.id: 1474 raise error.IgnorableUnallowedRecordsSentToMaster( 1475 'Job id=%s is assigned to shard (%s). Cannot update it with %s ' 1476 'from shard %s.' % (self.id, self.shard_id, updated_serialized, 1477 shard.id)) 1478 1479 1480 RebootBefore = model_attributes.RebootBefore 1481 RebootAfter = model_attributes.RebootAfter 1482 # TIMEOUT is deprecated. 1483 DEFAULT_TIMEOUT = global_config.global_config.get_config_value( 1484 'AUTOTEST_WEB', 'job_timeout_default', default=24) 1485 DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value( 1486 'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60) 1487 # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is 1488 # completed. 1489 DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value( 1490 'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72) 1491 DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value( 1492 'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60) 1493 DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value( 1494 'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool, default=False) 1495 FETCH_READONLY_JOBS = global_config.global_config.get_config_value( 1496 'AUTOTEST_WEB','readonly_heartbeat', type=bool, default=False) 1497 CHECK_MASTER_IF_EMPTY = global_config.global_config.get_config_value( 1498 'AUTOTEST_WEB','heartbeat_fall_back_to_master', 1499 type=bool, default=False) 1500 1501 1502 owner = dbmodels.CharField(max_length=255) 1503 name = dbmodels.CharField(max_length=255) 1504 priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT) 1505 control_file = dbmodels.TextField(null=True, blank=True) 1506 control_type = dbmodels.SmallIntegerField( 1507 choices=control_data.CONTROL_TYPE.choices(), 1508 blank=True, # to allow 0 1509 default=control_data.CONTROL_TYPE.CLIENT) 1510 created_on = dbmodels.DateTimeField() 1511 synch_count = dbmodels.IntegerField(blank=True, default=0) 1512 timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT) 1513 run_verify = dbmodels.BooleanField(default=False) 1514 email_list = dbmodels.CharField(max_length=250, blank=True) 1515 dependency_labels = ( 1516 dbmodels.ManyToManyField(Label, blank=True, 1517 db_table='afe_jobs_dependency_labels')) 1518 reboot_before = dbmodels.SmallIntegerField( 1519 choices=model_attributes.RebootBefore.choices(), blank=True, 1520 default=DEFAULT_REBOOT_BEFORE) 1521 reboot_after = dbmodels.SmallIntegerField( 1522 choices=model_attributes.RebootAfter.choices(), blank=True, 1523 default=DEFAULT_REBOOT_AFTER) 1524 parse_failed_repair = dbmodels.BooleanField( 1525 default=DEFAULT_PARSE_FAILED_REPAIR) 1526 # max_runtime_hrs is deprecated. Will be removed after switch to mins is 1527 # completed. 1528 max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS) 1529 max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS) 1530 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 1531 1532 # TODO(jrbarnette) We have to keep `parameterized_job` around or it 1533 # breaks the scheduler_models unit tests (and fixing the unit tests 1534 # will break the scheduler, so don't do that). 1535 # 1536 # The ultimate fix is to delete the column from the database table 1537 # at which point, you _must_ delete this. Until you're ready to do 1538 # that, DON'T MUCK WITH IT. 1539 parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True, 1540 blank=True) 1541 1542 parent_job = dbmodels.ForeignKey('self', blank=True, null=True) 1543 1544 test_retry = dbmodels.IntegerField(blank=True, default=0) 1545 1546 run_reset = dbmodels.BooleanField(default=True) 1547 1548 timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS) 1549 1550 # If this is None on the master, a slave should be found. 1551 # If this is None on a slave, it should be synced back to the master 1552 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 1553 1554 # If this is None, server-side packaging will be used for server side test, 1555 # unless it's disabled in global config AUTOSERV/enable_ssp_container. 1556 require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True) 1557 1558 # custom manager 1559 objects = JobManager() 1560 1561 1562 @decorators.cached_property 1563 def labels(self): 1564 """All the labels of this job""" 1565 # We need to convert dependency_labels to a list, because all() gives us 1566 # back an iterator, and storing/caching an iterator means we'd only be 1567 # able to read from it once. 1568 return list(self.dependency_labels.all()) 1569 1570 1571 def is_server_job(self): 1572 """Returns whether this job is of type server.""" 1573 return self.control_type == control_data.CONTROL_TYPE.SERVER 1574 1575 1576 @classmethod 1577 def create(cls, owner, options, hosts): 1578 """Creates a job. 1579 1580 The job is created by taking some information (the listed args) and 1581 filling in the rest of the necessary information. 1582 1583 @param cls: Implicit class object. 1584 @param owner: The owner for the job. 1585 @param options: An options object. 1586 @param hosts: The hosts to use. 1587 """ 1588 AclGroup.check_for_acl_violation_hosts(hosts) 1589 1590 control_file = options.get('control_file') 1591 1592 user = User.current_user() 1593 if options.get('reboot_before') is None: 1594 options['reboot_before'] = user.get_reboot_before_display() 1595 if options.get('reboot_after') is None: 1596 options['reboot_after'] = user.get_reboot_after_display() 1597 1598 drone_set = DroneSet.resolve_name(options.get('drone_set')) 1599 1600 if options.get('timeout_mins') is None and options.get('timeout'): 1601 options['timeout_mins'] = options['timeout'] * 60 1602 1603 job = cls.add_object( 1604 owner=owner, 1605 name=options['name'], 1606 priority=options['priority'], 1607 control_file=control_file, 1608 control_type=options['control_type'], 1609 synch_count=options.get('synch_count'), 1610 # timeout needs to be deleted in the future. 1611 timeout=options.get('timeout'), 1612 timeout_mins=options.get('timeout_mins'), 1613 max_runtime_mins=options.get('max_runtime_mins'), 1614 run_verify=options.get('run_verify'), 1615 email_list=options.get('email_list'), 1616 reboot_before=options.get('reboot_before'), 1617 reboot_after=options.get('reboot_after'), 1618 parse_failed_repair=options.get('parse_failed_repair'), 1619 created_on=datetime.now(), 1620 drone_set=drone_set, 1621 parent_job=options.get('parent_job_id'), 1622 test_retry=options.get('test_retry'), 1623 run_reset=options.get('run_reset'), 1624 require_ssp=options.get('require_ssp')) 1625 1626 job.dependency_labels = options['dependencies'] 1627 1628 if options.get('keyvals'): 1629 for key, value in options['keyvals'].iteritems(): 1630 JobKeyval.objects.create(job=job, key=key, value=value) 1631 1632 return job 1633 1634 1635 @classmethod 1636 def assign_to_shard(cls, shard, known_ids): 1637 """Assigns unassigned jobs to a shard. 1638 1639 For all labels that have been assigned to this shard, all jobs that 1640 have this label, are assigned to this shard. 1641 1642 Jobs that are assigned to the shard but aren't already present on the 1643 shard are returned. 1644 1645 @param shard: The shard to assign jobs to. 1646 @param known_ids: List of all ids of incomplete jobs, the shard already 1647 knows about. 1648 This is used to figure out which jobs should be sent 1649 to the shard. If shard_ids were used instead, jobs 1650 would only be transferred once, even if the client 1651 failed persisting them. 1652 The number of unfinished jobs usually lies in O(1000). 1653 Assuming one id takes 8 chars in the json, this means 1654 overhead that lies in the lower kilobyte range. 1655 A not in query with 5000 id's takes about 30ms. 1656 1657 @returns The job objects that should be sent to the shard. 1658 """ 1659 # Disclaimer: Concurrent heartbeats should not occur in today's setup. 1660 # If this changes or they are triggered manually, this applies: 1661 # Jobs may be returned more than once by concurrent calls of this 1662 # function, as there is a race condition between SELECT and UPDATE. 1663 job_ids = set([]) 1664 check_known_jobs_exclude = '' 1665 check_known_jobs_include = '' 1666 1667 if known_ids: 1668 check_known_jobs = ( 1669 cls.NON_ABORTED_KNOWN_JOBS % 1670 {'known_ids': ','.join([str(i) for i in known_ids])}) 1671 check_known_jobs_exclude = 'AND NOT ' + check_known_jobs 1672 check_known_jobs_include = 'OR ' + check_known_jobs 1673 1674 raw_sql = cls.SQL_SHARD_JOBS % { 1675 'check_known_jobs': check_known_jobs_exclude, 1676 'shard_id': shard.id 1677 } 1678 1679 1680 if cls.FETCH_READONLY_JOBS: 1681 #TODO(jkop): Get rid of this kludge when we update Django to >=1.7 1682 #correct usage would be .raw(..., using='readonly') 1683 old_db = Job.objects._db 1684 try: 1685 Job.objects._db = 'readonly' 1686 job_ids = set([j.id for j in Job.objects.raw(raw_sql)]) 1687 except django_utils.DatabaseError: 1688 logging.exception( 1689 'Error attempting to query slave db, will retry on master') 1690 finally: 1691 Job.objects._db = old_db 1692 else: 1693 job_ids = set([j.id for j in Job.objects.raw(raw_sql)]) 1694 1695 static_labels, non_static_labels = Host.classify_label_objects( 1696 shard.labels.all()) 1697 if static_labels: 1698 label_ids = [str(l.id) for l in static_labels] 1699 query = Job.objects.raw(cls.SQL_SHARD_JOBS_WITH_HOSTS % { 1700 'check_known_jobs': check_known_jobs_exclude, 1701 'host_label_table': 'afe_static_hosts_labels', 1702 'host_label_column': 'staticlabel_id', 1703 'label_ids': '(%s)' % ','.join(label_ids)}) 1704 job_ids |= set([j.id for j in query]) 1705 1706 if non_static_labels: 1707 label_ids = [str(l.id) for l in non_static_labels] 1708 query = Job.objects.raw(cls.SQL_SHARD_JOBS_WITH_HOSTS % { 1709 'check_known_jobs': check_known_jobs_exclude, 1710 'host_label_table': 'afe_hosts_labels', 1711 'host_label_column': 'label_id', 1712 'label_ids': '(%s)' % ','.join(label_ids)}) 1713 job_ids |= set([j.id for j in query]) 1714 1715 if job_ids: 1716 query = Job.objects.raw( 1717 cls.SQL_JOBS_TO_EXCLUDE % 1718 {'check_known_jobs': check_known_jobs_include, 1719 'candidates': ','.join([str(i) for i in job_ids])}) 1720 job_ids -= set([j.id for j in query]) 1721 1722 if job_ids: 1723 Job.objects.filter(pk__in=job_ids).update(shard=shard) 1724 return list(Job.objects.filter(pk__in=job_ids).all()) 1725 return [] 1726 1727 1728 def queue(self, hosts, is_template=False): 1729 """Enqueue a job on the given hosts. 1730 1731 @param hosts: The hosts to use. 1732 @param is_template: Whether the status should be "Template". 1733 """ 1734 if not hosts: 1735 # hostless job 1736 entry = HostQueueEntry.create(job=self, is_template=is_template) 1737 entry.save() 1738 return 1739 1740 for host in hosts: 1741 host.enqueue_job(self, is_template=is_template) 1742 1743 1744 def user(self): 1745 """Gets the user of this job, or None if it doesn't exist.""" 1746 try: 1747 return User.objects.get(login=self.owner) 1748 except self.DoesNotExist: 1749 return None 1750 1751 1752 def abort(self): 1753 """Aborts this job.""" 1754 for queue_entry in self.hostqueueentry_set.all(): 1755 queue_entry.abort() 1756 1757 1758 def tag(self): 1759 """Returns a string tag for this job.""" 1760 return server_utils.get_job_tag(self.id, self.owner) 1761 1762 1763 def keyval_dict(self): 1764 """Returns all keyvals for this job as a dictionary.""" 1765 return dict((keyval.key, keyval.value) 1766 for keyval in self.jobkeyval_set.all()) 1767 1768 1769 @classmethod 1770 def get_attribute_model(cls): 1771 """Return the attribute model. 1772 1773 Override method in parent class. This class is called when 1774 deserializing the one-to-many relationship betwen Job and JobKeyval. 1775 On deserialization, we will try to clear any existing job keyvals 1776 associated with a job to avoid any inconsistency. 1777 Though Job doesn't implement ModelWithAttribute, we still treat 1778 it as an attribute model for this purpose. 1779 1780 @returns: The attribute model of Job. 1781 """ 1782 return JobKeyval 1783 1784 1785 class Meta: 1786 """Metadata for class Job.""" 1787 db_table = 'afe_jobs' 1788 1789 def __unicode__(self): 1790 return u'%s (%s-%s)' % (self.name, self.id, self.owner) 1791 1792 1793class JobHandoff(dbmodels.Model, model_logic.ModelExtensions): 1794 """Jobs that have been handed off to lucifer.""" 1795 1796 job = dbmodels.OneToOneField(Job, on_delete=dbmodels.CASCADE, 1797 primary_key=True) 1798 created = dbmodels.DateTimeField(auto_now_add=True) 1799 completed = dbmodels.BooleanField(default=False) 1800 drone = dbmodels.CharField( 1801 max_length=128, null=True, 1802 help_text=''' 1803The hostname of the drone the job is running on and whose job_aborter 1804should be responsible for aborting the job if the job process dies. 1805NULL means any drone's job_aborter has free reign to abort the job. 1806''') 1807 1808 class Meta: 1809 """Metadata for class Job.""" 1810 db_table = 'afe_job_handoffs' 1811 1812 1813class JobKeyval(dbmodels.Model, model_logic.ModelExtensions): 1814 """Keyvals associated with jobs""" 1815 1816 SERIALIZATION_LINKS_TO_KEEP = set(['job']) 1817 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 1818 1819 job = dbmodels.ForeignKey(Job) 1820 key = dbmodels.CharField(max_length=90) 1821 value = dbmodels.CharField(max_length=300) 1822 1823 objects = model_logic.ExtendedManager() 1824 1825 1826 @classmethod 1827 def get_record(cls, data): 1828 """Check the database for an identical record. 1829 1830 Use job_id and key to search for a existing record. 1831 1832 @raises: DoesNotExist, if no record found 1833 @raises: MultipleObjectsReturned if multiple records found. 1834 """ 1835 # TODO(fdeng): We should use job_id and key together as 1836 # a primary key in the db. 1837 return cls.objects.get(job_id=data['job_id'], key=data['key']) 1838 1839 1840 @classmethod 1841 def deserialize(cls, data): 1842 """Override deserialize in parent class. 1843 1844 Do not deserialize id as id is not kept consistent on master and shards. 1845 1846 @param data: A dictionary of data to deserialize. 1847 1848 @returns: A JobKeyval object. 1849 """ 1850 if data: 1851 data.pop('id') 1852 return super(JobKeyval, cls).deserialize(data) 1853 1854 1855 class Meta: 1856 """Metadata for class JobKeyval.""" 1857 db_table = 'afe_job_keyvals' 1858 1859 1860class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions): 1861 """Represents an ineligible host queue.""" 1862 job = dbmodels.ForeignKey(Job) 1863 host = dbmodels.ForeignKey(Host) 1864 1865 objects = model_logic.ExtendedManager() 1866 1867 class Meta: 1868 """Metadata for class IneligibleHostQueue.""" 1869 db_table = 'afe_ineligible_host_queues' 1870 1871 1872class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 1873 """Represents a host queue entry.""" 1874 1875 SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host']) 1876 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 1877 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted']) 1878 1879 1880 def custom_deserialize_relation(self, link, data): 1881 assert link == 'meta_host' 1882 self.meta_host = Label.deserialize(data) 1883 1884 1885 def sanity_check_update_from_shard(self, shard, updated_serialized, 1886 job_ids_sent): 1887 if self.job_id not in job_ids_sent: 1888 raise error.IgnorableUnallowedRecordsSentToMaster( 1889 'Sent HostQueueEntry without corresponding ' 1890 'job entry: %s' % updated_serialized) 1891 1892 1893 Status = host_queue_entry_states.Status 1894 ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES 1895 COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES 1896 PRE_JOB_STATUSES = host_queue_entry_states.PRE_JOB_STATUSES 1897 IDLE_PRE_JOB_STATUSES = host_queue_entry_states.IDLE_PRE_JOB_STATUSES 1898 1899 job = dbmodels.ForeignKey(Job) 1900 host = dbmodels.ForeignKey(Host, blank=True, null=True) 1901 status = dbmodels.CharField(max_length=255) 1902 meta_host = dbmodels.ForeignKey(Label, blank=True, null=True, 1903 db_column='meta_host') 1904 active = dbmodels.BooleanField(default=False) 1905 complete = dbmodels.BooleanField(default=False) 1906 deleted = dbmodels.BooleanField(default=False) 1907 execution_subdir = dbmodels.CharField(max_length=255, blank=True, 1908 default='') 1909 # If atomic_group is set, this is a virtual HostQueueEntry that will 1910 # be expanded into many actual hosts within the group at schedule time. 1911 atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True) 1912 aborted = dbmodels.BooleanField(default=False) 1913 started_on = dbmodels.DateTimeField(null=True, blank=True) 1914 finished_on = dbmodels.DateTimeField(null=True, blank=True) 1915 1916 objects = model_logic.ExtendedManager() 1917 1918 1919 def __init__(self, *args, **kwargs): 1920 super(HostQueueEntry, self).__init__(*args, **kwargs) 1921 self._record_attributes(['status']) 1922 1923 1924 @classmethod 1925 def create(cls, job, host=None, meta_host=None, 1926 is_template=False): 1927 """Creates a new host queue entry. 1928 1929 @param cls: Implicit class object. 1930 @param job: The associated job. 1931 @param host: The associated host. 1932 @param meta_host: The associated meta host. 1933 @param is_template: Whether the status should be "Template". 1934 """ 1935 if is_template: 1936 status = cls.Status.TEMPLATE 1937 else: 1938 status = cls.Status.QUEUED 1939 1940 return cls(job=job, host=host, meta_host=meta_host, status=status) 1941 1942 1943 def save(self, *args, **kwargs): 1944 self._set_active_and_complete() 1945 super(HostQueueEntry, self).save(*args, **kwargs) 1946 self._check_for_updated_attributes() 1947 1948 1949 def execution_path(self): 1950 """ 1951 Path to this entry's results (relative to the base results directory). 1952 """ 1953 return server_utils.get_hqe_exec_path(self.job.tag(), 1954 self.execution_subdir) 1955 1956 1957 def host_or_metahost_name(self): 1958 """Returns the first non-None name found in priority order. 1959 1960 The priority order checked is: (1) host name; (2) meta host name 1961 """ 1962 if self.host: 1963 return self.host.hostname 1964 else: 1965 assert self.meta_host 1966 return self.meta_host.name 1967 1968 1969 def _set_active_and_complete(self): 1970 if self.status in self.ACTIVE_STATUSES: 1971 self.active, self.complete = True, False 1972 elif self.status in self.COMPLETE_STATUSES: 1973 self.active, self.complete = False, True 1974 else: 1975 self.active, self.complete = False, False 1976 1977 1978 def on_attribute_changed(self, attribute, old_value): 1979 assert attribute == 'status' 1980 logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id, 1981 self.status) 1982 1983 1984 def is_meta_host_entry(self): 1985 'True if this is a entry has a meta_host instead of a host.' 1986 return self.host is None and self.meta_host is not None 1987 1988 1989 # This code is shared between rpc_interface and models.HostQueueEntry. 1990 # Sadly due to circular imports between the 2 (crbug.com/230100) making it 1991 # a class method was the best way to refactor it. Attempting to put it in 1992 # rpc_utils or a new utils module failed as that would require us to import 1993 # models.py but to call it from here we would have to import the utils.py 1994 # thus creating a cycle. 1995 @classmethod 1996 def abort_host_queue_entries(cls, host_queue_entries): 1997 """Aborts a collection of host_queue_entries. 1998 1999 Abort these host queue entry and all host queue entries of jobs created 2000 by them. 2001 2002 @param host_queue_entries: List of host queue entries we want to abort. 2003 """ 2004 # This isn't completely immune to race conditions since it's not atomic, 2005 # but it should be safe given the scheduler's behavior. 2006 2007 # TODO(milleral): crbug.com/230100 2008 # The |abort_host_queue_entries| rpc does nearly exactly this, 2009 # however, trying to re-use the code generates some horrible 2010 # circular import error. I'd be nice to refactor things around 2011 # sometime so the code could be reused. 2012 2013 # Fixpoint algorithm to find the whole tree of HQEs to abort to 2014 # minimize the total number of database queries: 2015 children = set() 2016 new_children = set(host_queue_entries) 2017 while new_children: 2018 children.update(new_children) 2019 new_child_ids = [hqe.job_id for hqe in new_children] 2020 new_children = HostQueueEntry.objects.filter( 2021 job__parent_job__in=new_child_ids, 2022 complete=False, aborted=False).all() 2023 # To handle circular parental relationships 2024 new_children = set(new_children) - children 2025 2026 # Associate a user with the host queue entries that we're about 2027 # to abort so that we can look up who to blame for the aborts. 2028 now = datetime.now() 2029 user = User.current_user() 2030 aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe, 2031 aborted_by=user, aborted_on=now) for hqe in children] 2032 AbortedHostQueueEntry.objects.bulk_create(aborted_hqes) 2033 # Bulk update all of the HQEs to set the abort bit. 2034 child_ids = [hqe.id for hqe in children] 2035 HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True) 2036 2037 2038 def abort(self): 2039 """ Aborts this host queue entry. 2040 2041 Abort this host queue entry and all host queue entries of jobs created by 2042 this one. 2043 2044 """ 2045 if not self.complete and not self.aborted: 2046 HostQueueEntry.abort_host_queue_entries([self]) 2047 2048 2049 @classmethod 2050 def compute_full_status(cls, status, aborted, complete): 2051 """Returns a modified status msg if the host queue entry was aborted. 2052 2053 @param cls: Implicit class object. 2054 @param status: The original status message. 2055 @param aborted: Whether the host queue entry was aborted. 2056 @param complete: Whether the host queue entry was completed. 2057 """ 2058 if aborted and not complete: 2059 return 'Aborted (%s)' % status 2060 return status 2061 2062 2063 def full_status(self): 2064 """Returns the full status of this host queue entry, as a string.""" 2065 return self.compute_full_status(self.status, self.aborted, 2066 self.complete) 2067 2068 2069 def _postprocess_object_dict(self, object_dict): 2070 object_dict['full_status'] = self.full_status() 2071 2072 2073 class Meta: 2074 """Metadata for class HostQueueEntry.""" 2075 db_table = 'afe_host_queue_entries' 2076 2077 2078 2079 def __unicode__(self): 2080 hostname = None 2081 if self.host: 2082 hostname = self.host.hostname 2083 return u"%s/%d (%d)" % (hostname, self.job.id, self.id) 2084 2085 2086class HostQueueEntryStartTimes(dbmodels.Model): 2087 """An auxilary table to HostQueueEntry to index by start time.""" 2088 insert_time = dbmodels.DateTimeField() 2089 highest_hqe_id = dbmodels.IntegerField() 2090 2091 class Meta: 2092 """Metadata for class HostQueueEntryStartTimes.""" 2093 db_table = 'afe_host_queue_entry_start_times' 2094 2095 2096class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 2097 """Represents an aborted host queue entry.""" 2098 queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True) 2099 aborted_by = dbmodels.ForeignKey(User) 2100 aborted_on = dbmodels.DateTimeField() 2101 2102 objects = model_logic.ExtendedManager() 2103 2104 2105 def save(self, *args, **kwargs): 2106 self.aborted_on = datetime.now() 2107 super(AbortedHostQueueEntry, self).save(*args, **kwargs) 2108 2109 class Meta: 2110 """Metadata for class AbortedHostQueueEntry.""" 2111 db_table = 'afe_aborted_host_queue_entries' 2112 2113 2114class SpecialTask(dbmodels.Model, model_logic.ModelExtensions): 2115 """\ 2116 Tasks to run on hosts at the next time they are in the Ready state. Use this 2117 for high-priority tasks, such as forced repair or forced reinstall. 2118 2119 host: host to run this task on 2120 task: special task to run 2121 time_requested: date and time the request for this task was made 2122 is_active: task is currently running 2123 is_complete: task has finished running 2124 is_aborted: task was aborted 2125 time_started: date and time the task started 2126 time_finished: date and time the task finished 2127 queue_entry: Host queue entry waiting on this task (or None, if task was not 2128 started in preparation of a job) 2129 """ 2130 Task = enum.Enum('Verify', 'Cleanup', 'Repair', 'Reset', 'Provision', 2131 string_values=True) 2132 2133 host = dbmodels.ForeignKey(Host, blank=False, null=False) 2134 task = dbmodels.CharField(max_length=64, choices=Task.choices(), 2135 blank=False, null=False) 2136 requested_by = dbmodels.ForeignKey(User) 2137 time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False, 2138 null=False) 2139 is_active = dbmodels.BooleanField(default=False, blank=False, null=False) 2140 is_complete = dbmodels.BooleanField(default=False, blank=False, null=False) 2141 is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False) 2142 time_started = dbmodels.DateTimeField(null=True, blank=True) 2143 queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True) 2144 success = dbmodels.BooleanField(default=False, blank=False, null=False) 2145 time_finished = dbmodels.DateTimeField(null=True, blank=True) 2146 2147 objects = model_logic.ExtendedManager() 2148 2149 2150 def save(self, **kwargs): 2151 if self.queue_entry: 2152 self.requested_by = User.objects.get( 2153 login=self.queue_entry.job.owner) 2154 super(SpecialTask, self).save(**kwargs) 2155 2156 2157 def execution_path(self): 2158 """Returns the execution path for a special task.""" 2159 return server_utils.get_special_task_exec_path( 2160 self.host.hostname, self.id, self.task, self.time_requested) 2161 2162 2163 # property to emulate HostQueueEntry.status 2164 @property 2165 def status(self): 2166 """Returns a host queue entry status appropriate for a speical task.""" 2167 return server_utils.get_special_task_status( 2168 self.is_complete, self.success, self.is_active) 2169 2170 2171 # property to emulate HostQueueEntry.started_on 2172 @property 2173 def started_on(self): 2174 """Returns the time at which this special task started.""" 2175 return self.time_started 2176 2177 2178 @classmethod 2179 def schedule_special_task(cls, host, task): 2180 """Schedules a special task on a host if not already scheduled. 2181 2182 @param cls: Implicit class object. 2183 @param host: The host to use. 2184 @param task: The task to schedule. 2185 """ 2186 existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task, 2187 is_active=False, 2188 is_complete=False) 2189 if existing_tasks: 2190 return existing_tasks[0] 2191 2192 special_task = SpecialTask(host=host, task=task, 2193 requested_by=User.current_user()) 2194 special_task.save() 2195 return special_task 2196 2197 2198 def abort(self): 2199 """ Abort this special task.""" 2200 self.is_aborted = True 2201 self.save() 2202 2203 2204 def activate(self): 2205 """ 2206 Sets a task as active and sets the time started to the current time. 2207 """ 2208 logging.info('Starting: %s', self) 2209 self.is_active = True 2210 self.time_started = datetime.now() 2211 self.save() 2212 2213 2214 def finish(self, success): 2215 """Sets a task as completed. 2216 2217 @param success: Whether or not the task was successful. 2218 """ 2219 logging.info('Finished: %s', self) 2220 self.is_active = False 2221 self.is_complete = True 2222 self.success = success 2223 if self.time_started: 2224 self.time_finished = datetime.now() 2225 self.save() 2226 2227 2228 class Meta: 2229 """Metadata for class SpecialTask.""" 2230 db_table = 'afe_special_tasks' 2231 2232 2233 def __unicode__(self): 2234 result = u'Special Task %s (host %s, task %s, time %s)' % ( 2235 self.id, self.host, self.task, self.time_requested) 2236 if self.is_complete: 2237 result += u' (completed)' 2238 elif self.is_active: 2239 result += u' (active)' 2240 2241 return result 2242 2243 2244class StableVersion(dbmodels.Model, model_logic.ModelExtensions): 2245 2246 board = dbmodels.CharField(max_length=255, unique=True) 2247 version = dbmodels.CharField(max_length=255) 2248 2249 class Meta: 2250 """Metadata for class StableVersion.""" 2251 db_table = 'afe_stable_versions' 2252