1# pylint: disable=missing-docstring 2 3"""Database model classes for the scheduler. 4 5Contains model classes abstracting the various DB tables used by the scheduler. 6These overlap the Django models in basic functionality, but were written before 7the Django models existed and have not yet been phased out. Some of them 8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic 9which would probably be ill-suited for inclusion in the general Django model 10classes. 11 12Globals: 13_notify_email_statuses: list of HQE statuses. each time a single HQE reaches 14 one of these statuses, an email will be sent to the job's email_list. 15 comes from global_config. 16_base_url: URL to the local AFE server, used to construct URLs for emails. 17_db: DatabaseConnection for this module. 18_drone_manager: reference to global DroneManager instance. 19""" 20 21import base64 22import datetime 23import errno 24import itertools 25import logging 26import re 27import weakref 28 29import google.protobuf.internal.well_known_types as types 30 31from autotest_lib.client.common_lib import global_config, host_protections 32from autotest_lib.client.common_lib import time_utils 33from autotest_lib.client.common_lib import utils 34from autotest_lib.frontend.afe import models, model_attributes 35from autotest_lib.scheduler import drone_manager, email_manager 36from autotest_lib.scheduler import rdb_lib 37from autotest_lib.scheduler import scheduler_config 38from autotest_lib.scheduler import scheduler_lib 39from autotest_lib.server import afe_urls 40from autotest_lib.server.cros import provision 41 42try: 43 from chromite.lib import metrics 44 from chromite.lib import cloud_trace 45except ImportError: 46 metrics = utils.metrics_mock 47 import mock 48 cloud_trace = mock.Mock() 49 50 51_notify_email_statuses = [] 52_base_url = None 53 54_db = None 55_drone_manager = None 56 57RESPECT_STATIC_LABELS = global_config.global_config.get_config_value( 58 'SKYLAB', 'respect_static_labels', type=bool, default=False) 59 60 61def initialize(): 62 global _db 63 _db = scheduler_lib.ConnectionManager().get_connection() 64 65 notify_statuses_list = global_config.global_config.get_config_value( 66 scheduler_config.CONFIG_SECTION, "notify_email_statuses", 67 default='') 68 global _notify_email_statuses 69 _notify_email_statuses = [status for status in 70 re.split(r'[\s,;:]', notify_statuses_list.lower()) 71 if status] 72 73 # AUTOTEST_WEB.base_url is still a supported config option as some people 74 # may wish to override the entire url. 75 global _base_url 76 config_base_url = global_config.global_config.get_config_value( 77 scheduler_config.CONFIG_SECTION, 'base_url', default='') 78 if config_base_url: 79 _base_url = config_base_url 80 else: 81 _base_url = afe_urls.ROOT_URL 82 83 initialize_globals() 84 85 86def initialize_globals(): 87 global _drone_manager 88 _drone_manager = drone_manager.instance() 89 90 91def get_job_metadata(job): 92 """Get a dictionary of the job information. 93 94 The return value is a dictionary that includes job information like id, 95 name and parent job information. The value will be stored in metadata 96 database. 97 98 @param job: A Job object. 99 @return: A dictionary containing the job id, owner and name. 100 """ 101 if not job: 102 logging.error('Job is None, no metadata returned.') 103 return {} 104 try: 105 return {'job_id': job.id, 106 'owner': job.owner, 107 'job_name': job.name, 108 'parent_job_id': job.parent_job_id} 109 except AttributeError as e: 110 logging.error('Job has missing attribute: %s', e) 111 return {} 112 113 114class DBError(Exception): 115 """Raised by the DBObject constructor when its select fails.""" 116 117 118class DBObject(object): 119 """A miniature object relational model for the database.""" 120 121 # Subclasses MUST override these: 122 _table_name = '' 123 _fields = () 124 125 # A mapping from (type, id) to the instance of the object for that 126 # particular id. This prevents us from creating new Job() and Host() 127 # instances for every HostQueueEntry object that we instantiate as 128 # multiple HQEs often share the same Job. 129 _instances_by_type_and_id = weakref.WeakValueDictionary() 130 _initialized = False 131 132 133 def __new__(cls, id=None, **kwargs): 134 """ 135 Look to see if we already have an instance for this particular type 136 and id. If so, use it instead of creating a duplicate instance. 137 """ 138 if id is not None: 139 instance = cls._instances_by_type_and_id.get((cls, id)) 140 if instance: 141 return instance 142 return super(DBObject, cls).__new__(cls, id=id, **kwargs) 143 144 145 def __init__(self, id=None, row=None, new_record=False, always_query=True): 146 assert bool(id) or bool(row) 147 if id is not None and row is not None: 148 assert id == row[0] 149 assert self._table_name, '_table_name must be defined in your class' 150 assert self._fields, '_fields must be defined in your class' 151 if not new_record: 152 if self._initialized and not always_query: 153 return # We've already been initialized. 154 if id is None: 155 id = row[0] 156 # Tell future constructors to use us instead of re-querying while 157 # this instance is still around. 158 self._instances_by_type_and_id[(type(self), id)] = self 159 160 self.__table = self._table_name 161 162 self.__new_record = new_record 163 164 if row is None: 165 row = self._fetch_row_from_db(id) 166 167 if self._initialized: 168 differences = self._compare_fields_in_row(row) 169 if differences: 170 logging.warning( 171 'initialized %s %s instance requery is updating: %s', 172 type(self), self.id, differences) 173 self._update_fields_from_row(row) 174 self._initialized = True 175 176 177 @classmethod 178 def _clear_instance_cache(cls): 179 """Used for testing, clear the internal instance cache.""" 180 cls._instances_by_type_and_id.clear() 181 182 183 def _fetch_row_from_db(self, row_id): 184 fields = ', '.join(self._fields) 185 sql = 'SELECT %s FROM %s WHERE ID=%%s' % (fields, self.__table) 186 rows = _db.execute(sql, (row_id,)) 187 if not rows: 188 raise DBError("row not found (table=%s, row id=%s)" 189 % (self.__table, row_id)) 190 return rows[0] 191 192 193 def _assert_row_length(self, row): 194 assert len(row) == len(self._fields), ( 195 "table = %s, row = %s/%d, fields = %s/%d" % ( 196 self.__table, row, len(row), self._fields, len(self._fields))) 197 198 199 def _compare_fields_in_row(self, row): 200 """ 201 Given a row as returned by a SELECT query, compare it to our existing in 202 memory fields. Fractional seconds are stripped from datetime values 203 before comparison. 204 205 @param row - A sequence of values corresponding to fields named in 206 The class attribute _fields. 207 208 @returns A dictionary listing the differences keyed by field name 209 containing tuples of (current_value, row_value). 210 """ 211 self._assert_row_length(row) 212 differences = {} 213 for field, row_value in itertools.izip(self._fields, row): 214 current_value = getattr(self, field) 215 if (isinstance(current_value, datetime.datetime) 216 and isinstance(row_value, datetime.datetime)): 217 current_value = current_value.strftime(time_utils.TIME_FMT) 218 row_value = row_value.strftime(time_utils.TIME_FMT) 219 if current_value != row_value: 220 differences[field] = (current_value, row_value) 221 return differences 222 223 224 def _update_fields_from_row(self, row): 225 """ 226 Update our field attributes using a single row returned by SELECT. 227 228 @param row - A sequence of values corresponding to fields named in 229 the class fields list. 230 """ 231 self._assert_row_length(row) 232 233 self._valid_fields = set() 234 for field, value in itertools.izip(self._fields, row): 235 setattr(self, field, value) 236 self._valid_fields.add(field) 237 238 self._valid_fields.remove('id') 239 240 241 def update_from_database(self): 242 assert self.id is not None 243 row = self._fetch_row_from_db(self.id) 244 self._update_fields_from_row(row) 245 246 247 def count(self, where, table = None): 248 if not table: 249 table = self.__table 250 251 rows = _db.execute(""" 252 SELECT count(*) FROM %s 253 WHERE %s 254 """ % (table, where)) 255 256 assert len(rows) == 1 257 258 return int(rows[0][0]) 259 260 261 def update_field(self, field, value): 262 assert field in self._valid_fields 263 264 if getattr(self, field) == value: 265 return 266 267 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field) 268 _db.execute(query, (value, self.id)) 269 270 setattr(self, field, value) 271 272 273 def save(self): 274 if self.__new_record: 275 keys = self._fields[1:] # avoid id 276 columns = ','.join([str(key) for key in keys]) 277 values = [] 278 for key in keys: 279 value = getattr(self, key) 280 if value is None: 281 values.append('NULL') 282 else: 283 values.append('"%s"' % value) 284 values_str = ','.join(values) 285 query = ('INSERT INTO %s (%s) VALUES (%s)' % 286 (self.__table, columns, values_str)) 287 _db.execute(query) 288 # Update our id to the one the database just assigned to us. 289 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0] 290 291 292 def delete(self): 293 self._instances_by_type_and_id.pop((type(self), id), None) 294 self._initialized = False 295 self._valid_fields.clear() 296 query = 'DELETE FROM %s WHERE id=%%s' % self.__table 297 _db.execute(query, (self.id,)) 298 299 300 @staticmethod 301 def _prefix_with(string, prefix): 302 if string: 303 string = prefix + string 304 return string 305 306 307 @classmethod 308 def fetch_rows(cls, where='', params=(), joins='', order_by=''): 309 """ 310 Fetch the rows based on the given database query. 311 312 @yields the rows fetched by the given query. 313 """ 314 order_by = cls._prefix_with(order_by, 'ORDER BY ') 315 where = cls._prefix_with(where, 'WHERE ') 316 fields = [] 317 for field in cls._fields: 318 fields.append('%s.%s' % (cls._table_name, field)) 319 320 query = ('SELECT %(fields)s FROM %(table)s %(joins)s ' 321 '%(where)s %(order_by)s' % {'fields' : ', '.join(fields), 322 'table' : cls._table_name, 323 'joins' : joins, 324 'where' : where, 325 'order_by' : order_by}) 326 rows = _db.execute(query, params) 327 return rows 328 329 @classmethod 330 def fetch(cls, where='', params=(), joins='', order_by=''): 331 """ 332 Construct instances of our class based on the given database query. 333 334 @yields One class instance for each row fetched. 335 """ 336 rows = cls.fetch_rows(where=where, params=params, joins=joins, 337 order_by=order_by) 338 return [cls(id=row[0], row=row) for row in rows] 339 340 341class IneligibleHostQueue(DBObject): 342 _table_name = 'afe_ineligible_host_queues' 343 _fields = ('id', 'job_id', 'host_id') 344 345 346class AtomicGroup(DBObject): 347 _table_name = 'afe_atomic_groups' 348 _fields = ('id', 'name', 'description', 'max_number_of_machines', 349 'invalid') 350 351 352class Label(DBObject): 353 _table_name = 'afe_labels' 354 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid', 355 'only_if_needed', 'atomic_group_id') 356 357 358 def __repr__(self): 359 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % ( 360 self.name, self.id, self.atomic_group_id) 361 362 363class Host(DBObject): 364 _table_name = 'afe_hosts' 365 # TODO(ayatane): synch_id is not used, remove after fixing DB. 366 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status', 367 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty', 368 'leased', 'shard_id', 'lock_reason') 369 370 371 def set_status(self,status): 372 logging.info('%s -> %s', self.hostname, status) 373 self.update_field('status',status) 374 375 376 def _get_labels_with_platform(self, non_static_rows, static_rows): 377 """Helper function to fetch labels & platform for a host.""" 378 if not RESPECT_STATIC_LABELS: 379 return non_static_rows 380 381 combined_rows = [] 382 replaced_labels = _db.execute( 383 'SELECT label_id FROM afe_replaced_labels') 384 replaced_label_ids = {l[0] for l in replaced_labels} 385 386 # We respect afe_labels more, which means: 387 # * if non-static labels are replaced, we find its replaced static 388 # labels from afe_static_labels by label name. 389 # * if non-static labels are not replaced, we keep it. 390 # * Drop static labels which don't have reference non-static labels. 391 static_label_names = [] 392 for label_id, label_name, is_platform in non_static_rows: 393 if label_id not in replaced_label_ids: 394 combined_rows.append((label_id, label_name, is_platform)) 395 else: 396 static_label_names.append(label_name) 397 398 # Only keep static labels who have replaced non-static labels. 399 for label_id, label_name, is_platform in static_rows: 400 if label_name in static_label_names: 401 combined_rows.append((label_id, label_name, is_platform)) 402 403 return combined_rows 404 405 406 def platform_and_labels(self): 407 """ 408 Returns a tuple (platform_name, list_of_all_label_names). 409 """ 410 template = ('SELECT %(label_table)s.id, %(label_table)s.name, ' 411 '%(label_table)s.platform FROM %(label_table)s INNER ' 412 'JOIN %(host_label_table)s ' 413 'ON %(label_table)s.id = %(host_label_table)s.%(column)s ' 414 'WHERE %(host_label_table)s.host_id = %(host_id)s ' 415 'ORDER BY %(label_table)s.name') 416 static_query = template % { 417 'host_label_table': 'afe_static_hosts_labels', 418 'label_table': 'afe_static_labels', 419 'column': 'staticlabel_id', 420 'host_id': self.id 421 } 422 non_static_query = template % { 423 'host_label_table': 'afe_hosts_labels', 424 'label_table': 'afe_labels', 425 'column': 'label_id', 426 'host_id': self.id 427 } 428 non_static_rows = _db.execute(non_static_query) 429 static_rows = _db.execute(static_query) 430 431 rows = self._get_labels_with_platform(non_static_rows, static_rows) 432 platform = None 433 all_labels = [] 434 for _, label_name, is_platform in rows: 435 if is_platform: 436 platform = label_name 437 all_labels.append(label_name) 438 return platform, all_labels 439 440 441 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE) 442 443 444 @classmethod 445 def cmp_for_sort(cls, a, b): 446 """ 447 A comparison function for sorting Host objects by hostname. 448 449 This strips any trailing numeric digits, ignores leading 0s and 450 compares hostnames by the leading name and the trailing digits as a 451 number. If both hostnames do not match this pattern, they are simply 452 compared as lower case strings. 453 454 Example of how hostnames will be sorted: 455 456 alice, host1, host2, host09, host010, host10, host11, yolkfolk 457 458 This hopefully satisfy most people's hostname sorting needs regardless 459 of their exact naming schemes. Nobody sane should have both a host10 460 and host010 (but the algorithm works regardless). 461 """ 462 lower_a = a.hostname.lower() 463 lower_b = b.hostname.lower() 464 match_a = cls._ALPHANUM_HOST_RE.match(lower_a) 465 match_b = cls._ALPHANUM_HOST_RE.match(lower_b) 466 if match_a and match_b: 467 name_a, number_a_str = match_a.groups() 468 name_b, number_b_str = match_b.groups() 469 number_a = int(number_a_str.lstrip('0')) 470 number_b = int(number_b_str.lstrip('0')) 471 result = cmp((name_a, number_a), (name_b, number_b)) 472 if result == 0 and lower_a != lower_b: 473 # If they compared equal above but the lower case names are 474 # indeed different, don't report equality. abc012 != abc12. 475 return cmp(lower_a, lower_b) 476 return result 477 else: 478 return cmp(lower_a, lower_b) 479 480 481class HostQueueEntry(DBObject): 482 _table_name = 'afe_host_queue_entries' 483 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host', 484 'active', 'complete', 'deleted', 'execution_subdir', 485 'atomic_group_id', 'aborted', 'started_on', 'finished_on') 486 487 _COMPLETION_COUNT_METRIC = metrics.Counter( 488 'chromeos/autotest/scheduler/hqe_completion_count') 489 490 def __init__(self, id=None, row=None, job_row=None, **kwargs): 491 """ 492 @param id: ID field from afe_host_queue_entries table. 493 Either id or row should be specified for initialization. 494 @param row: The DB row for a particular HostQueueEntry. 495 Either id or row should be specified for initialization. 496 @param job_row: The DB row for the job of this HostQueueEntry. 497 """ 498 assert id or row 499 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs) 500 self.job = Job(self.job_id, row=job_row) 501 502 if self.host_id: 503 self.host = rdb_lib.get_hosts([self.host_id])[0] 504 self.host.dbg_str = self.get_dbg_str() 505 self.host.metadata = get_job_metadata(self.job) 506 else: 507 self.host = None 508 509 510 @classmethod 511 def clone(cls, template): 512 """ 513 Creates a new row using the values from a template instance. 514 515 The new instance will not exist in the database or have a valid 516 id attribute until its save() method is called. 517 """ 518 assert isinstance(template, cls) 519 new_row = [getattr(template, field) for field in cls._fields] 520 clone = cls(row=new_row, new_record=True) 521 clone.id = None 522 return clone 523 524 525 @classmethod 526 def fetch(cls, where='', params=(), joins='', order_by=''): 527 """ 528 Construct instances of our class based on the given database query. 529 530 @yields One class instance for each row fetched. 531 """ 532 # Override the original fetch method to pre-fetch the jobs from the DB 533 # in order to prevent each HQE making separate DB queries. 534 rows = cls.fetch_rows(where=where, params=params, joins=joins, 535 order_by=order_by) 536 if len(rows) <= 1: 537 return [cls(id=row[0], row=row) for row in rows] 538 539 job_params = ', '.join([str(row[1]) for row in rows]) 540 job_rows = Job.fetch_rows(where='id IN (%s)' % (job_params)) 541 # Create a Job_id to Job_row match dictionary to match the HQE 542 # to its corresponding job. 543 job_dict = {job_row[0]: job_row for job_row in job_rows} 544 return [cls(id=row[0], row=row, job_row=job_dict.get(row[1])) 545 for row in rows] 546 547 548 def _view_job_url(self): 549 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id) 550 551 552 def get_labels(self): 553 """ 554 Get all labels associated with this host queue entry (either via the 555 meta_host or as a job dependency label). The labels yielded are not 556 guaranteed to be unique. 557 558 @yields Label instances associated with this host_queue_entry. 559 """ 560 if self.meta_host: 561 yield Label(id=self.meta_host, always_query=False) 562 labels = Label.fetch( 563 joins="JOIN afe_jobs_dependency_labels AS deps " 564 "ON (afe_labels.id = deps.label_id)", 565 where="deps.job_id = %d" % self.job.id) 566 for label in labels: 567 yield label 568 569 570 def set_host(self, host): 571 if host: 572 logging.info('Assigning host %s to entry %s', host.hostname, self) 573 self.update_field('host_id', host.id) 574 self.block_host(host.id) 575 else: 576 logging.info('Releasing host from %s', self) 577 self.unblock_host(self.host.id) 578 self.update_field('host_id', None) 579 580 self.host = host 581 582 583 def block_host(self, host_id): 584 logging.info("creating block %s/%s", self.job.id, host_id) 585 row = [0, self.job.id, host_id] 586 block = IneligibleHostQueue(row=row, new_record=True) 587 block.save() 588 589 590 def unblock_host(self, host_id): 591 logging.info("removing block %s/%s", self.job.id, host_id) 592 blocks = IneligibleHostQueue.fetch( 593 'job_id=%d and host_id=%d' % (self.job.id, host_id)) 594 for block in blocks: 595 block.delete() 596 597 598 def set_execution_subdir(self, subdir=None): 599 if subdir is None: 600 assert self.host 601 subdir = self.host.hostname 602 self.update_field('execution_subdir', subdir) 603 604 605 def _get_hostname(self): 606 if self.host: 607 return self.host.hostname 608 return 'no host' 609 610 611 def get_dbg_str(self): 612 """Get a debug string to identify this host. 613 614 @return: A string containing the hqe and job id. 615 """ 616 try: 617 return 'HQE: %s, for job: %s' % (self.id, self.job_id) 618 except AttributeError as e: 619 return 'HQE has not been initialized yet: %s' % e 620 621 622 def __str__(self): 623 flags = [] 624 if self.active: 625 flags.append('active') 626 if self.complete: 627 flags.append('complete') 628 if self.deleted: 629 flags.append('deleted') 630 if self.aborted: 631 flags.append('aborted') 632 flags_str = ','.join(flags) 633 if flags_str: 634 flags_str = ' [%s]' % flags_str 635 return ("%s and host: %s has status:%s%s" % 636 (self.get_dbg_str(), self._get_hostname(), self.status, 637 flags_str)) 638 639 640 def set_status(self, status): 641 logging.info("%s -> %s", self, status) 642 643 self.update_field('status', status) 644 645 active = (status in models.HostQueueEntry.ACTIVE_STATUSES) 646 complete = (status in models.HostQueueEntry.COMPLETE_STATUSES) 647 648 self.update_field('active', active) 649 650 # The ordering of these operations is important. Once we set the 651 # complete bit this job will become indistinguishable from all 652 # the other complete jobs, unless we first set shard_id to NULL 653 # to signal to the shard_client that we need to upload it. However, 654 # we can only set both these after we've updated finished_on etc 655 # within _on_complete or the job will get synced in an intermediate 656 # state. This means that if someone sigkills the scheduler between 657 # setting finished_on and complete, we will have inconsistent jobs. 658 # This should be fine, because nothing critical checks finished_on, 659 # and the scheduler should never be killed mid-tick. 660 if complete: 661 self._on_complete(status) 662 self._email_on_job_complete() 663 664 self.update_field('complete', complete) 665 666 should_email_status = (status.lower() in _notify_email_statuses or 667 'all' in _notify_email_statuses) 668 if should_email_status: 669 self._email_on_status(status) 670 logging.debug('HQE Set Status Complete') 671 672 673 def _on_complete(self, status): 674 metric_fields = {'status': status.lower()} 675 if self.host: 676 metric_fields['board'] = self.host.board or '' 677 if len(self.host.pools) == 1: 678 metric_fields['pool'] = self.host.pools[0] 679 else: 680 metric_fields['pool'] = 'MULTIPLE' 681 else: 682 metric_fields['board'] = 'NO_HOST' 683 metric_fields['pool'] = 'NO_HOST' 684 self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields) 685 if status is not models.HostQueueEntry.Status.ABORTED: 686 self.job.stop_if_necessary() 687 if self.started_on: 688 self.set_finished_on_now() 689 self._log_trace() 690 if self.job.shard_id is not None: 691 # If shard_id is None, the job will be synced back to the master 692 self.job.update_field('shard_id', None) 693 if not self.execution_subdir: 694 return 695 # unregister any possible pidfiles associated with this queue entry 696 for pidfile_name in drone_manager.ALL_PIDFILE_NAMES: 697 pidfile_id = _drone_manager.get_pidfile_id_from( 698 self.execution_path(), pidfile_name=pidfile_name) 699 _drone_manager.unregister_pidfile(pidfile_id) 700 701 def _log_trace(self): 702 """Emits a Cloud Trace span for the HQE's duration.""" 703 if self.started_on and self.finished_on: 704 span = cloud_trace.Span('HQE', spanId='0', 705 traceId=hqe_trace_id(self.id)) 706 # TODO(phobbs) make a .SetStart() and .SetEnd() helper method 707 span.startTime = types.Timestamp() 708 span.startTime.FromDatetime(self.started_on) 709 span.endTime = types.Timestamp() 710 span.endTime.FromDatetime(self.finished_on) 711 # TODO(phobbs) any LogSpan calls need to be wrapped in this for 712 # safety during tests, so this should be caught within LogSpan. 713 try: 714 cloud_trace.LogSpan(span) 715 except IOError as e: 716 if e.errno == errno.ENOENT: 717 logging.warning('Error writing to cloud trace results ' 718 'directory: %s', e) 719 720 721 def _get_status_email_contents(self, status, summary=None, hostname=None): 722 """ 723 Gather info for the status notification e-mails. 724 725 If needed, we could start using the Django templating engine to create 726 the subject and the e-mail body, but that doesn't seem necessary right 727 now. 728 729 @param status: Job status text. Mandatory. 730 @param summary: Job summary text. Optional. 731 @param hostname: A hostname for the job. Optional. 732 733 @return: Tuple (subject, body) for the notification e-mail. 734 """ 735 job_stats = Job(id=self.job.id).get_execution_details() 736 737 subject = ('Autotest | Job ID: %s "%s" | Status: %s ' % 738 (self.job.id, self.job.name, status)) 739 740 if hostname is not None: 741 subject += '| Hostname: %s ' % hostname 742 743 if status not in ["1 Failed", "Failed"]: 744 subject += '| Success Rate: %.2f %%' % job_stats['success_rate'] 745 746 body = "Job ID: %s\n" % self.job.id 747 body += "Job name: %s\n" % self.job.name 748 if hostname is not None: 749 body += "Host: %s\n" % hostname 750 if summary is not None: 751 body += "Summary: %s\n" % summary 752 body += "Status: %s\n" % status 753 body += "Results interface URL: %s\n" % self._view_job_url() 754 body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time'] 755 if int(job_stats['total_executed']) > 0: 756 body += "User tests executed: %s\n" % job_stats['total_executed'] 757 body += "User tests passed: %s\n" % job_stats['total_passed'] 758 body += "User tests failed: %s\n" % job_stats['total_failed'] 759 body += ("User tests success rate: %.2f %%\n" % 760 job_stats['success_rate']) 761 762 if job_stats['failed_rows']: 763 body += "Failures:\n" 764 body += job_stats['failed_rows'] 765 766 return subject, body 767 768 769 def _email_on_status(self, status): 770 hostname = self._get_hostname() 771 subject, body = self._get_status_email_contents(status, None, hostname) 772 email_manager.manager.send_email(self.job.email_list, subject, body) 773 774 775 def _email_on_job_complete(self): 776 if not self.job.is_finished(): 777 return 778 779 summary = [] 780 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id) 781 for queue_entry in hosts_queue: 782 summary.append("Host: %s Status: %s" % 783 (queue_entry._get_hostname(), 784 queue_entry.status)) 785 786 summary = "\n".join(summary) 787 status_counts = models.Job.objects.get_status_counts( 788 [self.job.id])[self.job.id] 789 status = ', '.join('%d %s' % (count, status) for status, count 790 in status_counts.iteritems()) 791 792 subject, body = self._get_status_email_contents(status, summary, None) 793 email_manager.manager.send_email(self.job.email_list, subject, body) 794 795 796 def schedule_pre_job_tasks(self): 797 logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s", 798 self.job.name, self.meta_host, self.atomic_group_id, 799 self.job.id, self.id, self.host.hostname, self.status) 800 801 self._do_schedule_pre_job_tasks() 802 803 804 def _do_schedule_pre_job_tasks(self): 805 self.job.schedule_pre_job_tasks(queue_entry=self) 806 807 808 def requeue(self): 809 assert self.host 810 self.set_status(models.HostQueueEntry.Status.QUEUED) 811 self.update_field('started_on', None) 812 self.update_field('finished_on', None) 813 # verify/cleanup failure sets the execution subdir, so reset it here 814 self.set_execution_subdir('') 815 if self.meta_host: 816 self.set_host(None) 817 818 819 @property 820 def aborted_by(self): 821 self._load_abort_info() 822 return self._aborted_by 823 824 825 @property 826 def aborted_on(self): 827 self._load_abort_info() 828 return self._aborted_on 829 830 831 def _load_abort_info(self): 832 """ Fetch info about who aborted the job. """ 833 if hasattr(self, "_aborted_by"): 834 return 835 rows = _db.execute(""" 836 SELECT afe_users.login, 837 afe_aborted_host_queue_entries.aborted_on 838 FROM afe_aborted_host_queue_entries 839 INNER JOIN afe_users 840 ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id 841 WHERE afe_aborted_host_queue_entries.queue_entry_id = %s 842 """, (self.id,)) 843 if rows: 844 self._aborted_by, self._aborted_on = rows[0] 845 else: 846 self._aborted_by = self._aborted_on = None 847 848 849 def on_pending(self): 850 """ 851 Called when an entry in a synchronous job has passed verify. If the 852 job is ready to run, sets the entries to STARTING. Otherwise, it leaves 853 them in PENDING. 854 """ 855 self.set_status(models.HostQueueEntry.Status.PENDING) 856 if not self.host: 857 raise scheduler_lib.NoHostIdError( 858 'Failed to recover a job whose host_queue_entry_id=%r due' 859 ' to no host_id.' 860 % self.id) 861 self.host.set_status(models.Host.Status.PENDING) 862 863 # Some debug code here: sends an email if an asynchronous job does not 864 # immediately enter Starting. 865 # TODO: Remove this once we figure out why asynchronous jobs are getting 866 # stuck in Pending. 867 self.job.run_if_ready(queue_entry=self) 868 if (self.job.synch_count == 1 and 869 self.status == models.HostQueueEntry.Status.PENDING): 870 subject = 'Job %s (id %s)' % (self.job.name, self.job.id) 871 message = 'Asynchronous job stuck in Pending' 872 email_manager.manager.enqueue_notify_email(subject, message) 873 874 875 def abort(self, dispatcher): 876 assert self.aborted and not self.complete 877 878 Status = models.HostQueueEntry.Status 879 if self.status in {Status.GATHERING, Status.PARSING}: 880 # do nothing; post-job tasks will finish and then mark this entry 881 # with status "Aborted" and take care of the host 882 return 883 884 if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}: 885 # If hqe is in any of these status, it should not have any 886 # unfinished agent before it can be aborted. 887 agents = dispatcher.get_agents_for_entry(self) 888 # Agent with finished task can be left behind. This is added to 889 # handle the special case of aborting hostless job in STARTING 890 # status, in which the agent has only a HostlessQueueTask 891 # associated. The finished HostlessQueueTask will be cleaned up in 892 # the next tick, so it's safe to leave the agent there. Without 893 # filtering out finished agent, HQE abort won't be able to proceed. 894 assert all([agent.is_done() for agent in agents]) 895 # If hqe is still in STARTING status, it may not have assigned a 896 # host yet. 897 if self.host: 898 self.host.set_status(models.Host.Status.READY) 899 elif (self.status == Status.VERIFYING or 900 self.status == Status.RESETTING): 901 models.SpecialTask.objects.create( 902 task=models.SpecialTask.Task.CLEANUP, 903 host=models.Host.objects.get(id=self.host.id), 904 requested_by=self.job.owner_model()) 905 elif self.status == Status.PROVISIONING: 906 models.SpecialTask.objects.create( 907 task=models.SpecialTask.Task.REPAIR, 908 host=models.Host.objects.get(id=self.host.id), 909 requested_by=self.job.owner_model()) 910 911 self.set_status(Status.ABORTED) 912 913 914 def execution_tag(self): 915 SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE ' 916 'complete!=1 AND execution_subdir="" AND ' 917 'status!="Queued";') 918 SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET ' 919 'status="Aborted" WHERE id=%s;') 920 try: 921 assert self.execution_subdir 922 except AssertionError: 923 # TODO(scottz): Remove temporary fix/info gathering pathway for 924 # crosbug.com/31595 once issue is root caused. 925 logging.error('No execution_subdir for host queue id:%s.', self.id) 926 logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES) 927 for row in _db.execute(SQL_SUSPECT_ENTRIES): 928 logging.error(row) 929 logging.error('====DB DEBUG====\n') 930 fix_query = SQL_FIX_SUSPECT_ENTRY % self.id 931 logging.error('EXECUTING: %s', fix_query) 932 _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id) 933 raise AssertionError(('self.execution_subdir not found. ' 934 'See log for details.')) 935 936 return "%s/%s" % (self.job.tag(), self.execution_subdir) 937 938 939 def execution_path(self): 940 return self.execution_tag() 941 942 943 def set_started_on_now(self): 944 self.update_field('started_on', datetime.datetime.now()) 945 946 947 def set_finished_on_now(self): 948 self.update_field('finished_on', datetime.datetime.now()) 949 950 951 def is_hostless(self): 952 return (self.host_id is None 953 and self.meta_host is None) 954 955 956def hqe_trace_id(hqe_id): 957 """Constructs the canonical trace id based on the HQE's id. 958 959 Encodes 'HQE' in base16 and concatenates with the hex representation 960 of the HQE's id. 961 962 @param hqe_id: The HostQueueEntry's id. 963 964 Returns: 965 A trace id (in hex format) 966 """ 967 return base64.b16encode('HQE') + hex(hqe_id)[2:] 968 969 970class Job(DBObject): 971 _table_name = 'afe_jobs' 972 _fields = ('id', 'owner', 'name', 'priority', 'control_file', 973 'control_type', 'created_on', 'synch_count', 'timeout', 974 'run_verify', 'email_list', 'reboot_before', 'reboot_after', 975 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id', 976 'parameterized_job_id', 'max_runtime_mins', 'parent_job_id', 977 'test_retry', 'run_reset', 'timeout_mins', 'shard_id', 978 'require_ssp') 979 980 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on 981 # all status='Pending' atomic group HQEs incase a delay was running when the 982 # scheduler was restarted and no more hosts ever successfully exit Verify. 983 984 def __init__(self, id=None, row=None, **kwargs): 985 assert id or row 986 super(Job, self).__init__(id=id, row=row, **kwargs) 987 self._owner_model = None # caches model instance of owner 988 self.update_image_path = None # path of OS image to install 989 990 991 def model(self): 992 return models.Job.objects.get(id=self.id) 993 994 995 def owner_model(self): 996 # work around the fact that the Job owner field is a string, not a 997 # foreign key 998 if not self._owner_model: 999 self._owner_model = models.User.objects.get(login=self.owner) 1000 return self._owner_model 1001 1002 1003 def tag(self): 1004 return "%s-%s" % (self.id, self.owner) 1005 1006 1007 def get_execution_details(self): 1008 """ 1009 Get test execution details for this job. 1010 1011 @return: Dictionary with test execution details 1012 """ 1013 def _find_test_jobs(rows): 1014 """ 1015 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.* 1016 Those are autotest 'internal job' tests, so they should not be 1017 counted when evaluating the test stats. 1018 1019 @param rows: List of rows (matrix) with database results. 1020 """ 1021 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]') 1022 n_test_jobs = 0 1023 for r in rows: 1024 test_name = r[0] 1025 if job_test_pattern.match(test_name): 1026 n_test_jobs += 1 1027 1028 return n_test_jobs 1029 1030 stats = {} 1031 1032 rows = _db.execute(""" 1033 SELECT t.test, s.word, t.reason 1034 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s 1035 WHERE t.job_idx = j.job_idx 1036 AND s.status_idx = t.status 1037 AND j.afe_job_id = %s 1038 ORDER BY t.reason 1039 """ % self.id) 1040 1041 failed_rows = [r for r in rows if not r[1] == 'GOOD'] 1042 1043 n_test_jobs = _find_test_jobs(rows) 1044 n_test_jobs_failed = _find_test_jobs(failed_rows) 1045 1046 total_executed = len(rows) - n_test_jobs 1047 total_failed = len(failed_rows) - n_test_jobs_failed 1048 1049 if total_executed > 0: 1050 success_rate = 100 - ((total_failed / float(total_executed)) * 100) 1051 else: 1052 success_rate = 0 1053 1054 stats['total_executed'] = total_executed 1055 stats['total_failed'] = total_failed 1056 stats['total_passed'] = total_executed - total_failed 1057 stats['success_rate'] = success_rate 1058 1059 status_header = ("Test Name", "Status", "Reason") 1060 if failed_rows: 1061 stats['failed_rows'] = utils.matrix_to_string(failed_rows, 1062 status_header) 1063 else: 1064 stats['failed_rows'] = '' 1065 1066 time_row = _db.execute(""" 1067 SELECT started_time, finished_time 1068 FROM tko_jobs 1069 WHERE afe_job_id = %s 1070 """ % self.id) 1071 1072 if time_row: 1073 t_begin, t_end = time_row[0] 1074 try: 1075 delta = t_end - t_begin 1076 minutes, seconds = divmod(delta.seconds, 60) 1077 hours, minutes = divmod(minutes, 60) 1078 stats['execution_time'] = ("%02d:%02d:%02d" % 1079 (hours, minutes, seconds)) 1080 # One of t_end or t_begin are None 1081 except TypeError: 1082 stats['execution_time'] = '(could not determine)' 1083 else: 1084 stats['execution_time'] = '(none)' 1085 1086 return stats 1087 1088 1089 def keyval_dict(self): 1090 return self.model().keyval_dict() 1091 1092 1093 def _pending_count(self): 1094 """The number of HostQueueEntries for this job in the Pending state.""" 1095 pending_entries = models.HostQueueEntry.objects.filter( 1096 job=self.id, status=models.HostQueueEntry.Status.PENDING) 1097 return pending_entries.count() 1098 1099 1100 def is_ready(self): 1101 pending_count = self._pending_count() 1102 ready = (pending_count >= self.synch_count) 1103 1104 if not ready: 1105 logging.info( 1106 'Job %s not ready: %s pending, %s required ', 1107 self, pending_count, self.synch_count) 1108 1109 return ready 1110 1111 1112 def num_machines(self, clause = None): 1113 sql = "job_id=%s" % self.id 1114 if clause: 1115 sql += " AND (%s)" % clause 1116 return self.count(sql, table='afe_host_queue_entries') 1117 1118 1119 def num_queued(self): 1120 return self.num_machines('not complete') 1121 1122 1123 def num_active(self): 1124 return self.num_machines('active') 1125 1126 1127 def num_complete(self): 1128 return self.num_machines('complete') 1129 1130 1131 def is_finished(self): 1132 return self.num_complete() == self.num_machines() 1133 1134 1135 def _not_yet_run_entries(self, include_active=True): 1136 if include_active: 1137 statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES) 1138 else: 1139 statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES) 1140 return models.HostQueueEntry.objects.filter(job=self.id, 1141 status__in=statuses) 1142 1143 1144 def _stop_all_entries(self): 1145 """Stops the job's inactive pre-job HQEs.""" 1146 entries_to_stop = self._not_yet_run_entries( 1147 include_active=False) 1148 for child_entry in entries_to_stop: 1149 assert not child_entry.complete, ( 1150 '%s status=%s, active=%s, complete=%s' % 1151 (child_entry.id, child_entry.status, child_entry.active, 1152 child_entry.complete)) 1153 if child_entry.status == models.HostQueueEntry.Status.PENDING: 1154 child_entry.host.status = models.Host.Status.READY 1155 child_entry.host.save() 1156 child_entry.status = models.HostQueueEntry.Status.STOPPED 1157 child_entry.save() 1158 1159 1160 def stop_if_necessary(self): 1161 not_yet_run = self._not_yet_run_entries() 1162 if not_yet_run.count() < self.synch_count: 1163 self._stop_all_entries() 1164 1165 1166 def _next_group_name(self): 1167 """@returns a directory name to use for the next host group results.""" 1168 group_name = '' 1169 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name)) 1170 query = models.HostQueueEntry.objects.filter( 1171 job=self.id).values('execution_subdir').distinct() 1172 subdirs = (entry['execution_subdir'] for entry in query) 1173 group_matches = (group_count_re.match(subdir) for subdir in subdirs) 1174 ids = [int(match.group(1)) for match in group_matches if match] 1175 if ids: 1176 next_id = max(ids) + 1 1177 else: 1178 next_id = 0 1179 return '%sgroup%d' % (group_name, next_id) 1180 1181 1182 def get_group_entries(self, queue_entry_from_group): 1183 """ 1184 @param queue_entry_from_group: A HostQueueEntry instance to find other 1185 group entries on this job for. 1186 1187 @returns A list of HostQueueEntry objects all executing this job as 1188 part of the same group as the one supplied (having the same 1189 execution_subdir). 1190 """ 1191 execution_subdir = queue_entry_from_group.execution_subdir 1192 return list(HostQueueEntry.fetch( 1193 where='job_id=%s AND execution_subdir=%s', 1194 params=(self.id, execution_subdir))) 1195 1196 1197 def _should_run_cleanup(self, queue_entry): 1198 if self.reboot_before == model_attributes.RebootBefore.ALWAYS: 1199 return True 1200 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY: 1201 return queue_entry.host.dirty 1202 return False 1203 1204 1205 def _should_run_verify(self, queue_entry): 1206 do_not_verify = (queue_entry.host.protection == 1207 host_protections.Protection.DO_NOT_VERIFY) 1208 if do_not_verify: 1209 return False 1210 # If RebootBefore is set to NEVER, then we won't run reset because 1211 # we can't cleanup, so we need to weaken a Reset into a Verify. 1212 weaker_reset = (self.run_reset and 1213 self.reboot_before == model_attributes.RebootBefore.NEVER) 1214 return self.run_verify or weaker_reset 1215 1216 1217 def _should_run_reset(self, queue_entry): 1218 can_verify = (queue_entry.host.protection != 1219 host_protections.Protection.DO_NOT_VERIFY) 1220 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER 1221 return (can_reboot and can_verify and (self.run_reset or 1222 (self._should_run_cleanup(queue_entry) and 1223 self._should_run_verify(queue_entry)))) 1224 1225 1226 def _should_run_provision(self, queue_entry): 1227 """ 1228 Determine if the queue_entry needs to have a provision task run before 1229 it to provision queue_entry.host. 1230 1231 @param queue_entry: The host queue entry in question. 1232 @returns: True if we should schedule a provision task, False otherwise. 1233 1234 """ 1235 # If we get to this point, it means that the scheduler has already 1236 # vetted that all the unprovisionable labels match, so we can just 1237 # find all labels on the job that aren't on the host to get the list 1238 # of what we need to provision. (See the scheduling logic in 1239 # host_scheduler.py:is_host_eligable_for_job() where we discard all 1240 # actionable labels when assigning jobs to hosts.) 1241 job_labels = {x.name for x in queue_entry.get_labels()} 1242 # Skip provision if `skip_provision` is listed in the job labels. 1243 if provision.SKIP_PROVISION in job_labels: 1244 return False 1245 _, host_labels = queue_entry.host.platform_and_labels() 1246 # If there are any labels on the job that are not on the host and they 1247 # are labels that provisioning knows how to change, then that means 1248 # there is provisioning work to do. If there's no provisioning work to 1249 # do, then obviously we have no reason to schedule a provision task! 1250 diff = job_labels - set(host_labels) 1251 if any([provision.Provision.acts_on(x) for x in diff]): 1252 return True 1253 return False 1254 1255 1256 def _queue_special_task(self, queue_entry, task): 1257 """ 1258 Create a special task and associate it with a host queue entry. 1259 1260 @param queue_entry: The queue entry this special task should be 1261 associated with. 1262 @param task: One of the members of the enum models.SpecialTask.Task. 1263 @returns: None 1264 1265 """ 1266 models.SpecialTask.objects.create( 1267 host=models.Host.objects.get(id=queue_entry.host_id), 1268 queue_entry=queue_entry, task=task) 1269 1270 1271 def schedule_pre_job_tasks(self, queue_entry): 1272 """ 1273 Queue all of the special tasks that need to be run before a host 1274 queue entry may run. 1275 1276 If no special taskes need to be scheduled, then |on_pending| will be 1277 called directly. 1278 1279 @returns None 1280 1281 """ 1282 task_queued = False 1283 hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id) 1284 1285 if self._should_run_provision(queue_entry): 1286 self._queue_special_task(hqe_model, 1287 models.SpecialTask.Task.PROVISION) 1288 task_queued = True 1289 elif self._should_run_reset(queue_entry): 1290 self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET) 1291 task_queued = True 1292 else: 1293 if self._should_run_cleanup(queue_entry): 1294 self._queue_special_task(hqe_model, 1295 models.SpecialTask.Task.CLEANUP) 1296 task_queued = True 1297 if self._should_run_verify(queue_entry): 1298 self._queue_special_task(hqe_model, 1299 models.SpecialTask.Task.VERIFY) 1300 task_queued = True 1301 1302 if not task_queued: 1303 queue_entry.on_pending() 1304 1305 1306 def _assign_new_group(self, queue_entries): 1307 if len(queue_entries) == 1: 1308 group_subdir_name = queue_entries[0].host.hostname 1309 else: 1310 group_subdir_name = self._next_group_name() 1311 logging.info('Running synchronous job %d hosts %s as %s', 1312 self.id, [entry.host.hostname for entry in queue_entries], 1313 group_subdir_name) 1314 1315 for queue_entry in queue_entries: 1316 queue_entry.set_execution_subdir(group_subdir_name) 1317 1318 1319 def _choose_group_to_run(self, include_queue_entry): 1320 """ 1321 @returns A tuple containing a list of HostQueueEntry instances to be 1322 used to run this Job, a string group name to suggest giving 1323 to this job in the results database. 1324 """ 1325 chosen_entries = [include_queue_entry] 1326 num_entries_wanted = self.synch_count 1327 num_entries_wanted -= len(chosen_entries) 1328 1329 if num_entries_wanted > 0: 1330 where_clause = 'job_id = %s AND status = "Pending" AND id != %s' 1331 pending_entries = list(HostQueueEntry.fetch( 1332 where=where_clause, 1333 params=(self.id, include_queue_entry.id))) 1334 1335 # Sort the chosen hosts by hostname before slicing. 1336 def cmp_queue_entries_by_hostname(entry_a, entry_b): 1337 return Host.cmp_for_sort(entry_a.host, entry_b.host) 1338 pending_entries.sort(cmp=cmp_queue_entries_by_hostname) 1339 chosen_entries += pending_entries[:num_entries_wanted] 1340 1341 # Sanity check. We'll only ever be called if this can be met. 1342 if len(chosen_entries) < self.synch_count: 1343 message = ('job %s got less than %s chosen entries: %s' % ( 1344 self.id, self.synch_count, chosen_entries)) 1345 logging.error(message) 1346 email_manager.manager.enqueue_notify_email( 1347 'Job not started, too few chosen entries', message) 1348 return [] 1349 1350 self._assign_new_group(chosen_entries) 1351 return chosen_entries 1352 1353 1354 def run_if_ready(self, queue_entry): 1355 """ 1356 Run this job by kicking its HQEs into status='Starting' if enough 1357 hosts are ready for it to run. 1358 1359 Cleans up by kicking HQEs into status='Stopped' if this Job is not 1360 ready to run. 1361 """ 1362 if not self.is_ready(): 1363 self.stop_if_necessary() 1364 else: 1365 self.run(queue_entry) 1366 1367 1368 def request_abort(self): 1369 """Request that this Job be aborted on the next scheduler cycle.""" 1370 self.model().abort() 1371 1372 1373 def run(self, queue_entry): 1374 """ 1375 @param queue_entry: The HostQueueEntry instance calling this method. 1376 """ 1377 queue_entries = self._choose_group_to_run(queue_entry) 1378 if queue_entries: 1379 self._finish_run(queue_entries) 1380 1381 1382 def _finish_run(self, queue_entries): 1383 for queue_entry in queue_entries: 1384 queue_entry.set_status(models.HostQueueEntry.Status.STARTING) 1385 1386 1387 def __str__(self): 1388 return '%s-%s' % (self.id, self.owner) 1389