• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import multiprocessing
7import os
8import threading
9
10from autotest_lib.client.common_lib import autotemp
11from autotest_lib.server import utils
12
# Command line used to launch the background "master" ssh process. It is a
# %-format template expecting the keys 'socket', 'user', 'port' (an integer)
# and 'hostname'. The options are listed one per entry and joined with single
# spaces.
_MASTER_SSH_COMMAND_TEMPLATE = ' '.join((
    '/usr/bin/ssh -a -x -N',
    '-o ControlMaster=yes',  # Create multiplex socket.
    '-o ControlPath=%(socket)s',
    '-o StrictHostKeyChecking=no',
    '-o UserKnownHostsFile=/dev/null',
    '-o BatchMode=yes',
    '-o ConnectTimeout=30',
    '-o ServerAliveInterval=900',
    '-o ServerAliveCountMax=3',
    '-o ConnectionAttempts=4',
    '-o Protocol=2',
    '-l %(user)s -p %(port)d %(hostname)s',
))
26
27
class MasterSsh(object):
    """Manages one multiplexed ("master") ssh connection to an endpoint.

    The master connection is a background ssh process that creates a control
    socket inside a private temporary directory. The ssh_option property
    yields the command line option pointing at that socket.
    """

    def __init__(self, hostname, user, port):
        """Records the endpoint. No connection is started here.

        @param hostname: Host name of the endpoint.
        @param user: User name to log in.
        @param port: Port number to connect to (formatted as an integer).
        """
        self._hostname = hostname
        self._user = user
        self._port = port

        # Both stay None until maybe_start() launches the background ssh.
        self._master_job = None
        self._master_tempdir = None

        self._lock = multiprocessing.Lock()

    def __del__(self):
        # Release the background process and temp dir with the object.
        self.close()

    @property
    def _socket_path(self):
        """Path of the control socket inside the current temp directory."""
        tempdir_name = self._master_tempdir.name
        return os.path.join(tempdir_name, 'socket')

    @property
    def ssh_option(self):
        """Returns the ssh option to use this multiplexed ssh.

        An empty string is returned while no temporary directory (and thus no
        control socket location) has been set up.
        """
        if self._master_tempdir:
            return '-o ControlPath=%s' % (self._socket_path,)
        return ''

    def maybe_start(self, timeout=5):
        """Makes sure a background master ssh process is running.

        Nothing is done when a previously started master connection still
        looks healthy. A stale process or a missing socket is cleaned up
        first, then a fresh background process is created.

        @param timeout: timeout in seconds (default 5) to wait for master ssh
                        connection to be established. If the timeout is
                        reached, the TimeoutError is swallowed and no other
                        action is taken.
        """
        # Several callers may race to tear down the old master connection and
        # bring up a new one, so the whole sequence runs under the lock.
        with self._lock:
            job = self._master_job
            if job:
                # The socket is checked first; the process is only polled
                # when the socket still exists.
                socket_missing = not os.path.exists(self._socket_path)
                if socket_missing or job.sp.poll() is not None:
                    logging.info(
                            'Master ssh connection to %s is down.',
                            self._hostname)
                    self._close_internal()

            if self._master_job:
                # The existing master connection is still usable.
                return

            # The control socket lives in a fresh temp dir under a shared,
            # short parent directory.
            self._master_tempdir = autotemp.tempdir(dir=_short_tmpdir())

            # Launch the master ssh process in the background.
            command = _MASTER_SSH_COMMAND_TEMPLATE % {
                    'hostname': self._hostname,
                    'user': self._user,
                    'port': self._port,
                    'socket': self._socket_path,
            }
            logging.info("Starting master ssh connection '%s'", command)
            self._master_job = utils.BgJob(
                    command,
                    nickname='master-ssh',
                    stdout_tee=utils.DEVNULL,
                    stderr_tee=utils.DEVNULL,
                    unjoinable=True)

            def _socket_exists():
                return os.path.exists(self._socket_path)

            # Wait for the socket file to show up so that the first user of
            # the connection does not race with the ssh startup.
            try:
                utils.poll_for_condition(
                        condition=_socket_exists,
                        timeout=timeout,
                        sleep_interval=0.2,
                        desc='master-ssh connection up')
            except utils.TimeoutError:
                # Deliberately ignored; poll_for_condition is expected to
                # have logged the timeout already.
                pass

    def close(self):
        """Releases all resources used by multiplexed ssh connection."""
        with self._lock:
            self._close_internal()

    def _close_internal(self):
        # Callers are expected to hold _lock already.
        job = self._master_job
        if job:
            logging.debug('Nuking ssh master_job')
            utils.nuke_subprocess(job.sp)
            self._master_job = None

        tempdir = self._master_tempdir
        if tempdir:
            logging.debug('Cleaning ssh master_tempdir')
            tempdir.clean()
            self._master_tempdir = None
131
132
class ConnectionPool(object):
    """Holds SSH multiplex connection instances, one per endpoint.

    Instances are keyed by (hostname, user, port). Access to the underlying
    dictionary is guarded by a thread lock.
    """

    def __init__(self):
        # Maps (hostname, user, port) -> MasterSsh.
        self._pool = {}
        self._lock = threading.Lock()

    def get(self, hostname, user, port):
        """Returns MasterSsh instance for the given endpoint.

        If the pool holds the instance already, returns it. If not, creates
        the instance, stores it and returns it.

        Caller has the responsibility to call maybe_start() before using it.

        @param hostname: Host name of the endpoint.
        @param user: User name to log in.
        @param port: Port number sshd is listening.

        @returns: The pooled MasterSsh instance for the endpoint.
        """
        key = (hostname, user, port)
        logging.debug('Get master ssh connection for %s@%s:%d', user, hostname,
                      port)

        with self._lock:
            conn = self._pool.get(key)
            if not conn:
                conn = MasterSsh(hostname, user, port)
                self._pool[key] = conn
            return conn

    def shutdown(self):
        """Closes all ssh multiplex connections.

        The pooled instances are kept in the pool after being closed.
        """
        # Take a snapshot under the lock so that a concurrent get() cannot
        # resize the dictionary while it is being iterated. values() (rather
        # than itervalues()) works on both Python 2 and Python 3.
        with self._lock:
            connections = list(self._pool.values())

        # Close outside of the lock; closing may take a while and does not
        # touch the pool itself.
        for ssh in connections:
            ssh.close()
167
168
def _short_tmpdir():
    """Returns the shared parent directory for master ssh temp dirs.

    crbug/865171 Unix domain socket paths are limited to 108 characters.
    crbug/945523 Swarming does not like too many top-level directories in
    /tmp.
    So use a shared parent directory in /tmp, creating it on first use.

    @returns: Path of the directory, '/tmp/ssh-master_<user>', where <user>
              is the first 8 characters of $USER ('no_USER' if it is unset).
    @raises OSError: if the directory cannot be created for any reason other
                     than it already existing.
    """
    import errno

    user = os.environ.get("USER", "no_USER")[:8]
    d = '/tmp/ssh-master_%s' % user
    # Create unconditionally and tolerate "already exists": a separate
    # exists() check followed by mkdir() can fail when another process
    # creates the directory in between.
    try:
        os.mkdir(d)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
    return d
179