1# pylint: disable=missing-docstring 2 3import contextlib 4from datetime import datetime 5from datetime import timedelta 6import logging 7import os 8 9import django.core 10import six 11try: 12 from django.db import models as dbmodels, connection 13except django.core.exceptions.ImproperlyConfigured: 14 raise ImportError('Django database not yet configured. Import either ' 15 'setup_django_environment or ' 16 'setup_django_lite_environment from ' 17 'autotest_lib.frontend before any imports that ' 18 'depend on django models.') 19from xml.sax import saxutils 20import common 21from autotest_lib.frontend.afe import model_logic, model_attributes 22from autotest_lib.frontend.afe import rdb_model_extensions 23from autotest_lib.frontend import settings, thread_local 24from autotest_lib.client.common_lib import autotest_enum, error, host_protections 25from autotest_lib.client.common_lib import global_config 26from autotest_lib.client.common_lib import host_queue_entry_states 27from autotest_lib.client.common_lib import control_data, priorities, decorators 28from autotest_lib.server import utils as server_utils 29 30# job options and user preferences 31DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY 32DEFAULT_REBOOT_AFTER = model_attributes.RebootBefore.NEVER 33 34RESPECT_STATIC_LABELS = global_config.global_config.get_config_value( 35 'SKYLAB', 'respect_static_labels', type=bool, default=False) 36 37RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value( 38 'SKYLAB', 'respect_static_attributes', type=bool, default=False) 39 40 41class AclAccessViolation(Exception): 42 """\ 43 Raised when an operation is attempted with proper permissions as 44 dictated by ACLs. 45 """ 46 47 48class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model): 49 """\ 50 An atomic group defines a collection of hosts which must only be scheduled 51 all at once. Any host with a label having an atomic group will only be 52 scheduled for a job at the same time as other hosts sharing that label. 53 54 Required: 55 name: A name for this atomic group, e.g. 'rack23' or 'funky_net'. 56 max_number_of_machines: The maximum number of machines that will be 57 scheduled at once when scheduling jobs to this atomic group. 58 The job.synch_count is considered the minimum. 59 60 Optional: 61 description: Arbitrary text description of this group's purpose. 62 """ 63 name = dbmodels.CharField(max_length=255, unique=True) 64 description = dbmodels.TextField(blank=True) 65 # This magic value is the default to simplify the scheduler logic. 66 # It must be "large". The common use of atomic groups is to want all 67 # machines in the group to be used, limits on which subset used are 68 # often chosen via dependency labels. 69 # TODO(dennisjeffrey): Revisit this so we don't have to assume that 70 # "infinity" is around 3.3 million. 71 INFINITE_MACHINES = 333333333 72 max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES) 73 invalid = dbmodels.BooleanField(default=False, 74 editable=settings.FULL_ADMIN) 75 76 name_field = 'name' 77 objects = model_logic.ModelWithInvalidManager() 78 valid_objects = model_logic.ValidObjectsManager() 79 80 81 def enqueue_job(self, job, is_template=False): 82 """Enqueue a job on an associated atomic group of hosts. 83 84 @param job: A job to enqueue. 85 @param is_template: Whether the status should be "Template". 86 """ 87 queue_entry = HostQueueEntry.create(atomic_group=self, job=job, 88 is_template=is_template) 89 queue_entry.save() 90 91 92 def clean_object(self): 93 self.label_set.clear() 94 95 96 class Meta: 97 """Metadata for class AtomicGroup.""" 98 db_table = 'afe_atomic_groups' 99 100 101 def __unicode__(self): 102 return unicode(self.name) 103 104 105class Label(model_logic.ModelWithInvalid, dbmodels.Model): 106 """\ 107 Required: 108 name: label name 109 110 Optional: 111 kernel_config: URL/path to kernel config for jobs run on this label. 112 platform: If True, this is a platform label (defaults to False). 113 only_if_needed: If True, a Host with this label can only be used if that 114 label is requested by the job/test (either as the meta_host or 115 in the job_dependencies). 116 atomic_group: The atomic group associated with this label. 117 """ 118 name = dbmodels.CharField(max_length=255, unique=True) 119 kernel_config = dbmodels.CharField(max_length=255, blank=True) 120 platform = dbmodels.BooleanField(default=False) 121 invalid = dbmodels.BooleanField(default=False, 122 editable=settings.FULL_ADMIN) 123 only_if_needed = dbmodels.BooleanField(default=False) 124 125 name_field = 'name' 126 objects = model_logic.ModelWithInvalidManager() 127 valid_objects = model_logic.ValidObjectsManager() 128 atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) 129 130 131 def clean_object(self): 132 self.host_set.clear() 133 self.test_set.clear() 134 135 136 def enqueue_job(self, job, is_template=False): 137 """Enqueue a job on any host of this label. 138 139 @param job: A job to enqueue. 140 @param is_template: Whether the status should be "Template". 141 """ 142 queue_entry = HostQueueEntry.create(meta_host=self, job=job, 143 is_template=is_template) 144 queue_entry.save() 145 146 147 148 class Meta: 149 """Metadata for class Label.""" 150 db_table = 'afe_labels' 151 152 153 def __unicode__(self): 154 return unicode(self.name) 155 156 157 def is_replaced_by_static(self): 158 """Detect whether a label is replaced by a static label. 159 160 'Static' means it can only be modified by skylab inventory tools. 161 """ 162 if RESPECT_STATIC_LABELS: 163 replaced = ReplacedLabel.objects.filter(label__id=self.id) 164 if len(replaced) > 0: 165 return True 166 167 return False 168 169 170class StaticLabel(model_logic.ModelWithInvalid, dbmodels.Model): 171 """\ 172 Required: 173 name: label name 174 175 Optional: 176 kernel_config: URL/path to kernel config for jobs run on this label. 177 platform: If True, this is a platform label (defaults to False). 178 only_if_needed: Deprecated. This is always False. 179 atomic_group: Deprecated. This is always NULL. 180 """ 181 name = dbmodels.CharField(max_length=255, unique=True) 182 kernel_config = dbmodels.CharField(max_length=255, blank=True) 183 platform = dbmodels.BooleanField(default=False) 184 invalid = dbmodels.BooleanField(default=False, 185 editable=settings.FULL_ADMIN) 186 only_if_needed = dbmodels.BooleanField(default=False) 187 188 name_field = 'name' 189 objects = model_logic.ModelWithInvalidManager() 190 valid_objects = model_logic.ValidObjectsManager() 191 atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) 192 193 def clean_object(self): 194 self.host_set.clear() 195 self.test_set.clear() 196 197 198 class Meta: 199 """Metadata for class StaticLabel.""" 200 db_table = 'afe_static_labels' 201 202 203 def __unicode__(self): 204 return unicode(self.name) 205 206 207class ReplacedLabel(dbmodels.Model, model_logic.ModelExtensions): 208 """The tag to indicate Whether to replace labels with static labels.""" 209 label = dbmodels.ForeignKey(Label) 210 objects = model_logic.ExtendedManager() 211 212 213 class Meta: 214 """Metadata for class ReplacedLabel.""" 215 db_table = 'afe_replaced_labels' 216 217 218 def __unicode__(self): 219 return unicode(self.label) 220 221 222class Shard(dbmodels.Model, model_logic.ModelExtensions): 223 224 hostname = dbmodels.CharField(max_length=255, unique=True) 225 226 name_field = 'hostname' 227 228 labels = dbmodels.ManyToManyField(Label, blank=True, 229 db_table='afe_shards_labels') 230 231 class Meta: 232 """Metadata for class ParameterizedJob.""" 233 db_table = 'afe_shards' 234 235 236class Drone(dbmodels.Model, model_logic.ModelExtensions): 237 """ 238 A scheduler drone 239 240 hostname: the drone's hostname 241 """ 242 hostname = dbmodels.CharField(max_length=255, unique=True) 243 244 name_field = 'hostname' 245 objects = model_logic.ExtendedManager() 246 247 248 def save(self, *args, **kwargs): 249 if not User.current_user().is_superuser(): 250 raise Exception('Only superusers may edit drones') 251 super(Drone, self).save(*args, **kwargs) 252 253 254 def delete(self): 255 if not User.current_user().is_superuser(): 256 raise Exception('Only superusers may delete drones') 257 super(Drone, self).delete() 258 259 260 class Meta: 261 """Metadata for class Drone.""" 262 db_table = 'afe_drones' 263 264 def __unicode__(self): 265 return unicode(self.hostname) 266 267 268class DroneSet(dbmodels.Model, model_logic.ModelExtensions): 269 """ 270 A set of scheduler drones 271 272 These will be used by the scheduler to decide what drones a job is allowed 273 to run on. 274 275 name: the drone set's name 276 drones: the drones that are part of the set 277 """ 278 DRONE_SETS_ENABLED = global_config.global_config.get_config_value( 279 'SCHEDULER', 'drone_sets_enabled', type=bool, default=False) 280 DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value( 281 'SCHEDULER', 'default_drone_set_name', default=None) 282 283 name = dbmodels.CharField(max_length=255, unique=True) 284 drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones') 285 286 name_field = 'name' 287 objects = model_logic.ExtendedManager() 288 289 290 def save(self, *args, **kwargs): 291 if not User.current_user().is_superuser(): 292 raise Exception('Only superusers may edit drone sets') 293 super(DroneSet, self).save(*args, **kwargs) 294 295 296 def delete(self): 297 if not User.current_user().is_superuser(): 298 raise Exception('Only superusers may delete drone sets') 299 super(DroneSet, self).delete() 300 301 302 @classmethod 303 def drone_sets_enabled(cls): 304 """Returns whether drone sets are enabled. 305 306 @param cls: Implicit class object. 307 """ 308 return cls.DRONE_SETS_ENABLED 309 310 311 @classmethod 312 def default_drone_set_name(cls): 313 """Returns the default drone set name. 314 315 @param cls: Implicit class object. 316 """ 317 return cls.DEFAULT_DRONE_SET_NAME 318 319 320 @classmethod 321 def get_default(cls): 322 """Gets the default drone set name, compatible with Job.add_object. 323 324 @param cls: Implicit class object. 325 """ 326 return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME) 327 328 329 @classmethod 330 def resolve_name(cls, drone_set_name): 331 """ 332 Returns the name of one of these, if not None, in order of preference: 333 1) the drone set given, 334 2) the current user's default drone set, or 335 3) the global default drone set 336 337 or returns None if drone sets are disabled 338 339 @param cls: Implicit class object. 340 @param drone_set_name: A drone set name. 341 """ 342 if not cls.drone_sets_enabled(): 343 return None 344 345 user = User.current_user() 346 user_drone_set_name = user.drone_set and user.drone_set.name 347 348 return drone_set_name or user_drone_set_name or cls.get_default().name 349 350 351 def get_drone_hostnames(self): 352 """ 353 Gets the hostnames of all drones in this drone set 354 """ 355 return set(self.drones.all().values_list('hostname', flat=True)) 356 357 358 class Meta: 359 """Metadata for class DroneSet.""" 360 db_table = 'afe_drone_sets' 361 362 def __unicode__(self): 363 return unicode(self.name) 364 365 366class User(dbmodels.Model, model_logic.ModelExtensions): 367 """\ 368 Required: 369 login :user login name 370 371 Optional: 372 access_level: 0=User (default), 1=Admin, 100=Root 373 """ 374 ACCESS_ROOT = 100 375 ACCESS_ADMIN = 1 376 ACCESS_USER = 0 377 378 AUTOTEST_SYSTEM = 'autotest_system' 379 380 login = dbmodels.CharField(max_length=255, unique=True) 381 access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True) 382 383 # user preferences 384 reboot_before = dbmodels.SmallIntegerField( 385 choices=model_attributes.RebootBefore.choices(), blank=True, 386 default=DEFAULT_REBOOT_BEFORE) 387 reboot_after = dbmodels.SmallIntegerField( 388 choices=model_attributes.RebootAfter.choices(), blank=True, 389 default=DEFAULT_REBOOT_AFTER) 390 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 391 show_experimental = dbmodels.BooleanField(default=False) 392 393 name_field = 'login' 394 objects = model_logic.ExtendedManager() 395 396 397 def save(self, *args, **kwargs): 398 # is this a new object being saved for the first time? 399 first_time = (self.id is None) 400 user = thread_local.get_user() 401 if user and not user.is_superuser() and user.login != self.login: 402 raise AclAccessViolation("You cannot modify user " + self.login) 403 super(User, self).save(*args, **kwargs) 404 if first_time: 405 everyone = AclGroup.objects.get(name='Everyone') 406 everyone.users.add(self) 407 408 409 def is_superuser(self): 410 """Returns whether the user has superuser access.""" 411 return self.access_level >= self.ACCESS_ROOT 412 413 414 @classmethod 415 def current_user(cls): 416 """Returns the current user. 417 418 @param cls: Implicit class object. 419 """ 420 user = thread_local.get_user() 421 if user is None: 422 user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM) 423 user.access_level = cls.ACCESS_ROOT 424 user.save() 425 return user 426 427 428 @classmethod 429 def get_record(cls, data): 430 """Check the database for an identical record. 431 432 Check for a record with matching id and login. If one exists, 433 return it. If one does not exist there is a possibility that 434 the following cases have happened: 435 1. Same id, different login 436 We received: "1 chromeos-test" 437 And we have: "1 debug-user" 438 In this case we need to delete "1 debug_user" and insert 439 "1 chromeos-test". 440 441 2. Same login, different id: 442 We received: "1 chromeos-test" 443 And we have: "2 chromeos-test" 444 In this case we need to delete "2 chromeos-test" and insert 445 "1 chromeos-test". 446 447 As long as this method deletes bad records and raises the 448 DoesNotExist exception the caller will handle creating the 449 new record. 450 451 @raises: DoesNotExist, if a record with the matching login and id 452 does not exist. 453 """ 454 455 # Both the id and login should be uniqe but there are cases when 456 # we might already have a user with the same login/id because 457 # current_user will proactively create a user record if it doesn't 458 # exist. Since we want to avoid conflict between the main and 459 # shard, just delete any existing user records that don't match 460 # what we're about to deserialize from the main. 461 try: 462 return cls.objects.get(login=data['login'], id=data['id']) 463 except cls.DoesNotExist: 464 cls.delete_matching_record(login=data['login']) 465 cls.delete_matching_record(id=data['id']) 466 raise 467 468 469 class Meta: 470 """Metadata for class User.""" 471 db_table = 'afe_users' 472 473 def __unicode__(self): 474 return unicode(self.login) 475 476 477class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel, 478 model_logic.ModelWithAttributes): 479 """\ 480 Required: 481 hostname 482 483 optional: 484 locked: if true, host is locked and will not be queued 485 486 Internal: 487 From AbstractHostModel: 488 status: string describing status of host 489 invalid: true if the host has been deleted 490 protection: indicates what can be done to this host during repair 491 lock_time: DateTime at which the host was locked 492 dirty: true if the host has been used without being rebooted 493 Local: 494 locked_by: user that locked the host, or null if the host is unlocked 495 """ 496 497 SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set', 498 'hostattribute_set', 499 'labels', 500 'shard']) 501 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid']) 502 503 504 def custom_deserialize_relation(self, link, data): 505 assert link == 'shard', 'Link %s should not be deserialized' % link 506 self.shard = Shard.deserialize(data) 507 508 509 # Note: Only specify foreign keys here, specify host columns in 510 # rdb_model_extensions instead. 511 Protection = host_protections.Protection 512 labels = dbmodels.ManyToManyField(Label, blank=True, 513 db_table='afe_hosts_labels') 514 static_labels = dbmodels.ManyToManyField( 515 StaticLabel, blank=True, db_table='afe_static_hosts_labels') 516 locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False) 517 name_field = 'hostname' 518 objects = model_logic.ModelWithInvalidManager() 519 valid_objects = model_logic.ValidObjectsManager() 520 leased_objects = model_logic.LeasedHostManager() 521 522 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 523 524 def __init__(self, *args, **kwargs): 525 super(Host, self).__init__(*args, **kwargs) 526 self._record_attributes(['status']) 527 528 529 @classmethod 530 def classify_labels(cls, label_names): 531 """Split labels to static & non-static. 532 533 @label_names: a list of labels (string). 534 535 @returns: a list of StaticLabel objects & a list of 536 (non-static) Label objects. 537 """ 538 if not label_names: 539 return [], [] 540 541 labels = Label.objects.filter(name__in=label_names) 542 543 if not RESPECT_STATIC_LABELS: 544 return [], labels 545 546 return cls.classify_label_objects(labels) 547 548 549 @classmethod 550 def classify_label_objects(cls, label_objects): 551 if not RESPECT_STATIC_LABELS: 552 return [], label_objects 553 554 replaced_labels = ReplacedLabel.objects.filter(label__in=label_objects) 555 replaced_ids = [l.label.id for l in replaced_labels] 556 non_static_labels = [ 557 l for l in label_objects if not l.id in replaced_ids] 558 static_label_names = [ 559 l.name for l in label_objects if l.id in replaced_ids] 560 static_labels = StaticLabel.objects.filter(name__in=static_label_names) 561 return static_labels, non_static_labels 562 563 564 @classmethod 565 def get_hosts_with_labels(cls, label_names, initial_query): 566 """Get hosts by label filters. 567 568 @param label_names: label (string) lists for fetching hosts. 569 @param initial_query: a model_logic.QuerySet of Host object, e.g. 570 571 Host.objects.all(), Host.valid_objects.all(). 572 573 This initial_query cannot be a sliced QuerySet, e.g. 574 575 Host.objects.all().filter(query_limit=10) 576 """ 577 if not label_names: 578 return initial_query 579 580 static_labels, non_static_labels = cls.classify_labels(label_names) 581 if len(static_labels) + len(non_static_labels) != len(label_names): 582 # Some labels don't exist in afe db, which means no hosts 583 # should be matched. 584 return set() 585 586 for l in static_labels: 587 initial_query = initial_query.filter(static_labels=l) 588 589 for l in non_static_labels: 590 initial_query = initial_query.filter(labels=l) 591 592 return initial_query 593 594 595 @classmethod 596 def get_hosts_with_label_ids(cls, label_ids, initial_query): 597 """Get hosts by label_id filters. 598 599 @param label_ids: label id (int) lists for fetching hosts. 600 @param initial_query: a list of Host object, e.g. 601 [<Host: 100.107.151.253>, <Host: 100.107.151.251>, ...] 602 """ 603 labels = Label.objects.filter(id__in=label_ids) 604 label_names = [l.name for l in labels] 605 return cls.get_hosts_with_labels(label_names, initial_query) 606 607 608 @staticmethod 609 def create_one_time_host(hostname): 610 """Creates a one-time host. 611 612 @param hostname: The name for the host. 613 """ 614 query = Host.objects.filter(hostname=hostname) 615 if query.count() == 0: 616 host = Host(hostname=hostname, invalid=True) 617 host.do_validate() 618 else: 619 host = query[0] 620 if not host.invalid: 621 raise model_logic.ValidationError({ 622 'hostname' : '%s already exists in the autotest DB. ' 623 'Select it rather than entering it as a one time ' 624 'host.' % hostname 625 }) 626 host.protection = host_protections.Protection.DO_NOT_REPAIR 627 host.locked = False 628 host.save() 629 host.clean_object() 630 return host 631 632 633 @classmethod 634 def _assign_to_shard_nothing_helper(cls): 635 """Does nothing. 636 637 This method is called in the middle of assign_to_shard, and does 638 nothing. It exists to allow integration tests to simulate a race 639 condition.""" 640 641 642 @classmethod 643 def assign_to_shard(cls, shard, known_ids): 644 """Assigns hosts to a shard. 645 646 For all labels that have been assigned to a shard, all hosts that 647 have at least one of the shard's labels are assigned to the shard. 648 Hosts that are assigned to the shard but aren't already present on the 649 shard are returned. 650 651 Any boards that are in |known_ids| but that do not belong to the shard 652 are incorrect ids, which are also returned so that the shard can remove 653 them locally. 654 655 Board to shard mapping is many-to-one. Many different boards can be 656 hosted in a shard. However, DUTs of a single board cannot be distributed 657 into more than one shard. 658 659 @param shard: The shard object to assign labels/hosts for. 660 @param known_ids: List of all host-ids the shard already knows. 661 This is used to figure out which hosts should be sent 662 to the shard. If shard_ids were used instead, hosts 663 would only be transferred once, even if the client 664 failed persisting them. 665 The number of hosts usually lies in O(100), so the 666 overhead is acceptable. 667 668 @returns a tuple of (hosts objects that should be sent to the shard, 669 incorrect host ids that should not belong to] 670 shard) 671 """ 672 # Disclaimer: concurrent heartbeats should theoretically not occur in 673 # the current setup. As they may be introduced in the near future, 674 # this comment will be left here. 675 676 # Sending stuff twice is acceptable, but forgetting something isn't. 677 # Detecting duplicates on the client is easy, but here it's harder. The 678 # following options were considered: 679 # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more 680 # than select returned, as concurrently more hosts might have been 681 # inserted 682 # - UPDATE and then SELECT WHERE shard=shard: select always returns all 683 # hosts for the shard, this is overhead 684 # - SELECT and then UPDATE only selected without requerying afterwards: 685 # returns the old state of the records. 686 new_hosts = [] 687 688 possible_new_host_ids = set(Host.objects.filter( 689 labels__in=shard.labels.all(), 690 leased=False 691 ).exclude( 692 id__in=known_ids, 693 ).values_list('pk', flat=True)) 694 695 # No-op in production, used to simulate race condition in tests. 696 cls._assign_to_shard_nothing_helper() 697 698 if possible_new_host_ids: 699 Host.objects.filter( 700 pk__in=possible_new_host_ids, 701 labels__in=shard.labels.all(), 702 leased=False 703 ).update(shard=shard) 704 new_hosts = list(Host.objects.filter( 705 pk__in=possible_new_host_ids, 706 shard=shard 707 ).all()) 708 709 invalid_host_ids = list(Host.objects.filter( 710 id__in=known_ids 711 ).exclude( 712 shard=shard 713 ).values_list('pk', flat=True)) 714 715 return new_hosts, invalid_host_ids 716 717 def resurrect_object(self, old_object): 718 super(Host, self).resurrect_object(old_object) 719 # invalid hosts can be in use by the scheduler (as one-time hosts), so 720 # don't change the status 721 self.status = old_object.status 722 723 724 def clean_object(self): 725 self.aclgroup_set.clear() 726 self.labels.clear() 727 self.static_labels.clear() 728 729 730 def save(self, *args, **kwargs): 731 # extra spaces in the hostname can be a sneaky source of errors 732 self.hostname = self.hostname.strip() 733 # is this a new object being saved for the first time? 734 first_time = (self.id is None) 735 if not first_time: 736 AclGroup.check_for_acl_violation_hosts([self]) 737 # If locked is changed, send its status and user made the change to 738 # metaDB. Locks are important in host history because if a device is 739 # locked then we don't really care what state it is in. 740 if self.locked and not self.locked_by: 741 self.locked_by = User.current_user() 742 if not self.lock_time: 743 self.lock_time = datetime.now() 744 self.dirty = True 745 elif not self.locked and self.locked_by: 746 self.locked_by = None 747 self.lock_time = None 748 super(Host, self).save(*args, **kwargs) 749 if first_time: 750 everyone = AclGroup.objects.get(name='Everyone') 751 everyone.hosts.add(self) 752 # remove attributes that may have lingered from an old host and 753 # should not be associated with a new host 754 for host_attribute in self.hostattribute_set.all(): 755 self.delete_attribute(host_attribute.attribute) 756 self._check_for_updated_attributes() 757 758 759 def delete(self): 760 AclGroup.check_for_acl_violation_hosts([self]) 761 logging.info('Preconditions for deleting host %s...', self.hostname) 762 for queue_entry in self.hostqueueentry_set.all(): 763 logging.info(' Deleting and aborting hqe %s...', queue_entry) 764 queue_entry.deleted = True 765 queue_entry.abort() 766 logging.info(' ... done with hqe %s.', queue_entry) 767 for host_attribute in self.hostattribute_set.all(): 768 logging.info(' Deleting attribute %s...', host_attribute) 769 self.delete_attribute(host_attribute.attribute) 770 logging.info(' ... done with attribute %s.', host_attribute) 771 logging.info('... preconditions done for host %s.', self.hostname) 772 logging.info('Deleting host %s...', self.hostname) 773 super(Host, self).delete() 774 logging.info('... done.') 775 776 777 def on_attribute_changed(self, attribute, old_value): 778 assert attribute == 'status' 779 logging.info('%s -> %s', self.hostname, self.status) 780 781 782 def enqueue_job(self, job, is_template=False): 783 """Enqueue a job on this host. 784 785 @param job: A job to enqueue. 786 @param is_template: Whther the status should be "Template". 787 """ 788 queue_entry = HostQueueEntry.create(host=self, job=job, 789 is_template=is_template) 790 # allow recovery of dead hosts from the frontend 791 if not self.active_queue_entry() and self.is_dead(): 792 self.status = Host.Status.READY 793 self.save() 794 queue_entry.save() 795 796 block = IneligibleHostQueue(job=job, host=self) 797 block.save() 798 799 800 def platform(self): 801 """The platform of the host.""" 802 # TODO(showard): slighly hacky? 803 platforms = self.labels.filter(platform=True) 804 if len(platforms) == 0: 805 return None 806 return platforms[0] 807 platform.short_description = 'Platform' 808 809 810 @classmethod 811 def check_no_platform(cls, hosts): 812 """Verify the specified hosts have no associated platforms. 813 814 @param cls: Implicit class object. 815 @param hosts: The hosts to verify. 816 @raises model_logic.ValidationError if any hosts already have a 817 platform. 818 """ 819 Host.objects.populate_relationships(hosts, Label, 'label_list') 820 Host.objects.populate_relationships(hosts, StaticLabel, 821 'staticlabel_list') 822 errors = [] 823 for host in hosts: 824 platforms = [label.name for label in host.label_list 825 if label.platform] 826 if RESPECT_STATIC_LABELS: 827 platforms += [label.name for label in host.staticlabel_list 828 if label.platform] 829 830 if platforms: 831 # do a join, just in case this host has multiple platforms, 832 # we'll be able to see it 833 errors.append('Host %s already has a platform: %s' % ( 834 host.hostname, ', '.join(platforms))) 835 if errors: 836 raise model_logic.ValidationError({'labels': '; '.join(errors)}) 837 838 839 @classmethod 840 def check_board_labels_allowed(cls, hosts, new_labels=[]): 841 """Verify the specified hosts have valid board labels and the given 842 new board labels can be added. 843 844 @param cls: Implicit class object. 845 @param hosts: The hosts to verify. 846 @param new_labels: A list of labels to be added to the hosts. 847 848 @raises model_logic.ValidationError if any host has invalid board labels 849 or the given board labels cannot be added to the hsots. 850 """ 851 Host.objects.populate_relationships(hosts, Label, 'label_list') 852 Host.objects.populate_relationships(hosts, StaticLabel, 853 'staticlabel_list') 854 errors = [] 855 for host in hosts: 856 boards = [label.name for label in host.label_list 857 if label.name.startswith('board:')] 858 if RESPECT_STATIC_LABELS: 859 boards += [label.name for label in host.staticlabel_list 860 if label.name.startswith('board:')] 861 862 new_boards = [name for name in new_labels 863 if name.startswith('board:')] 864 if len(boards) + len(new_boards) > 1: 865 # do a join, just in case this host has multiple boards, 866 # we'll be able to see it 867 errors.append('Host %s already has board labels: %s' % ( 868 host.hostname, ', '.join(boards))) 869 if errors: 870 raise model_logic.ValidationError({'labels': '; '.join(errors)}) 871 872 873 def is_dead(self): 874 """Returns whether the host is dead (has status repair failed).""" 875 return self.status == Host.Status.REPAIR_FAILED 876 877 878 def active_queue_entry(self): 879 """Returns the active queue entry for this host, or None if none.""" 880 active = list(self.hostqueueentry_set.filter(active=True)) 881 if not active: 882 return None 883 assert len(active) == 1, ('More than one active entry for ' 884 'host ' + self.hostname) 885 return active[0] 886 887 888 def _get_attribute_model_and_args(self, attribute): 889 return HostAttribute, dict(host=self, attribute=attribute) 890 891 892 def _get_static_attribute_model_and_args(self, attribute): 893 return StaticHostAttribute, dict(host=self, attribute=attribute) 894 895 896 def _is_replaced_by_static_attribute(self, attribute): 897 if RESPECT_STATIC_ATTRIBUTES: 898 model, args = self._get_static_attribute_model_and_args(attribute) 899 try: 900 static_attr = model.objects.get(**args) 901 return True 902 except StaticHostAttribute.DoesNotExist: 903 return False 904 905 return False 906 907 908 @classmethod 909 def get_attribute_model(cls): 910 """Return the attribute model. 911 912 Override method in parent class. See ModelExtensions for details. 913 @returns: The attribute model of Host. 914 """ 915 return HostAttribute 916 917 918 class Meta: 919 """Metadata for the Host class.""" 920 db_table = 'afe_hosts' 921 922 923 def __unicode__(self): 924 return unicode(self.hostname) 925 926 927class HostAttribute(dbmodels.Model, model_logic.ModelExtensions): 928 """Arbitrary keyvals associated with hosts.""" 929 930 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 931 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 932 host = dbmodels.ForeignKey(Host) 933 attribute = dbmodels.CharField(max_length=90) 934 value = dbmodels.CharField(max_length=300) 935 936 objects = model_logic.ExtendedManager() 937 938 class Meta: 939 """Metadata for the HostAttribute class.""" 940 db_table = 'afe_host_attributes' 941 942 943 @classmethod 944 def get_record(cls, data): 945 """Check the database for an identical record. 946 947 Use host_id and attribute to search for a existing record. 948 949 @raises: DoesNotExist, if no record found 950 @raises: MultipleObjectsReturned if multiple records found. 951 """ 952 # TODO(fdeng): We should use host_id and attribute together as 953 # a primary key in the db. 954 return cls.objects.get(host_id=data['host_id'], 955 attribute=data['attribute']) 956 957 958 @classmethod 959 def deserialize(cls, data): 960 """Override deserialize in parent class. 961 962 Do not deserialize id as id is not kept consistent on main and shards. 963 964 @param data: A dictionary of data to deserialize. 965 966 @returns: A HostAttribute object. 967 """ 968 if data: 969 data.pop('id') 970 return super(HostAttribute, cls).deserialize(data) 971 972 973class StaticHostAttribute(dbmodels.Model, model_logic.ModelExtensions): 974 """Static arbitrary keyvals associated with hosts.""" 975 976 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 977 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 978 host = dbmodels.ForeignKey(Host) 979 attribute = dbmodels.CharField(max_length=90) 980 value = dbmodels.CharField(max_length=300) 981 982 objects = model_logic.ExtendedManager() 983 984 class Meta: 985 """Metadata for the StaticHostAttribute class.""" 986 db_table = 'afe_static_host_attributes' 987 988 989 @classmethod 990 def get_record(cls, data): 991 """Check the database for an identical record. 992 993 Use host_id and attribute to search for a existing record. 994 995 @raises: DoesNotExist, if no record found 996 @raises: MultipleObjectsReturned if multiple records found. 997 """ 998 return cls.objects.get(host_id=data['host_id'], 999 attribute=data['attribute']) 1000 1001 1002 @classmethod 1003 def deserialize(cls, data): 1004 """Override deserialize in parent class. 1005 1006 Do not deserialize id as id is not kept consistent on main and shards. 1007 1008 @param data: A dictionary of data to deserialize. 1009 1010 @returns: A StaticHostAttribute object. 1011 """ 1012 if data: 1013 data.pop('id') 1014 return super(StaticHostAttribute, cls).deserialize(data) 1015 1016 1017class Test(dbmodels.Model, model_logic.ModelExtensions): 1018 """\ 1019 Required: 1020 author: author name 1021 description: description of the test 1022 name: test name 1023 time: short, medium, long 1024 test_class: This describes the class for your the test belongs in. 1025 test_category: This describes the category for your tests 1026 test_type: Client or Server 1027 path: path to pass to run_test() 1028 sync_count: is a number >=1 (1 being the default). If it's 1, then it's an 1029 async job. If it's >1 it's sync job for that number of machines 1030 i.e. if sync_count = 2 it is a sync job that requires two 1031 machines. 1032 Optional: 1033 dependencies: What the test requires to run. Comma deliminated list 1034 dependency_labels: many-to-many relationship with labels corresponding to 1035 test dependencies. 1036 experimental: If this is set to True production servers will ignore the test 1037 run_verify: Whether or not the scheduler should run the verify stage 1038 run_reset: Whether or not the scheduler should run the reset stage 1039 test_retry: Number of times to retry test if the test did not complete 1040 successfully. (optional, default: 0) 1041 """ 1042 TestTime = autotest_enum.AutotestEnum('SHORT', 'MEDIUM', 'LONG', 1043 start_value=1) 1044 1045 name = dbmodels.CharField(max_length=255, unique=True) 1046 author = dbmodels.CharField(max_length=255) 1047 test_class = dbmodels.CharField(max_length=255) 1048 test_category = dbmodels.CharField(max_length=255) 1049 dependencies = dbmodels.CharField(max_length=255, blank=True) 1050 description = dbmodels.TextField(blank=True) 1051 experimental = dbmodels.BooleanField(default=True) 1052 run_verify = dbmodels.BooleanField(default=False) 1053 test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(), 1054 default=TestTime.MEDIUM) 1055 test_type = dbmodels.SmallIntegerField( 1056 choices=control_data.CONTROL_TYPE.choices()) 1057 sync_count = dbmodels.IntegerField(default=1) 1058 path = dbmodels.CharField(max_length=255, unique=True) 1059 test_retry = dbmodels.IntegerField(blank=True, default=0) 1060 run_reset = dbmodels.BooleanField(default=True) 1061 1062 dependency_labels = ( 1063 dbmodels.ManyToManyField(Label, blank=True, 1064 db_table='afe_autotests_dependency_labels')) 1065 name_field = 'name' 1066 objects = model_logic.ExtendedManager() 1067 1068 1069 def admin_description(self): 1070 """Returns a string representing the admin description.""" 1071 escaped_description = saxutils.escape(self.description) 1072 return '<span style="white-space:pre">%s</span>' % escaped_description 1073 admin_description.allow_tags = True 1074 admin_description.short_description = 'Description' 1075 1076 1077 class Meta: 1078 """Metadata for class Test.""" 1079 db_table = 'afe_autotests' 1080 1081 def __unicode__(self): 1082 return unicode(self.name) 1083 1084 1085class TestParameter(dbmodels.Model): 1086 """ 1087 A declared parameter of a test 1088 """ 1089 test = dbmodels.ForeignKey(Test) 1090 name = dbmodels.CharField(max_length=255) 1091 1092 class Meta: 1093 """Metadata for class TestParameter.""" 1094 db_table = 'afe_test_parameters' 1095 unique_together = ('test', 'name') 1096 1097 def __unicode__(self): 1098 return u'%s (%s)' % (self.name, self.test.name) 1099 1100 1101class Profiler(dbmodels.Model, model_logic.ModelExtensions): 1102 """\ 1103 Required: 1104 name: profiler name 1105 test_type: Client or Server 1106 1107 Optional: 1108 description: arbirary text description 1109 """ 1110 name = dbmodels.CharField(max_length=255, unique=True) 1111 description = dbmodels.TextField(blank=True) 1112 1113 name_field = 'name' 1114 objects = model_logic.ExtendedManager() 1115 1116 1117 class Meta: 1118 """Metadata for class Profiler.""" 1119 db_table = 'afe_profilers' 1120 1121 def __unicode__(self): 1122 return unicode(self.name) 1123 1124 1125class AclGroup(dbmodels.Model, model_logic.ModelExtensions): 1126 """\ 1127 Required: 1128 name: name of ACL group 1129 1130 Optional: 1131 description: arbitrary description of group 1132 """ 1133 1134 SERIALIZATION_LINKS_TO_FOLLOW = set(['users']) 1135 1136 name = dbmodels.CharField(max_length=255, unique=True) 1137 description = dbmodels.CharField(max_length=255, blank=True) 1138 users = dbmodels.ManyToManyField(User, blank=False, 1139 db_table='afe_acl_groups_users') 1140 hosts = dbmodels.ManyToManyField(Host, blank=True, 1141 db_table='afe_acl_groups_hosts') 1142 1143 name_field = 'name' 1144 objects = model_logic.ExtendedManager() 1145 1146 @staticmethod 1147 def check_for_acl_violation_hosts(hosts): 1148 """Verify the current user has access to the specified hosts. 1149 1150 @param hosts: The hosts to verify against. 1151 @raises AclAccessViolation if the current user doesn't have access 1152 to a host. 1153 """ 1154 user = User.current_user() 1155 if user.is_superuser(): 1156 return 1157 accessible_host_ids = set( 1158 host.id for host in Host.objects.filter(aclgroup__users=user)) 1159 for host in hosts: 1160 # Check if the user has access to this host, 1161 # but only if it is not a metahost or a one-time-host. 1162 no_access = (isinstance(host, Host) 1163 and not host.invalid 1164 and int(host.id) not in accessible_host_ids) 1165 if no_access: 1166 raise AclAccessViolation("%s does not have access to %s" % 1167 (str(user), str(host))) 1168 1169 1170 @staticmethod 1171 def check_abort_permissions(queue_entries): 1172 """Look for queue entries that aren't abortable by the current user. 1173 1174 An entry is not abortable if: 1175 * the job isn't owned by this user, and 1176 * the machine isn't ACL-accessible, or 1177 * the machine is in the "Everyone" ACL 1178 1179 @param queue_entries: The queue entries to check. 1180 @raises AclAccessViolation if a queue entry is not abortable by the 1181 current user. 1182 """ 1183 user = User.current_user() 1184 if user.is_superuser(): 1185 return 1186 not_owned = queue_entries.exclude(job__owner=user.login) 1187 # I do this using ID sets instead of just Django filters because 1188 # filtering on M2M dbmodels is broken in Django 0.96. It's better in 1189 # 1.0. 1190 # TODO: Use Django filters, now that we're using 1.0. 1191 accessible_ids = set( 1192 entry.id for entry 1193 in not_owned.filter(host__aclgroup__users__login=user.login)) 1194 public_ids = set(entry.id for entry 1195 in not_owned.filter(host__aclgroup__name='Everyone')) 1196 cannot_abort = [entry for entry in not_owned.select_related() 1197 if entry.id not in accessible_ids 1198 or entry.id in public_ids] 1199 if len(cannot_abort) == 0: 1200 return 1201 entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner, 1202 entry.host_or_metahost_name()) 1203 for entry in cannot_abort) 1204 raise AclAccessViolation('You cannot abort the following job entries: ' 1205 + entry_names) 1206 1207 1208 def check_for_acl_violation_acl_group(self): 1209 """Verifies the current user has acces to this ACL group. 1210 1211 @raises AclAccessViolation if the current user doesn't have access to 1212 this ACL group. 1213 """ 1214 user = User.current_user() 1215 if user.is_superuser(): 1216 return 1217 if self.name == 'Everyone': 1218 raise AclAccessViolation("You cannot modify 'Everyone'!") 1219 if not user in self.users.all(): 1220 raise AclAccessViolation("You do not have access to %s" 1221 % self.name) 1222 1223 @staticmethod 1224 def on_host_membership_change(): 1225 """Invoked when host membership changes.""" 1226 everyone = AclGroup.objects.get(name='Everyone') 1227 1228 # find hosts that aren't in any ACL group and add them to Everyone 1229 # TODO(showard): this is a bit of a hack, since the fact that this query 1230 # works is kind of a coincidence of Django internals. This trick 1231 # doesn't work in general (on all foreign key relationships). I'll 1232 # replace it with a better technique when the need arises. 1233 orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True) 1234 everyone.hosts.add(*orphaned_hosts.distinct()) 1235 1236 # find hosts in both Everyone and another ACL group, and remove them 1237 # from Everyone 1238 hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone') 1239 acled_hosts = set() 1240 for host in hosts_in_everyone: 1241 # Has an ACL group other than Everyone 1242 if host.aclgroup_set.count() > 1: 1243 acled_hosts.add(host) 1244 everyone.hosts.remove(*acled_hosts) 1245 1246 1247 def delete(self): 1248 if (self.name == 'Everyone'): 1249 raise AclAccessViolation("You cannot delete 'Everyone'!") 1250 self.check_for_acl_violation_acl_group() 1251 super(AclGroup, self).delete() 1252 self.on_host_membership_change() 1253 1254 1255 def add_current_user_if_empty(self): 1256 """Adds the current user if the set of users is empty.""" 1257 if not self.users.count(): 1258 self.users.add(User.current_user()) 1259 1260 1261 def perform_after_save(self, change): 1262 """Called after a save. 1263 1264 @param change: Whether there was a change. 1265 """ 1266 if not change: 1267 self.users.add(User.current_user()) 1268 self.add_current_user_if_empty() 1269 self.on_host_membership_change() 1270 1271 1272 def save(self, *args, **kwargs): 1273 change = bool(self.id) 1274 if change: 1275 # Check the original object for an ACL violation 1276 AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group() 1277 super(AclGroup, self).save(*args, **kwargs) 1278 self.perform_after_save(change) 1279 1280 1281 class Meta: 1282 """Metadata for class AclGroup.""" 1283 db_table = 'afe_acl_groups' 1284 1285 def __unicode__(self): 1286 return unicode(self.name) 1287 1288 1289class ParameterizedJob(dbmodels.Model): 1290 """ 1291 Auxiliary configuration for a parameterized job. 1292 1293 This class is obsolete, and ought to be dead. Due to a series of 1294 unfortunate events, it can't be deleted: 1295 * In `class Job` we're required to keep a reference to this class 1296 for the sake of the scheduler unit tests. 1297 * The existence of the reference in `Job` means that certain 1298 methods here will get called from the `get_jobs` RPC. 1299 So, the definitions below seem to be the minimum stub we can support 1300 unless/until we change the database schema. 1301 """ 1302 1303 @classmethod 1304 def smart_get(cls, id_or_name, *args, **kwargs): 1305 """For compatibility with Job.add_object. 1306 1307 @param cls: Implicit class object. 1308 @param id_or_name: The ID or name to get. 1309 @param args: Non-keyword arguments. 1310 @param kwargs: Keyword arguments. 1311 """ 1312 return cls.objects.get(pk=id_or_name) 1313 1314 1315 def job(self): 1316 """Returns the job if it exists, or else None.""" 1317 jobs = self.job_set.all() 1318 assert jobs.count() <= 1 1319 return jobs and jobs[0] or None 1320 1321 1322 class Meta: 1323 """Metadata for class ParameterizedJob.""" 1324 db_table = 'afe_parameterized_jobs' 1325 1326 def __unicode__(self): 1327 return u'%s (parameterized) - %s' % (self.test.name, self.job()) 1328 1329 1330class JobManager(model_logic.ExtendedManager): 1331 'Custom manager to provide efficient status counts querying.' 1332 def get_status_counts(self, job_ids): 1333 """Returns a dict mapping the given job IDs to their status count dicts. 1334 1335 @param job_ids: A list of job IDs. 1336 """ 1337 if not job_ids: 1338 return {} 1339 id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids) 1340 cursor = connection.cursor() 1341 cursor.execute(""" 1342 SELECT job_id, status, aborted, complete, COUNT(*) 1343 FROM afe_host_queue_entries 1344 WHERE job_id IN %s 1345 GROUP BY job_id, status, aborted, complete 1346 """ % id_list) 1347 all_job_counts = dict((job_id, {}) for job_id in job_ids) 1348 for job_id, status, aborted, complete, count in cursor.fetchall(): 1349 job_dict = all_job_counts[job_id] 1350 full_status = HostQueueEntry.compute_full_status(status, aborted, 1351 complete) 1352 job_dict.setdefault(full_status, 0) 1353 job_dict[full_status] += count 1354 return all_job_counts 1355 1356 1357class Job(dbmodels.Model, model_logic.ModelExtensions): 1358 """\ 1359 owner: username of job owner 1360 name: job name (does not have to be unique) 1361 priority: Integer priority value. Higher is more important. 1362 control_file: contents of control file 1363 control_type: Client or Server 1364 created_on: date of job creation 1365 submitted_on: date of job submission 1366 synch_count: how many hosts should be used per autoserv execution 1367 run_verify: Whether or not to run the verify phase 1368 run_reset: Whether or not to run the reset phase 1369 timeout: DEPRECATED - hours from queuing time until job times out 1370 timeout_mins: minutes from job queuing time until the job times out 1371 max_runtime_hrs: DEPRECATED - hours from job starting time until job 1372 times out 1373 max_runtime_mins: minutes from job starting time until job times out 1374 email_list: list of people to email on completion delimited by any of: 1375 white space, ',', ':', ';' 1376 dependency_labels: many-to-many relationship with labels corresponding to 1377 job dependencies 1378 reboot_before: Never, If dirty, or Always 1379 reboot_after: Never, If all tests passed, or Always 1380 parse_failed_repair: if True, a failed repair launched by this job will have 1381 its results parsed as part of the job. 1382 drone_set: The set of drones to run this job on 1383 parent_job: Parent job (optional) 1384 test_retry: Number of times to retry test if the test did not complete 1385 successfully. (optional, default: 0) 1386 require_ssp: Require server-side packaging unless require_ssp is set to 1387 False. (optional, default: None) 1388 """ 1389 1390 # TODO: Investigate, if jobkeyval_set is really needed. 1391 # dynamic_suite will write them into an attached file for the drone, but 1392 # it doesn't seem like they are actually used. If they aren't used, remove 1393 # jobkeyval_set here. 1394 SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels', 1395 'hostqueueentry_set', 1396 'jobkeyval_set', 1397 'shard']) 1398 1399 EXCLUDE_KNOWN_JOBS_CLAUSE = ''' 1400 AND NOT (afe_host_queue_entries.aborted = 0 1401 AND afe_jobs.id IN (%(known_ids)s)) 1402 ''' 1403 1404 EXCLUDE_OLD_JOBS_CLAUSE = 'AND (afe_jobs.created_on > "%(cutoff)s")' 1405 1406 SQL_SHARD_JOBS = ''' 1407 SELECT DISTINCT(afe_jobs.id) FROM afe_jobs 1408 INNER JOIN afe_host_queue_entries 1409 ON (afe_jobs.id = afe_host_queue_entries.job_id) 1410 LEFT OUTER JOIN afe_jobs_dependency_labels 1411 ON (afe_jobs.id = afe_jobs_dependency_labels.job_id) 1412 JOIN afe_shards_labels 1413 ON (afe_shards_labels.label_id = afe_jobs_dependency_labels.label_id 1414 OR afe_shards_labels.label_id = afe_host_queue_entries.meta_host) 1415 WHERE (afe_shards_labels.shard_id = %(shard_id)s 1416 AND afe_host_queue_entries.complete != 1 1417 AND afe_host_queue_entries.active != 1 1418 %(exclude_known_jobs)s 1419 %(exclude_old_jobs)s) 1420 ''' 1421 1422 # Jobs can be created with assigned hosts and have no dependency 1423 # labels nor meta_host. 1424 # We are looking for: 1425 # - a job whose hqe's meta_host is null 1426 # - a job whose hqe has a host 1427 # - one of the host's labels matches the shard's label. 1428 # Non-aborted known jobs, completed jobs, active jobs, jobs 1429 # without hqe are exluded as we do with SQL_SHARD_JOBS. 1430 SQL_SHARD_JOBS_WITH_HOSTS = ''' 1431 SELECT DISTINCT(afe_jobs.id) FROM afe_jobs 1432 INNER JOIN afe_host_queue_entries 1433 ON (afe_jobs.id = afe_host_queue_entries.job_id) 1434 LEFT OUTER JOIN %(host_label_table)s 1435 ON (afe_host_queue_entries.host_id = %(host_label_table)s.host_id) 1436 WHERE (%(host_label_table)s.%(host_label_column)s IN %(label_ids)s 1437 AND afe_host_queue_entries.complete != 1 1438 AND afe_host_queue_entries.active != 1 1439 AND afe_host_queue_entries.meta_host IS NULL 1440 AND afe_host_queue_entries.host_id IS NOT NULL 1441 %(exclude_known_jobs)s 1442 %(exclude_old_jobs)s) 1443 ''' 1444 1445 # Even if we had filters about complete, active and aborted 1446 # bits in the above two SQLs, there is a chance that 1447 # the result may still contain a job with an hqe with 'complete=1' 1448 # or 'active=1'.' 1449 # This happens when a job has two (or more) hqes and at least 1450 # one hqe has different bits than others. 1451 # We use a second sql to ensure we exclude all un-desired jobs. 1452 SQL_JOBS_TO_EXCLUDE = ''' 1453 SELECT afe_jobs.id FROM afe_jobs 1454 INNER JOIN afe_host_queue_entries 1455 ON (afe_jobs.id = afe_host_queue_entries.job_id) 1456 WHERE (afe_jobs.id in (%(candidates)s) 1457 AND (afe_host_queue_entries.complete=1 1458 OR afe_host_queue_entries.active=1)) 1459 ''' 1460 1461 def _deserialize_relation(self, link, data): 1462 if link in ['hostqueueentry_set', 'jobkeyval_set']: 1463 for obj in data: 1464 obj['job_id'] = self.id 1465 1466 super(Job, self)._deserialize_relation(link, data) 1467 1468 1469 def custom_deserialize_relation(self, link, data): 1470 assert link == 'shard', 'Link %s should not be deserialized' % link 1471 self.shard = Shard.deserialize(data) 1472 1473 1474 def _check_update_from_shard(self, shard, updated_serialized): 1475 # If the job got aborted on the main after the client fetched it 1476 # no shard_id will be set. The shard might still push updates though, 1477 # as the job might complete before the abort bit syncs to the shard. 1478 # Alternative considered: The main scheduler could be changed to not 1479 # set aborted jobs to completed that are sharded out. But that would 1480 # require database queries and seemed more complicated to implement. 1481 # This seems safe to do, as there won't be updates pushed from the wrong 1482 # shards should be powered off and wiped hen they are removed from the 1483 # main. 1484 if self.shard_id and self.shard_id != shard.id: 1485 raise error.IgnorableUnallowedRecordsSentToMain( 1486 'Job id=%s is assigned to shard (%s). Cannot update it with %s ' 1487 'from shard %s.' % (self.id, self.shard_id, updated_serialized, 1488 shard.id)) 1489 1490 1491 RebootBefore = model_attributes.RebootBefore 1492 RebootAfter = model_attributes.RebootAfter 1493 # TIMEOUT is deprecated. 1494 DEFAULT_TIMEOUT = global_config.global_config.get_config_value( 1495 'AUTOTEST_WEB', 'job_timeout_default', default=24) 1496 DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value( 1497 'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60) 1498 # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is 1499 # completed. 1500 DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value( 1501 'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72) 1502 DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value( 1503 'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60) 1504 DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value( 1505 'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool, default=False) 1506 FETCH_READONLY_JOBS = global_config.global_config.get_config_value( 1507 'AUTOTEST_WEB','readonly_heartbeat', type=bool, default=False) 1508 # TODO(ayatane): Deprecated, not removed due to difficulty untangling imports 1509 SKIP_JOBS_CREATED_BEFORE = 0 1510 1511 1512 1513 owner = dbmodels.CharField(max_length=255) 1514 name = dbmodels.CharField(max_length=255) 1515 priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT) 1516 control_file = dbmodels.TextField(null=True, blank=True) 1517 control_type = dbmodels.SmallIntegerField( 1518 choices=control_data.CONTROL_TYPE.choices(), 1519 blank=True, # to allow 0 1520 default=control_data.CONTROL_TYPE.CLIENT) 1521 created_on = dbmodels.DateTimeField() 1522 synch_count = dbmodels.IntegerField(blank=True, default=0) 1523 timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT) 1524 run_verify = dbmodels.BooleanField(default=False) 1525 email_list = dbmodels.CharField(max_length=250, blank=True) 1526 dependency_labels = ( 1527 dbmodels.ManyToManyField(Label, blank=True, 1528 db_table='afe_jobs_dependency_labels')) 1529 reboot_before = dbmodels.SmallIntegerField( 1530 choices=model_attributes.RebootBefore.choices(), blank=True, 1531 default=DEFAULT_REBOOT_BEFORE) 1532 reboot_after = dbmodels.SmallIntegerField( 1533 choices=model_attributes.RebootAfter.choices(), blank=True, 1534 default=DEFAULT_REBOOT_AFTER) 1535 parse_failed_repair = dbmodels.BooleanField( 1536 default=DEFAULT_PARSE_FAILED_REPAIR) 1537 # max_runtime_hrs is deprecated. Will be removed after switch to mins is 1538 # completed. 1539 max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS) 1540 max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS) 1541 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 1542 1543 # TODO(jrbarnette) We have to keep `parameterized_job` around or it 1544 # breaks the scheduler_models unit tests (and fixing the unit tests 1545 # will break the scheduler, so don't do that). 1546 # 1547 # The ultimate fix is to delete the column from the database table 1548 # at which point, you _must_ delete this. Until you're ready to do 1549 # that, DON'T MUCK WITH IT. 1550 parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True, 1551 blank=True) 1552 1553 parent_job = dbmodels.ForeignKey('self', blank=True, null=True) 1554 1555 test_retry = dbmodels.IntegerField(blank=True, default=0) 1556 1557 run_reset = dbmodels.BooleanField(default=True) 1558 1559 timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS) 1560 1561 # If this is None on the main, a shard should be found. 1562 # If this is None on a shard, it should be synced back to the main 1563 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 1564 1565 # If this is None, server-side packaging will be used for server side test. 1566 require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True) 1567 1568 # custom manager 1569 objects = JobManager() 1570 1571 1572 @decorators.cached_property 1573 def labels(self): 1574 """All the labels of this job""" 1575 # We need to convert dependency_labels to a list, because all() gives us 1576 # back an iterator, and storing/caching an iterator means we'd only be 1577 # able to read from it once. 1578 return list(self.dependency_labels.all()) 1579 1580 1581 def is_server_job(self): 1582 """Returns whether this job is of type server.""" 1583 return self.control_type == control_data.CONTROL_TYPE.SERVER 1584 1585 1586 @classmethod 1587 def create(cls, owner, options, hosts): 1588 """Creates a job. 1589 1590 The job is created by taking some information (the listed args) and 1591 filling in the rest of the necessary information. 1592 1593 @param cls: Implicit class object. 1594 @param owner: The owner for the job. 1595 @param options: An options object. 1596 @param hosts: The hosts to use. 1597 """ 1598 AclGroup.check_for_acl_violation_hosts(hosts) 1599 1600 control_file = options.get('control_file') 1601 1602 user = User.current_user() 1603 if options.get('reboot_before') is None: 1604 options['reboot_before'] = user.get_reboot_before_display() 1605 if options.get('reboot_after') is None: 1606 options['reboot_after'] = user.get_reboot_after_display() 1607 1608 drone_set = DroneSet.resolve_name(options.get('drone_set')) 1609 1610 if options.get('timeout_mins') is None and options.get('timeout'): 1611 options['timeout_mins'] = options['timeout'] * 60 1612 1613 job = cls.add_object( 1614 owner=owner, 1615 name=options['name'], 1616 priority=options['priority'], 1617 control_file=control_file, 1618 control_type=options['control_type'], 1619 synch_count=options.get('synch_count'), 1620 # timeout needs to be deleted in the future. 1621 timeout=options.get('timeout'), 1622 timeout_mins=options.get('timeout_mins'), 1623 max_runtime_mins=options.get('max_runtime_mins'), 1624 run_verify=options.get('run_verify'), 1625 email_list=options.get('email_list'), 1626 reboot_before=options.get('reboot_before'), 1627 reboot_after=options.get('reboot_after'), 1628 parse_failed_repair=options.get('parse_failed_repair'), 1629 created_on=datetime.now(), 1630 drone_set=drone_set, 1631 parent_job=options.get('parent_job_id'), 1632 test_retry=options.get('test_retry'), 1633 run_reset=options.get('run_reset'), 1634 require_ssp=options.get('require_ssp')) 1635 1636 job.dependency_labels = options['dependencies'] 1637 1638 if options.get('keyvals'): 1639 for key, value in six.iteritems(options['keyvals']): 1640 # None (or NULL) is not acceptable by DB, so change it to an 1641 # empty string in case. 1642 JobKeyval.objects.create(job=job, key=key, 1643 value='' if value is None else value) 1644 1645 return job 1646 1647 1648 @classmethod 1649 def assign_to_shard(cls, shard, known_ids): 1650 """Assigns unassigned jobs to a shard. 1651 1652 For all labels that have been assigned to this shard, all jobs that 1653 have this label are assigned to this shard. 1654 1655 @param shard: The shard to assign jobs to. 1656 @param known_ids: List of all ids of incomplete jobs the shard already 1657 knows about. 1658 1659 @returns The job objects that should be sent to the shard. 1660 """ 1661 with cls._readonly_job_query_context(): 1662 job_ids = cls._get_new_jobs_for_shard(shard, known_ids) 1663 if not job_ids: 1664 return [] 1665 cls._assign_jobs_to_shard(job_ids, shard) 1666 return cls._jobs_with_ids(job_ids) 1667 1668 1669 @classmethod 1670 @contextlib.contextmanager 1671 def _readonly_job_query_context(cls): 1672 #TODO: Get rid of this kludge if/when we update Django to >=1.7 1673 #correct usage would be .raw(..., using='readonly') 1674 old_db = Job.objects._db 1675 try: 1676 if cls.FETCH_READONLY_JOBS: 1677 Job.objects._db = 'readonly' 1678 yield 1679 finally: 1680 Job.objects._db = old_db 1681 1682 1683 @classmethod 1684 def _assign_jobs_to_shard(cls, job_ids, shard): 1685 Job.objects.filter(pk__in=job_ids).update(shard=shard) 1686 1687 1688 @classmethod 1689 def _jobs_with_ids(cls, job_ids): 1690 return list(Job.objects.filter(pk__in=job_ids).all()) 1691 1692 1693 @classmethod 1694 def _get_new_jobs_for_shard(cls, shard, known_ids): 1695 job_ids = cls._get_jobs_without_hosts(shard, known_ids) 1696 job_ids |= cls._get_jobs_with_hosts(shard, known_ids) 1697 if job_ids: 1698 job_ids -= cls._filter_finished_jobs(job_ids) 1699 return job_ids 1700 1701 1702 @classmethod 1703 def _filter_finished_jobs(cls, job_ids): 1704 query = Job.objects.raw( 1705 cls.SQL_JOBS_TO_EXCLUDE % 1706 {'candidates': ','.join([str(i) for i in job_ids])}) 1707 return set([j.id for j in query]) 1708 1709 1710 @classmethod 1711 def _get_jobs_without_hosts(cls, shard, known_ids): 1712 raw_sql = cls.SQL_SHARD_JOBS % { 1713 'exclude_known_jobs': cls._exclude_known_jobs_clause(known_ids), 1714 'exclude_old_jobs': cls._exclude_old_jobs_clause(), 1715 'shard_id': shard.id 1716 } 1717 return set([j.id for j in Job.objects.raw(raw_sql)]) 1718 1719 1720 @classmethod 1721 def _get_jobs_with_hosts(cls, shard, known_ids): 1722 job_ids = set([]) 1723 static_labels, non_static_labels = Host.classify_label_objects( 1724 shard.labels.all()) 1725 if static_labels: 1726 label_ids = [str(l.id) for l in static_labels] 1727 query = Job.objects.raw(cls.SQL_SHARD_JOBS_WITH_HOSTS % { 1728 'exclude_known_jobs': cls._exclude_known_jobs_clause(known_ids), 1729 'exclude_old_jobs': cls._exclude_old_jobs_clause(), 1730 'host_label_table': 'afe_static_hosts_labels', 1731 'host_label_column': 'staticlabel_id', 1732 'label_ids': '(%s)' % ','.join(label_ids)}) 1733 job_ids |= set([j.id for j in query]) 1734 if non_static_labels: 1735 label_ids = [str(l.id) for l in non_static_labels] 1736 query = Job.objects.raw(cls.SQL_SHARD_JOBS_WITH_HOSTS % { 1737 'exclude_known_jobs': cls._exclude_known_jobs_clause(known_ids), 1738 'exclude_old_jobs': cls._exclude_old_jobs_clause(), 1739 'host_label_table': 'afe_hosts_labels', 1740 'host_label_column': 'label_id', 1741 'label_ids': '(%s)' % ','.join(label_ids)}) 1742 job_ids |= set([j.id for j in query]) 1743 return job_ids 1744 1745 1746 @classmethod 1747 def _exclude_known_jobs_clause(cls, known_ids): 1748 if not known_ids: 1749 return '' 1750 return (cls.EXCLUDE_KNOWN_JOBS_CLAUSE % 1751 {'known_ids': ','.join([str(i) for i in known_ids])}) 1752 1753 1754 @classmethod 1755 def _exclude_old_jobs_clause(cls): 1756 """Filter queried jobs to be created within a few hours in the past. 1757 1758 With this clause, any jobs older than a configurable number of hours are 1759 skipped in the jobs query. 1760 The job creation window affects the overall query performance. Longer 1761 creation windows require a range query over more Job table rows using 1762 the created_on column index. c.f. http://crbug.com/966872#c35 1763 """ 1764 if cls.SKIP_JOBS_CREATED_BEFORE <= 0: 1765 return '' 1766 cutoff = datetime.now()- timedelta(hours=cls.SKIP_JOBS_CREATED_BEFORE) 1767 return (cls.EXCLUDE_OLD_JOBS_CLAUSE % 1768 {'cutoff': cutoff.strftime('%Y-%m-%d %H:%M:%S')}) 1769 1770 1771 def queue(self, hosts, is_template=False): 1772 """Enqueue a job on the given hosts. 1773 1774 @param hosts: The hosts to use. 1775 @param is_template: Whether the status should be "Template". 1776 """ 1777 if not hosts: 1778 # hostless job 1779 entry = HostQueueEntry.create(job=self, is_template=is_template) 1780 entry.save() 1781 return 1782 1783 for host in hosts: 1784 host.enqueue_job(self, is_template=is_template) 1785 1786 1787 def user(self): 1788 """Gets the user of this job, or None if it doesn't exist.""" 1789 try: 1790 return User.objects.get(login=self.owner) 1791 except self.DoesNotExist: 1792 return None 1793 1794 1795 def abort(self): 1796 """Aborts this job.""" 1797 for queue_entry in self.hostqueueentry_set.all(): 1798 queue_entry.abort() 1799 1800 1801 def tag(self): 1802 """Returns a string tag for this job.""" 1803 return server_utils.get_job_tag(self.id, self.owner) 1804 1805 1806 def keyval_dict(self): 1807 """Returns all keyvals for this job as a dictionary.""" 1808 return dict((keyval.key, keyval.value) 1809 for keyval in self.jobkeyval_set.all()) 1810 1811 1812 @classmethod 1813 def get_attribute_model(cls): 1814 """Return the attribute model. 1815 1816 Override method in parent class. This class is called when 1817 deserializing the one-to-many relationship betwen Job and JobKeyval. 1818 On deserialization, we will try to clear any existing job keyvals 1819 associated with a job to avoid any inconsistency. 1820 Though Job doesn't implement ModelWithAttribute, we still treat 1821 it as an attribute model for this purpose. 1822 1823 @returns: The attribute model of Job. 1824 """ 1825 return JobKeyval 1826 1827 1828 class Meta: 1829 """Metadata for class Job.""" 1830 db_table = 'afe_jobs' 1831 1832 def __unicode__(self): 1833 return u'%s (%s-%s)' % (self.name, self.id, self.owner) 1834 1835 1836class JobHandoff(dbmodels.Model, model_logic.ModelExtensions): 1837 """Jobs that have been handed off to lucifer.""" 1838 1839 job = dbmodels.OneToOneField(Job, on_delete=dbmodels.CASCADE, 1840 primary_key=True) 1841 created = dbmodels.DateTimeField(auto_now_add=True) 1842 completed = dbmodels.BooleanField(default=False) 1843 drone = dbmodels.CharField( 1844 max_length=128, null=True, 1845 help_text=''' 1846The hostname of the drone the job is running on and whose job_aborter 1847should be responsible for aborting the job if the job process dies. 1848NULL means any drone's job_aborter has free reign to abort the job. 1849''') 1850 1851 class Meta: 1852 """Metadata for class Job.""" 1853 db_table = 'afe_job_handoffs' 1854 1855 1856class JobKeyval(dbmodels.Model, model_logic.ModelExtensions): 1857 """Keyvals associated with jobs""" 1858 1859 SERIALIZATION_LINKS_TO_KEEP = set(['job']) 1860 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 1861 1862 job = dbmodels.ForeignKey(Job) 1863 key = dbmodels.CharField(max_length=90) 1864 value = dbmodels.CharField(max_length=300) 1865 1866 objects = model_logic.ExtendedManager() 1867 1868 1869 @classmethod 1870 def get_record(cls, data): 1871 """Check the database for an identical record. 1872 1873 Use job_id and key to search for a existing record. 1874 1875 @raises: DoesNotExist, if no record found 1876 @raises: MultipleObjectsReturned if multiple records found. 1877 """ 1878 # TODO(fdeng): We should use job_id and key together as 1879 # a primary key in the db. 1880 return cls.objects.get(job_id=data['job_id'], key=data['key']) 1881 1882 1883 @classmethod 1884 def deserialize(cls, data): 1885 """Override deserialize in parent class. 1886 1887 Do not deserialize id as id is not kept consistent on main and shards. 1888 1889 @param data: A dictionary of data to deserialize. 1890 1891 @returns: A JobKeyval object. 1892 """ 1893 if data: 1894 data.pop('id') 1895 return super(JobKeyval, cls).deserialize(data) 1896 1897 1898 class Meta: 1899 """Metadata for class JobKeyval.""" 1900 db_table = 'afe_job_keyvals' 1901 1902 1903class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions): 1904 """Represents an ineligible host queue.""" 1905 job = dbmodels.ForeignKey(Job) 1906 host = dbmodels.ForeignKey(Host) 1907 1908 objects = model_logic.ExtendedManager() 1909 1910 class Meta: 1911 """Metadata for class IneligibleHostQueue.""" 1912 db_table = 'afe_ineligible_host_queues' 1913 1914 1915class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 1916 """Represents a host queue entry.""" 1917 1918 SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host']) 1919 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 1920 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted']) 1921 1922 1923 def custom_deserialize_relation(self, link, data): 1924 assert link == 'meta_host' 1925 self.meta_host = Label.deserialize(data) 1926 1927 1928 def _check_update_from_shard(self, shard, updated_serialized, 1929 job_ids_sent): 1930 if self.job_id not in job_ids_sent: 1931 raise error.IgnorableUnallowedRecordsSentToMain( 1932 'Sent HostQueueEntry without corresponding ' 1933 'job entry: %s' % updated_serialized) 1934 1935 1936 Status = host_queue_entry_states.Status 1937 ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES 1938 COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES 1939 PRE_JOB_STATUSES = host_queue_entry_states.PRE_JOB_STATUSES 1940 IDLE_PRE_JOB_STATUSES = host_queue_entry_states.IDLE_PRE_JOB_STATUSES 1941 1942 job = dbmodels.ForeignKey(Job) 1943 host = dbmodels.ForeignKey(Host, blank=True, null=True) 1944 status = dbmodels.CharField(max_length=255) 1945 meta_host = dbmodels.ForeignKey(Label, blank=True, null=True, 1946 db_column='meta_host') 1947 active = dbmodels.BooleanField(default=False) 1948 complete = dbmodels.BooleanField(default=False) 1949 deleted = dbmodels.BooleanField(default=False) 1950 execution_subdir = dbmodels.CharField(max_length=255, blank=True, 1951 default='') 1952 # If atomic_group is set, this is a virtual HostQueueEntry that will 1953 # be expanded into many actual hosts within the group at schedule time. 1954 atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True) 1955 aborted = dbmodels.BooleanField(default=False) 1956 started_on = dbmodels.DateTimeField(null=True, blank=True) 1957 finished_on = dbmodels.DateTimeField(null=True, blank=True) 1958 1959 objects = model_logic.ExtendedManager() 1960 1961 1962 def __init__(self, *args, **kwargs): 1963 super(HostQueueEntry, self).__init__(*args, **kwargs) 1964 self._record_attributes(['status']) 1965 1966 1967 @classmethod 1968 def create(cls, job, host=None, meta_host=None, 1969 is_template=False): 1970 """Creates a new host queue entry. 1971 1972 @param cls: Implicit class object. 1973 @param job: The associated job. 1974 @param host: The associated host. 1975 @param meta_host: The associated meta host. 1976 @param is_template: Whether the status should be "Template". 1977 """ 1978 if is_template: 1979 status = cls.Status.TEMPLATE 1980 else: 1981 status = cls.Status.QUEUED 1982 1983 return cls(job=job, host=host, meta_host=meta_host, status=status) 1984 1985 1986 def save(self, *args, **kwargs): 1987 self._set_active_and_complete() 1988 super(HostQueueEntry, self).save(*args, **kwargs) 1989 self._check_for_updated_attributes() 1990 1991 1992 def execution_path(self): 1993 """ 1994 Path to this entry's results (relative to the base results directory). 1995 """ 1996 return server_utils.get_hqe_exec_path(self.job.tag(), 1997 self.execution_subdir) 1998 1999 2000 def host_or_metahost_name(self): 2001 """Returns the first non-None name found in priority order. 2002 2003 The priority order checked is: (1) host name; (2) meta host name 2004 """ 2005 if self.host: 2006 return self.host.hostname 2007 else: 2008 assert self.meta_host 2009 return self.meta_host.name 2010 2011 2012 def _set_active_and_complete(self): 2013 if self.status in self.ACTIVE_STATUSES: 2014 self.active, self.complete = True, False 2015 elif self.status in self.COMPLETE_STATUSES: 2016 self.active, self.complete = False, True 2017 else: 2018 self.active, self.complete = False, False 2019 2020 2021 def on_attribute_changed(self, attribute, old_value): 2022 assert attribute == 'status' 2023 logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id, 2024 self.status) 2025 2026 2027 def is_meta_host_entry(self): 2028 'True if this is a entry has a meta_host instead of a host.' 2029 return self.host is None and self.meta_host is not None 2030 2031 2032 # This code is shared between rpc_interface and models.HostQueueEntry. 2033 # Sadly due to circular imports between the 2 (crbug.com/230100) making it 2034 # a class method was the best way to refactor it. Attempting to put it in 2035 # rpc_utils or a new utils module failed as that would require us to import 2036 # models.py but to call it from here we would have to import the utils.py 2037 # thus creating a cycle. 2038 @classmethod 2039 def abort_host_queue_entries(cls, host_queue_entries): 2040 """Aborts a collection of host_queue_entries. 2041 2042 Abort these host queue entry and all host queue entries of jobs created 2043 by them. 2044 2045 @param host_queue_entries: List of host queue entries we want to abort. 2046 """ 2047 # This isn't completely immune to race conditions since it's not atomic, 2048 # but it should be safe given the scheduler's behavior. 2049 2050 # TODO(milleral): crbug.com/230100 2051 # The |abort_host_queue_entries| rpc does nearly exactly this, 2052 # however, trying to re-use the code generates some horrible 2053 # circular import error. I'd be nice to refactor things around 2054 # sometime so the code could be reused. 2055 2056 # Fixpoint algorithm to find the whole tree of HQEs to abort to 2057 # minimize the total number of database queries: 2058 children = set() 2059 new_children = set(host_queue_entries) 2060 while new_children: 2061 children.update(new_children) 2062 new_child_ids = [hqe.job_id for hqe in new_children] 2063 new_children = HostQueueEntry.objects.filter( 2064 job__parent_job__in=new_child_ids, 2065 complete=False, aborted=False).all() 2066 # To handle circular parental relationships 2067 new_children = set(new_children) - children 2068 2069 # Associate a user with the host queue entries that we're about 2070 # to abort so that we can look up who to blame for the aborts. 2071 child_ids = [hqe.id for hqe in children] 2072 # Get a list of hqe ids that already exists, so we can exclude them when 2073 # we do bulk_create later to avoid IntegrityError. 2074 existing_hqe_ids = set(AbortedHostQueueEntry.objects. 2075 filter(queue_entry_id__in=child_ids). 2076 values_list('queue_entry_id', flat=True)) 2077 now = datetime.now() 2078 user = User.current_user() 2079 aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe, 2080 aborted_by=user, aborted_on=now) for hqe in children 2081 if hqe.id not in existing_hqe_ids] 2082 AbortedHostQueueEntry.objects.bulk_create(aborted_hqes) 2083 # Bulk update all of the HQEs to set the abort bit. 2084 HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True) 2085 2086 2087 def abort(self): 2088 """ Aborts this host queue entry. 2089 2090 Abort this host queue entry and all host queue entries of jobs created by 2091 this one. 2092 2093 """ 2094 if not self.complete and not self.aborted: 2095 HostQueueEntry.abort_host_queue_entries([self]) 2096 2097 2098 @classmethod 2099 def compute_full_status(cls, status, aborted, complete): 2100 """Returns a modified status msg if the host queue entry was aborted. 2101 2102 @param cls: Implicit class object. 2103 @param status: The original status message. 2104 @param aborted: Whether the host queue entry was aborted. 2105 @param complete: Whether the host queue entry was completed. 2106 """ 2107 if aborted and not complete: 2108 return 'Aborted (%s)' % status 2109 return status 2110 2111 2112 def full_status(self): 2113 """Returns the full status of this host queue entry, as a string.""" 2114 return self.compute_full_status(self.status, self.aborted, 2115 self.complete) 2116 2117 2118 def _postprocess_object_dict(self, object_dict): 2119 object_dict['full_status'] = self.full_status() 2120 2121 2122 class Meta: 2123 """Metadata for class HostQueueEntry.""" 2124 db_table = 'afe_host_queue_entries' 2125 2126 2127 2128 def __unicode__(self): 2129 hostname = None 2130 if self.host: 2131 hostname = self.host.hostname 2132 return u"%s/%d (%d)" % (hostname, self.job.id, self.id) 2133 2134 2135class HostQueueEntryStartTimes(dbmodels.Model): 2136 """An auxilary table to HostQueueEntry to index by start time.""" 2137 insert_time = dbmodels.DateTimeField() 2138 highest_hqe_id = dbmodels.IntegerField() 2139 2140 class Meta: 2141 """Metadata for class HostQueueEntryStartTimes.""" 2142 db_table = 'afe_host_queue_entry_start_times' 2143 2144 2145class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 2146 """Represents an aborted host queue entry.""" 2147 queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True) 2148 aborted_by = dbmodels.ForeignKey(User) 2149 aborted_on = dbmodels.DateTimeField() 2150 2151 objects = model_logic.ExtendedManager() 2152 2153 2154 def save(self, *args, **kwargs): 2155 self.aborted_on = datetime.now() 2156 super(AbortedHostQueueEntry, self).save(*args, **kwargs) 2157 2158 class Meta: 2159 """Metadata for class AbortedHostQueueEntry.""" 2160 db_table = 'afe_aborted_host_queue_entries' 2161 2162 2163class SpecialTask(dbmodels.Model, model_logic.ModelExtensions): 2164 """\ 2165 Tasks to run on hosts at the next time they are in the Ready state. Use this 2166 for high-priority tasks, such as forced repair or forced reinstall. 2167 2168 host: host to run this task on 2169 task: special task to run 2170 time_requested: date and time the request for this task was made 2171 is_active: task is currently running 2172 is_complete: task has finished running 2173 is_aborted: task was aborted 2174 time_started: date and time the task started 2175 time_finished: date and time the task finished 2176 queue_entry: Host queue entry waiting on this task (or None, if task was not 2177 started in preparation of a job) 2178 """ 2179 Task = autotest_enum.AutotestEnum('Verify', 'Cleanup', 'Repair', 'Reset', 2180 'Provision', string_values=True) 2181 2182 host = dbmodels.ForeignKey(Host, blank=False, null=False) 2183 task = dbmodels.CharField(max_length=64, choices=Task.choices(), 2184 blank=False, null=False) 2185 requested_by = dbmodels.ForeignKey(User) 2186 time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False, 2187 null=False) 2188 is_active = dbmodels.BooleanField(default=False, blank=False, null=False) 2189 is_complete = dbmodels.BooleanField(default=False, blank=False, null=False) 2190 is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False) 2191 time_started = dbmodels.DateTimeField(null=True, blank=True) 2192 queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True) 2193 success = dbmodels.BooleanField(default=False, blank=False, null=False) 2194 time_finished = dbmodels.DateTimeField(null=True, blank=True) 2195 2196 objects = model_logic.ExtendedManager() 2197 2198 2199 def save(self, **kwargs): 2200 if self.queue_entry: 2201 self.requested_by = User.objects.get( 2202 login=self.queue_entry.job.owner) 2203 super(SpecialTask, self).save(**kwargs) 2204 2205 2206 def execution_path(self): 2207 """Returns the execution path for a special task.""" 2208 return server_utils.get_special_task_exec_path( 2209 self.host.hostname, self.id, self.task, self.time_requested) 2210 2211 2212 # property to emulate HostQueueEntry.status 2213 @property 2214 def status(self): 2215 """Returns a host queue entry status appropriate for a speical task.""" 2216 return server_utils.get_special_task_status( 2217 self.is_complete, self.success, self.is_active) 2218 2219 2220 # property to emulate HostQueueEntry.started_on 2221 @property 2222 def started_on(self): 2223 """Returns the time at which this special task started.""" 2224 return self.time_started 2225 2226 2227 @classmethod 2228 def schedule_special_task(cls, host, task): 2229 """Schedules a special task on a host if not already scheduled. 2230 2231 @param cls: Implicit class object. 2232 @param host: The host to use. 2233 @param task: The task to schedule. 2234 """ 2235 existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task, 2236 is_active=False, 2237 is_complete=False) 2238 if existing_tasks: 2239 return existing_tasks[0] 2240 2241 special_task = SpecialTask(host=host, task=task, 2242 requested_by=User.current_user()) 2243 special_task.save() 2244 return special_task 2245 2246 2247 def abort(self): 2248 """ Abort this special task.""" 2249 self.is_aborted = True 2250 self.save() 2251 2252 2253 def activate(self): 2254 """ 2255 Sets a task as active and sets the time started to the current time. 2256 """ 2257 logging.info('Starting: %s', self) 2258 self.is_active = True 2259 self.time_started = datetime.now() 2260 self.save() 2261 2262 2263 def finish(self, success): 2264 """Sets a task as completed. 2265 2266 @param success: Whether or not the task was successful. 2267 """ 2268 logging.info('Finished: %s', self) 2269 self.is_active = False 2270 self.is_complete = True 2271 self.success = success 2272 if self.time_started: 2273 self.time_finished = datetime.now() 2274 self.save() 2275 2276 2277 class Meta: 2278 """Metadata for class SpecialTask.""" 2279 db_table = 'afe_special_tasks' 2280 2281 2282 def __unicode__(self): 2283 result = u'Special Task %s (host %s, task %s, time %s)' % ( 2284 self.id, self.host, self.task, self.time_requested) 2285 if self.is_complete: 2286 result += u' (completed)' 2287 elif self.is_active: 2288 result += u' (active)' 2289 2290 return result 2291 2292 2293class StableVersion(dbmodels.Model, model_logic.ModelExtensions): 2294 2295 board = dbmodels.CharField(max_length=255, unique=True) 2296 version = dbmodels.CharField(max_length=255) 2297 2298 class Meta: 2299 """Metadata for class StableVersion.""" 2300 db_table = 'afe_stable_versions' 2301 2302 def save(self, *args, **kwargs): 2303 if os.getenv("OVERRIDE_STABLE_VERSION_BAN"): 2304 super(StableVersion, self).save(*args, **kwargs) 2305 else: 2306 raise RuntimeError("the ability to save StableVersions has been intentionally removed") 2307 2308 # pylint:disable=undefined-variable 2309 def delete(self): 2310 if os.getenv("OVERRIDE_STABLE_VERSION_BAN"): 2311 super(StableVersion, self).delete(*args, **kwargs) 2312 else: 2313 raise RuntimeError("the ability to delete StableVersions has been intentionally removed") 2314