• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import logging
11import multiprocessing
12import os
13import threading
14
15from autotest_lib.client.common_lib import autotemp
16from autotest_lib.server import utils
17import six
18
# Command line used to launch the background master ssh process. It is a
# %-format template expecting the keys 'socket', 'user', 'port' (an integer)
# and 'hostname'. The fragments are joined with single spaces.
_MASTER_SSH_COMMAND_TEMPLATE = ' '.join((
    '/usr/bin/ssh -a -x -N',
    '-o ControlMaster=yes',  # Create multiplex socket.
    '-o ControlPath=%(socket)s',  # Location of the multiplex socket.
    '-o StrictHostKeyChecking=no',
    '-o UserKnownHostsFile=/dev/null',
    '-o BatchMode=yes',
    '-o ConnectTimeout=30',
    '-o ServerAliveInterval=30',
    '-o ServerAliveCountMax=1',
    '-o ConnectionAttempts=1',
    '-o Protocol=2',
    '-l %(user)s -p %(port)d %(hostname)s',
))
32
33
class MasterSsh(object):
    """Manages multiplex ssh connection.

    An instance owns at most one background master ssh process together with
    the temporary directory that holds its control socket. Other ssh
    invocations can reuse the connection by passing `ssh_option`.
    """

    def __init__(self, hostname, user, port):
        """Records the endpoint; no connection is started here.

        @param hostname: Host name of the endpoint.
        @param user: User name to log in.
        @param port: Port number sshd is listening (formatted with %d).
        """
        self._hostname = hostname
        self._user = user
        self._port = port

        # Background job running the master ssh process, or None when no
        # connection has been started (or it has been closed).
        self._master_job = None
        # Temporary directory containing the control socket, or None.
        self._master_tempdir = None

        # Serializes start/cleanup of the master connection.
        self._lock = multiprocessing.Lock()

    def __del__(self):
        # Release the background process and temp dir with the object.
        self.close()

    @property
    def _socket_path(self):
        """Path of the control socket; requires _master_tempdir to be set."""
        return os.path.join(self._master_tempdir.name, 'socket')

    @property
    def ssh_option(self):
        """Returns the ssh option to use this multiplexed ssh.

        If background process is not running, returns an empty string.
        """
        if not self._master_tempdir:
            return ''
        return '-o ControlPath=%s' % (self._socket_path,)

    def maybe_start(self, timeout=5):
        """Starts the background process to run multiplex ssh connection.

        If there already is a background process running, this does nothing.
        If there is a stale process or a stale socket, first clean them up,
        then create a background process.

        If launching the background process fails, the freshly created
        temporary directory is cleaned up again before the exception is
        re-raised, so that `ssh_option` does not point at a socket that will
        never exist.

        @param timeout: timeout in seconds (default 5) to wait for master ssh
                        connection to be established. If timeout is reached, a
                        warning message is logged, but no other action is
                        taken.
        """
        # Multiple processes might try in parallel to clean up the old master
        # ssh connection and create a new one, therefore use a lock to protect
        # against race conditions.
        with self._lock:
            # If a previously started master SSH connection is not running
            # anymore, it needs to be cleaned up and then restarted.
            if (self._master_job and (not os.path.exists(self._socket_path) or
                                      self._master_job.sp.poll() is not None)):
                logging.info(
                        'Master ssh connection to %s is down.', self._hostname)
                self._close_internal()

            # Start a new master SSH connection.
            if not self._master_job:
                # Create a shared socket in a temp location.
                self._master_tempdir = autotemp.tempdir(dir=_short_tmpdir())

                try:
                    # Start the master SSH connection in the background.
                    master_cmd = _MASTER_SSH_COMMAND_TEMPLATE % {
                            'hostname': self._hostname,
                            'user': self._user,
                            'port': self._port,
                            'socket': self._socket_path,
                    }
                    logging.info(
                            'Starting master ssh connection \'%s\'',
                            master_cmd)
                    self._master_job = utils.BgJob(
                             master_cmd, nickname='master-ssh',
                             stdout_tee=utils.DEVNULL,
                             stderr_tee=utils.DEVNULL,
                             unjoinable=True)
                except Exception:
                    # Formatting the command or spawning the job failed:
                    # undo the temp dir creation so no half-initialized
                    # state (temp dir without a job) is left behind.
                    self._close_internal()
                    raise

                # To prevent a race between the master ssh connection
                # startup and its first attempted use, wait for socket file to
                # exist before returning.
                try:
                    utils.poll_for_condition(
                            condition=lambda: os.path.exists(self._socket_path),
                            timeout=timeout,
                            sleep_interval=0.2,
                            desc='master-ssh connection up')
                except utils.TimeoutError:
                    # poll_for_condition already logs an error upon timeout
                    pass

    def close(self):
        """Releases all resources used by multiplexed ssh connection."""
        with self._lock:
            self._close_internal()

    def _close_internal(self):
        """Kills the background job and removes the temp dir, if present."""
        # Assume that when this is called, _lock should be acquired, already.
        if self._master_job:
            logging.debug('Nuking ssh master_job')
            utils.nuke_subprocess(self._master_job.sp)
            self._master_job = None

        if self._master_tempdir:
            logging.debug('Cleaning ssh master_tempdir')
            self._master_tempdir.clean()
            self._master_tempdir = None
