1"""This class extends pexpect.spawn to specialize setting up SSH connections. 2This adds methods for login, logout, and expecting the shell prompt. 3 4$Id: pxssh.py 487 2007-08-29 22:33:29Z noah $ 5""" 6 7from pexpect import * 8import pexpect 9import time 10 11__all__ = ['ExceptionPxssh', 'pxssh'] 12 13# Exception classes used by this module. 14class ExceptionPxssh(ExceptionPexpect): 15 """Raised for pxssh exceptions. 16 """ 17 18class pxssh (spawn): 19 20 """This class extends pexpect.spawn to specialize setting up SSH 21 connections. This adds methods for login, logout, and expecting the shell 22 prompt. It does various tricky things to handle many situations in the SSH 23 login process. For example, if the session is your first login, then pxssh 24 automatically accepts the remote certificate; or if you have public key 25 authentication setup then pxssh won't wait for the password prompt. 26 27 pxssh uses the shell prompt to synchronize output from the remote host. In 28 order to make this more robust it sets the shell prompt to something more 29 unique than just $ or #. This should work on most Borne/Bash or Csh style 30 shells. 31 32 Example that runs a few commands on a remote server and prints the result:: 33 34 import pxssh 35 import getpass 36 try: 37 s = pxssh.pxssh() 38 hostname = raw_input('hostname: ') 39 username = raw_input('username: ') 40 password = getpass.getpass('password: ') 41 s.login (hostname, username, password) 42 s.sendline ('uptime') # run a command 43 s.prompt() # match the prompt 44 print s.before # print everything before the prompt. 45 s.sendline ('ls -l') 46 s.prompt() 47 print s.before 48 s.sendline ('df') 49 s.prompt() 50 print s.before 51 s.logout() 52 except pxssh.ExceptionPxssh, e: 53 print "pxssh failed on login." 54 print str(e) 55 56 Note that if you have ssh-agent running while doing development with pxssh 57 then this can lead to a lot of confusion. Many X display managers (xdm, 58 gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI 59 dialog box popup asking for a password during development. You should turn 60 off any key agents during testing. The 'force_password' attribute will turn 61 off public key authentication. This will only work if the remote SSH server 62 is configured to allow password logins. Example of using 'force_password' 63 attribute:: 64 65 s = pxssh.pxssh() 66 s.force_password = True 67 hostname = raw_input('hostname: ') 68 username = raw_input('username: ') 69 password = getpass.getpass('password: ') 70 s.login (hostname, username, password) 71 """ 72 73 def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None): 74 spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env) 75 76 self.name = '<pxssh>' 77 78 #SUBTLE HACK ALERT! Note that the command to set the prompt uses a 79 #slightly different string than the regular expression to match it. This 80 #is because when you set the prompt the command will echo back, but we 81 #don't want to match the echoed command. So if we make the set command 82 #slightly different than the regex we eliminate the problem. To make the 83 #set command different we add a backslash in front of $. The $ doesn't 84 #need to be escaped, but it doesn't hurt and serves to make the set 85 #prompt command different than the regex. 86 87 # used to match the command-line prompt 88 self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] " 89 self.PROMPT = self.UNIQUE_PROMPT 90 91 # used to set shell command-line prompt to UNIQUE_PROMPT. 92 self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '" 93 self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '" 94 self.SSH_OPTS = "-o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" 95 # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from 96 # displaying a GUI password dialog. I have not figured out how to 97 # disable only SSH_ASKPASS without also disabling X11 forwarding. 98 # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying! 99 #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" 100 self.force_password = False 101 self.auto_prompt_reset = True 102 103 def levenshtein_distance(self, a,b): 104 105 """This calculates the Levenshtein distance between a and b. 106 """ 107 108 n, m = len(a), len(b) 109 if n > m: 110 a,b = b,a 111 n,m = m,n 112 current = range(n+1) 113 for i in range(1,m+1): 114 previous, current = current, [i]+[0]*n 115 for j in range(1,n+1): 116 add, delete = previous[j]+1, current[j-1]+1 117 change = previous[j-1] 118 if a[j-1] != b[i-1]: 119 change = change + 1 120 current[j] = min(add, delete, change) 121 return current[n] 122 123 def synch_original_prompt (self): 124 125 """This attempts to find the prompt. Basically, press enter and record 126 the response; press enter again and record the response; if the two 127 responses are similar then assume we are at the original prompt. """ 128 129 # All of these timing pace values are magic. 130 # I came up with these based on what seemed reliable for 131 # connecting to a heavily loaded machine I have. 132 # If latency is worse than these values then this will fail. 133 134 self.sendline() 135 time.sleep(0.5) 136 self.read_nonblocking(size=10000,timeout=1) # GAS: Clear out the cache before getting the prompt 137 time.sleep(0.1) 138 self.sendline() 139 time.sleep(0.5) 140 x = self.read_nonblocking(size=1000,timeout=1) 141 time.sleep(0.1) 142 self.sendline() 143 time.sleep(0.5) 144 a = self.read_nonblocking(size=1000,timeout=1) 145 time.sleep(0.1) 146 self.sendline() 147 time.sleep(0.5) 148 b = self.read_nonblocking(size=1000,timeout=1) 149 ld = self.levenshtein_distance(a,b) 150 len_a = len(a) 151 if len_a == 0: 152 return False 153 if float(ld)/len_a < 0.4: 154 return True 155 return False 156 157 ### TODO: This is getting messy and I'm pretty sure this isn't perfect. 158 ### TODO: I need to draw a flow chart for this. 159 def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True): 160 161 """This logs the user into the given server. It uses the 162 'original_prompt' to try to find the prompt right after login. When it 163 finds the prompt it immediately tries to reset the prompt to something 164 more easily matched. The default 'original_prompt' is very optimistic 165 and is easily fooled. It's more reliable to try to match the original 166 prompt as exactly as possible to prevent false matches by server 167 strings such as the "Message Of The Day". On many systems you can 168 disable the MOTD on the remote server by creating a zero-length file 169 called "~/.hushlogin" on the remote server. If a prompt cannot be found 170 then this will not necessarily cause the login to fail. In the case of 171 a timeout when looking for the prompt we assume that the original 172 prompt was so weird that we could not match it, so we use a few tricks 173 to guess when we have reached the prompt. Then we hope for the best and 174 blindly try to reset the prompt to something more unique. If that fails 175 then login() raises an ExceptionPxssh exception. 176 177 In some situations it is not possible or desirable to reset the 178 original prompt. In this case, set 'auto_prompt_reset' to False to 179 inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh 180 uses a unique prompt in the prompt() method. If the original prompt is 181 not reset then this will disable the prompt() method unless you 182 manually set the PROMPT attribute. """ 183 184 ssh_options = '-q' 185 if self.force_password: 186 ssh_options = ssh_options + ' ' + self.SSH_OPTS 187 if port is not None: 188 ssh_options = ssh_options + ' -p %s'%(str(port)) 189 cmd = "ssh %s -l %s %s" % (ssh_options, username, server) 190 191 # This does not distinguish between a remote server 'password' prompt 192 # and a local ssh 'passphrase' prompt (for unlocking a private key). 193 spawn._spawn(self, cmd) 194 i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"], timeout=login_timeout) 195 196 # First phase 197 if i==0: 198 # New certificate -- always accept it. 199 # This is what you get if SSH does not have the remote host's 200 # public key stored in the 'known_hosts' cache. 201 self.sendline("yes") 202 i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) 203 if i==2: # password or passphrase 204 self.sendline(password) 205 i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) 206 if i==4: 207 self.sendline(terminal_type) 208 i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) 209 210 # Second phase 211 if i==0: 212 # This is weird. This should not happen twice in a row. 213 self.close() 214 raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.') 215 elif i==1: # can occur if you have a public key pair set to authenticate. 216 ### TODO: May NOT be OK if expect() got tricked and matched a false prompt. 217 pass 218 elif i==2: # password prompt again 219 # For incorrect passwords, some ssh servers will 220 # ask for the password again, others return 'denied' right away. 221 # If we get the password prompt again then this means 222 # we didn't get the password right the first time. 223 self.close() 224 raise ExceptionPxssh ('password refused') 225 elif i==3: # permission denied -- password was bad. 226 self.close() 227 raise ExceptionPxssh ('permission denied') 228 elif i==4: # terminal type again? WTF? 229 self.close() 230 raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.') 231 elif i==5: # Timeout 232 #This is tricky... I presume that we are at the command-line prompt. 233 #It may be that the shell prompt was so weird that we couldn't match 234 #it. Or it may be that we couldn't log in for some other reason. I 235 #can't be sure, but it's safe to guess that we did login because if 236 #I presume wrong and we are not logged in then this should be caught 237 #later when I try to set the shell prompt. 238 pass 239 elif i==6: # Connection closed by remote host 240 self.close() 241 raise ExceptionPxssh ('connection closed') 242 else: # Unexpected 243 self.close() 244 raise ExceptionPxssh ('unexpected login response') 245 if not self.synch_original_prompt(): 246 self.close() 247 raise ExceptionPxssh ('could not synchronize with original prompt') 248 # We appear to be in. 249 # set shell prompt to something unique. 250 if auto_prompt_reset: 251 if not self.set_unique_prompt(): 252 self.close() 253 raise ExceptionPxssh ('could not set shell prompt\n'+self.before) 254 return True 255 256 def logout (self): 257 258 """This sends exit to the remote shell. If there are stopped jobs then 259 this automatically sends exit twice. """ 260 261 self.sendline("exit") 262 index = self.expect([EOF, "(?i)there are stopped jobs"]) 263 if index==1: 264 self.sendline("exit") 265 self.expect(EOF) 266 self.close() 267 268 def prompt (self, timeout=20): 269 270 """This matches the shell prompt. This is little more than a short-cut 271 to the expect() method. This returns True if the shell prompt was 272 matched. This returns False if there was a timeout. Note that if you 273 called login() with auto_prompt_reset set to False then you should have 274 manually set the PROMPT attribute to a regex pattern for matching the 275 prompt. """ 276 277 i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout) 278 if i==1: 279 return False 280 return True 281 282 def set_unique_prompt (self): 283 284 """This sets the remote prompt to something more unique than # or $. 285 This makes it easier for the prompt() method to match the shell prompt 286 unambiguously. This method is called automatically by the login() 287 method, but you may want to call it manually if you somehow reset the 288 shell prompt. For example, if you 'su' to a different user then you 289 will need to manually reset the prompt. This sends shell commands to 290 the remote host to set the prompt, so this assumes the remote host is 291 ready to receive commands. 292 293 Alternatively, you may use your own prompt pattern. Just set the PROMPT 294 attribute to a regular expression that matches it. In this case you 295 should call login() with auto_prompt_reset=False; then set the PROMPT 296 attribute. After that the prompt() method will try to match your prompt 297 pattern.""" 298 299 self.sendline ("unset PROMPT_COMMAND") 300 self.sendline (self.PROMPT_SET_SH) # sh-style 301 i = self.expect ([TIMEOUT, self.PROMPT], timeout=10) 302 if i == 0: # csh-style 303 self.sendline (self.PROMPT_SET_CSH) 304 i = self.expect ([TIMEOUT, self.PROMPT], timeout=10) 305 if i == 0: 306 return False 307 return True 308 309# vi:ts=4:sw=4:expandtab:ft=python: 310