• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import multiprocessing
7import os
8import time
9
10from autotest_lib.client.common_lib import autotemp
11from autotest_lib.server import utils
12
13_MASTER_SSH_COMMAND_TEMPLATE = (
14    '/usr/bin/ssh -a -x -N '
15    '-o ControlMaster=yes '  # Create multiplex socket.
16    '-o ControlPath=%(socket)s '
17    '-o StrictHostKeyChecking=no '
18    '-o UserKnownHostsFile=/dev/null '
19    '-o BatchMode=yes '
20    '-o ConnectTimeout=30 '
21    '-o ServerAliveInterval=900 '
22    '-o ServerAliveCountMax=3 '
23    '-o ConnectionAttempts=4 '
24    '-o Protocol=2 '
25    '-l %(user)s -p %(port)d %(hostname)s')
26
27
28class MasterSsh(object):
29    """Manages multiplex ssh connection."""
30
31    def __init__(self, hostname, user, port):
32        self._hostname = hostname
33        self._user = user
34        self._port = port
35
36        self._master_job = None
37        self._master_tempdir = None
38
39        self._lock = multiprocessing.Lock()
40
41    @property
42    def _socket_path(self):
43        return os.path.join(self._master_tempdir.name, 'socket')
44
45    @property
46    def ssh_option(self):
47        """Returns the ssh option to use this multiplexed ssh.
48
49        If background process is not running, returns an empty string.
50        """
51        if not self._master_tempdir:
52            return ''
53        return '-o ControlPath=%s' % (self._socket_path,)
54
55    def maybe_start(self, timeout=5):
56        """Starts the background process to run multiplex ssh connection.
57
58        If there already is a background process running, this does nothing.
59        If there is a stale process or a stale socket, first clean them up,
60        then create a background process.
61
62        @param timeout: timeout in seconds (default 5) to wait for master ssh
63                        connection to be established. If timeout is reached, a
64                        warning message is logged, but no other action is
65                        taken.
66        """
67        # Multiple processes might try in parallel to clean up the old master
68        # ssh connection and create a new one, therefore use a lock to protect
69        # against race conditions.
70        with self._lock:
71            # If a previously started master SSH connection is not running
72            # anymore, it needs to be cleaned up and then restarted.
73            if (self._master_job and (not os.path.exists(self._socket_path) or
74                                      self._master_job.sp.poll() is not None)):
75                logging.info(
76                        'Master ssh connection to %s is down.', self._hostname)
77                self._close_internal()
78
79            # Start a new master SSH connection.
80            if not self._master_job:
81                # Create a shared socket in a temp location.
82                self._master_tempdir = autotemp.tempdir(unique_id='ssh-master')
83
84                # Start the master SSH connection in the background.
85                master_cmd = _MASTER_SSH_COMMAND_TEMPLATE % {
86                        'hostname': self._hostname,
87                        'user': self._user,
88                        'port': self._port,
89                        'socket': self._socket_path,
90                }
91                logging.info(
92                        'Starting master ssh connection \'%s\'', master_cmd)
93                self._master_job = utils.BgJob(
94                         master_cmd, nickname='master-ssh',
95                         stdout_tee=utils.DEVNULL, stderr_tee=utils.DEVNULL,
96                         unjoinable=True)
97
98                # To prevent a race between the the master ssh connection
99                # startup and its first attempted use, wait for socket file to
100                # exist before returning.
101                end_time = time.time() + timeout
102                while time.time() < end_time:
103                    if os.path.exists(self._socket_path):
104                        break
105                    time.sleep(.2)
106                else:
107                    logging.info('Timed out waiting for master-ssh connection '
108                       'to be established.')
109
110    def close(self):
111        """Releases all resources used by multiplexed ssh connection."""
112        with self._lock:
113            self._close_internal()
114
115    def _close_internal(self):
116        # Assume that when this is called, _lock should be acquired, already.
117        if self._master_job:
118            logging.debug('Nuking ssh master_job')
119            utils.nuke_subprocess(self._master_job.sp)
120            self._master_job = None
121
122        if self._master_tempdir:
123            logging.debug('Cleaning ssh master_tempdir')
124            self._master_tempdir.clean()
125            self._master_tempdir = None
126
127
128class ConnectionPool(object):
129    """Holds SSH multiplex connection instance."""
130
131    def __init__(self):
132        self._pool = {}
133
134    def get(self, hostname, user, port):
135        """Returns MasterSsh instance for the given endpoint.
136
137        If the pool holds the instance already, returns it. If not, create the
138        instance, and returns it.
139
140        Caller has the responsibility to call maybe_start() before using it.
141
142        @param hostname: Host name of the endpoint.
143        @param user: User name to log in.
144        @param port: Port number sshd is listening.
145        """
146        key = (hostname, user, port)
147        master_ssh = self._pool.get(key)
148        if not master_ssh:
149            master_ssh = MasterSsh(hostname, user, port)
150            self._pool[key] = master_ssh
151        return master_ssh
152
153    def shutdown(self):
154        """Closes all ssh multiplex connections."""
155        for ssh in self._pool.itervalues():
156            ssh.close()
157