137
138
class ConnectionPool(object):
    """Holds SSH multiplex connection instance.

    Connections are keyed by the (hostname, user, port) tuple, so each
    endpoint maps to a single shared MasterSsh instance.
    """

    def __init__(self):
        # Maps (hostname, user, port) -> MasterSsh.
        self._pool = {}
        # Guards every access to _pool.
        self._lock = threading.Lock()

    def get(self, hostname, user, port):
        """Returns MasterSsh instance for the given endpoint.

        If the pool holds the instance already, returns it. If not, create the
        instance, and returns it.

        Caller has the responsibility to call maybe_start() before using it.

        @param hostname: Host name of the endpoint.
        @param user: User name to log in.
        @param port: Port number sshd is listening.

        @returns: The pooled MasterSsh instance for the endpoint.
        """
        key = (hostname, user, port)
        logging.debug('Get master ssh connection for %s@%s:%d', user, hostname,
                      port)

        with self._lock:
            conn = self._pool.get(key)
            if conn is None:
                conn = MasterSsh(hostname, user, port)
                self._pool[key] = conn
            return conn

    def shutdown(self):
        """Closes all ssh multiplex connections.

        The instances stay in the pool after being closed.
        """
        # Take a snapshot while holding the lock: a concurrent get() may
        # insert into _pool, and iterating a dict that changes size raises
        # RuntimeError. The close() calls run outside the lock so a slow
        # close does not block get().
        with self._lock:
            connections = list(self._pool.values())
        for ssh in connections:
            ssh.close()
173
174
def _short_tmpdir():
    """Returns the shared per-user parent directory, creating it if needed.

    The directory is '/tmp/ssh-master_<user>', where <user> is the first 8
    characters of $USER ('no_USER' when the variable is unset).

    @returns: Path of the directory.

    @raises OSError: if the directory cannot be created for a reason other
                     than it already existing.
    """
    # Function-scope import keeps this helper self-contained.
    import errno

    # crbug/865171 Unix domain socket paths are limited to 108 characters.
    # crbug/945523 Swarming does not like too many top-level directories in
    # /tmp.
    # So use a shared parent directory in /tmp
    user = os.environ.get("USER", "no_USER")[:8]
    d = '/tmp/ssh-master_%s' % user
    # Create first and tolerate "already exists" instead of checking and then
    # creating: several processes may get here at the same time, and a
    # check-then-mkdir sequence would make the loser fail with EEXIST.
    try:
        os.mkdir(d)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
    return d
185