/*
 * Conditions Of Use
 *
 * This software was developed by employees of the National Institute of
 * Standards and Technology (NIST), an agency of the Federal Government.
 * Pursuant to title 15 Untied States Code Section 105, works of NIST
 * employees are not subject to copyright protection in the United States
 * and are considered to be in the public domain. As a result, a formal
 * license is not needed to use the software.
 *
 * This software is provided by NIST as a service and is expressly
 * provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
 * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
 * AND DATA ACCURACY. NIST does not warrant or make any representations
 * regarding the use of the software or the results thereof, including but
 * not limited to the correctness, accuracy, reliability or usefulness of
 * the software.
 *
 * Permission to use this software is contingent upon your acceptance
 * of the terms of this agreement
 *
 * .
 *
 */
package gov.nist.javax.sip.parser;

import gov.nist.core.InternalErrorHandler;
import gov.nist.javax.sip.stack.SIPStackTimerTask;

import java.io.*;
import java.util.*;

/**
 * Input class for the pipelined parser. Buffer all bytes read from the socket
 * and make them available to the message parser.
 *
 * <p>
 * Producers hand over chunks of bytes with the <code>write</code> methods;
 * a consumer pulls them back out one byte at a time with {@link #read()},
 * which blocks while no data is queued and the pipeline is still open. The
 * chunk queue and the chunk currently being drained are guarded by the
 * monitor of the queue object. Byte arrays passed to <code>write</code> are
 * queued by reference, not copied, so they must not be modified until they
 * have been consumed.
 *
 * @author M. Ranganathan (Contains a bug fix contributed by Rob Daugherty (
 *         Lucent Technologies) )
 *
 */

public class Pipeline extends InputStream {
    /** FIFO of pending {@link Buffer}s; also the monitor used for wait/notify. */
    private LinkedList buffList;

    /** Chunk currently being drained by {@link #read()}, or null if none. */
    private Buffer currentBuffer;

    /**
     * Set once by {@link #close()}. Volatile because the write methods test
     * it without holding the queue monitor.
     */
    private volatile boolean isClosed;

    /** Timer on which the read-timeout task is scheduled. */
    private Timer timer;

    /** Underlying stream; closed together with this pipeline. */
    private InputStream pipe;

    /** Delay handed to {@link Timer#schedule(TimerTask, long)}; -1 disables the timer. */
    private int readTimeout;

    /** Most recently scheduled timeout task, or null if none was scheduled. */
    private TimerTask myTimerTask;

    /**
     * Timeout task: closes the pipeline when it fires, unless it has been
     * cancelled first.
     */
    class MyTimer extends SIPStackTimerTask {
        Pipeline pipeline;

        /**
         * Written by the thread calling {@link #cancel()} and read by the
         * thread running the task, hence volatile.
         */
        private volatile boolean isCancelled;

        protected MyTimer(Pipeline pipeline) {
            this.pipeline = pipeline;
        }

        /**
         * Closes the pipeline. A failure to close is passed to
         * {@link InternalErrorHandler#handleException}.
         */
        protected void runTask() {
            if (this.isCancelled) {
                return;
            }

            try {
                pipeline.close();
            } catch (IOException ex) {
                InternalErrorHandler.handleException(ex);
            }
        }

        /**
         * Cancels the task and additionally records the cancellation so that
         * a subsequent {@link #runTask()} becomes a no-op.
         *
         * @return the value returned by the superclass <code>cancel()</code>
         */
        public boolean cancel() {
            boolean retval = super.cancel();
            this.isCancelled = true;
            return retval;
        }

    }

    /**
     * A chunk of bytes waiting to be read: the readable region of
     * <code>bytes</code> is <code>[ptr, length)</code>.
     */
    class Buffer {
        byte[] bytes;

        /** Index one past the last readable byte (an end index, not a count). */
        int length;

        /** Index of the next byte to hand out. */
        int ptr;

        public Buffer(byte[] bytes, int length) {
            ptr = 0;
            this.length = length;
            this.bytes = bytes;
        }

        /**
         * Returns the byte at <code>ptr</code> as an unsigned value (0..255)
         * and advances <code>ptr</code>. Performs no bounds check against
         * <code>length</code>; the caller must ensure a byte is available.
         */
        public int getNextByte() {
            int retval = bytes[ptr++] & 0xFF;
            return retval;
        }

    }

    /**
     * Schedules a task that closes this pipeline after <code>readTimeout</code>
     * has elapsed. Does nothing when the read timeout is -1.
     */
    public void startTimer() {
        if (this.readTimeout == -1) {
            return;
        }
        this.myTimerTask = new MyTimer(this);
        this.timer.schedule(this.myTimerTask, this.readTimeout);
    }

