# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import errno
import logging
import multiprocessing
import os
import threading

from autotest_lib.client.common_lib import autotemp
from autotest_lib.server import utils

# Command line for the background "master" ssh process. It does not run a
# remote command (-N); it only creates the multiplex control socket at
# %(socket)s that later ssh invocations attach to via ControlPath.
_MASTER_SSH_COMMAND_TEMPLATE = (
    '/usr/bin/ssh -a -x -N '
    '-o ControlMaster=yes '  # Create multiplex socket.
    '-o ControlPath=%(socket)s '
    '-o StrictHostKeyChecking=no '
    '-o UserKnownHostsFile=/dev/null '
    '-o BatchMode=yes '
    '-o ConnectTimeout=30 '
    '-o ServerAliveInterval=900 '
    '-o ServerAliveCountMax=3 '
    '-o ConnectionAttempts=4 '
    '-o Protocol=2 '
    '-l %(user)s -p %(port)d %(hostname)s')


class MasterSsh(object):
    """Manages multiplex ssh connection."""

    def __init__(self, hostname, user, port):
        """Records the endpoint; no process is started until maybe_start().

        @param hostname: Host name of the endpoint.
        @param user: User name to log in.
        @param port: Port number sshd is listening on (formatted with %d).
        """
        self._hostname = hostname
        self._user = user
        self._port = port

        # Background ssh process (utils.BgJob) and the temp dir that holds
        # its control socket; both None while no master is running.
        self._master_job = None
        self._master_tempdir = None

        # multiprocessing.Lock so that forked processes sharing this object
        # do not race each other while restarting the master connection.
        self._lock = multiprocessing.Lock()

    def __del__(self):
        # If __init__ raised before _lock was assigned there is nothing to
        # release, and close() would fail with AttributeError.
        if getattr(self, '_lock', None) is None:
            return
        self.close()

    @property
    def _socket_path(self):
        # Only valid while self._master_tempdir is set.
        return os.path.join(self._master_tempdir.name, 'socket')

    @property
    def ssh_option(self):
        """Returns the ssh option to use this multiplexed ssh.

        If background process is not running, returns an empty string.
        """
        if not self._master_tempdir:
            return ''
        return '-o ControlPath=%s' % (self._socket_path,)

    def maybe_start(self, timeout=5):
        """Starts the background process to run multiplex ssh connection.

        If there already is a background process running, this does nothing.
        If there is a stale process or a stale socket, first clean them up,
        then create a background process.

        @param timeout: timeout in seconds (default 5) to wait for master ssh
                        connection to be established. If timeout is reached, a
                        warning message is logged, but no other action is
                        taken.
        """
        # Multiple processes might try in parallel to clean up the old master
        # ssh connection and create a new one, therefore use a lock to protect
        # against race conditions.
        with self._lock:
            # If a previously started master SSH connection is not running
            # anymore (socket gone or process exited), it needs to be cleaned
            # up and then restarted.
            if (self._master_job and (not os.path.exists(self._socket_path) or
                                      self._master_job.sp.poll() is not None)):
                logging.info(
                    'Master ssh connection to %s is down.', self._hostname)
                self._close_internal()

            # Start a new master SSH connection.
            if not self._master_job:
                # A temp dir may be left over from an earlier attempt that
                # failed before the job was created; release it rather than
                # leaking it when we allocate a new one below.
                if self._master_tempdir:
                    self._close_internal()

                # Create a shared socket in a temp location.
                self._master_tempdir = autotemp.tempdir(dir=_short_tmpdir())

                # Start the master SSH connection in the background.
                master_cmd = _MASTER_SSH_COMMAND_TEMPLATE % {
                    'hostname': self._hostname,
                    'user': self._user,
                    'port': self._port,
                    'socket': self._socket_path,
                }
                logging.info(
                    'Starting master ssh connection \'%s\'', master_cmd)
                self._master_job = utils.BgJob(
                    master_cmd, nickname='master-ssh',
                    stdout_tee=utils.DEVNULL, stderr_tee=utils.DEVNULL,
                    unjoinable=True)

                # To prevent a race between the master ssh connection
                # startup and its first attempted use, wait for socket file to
                # exist before returning.
                try:
                    utils.poll_for_condition(
                        condition=lambda: os.path.exists(self._socket_path),
                        timeout=timeout,
                        sleep_interval=0.2,
                        desc='master-ssh connection up')
                except utils.TimeoutError:
                    # poll_for_condition already logs an error upon timeout.
                    pass

    def close(self):
        """Releases all resources used by multiplexed ssh connection."""
        with self._lock:
            self._close_internal()

    def _close_internal(self):
        """Kills the master job and removes its temp dir, if present.

        Callers must already hold self._lock.
        """
        if self._master_job:
            logging.debug('Nuking ssh master_job')
            utils.nuke_subprocess(self._master_job.sp)
            self._master_job = None

        if self._master_tempdir:
            logging.debug('Cleaning ssh master_tempdir')
            self._master_tempdir.clean()
            self._master_tempdir = None


class ConnectionPool(object):
    """Holds SSH multiplex connection instance."""

    def __init__(self):
        # Maps (hostname, user, port) -> MasterSsh.
        self._pool = {}
        self._lock = threading.Lock()

    def get(self, hostname, user, port):
        """Returns MasterSsh instance for the given endpoint.

        If the pool holds the instance already, returns it. If not, create the
        instance, and returns it.

        Caller has the responsibility to call maybe_start() before using it.

        @param hostname: Host name of the endpoint.
        @param user: User name to log in.
        @param port: Port number sshd is listening.
        """
        key = (hostname, user, port)
        logging.debug('Get master ssh connection for %s@%s:%d', user, hostname,
                      port)

        with self._lock:
            conn = self._pool.get(key)
            if not conn:
                conn = MasterSsh(hostname, user, port)
                self._pool[key] = conn
            return conn

    def shutdown(self):
        """Closes all ssh multiplex connections.

        Pool entries are kept; a later get() followed by maybe_start() will
        start a fresh master connection for that endpoint.
        """
        # Snapshot under the lock so a concurrent get() cannot mutate the dict
        # while it is iterated. values() works on Python 2 and 3, whereas
        # itervalues() does not exist on Python 3.
        with self._lock:
            connections = list(self._pool.values())
        for ssh in connections:
            ssh.close()


def _short_tmpdir():
    """Returns a short, per-user parent directory in /tmp, creating it.

    crbug/865171 Unix domain socket paths are limited to 108 characters.
    crbug/945523 Swarming does not like too many top-level directories in
    /tmp.
    So use a shared parent directory in /tmp.
    """
    user = os.environ.get("USER", "no_USER")[:8]
    d = '/tmp/ssh-master_%s' % user
    try:
        os.mkdir(d)
    except OSError as e:
        # Several processes may race to create the shared directory; losing
        # that race (or the directory already existing) is not an error.
        if e.errno != errno.EEXIST:
            raise
    return d