1# Lint as: python2, python3 2# Copyright 2017 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import absolute_import 7from __future__ import division 8from __future__ import print_function 9 10import logging 11import multiprocessing 12import os 13import threading 14 15from autotest_lib.client.common_lib import autotemp 16from autotest_lib.server import utils 17import six 18 19_MASTER_SSH_COMMAND_TEMPLATE = ( 20 '/usr/bin/ssh -a -x -N ' 21 '-o ControlMaster=yes ' # Create multiplex socket. 22 '-o ControlPath=%(socket)s ' 23 '-o StrictHostKeyChecking=no ' 24 '-o UserKnownHostsFile=/dev/null ' 25 '-o BatchMode=yes ' 26 '-o ConnectTimeout=30 ' 27 '-o ServerAliveInterval=30 ' 28 '-o ServerAliveCountMax=1 ' 29 '-o ConnectionAttempts=1 ' 30 '-o Protocol=2 ' 31 '-l %(user)s -p %(port)d %(hostname)s') 32 33 34class MasterSsh(object): 35 """Manages multiplex ssh connection.""" 36 37 def __init__(self, hostname, user, port): 38 self._hostname = hostname 39 self._user = user 40 self._port = port 41 42 self._master_job = None 43 self._master_tempdir = None 44 45 self._lock = multiprocessing.Lock() 46 47 def __del__(self): 48 self.close() 49 50 @property 51 def _socket_path(self): 52 return os.path.join(self._master_tempdir.name, 'socket') 53 54 @property 55 def ssh_option(self): 56 """Returns the ssh option to use this multiplexed ssh. 57 58 If background process is not running, returns an empty string. 59 """ 60 if not self._master_tempdir: 61 return '' 62 return '-o ControlPath=%s' % (self._socket_path,) 63 64 def maybe_start(self, timeout=5): 65 """Starts the background process to run multiplex ssh connection. 66 67 If there already is a background process running, this does nothing. 68 If there is a stale process or a stale socket, first clean them up, 69 then create a background process. 70 71 @param timeout: timeout in seconds (default 5) to wait for master ssh 72 connection to be established. If timeout is reached, a 73 warning message is logged, but no other action is 74 taken. 75 """ 76 # Multiple processes might try in parallel to clean up the old master 77 # ssh connection and create a new one, therefore use a lock to protect 78 # against race conditions. 79 with self._lock: 80 # If a previously started master SSH connection is not running 81 # anymore, it needs to be cleaned up and then restarted. 82 if (self._master_job and (not os.path.exists(self._socket_path) or 83 self._master_job.sp.poll() is not None)): 84 logging.info( 85 'Master ssh connection to %s is down.', self._hostname) 86 self._close_internal() 87 88 # Start a new master SSH connection. 89 if not self._master_job: 90 # Create a shared socket in a temp location. 91 self._master_tempdir = autotemp.tempdir(dir=_short_tmpdir()) 92 93 # Start the master SSH connection in the background. 94 master_cmd = _MASTER_SSH_COMMAND_TEMPLATE % { 95 'hostname': self._hostname, 96 'user': self._user, 97 'port': self._port, 98 'socket': self._socket_path, 99 } 100 logging.info( 101 'Starting master ssh connection \'%s\'', master_cmd) 102 self._master_job = utils.BgJob( 103 master_cmd, nickname='master-ssh', 104 stdout_tee=utils.DEVNULL, stderr_tee=utils.DEVNULL, 105 unjoinable=True) 106 107 # To prevent a race between the master ssh connection 108 # startup and its first attempted use, wait for socket file to 109 # exist before returning. 110 try: 111 utils.poll_for_condition( 112 condition=lambda: os.path.exists(self._socket_path), 113 timeout=timeout, 114 sleep_interval=0.2, 115 desc='master-ssh connection up') 116 except utils.TimeoutError: 117 # poll_for_conditional already logs an error upon timeout 118 pass 119 120 121 def close(self): 122 """Releases all resources used by multiplexed ssh connection.""" 123 with self._lock: 124 self._close_internal() 125 126 def _close_internal(self): 127 # Assume that when this is called, _lock should be acquired, already. 128 if self._master_job: 129 logging.debug('Nuking ssh master_job') 130 utils.nuke_subprocess(self._master_job.sp) 131 self._master_job = None 132 133 if self._master_tempdir: 134 logging.debug('Cleaning ssh master_tempdir') 135 self._master_tempdir.clean() 136 self._master_tempdir = None 137 138 139class ConnectionPool(object): 140 """Holds SSH multiplex connection instance.""" 141 142 def __init__(self): 143 self._pool = {} 144 self._lock = threading.Lock() 145 146 def get(self, hostname, user, port): 147 """Returns MasterSsh instance for the given endpoint. 148 149 If the pool holds the instance already, returns it. If not, create the 150 instance, and returns it. 151 152 Caller has the responsibility to call maybe_start() before using it. 153 154 @param hostname: Host name of the endpoint. 155 @param user: User name to log in. 156 @param port: Port number sshd is listening. 157 """ 158 key = (hostname, user, port) 159 logging.debug('Get master ssh connection for %s@%s:%d', user, hostname, 160 port) 161 162 with self._lock: 163 conn = self._pool.get(key) 164 if not conn: 165 conn = MasterSsh(hostname, user, port) 166 self._pool[key] = conn 167 return conn 168 169 def shutdown(self): 170 """Closes all ssh multiplex connections.""" 171 for ssh in six.itervalues(self._pool): 172 ssh.close() 173 174 175def _short_tmpdir(): 176 # crbug/865171 Unix domain socket paths are limited to 108 characters. 177 # crbug/945523 Swarming does not like too many top-level directories in 178 # /tmp. 179 # So use a shared parent directory in /tmp 180 user = os.environ.get("USER", "no_USER")[:8] 181 d = '/tmp/ssh-master_%s' % user 182 if not os.path.exists(d): 183 os.mkdir(d) 184 return d 185