# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import Queue
import collections
import logging
import threading
import time

import common
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.site_utils.lxc.container_pool import error as lxc_error
from autotest_lib.site_utils.lxc.constants import \
    CONTAINER_POOL_METRICS_PREFIX as METRICS_PREFIX

try:
    from chromite.lib import metrics
    from infra_libs import ts_mon
except ImportError:
    import mock
    metrics = utils.metrics_mock
    ts_mon = mock.Mock()

# The maximum number of concurrent workers. Each worker is responsible for
# managing the creation of a single container.
# TODO(kenobi): This may need to be adjusted for different hosts (e.g. full
# vs quarter shards).
_MAX_CONCURRENT_WORKERS = 5
# Timeout (in seconds) for container creation. After this amount of time,
# container creation tasks are abandoned and retried.
_CONTAINER_CREATION_TIMEOUT = 600
# The minimum period (in seconds) of the monitor thread's event loop. This
# drives a number of other factors, e.g. how long to wait for the thread to
# respond to shutdown requests.
_MIN_MONITOR_PERIOD = 0.1
# The maximum number of errors per hour. After this limit is reached, further
# container creation is throttled.
_MAX_ERRORS_PER_HOUR = 200


class Pool(object):
    """A fixed-size pool of LXC containers.

    Containers are created using a factory that is passed to the Pool. A pool
    size is passed at construction time - this is the number of containers the
    Pool will attempt to maintain. Whenever the number of containers falls
    below the given size, the Pool will start creating new containers to
    replenish itself.

    In order to avoid overloading the host, the number of simultaneous
    container creations is limited to _MAX_CONCURRENT_WORKERS.

    When container creation produces errors, those errors are saved (see
    Pool.errors). It is the client's responsibility to periodically check and
    empty out the error queue.
    """

    def __init__(self, factory, size):
        """Creates a new Pool instance.

        @param factory: A factory object that will be called upon to create
                        new containers. The passed object must have a method
                        called "create_container" that takes no arguments and
                        returns an instance of a Container.
        @param size: The size of the Pool. The Pool attempts to keep this many
                     containers running at all times.
        """
        # Pools of size less than 2 don't make sense. Don't allow them.
        if size < 2:
            raise ValueError('Invalid pool size.')

        logging.debug('Pool.__init__ called. Size: %d', size)
        self._pool = Queue.Queue(size)
        self._monitor = _Monitor(factory, self._pool)
        self._monitor.start()


    def get(self, timeout=0):
        """Gets a container from the pool.

        @param timeout: Number of seconds to wait before returning.
                        - If 0 (the default), return immediately. If a
                          Container is not immediately available, return None.
                        - If a positive number, block at most <timeout>
                          seconds, then return None if a Container was not
                          available within that time.
                        - If None, block indefinitely until a Container is
                          available.

        @return: A container from the pool, or None if no container became
                 available within the timeout.
        """
        try:
            # Block only if timeout is not zero.
            logging.info('Pool.get called.')
            return self._pool.get(block=(timeout != 0), timeout=timeout)
        except Queue.Empty:
            return None


    def cleanup(self, timeout=0):
        """Cleans up the container pool.

        Stops all worker threads and destroys all Containers still in the
        Pool.

        @param timeout: For testing. If this is non-zero, it specifies the
                        number of seconds to wait for each worker to shut
                        down. An error is raised if shutdown has not occurred
                        by then. If zero (the default), don't wait for worker
                        threads to shut down, just return immediately.
        """
        logging.info('Pool.cleanup called.')
        # Stop the monitor thread, then drain the pool.
        self._monitor.stop(timeout)

        dcount = 0
        try:
            logging.debug('Emptying container pool')
            while True:
                container = self._pool.get(block=False)
                dcount += 1
                container.destroy()
        except Queue.Empty:
            pass
        finally:
            metrics.Counter(METRICS_PREFIX + '/containers_cleaned_up'
                            ).increment_by(dcount)
            logging.debug('Done. Destroyed %d containers', dcount)


    @property
    def size(self):
        """Returns the current size of the pool.

        Note that the pool is asynchronous. Returning a size greater than zero
        does not guarantee that a subsequent call to Pool.get will not block.
        Conversely, returning a size of zero does not guarantee that a
        subsequent call to Pool.get will block.
        """
        return self._pool.qsize()


    @property
    def capacity(self):
        """Returns the max size of the pool."""
        return self._pool.maxsize


    @property
    def errors(self):
        """Returns worker errors.

        @return: A Queue containing all the errors encountered on worker
                 threads.
        """
        return self._monitor.errors


    @property
    def worker_count(self):
        """Returns the number of currently active workers.

        Note that this isn't quite the same as the number of currently alive
        worker threads. Worker threads that have timed out or been cancelled
        may be technically alive, but they are not included in this count.
        """
        return len(self._monitor._workers)


class _Monitor(threading.Thread):
    """A thread that manages the creation of containers for the pool.

    Container creation is potentially time-consuming and can hang or crash.
    The Monitor class manages a pool of independent threads, each responsible
    for the creation of a single Container. This provides parallelized
    container creation and ensures that a single Container creation
    hanging/crashing does not starve or crash the Pool.
    """

    def __init__(self, factory, pool):
        """Creates a new monitor.

        @param factory: A container factory.
        @param pool: A pool instance to push created containers into.
        """
        super(_Monitor, self).__init__(name='pool_monitor')

        self._factory = factory
        self._pool = pool

        # List of worker threads. Access this only from the monitor thread.
        self._worker_max = _MAX_CONCURRENT_WORKERS
        self._workers = []

        # A flag for stopping the monitor.
        self._stop = False

        # Stores errors from worker threads.
        self.errors = Queue.Queue()

        # Throttle on errors, to avoid log spam and CPU spinning.
        self._error_timestamps = collections.deque()


    def run(self):
        """Supplies the container pool with containers."""
        logging.debug('Start event loop.')
        while not self._stop:
            self._clear_old_errors()
            self._create_workers()
            self._poll_workers()
            time.sleep(_MIN_MONITOR_PERIOD)
        logging.debug('Exit event loop.')

        # Clean up - stop all workers.
        for worker in self._workers:
            worker.cancel()


    def stop(self, timeout=0):
        """Stops this thread.

        This function blocks until the monitor thread has stopped.

        @param timeout: If this is a non-zero number, wait this long (in
                        seconds) for each worker thread to stop. If zero (the
                        default), don't wait for worker threads to exit.

        @raise WorkerTimeoutError: If a worker thread does not exit within the
                                   specified timeout.
        """
        logging.info('Stop requested.')
        self._stop = True
        self.join()
        logging.info('Stopped.')
        # Wait for workers if a timeout was requested.
        if timeout > 0:
            logging.debug('Waiting for workers to terminate...')
            for worker in self._workers:
                worker.join(timeout)
                if worker.is_alive():
                    raise lxc_error.WorkerTimeoutError()


    def _create_workers(self):
        """Spawns workers to handle container requests.

        This method modifies the _workers list and should only be called from
        within run().
        """
        if self._pool.full():
            return

        # Do not exceed the worker limit.
        if len(self._workers) >= self._worker_max:
            return

        too_many_errors = len(self._error_timestamps) >= _MAX_ERRORS_PER_HOUR
        metrics.Counter(METRICS_PREFIX + '/error_throttled',
                        field_spec=[ts_mon.BooleanField('throttled')]
                        ).increment(fields={'throttled': too_many_errors})
        # Throttle if too many errors occur.
        if too_many_errors:
            logging.warning('Error throttled (until %d)',
                            self._error_timestamps[0] + 3600)
            return

        # Create workers to refill the pool.
        qsize = self._pool.qsize()
        shortfall = self._pool.maxsize - qsize
        old_worker_count = len(self._workers)

        # Avoid spamming - only log if the monitor is taking some action. Log
        # this before creating worker threads, because we are counting live
        # threads and want to avoid race conditions w.r.t. threads actually
        # starting.
        if (old_worker_count < shortfall and
                old_worker_count < self._worker_max):
            # This can include workers that aren't currently in the
            # self._workers list, e.g. workers that were dropped from the list
            # because they timed out.
            active_workers = sum([1 for t in threading.enumerate()
                                  if type(t) is _Worker])
            # qsize    : Current size of the container pool.
            # shortfall: Number of empty slots currently in the pool.
            # workers  : m+n, where m is the current number of active worker
            #            threads and n is the number of new threads created.
            logging.debug('qsize:%d shortfall:%d workers:%d',
                          qsize, shortfall, active_workers)
        if len(self._workers) < shortfall:
            worker = _Worker(self._factory,
                             self._on_worker_result,
                             self._on_worker_error)
            worker.start()
            self._workers.append(worker)


    def _poll_workers(self):
        """Checks worker states and deals with them.

        This method modifies the _workers list and should only be called from
        within run().

        Completed workers are taken off the worker list and their
        results/errors are logged.
308 """ 309 completed = [] 310 incomplete = [] 311 for worker in self._workers: 312 if worker.check_health(): 313 incomplete.append(worker) 314 else: 315 completed.append(worker) 316 317 self._workers = incomplete 318 319 320 def _on_worker_result(self, result): 321 """Receives results from completed worker threads. 322 323 Pass this as the result callback for worker threads. Worker threads 324 should call this when they produce a container. 325 """ 326 logging.debug('Worker result: %r', result) 327 self._pool.put(result) 328 329 330 def _on_worker_error(self, worker, err): 331 """Receives errors from worker threads. 332 333 Pass this as the error callback for worker threads. Worker threads 334 should call this if errors occur. 335 """ 336 timestamp = time.time() 337 self._error_timestamps.append(timestamp) 338 self.errors.put(err) 339 logging.error('[%d] Worker error: %s', worker.ident, err) 340 341 342 def _clear_old_errors(self): 343 """Clears errors more than an hour old out of the log.""" 344 one_hour_ago = time.time() - 3600 345 metrics.Counter(METRICS_PREFIX + '/recent_errors' 346 ).increment_by(len(self._error_timestamps)) 347 while (self._error_timestamps and 348 self._error_timestamps[0] < one_hour_ago): 349 self._error_timestamps.popleft() 350 # Avoid logspam - log only when some action was taken. 351 logging.error('Worker error count: %d', len(self._error_timestamps)) 352 353 354class _Worker(threading.Thread): 355 """A worker thread tasked with managing the creation of a single container. 356 357 The worker is a daemon thread that calls upon a container factory to create 358 a single container. If container creation raises any exceptions, they are 359 logged and the worker exits. The worker also provides a mechanism for the 360 parent thread to impose timeouts on container creation. 361 """ 362 363 def __init__(self, factory, result_cb, error_cb): 364 """Creates a new Worker. 365 366 @param factory: A factory object that will be called upon to create 367 Containers. 368 """ 369 super(_Worker, self).__init__(name='pool_worker') 370 # Hanging worker threads should not keep the pool process alive. 371 self.daemon = True 372 373 self._factory = factory 374 375 self._result_cb = result_cb 376 self._error_cb = error_cb 377 378 self._cancelled = False 379 self._start_time = None 380 381 # A lock for breaking race conditions in worker cancellation. Use a 382 # recursive lock because _check_health requires it. 383 self._completion_lock = threading.RLock() 384 self._completed = False 385 386 387 def run(self): 388 """Creates a single container.""" 389 self._start_time = time.time() 390 container = None 391 try: 392 container = self._factory.create_container() 393 container.start(wait_for_network=True) 394 except Exception as e: 395 logging.error('Worker error: %s', error.format_error()) 396 self._error_cb(self, e) 397 finally: 398 # All this has to happen atomically, otherwise race conditions can 399 # arise w.r.t. cancellation. 400 with self._completion_lock: 401 self._completed = True 402 if self._cancelled: 403 # If the job was cancelled, destroy the container instead of 404 # putting it in the result queue. Do not release the 405 # throttle, as it would have been released when the 406 # cancellation occurred. 407 if container is not None: 408 container.destroy() 409 else: 410 # Put the container in the result field. Release the 411 # throttle so another task can be picked up. 412 # Container may be None if errors occurred. 
                    if container is not None:
                        self._result_cb(container)


    def cancel(self):
        """Cancels the work request.

        If the worker has not already completed, the container it creates will
        be destroyed instead of being added to the container pool.

        @return: True if the cancellation took effect, False if the worker had
                 already completed.
        """
        with self._completion_lock:
            if self._completed:
                return False
            else:
                self._cancelled = True
                return True


    def check_health(self):
        """Checks that a worker is alive and has not timed out.

        Checks the run time of the worker to make sure it hasn't timed out.
        Cancels workers that exceed the timeout.

        @return: True if the worker is alive and has not timed out, False
                 otherwise.
        """
        # Acquire the completion lock so as to avoid race conditions if the
        # factory happens to return just as we are timing out.
        with self._completion_lock:
            if not self.is_alive() or self._cancelled or self._completed:
                return False

            # Thread hasn't started yet - count this as healthy.
            if self._start_time is None:
                return True

            # If alive, check the timeout and cancel if necessary.
            runtime = time.time() - self._start_time
            if runtime > _CONTAINER_CREATION_TIMEOUT:
                if self.cancel():
                    self._error_cb(self, lxc_error.WorkerTimeoutError())
                return False

        return True
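

# Illustrative usage sketch (comment only, not executed). `MyFactory` below is
# a hypothetical stand-in for a real container factory: any object exposing a
# no-argument create_container() method that returns a Container satisfies the
# contract described in the Pool docstring.
#
#   pool = Pool(MyFactory(), size=5)
#   try:
#       container = pool.get(timeout=30)  # block up to 30 seconds
#       if container is not None:
#           pass  # hand the container off to a test job
#       # Clients are expected to drain worker errors periodically.
#       while not pool.errors.empty():
#           logging.warning('Pool worker error: %s', pool.errors.get())
#   finally:
#       pool.cleanup()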