1# pylint: disable=missing-docstring 2 3import logging 4from datetime import datetime 5import django.core 6try: 7 from django.db import models as dbmodels, connection 8except django.core.exceptions.ImproperlyConfigured: 9 raise ImportError('Django database not yet configured. Import either ' 10 'setup_django_environment or ' 11 'setup_django_lite_environment from ' 12 'autotest_lib.frontend before any imports that ' 13 'depend on django models.') 14from xml.sax import saxutils 15import common 16from autotest_lib.frontend.afe import model_logic, model_attributes 17from autotest_lib.frontend.afe import rdb_model_extensions 18from autotest_lib.frontend import settings, thread_local 19from autotest_lib.client.common_lib import enum, error, host_protections 20from autotest_lib.client.common_lib import global_config 21from autotest_lib.client.common_lib import host_queue_entry_states 22from autotest_lib.client.common_lib import control_data, priorities, decorators 23from autotest_lib.server import utils as server_utils 24 25# job options and user preferences 26DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY 27DEFAULT_REBOOT_AFTER = model_attributes.RebootBefore.NEVER 28 29RESPECT_STATIC_LABELS = global_config.global_config.get_config_value( 30 'SKYLAB', 'respect_static_labels', type=bool, default=False) 31 32RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value( 33 'SKYLAB', 'respect_static_attributes', type=bool, default=False) 34 35 36class AclAccessViolation(Exception): 37 """\ 38 Raised when an operation is attempted with proper permissions as 39 dictated by ACLs. 40 """ 41 42 43class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model): 44 """\ 45 An atomic group defines a collection of hosts which must only be scheduled 46 all at once. Any host with a label having an atomic group will only be 47 scheduled for a job at the same time as other hosts sharing that label. 48 49 Required: 50 name: A name for this atomic group, e.g. 
'rack23' or 'funky_net'. 51 max_number_of_machines: The maximum number of machines that will be 52 scheduled at once when scheduling jobs to this atomic group. 53 The job.synch_count is considered the minimum. 54 55 Optional: 56 description: Arbitrary text description of this group's purpose. 57 """ 58 name = dbmodels.CharField(max_length=255, unique=True) 59 description = dbmodels.TextField(blank=True) 60 # This magic value is the default to simplify the scheduler logic. 61 # It must be "large". The common use of atomic groups is to want all 62 # machines in the group to be used, limits on which subset used are 63 # often chosen via dependency labels. 64 # TODO(dennisjeffrey): Revisit this so we don't have to assume that 65 # "infinity" is around 3.3 million. 66 INFINITE_MACHINES = 333333333 67 max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES) 68 invalid = dbmodels.BooleanField(default=False, 69 editable=settings.FULL_ADMIN) 70 71 name_field = 'name' 72 objects = model_logic.ModelWithInvalidManager() 73 valid_objects = model_logic.ValidObjectsManager() 74 75 76 def enqueue_job(self, job, is_template=False): 77 """Enqueue a job on an associated atomic group of hosts. 78 79 @param job: A job to enqueue. 80 @param is_template: Whether the status should be "Template". 81 """ 82 queue_entry = HostQueueEntry.create(atomic_group=self, job=job, 83 is_template=is_template) 84 queue_entry.save() 85 86 87 def clean_object(self): 88 self.label_set.clear() 89 90 91 class Meta: 92 """Metadata for class AtomicGroup.""" 93 db_table = 'afe_atomic_groups' 94 95 96 def __unicode__(self): 97 return unicode(self.name) 98 99 100class Label(model_logic.ModelWithInvalid, dbmodels.Model): 101 """\ 102 Required: 103 name: label name 104 105 Optional: 106 kernel_config: URL/path to kernel config for jobs run on this label. 107 platform: If True, this is a platform label (defaults to False). 
108 only_if_needed: If True, a Host with this label can only be used if that 109 label is requested by the job/test (either as the meta_host or 110 in the job_dependencies). 111 atomic_group: The atomic group associated with this label. 112 """ 113 name = dbmodels.CharField(max_length=255, unique=True) 114 kernel_config = dbmodels.CharField(max_length=255, blank=True) 115 platform = dbmodels.BooleanField(default=False) 116 invalid = dbmodels.BooleanField(default=False, 117 editable=settings.FULL_ADMIN) 118 only_if_needed = dbmodels.BooleanField(default=False) 119 120 name_field = 'name' 121 objects = model_logic.ModelWithInvalidManager() 122 valid_objects = model_logic.ValidObjectsManager() 123 atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) 124 125 126 def clean_object(self): 127 self.host_set.clear() 128 self.test_set.clear() 129 130 131 def enqueue_job(self, job, is_template=False): 132 """Enqueue a job on any host of this label. 133 134 @param job: A job to enqueue. 135 @param is_template: Whether the status should be "Template". 136 """ 137 queue_entry = HostQueueEntry.create(meta_host=self, job=job, 138 is_template=is_template) 139 queue_entry.save() 140 141 142 143 class Meta: 144 """Metadata for class Label.""" 145 db_table = 'afe_labels' 146 147 148 def __unicode__(self): 149 return unicode(self.name) 150 151 152 def is_replaced_by_static(self): 153 """Detect whether a label is replaced by a static label. 154 155 'Static' means it can only be modified by skylab inventory tools. 156 """ 157 if RESPECT_STATIC_LABELS: 158 replaced = ReplacedLabel.objects.filter(label__id=self.id) 159 if len(replaced) > 0: 160 return True 161 162 return False 163 164 165class StaticLabel(model_logic.ModelWithInvalid, dbmodels.Model): 166 """\ 167 Required: 168 name: label name 169 170 Optional: 171 kernel_config: URL/path to kernel config for jobs run on this label. 172 platform: If True, this is a platform label (defaults to False). 
      only_if_needed: Deprecated. This is always False.
      atomic_group: Deprecated. This is always NULL.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    kernel_config = dbmodels.CharField(max_length=255, blank=True)
    platform = dbmodels.BooleanField(default=False)
    invalid = dbmodels.BooleanField(default=False,
                                    editable=settings.FULL_ADMIN)
    only_if_needed = dbmodels.BooleanField(default=False)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True)

    def clean_object(self):
        # Invoked when this label is invalidated: detach it from all hosts
        # and tests.
        self.host_set.clear()
        self.test_set.clear()


    class Meta:
        """Metadata for class StaticLabel."""
        db_table = 'afe_static_labels'


    def __unicode__(self):
        return unicode(self.name)


class ReplacedLabel(dbmodels.Model, model_logic.ModelExtensions):
    """The tag to indicate whether to replace labels with static labels."""
    label = dbmodels.ForeignKey(Label)
    objects = model_logic.ExtendedManager()


    class Meta:
        """Metadata for class ReplacedLabel."""
        db_table = 'afe_replaced_labels'


    def __unicode__(self):
        return unicode(self.label)


class Shard(dbmodels.Model, model_logic.ModelExtensions):
    # A shard scheduler instance, identified by hostname; labels lists the
    # labels whose hosts this shard is responsible for.

    hostname = dbmodels.CharField(max_length=255, unique=True)

    name_field = 'hostname'

    labels = dbmodels.ManyToManyField(Label, blank=True,
                                      db_table='afe_shards_labels')

    class Meta:
        """Metadata for class Shard."""
        db_table = 'afe_shards'


class Drone(dbmodels.Model, model_logic.ModelExtensions):
    """
    A scheduler drone

    hostname: the drone's hostname
    """
    hostname = dbmodels.CharField(max_length=255, unique=True)

    name_field = 'hostname'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        # Only superusers may create or modify drones.
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may edit drones')
        super(Drone, self).save(*args, **kwargs)


    def delete(self):
        # Only superusers may delete drones.
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may delete drones')
        super(Drone, self).delete()


    class Meta:
        """Metadata for class Drone."""
        db_table = 'afe_drones'

    def __unicode__(self):
        return unicode(self.hostname)


