# pylint: disable=missing-docstring

import contextlib
from datetime import datetime
from datetime import timedelta
import logging
import os

import django.core
try:
    from django.db import models as dbmodels, connection
except django.core.exceptions.ImproperlyConfigured:
    raise ImportError('Django database not yet configured. Import either '
                      'setup_django_environment or '
                      'setup_django_lite_environment from '
                      'autotest_lib.frontend before any imports that '
                      'depend on django models.')
from xml.sax import saxutils
import common
from autotest_lib.frontend.afe import model_logic, model_attributes
from autotest_lib.frontend.afe import rdb_model_extensions
from autotest_lib.frontend import settings, thread_local
from autotest_lib.client.common_lib import autotest_enum, error, host_protections
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_queue_entry_states
from autotest_lib.client.common_lib import control_data, priorities, decorators
from autotest_lib.server import utils as server_utils

# job options and user preferences
DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY
# Fix: take the reboot-after default from the RebootAfter enum.  The original
# read model_attributes.RebootBefore.NEVER; User.reboot_after is declared with
# choices=model_attributes.RebootAfter.choices(), so the default must be a
# RebootAfter value.  NOTE(review): this assumes NEVER is the first member of
# both enums (same numeric value) — confirm against model_attributes before
# relying on a behavior difference.
DEFAULT_REBOOT_AFTER = model_attributes.RebootAfter.NEVER

RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_labels', type=bool, default=False)

RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_attributes', type=bool, default=False)


class AclAccessViolation(Exception):
    """\
    Raised when an operation is attempted without proper permissions as
    dictated by ACLs.
    """


class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model):
    """\
    An atomic group defines a collection of hosts which must only be scheduled
    all at once.  Any host with a label having an atomic group will only be
    scheduled for a job at the same time as other hosts sharing that label.

    Required:
      name: A name for this atomic group, e.g. 'rack23' or 'funky_net'.
      max_number_of_machines: The maximum number of machines that will be
              scheduled at once when scheduling jobs to this atomic group.
              The job.synch_count is considered the minimum.

    Optional:
      description: Arbitrary text description of this group's purpose.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)
    # This magic value is the default to simplify the scheduler logic.
    # It must be "large".  The common use of atomic groups is to want all
    # machines in the group to be used, limits on which subset used are
    # often chosen via dependency labels.
    # TODO(dennisjeffrey): Revisit this so we don't have to assume that
    # "infinity" is around 3.3 million.
    INFINITE_MACHINES = 333333333
    max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES)
    invalid = dbmodels.BooleanField(default=False,
                                    editable=settings.FULL_ADMIN)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on an associated atomic group of hosts.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        queue_entry = HostQueueEntry.create(atomic_group=self, job=job,
                                            is_template=is_template)
        queue_entry.save()


    def clean_object(self):
        # On invalidation, detach all labels pointing at this group rather
        # than cascading a delete.
        self.label_set.clear()


    class Meta:
        """Metadata for class AtomicGroup."""
        db_table = 'afe_atomic_groups'


    def __unicode__(self):
        return unicode(self.name)
85 """ 86 queue_entry = HostQueueEntry.create(atomic_group=self, job=job, 87 is_template=is_template) 88 queue_entry.save() 89 90 91 def clean_object(self): 92 self.label_set.clear() 93 94 95 class Meta: 96 """Metadata for class AtomicGroup.""" 97 db_table = 'afe_atomic_groups' 98 99 100 def __unicode__(self): 101 return unicode(self.name) 102 103 104class Label(model_logic.ModelWithInvalid, dbmodels.Model): 105 """\ 106 Required: 107 name: label name 108 109 Optional: 110 kernel_config: URL/path to kernel config for jobs run on this label. 111 platform: If True, this is a platform label (defaults to False). 112 only_if_needed: If True, a Host with this label can only be used if that 113 label is requested by the job/test (either as the meta_host or 114 in the job_dependencies). 115 atomic_group: The atomic group associated with this label. 116 """ 117 name = dbmodels.CharField(max_length=255, unique=True) 118 kernel_config = dbmodels.CharField(max_length=255, blank=True) 119 platform = dbmodels.BooleanField(default=False) 120 invalid = dbmodels.BooleanField(default=False, 121 editable=settings.FULL_ADMIN) 122 only_if_needed = dbmodels.BooleanField(default=False) 123 124 name_field = 'name' 125 objects = model_logic.ModelWithInvalidManager() 126 valid_objects = model_logic.ValidObjectsManager() 127 atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) 128 129 130 def clean_object(self): 131 self.host_set.clear() 132 self.test_set.clear() 133 134 135 def enqueue_job(self, job, is_template=False): 136 """Enqueue a job on any host of this label. 137 138 @param job: A job to enqueue. 139 @param is_template: Whether the status should be "Template". 
140 """ 141 queue_entry = HostQueueEntry.create(meta_host=self, job=job, 142 is_template=is_template) 143 queue_entry.save() 144 145 146 147 class Meta: 148 """Metadata for class Label.""" 149 db_table = 'afe_labels' 150 151 152 def __unicode__(self): 153 return unicode(self.name) 154 155 156 def is_replaced_by_static(self): 157 """Detect whether a label is replaced by a static label. 158 159 'Static' means it can only be modified by skylab inventory tools. 160 """ 161 if RESPECT_STATIC_LABELS: 162 replaced = ReplacedLabel.objects.filter(label__id=self.id) 163 if len(replaced) > 0: 164 return True 165 166 return False 167 168 169class StaticLabel(model_logic.ModelWithInvalid, dbmodels.Model): 170 """\ 171 Required: 172 name: label name 173 174 Optional: 175 kernel_config: URL/path to kernel config for jobs run on this label. 176 platform: If True, this is a platform label (defaults to False). 177 only_if_needed: Deprecated. This is always False. 178 atomic_group: Deprecated. This is always NULL. 
179 """ 180 name = dbmodels.CharField(max_length=255, unique=True) 181 kernel_config = dbmodels.CharField(max_length=255, blank=True) 182 platform = dbmodels.BooleanField(default=False) 183 invalid = dbmodels.BooleanField(default=False, 184 editable=settings.FULL_ADMIN) 185 only_if_needed = dbmodels.BooleanField(default=False) 186 187 name_field = 'name' 188 objects = model_logic.ModelWithInvalidManager() 189 valid_objects = model_logic.ValidObjectsManager() 190 atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) 191 192 def clean_object(self): 193 self.host_set.clear() 194 self.test_set.clear() 195 196 197 class Meta: 198 """Metadata for class StaticLabel.""" 199 db_table = 'afe_static_labels' 200 201 202 def __unicode__(self): 203 return unicode(self.name) 204 205 206class ReplacedLabel(dbmodels.Model, model_logic.ModelExtensions): 207 """The tag to indicate Whether to replace labels with static labels.""" 208 label = dbmodels.ForeignKey(Label) 209 objects = model_logic.ExtendedManager() 210 211 212 class Meta: 213 """Metadata for class ReplacedLabel.""" 214 db_table = 'afe_replaced_labels' 215 216 217 def __unicode__(self): 218 return unicode(self.label) 219 220 221class Shard(dbmodels.Model, model_logic.ModelExtensions): 222 223 hostname = dbmodels.CharField(max_length=255, unique=True) 224 225 name_field = 'hostname' 226 227 labels = dbmodels.ManyToManyField(Label, blank=True, 228 db_table='afe_shards_labels') 229 230 class Meta: 231 """Metadata for class ParameterizedJob.""" 232 db_table = 'afe_shards' 233 234 235class Drone(dbmodels.Model, model_logic.ModelExtensions): 236 """ 237 A scheduler drone 238 239 hostname: the drone's hostname 240 """ 241 hostname = dbmodels.CharField(max_length=255, unique=True) 242 243 name_field = 'hostname' 244 objects = model_logic.ExtendedManager() 245 246 247 def save(self, *args, **kwargs): 248 if not User.current_user().is_superuser(): 249 raise Exception('Only superusers may edit drones') 250 
class Drone(dbmodels.Model, model_logic.ModelExtensions):
    """
    A scheduler drone

    hostname: the drone's hostname
    """
    hostname = dbmodels.CharField(max_length=255, unique=True)

    name_field = 'hostname'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save the drone.  Only superusers may create or edit drones."""
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may edit drones')
        super(Drone, self).save(*args, **kwargs)


    def delete(self):
        """Delete the drone.  Only superusers may delete drones."""
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may delete drones')
        super(Drone, self).delete()


    class Meta:
        """Metadata for class Drone."""
        db_table = 'afe_drones'

    def __unicode__(self):
        return unicode(self.hostname)


class DroneSet(dbmodels.Model, model_logic.ModelExtensions):
    """
    A set of scheduler drones

    These will be used by the scheduler to decide what drones a job is allowed
    to run on.

    name: the drone set's name
    drones: the drones that are part of the set
    """
    DRONE_SETS_ENABLED = global_config.global_config.get_config_value(
            'SCHEDULER', 'drone_sets_enabled', type=bool, default=False)
    DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value(
            'SCHEDULER', 'default_drone_set_name', default=None)

    name = dbmodels.CharField(max_length=255, unique=True)
    drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones')

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save the drone set.  Only superusers may create or edit them."""
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may edit drone sets')
        super(DroneSet, self).save(*args, **kwargs)


    def delete(self):
        """Delete the drone set.  Only superusers may delete them."""
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may delete drone sets')
        super(DroneSet, self).delete()


    @classmethod
    def drone_sets_enabled(cls):
        """Returns whether drone sets are enabled.

        @param cls: Implicit class object.
        """
        return cls.DRONE_SETS_ENABLED


    @classmethod
    def default_drone_set_name(cls):
        """Returns the default drone set name.

        @param cls: Implicit class object.
        """
        return cls.DEFAULT_DRONE_SET_NAME


    @classmethod
    def get_default(cls):
        """Gets the default drone set name, compatible with Job.add_object.

        @param cls: Implicit class object.
        """
        return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME)


    @classmethod
    def resolve_name(cls, drone_set_name):
        """
        Returns the name of one of these, if not None, in order of preference:
        1) the drone set given,
        2) the current user's default drone set, or
        3) the global default drone set

        or returns None if drone sets are disabled

        @param cls: Implicit class object.
        @param drone_set_name: A drone set name.
        """
        if not cls.drone_sets_enabled():
            return None

        user = User.current_user()
        # None if the user has no default drone set configured.
        user_drone_set_name = user.drone_set and user.drone_set.name

        return drone_set_name or user_drone_set_name or cls.get_default().name


    def get_drone_hostnames(self):
        """
        Gets the hostnames of all drones in this drone set
        """
        return set(self.drones.all().values_list('hostname', flat=True))


    class Meta:
        """Metadata for class DroneSet."""
        db_table = 'afe_drone_sets'

    def __unicode__(self):
        return unicode(self.name)
