1# Copyright 2017 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import multiprocessing 7import os 8import threading 9import time 10 11from autotest_lib.client.common_lib import autotemp 12from autotest_lib.server import utils 13 14_MASTER_SSH_COMMAND_TEMPLATE = ( 15 '/usr/bin/ssh -a -x -N ' 16 '-o ControlMaster=yes ' # Create multiplex socket. 17 '-o ControlPath=%(socket)s ' 18 '-o StrictHostKeyChecking=no ' 19 '-o UserKnownHostsFile=/dev/null ' 20 '-o BatchMode=yes ' 21 '-o ConnectTimeout=30 ' 22 '-o ServerAliveInterval=900 ' 23 '-o ServerAliveCountMax=3 ' 24 '-o ConnectionAttempts=4 ' 25 '-o Protocol=2 ' 26 '-l %(user)s -p %(port)d %(hostname)s') 27 28 29class MasterSsh(object): 30 """Manages multiplex ssh connection.""" 31 32 def __init__(self, hostname, user, port): 33 self._hostname = hostname 34 self._user = user 35 self._port = port 36 37 self._master_job = None 38 self._master_tempdir = None 39 40 self._lock = multiprocessing.Lock() 41 42 def __del__(self): 43 self.close() 44 45 @property 46 def _socket_path(self): 47 return os.path.join(self._master_tempdir.name, 'socket') 48 49 @property 50 def ssh_option(self): 51 """Returns the ssh option to use this multiplexed ssh. 52 53 If background process is not running, returns an empty string. 54 """ 55 if not self._master_tempdir: 56 return '' 57 return '-o ControlPath=%s' % (self._socket_path,) 58 59 def maybe_start(self, timeout=5): 60 """Starts the background process to run multiplex ssh connection. 61 62 If there already is a background process running, this does nothing. 63 If there is a stale process or a stale socket, first clean them up, 64 then create a background process. 65 66 @param timeout: timeout in seconds (default 5) to wait for master ssh 67 connection to be established. If timeout is reached, a 68 warning message is logged, but no other action is 69 taken. 70 """ 71 # Multiple processes might try in parallel to clean up the old master 72 # ssh connection and create a new one, therefore use a lock to protect 73 # against race conditions. 74 with self._lock: 75 # If a previously started master SSH connection is not running 76 # anymore, it needs to be cleaned up and then restarted. 77 if (self._master_job and (not os.path.exists(self._socket_path) or 78 self._master_job.sp.poll() is not None)): 79 logging.info( 80 'Master ssh connection to %s is down.', self._hostname) 81 self._close_internal() 82 83 # Start a new master SSH connection. 84 if not self._master_job: 85 # Create a shared socket in a temp location. 86 self._master_tempdir = autotemp.tempdir(unique_id='ssh-master') 87 88 # Start the master SSH connection in the background. 89 master_cmd = _MASTER_SSH_COMMAND_TEMPLATE % { 90 'hostname': self._hostname, 91 'user': self._user, 92 'port': self._port, 93 'socket': self._socket_path, 94 } 95 logging.info( 96 'Starting master ssh connection \'%s\'', master_cmd) 97 self._master_job = utils.BgJob( 98 master_cmd, nickname='master-ssh', 99 stdout_tee=utils.DEVNULL, stderr_tee=utils.DEVNULL, 100 unjoinable=True) 101 102 # To prevent a race between the the master ssh connection 103 # startup and its first attempted use, wait for socket file to 104 # exist before returning. 105 end_time = time.time() + timeout 106 while time.time() < end_time: 107 if os.path.exists(self._socket_path): 108 break 109 time.sleep(.2) 110 else: 111 logging.info('Timed out waiting for master-ssh connection ' 112 'to be established.') 113 114 def close(self): 115 """Releases all resources used by multiplexed ssh connection.""" 116 with self._lock: 117 self._close_internal() 118 119 def _close_internal(self): 120 # Assume that when this is called, _lock should be acquired, already. 121 if self._master_job: 122 logging.debug('Nuking ssh master_job') 123 utils.nuke_subprocess(self._master_job.sp) 124 self._master_job = None 125 126 if self._master_tempdir: 127 logging.debug('Cleaning ssh master_tempdir') 128 self._master_tempdir.clean() 129 self._master_tempdir = None 130 131 132class ConnectionPool(object): 133 """Holds SSH multiplex connection instance.""" 134 135 def __init__(self): 136 self._pool = {} 137 self._lock = threading.Lock() 138 139 def get(self, hostname, user, port): 140 """Returns MasterSsh instance for the given endpoint. 141 142 If the pool holds the instance already, returns it. If not, create the 143 instance, and returns it. 144 145 Caller has the responsibility to call maybe_start() before using it. 146 147 @param hostname: Host name of the endpoint. 148 @param user: User name to log in. 149 @param port: Port number sshd is listening. 150 """ 151 key = (hostname, user, port) 152 logging.debug('Get master ssh connection for %s@%s:%d', user, hostname, 153 port) 154 155 with self._lock: 156 conn = self._pool.get(key) 157 if not conn: 158 conn = MasterSsh(hostname, user, port) 159 self._pool[key] = conn 160 return conn 161 162 def shutdown(self): 163 """Closes all ssh multiplex connections.""" 164 for ssh in self._pool.itervalues(): 165 ssh.close() 166