1 /* 2 * Conditions Of Use 3 * 4 * This software was developed by employees of the National Institute of 5 * Standards and Technology (NIST), an agency of the Federal Government. 6 * Pursuant to title 15 Untied States Code Section 105, works of NIST 7 * employees are not subject to copyright protection in the United States 8 * and are considered to be in the public domain. As a result, a formal 9 * license is not needed to use the software. 10 * 11 * This software is provided by NIST as a service and is expressly 12 * provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED 13 * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF 14 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT 15 * AND DATA ACCURACY. NIST does not warrant or make any representations 16 * regarding the use of the software or the results thereof, including but 17 * not limited to the correctness, accuracy, reliability or usefulness of 18 * the software. 19 * 20 * Permission to use this software is contingent upon your acceptance 21 * of the terms of this agreement 22 * 23 * . 24 * 25 */ 26 /****************************************************************************** 27 * Product of NIST/ITL Advanced Networking Technologies Division (ANTD). * 28 ******************************************************************************/ 29 package gov.nist.javax.sip.stack; 30 31 import java.net.Socket; 32 import java.net.ServerSocket; 33 import java.io.IOException; 34 import java.net.SocketException; 35 import gov.nist.core.*; 36 import java.net.*; 37 import java.util.*; 38 39 /* 40 * Acknowledgement: Jeff Keyser suggested that a Stop mechanism be added to this. Niklas Uhrberg 41 * suggested that a means to limit the number of simultaneous active connections should be added. 42 * Mike Andrews suggested that the thread be accessible so as to implement clean stop using 43 * Thread.join(). Roger M. Persson contributed a bug fix for cleanup on stop(). 44 * 45 */ 46 47 /** 48 * Sit in a loop waiting for incoming tcp connections and start a new thread to handle each new 49 * connection. This is the active object that creates new TCP MessageChannels (one for each new 50 * accept socket). 51 * 52 * @version 1.2 $Revision: 1.31 $ $Date: 2009/08/31 16:18:00 $ 53 * 54 * @author M. Ranganathan <br/> 55 * 56 * 57 */ 58 public class TCPMessageProcessor extends MessageProcessor { 59 60 protected int nConnections; 61 62 private boolean isRunning; 63 64 private Hashtable tcpMessageChannels; 65 66 private ArrayList<TCPMessageChannel> incomingTcpMessageChannels; 67 68 private ServerSocket sock; 69 70 protected int useCount; 71 72 /** 73 * Constructor. 74 * 75 * @param sipStack SIPStack structure. 76 * @param port port where this message processor listens. 77 */ TCPMessageProcessor(InetAddress ipAddress, SIPTransactionStack sipStack, int port)78 protected TCPMessageProcessor(InetAddress ipAddress, SIPTransactionStack sipStack, int port) { 79 super(ipAddress, port, "tcp",sipStack); 80 81 this.sipStack = sipStack; 82 83 this.tcpMessageChannels = new Hashtable(); 84 this.incomingTcpMessageChannels = new ArrayList<TCPMessageChannel>(); 85 } 86 87 /** 88 * Start the processor. 89 */ start()90 public void start() throws IOException { 91 Thread thread = new Thread(this); 92 thread.setName("TCPMessageProcessorThread"); 93 thread.setPriority(Thread.MAX_PRIORITY); 94 thread.setDaemon(true); 95 this.sock = sipStack.getNetworkLayer().createServerSocket(getPort(), 0, getIpAddress()); 96 if (getIpAddress().getHostAddress().equals(IN_ADDR_ANY) 97 || getIpAddress().getHostAddress().equals(IN6_ADDR_ANY)) { 98 // Store the address to which we are actually bound 99 super.setIpAddress(sock.getInetAddress()); 100 101 } 102 this.isRunning = true; 103 thread.start(); 104 105 } 106 107 /** 108 * Run method for the thread that gets created for each accept socket. 109 */ run()110 public void run() { 111 // Accept new connectins on our socket. 112 while (this.isRunning) { 113 try { 114 synchronized (this) { 115 // sipStack.maxConnections == -1 means we are 116 // willing to handle an "infinite" number of 117 // simultaneous connections (no resource limitation). 118 // This is the default behavior. 119 while (sipStack.maxConnections != -1 120 && this.nConnections >= sipStack.maxConnections) { 121 try { 122 this.wait(); 123 124 if (!this.isRunning) 125 return; 126 } catch (InterruptedException ex) { 127 break; 128 } 129 } 130 this.nConnections++; 131 } 132 133 Socket newsock = sock.accept(); 134 if (sipStack.isLoggingEnabled()) { 135 getSIPStack().getStackLogger().logDebug("Accepting new connection!"); 136 } 137 // Note that for an incoming message channel, the 138 // thread is already running 139 140 incomingTcpMessageChannels.add(new TCPMessageChannel(newsock, sipStack, this)); 141 } catch (SocketException ex) { 142 this.isRunning = false; 143 } catch (IOException ex) { 144 // Problem accepting connection. 145 if (sipStack.isLoggingEnabled()) 146 getSIPStack().getStackLogger().logException(ex); 147 continue; 148 } catch (Exception ex) { 149 InternalErrorHandler.handleException(ex); 150 } 151 } 152 } 153 154 /** 155 * Return the transport string. 156 * 157 * @return the transport string 158 */ getTransport()159 public String getTransport() { 160 return "tcp"; 161 } 162 163 /** 164 * Returns the stack. 165 * 166 * @return my sip stack. 167 */ getSIPStack()168 public SIPTransactionStack getSIPStack() { 169 return sipStack; 170 } 171 172 /** 173 * Stop the message processor. Feature suggested by Jeff Keyser. 174 */ stop()175 public synchronized void stop() { 176 isRunning = false; 177 // this.listeningPoint = null; 178 try { 179 sock.close(); 180 } catch (IOException e) { 181 e.printStackTrace(); 182 } 183 184 Collection en = tcpMessageChannels.values(); 185 for (Iterator it = en.iterator(); it.hasNext();) { 186 TCPMessageChannel next = (TCPMessageChannel) it.next(); 187 next.close(); 188 } 189 // RRPN: fix 190 for (Iterator incomingMCIterator = incomingTcpMessageChannels.iterator(); incomingMCIterator 191 .hasNext();) { 192 TCPMessageChannel next = (TCPMessageChannel) incomingMCIterator.next(); 193 next.close(); 194 } 195 196 this.notify(); 197 } 198 remove(TCPMessageChannel tcpMessageChannel)199 protected synchronized void remove(TCPMessageChannel tcpMessageChannel) { 200 201 String key = tcpMessageChannel.getKey(); 202 if (sipStack.isLoggingEnabled()) { 203 sipStack.getStackLogger().logDebug(Thread.currentThread() + " removing " + key); 204 } 205 206 /** May have been removed already */ 207 if (tcpMessageChannels.get(key) == tcpMessageChannel) { 208 this.tcpMessageChannels.remove(key); 209 } 210 211 incomingTcpMessageChannels.remove(tcpMessageChannel); 212 } 213 createMessageChannel(HostPort targetHostPort)214 public synchronized MessageChannel createMessageChannel(HostPort targetHostPort) 215 throws IOException { 216 String key = MessageChannel.getKey(targetHostPort, "TCP"); 217 if (tcpMessageChannels.get(key) != null) { 218 return (TCPMessageChannel) this.tcpMessageChannels.get(key); 219 } else { 220 TCPMessageChannel retval = new TCPMessageChannel(targetHostPort.getInetAddress(), 221 targetHostPort.getPort(), sipStack, this); 222 this.tcpMessageChannels.put(key, retval); 223 retval.isCached = true; 224 if (sipStack.isLoggingEnabled()) { 225 sipStack.getStackLogger().logDebug("key " + key); 226 sipStack.getStackLogger().logDebug("Creating " + retval); 227 } 228 return retval; 229 } 230 } 231 cacheMessageChannel(TCPMessageChannel messageChannel)232 protected synchronized void cacheMessageChannel(TCPMessageChannel messageChannel) { 233 String key = messageChannel.getKey(); 234 TCPMessageChannel currentChannel = (TCPMessageChannel) tcpMessageChannels.get(key); 235 if (currentChannel != null) { 236 if (sipStack.isLoggingEnabled()) 237 sipStack.getStackLogger().logDebug("Closing " + key); 238 currentChannel.close(); 239 } 240 if (sipStack.isLoggingEnabled()) 241 sipStack.getStackLogger().logDebug("Caching " + key); 242 this.tcpMessageChannels.put(key, messageChannel); 243 244 } 245 createMessageChannel(InetAddress host, int port)246 public synchronized MessageChannel createMessageChannel(InetAddress host, int port) 247 throws IOException { 248 try { 249 String key = MessageChannel.getKey(host, port, "TCP"); 250 if (tcpMessageChannels.get(key) != null) { 251 return (TCPMessageChannel) this.tcpMessageChannels.get(key); 252 } else { 253 TCPMessageChannel retval = new TCPMessageChannel(host, port, sipStack, this); 254 this.tcpMessageChannels.put(key, retval); 255 retval.isCached = true; 256 if (sipStack.isLoggingEnabled()) { 257 sipStack.getStackLogger().logDebug("key " + key); 258 sipStack.getStackLogger().logDebug("Creating " + retval); 259 } 260 return retval; 261 } 262 } catch (UnknownHostException ex) { 263 throw new IOException(ex.getMessage()); 264 } 265 } 266 267 /** 268 * TCP can handle an unlimited number of bytes. 269 */ getMaximumMessageSize()270 public int getMaximumMessageSize() { 271 return Integer.MAX_VALUE; 272 } 273 inUse()274 public boolean inUse() { 275 return this.useCount != 0; 276 } 277 278 /** 279 * Default target port for TCP 280 */ getDefaultTargetPort()281 public int getDefaultTargetPort() { 282 return 5060; 283 } 284 285 /** 286 * TCP is not a secure protocol. 287 */ isSecure()288 public boolean isSecure() { 289 return false; 290 } 291 } 292