class User(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    login :user login name

    Optional:
    access_level: 0=User (default), 1=Admin, 100=Root
    """
    ACCESS_ROOT = 100
    ACCESS_ADMIN = 1
    ACCESS_USER = 0

    # Login name used for machine-initiated operations with no real user.
    AUTOTEST_SYSTEM = 'autotest_system'

    login = dbmodels.CharField(max_length=255, unique=True)
    access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True)

    # user preferences
    reboot_before = dbmodels.SmallIntegerField(
            choices=model_attributes.RebootBefore.choices(), blank=True,
            default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
            choices=model_attributes.RebootAfter.choices(), blank=True,
            default=DEFAULT_REBOOT_AFTER)
    drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)
    show_experimental = dbmodels.BooleanField(default=False)

    name_field = 'login'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save the user.

        Non-superusers may only modify their own record.  New users are
        automatically added to the 'Everyone' ACL group.

        @raises AclAccessViolation: if the calling user may not modify this
                record.
        """
        # is this a new object being saved for the first time?
        first_time = (self.id is None)
        user = thread_local.get_user()
        if user and not user.is_superuser() and user.login != self.login:
            raise AclAccessViolation("You cannot modify user " + self.login)
        super(User, self).save(*args, **kwargs)
        if first_time:
            everyone = AclGroup.objects.get(name='Everyone')
            everyone.users.add(self)


    def is_superuser(self):
        """Returns whether the user has superuser access."""
        return self.access_level >= self.ACCESS_ROOT


    @classmethod
    def current_user(cls):
        """Returns the current user.

        Falls back to (and lazily creates) the AUTOTEST_SYSTEM user when no
        user is bound to the current thread.

        @param cls: Implicit class object.
        """
        user = thread_local.get_user()
        if user is None:
            user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM)
            user.access_level = cls.ACCESS_ROOT
            user.save()
        return user


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Check for a record with matching id and login. If one exists,
        return it. If one does not exist there is a possibility that
        the following cases have happened:
        1. Same id, different login
            We received: "1 chromeos-test"
            And we have: "1 debug-user"
            In this case we need to delete "1 debug_user" and insert
            "1 chromeos-test".

        2. Same login, different id:
            We received: "1 chromeos-test"
            And we have: "2 chromeos-test"
            In this case we need to delete "2 chromeos-test" and insert
            "1 chromeos-test".

        As long as this method deletes bad records and raises the
        DoesNotExist exception the caller will handle creating the
        new record.

        @raises: DoesNotExist, if a record with the matching login and id
                 does not exist.
        """

        # Both the id and login should be unique but there are cases when
        # we might already have a user with the same login/id because
        # current_user will proactively create a user record if it doesn't
        # exist. Since we want to avoid conflict between the main and
        # shard, just delete any existing user records that don't match
        # what we're about to deserialize from the main.
        try:
            return cls.objects.get(login=data['login'], id=data['id'])
        except cls.DoesNotExist:
            cls.delete_matching_record(login=data['login'])
            cls.delete_matching_record(id=data['id'])
            raise


    class Meta:
        """Metadata for class User."""
        db_table = 'afe_users'

    def __unicode__(self):
        return unicode(self.login)
class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel,
           model_logic.ModelWithAttributes):
    """\
    Required:
    hostname

    optional:
    locked: if true, host is locked and will not be queued

    Internal:
    From AbstractHostModel:
        status: string describing status of host
        invalid: true if the host has been deleted
        protection: indicates what can be done to this host during repair
        lock_time: DateTime at which the host was locked
        dirty: true if the host has been used without being rebooted
    Local:
        locked_by: user that locked the host, or null if the host is unlocked
    """

    SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set',
                                         'hostattribute_set',
                                         'labels',
                                         'shard'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid'])


    def custom_deserialize_relation(self, link, data):
        """Deserialize the 'shard' relation; no other link is supported."""
        assert link == 'shard', 'Link %s should not be deserialized' % link
        self.shard = Shard.deserialize(data)


    # Note: Only specify foreign keys here, specify all native host columns in
    # rdb_model_extensions instead.
    Protection = host_protections.Protection
    labels = dbmodels.ManyToManyField(Label, blank=True,
                                      db_table='afe_hosts_labels')
    static_labels = dbmodels.ManyToManyField(
            StaticLabel, blank=True, db_table='afe_static_hosts_labels')
    locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False)
    name_field = 'hostname'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    leased_objects = model_logic.LeasedHostManager()

    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    def __init__(self, *args, **kwargs):
        super(Host, self).__init__(*args, **kwargs)
        # Track 'status' so on_attribute_changed fires when it is modified.
        self._record_attributes(['status'])


    @classmethod
    def classify_labels(cls, label_names):
        """Split labels to static & non-static.

        @param label_names: a list of labels (string).

        @returns: a list of StaticLabel objects & a list of
                  (non-static) Label objects.
        """
        if not label_names:
            return [], []

        labels = Label.objects.filter(name__in=label_names)

        if not RESPECT_STATIC_LABELS:
            return [], labels

        return cls.classify_label_objects(labels)


    @classmethod
    def classify_label_objects(cls, label_objects):
        """Split Label objects into static & non-static groups.

        @param label_objects: an iterable of Label objects.

        @returns: a queryset of StaticLabel objects & a list of Label
                  objects not replaced by static labels.
        """
        if not RESPECT_STATIC_LABELS:
            return [], label_objects

        replaced_labels = ReplacedLabel.objects.filter(label__in=label_objects)
        replaced_ids = [l.label.id for l in replaced_labels]
        non_static_labels = [
                l for l in label_objects if not l.id in replaced_ids]
        static_label_names = [
                l.name for l in label_objects if l.id in replaced_ids]
        static_labels = StaticLabel.objects.filter(name__in=static_label_names)
        return static_labels, non_static_labels


    @classmethod
    def get_hosts_with_labels(cls, label_names, initial_query):
        """Get hosts by label filters.

        @param label_names: label (string) lists for fetching hosts.
        @param initial_query: a model_logic.QuerySet of Host object, e.g.

                Host.objects.all(), Host.valid_objects.all().

            This initial_query cannot be a sliced QuerySet, e.g.

                Host.objects.all().filter(query_limit=10)
        """
        if not label_names:
            return initial_query

        static_labels, non_static_labels = cls.classify_labels(label_names)
        if len(static_labels) + len(non_static_labels) != len(label_names):
            # Some labels don't exist in afe db, which means no hosts
            # should be matched.
            return set()

        for l in static_labels:
            initial_query = initial_query.filter(static_labels=l)

        for l in non_static_labels:
            initial_query = initial_query.filter(labels=l)

        return initial_query


    @classmethod
    def get_hosts_with_label_ids(cls, label_ids, initial_query):
        """Get hosts by label_id filters.

        @param label_ids: label id (int) lists for fetching hosts.
        @param initial_query: a list of Host object, e.g.
            [<Host: 100.107.151.253>, <Host: 100.107.151.251>, ...]
        """
        labels = Label.objects.filter(id__in=label_ids)
        label_names = [l.name for l in labels]
        return cls.get_hosts_with_labels(label_names, initial_query)


    @staticmethod
    def create_one_time_host(hostname):
        """Creates a one-time host.

        @param hostname: The name for the host.

        @raises model_logic.ValidationError: if a valid (non-invalid) host
                with this hostname already exists.
        """
        query = Host.objects.filter(hostname=hostname)
        if query.count() == 0:
            host = Host(hostname=hostname, invalid=True)
            host.do_validate()
        else:
            host = query[0]
            if not host.invalid:
                raise model_logic.ValidationError({
                    'hostname' : '%s already exists in the autotest DB.  '
                        'Select it rather than entering it as a one time '
                        'host.' % hostname
                    })
        host.protection = host_protections.Protection.DO_NOT_REPAIR
        host.locked = False
        host.save()
        host.clean_object()
        return host


    @classmethod
    def _assign_to_shard_nothing_helper(cls):
        """Does nothing.

        This method is called in the middle of assign_to_shard, and does
        nothing. It exists to allow integration tests to simulate a race
        condition."""


    @classmethod
    def assign_to_shard(cls, shard, known_ids):
        """Assigns hosts to a shard.

        For all labels that have been assigned to a shard, all hosts that
        have at least one of the shard's labels are assigned to the shard.
        Hosts that are assigned to the shard but aren't already present on the
        shard are returned.

        Any boards that are in |known_ids| but that do not belong to the shard
        are incorrect ids, which are also returned so that the shard can remove
        them locally.

        Board to shard mapping is many-to-one. Many different boards can be
        hosted in a shard. However, DUTs of a single board cannot be distributed
        into more than one shard.

        @param shard: The shard object to assign labels/hosts for.
        @param known_ids: List of all host-ids the shard already knows.
                          This is used to figure out which hosts should be sent
                          to the shard. If shard_ids were used instead, hosts
                          would only be transferred once, even if the client
                          failed persisting them.
                          The number of hosts usually lies in O(100), so the
                          overhead is acceptable.

        @returns a tuple of (hosts objects that should be sent to the shard,
                             incorrect host ids that should not belong to
                             shard)
        """
        # Disclaimer: concurrent heartbeats should theoretically not occur in
        # the current setup. As they may be introduced in the near future,
        # this comment will be left here.

        # Sending stuff twice is acceptable, but forgetting something isn't.
        # Detecting duplicates on the client is easy, but here it's harder. The
        # following options were considered:
        # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more
        #   than select returned, as concurrently more hosts might have been
        #   inserted
        # - UPDATE and then SELECT WHERE shard=shard: select always returns all
        #   hosts for the shard, this is overhead
        # - SELECT and then UPDATE only selected without requerying afterwards:
        #   returns the old state of the records.
        new_hosts = []

        possible_new_host_ids = set(Host.objects.filter(
            labels__in=shard.labels.all(),
            leased=False
            ).exclude(
            id__in=known_ids,
            ).values_list('pk', flat=True))

        # No-op in production, used to simulate race condition in tests.
        cls._assign_to_shard_nothing_helper()

        if possible_new_host_ids:
            Host.objects.filter(
                pk__in=possible_new_host_ids,
                labels__in=shard.labels.all(),
                leased=False
                ).update(shard=shard)
            new_hosts = list(Host.objects.filter(
                pk__in=possible_new_host_ids,
                shard=shard
                ).all())

        invalid_host_ids = list(Host.objects.filter(
            id__in=known_ids
            ).exclude(
            shard=shard
            ).values_list('pk', flat=True))

        return new_hosts, invalid_host_ids

    def resurrect_object(self, old_object):
        super(Host, self).resurrect_object(old_object)
        # invalid hosts can be in use by the scheduler (as one-time hosts), so
        # don't change the status
        self.status = old_object.status


    def clean_object(self):
        # On invalidation, detach ACL groups and all label associations.
        self.aclgroup_set.clear()
        self.labels.clear()
        self.static_labels.clear()


    def save(self, *args, **kwargs):
        """Save the host, maintaining lock bookkeeping and ACL membership.

        @raises AclAccessViolation: if the calling user may not modify this
                host (existing hosts only).
        """
        # extra spaces in the hostname can be a sneaky source of errors
        self.hostname = self.hostname.strip()
        # is this a new object being saved for the first time?
        first_time = (self.id is None)
        if not first_time:
            AclGroup.check_for_acl_violation_hosts([self])
        # If locked is changed, send its status and user made the change to
        # metaDB. Locks are important in host history because if a device is
        # locked then we don't really care what state it is in.
        if self.locked and not self.locked_by:
            self.locked_by = User.current_user()
            if not self.lock_time:
                self.lock_time = datetime.now()
            self.dirty = True
        elif not self.locked and self.locked_by:
            self.locked_by = None
            self.lock_time = None
        super(Host, self).save(*args, **kwargs)
        if first_time:
            everyone = AclGroup.objects.get(name='Everyone')
            everyone.hosts.add(self)
            # remove attributes that may have lingered from an old host and
            # should not be associated with a new host
            for host_attribute in self.hostattribute_set.all():
                self.delete_attribute(host_attribute.attribute)
        self._check_for_updated_attributes()


    def delete(self):
        """Delete the host after aborting its queue entries and attributes.

        @raises AclAccessViolation: if the calling user may not modify this
                host.
        """
        AclGroup.check_for_acl_violation_hosts([self])
        logging.info('Preconditions for deleting host %s...', self.hostname)
        for queue_entry in self.hostqueueentry_set.all():
            logging.info('  Deleting and aborting hqe %s...', queue_entry)
            queue_entry.deleted = True
            queue_entry.abort()
            logging.info('  ... done with hqe %s.', queue_entry)
        for host_attribute in self.hostattribute_set.all():
            logging.info('  Deleting attribute %s...', host_attribute)
            self.delete_attribute(host_attribute.attribute)
            logging.info('  ... done with attribute %s.', host_attribute)
        logging.info('... preconditions done for host %s.', self.hostname)
        logging.info('Deleting host %s...', self.hostname)
        super(Host, self).delete()
        logging.info('... done.')


    def on_attribute_changed(self, attribute, old_value):
        # Only 'status' is registered in __init__, so nothing else can fire.
        assert attribute == 'status'
        logging.info('%s -> %s', self.hostname, self.status)


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on this host.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        queue_entry = HostQueueEntry.create(host=self, job=job,
                                            is_template=is_template)
        # allow recovery of dead hosts from the frontend
        if not self.active_queue_entry() and self.is_dead():
            self.status = Host.Status.READY
            self.save()
        queue_entry.save()

        block = IneligibleHostQueue(job=job, host=self)
        block.save()


    def platform(self):
        """The platform of the host."""
        # TODO(showard): slightly hacky?
        platforms = self.labels.filter(platform=True)
        if len(platforms) == 0:
            return None
        return platforms[0]
    platform.short_description = 'Platform'


    @classmethod
    def check_no_platform(cls, hosts):
        """Verify the specified hosts have no associated platforms.

        @param cls: Implicit class object.
        @param hosts: The hosts to verify.
        @raises model_logic.ValidationError if any hosts already have a
            platform.
        """
        Host.objects.populate_relationships(hosts, Label, 'label_list')
        Host.objects.populate_relationships(hosts, StaticLabel,
                                            'staticlabel_list')
        errors = []
        for host in hosts:
            platforms = [label.name for label in host.label_list
                         if label.platform]
            if RESPECT_STATIC_LABELS:
                platforms += [label.name for label in host.staticlabel_list
                              if label.platform]

            if platforms:
                # do a join, just in case this host has multiple platforms,
                # we'll be able to see it
                errors.append('Host %s already has a platform: %s' % (
                        host.hostname, ', '.join(platforms)))
        if errors:
            raise model_logic.ValidationError({'labels': '; '.join(errors)})


    @classmethod
    def check_board_labels_allowed(cls, hosts, new_labels=[]):
        """Verify the specified hosts have valid board labels and the given
        new board labels can be added.

        @param cls: Implicit class object.
        @param hosts: The hosts to verify.
        @param new_labels: A list of labels to be added to the hosts.

        @raises model_logic.ValidationError if any host has invalid board
            labels or the given board labels cannot be added to the hosts.
        """
        Host.objects.populate_relationships(hosts, Label, 'label_list')
        Host.objects.populate_relationships(hosts, StaticLabel,
                                            'staticlabel_list')
        errors = []
        for host in hosts:
            boards = [label.name for label in host.label_list
                      if label.name.startswith('board:')]
            if RESPECT_STATIC_LABELS:
                boards += [label.name for label in host.staticlabel_list
                           if label.name.startswith('board:')]

            new_boards = [name for name in new_labels
                          if name.startswith('board:')]
            if len(boards) + len(new_boards) > 1:
                # do a join, just in case this host has multiple boards,
                # we'll be able to see it
                errors.append('Host %s already has board labels: %s' % (
                        host.hostname, ', '.join(boards)))
        if errors:
            raise model_logic.ValidationError({'labels': '; '.join(errors)})


    def is_dead(self):
        """Returns whether the host is dead (has status repair failed)."""
        return self.status == Host.Status.REPAIR_FAILED


    def active_queue_entry(self):
        """Returns the active queue entry for this host, or None if none."""
        active = list(self.hostqueueentry_set.filter(active=True))
        if not active:
            return None
        assert len(active) == 1, ('More than one active entry for '
                                  'host ' + self.hostname)
        return active[0]


    def _get_attribute_model_and_args(self, attribute):
        # Used by ModelWithAttributes for regular (mutable) host attributes.
        return HostAttribute, dict(host=self, attribute=attribute)


    def _get_static_attribute_model_and_args(self, attribute):
        # Used for attributes owned by skylab inventory tools.
        return StaticHostAttribute, dict(host=self, attribute=attribute)


    def _is_replaced_by_static_attribute(self, attribute):
        """Return True if |attribute| has a static (read-only) replacement."""
        if RESPECT_STATIC_ATTRIBUTES:
            model, args = self._get_static_attribute_model_and_args(attribute)
            try:
                static_attr = model.objects.get(**args)
                return True
            except StaticHostAttribute.DoesNotExist:
                return False

        return False


    @classmethod
    def get_attribute_model(cls):
        """Return the attribute model.

        Override method in parent class. See ModelExtensions for details.
        @returns: The attribute model of Host.
        """
        return HostAttribute


    class Meta:
        """Metadata for the Host class."""
        db_table = 'afe_hosts'


    def __unicode__(self):
        return unicode(self.hostname)


class HostAttribute(dbmodels.Model, model_logic.ModelExtensions):
    """Arbitrary keyvals associated with hosts."""

    SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])
    host = dbmodels.ForeignKey(Host)
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for the HostAttribute class."""
        db_table = 'afe_host_attributes'


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use host_id and attribute to search for a existing record.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        # TODO(fdeng): We should use host_id and attribute together as
        # a primary key in the db.
        return cls.objects.get(host_id=data['host_id'],
                               attribute=data['attribute'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on main and shards.

        @param data: A dictionary of data to deserialize.

        @returns: A HostAttribute object.
        """
        if data:
            data.pop('id')
        return super(HostAttribute, cls).deserialize(data)
966 """ 967 if data: 968 data.pop('id') 969 return super(HostAttribute, cls).deserialize(data) 970 971 972class StaticHostAttribute(dbmodels.Model, model_logic.ModelExtensions): 973 """Static arbitrary keyvals associated with hosts.""" 974 975 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 976 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 977 host = dbmodels.ForeignKey(Host) 978 attribute = dbmodels.CharField(max_length=90) 979 value = dbmodels.CharField(max_length=300) 980 981 objects = model_logic.ExtendedManager() 982 983 class Meta: 984 """Metadata for the StaticHostAttribute class.""" 985 db_table = 'afe_static_host_attributes' 986 987 988 @classmethod 989 def get_record(cls, data): 990 """Check the database for an identical record. 991 992 Use host_id and attribute to search for a existing record. 993 994 @raises: DoesNotExist, if no record found 995 @raises: MultipleObjectsReturned if multiple records found. 996 """ 997 return cls.objects.get(host_id=data['host_id'], 998 attribute=data['attribute']) 999 1000 1001 @classmethod 1002 def deserialize(cls, data): 1003 """Override deserialize in parent class. 1004 1005 Do not deserialize id as id is not kept consistent on main and shards. 1006 1007 @param data: A dictionary of data to deserialize. 1008 1009 @returns: A StaticHostAttribute object. 1010 """ 1011 if data: 1012 data.pop('id') 1013 return super(StaticHostAttribute, cls).deserialize(data) 1014 1015 1016class Test(dbmodels.Model, model_logic.ModelExtensions): 1017 """\ 1018 Required: 1019 author: author name 1020 description: description of the test 1021 name: test name 1022 time: short, medium, long 1023 test_class: This describes the class for your the test belongs in. 1024 test_category: This describes the category for your tests 1025 test_type: Client or Server 1026 path: path to pass to run_test() 1027 sync_count: is a number >=1 (1 being the default). If it's 1, then it's an 1028 async job. 
If it's >1 it's sync job for that number of machines 1029 i.e. if sync_count = 2 it is a sync job that requires two 1030 machines. 1031 Optional: 1032 dependencies: What the test requires to run. Comma deliminated list 1033 dependency_labels: many-to-many relationship with labels corresponding to 1034 test dependencies. 1035 experimental: If this is set to True production servers will ignore the test 1036 run_verify: Whether or not the scheduler should run the verify stage 1037 run_reset: Whether or not the scheduler should run the reset stage 1038 test_retry: Number of times to retry test if the test did not complete 1039 successfully. (optional, default: 0) 1040 """ 1041 TestTime = autotest_enum.AutotestEnum('SHORT', 'MEDIUM', 'LONG', 1042 start_value=1) 1043 1044 name = dbmodels.CharField(max_length=255, unique=True) 1045 author = dbmodels.CharField(max_length=255) 1046 test_class = dbmodels.CharField(max_length=255) 1047 test_category = dbmodels.CharField(max_length=255) 1048 dependencies = dbmodels.CharField(max_length=255, blank=True) 1049 description = dbmodels.TextField(blank=True) 1050 experimental = dbmodels.BooleanField(default=True) 1051 run_verify = dbmodels.BooleanField(default=False) 1052 test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(), 1053 default=TestTime.MEDIUM) 1054 test_type = dbmodels.SmallIntegerField( 1055 choices=control_data.CONTROL_TYPE.choices()) 1056 sync_count = dbmodels.IntegerField(default=1) 1057 path = dbmodels.CharField(max_length=255, unique=True) 1058 test_retry = dbmodels.IntegerField(blank=True, default=0) 1059 run_reset = dbmodels.BooleanField(default=True) 1060 1061 dependency_labels = ( 1062 dbmodels.ManyToManyField(Label, blank=True, 1063 db_table='afe_autotests_dependency_labels')) 1064 name_field = 'name' 1065 objects = model_logic.ExtendedManager() 1066 1067 1068 def admin_description(self): 1069 """Returns a string representing the admin description.""" 1070 escaped_description = 
saxutils.escape(self.description) 1071 return '<span style="white-space:pre">%s</span>' % escaped_description 1072 admin_description.allow_tags = True 1073 admin_description.short_description = 'Description' 1074 1075 1076 class Meta: 1077 """Metadata for class Test.""" 1078 db_table = 'afe_autotests' 1079 1080 def __unicode__(self): 1081 return unicode(self.name) 1082 1083 1084class TestParameter(dbmodels.Model): 1085 """ 1086 A declared parameter of a test 1087 """ 1088 test = dbmodels.ForeignKey(Test) 1089 name = dbmodels.CharField(max_length=255) 1090 1091 class Meta: 1092 """Metadata for class TestParameter.""" 1093 db_table = 'afe_test_parameters' 1094 unique_together = ('test', 'name') 1095 1096 def __unicode__(self): 1097 return u'%s (%s)' % (self.name, self.test.name) 1098 1099 1100class Profiler(dbmodels.Model, model_logic.ModelExtensions): 1101 """\ 1102 Required: 1103 name: profiler name 1104 test_type: Client or Server 1105 1106 Optional: 1107 description: arbirary text description 1108 """ 1109 name = dbmodels.CharField(max_length=255, unique=True) 1110 description = dbmodels.TextField(blank=True) 1111 1112 name_field = 'name' 1113 objects = model_logic.ExtendedManager() 1114 1115 1116 class Meta: 1117 """Metadata for class Profiler.""" 1118 db_table = 'afe_profilers' 1119 1120 def __unicode__(self): 1121 return unicode(self.name) 1122 1123 1124class AclGroup(dbmodels.Model, model_logic.ModelExtensions): 1125 """\ 1126 Required: 1127 name: name of ACL group 1128 1129 Optional: 1130 description: arbitrary description of group 1131 """ 1132 1133 SERIALIZATION_LINKS_TO_FOLLOW = set(['users']) 1134 1135 name = dbmodels.CharField(max_length=255, unique=True) 1136 description = dbmodels.CharField(max_length=255, blank=True) 1137 users = dbmodels.ManyToManyField(User, blank=False, 1138 db_table='afe_acl_groups_users') 1139 hosts = dbmodels.ManyToManyField(Host, blank=True, 1140 db_table='afe_acl_groups_hosts') 1141 1142 name_field = 'name' 1143 objects = 
model_logic.ExtendedManager() 1144 1145 @staticmethod 1146 def check_for_acl_violation_hosts(hosts): 1147 """Verify the current user has access to the specified hosts. 1148 1149 @param hosts: The hosts to verify against. 1150 @raises AclAccessViolation if the current user doesn't have access 1151 to a host. 1152 """ 1153 user = User.current_user() 1154 if user.is_superuser(): 1155 return 1156 accessible_host_ids = set( 1157 host.id for host in Host.objects.filter(aclgroup__users=user)) 1158 for host in hosts: 1159 # Check if the user has access to this host, 1160 # but only if it is not a metahost or a one-time-host. 1161 no_access = (isinstance(host, Host) 1162 and not host.invalid 1163 and int(host.id) not in accessible_host_ids) 1164 if no_access: 1165 raise AclAccessViolation("%s does not have access to %s" % 1166 (str(user), str(host))) 1167 1168 1169 @staticmethod 1170 def check_abort_permissions(queue_entries): 1171 """Look for queue entries that aren't abortable by the current user. 1172 1173 An entry is not abortable if: 1174 * the job isn't owned by this user, and 1175 * the machine isn't ACL-accessible, or 1176 * the machine is in the "Everyone" ACL 1177 1178 @param queue_entries: The queue entries to check. 1179 @raises AclAccessViolation if a queue entry is not abortable by the 1180 current user. 1181 """ 1182 user = User.current_user() 1183 if user.is_superuser(): 1184 return 1185 not_owned = queue_entries.exclude(job__owner=user.login) 1186 # I do this using ID sets instead of just Django filters because 1187 # filtering on M2M dbmodels is broken in Django 0.96. It's better in 1188 # 1.0. 1189 # TODO: Use Django filters, now that we're using 1.0. 
1190 accessible_ids = set( 1191 entry.id for entry 1192 in not_owned.filter(host__aclgroup__users__login=user.login)) 1193 public_ids = set(entry.id for entry 1194 in not_owned.filter(host__aclgroup__name='Everyone')) 1195 cannot_abort = [entry for entry in not_owned.select_related() 1196 if entry.id not in accessible_ids 1197 or entry.id in public_ids] 1198 if len(cannot_abort) == 0: 1199 return 1200 entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner, 1201 entry.host_or_metahost_name()) 1202 for entry in cannot_abort) 1203 raise AclAccessViolation('You cannot abort the following job entries: ' 1204 + entry_names) 1205 1206 1207 def check_for_acl_violation_acl_group(self): 1208 """Verifies the current user has acces to this ACL group. 1209 1210 @raises AclAccessViolation if the current user doesn't have access to 1211 this ACL group. 1212 """ 1213 user = User.current_user() 1214 if user.is_superuser(): 1215 return 1216 if self.name == 'Everyone': 1217 raise AclAccessViolation("You cannot modify 'Everyone'!") 1218 if not user in self.users.all(): 1219 raise AclAccessViolation("You do not have access to %s" 1220 % self.name) 1221 1222 @staticmethod 1223 def on_host_membership_change(): 1224 """Invoked when host membership changes.""" 1225 everyone = AclGroup.objects.get(name='Everyone') 1226 1227 # find hosts that aren't in any ACL group and add them to Everyone 1228 # TODO(showard): this is a bit of a hack, since the fact that this query 1229 # works is kind of a coincidence of Django internals. This trick 1230 # doesn't work in general (on all foreign key relationships). I'll 1231 # replace it with a better technique when the need arises. 
1232 orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True) 1233 everyone.hosts.add(*orphaned_hosts.distinct()) 1234 1235 # find hosts in both Everyone and another ACL group, and remove them 1236 # from Everyone 1237 hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone') 1238 acled_hosts = set() 1239 for host in hosts_in_everyone: 1240 # Has an ACL group other than Everyone 1241 if host.aclgroup_set.count() > 1: 1242 acled_hosts.add(host) 1243 everyone.hosts.remove(*acled_hosts) 1244 1245 1246 def delete(self): 1247 if (self.name == 'Everyone'): 1248 raise AclAccessViolation("You cannot delete 'Everyone'!") 1249 self.check_for_acl_violation_acl_group() 1250 super(AclGroup, self).delete() 1251 self.on_host_membership_change() 1252 1253 1254 def add_current_user_if_empty(self): 1255 """Adds the current user if the set of users is empty.""" 1256 if not self.users.count(): 1257 self.users.add(User.current_user()) 1258 1259 1260 def perform_after_save(self, change): 1261 """Called after a save. 1262 1263 @param change: Whether there was a change. 1264 """ 1265 if not change: 1266 self.users.add(User.current_user()) 1267 self.add_current_user_if_empty() 1268 self.on_host_membership_change() 1269 1270 1271 def save(self, *args, **kwargs): 1272 change = bool(self.id) 1273 if change: 1274 # Check the original object for an ACL violation 1275 AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group() 1276 super(AclGroup, self).save(*args, **kwargs) 1277 self.perform_after_save(change) 1278 1279 1280 class Meta: 1281 """Metadata for class AclGroup.""" 1282 db_table = 'afe_acl_groups' 1283 1284 def __unicode__(self): 1285 return unicode(self.name) 1286 1287 1288class ParameterizedJob(dbmodels.Model): 1289 """ 1290 Auxiliary configuration for a parameterized job. 1291 1292 This class is obsolete, and ought to be dead. 
Due to a series of 1293 unfortunate events, it can't be deleted: 1294 * In `class Job` we're required to keep a reference to this class 1295 for the sake of the scheduler unit tests. 1296 * The existence of the reference in `Job` means that certain 1297 methods here will get called from the `get_jobs` RPC. 1298 So, the definitions below seem to be the minimum stub we can support 1299 unless/until we change the database schema. 1300 """ 1301 1302 @classmethod 1303 def smart_get(cls, id_or_name, *args, **kwargs): 1304 """For compatibility with Job.add_object. 1305 1306 @param cls: Implicit class object. 1307 @param id_or_name: The ID or name to get. 1308 @param args: Non-keyword arguments. 1309 @param kwargs: Keyword arguments. 1310 """ 1311 return cls.objects.get(pk=id_or_name) 1312 1313 1314 def job(self): 1315 """Returns the job if it exists, or else None.""" 1316 jobs = self.job_set.all() 1317 assert jobs.count() <= 1 1318 return jobs and jobs[0] or None 1319 1320 1321 class Meta: 1322 """Metadata for class ParameterizedJob.""" 1323 db_table = 'afe_parameterized_jobs' 1324 1325 def __unicode__(self): 1326 return u'%s (parameterized) - %s' % (self.test.name, self.job()) 1327 1328 1329class JobManager(model_logic.ExtendedManager): 1330 'Custom manager to provide efficient status counts querying.' 1331 def get_status_counts(self, job_ids): 1332 """Returns a dict mapping the given job IDs to their status count dicts. 1333 1334 @param job_ids: A list of job IDs. 
1335 """ 1336 if not job_ids: 1337 return {} 1338 id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids) 1339 cursor = connection.cursor() 1340 cursor.execute(""" 1341 SELECT job_id, status, aborted, complete, COUNT(*) 1342 FROM afe_host_queue_entries 1343 WHERE job_id IN %s 1344 GROUP BY job_id, status, aborted, complete 1345 """ % id_list) 1346 all_job_counts = dict((job_id, {}) for job_id in job_ids) 1347 for job_id, status, aborted, complete, count in cursor.fetchall(): 1348 job_dict = all_job_counts[job_id] 1349 full_status = HostQueueEntry.compute_full_status(status, aborted, 1350 complete) 1351 job_dict.setdefault(full_status, 0) 1352 job_dict[full_status] += count 1353 return all_job_counts 1354 1355 1356class Job(dbmodels.Model, model_logic.ModelExtensions): 1357 """\ 1358 owner: username of job owner 1359 name: job name (does not have to be unique) 1360 priority: Integer priority value. Higher is more important. 1361 control_file: contents of control file 1362 control_type: Client or Server 1363 created_on: date of job creation 1364 submitted_on: date of job submission 1365 synch_count: how many hosts should be used per autoserv execution 1366 run_verify: Whether or not to run the verify phase 1367 run_reset: Whether or not to run the reset phase 1368 timeout: DEPRECATED - hours from queuing time until job times out 1369 timeout_mins: minutes from job queuing time until the job times out 1370 max_runtime_hrs: DEPRECATED - hours from job starting time until job 1371 times out 1372 max_runtime_mins: minutes from job starting time until job times out 1373 email_list: list of people to email on completion delimited by any of: 1374 white space, ',', ':', ';' 1375 dependency_labels: many-to-many relationship with labels corresponding to 1376 job dependencies 1377 reboot_before: Never, If dirty, or Always 1378 reboot_after: Never, If all tests passed, or Always 1379 parse_failed_repair: if True, a failed repair launched by this job will have 1380 its 
                         results parsed as part of the job.
    drone_set: The set of drones to run this job on
    parent_job: Parent job (optional)
    test_retry: Number of times to retry test if the test did not complete
                successfully. (optional, default: 0)
    require_ssp: Require server-side packaging unless require_ssp is set to
                 False. (optional, default: None)
    """

    # TODO: Investigate, if jobkeyval_set is really needed.
    # dynamic_suite will write them into an attached file for the drone, but
    # it doesn't seem like they are actually used. If they aren't used, remove
    # jobkeyval_set here.
    SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels',
                                         'hostqueueentry_set',
                                         'jobkeyval_set',
                                         'shard'])

    # SQL fragment appended to the shard-job queries below to skip jobs the
    # shard already knows about (unless they were aborted on the main).
    EXCLUDE_KNOWN_JOBS_CLAUSE = '''
        AND NOT (afe_host_queue_entries.aborted = 0
                 AND afe_jobs.id IN (%(known_ids)s))
        '''

    # SQL fragment appended to skip jobs older than a configured cutoff.
    EXCLUDE_OLD_JOBS_CLAUSE = 'AND (afe_jobs.created_on > "%(cutoff)s")'

    # Jobs to send to a shard, matched via dependency labels or meta_host.
    SQL_SHARD_JOBS = '''
        SELECT DISTINCT(afe_jobs.id) FROM afe_jobs
        INNER JOIN afe_host_queue_entries
          ON (afe_jobs.id = afe_host_queue_entries.job_id)
        LEFT OUTER JOIN afe_jobs_dependency_labels
          ON (afe_jobs.id = afe_jobs_dependency_labels.job_id)
        JOIN afe_shards_labels
          ON (afe_shards_labels.label_id = afe_jobs_dependency_labels.label_id
              OR afe_shards_labels.label_id = afe_host_queue_entries.meta_host)
        WHERE (afe_shards_labels.shard_id = %(shard_id)s
               AND afe_host_queue_entries.complete != 1
               AND afe_host_queue_entries.active != 1
               %(exclude_known_jobs)s
               %(exclude_old_jobs)s)
        '''

    # Jobs can be created with assigned hosts and have no dependency
    # labels nor meta_host.
    # We are looking for:
    #     - a job whose hqe's meta_host is null
    #     - a job whose hqe has a host
    #     - one of the host's labels matches the shard's label.
    # Non-aborted known jobs, completed jobs, active jobs, jobs
    # without hqe are excluded as we do with SQL_SHARD_JOBS.
    SQL_SHARD_JOBS_WITH_HOSTS = '''
        SELECT DISTINCT(afe_jobs.id) FROM afe_jobs
        INNER JOIN afe_host_queue_entries
          ON (afe_jobs.id = afe_host_queue_entries.job_id)
        LEFT OUTER JOIN %(host_label_table)s
          ON (afe_host_queue_entries.host_id = %(host_label_table)s.host_id)
        WHERE (%(host_label_table)s.%(host_label_column)s IN %(label_ids)s
               AND afe_host_queue_entries.complete != 1
               AND afe_host_queue_entries.active != 1
               AND afe_host_queue_entries.meta_host IS NULL
               AND afe_host_queue_entries.host_id IS NOT NULL
               %(exclude_known_jobs)s
               %(exclude_old_jobs)s)
        '''

    # Even if we had filters about complete, active and aborted
    # bits in the above two SQLs, there is a chance that
    # the result may still contain a job with an hqe with 'complete=1'
    # or 'active=1'.
    # This happens when a job has two (or more) hqes and at least
    # one hqe has different bits than others.
    # We use a second sql to ensure we exclude all un-desired jobs.
    SQL_JOBS_TO_EXCLUDE = '''
        SELECT afe_jobs.id FROM afe_jobs
        INNER JOIN afe_host_queue_entries
          ON (afe_jobs.id = afe_host_queue_entries.job_id)
        WHERE (afe_jobs.id in (%(candidates)s)
               AND (afe_host_queue_entries.complete=1
                    OR afe_host_queue_entries.active=1))
        '''

    def _deserialize_relation(self, link, data):
        # Re-point child rows at this (local) job id before deserializing,
        # since ids are not kept consistent between main and shards.
        if link in ['hostqueueentry_set', 'jobkeyval_set']:
            for obj in data:
                obj['job_id'] = self.id

        super(Job, self)._deserialize_relation(link, data)


    def custom_deserialize_relation(self, link, data):
        # 'shard' is the only link with custom handling; anything else is a
        # programming error.
        assert link == 'shard', 'Link %s should not be deserialized' % link
        self.shard = Shard.deserialize(data)


    def sanity_check_update_from_shard(self, shard, updated_serialized):
        # If the job got aborted on the main after the client fetched it
        # no shard_id will be set. The shard might still push updates though,
        # as the job might complete before the abort bit syncs to the shard.
        # Alternative considered: The main scheduler could be changed to not
        # set aborted jobs to completed that are sharded out. But that would
        # require database queries and seemed more complicated to implement.
        # This seems safe to do, as there won't be updates pushed from the
        # wrong shards; shards should be powered off and wiped when they are
        # removed from the main.
        if self.shard_id and self.shard_id != shard.id:
            raise error.IgnorableUnallowedRecordsSentToMain(
                'Job id=%s is assigned to shard (%s). Cannot update it with %s '
                'from shard %s.' % (self.id, self.shard_id, updated_serialized,
                                    shard.id))


    RebootBefore = model_attributes.RebootBefore
    RebootAfter = model_attributes.RebootAfter
    # TIMEOUT is deprecated.
    DEFAULT_TIMEOUT = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_default', default=24)
    DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60)
    # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is
    # completed.
    DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72)
    DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60)
    DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool, default=False)
    # If True, shard heartbeats read job candidates from the 'readonly'
    # database alias (see _readonly_job_query_context).
    FETCH_READONLY_JOBS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB','readonly_heartbeat', type=bool, default=False)
    SKIP_JOBS_CREATED_BEFORE = global_config.global_config.get_config_value(
        'SHARD', 'skip_jobs_created_before', type=int, default=0)



    owner = dbmodels.CharField(max_length=255)
    name = dbmodels.CharField(max_length=255)
    priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT)
    control_file = dbmodels.TextField(null=True, blank=True)
    control_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices(),
        blank=True, # to allow 0
        default=control_data.CONTROL_TYPE.CLIENT)
    created_on = dbmodels.DateTimeField()
    synch_count = dbmodels.IntegerField(blank=True, default=0)
    timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT)
    run_verify = dbmodels.BooleanField(default=False)
    email_list = dbmodels.CharField(max_length=250, blank=True)
    dependency_labels = (
        dbmodels.ManyToManyField(Label, blank=True,
                                 db_table='afe_jobs_dependency_labels'))
    reboot_before = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootBefore.choices(), blank=True,
        default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootAfter.choices(), blank=True,
        default=DEFAULT_REBOOT_AFTER)
    parse_failed_repair = dbmodels.BooleanField(
        default=DEFAULT_PARSE_FAILED_REPAIR)
    # max_runtime_hrs is deprecated. Will be removed after switch to mins is
    # completed.
    max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS)
    max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS)
    drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)

    # TODO(jrbarnette) We have to keep `parameterized_job` around or it
    # breaks the scheduler_models unit tests (and fixing the unit tests
    # will break the scheduler, so don't do that).
    #
    # The ultimate fix is to delete the column from the database table
    # at which point, you _must_ delete this.  Until you're ready to do
    # that, DON'T MUCK WITH IT.
    parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True,
                                            blank=True)

    parent_job = dbmodels.ForeignKey('self', blank=True, null=True)

    test_retry = dbmodels.IntegerField(blank=True, default=0)

    run_reset = dbmodels.BooleanField(default=True)

    timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS)

    # If this is None on the main, a shard should be found.
    # If this is None on a shard, it should be synced back to the main
    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    # If this is None, server-side packaging will be used for server side test.
    require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True)

    # custom manager
    objects = JobManager()


    @decorators.cached_property
    def labels(self):
        """All the labels of this job"""
        # We need to convert dependency_labels to a list, because all() gives us
        # back an iterator, and storing/caching an iterator means we'd only be
        # able to read from it once.
        return list(self.dependency_labels.all())


    def is_server_job(self):
        """Returns whether this job is of type server."""
        return self.control_type == control_data.CONTROL_TYPE.SERVER


    @classmethod
    def create(cls, owner, options, hosts):
        """Creates a job.

        The job is created by taking some information (the listed args) and
        filling in the rest of the necessary information.

        @param cls: Implicit class object.
        @param owner: The owner for the job.
        @param options: An options object.
        @param hosts: The hosts to use.
        """
        AclGroup.check_for_acl_violation_hosts(hosts)

        control_file = options.get('control_file')

        # Fall back to the user's reboot preferences when the caller did not
        # specify reboot_before/reboot_after explicitly.
        user = User.current_user()
        if options.get('reboot_before') is None:
            options['reboot_before'] = user.get_reboot_before_display()
        if options.get('reboot_after') is None:
            options['reboot_after'] = user.get_reboot_after_display()

        drone_set = DroneSet.resolve_name(options.get('drone_set'))

        # Legacy 'timeout' is in hours; derive timeout_mins when absent.
        if options.get('timeout_mins') is None and options.get('timeout'):
            options['timeout_mins'] = options['timeout'] * 60

        job = cls.add_object(
            owner=owner,
            name=options['name'],
            priority=options['priority'],
            control_file=control_file,
            control_type=options['control_type'],
            synch_count=options.get('synch_count'),
            # timeout needs to be deleted in the future.
            timeout=options.get('timeout'),
            timeout_mins=options.get('timeout_mins'),
            max_runtime_mins=options.get('max_runtime_mins'),
            run_verify=options.get('run_verify'),
            email_list=options.get('email_list'),
            reboot_before=options.get('reboot_before'),
            reboot_after=options.get('reboot_after'),
            parse_failed_repair=options.get('parse_failed_repair'),
            created_on=datetime.now(),
            drone_set=drone_set,
            parent_job=options.get('parent_job_id'),
            test_retry=options.get('test_retry'),
            run_reset=options.get('run_reset'),
            require_ssp=options.get('require_ssp'))

        job.dependency_labels = options['dependencies']

        if options.get('keyvals'):
            for key, value in options['keyvals'].iteritems():
                # None (or NULL) is not acceptable by DB, so change it to an
                # empty string in case.
                JobKeyval.objects.create(job=job, key=key,
                                         value='' if value is None else value)

        return job


    @classmethod
    def assign_to_shard(cls, shard, known_ids):
        """Assigns unassigned jobs to a shard.

        For all labels that have been assigned to this shard, all jobs that
        have this label are assigned to this shard.

        @param shard: The shard to assign jobs to.
        @param known_ids: List of all ids of incomplete jobs the shard already
                          knows about.

        @returns The job objects that should be sent to the shard.
        """
        # NOTE(review): candidate ids may come from the readonly replica (see
        # below) while the assignment writes to the primary — assumes any
        # replication lag is acceptable here; confirm if this is changed.
        with cls._readonly_job_query_context():
            job_ids = cls._get_new_jobs_for_shard(shard, known_ids)
        if not job_ids:
            return []
        cls._assign_jobs_to_shard(job_ids, shard)
        return cls._jobs_with_ids(job_ids)


    @classmethod
    @contextlib.contextmanager
    def _readonly_job_query_context(cls):
        #TODO: Get rid of this kludge if/when we update Django to >=1.7
        #correct usage would be .raw(..., using='readonly')
        old_db = Job.objects._db
        try:
            if cls.FETCH_READONLY_JOBS:
                Job.objects._db = 'readonly'
            yield
        finally:
            # Always restore the manager's database alias.
            Job.objects._db = old_db


    @classmethod
    def _assign_jobs_to_shard(cls, job_ids, shard):
        # Point the given jobs at the shard in a single UPDATE.
        Job.objects.filter(pk__in=job_ids).update(shard=shard)


    @classmethod
    def _jobs_with_ids(cls, job_ids):
        # Materialize the Job objects for the given ids.
        return list(Job.objects.filter(pk__in=job_ids).all())


    @classmethod
    def _get_new_jobs_for_shard(cls, shard, known_ids):
        # Union of label-matched and host-matched candidates, minus any job
        # that already has a complete/active queue entry.
        job_ids = cls._get_jobs_without_hosts(shard, known_ids)
        job_ids |= cls._get_jobs_with_hosts(shard, known_ids)
        if job_ids:
            job_ids -= cls._filter_finished_jobs(job_ids)
        return job_ids


    @classmethod
    def _filter_finished_jobs(cls, job_ids):
        # See SQL_JOBS_TO_EXCLUDE: catches jobs with mixed-state queue entries.
        query = Job.objects.raw(
                cls.SQL_JOBS_TO_EXCLUDE %
                {'candidates': ','.join([str(i) for i in job_ids])})
        return set([j.id for j in query])


    @classmethod
    def _get_jobs_without_hosts(cls, shard, known_ids):
        raw_sql = cls.SQL_SHARD_JOBS % {
            'exclude_known_jobs': cls._exclude_known_jobs_clause(known_ids),
            'exclude_old_jobs': cls._exclude_old_jobs_clause(),
            'shard_id': shard.id
        }
        return set([j.id for j in Job.objects.raw(raw_sql)])


    @classmethod
    def _get_jobs_with_hosts(cls, shard, known_ids):
        # Run the host-matching query twice: once against the static label
        # table and once against the regular one (see RESPECT_STATIC_LABELS).
        job_ids = set([])
        static_labels, non_static_labels = Host.classify_label_objects(
                shard.labels.all())
        if static_labels:
            label_ids = [str(l.id) for l in static_labels]
            query = Job.objects.raw(cls.SQL_SHARD_JOBS_WITH_HOSTS % {
                'exclude_known_jobs': cls._exclude_known_jobs_clause(known_ids),
                'exclude_old_jobs': cls._exclude_old_jobs_clause(),
                'host_label_table': 'afe_static_hosts_labels',
                'host_label_column': 'staticlabel_id',
                'label_ids': '(%s)' % ','.join(label_ids)})
            job_ids |= set([j.id for j in query])
        if non_static_labels:
            label_ids = [str(l.id) for l in non_static_labels]
            query = Job.objects.raw(cls.SQL_SHARD_JOBS_WITH_HOSTS % {
                'exclude_known_jobs': cls._exclude_known_jobs_clause(known_ids),
                'exclude_old_jobs': cls._exclude_old_jobs_clause(),
                'host_label_table': 'afe_hosts_labels',
                'host_label_column': 'label_id',
                'label_ids': '(%s)' % ','.join(label_ids)})
            job_ids |= set([j.id for j in query])
        return job_ids


    @classmethod
    def _exclude_known_jobs_clause(cls, known_ids):
        if not known_ids:
            return ''
        return (cls.EXCLUDE_KNOWN_JOBS_CLAUSE %
                {'known_ids': ','.join([str(i) for i in known_ids])})


    @classmethod
    def _exclude_old_jobs_clause(cls):
        """Filter queried jobs to be created within a few hours in the past.

        With this clause, any jobs older than a configurable number of hours are
        skipped in the jobs query.
        The job creation window affects the overall query performance. Longer
        creation windows require a range query over more Job table rows using
        the created_on column index. c.f. http://crbug.com/966872#c35
        """
        if cls.SKIP_JOBS_CREATED_BEFORE <= 0:
            return ''
        cutoff = datetime.now()- timedelta(hours=cls.SKIP_JOBS_CREATED_BEFORE)
        return (cls.EXCLUDE_OLD_JOBS_CLAUSE %
                {'cutoff': cutoff.strftime('%Y-%m-%d %H:%M:%S')})


    def queue(self, hosts, is_template=False):
        """Enqueue a job on the given hosts.

        @param hosts: The hosts to use.
        @param is_template: Whether the status should be "Template".
        """
        if not hosts:
            # hostless job
            entry = HostQueueEntry.create(job=self, is_template=is_template)
            entry.save()
            return

        for host in hosts:
            host.enqueue_job(self, is_template=is_template)


    def user(self):
        """Gets the user of this job, or None if it doesn't exist."""
        try:
            return User.objects.get(login=self.owner)
        except self.DoesNotExist:
            return None


    def abort(self):
        """Aborts this job."""
        for queue_entry in self.hostqueueentry_set.all():
            queue_entry.abort()


    def tag(self):
        """Returns a string tag for this job."""
        return server_utils.get_job_tag(self.id, self.owner)


    def keyval_dict(self):
        """Returns all keyvals for this job as a dictionary."""
        return dict((keyval.key, keyval.value)
                    for keyval in self.jobkeyval_set.all())


    @classmethod
    def get_attribute_model(cls):
        """Return the attribute model.

        Override method in parent class. This class is called when
        deserializing the one-to-many relationship between Job and JobKeyval.
        On deserialization, we will try to clear any existing job keyvals
        associated with a job to avoid any inconsistency.
        Though Job doesn't implement ModelWithAttribute, we still treat
        it as an attribute model for this purpose.

        @returns: The attribute model of Job.
        """
        return JobKeyval


    class Meta:
        """Metadata for class Job."""
        db_table = 'afe_jobs'

    def __unicode__(self):
        return u'%s (%s-%s)' % (self.name, self.id, self.owner)


