# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Rdb server module.

Contains the query managers, request handlers and request dispatchers used
to look up, update, validate and lease hosts on behalf of rdb clients.
"""

import logging

import common

from django.core import exceptions as django_exceptions
from django.db.models import fields
from django.db.models import Q
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import rdb_cache_manager
from autotest_lib.scheduler import rdb_hosts
from autotest_lib.scheduler import rdb_requests
from autotest_lib.scheduler import rdb_utils
from autotest_lib.server import utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


# Template for the names of the timer metrics emitted by this module; the
# placeholder is filled with the name of the timed method.
_rdb_timer_name = 'chromeos/autotest/scheduler/rdb/durations/%s'
# True when this process is not running on a shard.
_is_master = not utils.is_shard()

# Query managers: Provide a layer of abstraction over the database by
# encapsulating common query patterns used by the rdb.
class BaseHostQueryManager(object):
    """Base manager for host queries on all hosts.
    """

    # Model manager all queries of this class go through. Subclasses
    # override this to restrict the set of hosts they can see.
    host_objects = models.Host.objects


    def update_hosts(self, host_ids, **kwargs):
        """Update fields on a set of hosts.

        @param host_ids: A list of ids of hosts to update.
        @param kwargs: A key value dictionary corresponding to column, value
            in the host database.
        """
        self.host_objects.filter(id__in=host_ids).update(**kwargs)


    @rdb_hosts.return_rdb_host
    def get_hosts(self, ids):
        """Get host objects for the given ids.

        @param ids: The ids for which we need host objects.

        @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
        """
        return self.host_objects.filter(id__in=ids).order_by('id')


    @rdb_hosts.return_rdb_host
    def find_hosts(self, deps, acls):
        """Finds valid hosts matching deps, acls.

        @param deps: A list/frozenset of dependencies (label id) to match.
        @param acls: A list/frozenset of acls, at least one of which must
            coincide with an acl group the chosen host is in.

        @return: A set of matching hosts available.
        """
        hosts_available = self.host_objects.filter(invalid=0)
        hosts_available = hosts_available.filter(Q(aclgroup__id__in=acls))
        hosts_available = models.Host.get_hosts_with_label_ids(
                list(deps), hosts_available)
        # A set collapses any duplicate rows in the query result.
        return set(hosts_available)


class AvailableHostQueryManager(BaseHostQueryManager):
    """Query manager for requests on un-leased, un-locked hosts.
    """

    # NOTE(review): despite its name, this manager is presumed to return
    # only unleased, unlocked hosts -- confirm against models.Host.
    host_objects = models.Host.leased_objects


# Request Handlers: Used in conjunction with requests in rdb_utils, these
# handlers acquire hosts for a request and record the acquisition in
# a response_map dictionary keyed on the request itself, with the host/hosts
# as values.
class BaseHostRequestHandler(object):
    """Handler for requests related to hosts, leased or unleased.

    This class is only capable of blindly returning host information.
    """

    def __init__(self):
        self.host_query_manager = BaseHostQueryManager()
        # Maps a request to the list of responses (hosts or exceptions)
        # recorded for it.
        self.response_map = {}


    def update_response_map(self, request, response, append=False):
        """Record a response for a request.

        The response_map only contains requests that were either satisfied, or
        that ran into an exception. Often this translates to reserving hosts
        against a request. If the rdb hit an exception processing a request, the
        exception gets recorded in the map for the client to reraise.

        @param response: A response for the request.
        @param request: The request that has reserved these hosts.
        @param append: Boolean, whether to append new hosts in
                       |response| for existing request.
                       Will not append if existing response is
                       a list of exceptions.

        @raises RDBException: If an empty value is added to the map, or if
            the request already has a response and append is False.
        """
        if not response:
            raise rdb_utils.RDBException('response_map dict can only contain '
                    'valid responses. Request %s, response %s is invalid.' %
                     (request, response))
        exist_response = self.response_map.setdefault(request, [])
        if exist_response and not append:
            # Report the response the request already holds, not the new one
            # being rejected.
            raise rdb_utils.RDBException('Request %s already has response %s '
                                         'the rdb cannot return multiple '
                                         'responses for the same request.' %
                                         (request, exist_response))
        if exist_response and append and not isinstance(
                exist_response[0], rdb_hosts.RDBHost):
            # Do not append if existing response contains exception.
            return
        exist_response.extend(response)


    def _check_response_map(self):
        """Verify that we never give the same host to different requests.

        @raises RDBException: If the same host is assigned to multiple requests.
        """
        unique_hosts = set()
        for request, response in self.response_map.iteritems():
            # Each value in the response map can only either be a list of
            # RDBHosts or a list of RDBExceptions, not a mix of both.
            if isinstance(response[0], rdb_hosts.RDBHost):
                if any(host in unique_hosts for host in response):
                    raise rdb_utils.RDBException(
                            'Assigning the same host to multiple requests. New '
                            'hosts %s, request %s, response_map: %s' %
                            (response, request, self.response_map))
                unique_hosts.update(response)


    def _record_exceptions(self, request, exceptions):
        """Record a list of exceptions for a request.

        @param request: The request for which the exceptions were hit.
        @param exceptions: The exceptions hit while processing the request.
        """
        rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
        self.update_response_map(request, rdb_exceptions)


    def get_response(self):
        """Convert all RDBServerHostWrapper objects to host info dictionaries.

        @return: A dictionary mapping requests to a list of matching host_infos.

        @raises RDBException: If the same host is assigned to multiple requests.
        """
        self._check_response_map()
        # Only values of existing keys are replaced, so the dictionary does
        # not change size while it is being iterated.
        for request, response in self.response_map.iteritems():
            self.response_map[request] = [reply.wire_format()
                                          for reply in response]
        return self.response_map


    def update_hosts(self, update_requests):
        """Updates host tables with a payload.

        @param update_requests: A list of update requests, as defined in
            rdb_requests.UpdateHostRequest.
        """
        # Last payload for a host_id wins in the case of conflicting requests.
        # NOTE(review): merging mutates the payload object of the first
        # request seen for a host -- confirm callers do not reuse it.
        unique_host_requests = {}
        for request in update_requests:
            if unique_host_requests.get(request.host_id):
                unique_host_requests[request.host_id].update(request.payload)
            else:
                unique_host_requests[request.host_id] = request.payload

        # Batch similar payloads so we can do them in one table scan.
        # Payloads are used as dictionary keys here, so they must be hashable.
        similar_requests = {}
        for host_id, payload in unique_host_requests.iteritems():
            similar_requests.setdefault(payload, []).append(host_id)

        # If fields of the update don't match columns in the database,
        # record the exception in the response map. This also means later
        # updates will get applied even if previous updates fail.
        for payload, hosts in similar_requests.iteritems():
            try:
                self.host_query_manager.update_hosts(hosts, **payload)
            except (django_exceptions.FieldError,
                    fields.FieldDoesNotExist, ValueError) as e:
                for host in hosts:
                    # Since update requests have a consistent hash this will map
                    # to the same key as the original request.
                    request = rdb_requests.UpdateHostRequest(
                            host_id=host, payload=payload).get_request()
                    self._record_exceptions(request, [e])


    def batch_get_hosts(self, host_requests):
        """Get hosts matching the requests.

        This method does not acquire the hosts, i.e it reserves hosts against
        requests leaving their leased state untouched.

        @param host_requests: A list of requests, as defined in
            rdb_utils.BaseHostRequest.
        """
        host_ids = set(request.host_id for request in host_requests)
        host_map = {}

        # This list will not contain available hosts if executed using
        # an AvailableHostQueryManager.
        for host in self.host_query_manager.get_hosts(host_ids):
            host_map[host.id] = host
        for request in host_requests:
            if request.host_id in host_map:
                self.update_response_map(request, [host_map[request.host_id]])
            else:
                logging.warning('rdb could not get host for request: %s, it '
                                'is already leased or locked', request)


class AvailableHostRequestHandler(BaseHostRequestHandler):
    """Handler for requests related to available (unleased and unlocked) hosts.

    This class is capable of acquiring or validating hosts for requests.
    """


    def __init__(self):
        self.host_query_manager = AvailableHostQueryManager()
        self.cache = rdb_cache_manager.RDBHostCacheManager()
        self.response_map = {}
        # Running totals, reported in the batch_acquire_hosts debug log.
        self.unsatisfied_requests = 0
        self.leased_hosts_count = 0
        # Created by batch_acquire_hosts for each batch of requests.
        self.request_accountant = None


    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'lease_hosts')
    def lease_hosts(self, hosts):
        """Leases a list of hosts.

        @param hosts: A list of RDBServerHostWrapper instances to lease.

        @return: The list of RDBServerHostWrappers that were successfully
            leased.
        """
        #TODO(beeps): crbug.com/353183.
        unleased_hosts = set(hosts)
        leased_hosts = set()
        for host in unleased_hosts:
            try:
                host.lease()
            except rdb_utils.RDBException as e:
                # A host that fails to lease is logged and left out of the
                # result rather than failing the whole batch.
                logging.error('Unable to lease host %s: %s', host.hostname, e)
            else:
                leased_hosts.add(host)
        return list(leased_hosts)


    @classmethod
    def valid_host_assignment(cls, request, host):
        """Check if a host, request pairing is valid.

        @param request: The request to match against the host.
        @param host: An RDBServerHostWrapper instance.

        @return: A truthy value if the host, request assignment is valid,
            a falsy value otherwise.

        @raises RDBException: If the request already has another host_ids
            associated with it.
        """
        if request.host_id and request.host_id != host.id:
            raise rdb_utils.RDBException(
                    'Cannot assign a different host for request: %s, it '
                    'already has one: %s ' % (request, host.id))

        # Getting all labels and acls might result in large queries, so
        # bail early if the host is already leased.
        if host.leased:
            return False
        # If a host is invalid it must be a one time host added to the
        # afe specifically for this purpose, so it doesn't require acl checking.
        acl_match = (request.acls.intersection(host.acls) or host.invalid)
        label_match = (request.deps.intersection(host.labels) == request.deps)
        return acl_match and label_match


    @classmethod
    def _sort_hosts_by_preferred_deps(cls, hosts, preferred_deps):
        """Sort hosts in the order of how many preferred deps it has.

        This allows rdb always choose the hosts with the most preferred deps
        for a request. One important use case is including cros-version as
        a preferred dependence. By choosing a host with the same cros-version,
        we can save the time on provisioning it. Note this is not guaranteed
        if preferred_deps contains other labels as well.

        @param hosts: A list of hosts to sort.
        @param preferred_deps: A list of deps that are preferred.

        @return: A list of sorted hosts.

        """
        # Build the set once instead of once per host.
        preferred = set(preferred_deps)
        return sorted(
                hosts,
                key=lambda host: len(preferred & set(host.labels)),
                reverse=True)


    @rdb_cache_manager.memoize_hosts
    def _acquire_hosts(self, request, hosts_required, is_acquire_min_duts=False,
                       **kwargs):
        """Acquire hosts for a group of similar requests.

        Find and acquire hosts that can satisfy a group of requests.
        1. If the caching decorator doesn't pass in a list of matching hosts
           via the MEMOIZE_KEY this method will directly check the database for
           matching hosts.
        2. If all matching hosts are not leased for this request, the remaining
           hosts are returned to the caching decorator, to place in the cache.

        @param hosts_required: Number of hosts required to satisfy request.
        @param request: The request for hosts.
        @param is_acquire_min_duts: Boolean. Indicate whether this is to
                                    acquire minimum required duts, only used
                                    for stats purpose.

        @return: The list of excess matching hosts.
        """
        hosts = kwargs.get(rdb_cache_manager.MEMOIZE_KEY, [])
        if not hosts:
            hosts = self.host_query_manager.find_hosts(
                    request.deps, request.acls)

        # <-----[:attempt_lease_hosts](evicted)--------> <-(returned, cached)->
        # |   -leased_hosts-  |   -stale cached hosts-  | -unleased matching- |
        # --used this request---used by earlier request----------unused--------
        hosts = self._sort_hosts_by_preferred_deps(
                hosts, request.preferred_deps)
        attempt_lease_hosts = min(len(hosts), hosts_required)
        leased_host_count = 0
        if attempt_lease_hosts:
            leased_hosts = self.lease_hosts(hosts[:attempt_lease_hosts])
            if leased_hosts:
                self.update_response_map(request, leased_hosts, append=True)

            # [:attempt_leased_hosts] - leased_hosts will include hosts that
            # failed leasing, most likely because they're already leased, so
            # don't cache them again.
            leased_host_count = len(leased_hosts)
            failed_leasing = attempt_lease_hosts - leased_host_count
            if failed_leasing > 0:
                # For the sake of simplicity this calculation assumes that
                # leasing only fails if there's a stale cached host already
                # leased by a previous request, ergo, we can only get here
                # through a cache hit.
                line_length = len(hosts)
                self.cache.stale_entries.append(
                        (float(failed_leasing)/line_length) * 100)
            self.leased_hosts_count += leased_host_count
        if is_acquire_min_duts:
            self.request_accountant.record_acquire_min_duts(
                    request, hosts_required, leased_host_count)
        self.unsatisfied_requests += max(hosts_required - leased_host_count, 0)
        # Cache the unleased matching hosts against the request.
        return hosts[attempt_lease_hosts:]


    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_acquire_hosts')
    def batch_acquire_hosts(self, host_requests):
        """Acquire hosts for a list of requests.

        The act of acquisition involves finding and leasing a set of hosts
        that match the parameters of a request. Each acquired host is added
        to the response_map dictionary as an RDBServerHostWrapper.

        @param host_requests: A list of requests to acquire hosts.
        """
        distinct_requests = 0

        logging.debug('Processing %s host acquisition requests',
                      len(host_requests))
        metrics.Gauge('chromeos/autotest/scheduler/pending_host_acq_requests'
                      ).set(len(host_requests))

        self.request_accountant = rdb_utils.RequestAccountant(host_requests)
        # First pass tries to satisfy min_duts for each suite.
        for request in self.request_accountant.requests:
            to_acquire = self.request_accountant.get_min_duts(request)
            if to_acquire > 0:
                self._acquire_hosts(request, to_acquire,
                                    is_acquire_min_duts=True)
            distinct_requests += 1

        # Second pass tries to allocate duts to the remaining unsatisfied
        # requests.
        for request in self.request_accountant.requests:
            to_acquire = self.request_accountant.get_duts(request)
            if to_acquire > 0:
                self._acquire_hosts(request, to_acquire,
                                    is_acquire_min_duts=False)

        self.cache.record_stats()
        logging.debug('Host acquisition stats: distinct requests: %s, leased '
                      'hosts: %s, unsatisfied requests: %s', distinct_requests,
                      self.leased_hosts_count, self.unsatisfied_requests)


    @metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_validate_hosts')
    def batch_validate_hosts(self, requests):
        """Validate requests with hosts.

        Reserve all hosts, check each one for validity and discard invalid
        request-host pairings. Lease the remaining hosts.

        @param requests: A list of requests to validate.

        @raises RDBException: If multiple hosts or the wrong host is returned
            for a response.
        """
        # The following cases are possible for frontend requests:
        # 1. Multiple requests for 1 host, with different acls/deps/priority:
        #    These form distinct requests because they hash differently.
        #    The response map will contain entries like: {r1: h1, r2: h1}
        #    after the batch_get_hosts call. There are 2 sub-cases:
        #        a. Same deps/acls, different priority:
        #           Since we sort the requests based on priority, the
        #           higher priority request r1, will lease h1. The
        #           validation of r2, h1 will fail because of the r1 lease.
        #        b. Different deps/acls, only one of which matches the host:
        #           The matching request will lease h1. The other host
        #           pairing will get dropped from the response map.
        # 2. Multiple requests with the same acls/deps/priority and 1 host:
        #    These all have the same request hash, so the response map will
        #    contain: {r: h}, regardless of the number of r's. If this is not
        #    a valid host assignment it will get dropped from the response.
        self.batch_get_hosts(set(requests))
        # sorted() returns a new list, so deleting entries from the response
        # map inside the loop is safe.
        for request in sorted(self.response_map.keys(),
                key=lambda request: request.priority, reverse=True):
            hosts = self.response_map[request]
            if len(hosts) > 1:
                raise rdb_utils.RDBException('Got multiple hosts for a single '
                        'request. Hosts: %s, request %s.' % (hosts, request))
            # Job-shard is 1:1 mapping. Because a job can only belongs
            # to one shard, or belongs to master, we disallow frontend job
            # that spans hosts on and off shards or across multiple shards,
            # which would otherwise break the 1:1 mapping.
            # As such, on master, if a request asks for multiple hosts and
            # if any host is found on shard, we assume other requested hosts
            # would also be on the same shard.  We can safely drop this request.
            ignore_request = _is_master and any(
                    host.shard_id for host in hosts)
            if (not ignore_request and
                    (self.valid_host_assignment(request, hosts[0]) and
                        self.lease_hosts(hosts))):
                continue
            del self.response_map[request]
            logging.warning('Request %s was not able to lease host %s',
                            request, hosts[0])


# Request dispatchers: Create the appropriate request handler, send a list
# of requests to one of its methods. The corresponding request handler in
# rdb_lib must understand how to match each request with a response from a
# dispatcher, the easiest way to achieve this is to return the response_map
# attribute of the request handler, after making the appropriate requests.
def get_hosts(host_requests):
    """Get host information about the requested hosts.

    @param host_requests: A list of requests as defined in BaseHostRequest.
    @return: A dictionary mapping each request to a list of hosts.
    """
    rdb_handler = BaseHostRequestHandler()
    rdb_handler.batch_get_hosts(host_requests)
    return rdb_handler.get_response()


def update_hosts(update_requests):
    """Update hosts.

    @param update_requests: A list of updates to host tables
        as defined in UpdateHostRequest.
    @return: The handler's response map; the handler only records entries
        for updates that raised an exception.
    """
    rdb_handler = BaseHostRequestHandler()
    rdb_handler.update_hosts(update_requests)
    return rdb_handler.get_response()


def rdb_host_request_dispatcher(host_requests):
    """Dispatcher for all host acquisition queries.

    @param host_requests: A list of requests for acquiring hosts, as defined in
        AcquireHostRequest.
    @return: A dictionary mapping each request to a list of hosts, or
        an empty list if none could satisfy the request. Eg:
        {AcquireHostRequest.template: [host_info_dictionaries]}
    """
    validation_requests = []
    require_hosts_requests = []

    # Validation requests are made by a job scheduled against a
    # specific host (eg: through the frontend) and only require the rdb to
    # match the parameters of the host against the request. Acquisition
    # requests are made by jobs that need hosts (eg: suites) and the rdb needs
    # to find hosts matching the parameters of the request.
    for request in host_requests:
        if request.host_id:
            validation_requests.append(request)
        else:
            require_hosts_requests.append(request)

    rdb_handler = AvailableHostRequestHandler()
    rdb_handler.batch_validate_hosts(validation_requests)
    rdb_handler.batch_acquire_hosts(require_hosts_requests)
    return rdb_handler.get_response()