class DroneSet(dbmodels.Model, model_logic.ModelExtensions):
    """
    A set of scheduler drones

    These will be used by the scheduler to decide what drones a job is allowed
    to run on.

    name: the drone set's name
    drones: the drones that are part of the set
    """
    DRONE_SETS_ENABLED = global_config.global_config.get_config_value(
        'SCHEDULER', 'drone_sets_enabled', type=bool, default=False)
    DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value(
        'SCHEDULER', 'default_drone_set_name', default=None)

    name = dbmodels.CharField(max_length=255, unique=True)
    drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones')

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        # Only superusers may create or modify drone sets.
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may edit drone sets')
        super(DroneSet, self).save(*args, **kwargs)


    def delete(self):
        # Only superusers may delete drone sets.
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may delete drone sets')
        super(DroneSet, self).delete()


    @classmethod
    def drone_sets_enabled(cls):
        """Returns whether drone sets are enabled.

        @param cls: Implicit class object.
        """
        return cls.DRONE_SETS_ENABLED


    @classmethod
    def default_drone_set_name(cls):
        """Returns the default drone set name.

        @param cls: Implicit class object.
311 """ 312 return cls.DEFAULT_DRONE_SET_NAME 313 314 315 @classmethod 316 def get_default(cls): 317 """Gets the default drone set name, compatible with Job.add_object. 318 319 @param cls: Implicit class object. 320 """ 321 return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME) 322 323 324 @classmethod 325 def resolve_name(cls, drone_set_name): 326 """ 327 Returns the name of one of these, if not None, in order of preference: 328 1) the drone set given, 329 2) the current user's default drone set, or 330 3) the global default drone set 331 332 or returns None if drone sets are disabled 333 334 @param cls: Implicit class object. 335 @param drone_set_name: A drone set name. 336 """ 337 if not cls.drone_sets_enabled(): 338 return None 339 340 user = User.current_user() 341 user_drone_set_name = user.drone_set and user.drone_set.name 342 343 return drone_set_name or user_drone_set_name or cls.get_default().name 344 345 346 def get_drone_hostnames(self): 347 """ 348 Gets the hostnames of all drones in this drone set 349 """ 350 return set(self.drones.all().values_list('hostname', flat=True)) 351 352 353 class Meta: 354 """Metadata for class DroneSet.""" 355 db_table = 'afe_drone_sets' 356 357 def __unicode__(self): 358 return unicode(self.name) 359 360 361class User(dbmodels.Model, model_logic.ModelExtensions): 362 """\ 363 Required: 364 login :user login name 365 366 Optional: 367 access_level: 0=User (default), 1=Admin, 100=Root 368 """ 369 ACCESS_ROOT = 100 370 ACCESS_ADMIN = 1 371 ACCESS_USER = 0 372 373 AUTOTEST_SYSTEM = 'autotest_system' 374 375 login = dbmodels.CharField(max_length=255, unique=True) 376 access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True) 377 378 # user preferences 379 reboot_before = dbmodels.SmallIntegerField( 380 choices=model_attributes.RebootBefore.choices(), blank=True, 381 default=DEFAULT_REBOOT_BEFORE) 382 reboot_after = dbmodels.SmallIntegerField( 383 choices=model_attributes.RebootAfter.choices(), blank=True, 384 
default=DEFAULT_REBOOT_AFTER) 385 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 386 show_experimental = dbmodels.BooleanField(default=False) 387 388 name_field = 'login' 389 objects = model_logic.ExtendedManager() 390 391 392 def save(self, *args, **kwargs): 393 # is this a new object being saved for the first time? 394 first_time = (self.id is None) 395 user = thread_local.get_user() 396 if user and not user.is_superuser() and user.login != self.login: 397 raise AclAccessViolation("You cannot modify user " + self.login) 398 super(User, self).save(*args, **kwargs) 399 if first_time: 400 everyone = AclGroup.objects.get(name='Everyone') 401 everyone.users.add(self) 402 403 404 def is_superuser(self): 405 """Returns whether the user has superuser access.""" 406 return self.access_level >= self.ACCESS_ROOT 407 408 409 @classmethod 410 def current_user(cls): 411 """Returns the current user. 412 413 @param cls: Implicit class object. 414 """ 415 user = thread_local.get_user() 416 if user is None: 417 user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM) 418 user.access_level = cls.ACCESS_ROOT 419 user.save() 420 return user 421 422 423 @classmethod 424 def get_record(cls, data): 425 """Check the database for an identical record. 426 427 Check for a record with matching id and login. If one exists, 428 return it. If one does not exist there is a possibility that 429 the following cases have happened: 430 1. Same id, different login 431 We received: "1 chromeos-test" 432 And we have: "1 debug-user" 433 In this case we need to delete "1 debug_user" and insert 434 "1 chromeos-test". 435 436 2. Same login, different id: 437 We received: "1 chromeos-test" 438 And we have: "2 chromeos-test" 439 In this case we need to delete "2 chromeos-test" and insert 440 "1 chromeos-test". 441 442 As long as this method deletes bad records and raises the 443 DoesNotExist exception the caller will handle creating the 444 new record. 
445 446 @raises: DoesNotExist, if a record with the matching login and id 447 does not exist. 448 """ 449 450 # Both the id and login should be uniqe but there are cases when 451 # we might already have a user with the same login/id because 452 # current_user will proactively create a user record if it doesn't 453 # exist. Since we want to avoid conflict between the master and 454 # shard, just delete any existing user records that don't match 455 # what we're about to deserialize from the master. 456 try: 457 return cls.objects.get(login=data['login'], id=data['id']) 458 except cls.DoesNotExist: 459 cls.delete_matching_record(login=data['login']) 460 cls.delete_matching_record(id=data['id']) 461 raise 462 463 464 class Meta: 465 """Metadata for class User.""" 466 db_table = 'afe_users' 467 468 def __unicode__(self): 469 return unicode(self.login) 470 471 472class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel, 473 model_logic.ModelWithAttributes): 474 """\ 475 Required: 476 hostname 477 478 optional: 479 locked: if true, host is locked and will not be queued 480 481 Internal: 482 From AbstractHostModel: 483 status: string describing status of host 484 invalid: true if the host has been deleted 485 protection: indicates what can be done to this host during repair 486 lock_time: DateTime at which the host was locked 487 dirty: true if the host has been used without being rebooted 488 Local: 489 locked_by: user that locked the host, or null if the host is unlocked 490 """ 491 492 SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set', 493 'hostattribute_set', 494 'labels', 495 'shard']) 496 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid']) 497 498 499 def custom_deserialize_relation(self, link, data): 500 assert link == 'shard', 'Link %s should not be deserialized' % link 501 self.shard = Shard.deserialize(data) 502 503 504 # Note: Only specify foreign keys here, specify all native host columns in 505 # rdb_model_extensions instead. 
    # rdb_model_extensions instead.
    Protection = host_protections.Protection
    labels = dbmodels.ManyToManyField(Label, blank=True,
                                      db_table='afe_hosts_labels')
    static_labels = dbmodels.ManyToManyField(
        StaticLabel, blank=True, db_table='afe_static_hosts_labels')
    locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False)
    name_field = 'hostname'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    leased_objects = model_logic.LeasedHostManager()

    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    def __init__(self, *args, **kwargs):
        super(Host, self).__init__(*args, **kwargs)
        # Track changes to 'status' so on_attribute_changed fires.
        self._record_attributes(['status'])


    @classmethod
    def classify_labels(cls, label_names):
        """Split labels to static & non-static.

        @label_names: a list of labels (string).

        @returns: a list of StaticLabel objects & a list of
                  (non-static) Label objects.
        """
        if not label_names:
            return [], []

        labels = Label.objects.filter(name__in=label_names)

        if not RESPECT_STATIC_LABELS:
            return [], labels

        return cls.classify_label_objects(labels)


    @classmethod
    def classify_label_objects(cls, label_objects):
        # Partition label_objects into (static labels, non-static labels)
        # according to whether each label is replaced by a static label.
        replaced_labels = ReplacedLabel.objects.filter(label__in=label_objects)
        replaced_ids = [l.label.id for l in replaced_labels]
        non_static_labels = [
            l for l in label_objects if not l.id in replaced_ids]
        static_label_names = [
            l.name for l in label_objects if l.id in replaced_ids]
        static_labels = StaticLabel.objects.filter(name__in=static_label_names)
        return static_labels, non_static_labels


    @classmethod
    def get_hosts_with_labels(cls, label_names, initial_query):
        """Get hosts by label filters.

        @param label_names: label (string) lists for fetching hosts.
        @param initial_query: a model_logic.QuerySet of Host object, e.g.

                Host.objects.all(), Host.valid_objects.all().

            This initial_query cannot be a sliced QuerySet, e.g.

                Host.objects.all().filter(query_limit=10)
        """
        if not label_names:
            return initial_query

        static_labels, non_static_labels = cls.classify_labels(label_names)
        if len(static_labels) + len(non_static_labels) != len(label_names):
            # Some labels don't exist in afe db, which means no hosts
            # should be matched.
            # NOTE(review): this path returns an empty set() while every
            # other path returns a QuerySet -- callers must handle both.
            return set()

        for l in static_labels:
            initial_query = initial_query.filter(static_labels=l)

        for l in non_static_labels:
            initial_query = initial_query.filter(labels=l)

        return initial_query


    @classmethod
    def get_hosts_with_label_ids(cls, label_ids, initial_query):
        """Get hosts by label_id filters.

        @param label_ids: label id (int) lists for fetching hosts.
        @param initial_query: a list of Host object, e.g.
            [<Host: 100.107.151.253>, <Host: 100.107.151.251>, ...]
        """
        labels = Label.objects.filter(id__in=label_ids)
        label_names = [l.name for l in labels]
        return cls.get_hosts_with_labels(label_names, initial_query)


    @staticmethod
    def create_one_time_host(hostname):
        """Creates a one-time host.

        @param hostname: The name for the host.
        """
        query = Host.objects.filter(hostname=hostname)
        if query.count() == 0:
            # Brand-new hostname: create it as invalid so it stays out of the
            # regular host lists.
            host = Host(hostname=hostname, invalid=True)
            host.do_validate()
        else:
            host = query[0]
            if not host.invalid:
                raise model_logic.ValidationError({
                    'hostname' : '%s already exists in the autotest DB. '
                        'Select it rather than entering it as a one time '
                        'host.' % hostname
                    })
        host.protection = host_protections.Protection.DO_NOT_REPAIR
        host.locked = False
        host.save()
        host.clean_object()
        return host


    @classmethod
    def _assign_to_shard_nothing_helper(cls):
        """Does nothing.

        This method is called in the middle of assign_to_shard, and does
        nothing. It exists to allow integration tests to simulate a race
        condition."""


    @classmethod
    def assign_to_shard(cls, shard, known_ids):
        """Assigns hosts to a shard.

        For all labels that have been assigned to a shard, all hosts that
        have at least one of the shard's labels are assigned to the shard.
        Hosts that are assigned to the shard but aren't already present on the
        shard are returned.

        Any boards that are in |known_ids| but that do not belong to the shard
        are incorrect ids, which are also returned so that the shard can remove
        them locally.

        Board to shard mapping is many-to-one. Many different boards can be
        hosted in a shard. However, DUTs of a single board cannot be distributed
        into more than one shard.

        @param shard: The shard object to assign labels/hosts for.
        @param known_ids: List of all host-ids the shard already knows.
                          This is used to figure out which hosts should be sent
                          to the shard. If shard_ids were used instead, hosts
                          would only be transferred once, even if the client
                          failed persisting them.
                          The number of hosts usually lies in O(100), so the
                          overhead is acceptable.

        @returns a tuple of (hosts objects that should be sent to the shard,
                             incorrect host ids that should not belong to
                             shard)
        """
        # Disclaimer: concurrent heartbeats should theoretically not occur in
        # the current setup. As they may be introduced in the near future,
        # this comment will be left here.

        # Sending stuff twice is acceptable, but forgetting something isn't.
        # Detecting duplicates on the client is easy, but here it's harder. The
        # following options were considered:
        # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more
        #   than select returned, as concurrently more hosts might have been
        #   inserted
        # - UPDATE and then SELECT WHERE shard=shard: select always returns all
        #   hosts for the shard, this is overhead
        # - SELECT and then UPDATE only selected without requerying afterwards:
        #   returns the old state of the records.
        new_hosts = []

        possible_new_host_ids = set(Host.objects.filter(
            labels__in=shard.labels.all(),
            leased=False
            ).exclude(
            id__in=known_ids,
            ).values_list('pk', flat=True))

        # No-op in production, used to simulate race condition in tests.
        cls._assign_to_shard_nothing_helper()

        if possible_new_host_ids:
            # Re-check the label/lease filters inside the UPDATE so hosts that
            # changed since the SELECT above are not claimed by mistake.
            Host.objects.filter(
                pk__in=possible_new_host_ids,
                labels__in=shard.labels.all(),
                leased=False
                ).update(shard=shard)
            new_hosts = list(Host.objects.filter(
                pk__in=possible_new_host_ids,
                shard=shard
                ).all())

        # Hosts the shard knows about but that no longer belong to it.
        invalid_host_ids = list(Host.objects.filter(
            id__in=known_ids
            ).exclude(
            shard=shard
            ).values_list('pk', flat=True))

        return new_hosts, invalid_host_ids

    def resurrect_object(self, old_object):
        super(Host, self).resurrect_object(old_object)
        # invalid hosts can be in use by the scheduler (as one-time hosts), so
        # don't change the status
        self.status = old_object.status


    def clean_object(self):
        # Invoked when this host is invalidated: detach ACLs and labels.
        self.aclgroup_set.clear()
        self.labels.clear()
        self.static_labels.clear()


    def save(self, *args, **kwargs):
        # extra spaces in the hostname can be a sneaky source of errors
        self.hostname = self.hostname.strip()
        # is this a new object being saved for the first time?
        first_time = (self.id is None)
        if not first_time:
            AclGroup.check_for_acl_violation_hosts([self])
        # If locked is changed, send its status and user made the change to
        # metaDB.
        # Locks are important in host history because if a device is
        # locked then we don't really care what state it is in.
        if self.locked and not self.locked_by:
            self.locked_by = User.current_user()
            if not self.lock_time:
                self.lock_time = datetime.now()
            self.dirty = True
        elif not self.locked and self.locked_by:
            # Unlocking: clear the lock bookkeeping fields.
            self.locked_by = None
            self.lock_time = None
        super(Host, self).save(*args, **kwargs)
        if first_time:
            # New hosts are automatically added to the 'Everyone' ACL group.
            everyone = AclGroup.objects.get(name='Everyone')
            everyone.hosts.add(self)
            # remove attributes that may have lingered from an old host and
            # should not be associated with a new host
            for host_attribute in self.hostattribute_set.all():
                self.delete_attribute(host_attribute.attribute)
        self._check_for_updated_attributes()


    def delete(self):
        AclGroup.check_for_acl_violation_hosts([self])
        logging.info('Preconditions for deleting host %s...', self.hostname)
        # Abort any queue entries still referencing this host before deletion.
        for queue_entry in self.hostqueueentry_set.all():
            logging.info('  Deleting and aborting hqe %s...', queue_entry)
            queue_entry.deleted = True
            queue_entry.abort()
            logging.info('  ... done with hqe %s.', queue_entry)
        for host_attribute in self.hostattribute_set.all():
            logging.info('  Deleting attribute %s...', host_attribute)
            self.delete_attribute(host_attribute.attribute)
            logging.info('  ... done with attribute %s.', host_attribute)
        logging.info('... preconditions done for host %s.', self.hostname)
        logging.info('Deleting host %s...', self.hostname)
        super(Host, self).delete()
        logging.info('... done.')


    def on_attribute_changed(self, attribute, old_value):
        # Only 'status' is recorded (see __init__); log each transition.
        assert attribute == 'status'
        logging.info('%s -> %s', self.hostname, self.status)


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on this host.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        queue_entry = HostQueueEntry.create(host=self, job=job,
                                            is_template=is_template)
        # allow recovery of dead hosts from the frontend
        if not self.active_queue_entry() and self.is_dead():
            self.status = Host.Status.READY
            self.save()
        queue_entry.save()

        block = IneligibleHostQueue(job=job, host=self)
        block.save()


    def platform(self):
        """The platform of the host."""
        # TODO(showard): slightly hacky?
        platforms = self.labels.filter(platform=True)
        if len(platforms) == 0:
            return None
        return platforms[0]
    platform.short_description = 'Platform'


    @classmethod
    def check_no_platform(cls, hosts):
        """Verify the specified hosts have no associated platforms.

        @param cls: Implicit class object.
        @param hosts: The hosts to verify.
        @raises model_logic.ValidationError if any hosts already have a
            platform.
        """
        Host.objects.populate_relationships(hosts, Label, 'label_list')
        Host.objects.populate_relationships(hosts, StaticLabel,
                                            'staticlabel_list')
        errors = []
        for host in hosts:
            platforms = [label.name for label in host.label_list
                         if label.platform]
            if RESPECT_STATIC_LABELS:
                platforms += [label.name for label in host.staticlabel_list
                              if label.platform]

            if platforms:
                # do a join, just in case this host has multiple platforms,
                # we'll be able to see it
                errors.append('Host %s already has a platform: %s' % (
                              host.hostname, ', '.join(platforms)))
        if errors:
            raise model_logic.ValidationError({'labels': '; '.join(errors)})


    @classmethod
    def check_board_labels_allowed(cls, hosts, new_labels=[]):
        """Verify the specified hosts have valid board labels and the given
        new board labels can be added.

        @param cls: Implicit class object.
        @param hosts: The hosts to verify.
        @param new_labels: A list of labels to be added to the hosts.

        @raises model_logic.ValidationError if any host has invalid board labels
                or the given board labels cannot be added to the hosts.
        """
        # NOTE(review): mutable default argument for new_labels is benign here
        # because the list is only read, never mutated.
        Host.objects.populate_relationships(hosts, Label, 'label_list')
        Host.objects.populate_relationships(hosts, StaticLabel,
                                            'staticlabel_list')
        errors = []
        for host in hosts:
            boards = [label.name for label in host.label_list
                      if label.name.startswith('board:')]
            if RESPECT_STATIC_LABELS:
                boards += [label.name for label in host.staticlabel_list
                           if label.name.startswith('board:')]

            if not server_utils.board_labels_allowed(boards + new_labels):
                # do a join, just in case this host has multiple boards,
                # we'll be able to see it
                errors.append('Host %s already has board labels: %s' % (
                              host.hostname, ', '.join(boards)))
        if errors:
            raise model_logic.ValidationError({'labels': '; '.join(errors)})


    def is_dead(self):
        """Returns whether the host is dead (has status repair failed)."""
        return self.status == Host.Status.REPAIR_FAILED


    def active_queue_entry(self):
        """Returns the active queue entry for this host, or None if none."""
        active = list(self.hostqueueentry_set.filter(active=True))
        if not active:
            return None
        assert len(active) == 1, ('More than one active entry for '
                                  'host ' + self.hostname)
        return active[0]


    def _get_attribute_model_and_args(self, attribute):
        # Mutable host attributes live in the HostAttribute keyval table.
        return HostAttribute, dict(host=self, attribute=attribute)


    def _get_static_attribute_model_and_args(self, attribute):
        # Skylab-managed attributes live in the StaticHostAttribute table.
        return StaticHostAttribute, dict(host=self, attribute=attribute)


    def _is_replaced_by_static_attribute(self, attribute):
        # Only existence of a matching static attribute matters here; the
        # fetched object itself is unused.
        if RESPECT_STATIC_ATTRIBUTES:
            model, args = self._get_static_attribute_model_and_args(attribute)
            try:
                static_attr = model.objects.get(**args)
                return True
            except StaticHostAttribute.DoesNotExist:
                return False

        return False

    @classmethod
    def get_attribute_model(cls):
        """Return the attribute model.

        Override method in parent class. See ModelExtensions for details.
        @returns: The attribute model of Host.
        """
        return HostAttribute


    class Meta:
        """Metadata for the Host class."""
        db_table = 'afe_hosts'


    def __unicode__(self):
        return unicode(self.hostname)


class HostAttribute(dbmodels.Model, model_logic.ModelExtensions):
    """Arbitrary keyvals associated with hosts."""

    # Serialization config: keep the link to the owning host; shards may
    # locally update only the attribute's value.
    SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])
    host = dbmodels.ForeignKey(Host)
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for the HostAttribute class."""
        db_table = 'afe_host_attributes'


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use host_id and attribute to search for an existing record.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        # TODO(fdeng): We should use host_id and attribute together as
        # a primary key in the db.
        return cls.objects.get(host_id=data['host_id'],
                               attribute=data['attribute'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on master and shards.

        @param data: A dictionary of data to deserialize.

        @returns: A HostAttribute object.
        """
        if data:
            data.pop('id')
        return super(HostAttribute, cls).deserialize(data)


class StaticHostAttribute(dbmodels.Model, model_logic.ModelExtensions):
    """Static arbitrary keyvals associated with hosts."""

    # Serialization config: keep the link to the owning host; shards may
    # locally update only the attribute's value.
    SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])
    host = dbmodels.ForeignKey(Host)
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for the StaticHostAttribute class."""
        db_table = 'afe_static_host_attributes'


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use host_id and attribute to search for an existing record.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        return cls.objects.get(host_id=data['host_id'],
                               attribute=data['attribute'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on master and shards.

        @param data: A dictionary of data to deserialize.

        @returns: A StaticHostAttribute object.
        """
        if data:
            data.pop('id')
        return super(StaticHostAttribute, cls).deserialize(data)


class Test(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    author: author name
    description: description of the test
    name: test name
    time: short, medium, long
    test_class: This describes the class the test belongs in.
    test_category: This describes the category for your tests
    test_type: Client or Server
    path: path to pass to run_test()
    sync_count: is a number >=1 (1 being the default). If it's 1, then it's an
                async job.
                If it's >1 it's sync job for that number of machines
                i.e. if sync_count = 2 it is a sync job that requires two
                machines.
    Optional:
        dependencies: What the test requires to run. Comma delimited list
        dependency_labels: many-to-many relationship with labels corresponding to
                           test dependencies.
        experimental: If this is set to True production servers will ignore the test
        run_verify: Whether or not the scheduler should run the verify stage
        run_reset: Whether or not the scheduler should run the reset stage
        test_retry: Number of times to retry test if the test did not complete
                    successfully. (optional, default: 0)
    """
    TestTime = enum.Enum('SHORT', 'MEDIUM', 'LONG', start_value=1)

    name = dbmodels.CharField(max_length=255, unique=True)
    author = dbmodels.CharField(max_length=255)
    test_class = dbmodels.CharField(max_length=255)
    test_category = dbmodels.CharField(max_length=255)
    dependencies = dbmodels.CharField(max_length=255, blank=True)
    description = dbmodels.TextField(blank=True)
    experimental = dbmodels.BooleanField(default=True)
    run_verify = dbmodels.BooleanField(default=False)
    test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(),
                                           default=TestTime.MEDIUM)
    test_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices())
    sync_count = dbmodels.IntegerField(default=1)
    path = dbmodels.CharField(max_length=255, unique=True)
    test_retry = dbmodels.IntegerField(blank=True, default=0)
    run_reset = dbmodels.BooleanField(default=True)

    dependency_labels = (
        dbmodels.ManyToManyField(Label, blank=True,
                                 db_table='afe_autotests_dependency_labels'))
    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def admin_description(self):
        """Returns a string representing the admin description."""
        # Escape the raw description so it renders safely as HTML in the
        # Django admin.
        escaped_description = saxutils.escape(self.description)
        return '<span style="white-space:pre">%s</span>' % escaped_description
    admin_description.allow_tags = True
    admin_description.short_description = 'Description'


    class Meta:
        """Metadata for class Test."""
        db_table = 'afe_autotests'

    def __unicode__(self):
        return unicode(self.name)


class TestParameter(dbmodels.Model):
    """
    A declared parameter of a test
    """
    test = dbmodels.ForeignKey(Test)
    name = dbmodels.CharField(max_length=255)

    class Meta:
        """Metadata for class TestParameter."""
        db_table = 'afe_test_parameters'
        unique_together = ('test', 'name')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.test.name)


class Profiler(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
        name: profiler name
        test_type: Client or Server

    Optional:
        description: arbitrary text description
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    class Meta:
        """Metadata for class Profiler."""
        db_table = 'afe_profilers'

    def __unicode__(self):
        return unicode(self.name)


class AclGroup(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
        name: name of ACL group

    Optional:
        description: arbitrary description of group
    """

    SERIALIZATION_LINKS_TO_FOLLOW = set(['users'])

    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.CharField(max_length=255, blank=True)
    users = dbmodels.ManyToManyField(User, blank=False,
                                     db_table='afe_acl_groups_users')
    hosts = dbmodels.ManyToManyField(Host, blank=True,
                                     db_table='afe_acl_groups_hosts')

    name_field = 'name'
    objects = model_logic.ExtendedManager()

    @staticmethod
    def check_for_acl_violation_hosts(hosts):
        """Verify the current user has access to the specified hosts.

        @param hosts: The hosts to verify against.
        @raises AclAccessViolation if the current user doesn't have access
            to a host.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        accessible_host_ids = set(
            host.id for host in Host.objects.filter(aclgroup__users=user))
        for host in hosts:
            # Check if the user has access to this host,
            # but only if it is not a metahost or a one-time-host.
            no_access = (isinstance(host, Host)
                         and not host.invalid
                         and int(host.id) not in accessible_host_ids)
            if no_access:
                raise AclAccessViolation("%s does not have access to %s" %
                                         (str(user), str(host)))


    @staticmethod
    def check_abort_permissions(queue_entries):
        """Look for queue entries that aren't abortable by the current user.

        An entry is not abortable if:
            * the job isn't owned by this user, and
            * the machine isn't ACL-accessible, or
            * the machine is in the "Everyone" ACL

        @param queue_entries: The queue entries to check.
        @raises AclAccessViolation if a queue entry is not abortable by the
            current user.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        not_owned = queue_entries.exclude(job__owner=user.login)
        # I do this using ID sets instead of just Django filters because
        # filtering on M2M dbmodels is broken in Django 0.96. It's better in
        # 1.0.
        # TODO: Use Django filters, now that we're using 1.0.
        accessible_ids = set(
            entry.id for entry
            in not_owned.filter(host__aclgroup__users__login=user.login))
        public_ids = set(entry.id for entry
                         in not_owned.filter(host__aclgroup__name='Everyone'))
        cannot_abort = [entry for entry in not_owned.select_related()
                        if entry.id not in accessible_ids
                        or entry.id in public_ids]
        if len(cannot_abort) == 0:
            return
        entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner,
                                              entry.host_or_metahost_name())
                                for entry in cannot_abort)
        raise AclAccessViolation('You cannot abort the following job entries: '
                                 + entry_names)


    def check_for_acl_violation_acl_group(self):
        """Verifies the current user has access to this ACL group.

        @raises AclAccessViolation if the current user doesn't have access to
            this ACL group.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        if self.name == 'Everyone':
            raise AclAccessViolation("You cannot modify 'Everyone'!")
        if not user in self.users.all():
            raise AclAccessViolation("You do not have access to %s"
                                     % self.name)

    @staticmethod
    def on_host_membership_change():
        """Invoked when host membership changes."""
        everyone = AclGroup.objects.get(name='Everyone')

        # find hosts that aren't in any ACL group and add them to Everyone
        # TODO(showard): this is a bit of a hack, since the fact that this query
        # works is kind of a coincidence of Django internals. This trick
        # doesn't work in general (on all foreign key relationships). I'll
        # replace it with a better technique when the need arises.
        orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True)
        everyone.hosts.add(*orphaned_hosts.distinct())

        # find hosts in both Everyone and another ACL group, and remove them
        # from Everyone
        hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone')
        acled_hosts = set()
        for host in hosts_in_everyone:
            # Has an ACL group other than Everyone
            if host.aclgroup_set.count() > 1:
                acled_hosts.add(host)
        everyone.hosts.remove(*acled_hosts)


    def delete(self):
        """Deletes this group after verifying ACL permissions.

        The 'Everyone' group is protected and can never be deleted.
        """
        if (self.name == 'Everyone'):
            raise AclAccessViolation("You cannot delete 'Everyone'!")
        self.check_for_acl_violation_acl_group()
        super(AclGroup, self).delete()
        # Hosts left without any group must fall back into 'Everyone'.
        self.on_host_membership_change()


    def add_current_user_if_empty(self):
        """Adds the current user if the set of users is empty."""
        if not self.users.count():
            self.users.add(User.current_user())


    def perform_after_save(self, change):
        """Called after a save.

        @param change: Whether there was a change (i.e. the object already
            existed before the save).
        """
        if not change:
            # Brand-new group: the creator becomes a member automatically.
            self.users.add(User.current_user())
        self.add_current_user_if_empty()
        self.on_host_membership_change()


    def save(self, *args, **kwargs):
        """Saves the group, enforcing ACL checks on pre-existing objects."""
        change = bool(self.id)
        if change:
            # Check the original object for an ACL violation
            AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group()
        super(AclGroup, self).save(*args, **kwargs)
        self.perform_after_save(change)


    class Meta:
        """Metadata for class AclGroup."""
        db_table = 'afe_acl_groups'

    def __unicode__(self):
        return unicode(self.name)


class ParameterizedJob(dbmodels.Model):
    """
    Auxiliary configuration for a parameterized job.

    This class is obsolete, and ought to be dead.
    Due to a series of
    unfortunate events, it can't be deleted:
      * In `class Job` we're required to keep a reference to this class
        for the sake of the scheduler unit tests.
      * The existence of the reference in `Job` means that certain
        methods here will get called from the `get_jobs` RPC.
    So, the definitions below seem to be the minimum stub we can support
    unless/until we change the database schema.
    """

    @classmethod
    def smart_get(cls, id_or_name, *args, **kwargs):
        """For compatibility with Job.add_object.

        @param cls: Implicit class object.
        @param id_or_name: The ID or name to get.
        @param args: Non-keyword arguments.
        @param kwargs: Keyword arguments.
        """
        return cls.objects.get(pk=id_or_name)


    def job(self):
        """Returns the job if it exists, or else None."""
        jobs = self.job_set.all()
        assert jobs.count() <= 1
        return jobs and jobs[0] or None


    class Meta:
        """Metadata for class ParameterizedJob."""
        db_table = 'afe_parameterized_jobs'

    def __unicode__(self):
        # NOTE(review): this stub declares no `test` field, so `self.test`
        # looks like a leftover from the pre-stub schema -- confirm this
        # repr is ever reachable before relying on it.
        return u'%s (parameterized) - %s' % (self.test.name, self.job())


class JobManager(model_logic.ExtendedManager):
    'Custom manager to provide efficient status counts querying.'
    def get_status_counts(self, job_ids):
        """Returns a dict mapping the given job IDs to their status count dicts.

        @param job_ids: A list of job IDs.
        """
        if not job_ids:
            return {}
        # The IDs are interpolated straight into the SQL; str() formatting
        # assumes they are numeric job IDs, not arbitrary strings.
        id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids)
        cursor = connection.cursor()
        cursor.execute("""
            SELECT job_id, status, aborted, complete, COUNT(*)
            FROM afe_host_queue_entries
            WHERE job_id IN %s
            GROUP BY job_id, status, aborted, complete
            """ % id_list)
        all_job_counts = dict((job_id, {}) for job_id in job_ids)
        for job_id, status, aborted, complete, count in cursor.fetchall():
            job_dict = all_job_counts[job_id]
            full_status = HostQueueEntry.compute_full_status(status, aborted,
                                                             complete)
            job_dict.setdefault(full_status, 0)
            job_dict[full_status] += count
        return all_job_counts


class Job(dbmodels.Model, model_logic.ModelExtensions):
    """\
    owner: username of job owner
    name: job name (does not have to be unique)
    priority: Integer priority value. Higher is more important.
    control_file: contents of control file
    control_type: Client or Server
    created_on: date of job creation
    submitted_on: date of job submission
    synch_count: how many hosts should be used per autoserv execution
    run_verify: Whether or not to run the verify phase
    run_reset: Whether or not to run the reset phase
    timeout: DEPRECATED - hours from queuing time until job times out
    timeout_mins: minutes from job queuing time until the job times out
    max_runtime_hrs: DEPRECATED - hours from job starting time until job
                     times out
    max_runtime_mins: minutes from job starting time until job times out
    email_list: list of people to email on completion delimited by any of:
                white space, ',', ':', ';'
    dependency_labels: many-to-many relationship with labels corresponding to
                       job dependencies
    reboot_before: Never, If dirty, or Always
    reboot_after: Never, If all tests passed, or Always
    parse_failed_repair: if True, a failed repair launched by this job will have
                         its
                         results parsed as part of the job.
    drone_set: The set of drones to run this job on
    parent_job: Parent job (optional)
    test_retry: Number of times to retry test if the test did not complete
                successfully. (optional, default: 0)
    require_ssp: Require server-side packaging unless require_ssp is set to
                 False. (optional, default: None)
    """

    # TODO: Investigate, if jobkeyval_set is really needed.
    # dynamic_suite will write them into an attached file for the drone, but
    # it doesn't seem like they are actually used. If they aren't used, remove
    # jobkeyval_set here.
    SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels',
                                         'hostqueueentry_set',
                                         'jobkeyval_set',
                                         'shard'])

    # SQL for selecting jobs that should be sent to shard.
    # We use raw sql as django filters were not optimized.
    # The following jobs are excluded by the SQL.
    # - Non-aborted jobs known to shard as specified in |known_ids|.
    #   Note for jobs aborted on master, even if already known to shard,
    #   will be sent to shard again so that shard can abort them.
    # - Completed jobs
    # - Active jobs
    # - Jobs without host_queue_entries
    NON_ABORTED_KNOWN_JOBS = '(t2.aborted = 0 AND t1.id IN (%(known_ids)s))'

    SQL_SHARD_JOBS = (
        'SELECT DISTINCT(t1.id) FROM afe_jobs t1 '
        'INNER JOIN afe_host_queue_entries t2 ON '
        '  (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 '
        '  %(check_known_jobs)s) '
        'LEFT OUTER JOIN afe_jobs_dependency_labels t3 ON (t1.id = t3.job_id) '
        'JOIN afe_shards_labels t4 '
        '  ON (t4.label_id = t3.label_id OR t4.label_id = t2.meta_host) '
        'WHERE t4.shard_id = %(shard_id)s'
    )

    # Jobs can be created with assigned hosts and have no dependency
    # labels nor meta_host.
    # We are looking for:
    #     - a job whose hqe's meta_host is null
    #     - a job whose hqe has a host
    #     - one of the host's labels matches the shard's label.
    # Non-aborted known jobs, completed jobs, active jobs, jobs
    # without hqe are excluded as we do with SQL_SHARD_JOBS.
    SQL_SHARD_JOBS_WITH_HOSTS = (
        'SELECT DISTINCT(t1.id) FROM afe_jobs t1 '
        'INNER JOIN afe_host_queue_entries t2 ON '
        '  (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 '
        '  AND t2.meta_host IS NULL AND t2.host_id IS NOT NULL '
        '  %(check_known_jobs)s) '
        'LEFT OUTER JOIN %(host_label_table)s t3 ON (t2.host_id = t3.host_id) '
        'WHERE (t3.%(host_label_column)s IN %(label_ids)s)'
    )

    # Even if we had filters about complete, active and aborted
    # bits in the above two SQLs, there is a chance that
    # the result may still contain a job with an hqe with 'complete=1'
    # or 'active=1' or 'aborted=0 and afe_job.id in known jobs.'
    # This happens when a job has two (or more) hqes and at least
    # one hqe has different bits than others.
    # We use a second sql to ensure we exclude all un-desired jobs.
    SQL_JOBS_TO_EXCLUDE = (
        'SELECT t1.id FROM afe_jobs t1 '
        'INNER JOIN afe_host_queue_entries t2 ON '
        '  (t1.id = t2.job_id) '
        'WHERE (t1.id in (%(candidates)s) '
        '  AND (t2.complete=1 OR t2.active=1 '
        '  %(check_known_jobs)s))'
    )

    def _deserialize_relation(self, link, data):
        """Point deserialized child rows at this job before deserializing.

        @param link: Name of the relation being deserialized.
        @param data: List of serialized child records.
        """
        if link in ['hostqueueentry_set', 'jobkeyval_set']:
            for obj in data:
                obj['job_id'] = self.id

        super(Job, self)._deserialize_relation(link, data)


    def custom_deserialize_relation(self, link, data):
        """Deserialize the 'shard' relation; no other link is supported."""
        assert link == 'shard', 'Link %s should not be deserialized' % link
        self.shard = Shard.deserialize(data)


    def sanity_check_update_from_shard(self, shard, updated_serialized):
        """Verify an update pushed from a shard may apply to this job.

        @param shard: The shard the update came from.
        @param updated_serialized: The serialized update payload.
        @raises error.IgnorableUnallowedRecordsSentToMaster if this job is
            assigned to a different shard.
        """
        # If the job got aborted on the master after the client fetched it
        # no shard_id will be set. The shard might still push updates though,
        # as the job might complete before the abort bit syncs to the shard.
        # Alternative considered: The master scheduler could be changed to not
        # set aborted jobs to completed that are sharded out. But that would
        # require database queries and seemed more complicated to implement.
        # This seems safe to do, as there won't be updates pushed from the
        # wrong shard; shards should be powered off and wiped when they are
        # removed from the master.
        if self.shard_id and self.shard_id != shard.id:
            raise error.IgnorableUnallowedRecordsSentToMaster(
                'Job id=%s is assigned to shard (%s). Cannot update it with %s '
                'from shard %s.' % (self.id, self.shard_id, updated_serialized,
                                    shard.id))


    RebootBefore = model_attributes.RebootBefore
    RebootAfter = model_attributes.RebootAfter
    # TIMEOUT is deprecated.
    DEFAULT_TIMEOUT = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_default', default=24)
    DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60)
    # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is
    # completed.
    DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72)
    DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60)
    DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool,
        default=False)

    owner = dbmodels.CharField(max_length=255)
    name = dbmodels.CharField(max_length=255)
    priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT)
    control_file = dbmodels.TextField(null=True, blank=True)
    control_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices(),
        blank=True,  # to allow 0
        default=control_data.CONTROL_TYPE.CLIENT)
    created_on = dbmodels.DateTimeField()
    synch_count = dbmodels.IntegerField(blank=True, default=0)
    timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT)
    run_verify = dbmodels.BooleanField(default=False)
    email_list = dbmodels.CharField(max_length=250, blank=True)
    dependency_labels = (
        dbmodels.ManyToManyField(Label, blank=True,
                                 db_table='afe_jobs_dependency_labels'))
    reboot_before = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootBefore.choices(), blank=True,
        default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootAfter.choices(), blank=True,
        default=DEFAULT_REBOOT_AFTER)
    parse_failed_repair = dbmodels.BooleanField(
        default=DEFAULT_PARSE_FAILED_REPAIR)
    # max_runtime_hrs is deprecated. Will be removed after switch to mins is
    # completed.
    max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS)
    max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS)
    drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)

    # TODO(jrbarnette) We have to keep `parameterized_job` around or it
    # breaks the scheduler_models unit tests (and fixing the unit tests
    # will break the scheduler, so don't do that).
    #
    # The ultimate fix is to delete the column from the database table
    # at which point, you _must_ delete this. Until you're ready to do
    # that, DON'T MUCK WITH IT.
    parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True,
                                            blank=True)

    parent_job = dbmodels.ForeignKey('self', blank=True, null=True)

    test_retry = dbmodels.IntegerField(blank=True, default=0)

    run_reset = dbmodels.BooleanField(default=True)

    timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS)

    # If this is None on the master, a slave should be found.
    # If this is None on a slave, it should be synced back to the master
    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    # If this is None, server-side packaging will be used for server side test,
    # unless it's disabled in global config AUTOSERV/enable_ssp_container.
    require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True)

    # custom manager
    objects = JobManager()


    @decorators.cached_property
    def labels(self):
        """All the labels of this job"""
        # We need to convert dependency_labels to a list, because all() gives us
        # back an iterator, and storing/caching an iterator means we'd only be
        # able to read from it once.
        return list(self.dependency_labels.all())


    def is_server_job(self):
        """Returns whether this job is of type server."""
        return self.control_type == control_data.CONTROL_TYPE.SERVER


    @classmethod
    def create(cls, owner, options, hosts):
        """Creates a job.

        The job is created by taking some information (the listed args) and
        filling in the rest of the necessary information.

        @param cls: Implicit class object.
        @param owner: The owner for the job.
        @param options: An options object.
        @param hosts: The hosts to use.
        """
        AclGroup.check_for_acl_violation_hosts(hosts)

        control_file = options.get('control_file')

        user = User.current_user()
        # Fall back to the user's personal reboot preferences when the
        # job options don't specify them.
        if options.get('reboot_before') is None:
            options['reboot_before'] = user.get_reboot_before_display()
        if options.get('reboot_after') is None:
            options['reboot_after'] = user.get_reboot_after_display()

        drone_set = DroneSet.resolve_name(options.get('drone_set'))

        # Legacy 'timeout' is in hours; derive the minutes field from it.
        if options.get('timeout_mins') is None and options.get('timeout'):
            options['timeout_mins'] = options['timeout'] * 60

        job = cls.add_object(
            owner=owner,
            name=options['name'],
            priority=options['priority'],
            control_file=control_file,
            control_type=options['control_type'],
            synch_count=options.get('synch_count'),
            # timeout needs to be deleted in the future.
            timeout=options.get('timeout'),
            timeout_mins=options.get('timeout_mins'),
            max_runtime_mins=options.get('max_runtime_mins'),
            run_verify=options.get('run_verify'),
            email_list=options.get('email_list'),
            reboot_before=options.get('reboot_before'),
            reboot_after=options.get('reboot_after'),
            parse_failed_repair=options.get('parse_failed_repair'),
            created_on=datetime.now(),
            drone_set=drone_set,
            parent_job=options.get('parent_job_id'),
            test_retry=options.get('test_retry'),
            run_reset=options.get('run_reset'),
            require_ssp=options.get('require_ssp'))

        job.dependency_labels = options['dependencies']

        if options.get('keyvals'):
            for key, value in options['keyvals'].iteritems():
                JobKeyval.objects.create(job=job, key=key, value=value)

        return job


    @classmethod
    def assign_to_shard(cls, shard, known_ids):
        """Assigns unassigned jobs to a shard.

        For all labels that have been assigned to this shard, all jobs that
        have this label, are assigned to this shard.

        Jobs that are assigned to the shard but aren't already present on the
        shard are returned.

        @param shard: The shard to assign jobs to.
        @param known_ids: List of all ids of incomplete jobs, the shard already
                          knows about.
                          This is used to figure out which jobs should be sent
                          to the shard. If shard_ids were used instead, jobs
                          would only be transferred once, even if the client
                          failed persisting them.
                          The number of unfinished jobs usually lies in O(1000).
                          Assuming one id takes 8 chars in the json, this means
                          overhead that lies in the lower kilobyte range.
                          A not in query with 5000 id's takes about 30ms.

        @returns The job objects that should be sent to the shard.
        """
        # Disclaimer: Concurrent heartbeats should not occur in today's setup.
        # If this changes or they are triggered manually, this applies:
        # Jobs may be returned more than once by concurrent calls of this
        # function, as there is a race condition between SELECT and UPDATE.
        job_ids = set([])
        check_known_jobs_exclude = ''
        check_known_jobs_include = ''

        if known_ids:
            check_known_jobs = (
                cls.NON_ABORTED_KNOWN_JOBS %
                {'known_ids': ','.join([str(i) for i in known_ids])})
            check_known_jobs_exclude = 'AND NOT ' + check_known_jobs
            check_known_jobs_include = 'OR ' + check_known_jobs

        # Jobs matched through dependency labels or meta_host.
        query = Job.objects.raw(cls.SQL_SHARD_JOBS % {
            'check_known_jobs': check_known_jobs_exclude,
            'shard_id': shard.id})
        job_ids |= set([j.id for j in query])

        # Jobs matched through their assigned hosts' labels; static and
        # non-static labels live in different join tables.
        static_labels, non_static_labels = Host.classify_label_objects(
            shard.labels.all())
        if static_labels:
            label_ids = [str(l.id) for l in static_labels]
            query = Job.objects.raw(cls.SQL_SHARD_JOBS_WITH_HOSTS % {
                'check_known_jobs': check_known_jobs_exclude,
                'host_label_table': 'afe_static_hosts_labels',
                'host_label_column': 'staticlabel_id',
                'label_ids': '(%s)' % ','.join(label_ids)})
            job_ids |= set([j.id for j in query])

        if non_static_labels:
            label_ids = [str(l.id) for l in non_static_labels]
            query = Job.objects.raw(cls.SQL_SHARD_JOBS_WITH_HOSTS % {
                'check_known_jobs': check_known_jobs_exclude,
                'host_label_table': 'afe_hosts_labels',
                'host_label_column': 'label_id',
                'label_ids': '(%s)' % ','.join(label_ids)})
            job_ids |= set([j.id for j in query])

        # Second pass: drop candidates that still have a complete/active
        # hqe (or are already known, non-aborted) on another entry.
        if job_ids:
            query = Job.objects.raw(
                cls.SQL_JOBS_TO_EXCLUDE %
                {'check_known_jobs': check_known_jobs_include,
                 'candidates': ','.join([str(i) for i in job_ids])})
            job_ids -= set([j.id for j in query])

        if job_ids:
            Job.objects.filter(pk__in=job_ids).update(shard=shard)
            return list(Job.objects.filter(pk__in=job_ids).all())
        return []