class JobHandoff(dbmodels.Model, model_logic.ModelExtensions):
    """Jobs that have been handed off to lucifer."""

    job = dbmodels.OneToOneField(Job, on_delete=dbmodels.CASCADE,
                                 primary_key=True)
    created = dbmodels.DateTimeField(auto_now_add=True)
    completed = dbmodels.BooleanField(default=False)
    drone = dbmodels.CharField(
        max_length=128, null=True,
        help_text='''
The hostname of the drone the job is running on and whose job_aborter
should be responsible for aborting the job if the job process dies.
NULL means any drone's job_aborter has free reign to abort the job.
''')

    class Meta:
        """Metadata for class JobHandoff."""
        db_table = 'afe_job_handoffs'


class JobKeyval(dbmodels.Model, model_logic.ModelExtensions):
    """Keyvals associated with jobs"""

    SERIALIZATION_LINKS_TO_KEEP = set(['job'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])

    job = dbmodels.ForeignKey(Job)
    key = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use job_id and key to search for an existing record.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        # TODO(fdeng): We should use job_id and key together as
        # a primary key in the db.
        return cls.objects.get(job_id=data['job_id'], key=data['key'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on main and shards.

        @param data: A dictionary of data to deserialize.

        @returns: A JobKeyval object.
        """
        if data:
            data.pop('id')
        return super(JobKeyval, cls).deserialize(data)


    class Meta:
        """Metadata for class JobKeyval."""
        db_table = 'afe_job_keyvals'


class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions):
    """Represents an ineligible host queue."""
    job = dbmodels.ForeignKey(Job)
    host = dbmodels.ForeignKey(Host)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class IneligibleHostQueue."""
        db_table = 'afe_ineligible_host_queues'


class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
    """Represents a host queue entry."""

    SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host'])
    SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted'])


    def custom_deserialize_relation(self, link, data):
        # 'meta_host' is the only link with custom handling here.
        assert link == 'meta_host'
        self.meta_host = Label.deserialize(data)


    def sanity_check_update_from_shard(self, shard, updated_serialized,
                                       job_ids_sent):
        # Reject queue entries pushed by a shard for jobs that were not part
        # of the same upload.
        if self.job_id not in job_ids_sent:
            raise error.IgnorableUnallowedRecordsSentToMain(
                'Sent HostQueueEntry without corresponding '
                'job entry: %s' % updated_serialized)


    Status = host_queue_entry_states.Status
    ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES
    COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES
    PRE_JOB_STATUSES = host_queue_entry_states.PRE_JOB_STATUSES
    IDLE_PRE_JOB_STATUSES = host_queue_entry_states.IDLE_PRE_JOB_STATUSES

    job = dbmodels.ForeignKey(Job)
    host = dbmodels.ForeignKey(Host, blank=True, null=True)
    status = dbmodels.CharField(max_length=255)
    meta_host = dbmodels.ForeignKey(Label, blank=True, null=True,
                                    db_column='meta_host')
    active = dbmodels.BooleanField(default=False)
    complete = dbmodels.BooleanField(default=False)
    deleted = dbmodels.BooleanField(default=False)
    execution_subdir = dbmodels.CharField(max_length=255, blank=True,
                                          default='')
    # If atomic_group is set, this is a virtual HostQueueEntry that will
    # be expanded into many actual hosts within the group at schedule time.
    atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True)
    aborted = dbmodels.BooleanField(default=False)
    started_on = dbmodels.DateTimeField(null=True, blank=True)
    finished_on = dbmodels.DateTimeField(null=True, blank=True)

    objects = model_logic.ExtendedManager()


    def __init__(self, *args, **kwargs):
        super(HostQueueEntry, self).__init__(*args, **kwargs)
        # Track 'status' so on_attribute_changed fires when it changes.
        self._record_attributes(['status'])


    @classmethod
    def create(cls, job, host=None, meta_host=None,
               is_template=False):
        """Creates a new host queue entry.

        @param cls: Implicit class object.
        @param job: The associated job.
        @param host: The associated host.
        @param meta_host: The associated meta host.
        @param is_template: Whether the status should be "Template".
        """
        if is_template:
            status = cls.Status.TEMPLATE
        else:
            status = cls.Status.QUEUED

        return cls(job=job, host=host, meta_host=meta_host, status=status)


    def save(self, *args, **kwargs):
        # Derive active/complete from status before persisting.
        self._set_active_and_complete()
        super(HostQueueEntry, self).save(*args, **kwargs)
        self._check_for_updated_attributes()


    def execution_path(self):
        """
        Path to this entry's results (relative to the base results directory).
        """
        return server_utils.get_hqe_exec_path(self.job.tag(),
                                              self.execution_subdir)


    def host_or_metahost_name(self):
        """Returns the first non-None name found in priority order.

        The priority order checked is: (1) host name; (2) meta host name
        """
        if self.host:
            return self.host.hostname
        else:
            assert self.meta_host
            return self.meta_host.name


    def _set_active_and_complete(self):
        # active and complete are mutually exclusive; both False for
        # pre-job/idle statuses.
        if self.status in self.ACTIVE_STATUSES:
            self.active, self.complete = True, False
        elif self.status in self.COMPLETE_STATUSES:
            self.active, self.complete = False, True
        else:
            self.active, self.complete = False, False


    def on_attribute_changed(self, attribute, old_value):
        assert attribute == 'status'
        logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id,
                     self.status)


    def is_meta_host_entry(self):
        'True if this entry has a meta_host instead of a host.'
        return self.host is None and self.meta_host is not None


    # This code is shared between rpc_interface and models.HostQueueEntry.
    # Sadly due to circular imports between the 2 (crbug.com/230100) making it
    # a class method was the best way to refactor it. Attempting to put it in
    # rpc_utils or a new utils module failed as that would require us to import
    # models.py but to call it from here we would have to import the utils.py
    # thus creating a cycle.
    @classmethod
    def abort_host_queue_entries(cls, host_queue_entries):
        """Aborts a collection of host_queue_entries.

        Abort these host queue entry and all host queue entries of jobs created
        by them.

        @param host_queue_entries: List of host queue entries we want to abort.
        """
        # This isn't completely immune to race conditions since it's not atomic,
        # but it should be safe given the scheduler's behavior.
