# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Multiplexed ("master") ssh connections and a pool keyed by endpoint."""

import logging
import multiprocessing
import os
import threading

from autotest_lib.client.common_lib import autotemp
from autotest_lib.server import utils

# Command line for the background master ssh process. It creates a control
# socket at %(socket)s which later ssh invocations can share by passing
# "-o ControlPath=<socket>". -N runs no remote command; -a and -x disable
# agent and X11 forwarding.
_MASTER_SSH_COMMAND_TEMPLATE = (
    '/usr/bin/ssh -a -x -N '
    '-o ControlMaster=yes '  # Create multiplex socket.
    '-o ControlPath=%(socket)s '
    '-o StrictHostKeyChecking=no '
    '-o UserKnownHostsFile=/dev/null '
    '-o BatchMode=yes '
    '-o ConnectTimeout=30 '
    '-o ServerAliveInterval=900 '
    '-o ServerAliveCountMax=3 '
    '-o ConnectionAttempts=4 '
    '-o Protocol=2 '
    '-l %(user)s -p %(port)d %(hostname)s')


class MasterSsh(object):
    """Manages multiplex ssh connection."""

    def __init__(self, hostname, user, port):
        self._hostname = hostname
        self._user = user
        self._port = port

        # Background utils.BgJob running the master ssh, or None.
        self._master_job = None
        # autotemp.tempdir holding the control socket, or None.
        self._master_tempdir = None

        self._lock = multiprocessing.Lock()

    def __del__(self):
        # If __init__ raised before _lock was assigned, nothing was started
        # and close() would fail on the missing attribute, so skip it.
        if hasattr(self, '_lock'):
            self.close()

    @property
    def _socket_path(self):
        # Only valid while _master_tempdir is set.
        return os.path.join(self._master_tempdir.name, 'socket')

    @property
    def ssh_option(self):
        """Returns the ssh option to use this multiplexed ssh.

        If no master connection has been started (or it has been closed),
        returns an empty string. This does not check whether the background
        process is still alive; call maybe_start() for that.
        """
        if not self._master_tempdir:
            return ''
        return '-o ControlPath=%s' % (self._socket_path,)

    def maybe_start(self, timeout=5):
        """Starts the background process to run multiplex ssh connection.

        If there already is a background process running, this does nothing.
        If there is a stale process or a stale socket, first clean them up,
        then create a background process.

        @param timeout: timeout in seconds (default 5) to wait for master ssh
                        connection to be established. If timeout is reached, a
                        warning message is logged, but no other action is
                        taken.
        @raises: whatever utils.BgJob raises if the process cannot be
                 launched; the temp dir is cleaned up before re-raising.
        """
        # Multiple processes might try in parallel to clean up the old master
        # ssh connection and create a new one, therefore use a lock to protect
        # against race conditions.
        with self._lock:
            # If a previously started master SSH connection is not running
            # anymore, it needs to be cleaned up and then restarted.
            if (self._master_job and (not os.path.exists(self._socket_path) or
                                      self._master_job.sp.poll() is not None)):
                logging.info(
                    'Master ssh connection to %s is down.', self._hostname)
                self._close_internal()

            # Start a new master SSH connection.
            if not self._master_job:
                # Create a shared socket in a temp location.
                self._master_tempdir = autotemp.tempdir(
                    unique_id='ssh-master', dir='/tmp')

                # Start the master SSH connection in the background.
                master_cmd = _MASTER_SSH_COMMAND_TEMPLATE % {
                    'hostname': self._hostname,
                    'user': self._user,
                    'port': self._port,
                    'socket': self._socket_path,
                }
                logging.info(
                    'Starting master ssh connection \'%s\'', master_cmd)
                try:
                    self._master_job = utils.BgJob(
                        master_cmd, nickname='master-ssh',
                        stdout_tee=utils.DEVNULL, stderr_tee=utils.DEVNULL,
                        unjoinable=True)
                except Exception:
                    # Without this, the temp dir would leak (the next call
                    # creates a new one) and ssh_option would advertise a
                    # ControlPath that no master is serving.
                    self._close_internal()
                    raise

                # To prevent a race between the master ssh connection
                # startup and its first attempted use, wait for socket file to
                # exist before returning.
                try:
                    utils.poll_for_condition(
                        condition=lambda: os.path.exists(self._socket_path),
                        timeout=timeout,
                        sleep_interval=0.2,
                        desc='Wait for a socket file to exist')
                # log the issue if it fails, but don't throw an exception
                except utils.TimeoutError:
                    logging.info('Timed out waiting for master-ssh connection '
                                 'to be established.')

    def close(self):
        """Releases all resources used by multiplexed ssh connection."""
        with self._lock:
            self._close_internal()

    def _close_internal(self):
        """Kills the master process and removes its temp dir, if present.

        Caller must already hold self._lock.
        """
        if self._master_job:
            logging.debug('Nuking ssh master_job')
            utils.nuke_subprocess(self._master_job.sp)
            self._master_job = None

        if self._master_tempdir:
            logging.debug('Cleaning ssh master_tempdir')
            self._master_tempdir.clean()
            self._master_tempdir = None


class ConnectionPool(object):
    """Holds SSH multiplex connection instance."""

    def __init__(self):
        # Maps (hostname, user, port) -> MasterSsh.
        self._pool = {}
        self._lock = threading.Lock()

    def get(self, hostname, user, port):
        """Returns MasterSsh instance for the given endpoint.

        If the pool holds the instance already, returns it. If not, create the
        instance, and returns it.

        Caller has the responsibility to call maybe_start() before using it.

        @param hostname: Host name of the endpoint.
        @param user: User name to log in.
        @param port: Port number sshd is listening.
        """
        key = (hostname, user, port)
        logging.debug('Get master ssh connection for %s@%s:%d', user, hostname,
                      port)

        with self._lock:
            conn = self._pool.get(key)
            if not conn:
                conn = MasterSsh(hostname, user, port)
                self._pool[key] = conn
            return conn

    def shutdown(self):
        """Closes all ssh multiplex connections.

        The instances stay in the pool; calling maybe_start() on one of them
        later starts a fresh master connection.
        """
        # Snapshot under the lock so a concurrent get() cannot mutate the dict
        # mid-iteration; close outside the lock so slow teardown does not
        # block get(). list(values()) also works on both Python 2 and 3,
        # unlike itervalues().
        with self._lock:
            connections = list(self._pool.values())
        for ssh in connections:
            ssh.close()