1#pylint: disable-msg=C0111 2 3# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7"""Scheduler library classes. 8""" 9 10import collections 11import logging 12 13import common 14 15from autotest_lib.frontend import setup_django_environment 16 17from autotest_lib.client.common_lib import utils 18from autotest_lib.frontend.afe import models 19from autotest_lib.server.cros.dynamic_suite import constants 20from autotest_lib.scheduler import scheduler_models 21from autotest_lib.scheduler import scheduler_lib 22 23try: 24 from chromite.lib import metrics 25except ImportError: 26 metrics = utils.metrics_mock 27 28 29_job_timer_name = 'chromeos/autotest/scheduler/job_query_durations/%s' 30class AFEJobQueryManager(object): 31 """Query manager for AFE Jobs.""" 32 33 # A subquery to only get inactive hostless jobs. 34 hostless_query = 'host_id IS NULL AND meta_host IS NULL' 35 36 37 @metrics.SecondsTimerDecorator( 38 _job_timer_name % 'get_pending_queue_entries') 39 def get_pending_queue_entries(self, only_hostless=False): 40 """ 41 Fetch a list of new host queue entries. 42 43 The ordering of this list is important, as every new agent 44 we schedule can potentially contribute to the process count 45 on the drone, which has a static limit. The sort order 46 prioritizes jobs as follows: 47 1. High priority jobs: Based on the afe_job's priority 48 2. With hosts and metahosts: This will only happen if we don't 49 activate the hqe after assigning a host to it in 50 schedule_new_jobs. 51 3. With hosts but without metahosts: When tests are scheduled 52 through the frontend the owner of the job would have chosen 53 a host for it. 54 4. Without hosts but with metahosts: This is the common case of 55 a new test that needs a DUT. We assign a host and set it to 56 active so it shouldn't show up in case 2 on the next tick. 57 5. Without hosts and without metahosts: Hostless suite jobs, that 58 will result in new jobs that fall under category 4. 59 60 A note about the ordering of cases 3 and 4: 61 Prioritizing one case above the other leads to earlier acquisition 62 of the following resources: 1. process slots on the drone 2. machines. 63 - When a user schedules a job through the afe they choose a specific 64 host for it. Jobs with metahost can utilize any host that satisfies 65 the metahost criterion. This means that if we had scheduled 4 before 66 3 there is a good chance that a job which could've used another host, 67 will now use the host assigned to a metahost-less job. Given the 68 availability of machines in pool:suites, this almost guarantees 69 starvation for jobs scheduled through the frontend. 70 - Scheduling 4 before 3 also has its pros however, since a suite 71 has the concept of a time out, whereas users can wait. If we hit the 72 process count on the drone a suite can timeout waiting on the test, 73 but a user job generally has a much longer timeout, and relatively 74 harmless consequences. 75 The current ordering was chosed because it is more likely that we will 76 run out of machines in pool:suites than processes on the drone. 77 78 @returns A list of HQEs ordered according to sort_order. 79 """ 80 sort_order = ('afe_jobs.priority DESC, ' 81 'ISNULL(host_id), ' 82 'ISNULL(meta_host), ' 83 'parent_job_id, ' 84 'job_id') 85 # Don't execute jobs that should be executed by a shard in the global 86 # scheduler. 87 # This won't prevent the shard scheduler to run this, as the shard db 88 # doesn't have an an entry in afe_shards_labels. 89 query=('NOT complete AND NOT active AND status="Queued"' 90 'AND NOT aborted AND afe_shards_labels.id IS NULL') 91 92 # TODO(jakobjuelich, beeps): Optimize this query. Details: 93 # Compressed output of EXPLAIN <query>: 94 # +------------------------+--------+-------------------------+-------+ 95 # | table | type | key | rows | 96 # +------------------------+--------+-------------------------+-------+ 97 # | afe_host_queue_entries | ref | host_queue_entry_status | 30536 | 98 # | afe_shards_labels | ref | shard_label_id_fk | 1 | 99 # | afe_jobs | eq_ref | PRIMARY | 1 | 100 # +------------------------+--------+-------------------------+-------+ 101 # This shows the first part of the query fetches a lot of objects, that 102 # are then filtered. The joins are comparably fast: There's usually just 103 # one or none shard mapping that can be answered fully using an index 104 # (shard_label_id_fk), similar thing applies to the job. 105 # 106 # This works for now, but once O(#Jobs in shard) << O(#Jobs in Queued), 107 # it might be more efficient to filter on the meta_host first, instead 108 # of the status. 109 if only_hostless: 110 query = '%s AND (%s)' % (query, self.hostless_query) 111 return list(scheduler_models.HostQueueEntry.fetch( 112 joins=('INNER JOIN afe_jobs ON (job_id=afe_jobs.id) ' 113 'LEFT JOIN afe_shards_labels ON (' 114 'meta_host=afe_shards_labels.label_id)'), 115 where=query, order_by=sort_order)) 116 117 118 @metrics.SecondsTimerDecorator( 119 _job_timer_name % 'get_prioritized_special_tasks') 120 def get_prioritized_special_tasks(self, only_tasks_with_leased_hosts=False): 121 """ 122 Returns all queued SpecialTasks prioritized for repair first, then 123 cleanup, then verify. 124 125 @param only_tasks_with_leased_hosts: If true, this method only returns 126 tasks with leased hosts. 127 128 @return: list of afe.models.SpecialTasks sorted according to priority. 129 """ 130 queued_tasks = models.SpecialTask.objects.filter(is_active=False, 131 is_complete=False, 132 host__locked=False) 133 # exclude hosts with active queue entries unless the SpecialTask is for 134 # that queue entry 135 queued_tasks = models.SpecialTask.objects.add_join( 136 queued_tasks, 'afe_host_queue_entries', 'host_id', 137 join_condition='afe_host_queue_entries.active', 138 join_from_key='host_id', force_left_join=True) 139 queued_tasks = queued_tasks.extra( 140 where=['(afe_host_queue_entries.id IS NULL OR ' 141 'afe_host_queue_entries.id = ' 142 'afe_special_tasks.queue_entry_id)']) 143 if only_tasks_with_leased_hosts: 144 queued_tasks = queued_tasks.filter(host__leased=True) 145 146 # reorder tasks by priority 147 task_priority_order = [models.SpecialTask.Task.REPAIR, 148 models.SpecialTask.Task.CLEANUP, 149 models.SpecialTask.Task.VERIFY, 150 models.SpecialTask.Task.RESET, 151 models.SpecialTask.Task.PROVISION] 152 def task_priority_key(task): 153 return task_priority_order.index(task.task) 154 return sorted(queued_tasks, key=task_priority_key) 155 156 157 @classmethod 158 def get_overlapping_jobs(cls): 159 """A helper method to get all active jobs using the same host. 160 161 @return: A list of dictionaries with the hqe id, job_id and host_id 162 of the currently overlapping jobs. 163 """ 164 # Filter all active hqes and stand alone special tasks to make sure 165 # a host isn't being used by two jobs at the same time. An incomplete 166 # stand alone special task can share a host with an active hqe, an 167 # example of this is the cleanup scheduled in gathering. 168 hqe_hosts = list(models.HostQueueEntry.objects.filter( 169 active=1, complete=0, host_id__isnull=False).values_list( 170 'host_id', flat=True)) 171 special_task_hosts = list(models.SpecialTask.objects.filter( 172 is_active=1, is_complete=0, host_id__isnull=False, 173 queue_entry_id__isnull=True).values_list('host_id', flat=True)) 174 host_counts = collections.Counter( 175 hqe_hosts + special_task_hosts).most_common() 176 multiple_hosts = [count[0] for count in host_counts if count[1] > 1] 177 return list(models.HostQueueEntry.objects.filter( 178 host_id__in=multiple_hosts, active=True).values( 179 'id', 'job_id', 'host_id')) 180 181 182 @metrics.SecondsTimerDecorator( 183 _job_timer_name % 'get_suite_host_assignment') 184 def get_suite_host_assignment(self): 185 """A helper method to get how many hosts each suite is holding. 186 187 @return: Two dictionaries (suite_host_num, hosts_to_suites) 188 suite_host_num maps suite job id to number of hosts 189 holding by its child jobs. 190 hosts_to_suites contains current hosts held by 191 any suites, and maps the host id to its parent_job_id. 192 """ 193 query = models.HostQueueEntry.objects.filter( 194 host_id__isnull=False, complete=0, active=1, 195 job__parent_job_id__isnull=False) 196 suite_host_num = {} 197 hosts_to_suites = {} 198 for hqe in query: 199 host_id = hqe.host_id 200 parent_job_id = hqe.job.parent_job_id 201 count = suite_host_num.get(parent_job_id, 0) 202 suite_host_num[parent_job_id] = count + 1 203 hosts_to_suites[host_id] = parent_job_id 204 return suite_host_num, hosts_to_suites 205 206 207 @metrics.SecondsTimerDecorator( _job_timer_name % 'get_min_duts_of_suites') 208 def get_min_duts_of_suites(self, suite_job_ids): 209 """Load suite_min_duts job keyval for a set of suites. 210 211 @param suite_job_ids: A set of suite job ids. 212 213 @return: A dictionary where the key is a suite job id, 214 the value is the value of 'suite_min_duts'. 215 """ 216 query = models.JobKeyval.objects.filter( 217 job_id__in=suite_job_ids, 218 key=constants.SUITE_MIN_DUTS_KEY, value__isnull=False) 219 return dict((keyval.job_id, int(keyval.value)) for keyval in query) 220 221 222_host_timer_name = 'chromeos/autotest/scheduler/host_query_durations/%s' 223class AFEHostQueryManager(object): 224 """Query manager for AFE Hosts.""" 225 226 def __init__(self): 227 """Create an AFEHostQueryManager. 228 229 @param db: A connection to the database with the afe_hosts table. 230 """ 231 self._db = scheduler_lib.ConnectionManager().get_connection() 232 233 234 def _process_many2many_dict(self, rows, flip=False): 235 result = {} 236 for row in rows: 237 left_id, right_id = int(row[0]), int(row[1]) 238 if flip: 239 left_id, right_id = right_id, left_id 240 result.setdefault(left_id, set()).add(right_id) 241 return result 242 243 244 def _get_sql_id_list(self, id_list): 245 return ','.join(str(item_id) for item_id in id_list) 246 247 248 def _get_many2many_dict(self, query, id_list, flip=False): 249 if not id_list: 250 return {} 251 query %= self._get_sql_id_list(id_list) 252 rows = self._db.execute(query) 253 return self._process_many2many_dict(rows, flip) 254 255 256 def _get_ready_hosts(self): 257 # We don't lose anything by re-doing these checks 258 # even though we release hosts on the same conditions. 259 # In the future we might have multiple clients that 260 # release_hosts and/or lock them independent of the 261 # scheduler tick. 262 hosts = scheduler_models.Host.fetch( 263 where="NOT afe_hosts.leased " 264 "AND NOT afe_hosts.locked " 265 "AND (afe_hosts.status IS NULL " 266 "OR afe_hosts.status = 'Ready')") 267 return dict((host.id, host) for host in hosts) 268 269 270 @metrics.SecondsTimerDecorator(_host_timer_name % 'get_job_acl_groups') 271 def _get_job_acl_groups(self, job_ids): 272 query = """ 273 SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id 274 FROM afe_jobs 275 INNER JOIN afe_users ON afe_users.login = afe_jobs.owner 276 INNER JOIN afe_acl_groups_users ON 277 afe_acl_groups_users.user_id = afe_users.id 278 WHERE afe_jobs.id IN (%s) 279 """ 280 return self._get_many2many_dict(query, job_ids) 281 282 283 def _get_job_ineligible_hosts(self, job_ids): 284 query = """ 285 SELECT job_id, host_id 286 FROM afe_ineligible_host_queues 287 WHERE job_id IN (%s) 288 """ 289 return self._get_many2many_dict(query, job_ids) 290 291 292 @metrics.SecondsTimerDecorator(_host_timer_name % 'get_job_dependencies') 293 def _get_job_dependencies(self, job_ids): 294 query = """ 295 SELECT job_id, label_id 296 FROM afe_jobs_dependency_labels 297 WHERE job_id IN (%s) 298 """ 299 return self._get_many2many_dict(query, job_ids) 300 301 302 @classmethod 303 def find_unused_healty_hosts(cls): 304 """Get hosts that are currently unused and in the READY state. 305 306 @return: A list of host objects, one for each unused healthy host. 307 """ 308 # Avoid any host with a currently active queue entry against it. 309 hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe ' 310 'ON (afe_hosts.id = active_hqe.host_id AND ' 311 'active_hqe.active)') 312 313 # Avoid any host with a new special task against it. There are 2 cases 314 # when an inactive but incomplete special task will not use the host 315 # this tick: 1. When the host is locked 2. When an active hqe already 316 # has special tasks for the same host. In both these cases this host 317 # will not be in the ready hosts list anyway. In all other cases, 318 # an incomplete special task will grab the host before a new job does 319 # by assigning an agent to it. 320 special_task_join = ('LEFT JOIN afe_special_tasks as new_tasks ' 321 'ON (afe_hosts.id = new_tasks.host_id AND ' 322 'new_tasks.is_complete=0)') 323 324 return scheduler_models.Host.fetch( 325 joins='%s %s' % (hqe_join, special_task_join), 326 where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL " 327 "AND afe_hosts.leased " 328 "AND NOT afe_hosts.locked " 329 "AND (afe_hosts.status IS NULL " 330 "OR afe_hosts.status = 'Ready')") 331 332 @metrics.SecondsTimerDecorator(_host_timer_name % 'set_leased') 333 def set_leased(self, leased_value, **kwargs): 334 """Modify the leased bit on the hosts with ids in host_ids. 335 336 @param leased_value: The True/False value of the leased column for 337 the hosts with ids in host_ids. 338 @param kwargs: The args to use in finding matching hosts. 339 """ 340 logging.info('Setting leased = %s for the hosts that match %s', 341 leased_value, kwargs) 342 models.Host.objects.filter(**kwargs).update(leased=leased_value) 343 344 345 @metrics.SecondsTimerDecorator(_host_timer_name % 'get_labels') 346 def _get_labels(self, job_dependencies): 347 """ 348 Calculate a dict mapping label id to label object so that we don't 349 frequently round trip to the database every time we need a label. 350 351 @param job_dependencies: A dict mapping an integer job id to a list of 352 integer label id's. ie. {job_id: [label_id]} 353 @return: A dict mapping an integer label id to a scheduler model label 354 object. ie. {label_id: label_object} 355 356 """ 357 id_to_label = dict() 358 # Pull all the labels on hosts we might look at 359 host_labels = scheduler_models.Label.fetch( 360 where="id IN (SELECT label_id FROM afe_hosts_labels)") 361 id_to_label.update([(label.id, label) for label in host_labels]) 362 # and pull all the labels on jobs we might look at. 363 job_label_set = set() 364 for job_deps in job_dependencies.values(): 365 job_label_set.update(job_deps) 366 # On the rare/impossible chance that no jobs have any labels, we 367 # can skip this. 368 if job_label_set: 369 job_string_label_list = ','.join([str(x) for x in job_label_set]) 370 job_labels = scheduler_models.Label.fetch( 371 where="id IN (%s)" % job_string_label_list) 372 id_to_label.update([(label.id, label) for label in job_labels]) 373 return id_to_label 374 375 376 def refresh(self, pending_queue_entries): 377 """Update the query manager. 378 379 Cache information about a list of queue entries and eligible hosts 380 from the database so clients can avoid expensive round trips during 381 host acquisition. 382 383 @param pending_queue_entries: A list of queue entries about which we 384 need information. 385 """ 386 self._hosts_available = self._get_ready_hosts() 387 relevant_jobs = [queue_entry.job_id 388 for queue_entry in pending_queue_entries] 389 self._job_acls = self._get_job_acl_groups(relevant_jobs) 390 self._ineligible_hosts = (self._get_job_ineligible_hosts(relevant_jobs)) 391 self._job_dependencies = (self._get_job_dependencies(relevant_jobs)) 392 host_ids = self._hosts_available.keys() 393 self._labels = self._get_labels(self._job_dependencies) 394