1 /* 2 * Copyright (c) 2006-2011 Christian Plattner. All rights reserved. 3 * Please refer to the LICENSE.txt for licensing details. 4 */ 5 6 package ch.ethz.ssh2.transport; 7 8 import java.io.IOException; 9 import java.io.InputStream; 10 import java.io.OutputStream; 11 import java.net.InetAddress; 12 import java.net.InetSocketAddress; 13 import java.net.Socket; 14 import java.net.SocketTimeoutException; 15 import java.net.UnknownHostException; 16 import java.security.SecureRandom; 17 import java.util.List; 18 import java.util.Vector; 19 20 import ch.ethz.ssh2.ConnectionInfo; 21 import ch.ethz.ssh2.ConnectionMonitor; 22 import ch.ethz.ssh2.DHGexParameters; 23 import ch.ethz.ssh2.HTTPProxyData; 24 import ch.ethz.ssh2.HTTPProxyException; 25 import ch.ethz.ssh2.ProxyData; 26 import ch.ethz.ssh2.ServerHostKeyVerifier; 27 import ch.ethz.ssh2.crypto.Base64; 28 import ch.ethz.ssh2.crypto.CryptoWishList; 29 import ch.ethz.ssh2.crypto.cipher.BlockCipher; 30 import ch.ethz.ssh2.crypto.digest.MAC; 31 import ch.ethz.ssh2.log.Logger; 32 import ch.ethz.ssh2.packets.PacketDisconnect; 33 import ch.ethz.ssh2.packets.Packets; 34 import ch.ethz.ssh2.packets.TypesReader; 35 import ch.ethz.ssh2.util.StringEncoder; 36 import ch.ethz.ssh2.util.Tokenizer; 37 38 /* 39 * Yes, the "standard" is a big mess. On one side, the say that arbitary channel 40 * packets are allowed during kex exchange, on the other side we need to blindly 41 * ignore the next _packet_ if the KEX guess was wrong. Where do we know from that 42 * the next packet is not a channel data packet? Yes, we could check if it is in 43 * the KEX range. But the standard says nothing about this. The OpenSSH guys 44 * block local "normal" traffic during KEX. That's fine - however, they assume 45 * that the other side is doing the same. During re-key, if they receive traffic 46 * other than KEX, they become horribly irritated and kill the connection. Since 47 * we are very likely going to communicate with OpenSSH servers, we have to play 48 * the same game - even though we could do better. 49 * 50 * btw: having stdout and stderr on the same channel, with a shared window, is 51 * also a VERY good idea... =( 52 */ 53 54 /** 55 * TransportManager. 56 * 57 * @author Christian Plattner 58 * @version $Id: TransportManager.java 41 2011-06-02 10:36:41Z dkocher@sudo.ch $ 59 */ 60 public class TransportManager 61 { 62 private static final Logger log = Logger.getLogger(TransportManager.class); 63 64 private static class HandlerEntry 65 { 66 MessageHandler mh; 67 int low; 68 int high; 69 } 70 71 private final List<byte[]> asynchronousQueue = new Vector<byte[]>(); 72 private Thread asynchronousThread = null; 73 74 class AsynchronousWorker extends Thread 75 { 76 @Override run()77 public void run() 78 { 79 while (true) 80 { 81 byte[] msg = null; 82 83 synchronized (asynchronousQueue) 84 { 85 if (asynchronousQueue.size() == 0) 86 { 87 /* After the queue is empty for about 2 seconds, stop this thread */ 88 89 try 90 { 91 asynchronousQueue.wait(2000); 92 } 93 catch (InterruptedException ignore) 94 { 95 } 96 97 if (asynchronousQueue.size() == 0) 98 { 99 asynchronousThread = null; 100 return; 101 } 102 } 103 104 msg = asynchronousQueue.remove(0); 105 } 106 107 /* The following invocation may throw an IOException. 108 * There is no point in handling it - it simply means 109 * that the connection has a problem and we should stop 110 * sending asynchronously messages. We do not need to signal that 111 * we have exited (asynchronousThread = null): further 112 * messages in the queue cannot be sent by this or any 113 * other thread. 114 * Other threads will sooner or later (when receiving or 115 * sending the next message) get the same IOException and 116 * get to the same conclusion. 117 */ 118 119 try 120 { 121 sendMessage(msg); 122 } 123 catch (IOException e) 124 { 125 return; 126 } 127 } 128 } 129 } 130 131 private String hostname; 132 private int port; 133 private final Socket sock = new Socket(); 134 135 private final Object connectionSemaphore = new Object(); 136 137 private boolean flagKexOngoing = false; 138 private boolean connectionClosed = false; 139 140 private Throwable reasonClosedCause = null; 141 142 private TransportConnection tc; 143 private KexManager km; 144 145 private final List<HandlerEntry> messageHandlers = new Vector<HandlerEntry>(); 146 147 private Thread receiveThread; 148 149 private List<ConnectionMonitor> connectionMonitors = new Vector<ConnectionMonitor>(); 150 private boolean monitorsWereInformed = false; 151 152 /** 153 * There were reports that there are JDKs which use 154 * the resolver even though one supplies a dotted IP 155 * address in the Socket constructor. That is why we 156 * try to generate the InetAdress "by hand". 157 * 158 * @param host 159 * @return the InetAddress 160 * @throws UnknownHostException 161 */ createInetAddress(String host)162 private InetAddress createInetAddress(String host) throws UnknownHostException 163 { 164 /* Check if it is a dotted IP4 address */ 165 166 InetAddress addr = parseIPv4Address(host); 167 168 if (addr != null) 169 { 170 return addr; 171 } 172 173 return InetAddress.getByName(host); 174 } 175 parseIPv4Address(String host)176 private InetAddress parseIPv4Address(String host) throws UnknownHostException 177 { 178 if (host == null) 179 { 180 return null; 181 } 182 183 String[] quad = Tokenizer.parseTokens(host, '.'); 184 185 if ((quad == null) || (quad.length != 4)) 186 { 187 return null; 188 } 189 190 byte[] addr = new byte[4]; 191 192 for (int i = 0; i < 4; i++) 193 { 194 int part = 0; 195 196 if ((quad[i].length() == 0) || (quad[i].length() > 3)) 197 { 198 return null; 199 } 200 201 for (int k = 0; k < quad[i].length(); k++) 202 { 203 char c = quad[i].charAt(k); 204 205 /* No, Character.isDigit is not the same */ 206 if ((c < '0') || (c > '9')) 207 { 208 return null; 209 } 210 211 part = part * 10 + (c - '0'); 212 } 213 214 if (part > 255) /* 300.1.2.3 is invalid =) */ 215 { 216 return null; 217 } 218 219 addr[i] = (byte) part; 220 } 221 222 return InetAddress.getByAddress(host, addr); 223 } 224 TransportManager(String host, int port)225 public TransportManager(String host, int port) throws IOException 226 { 227 this.hostname = host; 228 this.port = port; 229 } 230 getPacketOverheadEstimate()231 public int getPacketOverheadEstimate() 232 { 233 return tc.getPacketOverheadEstimate(); 234 } 235 setTcpNoDelay(boolean state)236 public void setTcpNoDelay(boolean state) throws IOException 237 { 238 sock.setTcpNoDelay(state); 239 } 240 setSoTimeout(int timeout)241 public void setSoTimeout(int timeout) throws IOException 242 { 243 sock.setSoTimeout(timeout); 244 } 245 getConnectionInfo(int kexNumber)246 public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException 247 { 248 return km.getOrWaitForConnectionInfo(kexNumber); 249 } 250 getReasonClosedCause()251 public Throwable getReasonClosedCause() 252 { 253 synchronized (connectionSemaphore) 254 { 255 return reasonClosedCause; 256 } 257 } 258 getSessionIdentifier()259 public byte[] getSessionIdentifier() 260 { 261 return km.sessionId; 262 } 263 close(Throwable cause, boolean useDisconnectPacket)264 public void close(Throwable cause, boolean useDisconnectPacket) 265 { 266 if (useDisconnectPacket == false) 267 { 268 /* OK, hard shutdown - do not aquire the semaphore, 269 * perhaps somebody is inside (and waits until the remote 270 * side is ready to accept new data). */ 271 272 try 273 { 274 sock.close(); 275 } 276 catch (IOException ignore) 277 { 278 } 279 280 /* OK, whoever tried to send data, should now agree that 281 * there is no point in further waiting =) 282 * It is safe now to aquire the semaphore. 283 */ 284 } 285 286 synchronized (connectionSemaphore) 287 { 288 if (connectionClosed == false) 289 { 290 if (useDisconnectPacket == true) 291 { 292 try 293 { 294 byte[] msg = new PacketDisconnect(Packets.SSH_DISCONNECT_BY_APPLICATION, cause.getMessage(), "") 295 .getPayload(); 296 if (tc != null) 297 { 298 tc.sendMessage(msg); 299 } 300 } 301 catch (IOException ignore) 302 { 303 } 304 305 try 306 { 307 sock.close(); 308 } 309 catch (IOException ignore) 310 { 311 } 312 } 313 314 connectionClosed = true; 315 reasonClosedCause = cause; /* may be null */ 316 } 317 connectionSemaphore.notifyAll(); 318 } 319 320 /* No check if we need to inform the monitors */ 321 322 List<ConnectionMonitor> monitors = new Vector<ConnectionMonitor>(); 323 324 synchronized (this) 325 { 326 /* Short term lock to protect "connectionMonitors" 327 * and "monitorsWereInformed" 328 * (they may be modified concurrently) 329 */ 330 331 if (monitorsWereInformed == false) 332 { 333 monitorsWereInformed = true; 334 monitors.addAll(connectionMonitors); 335 } 336 } 337 338 for (ConnectionMonitor cmon : monitors) 339 { 340 try 341 { 342 cmon.connectionLost(reasonClosedCause); 343 } 344 catch (Exception ignore) 345 { 346 } 347 } 348 } 349 establishConnection(ProxyData proxyData, int connectTimeout)350 private void establishConnection(ProxyData proxyData, int connectTimeout) throws IOException 351 { 352 /* See the comment for createInetAddress() */ 353 354 if (proxyData == null) 355 { 356 InetAddress addr = createInetAddress(hostname); 357 sock.connect(new InetSocketAddress(addr, port), connectTimeout); 358 return; 359 } 360 361 if (proxyData instanceof HTTPProxyData) 362 { 363 HTTPProxyData pd = (HTTPProxyData) proxyData; 364 365 /* At the moment, we only support HTTP proxies */ 366 367 InetAddress addr = createInetAddress(pd.proxyHost); 368 sock.connect(new InetSocketAddress(addr, pd.proxyPort), connectTimeout); 369 370 /* OK, now tell the proxy where we actually want to connect to */ 371 372 StringBuilder sb = new StringBuilder(); 373 374 sb.append("CONNECT "); 375 sb.append(hostname); 376 sb.append(':'); 377 sb.append(port); 378 sb.append(" HTTP/1.0\r\n"); 379 380 if ((pd.proxyUser != null) && (pd.proxyPass != null)) 381 { 382 String credentials = pd.proxyUser + ":" + pd.proxyPass; 383 char[] encoded = Base64.encode(StringEncoder.GetBytes(credentials)); 384 sb.append("Proxy-Authorization: Basic "); 385 sb.append(encoded); 386 sb.append("\r\n"); 387 } 388 389 if (pd.requestHeaderLines != null) 390 { 391 for (int i = 0; i < pd.requestHeaderLines.length; i++) 392 { 393 if (pd.requestHeaderLines[i] != null) 394 { 395 sb.append(pd.requestHeaderLines[i]); 396 sb.append("\r\n"); 397 } 398 } 399 } 400 401 sb.append("\r\n"); 402 403 OutputStream out = sock.getOutputStream(); 404 405 out.write(StringEncoder.GetBytes(sb.toString())); 406 out.flush(); 407 408 /* Now parse the HTTP response */ 409 410 byte[] buffer = new byte[1024]; 411 InputStream in = sock.getInputStream(); 412 413 int len = ClientServerHello.readLineRN(in, buffer); 414 415 String httpReponse = StringEncoder.GetString(buffer, 0, len); 416 417 if (httpReponse.startsWith("HTTP/") == false) 418 { 419 throw new IOException("The proxy did not send back a valid HTTP response."); 420 } 421 422 /* "HTTP/1.X XYZ X" => 14 characters minimum */ 423 424 if ((httpReponse.length() < 14) || (httpReponse.charAt(8) != ' ') || (httpReponse.charAt(12) != ' ')) 425 { 426 throw new IOException("The proxy did not send back a valid HTTP response."); 427 } 428 429 int errorCode = 0; 430 431 try 432 { 433 errorCode = Integer.parseInt(httpReponse.substring(9, 12)); 434 } 435 catch (NumberFormatException ignore) 436 { 437 throw new IOException("The proxy did not send back a valid HTTP response."); 438 } 439 440 if ((errorCode < 0) || (errorCode > 999)) 441 { 442 throw new IOException("The proxy did not send back a valid HTTP response."); 443 } 444 445 if (errorCode != 200) 446 { 447 throw new HTTPProxyException(httpReponse.substring(13), errorCode); 448 } 449 450 /* OK, read until empty line */ 451 452 while (true) 453 { 454 len = ClientServerHello.readLineRN(in, buffer); 455 if (len == 0) 456 { 457 break; 458 } 459 } 460 return; 461 } 462 463 throw new IOException("Unsupported ProxyData"); 464 } 465 initialize(String identification, CryptoWishList cwl, ServerHostKeyVerifier verifier, DHGexParameters dhgex, int connectTimeout, SecureRandom rnd, ProxyData proxyData)466 public void initialize(String identification, CryptoWishList cwl, ServerHostKeyVerifier verifier, 467 DHGexParameters dhgex, int connectTimeout, SecureRandom rnd, ProxyData proxyData) 468 throws IOException 469 { 470 /* First, establish the TCP connection to the SSH-2 server */ 471 472 establishConnection(proxyData, connectTimeout); 473 474 /* Parse the server line and say hello - important: this information is later needed for the 475 * key exchange (to stop man-in-the-middle attacks) - that is why we wrap it into an object 476 * for later use. 477 */ 478 479 ClientServerHello csh = new ClientServerHello(identification, sock.getInputStream(), sock.getOutputStream()); 480 481 tc = new TransportConnection(sock.getInputStream(), sock.getOutputStream(), rnd); 482 483 km = new KexManager(this, csh, cwl, hostname, port, verifier, rnd); 484 km.initiateKEX(cwl, dhgex); 485 486 receiveThread = new Thread(new Runnable() 487 { 488 public void run() 489 { 490 try 491 { 492 receiveLoop(); 493 } 494 catch (IOException e) 495 { 496 close(e, false); 497 498 log.warning("Receive thread: error in receiveLoop: " + e.getMessage()); 499 } 500 501 if (log.isDebugEnabled()) 502 { 503 log.debug("Receive thread: back from receiveLoop"); 504 } 505 506 /* Tell all handlers that it is time to say goodbye */ 507 508 if (km != null) 509 { 510 try 511 { 512 km.handleMessage(null, 0); 513 } 514 catch (IOException ignored) 515 { 516 } 517 } 518 519 for (HandlerEntry he : messageHandlers) 520 { 521 try 522 { 523 he.mh.handleMessage(null, 0); 524 } 525 catch (Exception ignore) 526 { 527 } 528 } 529 } 530 }); 531 532 receiveThread.setDaemon(true); 533 receiveThread.start(); 534 } 535 registerMessageHandler(MessageHandler mh, int low, int high)536 public void registerMessageHandler(MessageHandler mh, int low, int high) 537 { 538 HandlerEntry he = new HandlerEntry(); 539 he.mh = mh; 540 he.low = low; 541 he.high = high; 542 543 synchronized (messageHandlers) 544 { 545 messageHandlers.add(he); 546 } 547 } 548 removeMessageHandler(MessageHandler mh, int low, int high)549 public void removeMessageHandler(MessageHandler mh, int low, int high) 550 { 551 synchronized (messageHandlers) 552 { 553 for (int i = 0; i < messageHandlers.size(); i++) 554 { 555 HandlerEntry he = messageHandlers.get(i); 556 if ((he.mh == mh) && (he.low == low) && (he.high == high)) 557 { 558 messageHandlers.remove(i); 559 break; 560 } 561 } 562 } 563 } 564 sendKexMessage(byte[] msg)565 public void sendKexMessage(byte[] msg) throws IOException 566 { 567 synchronized (connectionSemaphore) 568 { 569 if (connectionClosed) 570 { 571 throw (IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause); 572 } 573 574 flagKexOngoing = true; 575 576 try 577 { 578 tc.sendMessage(msg); 579 } 580 catch (IOException e) 581 { 582 close(e, false); 583 throw e; 584 } 585 } 586 } 587 kexFinished()588 public void kexFinished() throws IOException 589 { 590 synchronized (connectionSemaphore) 591 { 592 flagKexOngoing = false; 593 connectionSemaphore.notifyAll(); 594 } 595 } 596 forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex)597 public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex) throws IOException 598 { 599 km.initiateKEX(cwl, dhgex); 600 } 601 changeRecvCipher(BlockCipher bc, MAC mac)602 public void changeRecvCipher(BlockCipher bc, MAC mac) 603 { 604 tc.changeRecvCipher(bc, mac); 605 } 606 changeSendCipher(BlockCipher bc, MAC mac)607 public void changeSendCipher(BlockCipher bc, MAC mac) 608 { 609 tc.changeSendCipher(bc, mac); 610 } 611 sendAsynchronousMessage(byte[] msg)612 public void sendAsynchronousMessage(byte[] msg) throws IOException 613 { 614 synchronized (asynchronousQueue) 615 { 616 asynchronousQueue.add(msg); 617 618 /* This limit should be flexible enough. We need this, otherwise the peer 619 * can flood us with global requests (and other stuff where we have to reply 620 * with an asynchronous message) and (if the server just sends data and does not 621 * read what we send) this will probably put us in a low memory situation 622 * (our send queue would grow and grow and...) */ 623 624 if (asynchronousQueue.size() > 100) 625 { 626 throw new IOException("Error: the peer is not consuming our asynchronous replies."); 627 } 628 629 /* Check if we have an asynchronous sending thread */ 630 631 if (asynchronousThread == null) 632 { 633 asynchronousThread = new AsynchronousWorker(); 634 asynchronousThread.setDaemon(true); 635 asynchronousThread.start(); 636 637 /* The thread will stop after 2 seconds of inactivity (i.e., empty queue) */ 638 } 639 } 640 } 641 setConnectionMonitors(List<ConnectionMonitor> monitors)642 public void setConnectionMonitors(List<ConnectionMonitor> monitors) 643 { 644 synchronized (this) 645 { 646 connectionMonitors = new Vector<ConnectionMonitor>(); 647 connectionMonitors.addAll(monitors); 648 } 649 } 650 651 /** 652 * True if no response message expected. 653 */ 654 private boolean idle; 655 sendMessage(byte[] msg)656 public void sendMessage(byte[] msg) throws IOException 657 { 658 if (Thread.currentThread() == receiveThread) 659 { 660 throw new IOException("Assertion error: sendMessage may never be invoked by the receiver thread!"); 661 } 662 663 boolean wasInterrupted = false; 664 665 try 666 { 667 synchronized (connectionSemaphore) 668 { 669 while (true) 670 { 671 if (connectionClosed) 672 { 673 throw (IOException) new IOException("Sorry, this connection is closed.") 674 .initCause(reasonClosedCause); 675 } 676 677 if (flagKexOngoing == false) 678 { 679 break; 680 } 681 682 try 683 { 684 connectionSemaphore.wait(); 685 } 686 catch (InterruptedException e) 687 { 688 wasInterrupted = true; 689 } 690 } 691 692 try 693 { 694 tc.sendMessage(msg); 695 idle = false; 696 } 697 catch (IOException e) 698 { 699 close(e, false); 700 throw e; 701 } 702 } 703 } 704 finally 705 { 706 if (wasInterrupted) 707 Thread.currentThread().interrupt(); 708 } 709 } 710 receiveLoop()711 public void receiveLoop() throws IOException 712 { 713 byte[] msg = new byte[35000]; 714 715 while (true) 716 { 717 int msglen; 718 try 719 { 720 msglen = tc.receiveMessage(msg, 0, msg.length); 721 } 722 catch (SocketTimeoutException e) 723 { 724 // Timeout in read 725 if (idle) 726 { 727 log.debug("Ignoring socket timeout"); 728 continue; 729 } 730 throw e; 731 } 732 idle = true; 733 734 int type = msg[0] & 0xff; 735 736 if (type == Packets.SSH_MSG_IGNORE) 737 { 738 continue; 739 } 740 741 if (type == Packets.SSH_MSG_DEBUG) 742 { 743 if (log.isDebugEnabled()) 744 { 745 TypesReader tr = new TypesReader(msg, 0, msglen); 746 tr.readByte(); 747 tr.readBoolean(); 748 StringBuilder debugMessageBuffer = new StringBuilder(); 749 debugMessageBuffer.append(tr.readString("UTF-8")); 750 751 for (int i = 0; i < debugMessageBuffer.length(); i++) 752 { 753 char c = debugMessageBuffer.charAt(i); 754 755 if ((c >= 32) && (c <= 126)) 756 { 757 continue; 758 } 759 debugMessageBuffer.setCharAt(i, '\uFFFD'); 760 } 761 762 log.debug("DEBUG Message from remote: '" + debugMessageBuffer.toString() + "'"); 763 } 764 continue; 765 } 766 767 if (type == Packets.SSH_MSG_UNIMPLEMENTED) 768 { 769 throw new IOException("Peer sent UNIMPLEMENTED message, that should not happen."); 770 } 771 772 if (type == Packets.SSH_MSG_DISCONNECT) 773 { 774 TypesReader tr = new TypesReader(msg, 0, msglen); 775 tr.readByte(); 776 int reason_code = tr.readUINT32(); 777 StringBuilder reasonBuffer = new StringBuilder(); 778 reasonBuffer.append(tr.readString("UTF-8")); 779 780 /* 781 * Do not get fooled by servers that send abnormal long error 782 * messages 783 */ 784 785 if (reasonBuffer.length() > 255) 786 { 787 reasonBuffer.setLength(255); 788 reasonBuffer.setCharAt(254, '.'); 789 reasonBuffer.setCharAt(253, '.'); 790 reasonBuffer.setCharAt(252, '.'); 791 } 792 793 /* 794 * Also, check that the server did not send characters that may 795 * screw up the receiver -> restrict to reasonable US-ASCII 796 * subset -> "printable characters" (ASCII 32 - 126). Replace 797 * all others with 0xFFFD (UNICODE replacement character). 798 */ 799 800 for (int i = 0; i < reasonBuffer.length(); i++) 801 { 802 char c = reasonBuffer.charAt(i); 803 804 if ((c >= 32) && (c <= 126)) 805 { 806 continue; 807 } 808 reasonBuffer.setCharAt(i, '\uFFFD'); 809 } 810 811 throw new IOException("Peer sent DISCONNECT message (reason code " + reason_code + "): " 812 + reasonBuffer.toString()); 813 } 814 815 /* 816 * Is it a KEX Packet? 817 */ 818 819 if ((type == Packets.SSH_MSG_KEXINIT) || (type == Packets.SSH_MSG_NEWKEYS) 820 || ((type >= 30) && (type <= 49))) 821 { 822 km.handleMessage(msg, msglen); 823 continue; 824 } 825 826 MessageHandler mh = null; 827 828 for (int i = 0; i < messageHandlers.size(); i++) 829 { 830 HandlerEntry he = messageHandlers.get(i); 831 if ((he.low <= type) && (type <= he.high)) 832 { 833 mh = he.mh; 834 break; 835 } 836 } 837 838 if (mh == null) 839 { 840 throw new IOException("Unexpected SSH message (type " + type + ")"); 841 } 842 843 mh.handleMessage(msg, msglen); 844 } 845 } 846 } 847