• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Conditions Of Use
 *
 * This software was developed by employees of the National Institute of
 * Standards and Technology (NIST), an agency of the Federal Government.
 * Pursuant to title 15 United States Code Section 105, works of NIST
 * employees are not subject to copyright protection in the United States
 * and are considered to be in the public domain.  As a result, a formal
 * license is not needed to use the software.
 *
 * This software is provided by NIST as a service and is expressly
 * provided "AS IS."  NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
 * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
 * AND DATA ACCURACY.  NIST does not warrant or make any representations
 * regarding the use of the software or the results thereof, including but
 * not limited to the correctness, accuracy, reliability or usefulness of
 * the software.
 *
 * Permission to use this software is contingent upon your acceptance
 * of the terms of this agreement
 *
 * .
 *
 */
/******************************************************************************
 * Product of NIST/ITL Advanced Networking Technologies Division (ANTD).      *
 ******************************************************************************/
29 package gov.nist.javax.sip.stack;
30 
31 import java.net.Socket;
32 import java.net.ServerSocket;
33 import java.io.IOException;
34 import java.net.SocketException;
35 import gov.nist.core.*;
36 import java.net.*;
37 import java.util.*;
38 
/*
 * Acknowledgement: Jeff Keyser suggested that a Stop mechanism be added to this. Niklas Uhrberg
 * suggested that a means to limit the number of simultaneous active connections should be added.
 * Mike Andrews suggested that the thread be accessible so as to implement clean stop using
 * Thread.join(). Roger M. Persson contributed a bug fix for cleanup on stop().
 *
 */