1700 1701 def queue(self, hosts, is_template=False): 1702 """Enqueue a job on the given hosts. 1703 1704 @param hosts: The hosts to use. 1705 @param is_template: Whether the status should be "Template". 1706 """ 1707 if not hosts: 1708 # hostless job 1709 entry = HostQueueEntry.create(job=self, is_template=is_template) 1710 entry.save() 1711 return 1712 1713 for host in hosts: 1714 host.enqueue_job(self, is_template=is_template) 1715 1716 1717 def user(self): 1718 """Gets the user of this job, or None if it doesn't exist.""" 1719 try: 1720 return User.objects.get(login=self.owner) 1721 except self.DoesNotExist: 1722 return None 1723 1724 1725 def abort(self): 1726 """Aborts this job.""" 1727 for queue_entry in self.hostqueueentry_set.all(): 1728 queue_entry.abort() 1729 1730 1731 def tag(self): 1732 """Returns a string tag for this job.""" 1733 return server_utils.get_job_tag(self.id, self.owner) 1734 1735 1736 def keyval_dict(self): 1737 """Returns all keyvals for this job as a dictionary.""" 1738 return dict((keyval.key, keyval.value) 1739 for keyval in self.jobkeyval_set.all()) 1740 1741 1742 @classmethod 1743 def get_attribute_model(cls): 1744 """Return the attribute model. 1745 1746 Override method in parent class. This class is called when 1747 deserializing the one-to-many relationship betwen Job and JobKeyval. 1748 On deserialization, we will try to clear any existing job keyvals 1749 associated with a job to avoid any inconsistency. 1750 Though Job doesn't implement ModelWithAttribute, we still treat 1751 it as an attribute model for this purpose. 1752 1753 @returns: The attribute model of Job. 
        """
        return JobKeyval


    class Meta:
        """Metadata for class Job."""
        db_table = 'afe_jobs'

    def __unicode__(self):
        return u'%s (%s-%s)' % (self.name, self.id, self.owner)


class JobHandoff(dbmodels.Model, model_logic.ModelExtensions):
    """Jobs that have been handed off to lucifer."""

    # The handoff row shares its primary key with the job it tracks.
    job = dbmodels.OneToOneField(Job, on_delete=dbmodels.CASCADE,
                                 primary_key=True)
    created = dbmodels.DateTimeField(auto_now_add=True)
    completed = dbmodels.BooleanField(default=False)
    drone = dbmodels.CharField(
        max_length=128, null=True,
        help_text='''
The hostname of the drone the job is running on and whose job_aborter
should be responsible for aborting the job if the job process dies.
NULL means any drone's job_aborter has free reign to abort the job.
''')

    class Meta:
        """Metadata for class JobHandoff."""
        db_table = 'afe_job_handoffs'


class JobKeyval(dbmodels.Model, model_logic.ModelExtensions):
    """Keyvals associated with jobs"""

    SERIALIZATION_LINKS_TO_KEEP = set(['job'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])

    job = dbmodels.ForeignKey(Job)
    key = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use job_id and key to search for an existing record.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        # TODO(fdeng): We should use job_id and key together as
        # a primary key in the db.
        return cls.objects.get(job_id=data['job_id'], key=data['key'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.
1816 1817 Do not deserialize id as id is not kept consistent on master and shards. 1818 1819 @param data: A dictionary of data to deserialize. 1820 1821 @returns: A JobKeyval object. 1822 """ 1823 if data: 1824 data.pop('id') 1825 return super(JobKeyval, cls).deserialize(data) 1826 1827 1828 class Meta: 1829 """Metadata for class JobKeyval.""" 1830 db_table = 'afe_job_keyvals' 1831 1832 1833class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions): 1834 """Represents an ineligible host queue.""" 1835 job = dbmodels.ForeignKey(Job) 1836 host = dbmodels.ForeignKey(Host) 1837 1838 objects = model_logic.ExtendedManager() 1839 1840 class Meta: 1841 """Metadata for class IneligibleHostQueue.""" 1842 db_table = 'afe_ineligible_host_queues' 1843 1844 1845class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 1846 """Represents a host queue entry.""" 1847 1848 SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host']) 1849 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 1850 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted']) 1851 1852 1853 def custom_deserialize_relation(self, link, data): 1854 assert link == 'meta_host' 1855 self.meta_host = Label.deserialize(data) 1856 1857 1858 def sanity_check_update_from_shard(self, shard, updated_serialized, 1859 job_ids_sent): 1860 if self.job_id not in job_ids_sent: 1861 raise error.IgnorableUnallowedRecordsSentToMaster( 1862 'Sent HostQueueEntry without corresponding ' 1863 'job entry: %s' % updated_serialized) 1864 1865 1866 Status = host_queue_entry_states.Status 1867 ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES 1868 COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES 1869 PRE_JOB_STATUSES = host_queue_entry_states.PRE_JOB_STATUSES 1870 IDLE_PRE_JOB_STATUSES = host_queue_entry_states.IDLE_PRE_JOB_STATUSES 1871 1872 job = dbmodels.ForeignKey(Job) 1873 host = dbmodels.ForeignKey(Host, blank=True, null=True) 1874 status = dbmodels.CharField(max_length=255) 1875 meta_host = 
dbmodels.ForeignKey(Label, blank=True, null=True, 1876 db_column='meta_host') 1877 active = dbmodels.BooleanField(default=False) 1878 complete = dbmodels.BooleanField(default=False) 1879 deleted = dbmodels.BooleanField(default=False) 1880 execution_subdir = dbmodels.CharField(max_length=255, blank=True, 1881 default='') 1882 # If atomic_group is set, this is a virtual HostQueueEntry that will 1883 # be expanded into many actual hosts within the group at schedule time. 1884 atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True) 1885 aborted = dbmodels.BooleanField(default=False) 1886 started_on = dbmodels.DateTimeField(null=True, blank=True) 1887 finished_on = dbmodels.DateTimeField(null=True, blank=True) 1888 1889 objects = model_logic.ExtendedManager() 1890 1891 1892 def __init__(self, *args, **kwargs): 1893 super(HostQueueEntry, self).__init__(*args, **kwargs) 1894 self._record_attributes(['status']) 1895 1896 1897 @classmethod 1898 def create(cls, job, host=None, meta_host=None, 1899 is_template=False): 1900 """Creates a new host queue entry. 1901 1902 @param cls: Implicit class object. 1903 @param job: The associated job. 1904 @param host: The associated host. 1905 @param meta_host: The associated meta host. 1906 @param is_template: Whether the status should be "Template". 1907 """ 1908 if is_template: 1909 status = cls.Status.TEMPLATE 1910 else: 1911 status = cls.Status.QUEUED 1912 1913 return cls(job=job, host=host, meta_host=meta_host, status=status) 1914 1915 1916 def save(self, *args, **kwargs): 1917 self._set_active_and_complete() 1918 super(HostQueueEntry, self).save(*args, **kwargs) 1919 self._check_for_updated_attributes() 1920 1921 1922 def execution_path(self): 1923 """ 1924 Path to this entry's results (relative to the base results directory). 
        """
        return server_utils.get_hqe_exec_path(self.job.tag(),
                                              self.execution_subdir)


    def host_or_metahost_name(self):
        """Returns the first non-None name found in priority order.

        The priority order checked is: (1) host name; (2) meta host name
        """
        if self.host:
            return self.host.hostname
        else:
            assert self.meta_host
            return self.meta_host.name


    def _set_active_and_complete(self):
        # Derive the denormalized active/complete flags from the current
        # status; called from save() so the flags never go stale.
        if self.status in self.ACTIVE_STATUSES:
            self.active, self.complete = True, False
        elif self.status in self.COMPLETE_STATUSES:
            self.active, self.complete = False, True
        else:
            self.active, self.complete = False, False


    def on_attribute_changed(self, attribute, old_value):
        # Only 'status' is recorded (see __init__), so log its transitions.
        assert attribute == 'status'
        logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id,
                     self.status)


    def is_meta_host_entry(self):
        'True if this entry has a meta_host instead of a host.'
        return self.host is None and self.meta_host is not None


    # This code is shared between rpc_interface and models.HostQueueEntry.
    # Sadly due to circular imports between the 2 (crbug.com/230100) making it
    # a class method was the best way to refactor it. Attempting to put it in
    # rpc_utils or a new utils module failed as that would require us to import
    # models.py but to call it from here we would have to import the utils.py
    # thus creating a cycle.
    @classmethod
    def abort_host_queue_entries(cls, host_queue_entries):
        """Aborts a collection of host_queue_entries.

        Abort these host queue entry and all host queue entries of jobs created
        by them.

        @param host_queue_entries: List of host queue entries we want to abort.
        """
        # This isn't completely immune to race conditions since it's not atomic,
        # but it should be safe given the scheduler's behavior.

        # TODO(milleral): crbug.com/230100
        # The |abort_host_queue_entries| rpc does nearly exactly this,
        # however, trying to re-use the code generates some horrible
        # circular import error. It'd be nice to refactor things around
        # sometime so the code could be reused.

        # Fixpoint algorithm to find the whole tree of HQEs to abort to
        # minimize the total number of database queries:
        children = set()
        new_children = set(host_queue_entries)
        while new_children:
            children.update(new_children)
            new_child_ids = [hqe.job_id for hqe in new_children]
            # Pull in the incomplete, unaborted HQEs of all child jobs of the
            # entries discovered in the previous round.
            new_children = HostQueueEntry.objects.filter(
                    job__parent_job__in=new_child_ids,
                    complete=False, aborted=False).all()
            # To handle circular parental relationships
            new_children = set(new_children) - children

        # Associate a user with the host queue entries that we're about
        # to abort so that we can look up who to blame for the aborts.
        now = datetime.now()
        user = User.current_user()
        aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe,
                aborted_by=user, aborted_on=now) for hqe in children]
        AbortedHostQueueEntry.objects.bulk_create(aborted_hqes)
        # Bulk update all of the HQEs to set the abort bit.
        child_ids = [hqe.id for hqe in children]
        HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True)


    def abort(self):
        """ Aborts this host queue entry.

        Abort this host queue entry and all host queue entries of jobs created by
        this one.

        """
        # Already-finished or already-aborted entries are left untouched.
        if not self.complete and not self.aborted:
            HostQueueEntry.abort_host_queue_entries([self])


    @classmethod
    def compute_full_status(cls, status, aborted, complete):
        """Returns a modified status msg if the host queue entry was aborted.

        @param cls: Implicit class object.
        @param status: The original status message.
        @param aborted: Whether the host queue entry was aborted.
        @param complete: Whether the host queue entry was completed.
        """
        if aborted and not complete:
            return 'Aborted (%s)' % status
        return status


    def full_status(self):
        """Returns the full status of this host queue entry, as a string."""
        return self.compute_full_status(self.status, self.aborted,
                                        self.complete)


    def _postprocess_object_dict(self, object_dict):
        # Expose the computed full status alongside the raw model fields.
        object_dict['full_status'] = self.full_status()


    class Meta:
        """Metadata for class HostQueueEntry."""
        db_table = 'afe_host_queue_entries'



    def __unicode__(self):
        hostname = None
        if self.host:
            hostname = self.host.hostname
        return u"%s/%d (%d)" % (hostname, self.job.id, self.id)


class HostQueueEntryStartTimes(dbmodels.Model):
    """An auxiliary table to HostQueueEntry to index by start time."""
    insert_time = dbmodels.DateTimeField()
    highest_hqe_id = dbmodels.IntegerField()

    class Meta:
        """Metadata for class HostQueueEntryStartTimes."""
        db_table = 'afe_host_queue_entry_start_times'


class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
    """Represents an aborted host queue entry."""
    queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True)
    aborted_by = dbmodels.ForeignKey(User)
    aborted_on = dbmodels.DateTimeField()

    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        # Always stamp the abort time at save; any caller-supplied
        # aborted_on value is overwritten here.
        self.aborted_on = datetime.now()
        super(AbortedHostQueueEntry, self).save(*args, **kwargs)

    class Meta:
        """Metadata for class AbortedHostQueueEntry."""
        db_table = 'afe_aborted_host_queue_entries'


class SpecialTask(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Tasks to run on hosts at the next time they are in the Ready state. Use this
    for high-priority tasks, such as forced repair or forced reinstall.

    host: host to run this task on
    task: special task to run
    time_requested: date and time the request for this task was made
    is_active: task is currently running
    is_complete: task has finished running
    is_aborted: task was aborted
    time_started: date and time the task started
    time_finished: date and time the task finished
    queue_entry: Host queue entry waiting on this task (or None, if task was not
                 started in preparation of a job)
    """
    Task = enum.Enum('Verify', 'Cleanup', 'Repair', 'Reset', 'Provision',
                     string_values=True)

    host = dbmodels.ForeignKey(Host, blank=False, null=False)
    task = dbmodels.CharField(max_length=64, choices=Task.choices(),
                              blank=False, null=False)
    requested_by = dbmodels.ForeignKey(User)
    time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False,
                                            null=False)
    is_active = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_complete = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_started = dbmodels.DateTimeField(null=True, blank=True)
    queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True)
    success = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_finished = dbmodels.DateTimeField(null=True, blank=True)

    objects = model_logic.ExtendedManager()


    def save(self, **kwargs):
        # Tasks created on behalf of a queue entry are attributed to the
        # owner of the entry's job, overriding any requested_by value.
        if self.queue_entry:
            self.requested_by = User.objects.get(
                    login=self.queue_entry.job.owner)
        super(SpecialTask, self).save(**kwargs)


    def execution_path(self):
        """Returns the execution path for a special task."""
        return server_utils.get_special_task_exec_path(
                self.host.hostname, self.id, self.task, self.time_requested)


    # property to emulate HostQueueEntry.status
    @property
    def status(self):
