• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2006-2011 Christian Plattner. All rights reserved.
3  * Please refer to the LICENSE.txt for licensing details.
4  */
5 
6 package ch.ethz.ssh2.transport;
7 
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.io.OutputStream;
11 import java.net.InetAddress;
12 import java.net.InetSocketAddress;
13 import java.net.Socket;
14 import java.net.SocketTimeoutException;
15 import java.net.UnknownHostException;
16 import java.security.SecureRandom;
17 import java.util.List;
18 import java.util.Vector;
19 
20 import ch.ethz.ssh2.ConnectionInfo;
21 import ch.ethz.ssh2.ConnectionMonitor;
22 import ch.ethz.ssh2.DHGexParameters;
23 import ch.ethz.ssh2.HTTPProxyData;
24 import ch.ethz.ssh2.HTTPProxyException;
25 import ch.ethz.ssh2.ProxyData;
26 import ch.ethz.ssh2.ServerHostKeyVerifier;
27 import ch.ethz.ssh2.crypto.Base64;
28 import ch.ethz.ssh2.crypto.CryptoWishList;
29 import ch.ethz.ssh2.crypto.cipher.BlockCipher;
30 import ch.ethz.ssh2.crypto.digest.MAC;
31 import ch.ethz.ssh2.log.Logger;
32 import ch.ethz.ssh2.packets.PacketDisconnect;
33 import ch.ethz.ssh2.packets.Packets;
34 import ch.ethz.ssh2.packets.TypesReader;
35 import ch.ethz.ssh2.util.StringEncoder;
36 import ch.ethz.ssh2.util.Tokenizer;
37 
/*
 * Yes, the "standard" is a big mess. On one side, they say that arbitrary channel
 * packets are allowed during kex exchange, on the other side we need to blindly
 * ignore the next _packet_ if the KEX guess was wrong. How do we know that
 * the next packet is not a channel data packet? Yes, we could check if it is in
 * the KEX range. But the standard says nothing about this. The OpenSSH guys
 * block local "normal" traffic during KEX. That's fine - however, they assume
 * that the other side is doing the same. During re-key, if they receive traffic
 * other than KEX, they become horribly irritated and kill the connection. Since
 * we are very likely going to communicate with OpenSSH servers, we have to play
 * the same game - even though we could do better.
 *
 * btw: having stdout and stderr on the same channel, with a shared window, is
 * also a VERY good idea... =(
 */
