• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import Queue
6import collections
7import logging
8import threading
9import time
10
11import common
12from autotest_lib.client.bin import utils
13from autotest_lib.client.common_lib import error
14from autotest_lib.site_utils.lxc.container_pool import error as lxc_error
15from autotest_lib.site_utils.lxc.constants import \
16    CONTAINER_POOL_METRICS_PREFIX as METRICS_PREFIX
17
18try:
19    from chromite.lib import metrics
20    from infra_libs import ts_mon
21except ImportError:
22    import mock
23    metrics = utils.metrics_mock
24    ts_mon = mock.Mock()
25
26# The maximum number of concurrent workers.  Each worker is responsible for
27# managing the creation of a single container.
28# TODO(kenobi): This may be need to be adjusted for different hosts (e.g. full
29# vs quarter shards)
30_MAX_CONCURRENT_WORKERS = 5
31# Timeout (in seconds) for container creation.  After this amount of time,
32# container creation tasks are abandoned and retried.
33_CONTAINER_CREATION_TIMEOUT = 600
34# The period (in seconds) affects the rate at which the monitor thread runs its
35# event loop.  This drives a number of other factors, e.g. how long to wait for
36# the thread to respond to shutdown requests.
37_MIN_MONITOR_PERIOD = 0.1
38# The maximum number of errors per hour.  After this limit is reached, further
39# pool creation is throttled.
40_MAX_ERRORS_PER_HOUR = 200
41
42
class Pool(object):
    """A fixed-size pool of LXC containers.

    Containers are created using a factory that is passed to the Pool.  A pool
    size is passed at construction time - this is the number of containers the
    Pool will attempt to maintain.  Whenever the number of containers falls
    below the given size, the Pool will start creating new containers to
    replenish itself.

    In order to avoid overloading the host, the number of simultaneous container
    creations is limited to _MAX_CONCURRENT_WORKERS.

    When container creation produces errors, those errors are saved (see
    Pool.errors).  It is the client's responsibility to periodically check and
    empty out the error queue.
    """

    def __init__(self, factory, size):
        """Creates a new Pool instance and starts its monitor thread.

        @param factory: A factory object that will be called upon to create new
                        containers.  The passed object must have a method called
                        "create_container" that takes no arguments and returns
                        an instance of a Container.
        @param size: The size of the Pool.  The Pool attempts to keep this many
                     containers running at all times.

        @raise ValueError: If size is less than 2.
        """
        # Pools of size less than 2 don't make sense.  Don't allow them.
        if size < 2:
            raise ValueError('Invalid pool size.')

        logging.debug('Pool.__init__ called.  Size: %d', size)
        self._pool = Queue.Queue(size)
        self._monitor = _Monitor(factory, self._pool)
        self._monitor.start()


    def get(self, timeout=0):
        """Gets a container from the pool.

        @param timeout: Number of seconds to wait before returning.
                        - If 0 (the default), return immediately.  If a
                          Container is not immediately available, return None.
                        - If a positive number, block at most <timeout> seconds,
                          then return None if a Container was not available
                          within that time.
                        - If None, block indefinitely until a Container is
                          available.

        @return: A container from the pool, or None if none was available.
        """
        try:
            # Block only if timeout is not zero.
            logging.info('Pool.get called.')
            return self._pool.get(block=(timeout != 0),
                                  timeout=timeout)
        except Queue.Empty:
            return None


    def cleanup(self, timeout=0):
        """Cleans up the container pool.

        Stops all worker threads, and destroys all Containers still in the Pool.

        @param timeout: For testing.  If this is non-zero, it specifies the
                        number of seconds to wait for each worker to shut down.
                        An error is raised if shutdown has not occurred by then.
                        If zero (the default), don't wait for worker threads to
                        shut down, just return immediately.

        @raise WorkerTimeoutError: If timeout is non-zero and a worker thread
                                   did not exit in time.  The pool is still
                                   drained before the error propagates.
        """
        logging.info('Pool.cleanup called.')
        # Stop the monitor thread, then drain the pool.  Drain even if stopping
        # raised, so containers already in the pool are not leaked.
        try:
            self._monitor.stop(timeout)
        finally:
            self._drain()


    def _drain(self):
        """Removes and destroys every container currently in the pool.

        Only containers whose destroy() call returned normally are counted in
        the metric and the log message.  If a destroy() call raises, draining
        stops and the exception propagates after the count is recorded.
        """
        dcount = 0
        logging.debug('Emptying container pool')
        try:
            while True:
                container = self._pool.get(block=False)
                container.destroy()
                # Count only after destroy() succeeds.
                dcount += 1
        except Queue.Empty:
            pass
        finally:
            metrics.Counter(METRICS_PREFIX + '/containers_cleaned_up'
                            ).increment_by(dcount)
            logging.debug('Done.  Destroyed %d containers', dcount)


    @property
    def size(self):
        """Returns the current size of the pool.

        Note that the pool is asynchronous.  Returning a size greater than zero
        does not guarantee that a subsequent call to Pool.get will not block.
        Conversely, returning a size of zero does not guarantee that a
        subsequent call to Pool.get will block.
        """
        return self._pool.qsize()


    @property
    def capacity(self):
        """Returns the max size of the pool."""
        return self._pool.maxsize


    @property
    def errors(self):
        """Returns worker errors.

        @return: A Queue containing all the errors encountered on worker
                 threads.
        """
        return self._monitor.errors


    @property
    def worker_count(self):
        """Returns the number of currently active workers.

        Note that this isn't quite the same as the number of currently alive
        worker threads.  Worker threads that have timed out or been cancelled
        may be technically alive, but they are not included in this count.
        """
        return len(self._monitor._workers)