46 
47 /**
48  * Sit in a loop waiting for incoming tcp connections and start a new thread to handle each new
49  * connection. This is the active object that creates new TCP MessageChannels (one for each new
50  * accept socket).
51  *
52  * @version 1.2 $Revision: 1.31 $ $Date: 2009/08/31 16:18:00 $
53  *
54  * @author M. Ranganathan <br/>
55  *
56  *
57  */
58 public class TCPMessageProcessor extends MessageProcessor {
59 
60     protected int nConnections;
61 
62     private boolean isRunning;
63 
64     private Hashtable tcpMessageChannels;
65 
66     private ArrayList<TCPMessageChannel> incomingTcpMessageChannels;
67 
68     private ServerSocket sock;
69 
70     protected int useCount;
71 
72     /**
73      * Constructor.
74      *
75      * @param sipStack SIPStack structure.
76      * @param port port where this message processor listens.
77      */
TCPMessageProcessor(InetAddress ipAddress, SIPTransactionStack sipStack, int port)78     protected TCPMessageProcessor(InetAddress ipAddress, SIPTransactionStack sipStack, int port) {
79         super(ipAddress, port, "tcp",sipStack);
80 
81         this.sipStack = sipStack;
82 
83         this.tcpMessageChannels = new Hashtable();
84         this.incomingTcpMessageChannels = new ArrayList<TCPMessageChannel>();
85     }
86 
87     /**
88      * Start the processor.
89      */
start()90     public void start() throws IOException {
91         Thread thread = new Thread(this);
92         thread.setName("TCPMessageProcessorThread");
93         thread.setPriority(Thread.MAX_PRIORITY);
94         thread.setDaemon(true);
95         this.sock = sipStack.getNetworkLayer().createServerSocket(getPort(), 0, getIpAddress());
96         if (getIpAddress().getHostAddress().equals(IN_ADDR_ANY)
97                 || getIpAddress().getHostAddress().equals(IN6_ADDR_ANY)) {
98             // Store the address to which we are actually bound
99             super.setIpAddress(sock.getInetAddress());
100 
101         }
102         this.isRunning = true;
103         thread.start();
104 
105     }
106 
107     /**
108      * Run method for the thread that gets created for each accept socket.
109      */
run()110     public void run() {
111         // Accept new connectins on our socket.
112         while (this.isRunning) {
113             try {
114                 synchronized (this) {
115                     // sipStack.maxConnections == -1 means we are
116                     // willing to handle an "infinite" number of
117                     // simultaneous connections (no resource limitation).
118                     // This is the default behavior.
119                     while (sipStack.maxConnections != -1
120                             && this.nConnections >= sipStack.maxConnections) {
121                         try {
122                             this.wait();
123 
124                             if (!this.isRunning)
125                                 return;
126                         } catch (InterruptedException ex) {
127                             break;
128                         }
129                     }
130                     this.nConnections++;
131                 }
132 
133                 Socket newsock = sock.accept();
134                 if (sipStack.isLoggingEnabled()) {
135                     getSIPStack().getStackLogger().logDebug("Accepting new connection!");
136                 }
137                 // Note that for an incoming message channel, the
138                 // thread is already running
139 
140                 incomingTcpMessageChannels.add(new TCPMessageChannel(newsock, sipStack, this));
141             } catch (SocketException ex) {
142                 this.isRunning = false;
143             } catch (IOException ex) {
144                 // Problem accepting connection.
145                 if (sipStack.isLoggingEnabled())
146                     getSIPStack().getStackLogger().logException(ex);
147                 continue;
148             } catch (Exception ex) {
149                 InternalErrorHandler.handleException(ex);
150             }
151         }
152     }
153 
154     /**
155      * Return the transport string.
156      *
157      * @return the transport string
158      */
getTransport()159     public String getTransport() {
160         return "tcp";
161     }
162 
163     /**
164      * Returns the stack.
165      *
166      * @return my sip stack.
167      */
getSIPStack()168     public SIPTransactionStack getSIPStack() {
169         return sipStack;
170     }
171 
172     /**
173      * Stop the message processor. Feature suggested by Jeff Keyser.
174      */
stop()175     public synchronized void stop() {
176         isRunning = false;
177         // this.listeningPoint = null;
178         try {
179             sock.close();
180         } catch (IOException e) {
181             e.printStackTrace();
182         }
183 
184         Collection en = tcpMessageChannels.values();
185         for (Iterator it = en.iterator(); it.hasNext();) {
186             TCPMessageChannel next = (TCPMessageChannel) it.next();
187             next.close();
188         }
189         // RRPN: fix
190         for (Iterator incomingMCIterator = incomingTcpMessageChannels.iterator(); incomingMCIterator
191                 .hasNext();) {
192             TCPMessageChannel next = (TCPMessageChannel) incomingMCIterator.next();
193             next.close();
194         }
195 
196         this.notify();
197     }
198 
remove(TCPMessageChannel tcpMessageChannel)199     protected synchronized void remove(TCPMessageChannel tcpMessageChannel) {
200 
201         String key = tcpMessageChannel.getKey();
202         if (sipStack.isLoggingEnabled()) {
203             sipStack.getStackLogger().logDebug(Thread.currentThread() + " removing " + key);
204         }
205 
206         /** May have been removed already */
207         if (tcpMessageChannels.get(key) == tcpMessageChannel) {
208             this.tcpMessageChannels.remove(key);
209         }
210 
211         incomingTcpMessageChannels.remove(tcpMessageChannel);
212     }
213 
createMessageChannel(HostPort targetHostPort)214     public synchronized MessageChannel createMessageChannel(HostPort targetHostPort)
215             throws IOException {
216         String key = MessageChannel.getKey(targetHostPort, "TCP");
217         if (tcpMessageChannels.get(key) != null) {
218             return (TCPMessageChannel) this.tcpMessageChannels.get(key);
219         } else {
220             TCPMessageChannel retval = new TCPMessageChannel(targetHostPort.getInetAddress(),
221                     targetHostPort.getPort(), sipStack, this);
222             this.tcpMessageChannels.put(key, retval);
223             retval.isCached = true;
224             if (sipStack.isLoggingEnabled()) {
225                 sipStack.getStackLogger().logDebug("key " + key);
226                 sipStack.getStackLogger().logDebug("Creating " + retval);
227             }
228             return retval;
229         }
230     }
231 
cacheMessageChannel(TCPMessageChannel messageChannel)232     protected synchronized void cacheMessageChannel(TCPMessageChannel messageChannel) {
233         String key = messageChannel.getKey();
234         TCPMessageChannel currentChannel = (TCPMessageChannel) tcpMessageChannels.get(key);
235         if (currentChannel != null) {
236             if (sipStack.isLoggingEnabled())
237                 sipStack.getStackLogger().logDebug("Closing " + key);
238             currentChannel.close();
239         }
240         if (sipStack.isLoggingEnabled())
241             sipStack.getStackLogger().logDebug("Caching " + key);
242         this.tcpMessageChannels.put(key, messageChannel);
243 
244     }
245 
createMessageChannel(InetAddress host, int port)246     public synchronized MessageChannel createMessageChannel(InetAddress host, int port)
247             throws IOException {
248         try {
249             String key = MessageChannel.getKey(host, port, "TCP");
250             if (tcpMessageChannels.get(key) != null) {
251                 return (TCPMessageChannel) this.tcpMessageChannels.get(key);
252             } else {
253                 TCPMessageChannel retval = new TCPMessageChannel(host, port, sipStack, this);
254                 this.tcpMessageChannels.put(key, retval);
255                 retval.isCached = true;
256                 if (sipStack.isLoggingEnabled()) {
257                     sipStack.getStackLogger().logDebug("key " + key);
258                     sipStack.getStackLogger().logDebug("Creating " + retval);
259                 }
260                 return retval;
261             }
262         } catch (UnknownHostException ex) {
263             throw new IOException(ex.getMessage());
264         }
265     }
266 
267     /**
268      * TCP can handle an unlimited number of bytes.
269      */
getMaximumMessageSize()270     public int getMaximumMessageSize() {
271         return Integer.MAX_VALUE;
272     }
273 
inUse()274     public boolean inUse() {
275         return this.useCount != 0;
276     }
277 
278     /**
279      * Default target port for TCP
280      */
getDefaultTargetPort()281     public int getDefaultTargetPort() {
282         return 5060;
283     }
284 
285     /**
286      * TCP is not a secure protocol.
287      */
isSecure()288     public boolean isSecure() {
289         return false;
290     }
291 }
292