"""Returns a host queue entry status appropriate for a speical task.""" 2140 return server_utils.get_special_task_status( 2141 self.is_complete, self.success, self.is_active) 2142 2143 2144 # property to emulate HostQueueEntry.started_on 2145 @property 2146 def started_on(self): 2147 """Returns the time at which this special task started.""" 2148 return self.time_started 2149 2150 2151 @classmethod 2152 def schedule_special_task(cls, host, task): 2153 """Schedules a special task on a host if not already scheduled. 2154 2155 @param cls: Implicit class object. 2156 @param host: The host to use. 2157 @param task: The task to schedule. 2158 """ 2159 existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task, 2160 is_active=False, 2161 is_complete=False) 2162 if existing_tasks: 2163 return existing_tasks[0] 2164 2165 special_task = SpecialTask(host=host, task=task, 2166 requested_by=User.current_user()) 2167 special_task.save() 2168 return special_task 2169 2170 2171 def abort(self): 2172 """ Abort this special task.""" 2173 self.is_aborted = True 2174 self.save() 2175 2176 2177 def activate(self): 2178 """ 2179 Sets a task as active and sets the time started to the current time. 2180 """ 2181 logging.info('Starting: %s', self) 2182 self.is_active = True 2183 self.time_started = datetime.now() 2184 self.save() 2185 2186 2187 def finish(self, success): 2188 """Sets a task as completed. 2189 2190 @param success: Whether or not the task was successful. 
        """
        logging.info('Finished: %s', self)
        self.is_active = False
        self.is_complete = True
        self.success = success
        # Only record a finish time for tasks that actually started.
        if self.time_started:
            self.time_finished = datetime.now()
        self.save()


    class Meta:
        """Metadata for class SpecialTask."""
        db_table = 'afe_special_tasks'


    def __unicode__(self):
        result = u'Special Task %s (host %s, task %s, time %s)' % (
                self.id, self.host, self.task, self.time_requested)
        if self.is_complete:
            result += u' (completed)'
        elif self.is_active:
            result += u' (active)'

        return result


class StableVersion(dbmodels.Model, model_logic.ModelExtensions):
    """Maps a board name to its stable version string."""

    board = dbmodels.CharField(max_length=255, unique=True)
    version = dbmodels.CharField(max_length=255)

    class Meta:
        """Metadata for class StableVersion."""
        db_table = 'afe_stable_versions'