53 
54 /**
55  * TransportManager.
56  *
57  * @author Christian Plattner
58  * @version $Id: TransportManager.java 41 2011-06-02 10:36:41Z dkocher@sudo.ch $
59  */
60 public class TransportManager
61 {
62 	private static final Logger log = Logger.getLogger(TransportManager.class);
63 
64 	private static class HandlerEntry
65 	{
66 		MessageHandler mh;
67 		int low;
68 		int high;
69 	}
70 
71 	private final List<byte[]> asynchronousQueue = new Vector<byte[]>();
72 	private Thread asynchronousThread = null;
73 
74 	class AsynchronousWorker extends Thread
75 	{
76 		@Override
run()77 		public void run()
78 		{
79 			while (true)
80 			{
81 				byte[] msg = null;
82 
83 				synchronized (asynchronousQueue)
84 				{
85 					if (asynchronousQueue.size() == 0)
86 					{
87 						/* After the queue is empty for about 2 seconds, stop this thread */
88 
89 						try
90 						{
91 							asynchronousQueue.wait(2000);
92 						}
93 						catch (InterruptedException ignore)
94 						{
95 						}
96 
97 						if (asynchronousQueue.size() == 0)
98 						{
99 							asynchronousThread = null;
100 							return;
101 						}
102 					}
103 
104 					msg = asynchronousQueue.remove(0);
105 				}
106 
107 				/* The following invocation may throw an IOException.
108 				 * There is no point in handling it - it simply means
109 				 * that the connection has a problem and we should stop
110 				 * sending asynchronously messages. We do not need to signal that
111 				 * we have exited (asynchronousThread = null): further
112 				 * messages in the queue cannot be sent by this or any
113 				 * other thread.
114 				 * Other threads will sooner or later (when receiving or
115 				 * sending the next message) get the same IOException and
116 				 * get to the same conclusion.
117 				 */
118 
119 				try
120 				{
121 					sendMessage(msg);
122 				}
123 				catch (IOException e)
124 				{
125 					return;
126 				}
127 			}
128 		}
129 	}
130 
131 	private String hostname;
132 	private int port;
133 	private final Socket sock = new Socket();
134 
135 	private final Object connectionSemaphore = new Object();
136 
137 	private boolean flagKexOngoing = false;
138 	private boolean connectionClosed = false;
139 
140 	private Throwable reasonClosedCause = null;
141 
142 	private TransportConnection tc;
143 	private KexManager km;
144 
145 	private final List<HandlerEntry> messageHandlers = new Vector<HandlerEntry>();
146 
147 	private Thread receiveThread;
148 
149 	private List<ConnectionMonitor> connectionMonitors = new Vector<ConnectionMonitor>();
150 	private boolean monitorsWereInformed = false;
151 
152 	/**
153 	 * There were reports that there are JDKs which use
154 	 * the resolver even though one supplies a dotted IP
155 	 * address in the Socket constructor. That is why we
156 	 * try to generate the InetAdress "by hand".
157 	 *
158 	 * @param host
159 	 * @return the InetAddress
160 	 * @throws UnknownHostException
161 	 */
createInetAddress(String host)162 	private InetAddress createInetAddress(String host) throws UnknownHostException
163 	{
164 		/* Check if it is a dotted IP4 address */
165 
166 		InetAddress addr = parseIPv4Address(host);
167 
168 		if (addr != null)
169 		{
170 			return addr;
171 		}
172 
173 		return InetAddress.getByName(host);
174 	}
175 
parseIPv4Address(String host)176 	private InetAddress parseIPv4Address(String host) throws UnknownHostException
177 	{
178 		if (host == null)
179 		{
180 			return null;
181 		}
182 
183 		String[] quad = Tokenizer.parseTokens(host, '.');
184 
185 		if ((quad == null) || (quad.length != 4))
186 		{
187 			return null;
188 		}
189 
190 		byte[] addr = new byte[4];
191 
192 		for (int i = 0; i < 4; i++)
193 		{
194 			int part = 0;
195 
196 			if ((quad[i].length() == 0) || (quad[i].length() > 3))
197 			{
198 				return null;
199 			}
200 
201 			for (int k = 0; k < quad[i].length(); k++)
202 			{
203 				char c = quad[i].charAt(k);
204 
205 				/* No, Character.isDigit is not the same */
206 				if ((c < '0') || (c > '9'))
207 				{
208 					return null;
209 				}
210 
211 				part = part * 10 + (c - '0');
212 			}
213 
214 			if (part > 255) /* 300.1.2.3 is invalid =) */
215 			{
216 				return null;
217 			}
218 
219 			addr[i] = (byte) part;
220 		}
221 
222 		return InetAddress.getByAddress(host, addr);
223 	}
224 
TransportManager(String host, int port)225 	public TransportManager(String host, int port) throws IOException
226 	{
227 		this.hostname = host;
228 		this.port = port;
229 	}
230 
getPacketOverheadEstimate()231 	public int getPacketOverheadEstimate()
232 	{
233 		return tc.getPacketOverheadEstimate();
234 	}
235 
setTcpNoDelay(boolean state)236 	public void setTcpNoDelay(boolean state) throws IOException
237 	{
238 		sock.setTcpNoDelay(state);
239 	}
240 
setSoTimeout(int timeout)241 	public void setSoTimeout(int timeout) throws IOException
242 	{
243 		sock.setSoTimeout(timeout);
244 	}
245 
getConnectionInfo(int kexNumber)246 	public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException
247 	{
248 		return km.getOrWaitForConnectionInfo(kexNumber);
249 	}
250 
getReasonClosedCause()251 	public Throwable getReasonClosedCause()
252 	{
253 		synchronized (connectionSemaphore)
254 		{
255 			return reasonClosedCause;
256 		}
257 	}
258 
getSessionIdentifier()259 	public byte[] getSessionIdentifier()
260 	{
261 		return km.sessionId;
262 	}
263 
close(Throwable cause, boolean useDisconnectPacket)264 	public void close(Throwable cause, boolean useDisconnectPacket)
265 	{
266 		if (useDisconnectPacket == false)
267 		{
268 			/* OK, hard shutdown - do not aquire the semaphore,
269 			 * perhaps somebody is inside (and waits until the remote
270 			 * side is ready to accept new data). */
271 
272 			try
273 			{
274 				sock.close();
275 			}
276 			catch (IOException ignore)
277 			{
278 			}
279 
280 			/* OK, whoever tried to send data, should now agree that
281 			 * there is no point in further waiting =)
282 			 * It is safe now to aquire the semaphore.
283 			 */
284 		}
285 
286 		synchronized (connectionSemaphore)
287 		{
288 			if (connectionClosed == false)
289 			{
290 				if (useDisconnectPacket == true)
291 				{
292 					try
293 					{
294 						byte[] msg = new PacketDisconnect(Packets.SSH_DISCONNECT_BY_APPLICATION, cause.getMessage(), "")
295 								.getPayload();
296 						if (tc != null)
297 						{
298 							tc.sendMessage(msg);
299 						}
300 					}
301 					catch (IOException ignore)
302 					{
303 					}
304 
305 					try
306 					{
307 						sock.close();
308 					}
309 					catch (IOException ignore)
310 					{
311 					}
312 				}
313 
314 				connectionClosed = true;
315 				reasonClosedCause = cause; /* may be null */
316 			}
317 			connectionSemaphore.notifyAll();
318 		}
319 
320 		/* No check if we need to inform the monitors */
321 
322 		List<ConnectionMonitor> monitors = new Vector<ConnectionMonitor>();
323 
324 		synchronized (this)
325 		{
326 			/* Short term lock to protect "connectionMonitors"
327 			 * and "monitorsWereInformed"
328 			 * (they may be modified concurrently)
329 			 */
330 
331 			if (monitorsWereInformed == false)
332 			{
333 				monitorsWereInformed = true;
334 				monitors.addAll(connectionMonitors);
335 			}
336 		}
337 
338 		for (ConnectionMonitor cmon : monitors)
339 		{
340 			try
341 			{
342 				cmon.connectionLost(reasonClosedCause);
343 			}
344 			catch (Exception ignore)
345 			{
346 			}
347 		}
348 	}
349 
establishConnection(ProxyData proxyData, int connectTimeout)350 	private void establishConnection(ProxyData proxyData, int connectTimeout) throws IOException
351 	{
352 		/* See the comment for createInetAddress() */
353 
354 		if (proxyData == null)
355 		{
356 			InetAddress addr = createInetAddress(hostname);
357 			sock.connect(new InetSocketAddress(addr, port), connectTimeout);
358 			return;
359 		}
360 
361 		if (proxyData instanceof HTTPProxyData)
362 		{
363 			HTTPProxyData pd = (HTTPProxyData) proxyData;
364 
365 			/* At the moment, we only support HTTP proxies */
366 
367 			InetAddress addr = createInetAddress(pd.proxyHost);
368 			sock.connect(new InetSocketAddress(addr, pd.proxyPort), connectTimeout);
369 
370 			/* OK, now tell the proxy where we actually want to connect to */
371 
372 			StringBuilder sb = new StringBuilder();
373 
374 			sb.append("CONNECT ");
375 			sb.append(hostname);
376 			sb.append(':');
377 			sb.append(port);
378 			sb.append(" HTTP/1.0\r\n");
379 
380 			if ((pd.proxyUser != null) && (pd.proxyPass != null))
381 			{
382 				String credentials = pd.proxyUser + ":" + pd.proxyPass;
383 				char[] encoded = Base64.encode(StringEncoder.GetBytes(credentials));
384 				sb.append("Proxy-Authorization: Basic ");
385 				sb.append(encoded);
386 				sb.append("\r\n");
387 			}
388 
389 			if (pd.requestHeaderLines != null)
390 			{
391 				for (int i = 0; i < pd.requestHeaderLines.length; i++)
392 				{
393 					if (pd.requestHeaderLines[i] != null)
394 					{
395 						sb.append(pd.requestHeaderLines[i]);
396 						sb.append("\r\n");
397 					}
398 				}
399 			}
400 
401 			sb.append("\r\n");
402 
403 			OutputStream out = sock.getOutputStream();
404 
405 			out.write(StringEncoder.GetBytes(sb.toString()));
406 			out.flush();
407 
408 			/* Now parse the HTTP response */
409 
410 			byte[] buffer = new byte[1024];
411 			InputStream in = sock.getInputStream();
412 
413 			int len = ClientServerHello.readLineRN(in, buffer);
414 
415 			String httpReponse = StringEncoder.GetString(buffer, 0, len);
416 
417 			if (httpReponse.startsWith("HTTP/") == false)
418 			{
419 				throw new IOException("The proxy did not send back a valid HTTP response.");
420 			}
421 
422 			/* "HTTP/1.X XYZ X" => 14 characters minimum */
423 
424 			if ((httpReponse.length() < 14) || (httpReponse.charAt(8) != ' ') || (httpReponse.charAt(12) != ' '))
425 			{
426 				throw new IOException("The proxy did not send back a valid HTTP response.");
427 			}
428 
429 			int errorCode = 0;
430 
431 			try
432 			{
433 				errorCode = Integer.parseInt(httpReponse.substring(9, 12));
434 			}
435 			catch (NumberFormatException ignore)
436 			{
437 				throw new IOException("The proxy did not send back a valid HTTP response.");
438 			}
439 
440 			if ((errorCode < 0) || (errorCode > 999))
441 			{
442 				throw new IOException("The proxy did not send back a valid HTTP response.");
443 			}
444 
445 			if (errorCode != 200)
446 			{
447 				throw new HTTPProxyException(httpReponse.substring(13), errorCode);
448 			}
449 
450 			/* OK, read until empty line */
451 
452 			while (true)
453 			{
454 				len = ClientServerHello.readLineRN(in, buffer);
455 				if (len == 0)
456 				{
457 					break;
458 				}
459 			}
460 			return;
461 		}
462 
463 		throw new IOException("Unsupported ProxyData");
464 	}
465 
initialize(String identification, CryptoWishList cwl, ServerHostKeyVerifier verifier, DHGexParameters dhgex, int connectTimeout, SecureRandom rnd, ProxyData proxyData)466 	public void initialize(String identification, CryptoWishList cwl, ServerHostKeyVerifier verifier,
467 						   DHGexParameters dhgex, int connectTimeout, SecureRandom rnd, ProxyData proxyData)
468 			throws IOException
469 	{
470 		/* First, establish the TCP connection to the SSH-2 server */
471 
472 		establishConnection(proxyData, connectTimeout);
473 
474 		/* Parse the server line and say hello - important: this information is later needed for the
475 		 * key exchange (to stop man-in-the-middle attacks) - that is why we wrap it into an object
476 		 * for later use.
477 		 */
478 
479 		ClientServerHello csh = new ClientServerHello(identification, sock.getInputStream(), sock.getOutputStream());
480 
481 		tc = new TransportConnection(sock.getInputStream(), sock.getOutputStream(), rnd);
482 
483 		km = new KexManager(this, csh, cwl, hostname, port, verifier, rnd);
484 		km.initiateKEX(cwl, dhgex);
485 
486 		receiveThread = new Thread(new Runnable()
487 		{
488 			public void run()
489 			{
490 				try
491 				{
492 					receiveLoop();
493 				}
494 				catch (IOException e)
495 				{
496 					close(e, false);
497 
498 					log.warning("Receive thread: error in receiveLoop: " + e.getMessage());
499 				}
500 
501 				if (log.isDebugEnabled())
502 				{
503 					log.debug("Receive thread: back from receiveLoop");
504 				}
505 
506 				/* Tell all handlers that it is time to say goodbye */
507 
508 				if (km != null)
509 				{
510 					try
511 					{
512 						km.handleMessage(null, 0);
513 					}
514 					catch (IOException ignored)
515 					{
516 					}
517 				}
518 
519 				for (HandlerEntry he : messageHandlers)
520 				{
521 					try
522 					{
523 						he.mh.handleMessage(null, 0);
524 					}
525 					catch (Exception ignore)
526 					{
527 					}
528 				}
529 			}
530 		});
531 
532 		receiveThread.setDaemon(true);
533 		receiveThread.start();
534 	}
535 
registerMessageHandler(MessageHandler mh, int low, int high)536 	public void registerMessageHandler(MessageHandler mh, int low, int high)
537 	{
538 		HandlerEntry he = new HandlerEntry();
539 		he.mh = mh;
540 		he.low = low;
541 		he.high = high;
542 
543 		synchronized (messageHandlers)
544 		{
545 			messageHandlers.add(he);
546 		}
547 	}
548 
removeMessageHandler(MessageHandler mh, int low, int high)549 	public void removeMessageHandler(MessageHandler mh, int low, int high)
550 	{
551 		synchronized (messageHandlers)
552 		{
553 			for (int i = 0; i < messageHandlers.size(); i++)
554 			{
555 				HandlerEntry he = messageHandlers.get(i);
556 				if ((he.mh == mh) && (he.low == low) && (he.high == high))
557 				{
558 					messageHandlers.remove(i);
559 					break;
560 				}
561 			}
562 		}
563 	}
564 
sendKexMessage(byte[] msg)565 	public void sendKexMessage(byte[] msg) throws IOException
566 	{
567 		synchronized (connectionSemaphore)
568 		{
569 			if (connectionClosed)
570 			{
571 				throw (IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
572 			}
573 
574 			flagKexOngoing = true;
575 
576 			try
577 			{
578 				tc.sendMessage(msg);
579 			}
580 			catch (IOException e)
581 			{
582 				close(e, false);
583 				throw e;
584 			}
585 		}
586 	}
587 
kexFinished()588 	public void kexFinished() throws IOException
589 	{
590 		synchronized (connectionSemaphore)
591 		{
592 			flagKexOngoing = false;
593 			connectionSemaphore.notifyAll();
594 		}
595 	}
596 
forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex)597 	public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex) throws IOException
598 	{
599 		km.initiateKEX(cwl, dhgex);
600 	}
601 
changeRecvCipher(BlockCipher bc, MAC mac)602 	public void changeRecvCipher(BlockCipher bc, MAC mac)
603 	{
604 		tc.changeRecvCipher(bc, mac);
605 	}
606 
changeSendCipher(BlockCipher bc, MAC mac)607 	public void changeSendCipher(BlockCipher bc, MAC mac)
608 	{
609 		tc.changeSendCipher(bc, mac);
610 	}
611 
sendAsynchronousMessage(byte[] msg)612 	public void sendAsynchronousMessage(byte[] msg) throws IOException
613 	{
614 		synchronized (asynchronousQueue)
615 		{
616 			asynchronousQueue.add(msg);
617 
618 			/* This limit should be flexible enough. We need this, otherwise the peer
619 			 * can flood us with global requests (and other stuff where we have to reply
620 			 * with an asynchronous message) and (if the server just sends data and does not
621 			 * read what we send) this will probably put us in a low memory situation
622 			 * (our send queue would grow and grow and...) */
623 
624 			if (asynchronousQueue.size() > 100)
625 			{
626 				throw new IOException("Error: the peer is not consuming our asynchronous replies.");
627 			}
628 
629 			/* Check if we have an asynchronous sending thread */
630 
631 			if (asynchronousThread == null)
632 			{
633 				asynchronousThread = new AsynchronousWorker();
634 				asynchronousThread.setDaemon(true);
635 				asynchronousThread.start();
636 
637 				/* The thread will stop after 2 seconds of inactivity (i.e., empty queue) */
638 			}
639 		}
640 	}
641 
setConnectionMonitors(List<ConnectionMonitor> monitors)642 	public void setConnectionMonitors(List<ConnectionMonitor> monitors)
643 	{
644 		synchronized (this)
645 		{
646 			connectionMonitors = new Vector<ConnectionMonitor>();
647 			connectionMonitors.addAll(monitors);
648 		}
649 	}
650 
651 	/**
652 	 * True if no response message expected.
653 	 */
654 	private boolean idle;
655 
sendMessage(byte[] msg)656 	public void sendMessage(byte[] msg) throws IOException
657 	{
658 		if (Thread.currentThread() == receiveThread)
659 		{
660 			throw new IOException("Assertion error: sendMessage may never be invoked by the receiver thread!");
661 		}
662 
663 		boolean wasInterrupted = false;
664 
665 		try
666 		{
667 			synchronized (connectionSemaphore)
668 			{
669 				while (true)
670 				{
671 					if (connectionClosed)
672 					{
673 						throw (IOException) new IOException("Sorry, this connection is closed.")
674 								.initCause(reasonClosedCause);
675 					}
676 
677 					if (flagKexOngoing == false)
678 					{
679 						break;
680 					}
681 
682 					try
683 					{
684 						connectionSemaphore.wait();
685 					}
686 					catch (InterruptedException e)
687 					{
688 						wasInterrupted = true;
689 					}
690 				}
691 
692 				try
693 				{
694 					tc.sendMessage(msg);
695 					idle = false;
696 				}
697 				catch (IOException e)
698 				{
699 					close(e, false);
700 					throw e;
701 				}
702 			}
703 		}
704 		finally
705 		{
706 			if (wasInterrupted)
707 				Thread.currentThread().interrupt();
708 		}
709 	}
710 
receiveLoop()711 	public void receiveLoop() throws IOException
712 	{
713 		byte[] msg = new byte[35000];
714 
715 		while (true)
716 		{
717 			int msglen;
718 			try
719 			{
720 				msglen = tc.receiveMessage(msg, 0, msg.length);
721 			}
722 			catch (SocketTimeoutException e)
723 			{
724 				// Timeout in read
725 				if (idle)
726 				{
727 					log.debug("Ignoring socket timeout");
728 					continue;
729 				}
730 				throw e;
731 			}
732 			idle = true;
733 
734 			int type = msg[0] & 0xff;
735 
736 			if (type == Packets.SSH_MSG_IGNORE)
737 			{
738 				continue;
739 			}
740 
741 			if (type == Packets.SSH_MSG_DEBUG)
742 			{
743 				if (log.isDebugEnabled())
744 				{
745 					TypesReader tr = new TypesReader(msg, 0, msglen);
746 					tr.readByte();
747 					tr.readBoolean();
748 					StringBuilder debugMessageBuffer = new StringBuilder();
749 					debugMessageBuffer.append(tr.readString("UTF-8"));
750 
751 					for (int i = 0; i < debugMessageBuffer.length(); i++)
752 					{
753 						char c = debugMessageBuffer.charAt(i);
754 
755 						if ((c >= 32) && (c <= 126))
756 						{
757 							continue;
758 						}
759 						debugMessageBuffer.setCharAt(i, '\uFFFD');
760 					}
761 
762 					log.debug("DEBUG Message from remote: '" + debugMessageBuffer.toString() + "'");
763 				}
764 				continue;
765 			}
766 
767 			if (type == Packets.SSH_MSG_UNIMPLEMENTED)
768 			{
769 				throw new IOException("Peer sent UNIMPLEMENTED message, that should not happen.");
770 			}
771 
772 			if (type == Packets.SSH_MSG_DISCONNECT)
773 			{
774 				TypesReader tr = new TypesReader(msg, 0, msglen);
775 				tr.readByte();
776 				int reason_code = tr.readUINT32();
777 				StringBuilder reasonBuffer = new StringBuilder();
778 				reasonBuffer.append(tr.readString("UTF-8"));
779 
780 				/*
781 				 * Do not get fooled by servers that send abnormal long error
782 				 * messages
783 				 */
784 
785 				if (reasonBuffer.length() > 255)
786 				{
787 					reasonBuffer.setLength(255);
788 					reasonBuffer.setCharAt(254, '.');
789 					reasonBuffer.setCharAt(253, '.');
790 					reasonBuffer.setCharAt(252, '.');
791 				}
792 
793 				/*
794 				 * Also, check that the server did not send characters that may
795 				 * screw up the receiver -> restrict to reasonable US-ASCII
796 				 * subset -> "printable characters" (ASCII 32 - 126). Replace
797 				 * all others with 0xFFFD (UNICODE replacement character).
798 				 */
799 
800 				for (int i = 0; i < reasonBuffer.length(); i++)
801 				{
802 					char c = reasonBuffer.charAt(i);
803 
804 					if ((c >= 32) && (c <= 126))
805 					{
806 						continue;
807 					}
808 					reasonBuffer.setCharAt(i, '\uFFFD');
809 				}
810 
811 				throw new IOException("Peer sent DISCONNECT message (reason code " + reason_code + "): "
812 						+ reasonBuffer.toString());
813 			}
814 
815 			/*
816 			 * Is it a KEX Packet?
817 			 */
818 
819 			if ((type == Packets.SSH_MSG_KEXINIT) || (type == Packets.SSH_MSG_NEWKEYS)
820 					|| ((type >= 30) && (type <= 49)))
821 			{
822 				km.handleMessage(msg, msglen);
823 				continue;
824 			}
825 
826 			MessageHandler mh = null;
827 
828 			for (int i = 0; i < messageHandlers.size(); i++)
829 			{
830 				HandlerEntry he = messageHandlers.get(i);
831 				if ((he.low <= type) && (type <= he.high))
832 				{
833 					mh = he.mh;
834 					break;
835 				}
836 			}
837 
838 			if (mh == null)
839 			{
840 				throw new IOException("Unexpected SSH message (type " + type + ")");
841 			}
842 
843 			mh.handleMessage(msg, msglen);
844 		}
845 	}
846 }
847