    /**
     * Cancels the task scheduled by the last {@link #startTimer()}, if any.
     * Does nothing when the read timeout is -1.
     */
    public void stopTimer() {
        if (this.readTimeout == -1) {
            return;
        }
        if (this.myTimerTask != null) {
            this.myTimerTask.cancel();
        }
    }

    /**
     * Creates an open, empty pipeline.
     *
     * @param pipe
     *            the underlying stream (the socket stream, per the original
     *            note); it is recorded so that it can be closed when the
     *            pipeline is closed, e.g. on a read timeout
     * @param readTimeout
     *            delay passed to the timer by {@link #startTimer()}, or -1 to
     *            disable the timeout
     * @param timer
     *            timer used to schedule the timeout task
     */
    public Pipeline(InputStream pipe, int readTimeout, Timer timer) {
        // pipe is the Socket stream
        // this is recorded here to implement a timeout.
        this.timer = timer;
        this.pipe = pipe;
        buffList = new LinkedList();
        this.readTimeout = readTimeout;
    }

    /**
     * Queues <code>length</code> bytes of <code>bytes</code>, beginning at
     * index <code>start</code>, and wakes up any waiting reader. A zero-length
     * write is a no-op.
     *
     * @param bytes
     *            source array (queued by reference)
     * @param start
     *            index of the first byte to queue
     * @param length
     *            number of bytes to queue
     * @throws IOException
     *             if the pipeline has been closed
     * @throws IndexOutOfBoundsException
     *             if <code>start</code> or <code>length</code> is negative or
     *             the range extends past the end of the array
     */
    public void write(byte[] bytes, int start, int length) throws IOException {
        if (this.isClosed) {
            throw new IOException("Closed!!");
        }
        // Written as "length > bytes.length - start" to avoid int overflow.
        if (start < 0 || length < 0 || length > bytes.length - start) {
            throw new IndexOutOfBoundsException("start=" + start + ", length="
                    + length + ", array length=" + bytes.length);
        }
        // Buffer takes an exclusive END index, so convert the count.
        enqueue(bytes, start, start + length);
    }

    /**
     * Queues the whole array and wakes up any waiting reader. An empty array
     * is a no-op.
     *
     * @param bytes
     *            bytes to queue (queued by reference)
     * @throws IOException
     *             if the pipeline has been closed
     */
    public void write(byte[] bytes) throws IOException {
        if (this.isClosed) {
            throw new IOException("Closed!!");
        }
        enqueue(bytes, 0, bytes.length);
    }

    /**
     * Appends the region <code>[from, to)</code> of <code>bytes</code> to the
     * queue and notifies waiting readers; empty regions are skipped because
     * the reader assumes every queued buffer holds at least one byte.
     */
    private void enqueue(byte[] bytes, int from, int to) {
        if (from == to) {
            return;
        }
        Buffer buff = new Buffer(bytes, to);
        buff.ptr = from;
        synchronized (this.buffList) {
            buffList.add(buff);
            buffList.notifyAll();
        }
    }

    /**
     * Marks the pipeline closed, wakes up any reader blocked in
     * {@link #read()} and closes the underlying stream.
     *
     * @throws IOException
     *             if closing the underlying stream fails
     */
    public void close() throws IOException {
        this.isClosed = true;
        synchronized (this.buffList) {
            this.buffList.notifyAll();
        }

        // JvB: added
        this.pipe.close();
    }

    /**
     * Returns the next queued byte, blocking until one is available or the
     * pipeline is closed. Bytes queued before the close are still delivered.
     *
     * @return the next byte as a value in 0..255, or -1 once the pipeline is
     *         closed and no queued data remains
     * @throws IOException
     *             if the calling thread is interrupted while waiting; the
     *             interrupt status is restored and the
     *             {@link InterruptedException} is attached as the cause
     */
    public int read() throws IOException {
        synchronized (this.buffList) {
            if (currentBuffer == null
                    || currentBuffer.ptr >= currentBuffer.length) {
                try {
                    // Wait till something is posted. Closure is only checked
                    // while the queue is empty so that data posted just
                    // before close() is not dropped (bug fix contributed by
                    // Rob Daugherty, applied consistently).
                    while (this.buffList.isEmpty()) {
                        if (this.isClosed) {
                            return -1;
                        }
                        this.buffList.wait();
                    }
                } catch (InterruptedException ex) {
                    // Keep the interrupt visible to the caller.
                    Thread.currentThread().interrupt();
                    IOException ioe = new IOException(ex.getMessage());
                    ioe.initCause(ex);
                    throw ioe;
                }
                // Non-empty is guaranteed: the queue is only modified while
                // holding this monitor.
                currentBuffer = (Buffer) this.buffList.removeFirst();
            }
            int retval = currentBuffer.getNextByte();
            if (currentBuffer.ptr == currentBuffer.length) {
                this.currentBuffer = null;
            }
            return retval;
        }
    }

}