# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import multiprocessing
import os
import time

from autotest_lib.client.common_lib import autotemp
from autotest_lib.server import utils

# Command line used to launch the background "master" ssh process. It is
# filled in with a dict providing 'socket', 'user', 'port' and 'hostname'.
_MASTER_SSH_COMMAND_TEMPLATE = (
    '/usr/bin/ssh -a -x -N '
    '-o ControlMaster=yes '  # Create multiplex socket.
    '-o ControlPath=%(socket)s '
    '-o StrictHostKeyChecking=no '
    '-o UserKnownHostsFile=/dev/null '
    '-o BatchMode=yes '
    '-o ConnectTimeout=30 '
    '-o ServerAliveInterval=900 '
    '-o ServerAliveCountMax=3 '
    '-o ConnectionAttempts=4 '
    '-o Protocol=2 '
    '-l %(user)s -p %(port)d %(hostname)s')


class MasterSsh(object):
    """Manages multiplex ssh connection."""

    def __init__(self, hostname, user, port):
        """Records the endpoint; no connection is started here.

        @param hostname: Host name of the endpoint.
        @param user: User name to log in.
        @param port: Port number sshd is listening (formatted with %d).
        """
        self._hostname = hostname
        self._user = user
        self._port = port

        # Background job running the master ssh process, or None.
        self._master_job = None
        # Temp directory object holding the control socket, or None.
        self._master_tempdir = None

        # Serializes start-up and tear-down of the master connection.
        self._lock = multiprocessing.Lock()

    @property
    def _socket_path(self):
        """Path of the control socket inside the master temp directory.

        Only valid while self._master_tempdir is set.
        """
        return os.path.join(self._master_tempdir.name, 'socket')

    @property
    def ssh_option(self):
        """Returns the ssh option to use this multiplexed ssh.

        If no temp directory for the control socket currently exists (i.e.
        the master connection has not been started, or has been closed),
        returns an empty string.
        """
        if not self._master_tempdir:
            return ''
        return '-o ControlPath=%s' % (self._socket_path,)

    def maybe_start(self, timeout=5):
        """Starts the background process to run multiplex ssh connection.

        If there already is a background process running, this does nothing.
        If there is a stale process or a stale socket, first clean them up,
        then create a background process.

        @param timeout: timeout in seconds (default 5) to wait for master ssh
                        connection to be established. If timeout is reached,
                        a message is logged, but no other action is taken.
        """
        # Multiple processes might try in parallel to clean up the old master
        # ssh connection and create a new one, therefore use a lock to protect
        # against race conditions.
        with self._lock:
            # If a previously started master SSH connection is not running
            # anymore, it needs to be cleaned up and then restarted.
            if (self._master_job and (not os.path.exists(self._socket_path) or
                                      self._master_job.sp.poll() is not None)):
                logging.info(
                        'Master ssh connection to %s is down.', self._hostname)
                self._close_internal()

            # Start a new master SSH connection.
            if not self._master_job:
                # Create a shared socket in a temp location.
                self._master_tempdir = autotemp.tempdir(unique_id='ssh-master')

                # Start the master SSH connection in the background.
                master_cmd = _MASTER_SSH_COMMAND_TEMPLATE % {
                        'hostname': self._hostname,
                        'user': self._user,
                        'port': self._port,
                        'socket': self._socket_path,
                }
                logging.info(
                        'Starting master ssh connection \'%s\'', master_cmd)
                self._master_job = utils.BgJob(
                        master_cmd, nickname='master-ssh',
                        stdout_tee=utils.DEVNULL, stderr_tee=utils.DEVNULL,
                        unjoinable=True)

                # To prevent a race between the master ssh connection
                # startup and its first attempted use, wait for socket file to
                # exist before returning.
                end_time = time.time() + timeout
                while time.time() < end_time:
                    if os.path.exists(self._socket_path):
                        break
                    time.sleep(.2)
                else:
                    # Loop ran out of time without seeing the socket file.
                    logging.info('Timed out waiting for master-ssh connection '
                                 'to be established.')

    def close(self):
        """Releases all resources used by multiplexed ssh connection."""
        with self._lock:
            self._close_internal()

    def _close_internal(self):
        """Kills the master job and removes its temp directory, if present.

        Assumes the caller already holds self._lock.
        """
        if self._master_job:
            logging.debug('Nuking ssh master_job')
            utils.nuke_subprocess(self._master_job.sp)
            self._master_job = None

        if self._master_tempdir:
            logging.debug('Cleaning ssh master_tempdir')
            self._master_tempdir.clean()
            self._master_tempdir = None


class ConnectionPool(object):
    """Holds SSH multiplex connection instance."""

    def __init__(self):
        # Maps (hostname, user, port) -> MasterSsh.
        self._pool = {}

    def get(self, hostname, user, port):
        """Returns MasterSsh instance for the given endpoint.

        If the pool holds the instance already, returns it. If not, create the
        instance, and returns it.

        Caller has the responsibility to call maybe_start() before using it.

        @param hostname: Host name of the endpoint.
        @param user: User name to log in.
        @param port: Port number sshd is listening.
        """
        key = (hostname, user, port)
        master_ssh = self._pool.get(key)
        if master_ssh is None:
            master_ssh = MasterSsh(hostname, user, port)
            self._pool[key] = master_ssh
        return master_ssh

    def shutdown(self):
        """Closes all ssh multiplex connections."""
        # dict.itervalues() exists only on Python 2; values() works on both
        # Python 2 and 3. list() snapshots the entries before closing them.
        for ssh in list(self._pool.values()):
            ssh.close()