1'''This class extends pexpect.spawn to specialize setting up SSH connections. 2This adds methods for login, logout, and expecting the shell prompt. 3 4PEXPECT LICENSE 5 6 This license is approved by the OSI and FSF as GPL-compatible. 7 http://opensource.org/licenses/isc-license.txt 8 9 Copyright (c) 2012, Noah Spurrier <noah@noah.org> 10 PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY 11 PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE 12 COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES. 13 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 14 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 15 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 16 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 17 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 18 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 19 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 20 21''' 22 23from pexpect import ExceptionPexpect, TIMEOUT, EOF, spawn 24import time 25import os 26import sys 27import re 28 29__all__ = ['ExceptionPxssh', 'pxssh'] 30 31# Exception classes used by this module. 32class ExceptionPxssh(ExceptionPexpect): 33 '''Raised for pxssh exceptions. 34 ''' 35 36if sys.version_info > (3, 0): 37 from shlex import quote 38else: 39 _find_unsafe = re.compile(r'[^\w@%+=:,./-]').search 40 41 def quote(s): 42 """Return a shell-escaped version of the string *s*.""" 43 if not s: 44 return "''" 45 if _find_unsafe(s) is None: 46 return s 47 48 # use single quotes, and put single quotes into double quotes 49 # the string $'b is then quoted as '$'"'"'b' 50 return "'" + s.replace("'", "'\"'\"'") + "'" 51 52class pxssh (spawn): 53 '''This class extends pexpect.spawn to specialize setting up SSH 54 connections. This adds methods for login, logout, and expecting the shell 55 prompt. It does various tricky things to handle many situations in the SSH 56 login process. For example, if the session is your first login, then pxssh 57 automatically accepts the remote certificate; or if you have public key 58 authentication setup then pxssh won't wait for the password prompt. 59 60 pxssh uses the shell prompt to synchronize output from the remote host. In 61 order to make this more robust it sets the shell prompt to something more 62 unique than just $ or #. This should work on most Borne/Bash or Csh style 63 shells. 64 65 Example that runs a few commands on a remote server and prints the result:: 66 67 from pexpect import pxssh 68 import getpass 69 try: 70 s = pxssh.pxssh() 71 hostname = raw_input('hostname: ') 72 username = raw_input('username: ') 73 password = getpass.getpass('password: ') 74 s.login(hostname, username, password) 75 s.sendline('uptime') # run a command 76 s.prompt() # match the prompt 77 print(s.before) # print everything before the prompt. 78 s.sendline('ls -l') 79 s.prompt() 80 print(s.before) 81 s.sendline('df') 82 s.prompt() 83 print(s.before) 84 s.logout() 85 except pxssh.ExceptionPxssh as e: 86 print("pxssh failed on login.") 87 print(e) 88 89 Example showing how to specify SSH options:: 90 91 from pexpect import pxssh 92 s = pxssh.pxssh(options={ 93 "StrictHostKeyChecking": "no", 94 "UserKnownHostsFile": "/dev/null"}) 95 ... 96 97 Note that if you have ssh-agent running while doing development with pxssh 98 then this can lead to a lot of confusion. Many X display managers (xdm, 99 gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI 100 dialog box popup asking for a password during development. You should turn 101 off any key agents during testing. The 'force_password' attribute will turn 102 off public key authentication. This will only work if the remote SSH server 103 is configured to allow password logins. Example of using 'force_password' 104 attribute:: 105 106 s = pxssh.pxssh() 107 s.force_password = True 108 hostname = raw_input('hostname: ') 109 username = raw_input('username: ') 110 password = getpass.getpass('password: ') 111 s.login (hostname, username, password) 112 113 `debug_command_string` is only for the test suite to confirm that the string 114 generated for SSH is correct, using this will not allow you to do 115 anything other than get a string back from `pxssh.pxssh.login()`. 116 ''' 117 118 def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, 119 logfile=None, cwd=None, env=None, ignore_sighup=True, echo=True, 120 options={}, encoding=None, codec_errors='strict', 121 debug_command_string=False): 122 123 spawn.__init__(self, None, timeout=timeout, maxread=maxread, 124 searchwindowsize=searchwindowsize, logfile=logfile, 125 cwd=cwd, env=env, ignore_sighup=ignore_sighup, echo=echo, 126 encoding=encoding, codec_errors=codec_errors) 127 128 self.name = '<pxssh>' 129 130 #SUBTLE HACK ALERT! Note that the command that SETS the prompt uses a 131 #slightly different string than the regular expression to match it. This 132 #is because when you set the prompt the command will echo back, but we 133 #don't want to match the echoed command. So if we make the set command 134 #slightly different than the regex we eliminate the problem. To make the 135 #set command different we add a backslash in front of $. The $ doesn't 136 #need to be escaped, but it doesn't hurt and serves to make the set 137 #prompt command different than the regex. 138 139 # used to match the command-line prompt 140 self.UNIQUE_PROMPT = r"\[PEXPECT\][\$\#] " 141 self.PROMPT = self.UNIQUE_PROMPT 142 143 # used to set shell command-line prompt to UNIQUE_PROMPT. 144 self.PROMPT_SET_SH = r"PS1='[PEXPECT]\$ '" 145 self.PROMPT_SET_CSH = r"set prompt='[PEXPECT]\$ '" 146 self.SSH_OPTS = ("-o'RSAAuthentication=no'" 147 + " -o 'PubkeyAuthentication=no'") 148# Disabling host key checking, makes you vulnerable to MITM attacks. 149# + " -o 'StrictHostKeyChecking=no'" 150# + " -o 'UserKnownHostsFile /dev/null' ") 151 # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from 152 # displaying a GUI password dialog. I have not figured out how to 153 # disable only SSH_ASKPASS without also disabling X11 forwarding. 154 # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying! 155 #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" 156 self.force_password = False 157 158 self.debug_command_string = debug_command_string 159 160 # User defined SSH options, eg, 161 # ssh.otions = dict(StrictHostKeyChecking="no",UserKnownHostsFile="/dev/null") 162 self.options = options 163 164 def levenshtein_distance(self, a, b): 165 '''This calculates the Levenshtein distance between a and b. 166 ''' 167 168 n, m = len(a), len(b) 169 if n > m: 170 a,b = b,a 171 n,m = m,n 172 current = range(n+1) 173 for i in range(1,m+1): 174 previous, current = current, [i]+[0]*n 175 for j in range(1,n+1): 176 add, delete = previous[j]+1, current[j-1]+1 177 change = previous[j-1] 178 if a[j-1] != b[i-1]: 179 change = change + 1 180 current[j] = min(add, delete, change) 181 return current[n] 182 183 def try_read_prompt(self, timeout_multiplier): 184 '''This facilitates using communication timeouts to perform 185 synchronization as quickly as possible, while supporting high latency 186 connections with a tunable worst case performance. Fast connections 187 should be read almost immediately. Worst case performance for this 188 method is timeout_multiplier * 3 seconds. 189 ''' 190 191 # maximum time allowed to read the first response 192 first_char_timeout = timeout_multiplier * 0.5 193 194 # maximum time allowed between subsequent characters 195 inter_char_timeout = timeout_multiplier * 0.1 196 197 # maximum time for reading the entire prompt 198 total_timeout = timeout_multiplier * 3.0 199 200 prompt = self.string_type() 201 begin = time.time() 202 expired = 0.0 203 timeout = first_char_timeout 204 205 while expired < total_timeout: 206 try: 207 prompt += self.read_nonblocking(size=1, timeout=timeout) 208 expired = time.time() - begin # updated total time expired 209 timeout = inter_char_timeout 210 except TIMEOUT: 211 break 212 213 return prompt 214 215 def sync_original_prompt (self, sync_multiplier=1.0): 216 '''This attempts to find the prompt. Basically, press enter and record 217 the response; press enter again and record the response; if the two 218 responses are similar then assume we are at the original prompt. 219 This can be a slow function. Worst case with the default sync_multiplier 220 can take 12 seconds. Low latency connections are more likely to fail 221 with a low sync_multiplier. Best case sync time gets worse with a 222 high sync multiplier (500 ms with default). ''' 223 224 # All of these timing pace values are magic. 225 # I came up with these based on what seemed reliable for 226 # connecting to a heavily loaded machine I have. 227 self.sendline() 228 time.sleep(0.1) 229 230 try: 231 # Clear the buffer before getting the prompt. 232 self.try_read_prompt(sync_multiplier) 233 except TIMEOUT: 234 pass 235 236 self.sendline() 237 x = self.try_read_prompt(sync_multiplier) 238 239 self.sendline() 240 a = self.try_read_prompt(sync_multiplier) 241 242 self.sendline() 243 b = self.try_read_prompt(sync_multiplier) 244 245 ld = self.levenshtein_distance(a,b) 246 len_a = len(a) 247 if len_a == 0: 248 return False 249 if float(ld)/len_a < 0.4: 250 return True 251 return False 252 253 ### TODO: This is getting messy and I'm pretty sure this isn't perfect. 254 ### TODO: I need to draw a flow chart for this. 255 ### TODO: Unit tests for SSH tunnels, remote SSH command exec, disabling original prompt sync 256 def login (self, server, username, password='', terminal_type='ansi', 257 original_prompt=r"[#$]", login_timeout=10, port=None, 258 auto_prompt_reset=True, ssh_key=None, quiet=True, 259 sync_multiplier=1, check_local_ip=True, 260 password_regex=r'(?i)(?:password:)|(?:passphrase for key)', 261 ssh_tunnels={}, spawn_local_ssh=True, 262 sync_original_prompt=True, ssh_config=None): 263 '''This logs the user into the given server. 264 265 It uses 266 'original_prompt' to try to find the prompt right after login. When it 267 finds the prompt it immediately tries to reset the prompt to something 268 more easily matched. The default 'original_prompt' is very optimistic 269 and is easily fooled. It's more reliable to try to match the original 270 prompt as exactly as possible to prevent false matches by server 271 strings such as the "Message Of The Day". On many systems you can 272 disable the MOTD on the remote server by creating a zero-length file 273 called :file:`~/.hushlogin` on the remote server. If a prompt cannot be found 274 then this will not necessarily cause the login to fail. In the case of 275 a timeout when looking for the prompt we assume that the original 276 prompt was so weird that we could not match it, so we use a few tricks 277 to guess when we have reached the prompt. Then we hope for the best and 278 blindly try to reset the prompt to something more unique. If that fails 279 then login() raises an :class:`ExceptionPxssh` exception. 280 281 In some situations it is not possible or desirable to reset the 282 original prompt. In this case, pass ``auto_prompt_reset=False`` to 283 inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh 284 uses a unique prompt in the :meth:`prompt` method. If the original prompt is 285 not reset then this will disable the :meth:`prompt` method unless you 286 manually set the :attr:`PROMPT` attribute. 287 288 Set ``password_regex`` if there is a MOTD message with `password` in it. 289 Changing this is like playing in traffic, don't (p)expect it to match straight 290 away. 291 292 If you require to connect to another SSH server from the your original SSH 293 connection set ``spawn_local_ssh`` to `False` and this will use your current 294 session to do so. Setting this option to `False` and not having an active session 295 will trigger an error. 296 297 Set ``ssh_key`` to a file path to an SSH private key to use that SSH key 298 for the session authentication. 299 Set ``ssh_key`` to `True` to force passing the current SSH authentication socket 300 to the desired ``hostname``. 301 302 Set ``ssh_config`` to a file path string of an SSH client config file to pass that 303 file to the client to handle itself. You may set any options you wish in here, however 304 doing so will require you to post extra information that you may not want to if you 305 run into issues. 306 ''' 307 308 session_regex_array = ["(?i)are you sure you want to continue connecting", original_prompt, password_regex, "(?i)permission denied", "(?i)terminal type", TIMEOUT] 309 session_init_regex_array = [] 310 session_init_regex_array.extend(session_regex_array) 311 session_init_regex_array.extend(["(?i)connection closed by remote host", EOF]) 312 313 ssh_options = ''.join([" -o '%s=%s'" % (o, v) for (o, v) in self.options.items()]) 314 if quiet: 315 ssh_options = ssh_options + ' -q' 316 if not check_local_ip: 317 ssh_options = ssh_options + " -o'NoHostAuthenticationForLocalhost=yes'" 318 if self.force_password: 319 ssh_options = ssh_options + ' ' + self.SSH_OPTS 320 if ssh_config is not None: 321 if spawn_local_ssh and not os.path.isfile(ssh_config): 322 raise ExceptionPxssh('SSH config does not exist or is not a file.') 323 ssh_options = ssh_options + '-F ' + ssh_config 324 if port is not None: 325 ssh_options = ssh_options + ' -p %s'%(str(port)) 326 if ssh_key is not None: 327 # Allow forwarding our SSH key to the current session 328 if ssh_key==True: 329 ssh_options = ssh_options + ' -A' 330 else: 331 if spawn_local_ssh and not os.path.isfile(ssh_key): 332 raise ExceptionPxssh('private ssh key does not exist or is not a file.') 333 ssh_options = ssh_options + ' -i %s' % (ssh_key) 334 335 # SSH tunnels, make sure you know what you're putting into the lists 336 # under each heading. Do not expect these to open 100% of the time, 337 # The port you're requesting might be bound. 338 # 339 # The structure should be like this: 340 # { 'local': ['2424:localhost:22'], # Local SSH tunnels 341 # 'remote': ['2525:localhost:22'], # Remote SSH tunnels 342 # 'dynamic': [8888] } # Dynamic/SOCKS tunnels 343 if ssh_tunnels!={} and isinstance({},type(ssh_tunnels)): 344 tunnel_types = { 345 'local':'L', 346 'remote':'R', 347 'dynamic':'D' 348 } 349 for tunnel_type in tunnel_types: 350 cmd_type = tunnel_types[tunnel_type] 351 if tunnel_type in ssh_tunnels: 352 tunnels = ssh_tunnels[tunnel_type] 353 for tunnel in tunnels: 354 if spawn_local_ssh==False: 355 tunnel = quote(str(tunnel)) 356 ssh_options = ssh_options + ' -' + cmd_type + ' ' + str(tunnel) 357 cmd = "ssh %s -l %s %s" % (ssh_options, username, server) 358 if self.debug_command_string: 359 return(cmd) 360 361 # Are we asking for a local ssh command or to spawn one in another session? 362 if spawn_local_ssh: 363 spawn._spawn(self, cmd) 364 else: 365 self.sendline(cmd) 366 367 # This does not distinguish between a remote server 'password' prompt 368 # and a local ssh 'passphrase' prompt (for unlocking a private key). 369 i = self.expect(session_init_regex_array, timeout=login_timeout) 370 371 # First phase 372 if i==0: 373 # New certificate -- always accept it. 374 # This is what you get if SSH does not have the remote host's 375 # public key stored in the 'known_hosts' cache. 376 self.sendline("yes") 377 i = self.expect(session_regex_array) 378 if i==2: # password or passphrase 379 self.sendline(password) 380 i = self.expect(session_regex_array) 381 if i==4: 382 self.sendline(terminal_type) 383 i = self.expect(session_regex_array) 384 if i==7: 385 self.close() 386 raise ExceptionPxssh('Could not establish connection to host') 387 388 # Second phase 389 if i==0: 390 # This is weird. This should not happen twice in a row. 391 self.close() 392 raise ExceptionPxssh('Weird error. Got "are you sure" prompt twice.') 393 elif i==1: # can occur if you have a public key pair set to authenticate. 394 ### TODO: May NOT be OK if expect() got tricked and matched a false prompt. 395 pass 396 elif i==2: # password prompt again 397 # For incorrect passwords, some ssh servers will 398 # ask for the password again, others return 'denied' right away. 399 # If we get the password prompt again then this means 400 # we didn't get the password right the first time. 401 self.close() 402 raise ExceptionPxssh('password refused') 403 elif i==3: # permission denied -- password was bad. 404 self.close() 405 raise ExceptionPxssh('permission denied') 406 elif i==4: # terminal type again? WTF? 407 self.close() 408 raise ExceptionPxssh('Weird error. Got "terminal type" prompt twice.') 409 elif i==5: # Timeout 410 #This is tricky... I presume that we are at the command-line prompt. 411 #It may be that the shell prompt was so weird that we couldn't match 412 #it. Or it may be that we couldn't log in for some other reason. I 413 #can't be sure, but it's safe to guess that we did login because if 414 #I presume wrong and we are not logged in then this should be caught 415 #later when I try to set the shell prompt. 416 pass 417 elif i==6: # Connection closed by remote host 418 self.close() 419 raise ExceptionPxssh('connection closed') 420 else: # Unexpected 421 self.close() 422 raise ExceptionPxssh('unexpected login response') 423 if sync_original_prompt: 424 if not self.sync_original_prompt(sync_multiplier): 425 self.close() 426 raise ExceptionPxssh('could not synchronize with original prompt') 427 # We appear to be in. 428 # set shell prompt to something unique. 429 if auto_prompt_reset: 430 if not self.set_unique_prompt(): 431 self.close() 432 raise ExceptionPxssh('could not set shell prompt ' 433 '(received: %r, expected: %r).' % ( 434 self.before, self.PROMPT,)) 435 return True 436 437 def logout (self): 438 '''Sends exit to the remote shell. 439 440 If there are stopped jobs then this automatically sends exit twice. 441 ''' 442 self.sendline("exit") 443 index = self.expect([EOF, "(?i)there are stopped jobs"]) 444 if index==1: 445 self.sendline("exit") 446 self.expect(EOF) 447 self.close() 448 449 def prompt(self, timeout=-1): 450 '''Match the next shell prompt. 451 452 This is little more than a short-cut to the :meth:`~pexpect.spawn.expect` 453 method. Note that if you called :meth:`login` with 454 ``auto_prompt_reset=False``, then before calling :meth:`prompt` you must 455 set the :attr:`PROMPT` attribute to a regex that it will use for 456 matching the prompt. 457 458 Calling :meth:`prompt` will erase the contents of the :attr:`before` 459 attribute even if no prompt is ever matched. If timeout is not given or 460 it is set to -1 then self.timeout is used. 461 462 :return: True if the shell prompt was matched, False if the timeout was 463 reached. 464 ''' 465 466 if timeout == -1: 467 timeout = self.timeout 468 i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout) 469 if i==1: 470 return False 471 return True 472 473 def set_unique_prompt(self): 474 '''This sets the remote prompt to something more unique than ``#`` or ``$``. 475 This makes it easier for the :meth:`prompt` method to match the shell prompt 476 unambiguously. This method is called automatically by the :meth:`login` 477 method, but you may want to call it manually if you somehow reset the 478 shell prompt. For example, if you 'su' to a different user then you 479 will need to manually reset the prompt. This sends shell commands to 480 the remote host to set the prompt, so this assumes the remote host is 481 ready to receive commands. 482 483 Alternatively, you may use your own prompt pattern. In this case you 484 should call :meth:`login` with ``auto_prompt_reset=False``; then set the 485 :attr:`PROMPT` attribute to a regular expression. After that, the 486 :meth:`prompt` method will try to match your prompt pattern. 487 ''' 488 489 self.sendline("unset PROMPT_COMMAND") 490 self.sendline(self.PROMPT_SET_SH) # sh-style 491 i = self.expect ([TIMEOUT, self.PROMPT], timeout=10) 492 if i == 0: # csh-style 493 self.sendline(self.PROMPT_SET_CSH) 494 i = self.expect([TIMEOUT, self.PROMPT], timeout=10) 495 if i == 0: 496 return False 497 return True 498 499# vi:ts=4:sw=4:expandtab:ft=python: 500