# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
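
"""Container pool service.

Listens on a Unix domain socket for pool clients, and spawns a _ClientThread
per connection to handle ECHO, GET, SHUTDOWN and STATUS messages against an
underlying pool of LXC zygote containers.
"""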

import logging
import os
import threading
import time

import common
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils.lxc import base_image
from autotest_lib.site_utils.lxc import constants
from autotest_lib.site_utils.lxc import container_factory
from autotest_lib.site_utils.lxc import zygote
from autotest_lib.site_utils.lxc.constants import \
    CONTAINER_POOL_METRICS_PREFIX as METRICS_PREFIX
from autotest_lib.site_utils.lxc.container_pool import async_listener
from autotest_lib.site_utils.lxc.container_pool import error
from autotest_lib.site_utils.lxc.container_pool import message
from autotest_lib.site_utils.lxc.container_pool import pool

try:
    import cPickle as pickle
except ImportError:
    import pickle

try:
    from chromite.lib import metrics
    from infra_libs import ts_mon
except ImportError:
    import mock
    metrics = utils.metrics_mock
    ts_mon = mock.Mock()


# The minimum period between polling for new connections, in seconds.
_MIN_POLLING_PERIOD = 0.1


class Service(object):
    """A manager for a pool of LXC containers.

    The Service class manages client communication with an underlying container
    pool.  It listens for incoming client connections, then spawns threads to
    handle communication with each client.
    """

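    # Minimal usage sketch (illustrative; host_dir is the SharedHostDir
    # described in __init__, and start() blocks in the main event loop until
    # stop() is called from another thread, e.g. via a SHUTDOWN message):
    #
    #   service = Service(host_dir)
    #   service.start()          # runs until stop() is called
    #
    # and, from another thread:
    #
    #   service.stop().wait()    # returns once shutdown has completed
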
    def __init__(self, host_dir, pool=None):
        """Sets up a new container pool service.

        @param host_dir: A SharedHostDir.  This will be used for Zygote
                         configuration as well as for general pool operation
                         (e.g. opening Unix domain sockets for communication).
        @param pool: A container pool that the service will maintain.  This
                     parameter exists for dependency injection in tests;
                     under normal circumstances the service instantiates the
                     container pool internally.
        """
        # Create the socket for receiving container pool requests.  This also
        # acts as a mutex, preventing multiple container pools from being
        # instantiated.
        self._socket_path = os.path.join(
                host_dir.path, constants.DEFAULT_CONTAINER_POOL_SOCKET)
        self._connection_listener = async_listener.AsyncListener(
                self._socket_path)
        self._client_threads = []
        self._stop_event = None
        self._running = False
        self._pool = pool


    def start(self, pool_size=constants.DEFAULT_CONTAINER_POOL_SIZE):
        """Starts the service.

        This call blocks, running the main event loop until stop() is called.

        @param pool_size: The desired size of the container pool.  This
                          parameter has no effect if a pre-created pool was
                          injected into the Service constructor.
        """
        self._running = True

        # Start the container pool.
        if self._pool is None:
            factory = container_factory.ContainerFactory(
                    base_container=base_image.BaseImage().get(),
                    container_class=zygote.Zygote)
            self._pool = pool.Pool(factory=factory, size=pool_size)

        # Start listening asynchronously for incoming connections.
        self._connection_listener.start()

        # Poll for incoming connections, and spawn threads to handle them.
        logging.debug('Start event loop.')
        while self._stop_event is None:
            self._handle_incoming_connections()
            self._cleanup_closed_connections()
            # TODO(kenobi): Poll for and log errors from pool.
            metrics.Counter(METRICS_PREFIX + '/tick').increment()
            time.sleep(_MIN_POLLING_PERIOD)

        logging.debug('Exit event loop.')

        # Stopped - stop all the client threads, stop listening, then signal
        # that shutdown is complete.
        for thread in self._client_threads:
            thread.stop()
        try:
            self._connection_listener.close()
        except Exception as e:
            logging.error('Error stopping pool service: %r', e)
            raise
        finally:
            # Clean up the container pool.
            self._pool.cleanup()
            # Make sure state is consistent.
            self._stop_event.set()
            self._stop_event = None
            self._running = False
            metrics.Counter(METRICS_PREFIX + '/service_stopped').increment()
            logging.debug('Container pool stopped.')


    def stop(self):
        """Stops the service.

        @return: A threading.Event that the main event loop sets once
                 shutdown is complete.
        """
        self._stop_event = threading.Event()
        return self._stop_event


    def is_running(self):
        """Returns whether or not the service is currently running."""
        return self._running


    def get_status(self):
        """Returns a dictionary of values describing the current status."""
        status = {}
        status['running'] = self._running
        status['socket_path'] = self._socket_path
        if self._running:
            status['pool capacity'] = self._pool.capacity
            status['pool size'] = self._pool.size
            status['pool worker count'] = self._pool.worker_count
            status['pool errors'] = self._pool.errors.qsize()
            status['client thread count'] = len(self._client_threads)
        return status

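    # While the service is running, get_status() returns a dict along the
    # lines of (values illustrative only):
    #
    #   {'running': True,
    #    'socket_path': '/path/to/host_dir/socket',
    #    'pool capacity': 20,
    #    'pool size': 18,
    #    'pool worker count': 2,
    #    'pool errors': 0,
    #    'client thread count': 1}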

    def _handle_incoming_connections(self):
        """Checks for connections and spawns sub-threads to handle requests."""
        connection = self._connection_listener.get_connection()
        if connection is not None:
            # Spawn a thread to deal with the new connection.
            thread = _ClientThread(self, self._pool, connection)
            thread.start()
            self._client_threads.append(thread)
            thread_count = len(self._client_threads)
            metrics.Counter(METRICS_PREFIX + '/client_threads'
                          ).increment_by(thread_count)
            logging.debug('Client thread count: %d', thread_count)


    def _cleanup_closed_connections(self):
        """Cleans up dead client threads."""
        # We don't need to lock because all operations on self._client_threads
        # take place on the main thread.
        self._client_threads = [t for t in self._client_threads if t.is_alive()]


class _ClientThread(threading.Thread):
    """A class that handles communication with a pool client.

    Use a thread-per-connection model instead of select()/poll() for a few
    reasons:
    - the number of simultaneous clients is not expected to be high enough for
      select or poll to really pay off.
    - one thread per connection is more robust - if a single client somehow
      crashes its communication thread, that will not affect the other
      communication threads or the main pool service.
    """

    def __init__(self, service, pool, connection):
        """Sets up a new client thread.

        @param service: The Service that owns this thread.
        @param pool: The container pool that GET requests are served from.
        @param connection: The client connection to monitor.
        """
        self._service = service
        self._pool = pool
        self._connection = connection
        self._running = False
        super(_ClientThread, self).__init__(name='client_thread')


    def run(self):
        """Handles messages coming in from clients.

        The thread main loop monitors the connection and handles incoming
        messages.  Polling is used so that the loop condition can be checked
        regularly - this enables the thread to exit cleanly if required.

        Any kind of error will exit the event loop and close the connection.
        """
        logging.debug('Start event loop.')
        try:
            self._running = True
            while self._running:
                # Poll and deal with messages every second.  The timeout enables
                # the thread to exit cleanly when stop() is called.
                if self._connection.poll(1):
                    try:
                        msg = self._connection.recv()
                    except (AttributeError,
                            ImportError,
                            IndexError,
                            pickle.UnpicklingError) as e:
                        # All of these can occur while unpickling data.
                        logging.error('Error while receiving message: %r', e)
                        # Exit if an error occurs
                        break
                    except EOFError:
                        # EOFError means the client closed the connection.  This
                        # is not an error - just exit.
                        break

                    try:
                        response = self._handle_message(msg)
                        # Always send the response, even if it's None.  This
                        # provides more consistent client-side behaviour.
                        self._connection.send(response)
                    except error.UnknownMessageTypeError as e:
                        # The message received was a valid python object, but
                        # not a valid Message.
                        logging.error('Message error: %s', e)
                        # Exit if an error occurs
                        break
                    except EOFError:
                        # EOFError means the client closed the connection early.
                        # TODO(chromium:794685): Return container to pool.
                        logging.error('Client closed connection before return.')
                        break

        finally:
            # Always close the connection.
            logging.debug('Exit event loop.')
            self._connection.close()


    def stop(self):
        """Stops the client thread."""
        self._running = False


    def _handle_message(self, msg):
        """Handles incoming messages.

        @param msg: The incoming message to be handled.

        @return: A pickleable object (or None) that should be sent back to the
                 client.
        """

        # Only handle Message objects.
        if not isinstance(msg, message.Message):
            raise error.UnknownMessageTypeError(
                    'Invalid message class %s' % type(msg))

        # Use a dispatch table to simulate switch/case.
        handlers = {
            message.ECHO: self._echo,
            message.GET: self._get,
            message.SHUTDOWN: self._shutdown,
            message.STATUS: self._status,
        }
        try:
            return handlers[msg.type](**msg.args)
        except KeyError:
            raise error.UnknownMessageTypeError(
                    'Invalid message type %s' % msg.type)


    def _echo(self, msg):
        """Handles ECHO messages.

        @param msg: A string that will be echoed back to the client.

        @return: The message, for echoing back to the client.
        """
        # Just echo the message back, for testing aliveness.
        logging.debug('Echo: %r', msg)
        return msg


    def _shutdown(self):
        """Handles SHUTDOWN messages.

        @return: An ACK message.  This function is synchronous (i.e. it blocks,
                 and only returns the ACK when shutdown is complete).
        """
        logging.debug('Received shutdown request.')
        # Request shutdown.  Wait for the service to actually stop before
        # sending the response.
        self._service.stop().wait()
        logging.debug('Service shutdown complete.')
        return message.ack()


    def _status(self):
        """Handles STATUS messages.

        @return: The result of the service status call.
        """
        logging.debug('Received status request.')
        return self._service.get_status()


    def _get(self, id, timeout):
        """Gets a container from the pool.

        @param id: A ContainerId to assign to the new container.
        @param timeout: A timeout (in seconds) to wait for the pool.  If a
                        container is not available from the pool within the
                        given period, None will be returned.

        @return: A container from the pool, or None if no container became
                 available within the given timeout.
        """
        logging.debug('Received get request (id=%s)', id)
        container = self._pool.get(timeout)
        # Assign an ID to the container as soon as it is removed from the pool.
        # This associates the container with the process to which it will be
        # handed off.
        if container is not None:
            logging.debug(
                'Assigning container (name=%s, id=%s)', container.name, id)
            container.id = id
        else:
            logging.debug('No container (id=%s)', id)
        metrics.Counter(METRICS_PREFIX + '/container_requests',
                        field_spec=[ts_mon.BooleanField('success')]
                        ).increment(fields={'success': (container is not None)})
        return container
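
# Illustrative client-side exchange (not part of this module).  Clients are
# expected to connect to the service's Unix domain socket and exchange
# pickled Message objects with the loop above.  Assuming a
# multiprocessing.connection style client and an echo() factory helper in
# the message module (both are assumptions, not verified here):
#
#   from multiprocessing import connection
#   conn = connection.Client(socket_path)
#   conn.send(message.echo(msg='ping'))
#   print conn.recv()    # -> 'ping'
#   conn.close()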