• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Conditions Of Use
3  *
4  * This software was developed by employees of the National Institute of
5  * Standards and Technology (NIST), an agency of the Federal Government.
6  * Pursuant to title 15 Untied States Code Section 105, works of NIST
7  * employees are not subject to copyright protection in the United States
8  * and are considered to be in the public domain.  As a result, a formal
9  * license is not needed to use the software.
10  *
11  * This software is provided by NIST as a service and is expressly
12  * provided "AS IS."  NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
13  * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
14  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
15  * AND DATA ACCURACY.  NIST does not warrant or make any representations
16  * regarding the use of the software or the results thereof, including but
17  * not limited to the correctness, accuracy, reliability or usefulness of
18  * the software.
19  *
20  * Permission to use this software is contingent upon your acceptance
21  * of the terms of this agreement
22  *
23  * .
24  *
25  */
26 /******************************************************************************
27  * Product of NIST/ITL Advanced Networking Technologies Division (ANTD)       *
28  ******************************************************************************/
29 package gov.nist.javax.sip.parser;
30 
31 /*
32  *
33  * Lamine Brahimi and Yann Duponchel (IBM Zurich) noticed that the parser was
34  * blocking so I threw out some cool pipelining which ran fast but only worked
35  * when the phase of the moon matched its mood. Now things are serialized and
36  * life goes slower but more reliably.
37  *
38  */
39 import gov.nist.core.*;
40 import gov.nist.javax.sip.message.*;
41 import gov.nist.javax.sip.header.*;
42 import java.text.ParseException;
43 import java.io.*;
44 
45 /**
46  * This implements a pipelined message parser suitable for use with a stream -
47  * oriented input such as TCP. The client uses this class by instatiating with
48  * an input stream from which input is read and fed to a message parser. It
49  * keeps reading from the input stream and process messages in a never ending
50  * interpreter loop. The message listener interface gets called for processing
51  * messages or for processing errors. The payload specified by the
52  * content-length header is read directly from the input stream. This can be
53  * accessed from the SIPMessage using the getContent and getContentBytes methods
54  * provided by the SIPMessage class.
55  *
56  * @version 1.2 $Revision: 1.23 $ $Date: 2009/08/16 17:28:28 $
57  *
58  * @author M. Ranganathan
59  *
60  * @see SIPMessageListener
61  */
62 public final class PipelinedMsgParser implements Runnable {
63 
64 
65 
66     /**
67      * The message listener that is registered with this parser. (The message
68      * listener has methods that can process correct and erroneous messages.)
69      */
70     protected SIPMessageListener sipMessageListener;
71     private Thread mythread; // Preprocessor thread
72     //private byte[] messageBody;
73     //private boolean errorFlag;
74     private Pipeline rawInputStream;
75     private int maxMessageSize;
76     private int sizeCounter;
77     //private int messageSize;
78 
79     /**
80      * default constructor.
81      */
PipelinedMsgParser()82     protected PipelinedMsgParser() {
83         super();
84 
85     }
86 
87     private static int uid = 0;
88 
getNewUid()89     private static synchronized int getNewUid() {
90         return uid++;
91     }
92 
93     /**
94      * Constructor when we are given a message listener and an input stream
95      * (could be a TCP connection or a file)
96      *
97      * @param sipMessageListener
98      *            Message listener which has methods that get called back from
99      *            the parser when a parse is complete
100      * @param in
101      *            Input stream from which to read the input.
102      * @param debug
103      *            Enable/disable tracing or lexical analyser switch.
104      */
PipelinedMsgParser(SIPMessageListener sipMessageListener, Pipeline in, boolean debug, int maxMessageSize)105     public PipelinedMsgParser(SIPMessageListener sipMessageListener,
106             Pipeline in, boolean debug, int maxMessageSize) {
107         this();
108         this.sipMessageListener = sipMessageListener;
109         rawInputStream = in;
110         this.maxMessageSize = maxMessageSize;
111         mythread = new Thread(this);
112         mythread.setName("PipelineThread-" + getNewUid());
113 
114     }
115 
116     /**
117      * This is the constructor for the pipelined parser.
118      *
119      * @param mhandler
120      *            a SIPMessageListener implementation that provides the message
121      *            handlers to handle correctly and incorrectly parsed messages.
122      * @param in
123      *            An input stream to read messages from.
124      */
125 
PipelinedMsgParser(SIPMessageListener mhandler, Pipeline in, int maxMsgSize)126     public PipelinedMsgParser(SIPMessageListener mhandler, Pipeline in,
127             int maxMsgSize) {
128         this(mhandler, in, false, maxMsgSize);
129     }
130 
131     /**
132      * This is the constructor for the pipelined parser.
133      *
134      * @param in -
135      *            An input stream to read messages from.
136      */
137 
PipelinedMsgParser(Pipeline in)138     public PipelinedMsgParser(Pipeline in) {
139         this(null, in, false, 0);
140     }
141 
142     /**
143      * Start reading and processing input.
144      */
processInput()145     public void processInput() {
146         mythread.start();
147     }
148 
149     /**
150      * Create a new pipelined parser from an existing one.
151      *
152      * @return A new pipelined parser that reads from the same input stream.
153      */
clone()154     protected Object clone() {
155         PipelinedMsgParser p = new PipelinedMsgParser();
156 
157         p.rawInputStream = this.rawInputStream;
158         p.sipMessageListener = this.sipMessageListener;
159         Thread mythread = new Thread(p);
160         mythread.setName("PipelineThread");
161         return p;
162     }
163 
164     /**
165      * Add a class that implements a SIPMessageListener interface whose methods
166      * get called * on successful parse and error conditons.
167      *
168      * @param mlistener
169      *            a SIPMessageListener implementation that can react to correct
170      *            and incorrect pars.
171      */
172 
setMessageListener(SIPMessageListener mlistener)173     public void setMessageListener(SIPMessageListener mlistener) {
174         sipMessageListener = mlistener;
175     }
176 
177     /**
178      * read a line of input (I cannot use buffered reader because we may need to
179      * switch encodings mid-stream!
180      */
readLine(InputStream inputStream)181     private String readLine(InputStream inputStream) throws IOException {
182         StringBuffer retval = new StringBuffer("");
183         while (true) {
184             char ch;
185             int i = inputStream.read();
186             if (i == -1) {
187                 throw new IOException("End of stream");
188             } else
189                 ch = (char) i;
190             // reduce the available read size by 1 ("size" of a char).
191             if (this.maxMessageSize > 0) {
192                 this.sizeCounter--;
193                 if (this.sizeCounter <= 0)
194                     throw new IOException("Max size exceeded!");
195             }
196             if (ch != '\r')
197                 retval.append(ch);
198             if (ch == '\n') {
199                 break;
200             }
201         }
202         return retval.toString();
203     }
204 
205     /**
206      * This is input reading thread for the pipelined parser. You feed it input
207      * through the input stream (see the constructor) and it calls back an event
208      * listener interface for message processing or error. It cleans up the
209      * input - dealing with things like line continuation
210      */
run()211     public void run() {
212 
213         Pipeline inputStream = this.rawInputStream;
214         // inputStream = new MyFilterInputStream(this.rawInputStream);
215         // I cannot use buffered reader here because we may need to switch
216         // encodings to read the message body.
217         try {
218             while (true) {
219                 this.sizeCounter = this.maxMessageSize;
220                 // this.messageSize = 0;
221                 StringBuffer inputBuffer = new StringBuffer();
222 
223                 if (Debug.parserDebug)
224                     Debug.println("Starting parse!");
225 
226                 String line1;
227                 String line2 = null;
228 
229                 while (true) {
230                     try {
231                         line1 = readLine(inputStream);
232                         // ignore blank lines.
233                         if (line1.equals("\n")) {
234                             if (Debug.parserDebug) {
235                                 Debug.println("Discarding blank line. ");
236                             }
237                             continue;
238                         } else
239                             break;
240                     } catch (IOException ex) {
241                         Debug.printStackTrace(ex);
242                         this.rawInputStream.stopTimer();
243                         return;
244 
245                     }
246                 }
247 
248                 inputBuffer.append(line1);
249                 // Guard against bad guys.
250                 this.rawInputStream.startTimer();
251 
252                 Debug.println("Reading Input Stream");
253                 while (true) {
254                     try {
255                         line2 = readLine(inputStream);
256                         inputBuffer.append(line2);
257                         if (line2.trim().equals(""))
258                             break;
259                     } catch (IOException ex) {
260                         this.rawInputStream.stopTimer();
261                         Debug.printStackTrace(ex);
262                         return;
263 
264                     }
265                 }
266 
267                 // Stop the timer that will kill the read.
268                 this.rawInputStream.stopTimer();
269                 inputBuffer.append(line2);
270                 StringMsgParser smp = new StringMsgParser(sipMessageListener);
271                 smp.readBody = false;
272                 SIPMessage sipMessage = null;
273 
274                 try {
275                     if (Debug.debug) {
276                         Debug.println("About to parse : " + inputBuffer.toString());
277                     }
278                     sipMessage = smp.parseSIPMessage(inputBuffer.toString());
279                     if (sipMessage == null) {
280                         this.rawInputStream.stopTimer();
281                         continue;
282                     }
283                 } catch (ParseException ex) {
284                     // Just ignore the parse exception.
285                     Debug.logError("Detected a parse error", ex);
286                     continue;
287                 }
288 
289                 if (Debug.debug) {
290                     Debug.println("Completed parsing message");
291                 }
292                 ContentLength cl = (ContentLength) sipMessage
293                         .getContentLength();
294                 int contentLength = 0;
295                 if (cl != null) {
296                     contentLength = cl.getContentLength();
297                 } else {
298                     contentLength = 0;
299                 }
300 
301                 if (Debug.debug) {
302                     Debug.println("contentLength " + contentLength);
303                 }
304 
305                 if (contentLength == 0) {
306                     sipMessage.removeContent();
307                 } else if (maxMessageSize == 0
308                         || contentLength < this.sizeCounter) {
309                     byte[] message_body = new byte[contentLength];
310                     int nread = 0;
311                     while (nread < contentLength) {
312                         // Start my starvation timer.
313                         // This ensures that the other end
314                         // writes at least some data in
315                         // or we will close the pipe from
316                         // him. This prevents DOS attack
317                         // that takes up all our connections.
318                         this.rawInputStream.startTimer();
319                         try {
320 
321                             int readlength = inputStream.read(message_body,
322                                     nread, contentLength - nread);
323                             if (readlength > 0) {
324                                 nread += readlength;
325                             } else {
326                                 break;
327                             }
328                         } catch (IOException ex) {
329                             Debug.logError("Exception Reading Content",ex);
330                             break;
331                         } finally {
332                             // Stop my starvation timer.
333                             this.rawInputStream.stopTimer();
334                         }
335                     }
336                     sipMessage.setMessageContent(message_body);
337                 }
338                 // Content length too large - process the message and
339                 // return error from there.
340                 if (sipMessageListener != null) {
341                     try {
342                         sipMessageListener.processMessage(sipMessage);
343                     } catch (Exception ex) {
344                         // fatal error in processing - close the
345                         // connection.
346                         break;
347                     }
348                 }
349             }
350         } finally {
351             try {
352                 inputStream.close();
353             } catch (IOException e) {
354                 InternalErrorHandler.handleException(e);
355             }
356         }
357     }
358 
close()359     public void close() {
360         try {
361             this.rawInputStream.close();
362         } catch (IOException ex) {
363             // Ignore.
364         }
365     }
366 }
367 /*
368  * $Log: PipelinedMsgParser.java,v $
369  * Revision 1.23  2009/08/16 17:28:28  mranga
370  * Issue number:  208
371  * Obtained from:
372  * Submitted by:
373  * Reviewed by:
374  *
375  * Add authentication mechanism that uses H(username:domain:password)
376  *
377  * Revision 1.22  2009/07/17 18:58:02  emcho
378  * Converts indentation tabs to spaces so that we have a uniform indentation policy in the whole project.
379  *
380  * Revision 1.21  2008/05/24 04:10:01  mranga
381  *
382  * Issue number:   158
383  * Obtained from:
384  * Submitted by:
385  * Reviewed by:   mranga
386  *
387  * Deliver tx timeout for Canceled INVITE. Fix pipeline thread exit.
388  *
389  * Revision 1.20  2008/05/22 19:38:07  jbemmel
390  * Fix for issue 149: the logic wasn't always closing the internal socket pipe,
391  * causing the pipe reader thread to block indefinitely
392  *
393  * Repeatedly starting/stopping the stack then gives hanging threads
394  * Revision 1.19 2007/01/28 13:06:21 mranga
395  * Issue number: 99 Obtained from: Submitted by: Reviewed by: mranga
396  *
397  * Fixed PRACK handling null pointer exception (for proxy case) and cleanup of
398  * unused variables.
399  *
400  * CVS: ----------------------------------------------------------------------
401  * CVS: Issue number: CVS: If this change addresses one or more issues, CVS:
402  * then enter the issue number(s) here. CVS: Obtained from: CVS: If this change
403  * has been taken from another system, CVS: then name the system in this line,
404  * otherwise delete it. CVS: Submitted by: CVS: If this code has been
405  * contributed to the project by someone else; i.e., CVS: they sent us a patch
406  * or a set of diffs, then include their name/email CVS: address here. If this
407  * is your work then delete this line. CVS: Reviewed by: CVS: If we are doing
408  * pre-commit code reviews and someone else has CVS: reviewed your changes,
409  * include their name(s) here. CVS: If you have not had it reviewed then delete
410  * this line.
411  *
412  * Revision 1.18 2006/07/13 09:02:10 mranga Issue number: Obtained from:
413  * Submitted by: jeroen van bemmel Reviewed by: mranga Moved some changes from
414  * jain-sip-1.2 to java.net
415  *
416  * CVS: ----------------------------------------------------------------------
417  * CVS: Issue number: CVS: If this change addresses one or more issues, CVS:
418  * then enter the issue number(s) here. CVS: Obtained from: CVS: If this change
419  * has been taken from another system, CVS: then name the system in this line,
420  * otherwise delete it. CVS: Submitted by: CVS: If this code has been
421  * contributed to the project by someone else; i.e., CVS: they sent us a patch
422  * or a set of diffs, then include their name/email CVS: address here. If this
423  * is your work then delete this line. CVS: Reviewed by: CVS: If we are doing
424  * pre-commit code reviews and someone else has CVS: reviewed your changes,
425  * include their name(s) here. CVS: If you have not had it reviewed then delete
426  * this line.
427  *
428  * Revision 1.4 2006/06/19 06:47:27 mranga javadoc fixups
429  *
430  * Revision 1.3 2006/06/17 10:18:14 mranga Added some synchronization to the
431  * sequence number checking. Small javadoc fixups
432  *
433  * Revision 1.2 2006/06/16 15:26:28 mranga Added NIST disclaimer to all public
434  * domain files. Clean up some javadoc. Fixed a leak
435  *
436  * Revision 1.1.1.1 2005/10/04 17:12:35 mranga
437  *
438  * Import
439  *
440  *
441  * Revision 1.16 2004/11/30 23:28:14 mranga Issue number: 44 Submitted by: Rob
442  * Daugherty Reviewed by: M. Ranganathan
443  *
444  * TCP Pipelining truncates content when other end of pipe is closed.
445  *
446  * Revision 1.15 2004/05/30 18:55:56 mranga Reviewed by: mranga Move to timers
447  * and eliminate the Transaction scanner Thread to improve scalability and
448  * reduce cpu usage.
449  *
450  * Revision 1.14 2004/05/16 14:13:22 mranga Reviewed by: mranga Fixed the
451  * use-count issue reported by Peter Parnes. Added property to prevent against
452  * content-length dos attacks.
453  *
454  * Revision 1.13 2004/03/19 04:22:22 mranga Reviewed by: mranga Added IO Pacing
455  * for long writes - split write into chunks and flush after each chunk to avoid
456  * socket back pressure.
457  *
458  * Revision 1.12 2004/03/18 22:01:19 mranga Reviewed by: mranga Get rid of the
459  * PipedInputStream from pipelined parser to avoid a copy.
460  *
461  * Revision 1.11 2004/03/07 22:25:23 mranga Reviewed by: mranga Added a new
462  * configuration parameter that instructs the stack to drop a server connection
463  * after server transaction termination set
464  * gov.nist.javax.sip.CACHE_SERVER_CONNECTIONS=false for this Default behavior
465  * is true.
466  *
467  * Revision 1.10 2004/02/29 15:32:58 mranga Reviewed by: mranga bug fixes on
468  * limiting the max message size.
469  *
470  * Revision 1.9 2004/02/29 00:46:34 mranga Reviewed by: mranga Added new
471  * configuration property to limit max message size for TCP transport. The
472  * property is gov.nist.javax.sip.MAX_MESSAGE_SIZE
473  *
474  * Revision 1.8 2004/02/25 21:43:03 mranga Reviewed by: mranga Added a couple of
475  * todo's and removed some debug printlns that could slow code down by a bit.
476  *
477  * Revision 1.7 2004/02/25 20:52:46 mranga Reviewed by: mranga Fix TCP transport
478  * so messages in excess of 8192 bytes are accepted.
479  *
480  * Revision 1.6 2004/01/22 18:39:41 mranga Reviewed by: M. Ranganathan Moved the
481  * ifdef SIMULATION and associated tags to the first column so Prep preprocessor
482  * can deal with them.
483  *
484  * Revision 1.5 2004/01/22 14:23:45 mranga Reviewed by: mranga Fixed some minor
485  * formatting issues.
486  *
487  * Revision 1.4 2004/01/22 13:26:31 sverker Issue number: Obtained from:
488  * Submitted by: sverker Reviewed by: mranga
489  *
490  * Major reformat of code to conform with style guide. Resolved compiler and
491  * javadoc warnings. Added CVS tags.
492  *
493  * CVS: ----------------------------------------------------------------------
494  * CVS: Issue number: CVS: If this change addresses one or more issues, CVS:
495  * then enter the issue number(s) here. CVS: Obtained from: CVS: If this change
496  * has been taken from another system, CVS: then name the system in this line,
497  * otherwise delete it. CVS: Submitted by: CVS: If this code has been
498  * contributed to the project by someone else; i.e., CVS: they sent us a patch
499  * or a set of diffs, then include their name/email CVS: address here. If this
500  * is your work then delete this line. CVS: Reviewed by: CVS: If we are doing
501  * pre-commit code reviews and someone else has CVS: reviewed your changes,
502  * include their name(s) here. CVS: If you have not had it reviewed then delete
503  * this line.
504  *
505  */
506