# pylint: disable=missing-docstring

"""Database model classes for the scheduler.

Contains model classes abstracting the various DB tables used by the scheduler.
These overlap the Django models in basic functionality, but were written before
the Django models existed and have not yet been phased out. Some of them
(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
which would probably be ill-suited for inclusion in the general Django model
classes.

Globals:
_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
        one of these statuses, an email will be sent to the job's email_list.
        comes from global_config.
_base_url: URL to the local AFE server, used to construct URLs for emails.
_db: DatabaseConnection for this module.
_drone_manager: reference to global DroneManager instance.
"""

import base64
import datetime
import errno
import itertools
import logging
import re
import weakref

import google.protobuf.internal.well_known_types as types

from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import drone_manager, email_manager
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server import afe_urls
from autotest_lib.server.cros import provision

try:
    from chromite.lib import metrics
    from chromite.lib import cloud_trace
except ImportError:
    metrics = utils.metrics_mock
    import mock
    cloud_trace = mock.Mock()


_notify_email_statuses = []
_base_url = None

_db = None
_drone_manager = None

RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_labels', type=bool, default=False)


def initialize():
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]',
                                       notify_statuses_list.lower())
                              if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        _base_url = afe_urls.ROOT_URL

    initialize_globals()


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()
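

# For illustration (hypothetical config value): with
# notify_email_statuses = 'Failed, Aborted;;Completed', the re.split() above
# yields
#     ['failed', '', 'aborted', '', 'completed']
# and the `if status` filter drops the empty strings, leaving
#     ['failed', 'aborted', 'completed'].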


def get_job_metadata(job):
    """Get a dictionary of the job information.

    The return value is a dictionary that includes job information like id,
    name and parent job information. The value will be stored in the metadata
    database.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner and name.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        return {'job_id': job.id,
                'owner': job.owner,
                'job_name': job.name,
                'parent_job_id': job.parent_job_id}
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}


class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""


class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                        'initialized %s %s instance requery is updating: %s',
                        type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        fields = ', '.join(self._fields)
        sql = 'SELECT %s FROM %s WHERE ID=%%s' % (fields, self.__table)
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]
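

    # For illustration: for a subclass such as IneligibleHostQueue (defined
    # below), the statement built above is
    #     SELECT id, job_id, host_id FROM afe_ineligible_host_queues
    #     WHERE ID=%s
    # with the row id passed separately as the single query parameter.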


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
                "table = %s, row = %s/%d, fields = %s/%d" % (
                self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in-memory fields.  Fractional seconds are stripped from datetime
        values before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        self._valid_fields.remove('id')


    def update_from_database(self):
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
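

    # A worked example (hypothetical ids): saving a new IneligibleHostQueue
    # with job_id=5 and host_id=10 issues
    #     INSERT INTO afe_ineligible_host_queues (job_id,host_id)
    #     VALUES ("5","10")
    # and then reads back LAST_INSERT_ID() to populate self.id.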


    def delete(self):
        # Note: pop the cache entry for this instance's own id (the original
        # code passed the `id` builtin here by mistake).
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch_rows(cls, where='', params=(), joins='', order_by=''):
        """
        Fetch the rows based on the given database query.

        @returns The rows fetched by the given query.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        fields = []
        for field in cls._fields:
            fields.append('%s.%s' % (cls._table_name, field))

        query = ('SELECT %(fields)s FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'fields' : ', '.join(fields),
                                             'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return rows


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
        """
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        return [cls(id=row[0], row=row) for row in rows]


class IneligibleHostQueue(DBObject):
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
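

# For illustration, the DBObject subclasses above are typically queried like
# this (hypothetical filters):
#
#     valid_labels = Label.fetch(where='invalid = 0')
#     blocks = IneligibleHostQueue.fetch(where='job_id = %s', params=(5,))
#
# Each call returns a list of fully populated instances, one per row.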


class Host(DBObject):
    _table_name = 'afe_hosts'
    # TODO(ayatane): synch_id is not used, remove after fixing DB.
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def _get_labels_with_platform(self, non_static_rows, static_rows):
        """Helper function to fetch labels & platform for a host."""
        if not RESPECT_STATIC_LABELS:
            return non_static_rows

        combined_rows = []
        replaced_labels = _db.execute(
                'SELECT label_id FROM afe_replaced_labels')
        replaced_label_ids = {l[0] for l in replaced_labels}

        # We respect afe_labels more, which means:
        #   * If a non-static label has been replaced, find its replacing
        #     static label in afe_static_labels by label name.
        #   * If a non-static label has not been replaced, keep it.
        #   * Drop static labels that don't correspond to any non-static
        #     label.
        static_label_names = []
        for label_id, label_name, is_platform in non_static_rows:
            if label_id not in replaced_label_ids:
                combined_rows.append((label_id, label_name, is_platform))
            else:
                static_label_names.append(label_name)

        # Only keep static labels that have replaced non-static labels.
        for label_id, label_name, is_platform in static_rows:
            if label_name in static_label_names:
                combined_rows.append((label_id, label_name, is_platform))

        return combined_rows


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        template = ('SELECT %(label_table)s.id, %(label_table)s.name, '
                    '%(label_table)s.platform FROM %(label_table)s INNER '
                    'JOIN %(host_label_table)s '
                    'ON %(label_table)s.id = %(host_label_table)s.%(column)s '
                    'WHERE %(host_label_table)s.host_id = %(host_id)s '
                    'ORDER BY %(label_table)s.name')
        static_query = template % {
                'host_label_table': 'afe_static_hosts_labels',
                'label_table': 'afe_static_labels',
                'column': 'staticlabel_id',
                'host_id': self.id
        }
        non_static_query = template % {
                'host_label_table': 'afe_hosts_labels',
                'label_table': 'afe_labels',
                'column': 'label_id',
                'host_id': self.id
        }
        non_static_rows = _db.execute(non_static_query)
        static_rows = _db.execute(static_query)

        rows = self._get_labels_with_platform(non_static_rows, static_rows)
        platform = None
        all_labels = []
        for _, label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If either hostname does not match this pattern, they are
        simply compared as lower-case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # lstrip('0') of an all-zero string yields '', so fall back to
            # '0' to avoid a ValueError from int('').
            number_a = int(number_a_str.lstrip('0') or '0')
            number_b = int(number_b_str.lstrip('0') or '0')
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
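

# A short illustration of the comparator above (Python 2 cmp-style sorting):
#
#     hosts.sort(cmp=Host.cmp_for_sort)
#
# For example, 'host9' sorts before 'host10' because the trailing digits are
# compared numerically (9 < 10) rather than lexicographically.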


class HostQueueEntry(DBObject):
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')

    _COMPLETION_COUNT_METRIC = metrics.Counter(
            'chromeos/autotest/scheduler/hqe_completion_count')

    def __init__(self, id=None, row=None, job_row=None, **kwargs):
        """
        @param id: ID field from afe_host_queue_entries table.
                Either id or row should be specified for initialization.
        @param row: The DB row for a particular HostQueueEntry.
                Either id or row should be specified for initialization.
        @param job_row: The DB row for the job of this HostQueueEntry.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id, row=job_row)

        if self.host_id:
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
        """
        # Override the original fetch method to pre-fetch the jobs from the DB
        # in order to prevent each HQE making separate DB queries.
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        if len(rows) <= 1:
            return [cls(id=row[0], row=row) for row in rows]

        job_params = ', '.join([str(row[1]) for row in rows])
        job_rows = Job.fetch_rows(where='id IN (%s)' % (job_params))
        # Create a job_id to job_row dictionary to match each HQE to its
        # corresponding job.
        job_dict = {job_row[0]: job_row for job_row in job_rows}
        return [cls(id=row[0], row=row, job_row=job_dict.get(row[1]))
                for row in rows]


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
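

    # For illustration, with a hypothetical _base_url of
    # 'http://autotest-server/afe/' and job id 123, the link above is
    #     http://autotest-server/afe/#tab_id=view_job&object_id=123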


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
                'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string to identify this HQE.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))
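

    # An illustrative rendering (hypothetical ids): an active entry prints as
    #     HQE: 10, for job: 7 and host: host1 has status:Running [active]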


    def set_status(self, status):
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)

        self.update_field('active', active)

        # The ordering of these operations is important.  Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it.  However,
        # we can only set both of these after we've updated finished_on etc.
        # within _on_complete or the job will get synced in an intermediate
        # state.  This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        metric_fields = {'status': status.lower()}
        if self.host:
            metric_fields['board'] = self.host.board or ''
            if len(self.host.pools) == 1:
                metric_fields['pool'] = self.host.pools[0]
            else:
                metric_fields['pool'] = 'MULTIPLE'
        else:
            metric_fields['board'] = 'NO_HOST'
            metric_fields['pool'] = 'NO_HOST'
        self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)
        # Use !=, not `is not`: these statuses are plain strings, so identity
        # comparison is fragile.
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()
        if self.started_on:
            self.set_finished_on_now()
            self._log_trace()
        if self.job.shard_id is not None:
            # Setting shard_id to None signals the shard client that the job
            # needs to be synced back to the master.
            self.job.update_field('shard_id', None)
        if not self.execution_subdir:
            return
        # Unregister any possible pidfiles associated with this queue entry.
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _log_trace(self):
        """Emits a Cloud Trace span for the HQE's duration."""
        if self.started_on and self.finished_on:
            span = cloud_trace.Span('HQE', spanId='0',
                                    traceId=hqe_trace_id(self.id))
            # TODO(phobbs) make a .SetStart() and .SetEnd() helper method
            span.startTime = types.Timestamp()
            span.startTime.FromDatetime(self.started_on)
            span.endTime = types.Timestamp()
            span.endTime.FromDatetime(self.finished_on)
            # TODO(phobbs) any LogSpan calls need to be wrapped in this for
            # safety during tests, so this should be caught within LogSpan.
            try:
                cloud_trace.LogSpan(span)
            except IOError as e:
                if e.errno == errno.ENOENT:
                    logging.warning('Error writing to cloud trace results '
                                    'directory: %s', e)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """Fetch info about who aborted the job."""
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING.  Otherwise, it
        leaves them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        if not self.host:
            raise scheduler_lib.NoHostIdError(
                    'Failed to recover a job whose host_queue_entry_id=%r due'
                    ' to no host_id.' % self.id)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are
        # getting stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in {Status.GATHERING, Status.PARSING}:
            # Do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host.
            return

        if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
            # If the HQE is in any of these statuses, it should not have any
            # unfinished agent before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # An agent with a finished task can be left behind.  This handles
            # the special case of aborting a hostless job in STARTING status,
            # in which the agent has only a HostlessQueueTask associated.  The
            # finished HostlessQueueTask will be cleaned up in the next tick,
            # so it's safe to leave the agent there.  Without filtering out
            # finished agents, HQE abort won't be able to proceed.
            assert all([agent.is_done() for agent in agents])

        if self.host:
            if self.status in {Status.STARTING, Status.PENDING,
                               Status.RUNNING}:
                self.host.set_status(models.Host.Status.READY)
            elif self.status in {Status.VERIFYING, Status.RESETTING}:
                models.SpecialTask.objects.create(
                        task=models.SpecialTask.Task.CLEANUP,
                        host=models.Host.objects.get(id=self.host.id),
                        requested_by=self.job.owner_model())
            elif self.status == Status.PROVISIONING:
                models.SpecialTask.objects.create(
                        task=models.SpecialTask.Task.REPAIR,
                        host=models.Host.objects.get(id=self.host.id),
                        requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)


    def execution_tag(self):
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(fix_query)
            raise AssertionError('self.execution_subdir not found. '
                                 'See log for details.')

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        return (self.host_id is None
                and self.meta_host is None)


def hqe_trace_id(hqe_id):
    """Constructs the canonical trace id based on the HQE's id.

    Encodes 'HQE' in base16 and concatenates with the hex representation
    of the HQE's id.

    @param hqe_id: The HostQueueEntry's id.

    Returns:
        A trace id (in hex format)
    """
    return base64.b16encode('HQE') + hex(hqe_id)[2:]
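

# A worked example: base64.b16encode('HQE') == '485145' and hex(10)[2:] == 'a',
# so hqe_trace_id(10) == '485145a'.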


class Job(DBObject):
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
               'require_ssp')

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending()
    # on all status='Pending' atomic group HQEs in case a delay was running
    # when the scheduler was restarted and no more hosts ever successfully
    # exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None  # caches model instance of owner
        self.update_image_path = None  # path of OS image to install


    def model(self):
        return models.Job.objects.get(id=self.id)


    def owner_model(self):
        # Work around the fact that the Job owner field is a string, not a
        # foreign key.
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model


    def tag(self):
        return "%s-%s" % (self.id, self.owner)


    def get_execution_details(self):
        """
        Get test execution details for this job.

        @return: Dictionary with test execution details
        """
        def _find_test_jobs(rows):
            """
            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
            Those are autotest 'internal job' tests, so they should not be
            counted when evaluating the test stats.

            @param rows: List of rows (matrix) with database results.
            """
            job_test_pattern = re.compile(r'SERVER|CLIENT_JOB\.\d')
            n_test_jobs = 0
            for r in rows:
                test_name = r[0]
                if job_test_pattern.match(test_name):
                    n_test_jobs += 1

            return n_test_jobs

        stats = {}

        rows = _db.execute("""
                SELECT t.test, s.word, t.reason
                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
                WHERE t.job_idx = j.job_idx
                AND s.status_idx = t.status
                AND j.afe_job_id = %s
                ORDER BY t.reason
                """ % self.id)

        failed_rows = [r for r in rows if not r[1] == 'GOOD']

        n_test_jobs = _find_test_jobs(rows)
        n_test_jobs_failed = _find_test_jobs(failed_rows)

        total_executed = len(rows) - n_test_jobs
        total_failed = len(failed_rows) - n_test_jobs_failed

        if total_executed > 0:
            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
        else:
            success_rate = 0

        stats['total_executed'] = total_executed
        stats['total_failed'] = total_failed
        stats['total_passed'] = total_executed - total_failed
        stats['success_rate'] = success_rate

        status_header = ("Test Name", "Status", "Reason")
        if failed_rows:
            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
                                                          status_header)
        else:
            stats['failed_rows'] = ''

        time_row = _db.execute("""
                SELECT started_time, finished_time
                FROM tko_jobs
                WHERE afe_job_id = %s
                """ % self.id)

        if time_row:
            t_begin, t_end = time_row[0]
            try:
                delta = t_end - t_begin
                minutes, seconds = divmod(delta.seconds, 60)
                hours, minutes = divmod(minutes, 60)
                stats['execution_time'] = ("%02d:%02d:%02d" %
                                           (hours, minutes, seconds))
            # One of t_end or t_begin is None
            except TypeError:
                stats['execution_time'] = '(could not determine)'
        else:
            stats['execution_time'] = '(none)'

        return stats
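

    # A worked example of the success-rate arithmetic above: with 10 user
    # tests executed and 2 failures,
    #     success_rate = 100 - ((2 / 10.0) * 100) = 80.0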


    def keyval_dict(self):
        return self.model().keyval_dict()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        pending_count = self._pending_count()
        ready = (pending_count >= self.synch_count)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required',
                    self, pending_count, self.synch_count)

        return ready


    def num_machines(self, clause=None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='afe_host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_active=True):
        if include_active:
            statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
        else:
            statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Stops the job's inactive pre-job HQEs."""
        entries_to_stop = self._not_yet_run_entries(
                include_active=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                    '%s status=%s, active=%s, complete=%s' %
                    (child_entry.id, child_entry.status, child_entry.active,
                     child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def _next_group_name(self):
        """@returns a directory name to use for the next host group results."""
        group_count_re = re.compile(r'group(\d+)')
        query = models.HostQueueEntry.objects.filter(
                job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return 'group%d' % (next_id,)
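

    # For illustration: if the distinct execution_subdirs are 'group0',
    # 'group1' and 'host1', only the first two match the pattern, ids is
    # [0, 1], and the method returns 'group2'.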


    def get_group_entries(self, queue_entry_from_group):
        """
        @param queue_entry_from_group: A HostQueueEntry instance to find other
                group entries on this job for.

        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
                where='job_id=%s AND execution_subdir=%s',
                params=(self.id, execution_subdir)))


    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
            return queue_entry.host.dirty
        return False


    def _should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        # If RebootBefore is set to NEVER, then we won't run reset because
        # we can't cleanup, so we need to weaken a Reset into a Verify.
        weaker_reset = (self.run_reset and
                self.reboot_before == model_attributes.RebootBefore.NEVER)
        return self.run_verify or weaker_reset


    def _should_run_reset(self, queue_entry):
        can_verify = (queue_entry.host.protection !=
                      host_protections.Protection.DO_NOT_VERIFY)
        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
        return (can_reboot and can_verify
                and (self.run_reset
                     or (self._should_run_cleanup(queue_entry)
                         and self._should_run_verify(queue_entry))))


    def _should_run_provision(self, queue_entry):
        """
        Determine if the queue_entry needs to have a provision task run before
        it to provision queue_entry.host.

        @param queue_entry: The host queue entry in question.
        @returns: True if we should schedule a provision task, False otherwise.

        """
        # If we get to this point, it means that the scheduler has already
        # vetted that all the unprovisionable labels match, so we can just
        # find all labels on the job that aren't on the host to get the list
        # of what we need to provision.  (See the scheduling logic in
        # host_scheduler.py:is_host_eligable_for_job() where we discard all
        # actionable labels when assigning jobs to hosts.)
        job_labels = {x.name for x in queue_entry.get_labels()}
        # Skip provision if `skip_provision` is listed in the job labels.
        if provision.SKIP_PROVISION in job_labels:
            return False
        _, host_labels = queue_entry.host.platform_and_labels()
        # If there are any labels on the job that are not on the host and they
        # are labels that provisioning knows how to change, then that means
        # there is provisioning work to do.  If there's no provisioning work
        # to do, then obviously we have no reason to schedule a provision
        # task!
        diff = job_labels - set(host_labels)
        if any([provision.Provision.acts_on(x) for x in diff]):
            return True
        return False


    def _queue_special_task(self, queue_entry, task):
        """
        Create a special task and associate it with a host queue entry.

        @param queue_entry: The queue entry this special task should be
                associated with.
        @param task: One of the members of the enum models.SpecialTask.Task.
        @returns: None

        """
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)
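

    # A hypothetical illustration of the label diff above: if the job carries
    # {'cros-version:eve-release/R70-1.0.0', 'pool:bvt'} and the host only
    # has 'pool:bvt', the diff contains the cros-version label, provisioning
    # knows how to act on it, and a PROVISION task gets scheduled.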


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Queue all of the special tasks that need to be run before a host
        queue entry may run.

        If no special tasks need to be scheduled, then |on_pending| will be
        called directly.

        @returns None

        """
        task_queued = False
        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)

        if self._should_run_provision(queue_entry):
            self._queue_special_task(hqe_model,
                                     models.SpecialTask.Task.PROVISION)
            task_queued = True
        elif self._should_run_reset(queue_entry):
            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
            task_queued = True
        else:
            if self._should_run_cleanup(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.CLEANUP)
                task_queued = True
            if self._should_run_verify(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.VERIFY)
                task_queued = True

        if not task_queued:
            queue_entry.on_pending()


    def _assign_new_group(self, queue_entries):
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].host.hostname
        else:
            group_subdir_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job, or an empty list if the job cannot be started yet.  The
                chosen entries are assigned a shared group name for the
                results database via _assign_new_group().
        """
        chosen_entries = [include_queue_entry]
        num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                    where=where_clause,
                    params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        self._assign_new_group(chosen_entries)
        return chosen_entries
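

    # For illustration (hypothetical synch_count of 3): the calling entry
    # plus the first two Pending entries in hostname order are chosen; if
    # fewer than three entries are available in total, the method bails out
    # with an empty list and enqueues a notification email.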


    def run_if_ready(self, queue_entry):
        """
        Run this job by kicking its HQEs into status='Starting' if enough
        hosts are ready for it to run.

        Cleans up by kicking HQEs into status='Stopped' if this Job is not
        ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        else:
            self.run(queue_entry)


    def request_abort(self):
        """Request that this Job be aborted on the next scheduler cycle."""
        self.model().abort()


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)