170
171
172class _Monitor(threading.Thread):
173    """A thread that manages the creation of containers for the pool.
174
175    Container creation is potentially time-consuming and can hang or crash.  The
176    Monitor class manages a pool of independent threads, each responsible for
177    the creation of a single Container.  This provides parallelized container
178    creation and ensures that a single Container creation hanging/crashing does
179    not starve or crash the Pool.
180    """
181
182    def __init__(self, factory, pool):
183        """Creates a new monitor.
184
185        @param factory: A container factory.
186        @param pool: A pool instance to push created containers into.
187        """
188        super(_Monitor, self).__init__(name='pool_monitor')
189
190        self._factory = factory
191        self._pool = pool
192
193        # List of worker threads.  Access this only from the monitor thread.
194        self._worker_max = _MAX_CONCURRENT_WORKERS
195        self._workers = []
196
197        # A flag for stopping the monitor.
198        self._stop = False
199
200        # Stores errors from worker threads.
201        self.errors = Queue.Queue()
202
203        # Throttle on errors, to avoid log spam and CPU spinning.
204        self._error_timestamps = collections.deque()
205
206
207    def run(self):
208        """Supplies the container pool with containers."""
209        logging.debug('Start event loop.')
210        while not self._stop:
211            self._clear_old_errors()
212            self._create_workers()
213            self._poll_workers()
214            time.sleep(_MIN_MONITOR_PERIOD)
215        logging.debug('Exit event loop.')
216
217        # Clean up - stop all workers.
218        for worker in self._workers:
219            worker.cancel()
220
221
222    def stop(self, timeout=0):
223        """Stops this thread.
224
225        This function blocks until the monitor thread has stopped.
226
227        @param timeout: If this is a non-zero number, wait this long (in
228                        seconds) for each worker thread to stop.  If zero (the
229                        default), don't wait for worker threads to exit.
230
231        @raise WorkerTimeoutError: If a worker thread does not exit within the
232                                   specified timeout.
233        """
234        logging.info('Stop requested.')
235        self._stop = True
236        self.join()
237        logging.info('Stopped.')
238        # Wait for workers if timeout was requested.
239        if timeout > 0:
240            logging.debug('Waiting for workers to terminate...')
241            for worker in self._workers:
242                worker.join(timeout)
243                if worker.is_alive():
244                    raise lxc_error.WorkerTimeoutError()
245
246
247    def _create_workers(self):
248        """Spawns workers to handle container requests.
249
250        This method modifies the _workers list and should only be called from
251        within run().
252        """
253        if self._pool.full():
254            return
255
256        # Do not exceed the worker limit.
257        if len(self._workers) >= self._worker_max:
258            return
259
260        too_many_errors = len(self._error_timestamps) >= _MAX_ERRORS_PER_HOUR
261        metrics.Counter(METRICS_PREFIX + '/error_throttled',
262                        field_spec=[ts_mon.BooleanField('throttled')]
263                        ).increment(fields={'throttled': too_many_errors})
264        # Throttle if too many errors occur.
265        if too_many_errors:
266            logging.warning('Error throttled (until %d)',
267                            self._error_timestamps[0] + 3600)
268            return
269
270        # Create workers to refill the pool.
271        qsize = self._pool.qsize()
272        shortfall = self._pool.maxsize - qsize
273        old_worker_count = len(self._workers)
274
275        # Avoid spamming - only log if the monitor is taking some action.  Log
276        # this before creating worker threads, because we are counting live
277        # threads and want to avoid race conditions w.r.t. threads actually
278        # starting.
279        if (old_worker_count < shortfall and
280                old_worker_count < self._worker_max):
281            # This can include workers that aren't currently in the self._worker
282            # list, e.g. workers that were dropped from the list because they
283            # timed out.
284            active_workers = sum([1 for t in threading.enumerate()
285                                  if type(t) is _Worker])
286            # qsize    : Current size of the container pool.
287            # shortfall: Number of empty slots currently in the pool.
288            # workers  : m+n, where m is the current number of active worker
289            #            threads and n is the number of new threads created.
290            logging.debug('qsize:%d shortfall:%d workers:%d',
291                          qsize, shortfall, active_workers)
292        if len(self._workers) < shortfall:
293            worker = _Worker(self._factory,
294                             self._on_worker_result,
295                             self._on_worker_error)
296            worker.start()
297            self._workers.append(worker)
298
299
300    def _poll_workers(self):
301        """Checks worker states and deals with them.
302
303        This method modifies the _workers list and should only be called from
304        within run().
305
306        Completed workers are taken off the worker list and their results/errors
307        are logged.
308        """
309        completed = []
310        incomplete = []
311        for worker in self._workers:
312            if worker.check_health():
313                incomplete.append(worker)
314            else:
315                completed.append(worker)
316
317        self._workers = incomplete
318
319
320    def _on_worker_result(self, result):
321        """Receives results from completed worker threads.
322
323        Pass this as the result callback for worker threads.  Worker threads
324        should call this when they produce a container.
325        """
326        logging.debug('Worker result: %r', result)
327        self._pool.put(result)
328
329
330    def _on_worker_error(self, worker, err):
331        """Receives errors from worker threads.
332
333        Pass this as the error callback for worker threads.  Worker threads
334        should call this if errors occur.
335        """
336        timestamp = time.time()
337        self._error_timestamps.append(timestamp)
338        self.errors.put(err)
339        logging.error('[%d] Worker error: %s', worker.ident, err)
340
341
342    def _clear_old_errors(self):
343        """Clears errors more than an hour old out of the log."""
344        one_hour_ago = time.time() - 3600
345        metrics.Counter(METRICS_PREFIX + '/recent_errors'
346                        ).increment_by(len(self._error_timestamps))
347        while (self._error_timestamps and
348               self._error_timestamps[0] < one_hour_ago):
349            self._error_timestamps.popleft()
350            # Avoid logspam - log only when some action was taken.
351            logging.error('Worker error count: %d', len(self._error_timestamps))
352
353
354class _Worker(threading.Thread):
355    """A worker thread tasked with managing the creation of a single container.
356
357    The worker is a daemon thread that calls upon a container factory to create
358    a single container.  If container creation raises any exceptions, they are
359    logged and the worker exits.  The worker also provides a mechanism for the
360    parent thread to impose timeouts on container creation.
361    """
362
363    def __init__(self, factory, result_cb, error_cb):
364        """Creates a new Worker.
365
366        @param factory: A factory object that will be called upon to create
367                        Containers.
368        """
369        super(_Worker, self).__init__(name='pool_worker')
370        # Hanging worker threads should not keep the pool process alive.
371        self.daemon = True
372
373        self._factory = factory
374
375        self._result_cb = result_cb
376        self._error_cb = error_cb
377
378        self._cancelled = False
379        self._start_time = None
380
381        # A lock for breaking race conditions in worker cancellation.  Use a
382        # recursive lock because _check_health requires it.
383        self._completion_lock = threading.RLock()
384        self._completed = False
385
386
387    def run(self):
388        """Creates a single container."""
389        self._start_time = time.time()
390        container = None
391        try:
392            container = self._factory.create_container()
393            container.start(wait_for_network=True)
394        except Exception as e:
395            logging.error('Worker error: %s', error.format_error())
396            self._error_cb(self, e)
397        finally:
398            # All this has to happen atomically, otherwise race conditions can
399            # arise w.r.t. cancellation.
400            with self._completion_lock:
401                self._completed = True
402                if self._cancelled:
403                    # If the job was cancelled, destroy the container instead of
404                    # putting it in the result queue.  Do not release the
405                    # throttle, as it would have been released when the
406                    # cancellation occurred.
407                    if container is not None:
408                        container.destroy()
409                else:
410                    # Put the container in the result field.  Release the
411                    # throttle so another task can be picked up.
412                    # Container may be None if errors occurred.
413                    if container is not None:
414                        self._result_cb(container)
415
416
417    def cancel(self):
418        """Cancels the work request.
419
420        The container will be destroyed when created, instead of being added to
421        the container pool.
422        """
423        with self._completion_lock:
424            if self._completed:
425                return False
426            else:
427                self._cancelled = True
428                return True
429
430
431    def check_health(self):
432        """Checks that a worker is alive and has not timed out.
433
434        Checks the run time of the worker to make sure it hasn't timed out.
435        Cancels workers that exceed the timeout.
436
437        @return: True if the worker is alive and has not timed out, False
438                 otherwise.
439        """
440        # Acquire the completion lock so as to avoid race conditions if the
441        # factory happens to return just as we are timing out.
442        with self._completion_lock:
443            if not self.is_alive() or self._cancelled or self._completed:
444                return False
445
446            # Thread hasn't started yet - count this as healthy.
447            if self._start_time is None:
448                return True
449
450            # If alive, check the timeout and cancel if necessary.
451            runtime = time.time() - self._start_time
452            if runtime > _CONTAINER_CREATION_TIMEOUT:
453                if self.cancel():
454                    self._error_cb(self, lxc_error.WorkerTimeoutError())
455                    return False
456
457        return True
458