2048 2049 # TODO(milleral): crbug.com/230100 2050 # The |abort_host_queue_entries| rpc does nearly exactly this, 2051 # however, trying to re-use the code generates some horrible 2052 # circular import error. I'd be nice to refactor things around 2053 # sometime so the code could be reused. 2054 2055 # Fixpoint algorithm to find the whole tree of HQEs to abort to 2056 # minimize the total number of database queries: 2057 children = set() 2058 new_children = set(host_queue_entries) 2059 while new_children: 2060 children.update(new_children) 2061 new_child_ids = [hqe.job_id for hqe in new_children] 2062 new_children = HostQueueEntry.objects.filter( 2063 job__parent_job__in=new_child_ids, 2064 complete=False, aborted=False).all() 2065 # To handle circular parental relationships 2066 new_children = set(new_children) - children 2067 2068 # Associate a user with the host queue entries that we're about 2069 # to abort so that we can look up who to blame for the aborts. 2070 child_ids = [hqe.id for hqe in children] 2071 # Get a list of hqe ids that already exists, so we can exclude them when 2072 # we do bulk_create later to avoid IntegrityError. 2073 existing_hqe_ids = set(AbortedHostQueueEntry.objects. 2074 filter(queue_entry_id__in=child_ids). 2075 values_list('queue_entry_id', flat=True)) 2076 now = datetime.now() 2077 user = User.current_user() 2078 aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe, 2079 aborted_by=user, aborted_on=now) for hqe in children 2080 if hqe.id not in existing_hqe_ids] 2081 AbortedHostQueueEntry.objects.bulk_create(aborted_hqes) 2082 # Bulk update all of the HQEs to set the abort bit. 2083 HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True) 2084 2085 2086 def abort(self): 2087 """ Aborts this host queue entry. 2088 2089 Abort this host queue entry and all host queue entries of jobs created by 2090 this one. 
2091 2092 """ 2093 if not self.complete and not self.aborted: 2094 HostQueueEntry.abort_host_queue_entries([self]) 2095 2096 2097 @classmethod 2098 def compute_full_status(cls, status, aborted, complete): 2099 """Returns a modified status msg if the host queue entry was aborted. 2100 2101 @param cls: Implicit class object. 2102 @param status: The original status message. 2103 @param aborted: Whether the host queue entry was aborted. 2104 @param complete: Whether the host queue entry was completed. 2105 """ 2106 if aborted and not complete: 2107 return 'Aborted (%s)' % status 2108 return status 2109 2110 2111 def full_status(self): 2112 """Returns the full status of this host queue entry, as a string.""" 2113 return self.compute_full_status(self.status, self.aborted, 2114 self.complete) 2115 2116 2117 def _postprocess_object_dict(self, object_dict): 2118 object_dict['full_status'] = self.full_status() 2119 2120 2121 class Meta: 2122 """Metadata for class HostQueueEntry.""" 2123 db_table = 'afe_host_queue_entries' 2124 2125 2126 2127 def __unicode__(self): 2128 hostname = None 2129 if self.host: 2130 hostname = self.host.hostname 2131 return u"%s/%d (%d)" % (hostname, self.job.id, self.id) 2132 2133 2134class HostQueueEntryStartTimes(dbmodels.Model): 2135 """An auxilary table to HostQueueEntry to index by start time.""" 2136 insert_time = dbmodels.DateTimeField() 2137 highest_hqe_id = dbmodels.IntegerField() 2138 2139 class Meta: 2140 """Metadata for class HostQueueEntryStartTimes.""" 2141 db_table = 'afe_host_queue_entry_start_times' 2142 2143 2144class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 2145 """Represents an aborted host queue entry.""" 2146 queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True) 2147 aborted_by = dbmodels.ForeignKey(User) 2148 aborted_on = dbmodels.DateTimeField() 2149 2150 objects = model_logic.ExtendedManager() 2151 2152 2153 def save(self, *args, **kwargs): 2154 self.aborted_on = datetime.now() 
2155 super(AbortedHostQueueEntry, self).save(*args, **kwargs) 2156 2157 class Meta: 2158 """Metadata for class AbortedHostQueueEntry.""" 2159 db_table = 'afe_aborted_host_queue_entries' 2160 2161 2162class SpecialTask(dbmodels.Model, model_logic.ModelExtensions): 2163 """\ 2164 Tasks to run on hosts at the next time they are in the Ready state. Use this 2165 for high-priority tasks, such as forced repair or forced reinstall. 2166 2167 host: host to run this task on 2168 task: special task to run 2169 time_requested: date and time the request for this task was made 2170 is_active: task is currently running 2171 is_complete: task has finished running 2172 is_aborted: task was aborted 2173 time_started: date and time the task started 2174 time_finished: date and time the task finished 2175 queue_entry: Host queue entry waiting on this task (or None, if task was not 2176 started in preparation of a job) 2177 """ 2178 Task = autotest_enum.AutotestEnum('Verify', 'Cleanup', 'Repair', 'Reset', 2179 'Provision', string_values=True) 2180 2181 host = dbmodels.ForeignKey(Host, blank=False, null=False) 2182 task = dbmodels.CharField(max_length=64, choices=Task.choices(), 2183 blank=False, null=False) 2184 requested_by = dbmodels.ForeignKey(User) 2185 time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False, 2186 null=False) 2187 is_active = dbmodels.BooleanField(default=False, blank=False, null=False) 2188 is_complete = dbmodels.BooleanField(default=False, blank=False, null=False) 2189 is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False) 2190 time_started = dbmodels.DateTimeField(null=True, blank=True) 2191 queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True) 2192 success = dbmodels.BooleanField(default=False, blank=False, null=False) 2193 time_finished = dbmodels.DateTimeField(null=True, blank=True) 2194 2195 objects = model_logic.ExtendedManager() 2196 2197 2198 def save(self, **kwargs): 2199 if self.queue_entry: 2200 
self.requested_by = User.objects.get( 2201 login=self.queue_entry.job.owner) 2202 super(SpecialTask, self).save(**kwargs) 2203 2204 2205 def execution_path(self): 2206 """Returns the execution path for a special task.""" 2207 return server_utils.get_special_task_exec_path( 2208 self.host.hostname, self.id, self.task, self.time_requested) 2209 2210 2211 # property to emulate HostQueueEntry.status 2212 @property 2213 def status(self): 2214 """Returns a host queue entry status appropriate for a speical task.""" 2215 return server_utils.get_special_task_status( 2216 self.is_complete, self.success, self.is_active) 2217 2218 2219 # property to emulate HostQueueEntry.started_on 2220 @property 2221 def started_on(self): 2222 """Returns the time at which this special task started.""" 2223 return self.time_started 2224 2225 2226 @classmethod 2227 def schedule_special_task(cls, host, task): 2228 """Schedules a special task on a host if not already scheduled. 2229 2230 @param cls: Implicit class object. 2231 @param host: The host to use. 2232 @param task: The task to schedule. 2233 """ 2234 existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task, 2235 is_active=False, 2236 is_complete=False) 2237 if existing_tasks: 2238 return existing_tasks[0] 2239 2240 special_task = SpecialTask(host=host, task=task, 2241 requested_by=User.current_user()) 2242 special_task.save() 2243 return special_task 2244 2245 2246 def abort(self): 2247 """ Abort this special task.""" 2248 self.is_aborted = True 2249 self.save() 2250 2251 2252 def activate(self): 2253 """ 2254 Sets a task as active and sets the time started to the current time. 2255 """ 2256 logging.info('Starting: %s', self) 2257 self.is_active = True 2258 self.time_started = datetime.now() 2259 self.save() 2260 2261 2262 def finish(self, success): 2263 """Sets a task as completed. 2264 2265 @param success: Whether or not the task was successful. 
2266 """ 2267 logging.info('Finished: %s', self) 2268 self.is_active = False 2269 self.is_complete = True 2270 self.success = success 2271 if self.time_started: 2272 self.time_finished = datetime.now() 2273 self.save() 2274 2275 2276 class Meta: 2277 """Metadata for class SpecialTask.""" 2278 db_table = 'afe_special_tasks' 2279 2280 2281 def __unicode__(self): 2282 result = u'Special Task %s (host %s, task %s, time %s)' % ( 2283 self.id, self.host, self.task, self.time_requested) 2284 if self.is_complete: 2285 result += u' (completed)' 2286 elif self.is_active: 2287 result += u' (active)' 2288 2289 return result 2290 2291 2292class StableVersion(dbmodels.Model, model_logic.ModelExtensions): 2293 2294 board = dbmodels.CharField(max_length=255, unique=True) 2295 version = dbmodels.CharField(max_length=255) 2296 2297 class Meta: 2298 """Metadata for class StableVersion.""" 2299 db_table = 'afe_stable_versions' 2300 2301 def save(self, *args, **kwargs): 2302 if os.getenv("OVERRIDE_STABLE_VERSION_BAN"): 2303 super(StableVersion, self).save(*args, **kwargs) 2304 else: 2305 raise RuntimeError("the ability to save StableVersions has been intentionally removed") 2306 2307 # pylint:disable=undefined-variable 2308 def delete(self): 2309 if os.getenv("OVERRIDE_STABLE_VERSION_BAN"): 2310 super(StableVersion, self).delete(*args, **kwargs) 2311 else: 2312 raise RuntimeError("the ability to delete StableVersions has been intentionally removed") 2313