• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
3  * you may not use this file except in compliance with the License.
4  * You may obtain a copy of the License at
5  *
6  *     http://www.apache.org/licenses/LICENSE-2.0
7  *
8  * Unless required by applicable law or agreed to in writing, software
9  * distributed under the License is distributed on an "AS IS" BASIS,
10  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  * See the License for the specific language governing permissions and
12  * limitations under the License.
13  */
14 package org.jivesoftware.smackx.bytestreams.ibb;
15 
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.net.SocketTimeoutException;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.TimeUnit;
23 
24 import org.jivesoftware.smack.Connection;
25 import org.jivesoftware.smack.PacketListener;
26 import org.jivesoftware.smack.XMPPException;
27 import org.jivesoftware.smack.filter.AndFilter;
28 import org.jivesoftware.smack.filter.PacketFilter;
29 import org.jivesoftware.smack.filter.PacketTypeFilter;
30 import org.jivesoftware.smack.packet.IQ;
31 import org.jivesoftware.smack.packet.Message;
32 import org.jivesoftware.smack.packet.Packet;
33 import org.jivesoftware.smack.packet.PacketExtension;
34 import org.jivesoftware.smack.packet.XMPPError;
35 import org.jivesoftware.smack.util.StringUtils;
36 import org.jivesoftware.smack.util.SyncPacketSend;
37 import org.jivesoftware.smackx.bytestreams.BytestreamSession;
38 import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
39 import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
40 import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
41 import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
42 
43 /**
44  * InBandBytestreamSession class represents an In-Band Bytestream session.
45  * <p>
46  * In-band bytestreams are bidirectional and this session encapsulates the streams for both
47  * directions.
48  * <p>
49  * Note that closing the In-Band Bytestream session will close both streams. If both streams are
50  * closed individually the session will be closed automatically once the second stream is closed.
51  * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
52  * automatically if one of them is closed.
53  *
54  * @author Henning Staib
55  */
56 public class InBandBytestreamSession implements BytestreamSession {
57 
58     /* XMPP connection */
59     private final Connection connection;
60 
61     /* the In-Band Bytestream open request for this session */
62     private final Open byteStreamRequest;
63 
64     /*
65      * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
66      */
67     private IBBInputStream inputStream;
68 
69     /*
70      * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
71      */
72     private IBBOutputStream outputStream;
73 
74     /* JID of the remote peer */
75     private String remoteJID;
76 
77     /* flag to close both streams if one of them is closed */
78     private boolean closeBothStreamsEnabled = false;
79 
80     /* flag to indicate if session is closed */
81     private boolean isClosed = false;
82 
83     /**
84      * Constructor.
85      *
86      * @param connection the XMPP connection
87      * @param byteStreamRequest the In-Band Bytestream open request for this session
88      * @param remoteJID JID of the remote peer
89      */
InBandBytestreamSession(Connection connection, Open byteStreamRequest, String remoteJID)90     protected InBandBytestreamSession(Connection connection, Open byteStreamRequest,
91                     String remoteJID) {
92         this.connection = connection;
93         this.byteStreamRequest = byteStreamRequest;
94         this.remoteJID = remoteJID;
95 
96         // initialize streams dependent to the uses stanza type
97         switch (byteStreamRequest.getStanza()) {
98         case IQ:
99             this.inputStream = new IQIBBInputStream();
100             this.outputStream = new IQIBBOutputStream();
101             break;
102         case MESSAGE:
103             this.inputStream = new MessageIBBInputStream();
104             this.outputStream = new MessageIBBOutputStream();
105             break;
106         }
107 
108     }
109 
getInputStream()110     public InputStream getInputStream() {
111         return this.inputStream;
112     }
113 
getOutputStream()114     public OutputStream getOutputStream() {
115         return this.outputStream;
116     }
117 
getReadTimeout()118     public int getReadTimeout() {
119         return this.inputStream.readTimeout;
120     }
121 
setReadTimeout(int timeout)122     public void setReadTimeout(int timeout) {
123         if (timeout < 0) {
124             throw new IllegalArgumentException("Timeout must be >= 0");
125         }
126         this.inputStream.readTimeout = timeout;
127     }
128 
129     /**
130      * Returns whether both streams should be closed automatically if one of the streams is closed.
131      * Default is <code>false</code>.
132      *
133      * @return <code>true</code> if both streams will be closed if one of the streams is closed,
134      *         <code>false</code> if both streams can be closed independently.
135      */
isCloseBothStreamsEnabled()136     public boolean isCloseBothStreamsEnabled() {
137         return closeBothStreamsEnabled;
138     }
139 
140     /**
141      * Sets whether both streams should be closed automatically if one of the streams is closed.
142      * Default is <code>false</code>.
143      *
144      * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
145      *        the streams is closed, <code>false</code> if both streams should be closed
146      *        independently
147      */
setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled)148     public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
149         this.closeBothStreamsEnabled = closeBothStreamsEnabled;
150     }
151 
close()152     public void close() throws IOException {
153         closeByLocal(true); // close input stream
154         closeByLocal(false); // close output stream
155     }
156 
157     /**
158      * This method is invoked if a request to close the In-Band Bytestream has been received.
159      *
160      * @param closeRequest the close request from the remote peer
161      */
closeByPeer(Close closeRequest)162     protected void closeByPeer(Close closeRequest) {
163 
164         /*
165          * close streams without flushing them, because stream is already considered closed on the
166          * remote peers side
167          */
168         this.inputStream.closeInternal();
169         this.inputStream.cleanup();
170         this.outputStream.closeInternal(false);
171 
172         // acknowledge close request
173         IQ confirmClose = IQ.createResultIQ(closeRequest);
174         this.connection.sendPacket(confirmClose);
175 
176     }
177 
178     /**
179      * This method is invoked if one of the streams has been closed locally, if an error occurred
180      * locally or if the whole session should be closed.
181      *
182      * @throws IOException if an error occurs while sending the close request
183      */
closeByLocal(boolean in)184     protected synchronized void closeByLocal(boolean in) throws IOException {
185         if (this.isClosed) {
186             return;
187         }
188 
189         if (this.closeBothStreamsEnabled) {
190             this.inputStream.closeInternal();
191             this.outputStream.closeInternal(true);
192         }
193         else {
194             if (in) {
195                 this.inputStream.closeInternal();
196             }
197             else {
198                 // close stream but try to send any data left
199                 this.outputStream.closeInternal(true);
200             }
201         }
202 
203         if (this.inputStream.isClosed && this.outputStream.isClosed) {
204             this.isClosed = true;
205 
206             // send close request
207             Close close = new Close(this.byteStreamRequest.getSessionID());
208             close.setTo(this.remoteJID);
209             try {
210                 SyncPacketSend.getReply(this.connection, close);
211             }
212             catch (XMPPException e) {
213                 throw new IOException("Error while closing stream: " + e.getMessage());
214             }
215 
216             this.inputStream.cleanup();
217 
218             // remove session from manager
219             InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this);
220         }
221 
222     }
223 
224     /**
225      * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
226      * Subclasses of this input stream must provide a packet listener along with a packet filter to
227      * collect the In-Band Bytestream data packets.
228      */
229     private abstract class IBBInputStream extends InputStream {
230 
231         /* the data packet listener to fill the data queue */
232         private final PacketListener dataPacketListener;
233 
234         /* queue containing received In-Band Bytestream data packets */
235         protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();
236 
237         /* buffer containing the data from one data packet */
238         private byte[] buffer;
239 
240         /* pointer to the next byte to read from buffer */
241         private int bufferPointer = -1;
242 
243         /* data packet sequence (range from 0 to 65535) */
244         private long seq = -1;
245 
246         /* flag to indicate if input stream is closed */
247         private boolean isClosed = false;
248 
249         /* flag to indicate if close method was invoked */
250         private boolean closeInvoked = false;
251 
252         /* timeout for read operations */
253         private int readTimeout = 0;
254 
255         /**
256          * Constructor.
257          */
IBBInputStream()258         public IBBInputStream() {
259             // add data packet listener to connection
260             this.dataPacketListener = getDataPacketListener();
261             connection.addPacketListener(this.dataPacketListener, getDataPacketFilter());
262         }
263 
264         /**
265          * Returns the packet listener that processes In-Band Bytestream data packets.
266          *
267          * @return the data packet listener
268          */
getDataPacketListener()269         protected abstract PacketListener getDataPacketListener();
270 
271         /**
272          * Returns the packet filter that accepts In-Band Bytestream data packets.
273          *
274          * @return the data packet filter
275          */
getDataPacketFilter()276         protected abstract PacketFilter getDataPacketFilter();
277 
read()278         public synchronized int read() throws IOException {
279             checkClosed();
280 
281             // if nothing read yet or whole buffer has been read fill buffer
282             if (bufferPointer == -1 || bufferPointer >= buffer.length) {
283                 // if no data available and stream was closed return -1
284                 if (!loadBuffer()) {
285                     return -1;
286                 }
287             }
288 
289             // return byte and increment buffer pointer
290             return ((int) buffer[bufferPointer++]) & 0xff;
291         }
292 
read(byte[] b, int off, int len)293         public synchronized int read(byte[] b, int off, int len) throws IOException {
294             if (b == null) {
295                 throw new NullPointerException();
296             }
297             else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
298                             || ((off + len) < 0)) {
299                 throw new IndexOutOfBoundsException();
300             }
301             else if (len == 0) {
302                 return 0;
303             }
304 
305             checkClosed();
306 
307             // if nothing read yet or whole buffer has been read fill buffer
308             if (bufferPointer == -1 || bufferPointer >= buffer.length) {
309                 // if no data available and stream was closed return -1
310                 if (!loadBuffer()) {
311                     return -1;
312                 }
313             }
314 
315             // if more bytes wanted than available return all available
316             int bytesAvailable = buffer.length - bufferPointer;
317             if (len > bytesAvailable) {
318                 len = bytesAvailable;
319             }
320 
321             System.arraycopy(buffer, bufferPointer, b, off, len);
322             bufferPointer += len;
323             return len;
324         }
325 
read(byte[] b)326         public synchronized int read(byte[] b) throws IOException {
327             return read(b, 0, b.length);
328         }
329 
330         /**
331          * This method blocks until a data packet is received, the stream is closed or the current
332          * thread is interrupted.
333          *
334          * @return <code>true</code> if data was received, otherwise <code>false</code>
335          * @throws IOException if data packets are out of sequence
336          */
loadBuffer()337         private synchronized boolean loadBuffer() throws IOException {
338 
339             // wait until data is available or stream is closed
340             DataPacketExtension data = null;
341             try {
342                 if (this.readTimeout == 0) {
343                     while (data == null) {
344                         if (isClosed && this.dataQueue.isEmpty()) {
345                             return false;
346                         }
347                         data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
348                     }
349                 }
350                 else {
351                     data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
352                     if (data == null) {
353                         throw new SocketTimeoutException();
354                     }
355                 }
356             }
357             catch (InterruptedException e) {
358                 // Restore the interrupted status
359                 Thread.currentThread().interrupt();
360                 return false;
361             }
362 
363             // handle sequence overflow
364             if (this.seq == 65535) {
365                 this.seq = -1;
366             }
367 
368             // check if data packets sequence is successor of last seen sequence
369             long seq = data.getSeq();
370             if (seq - 1 != this.seq) {
371                 // packets out of order; close stream/session
372                 InBandBytestreamSession.this.close();
373                 throw new IOException("Packets out of sequence");
374             }
375             else {
376                 this.seq = seq;
377             }
378 
379             // set buffer to decoded data
380             buffer = data.getDecodedData();
381             bufferPointer = 0;
382             return true;
383         }
384 
385         /**
386          * Checks if this stream is closed and throws an IOException if necessary
387          *
388          * @throws IOException if stream is closed and no data should be read anymore
389          */
checkClosed()390         private void checkClosed() throws IOException {
391             /* throw no exception if there is data available, but not if close method was invoked */
392             if ((isClosed && this.dataQueue.isEmpty()) || closeInvoked) {
393                 // clear data queue in case additional data was received after stream was closed
394                 this.dataQueue.clear();
395                 throw new IOException("Stream is closed");
396             }
397         }
398 
markSupported()399         public boolean markSupported() {
400             return false;
401         }
402 
close()403         public void close() throws IOException {
404             if (isClosed) {
405                 return;
406             }
407 
408             this.closeInvoked = true;
409 
410             InBandBytestreamSession.this.closeByLocal(true);
411         }
412 
413         /**
414          * This method sets the close flag and removes the data packet listener.
415          */
closeInternal()416         private void closeInternal() {
417             if (isClosed) {
418                 return;
419             }
420             isClosed = true;
421         }
422 
423         /**
424          * Invoked if the session is closed.
425          */
cleanup()426         private void cleanup() {
427             connection.removePacketListener(this.dataPacketListener);
428         }
429 
430     }
431 
432     /**
433      * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
434      * data packets.
435      */
436     private class IQIBBInputStream extends IBBInputStream {
437 
getDataPacketListener()438         protected PacketListener getDataPacketListener() {
439             return new PacketListener() {
440 
441                 private long lastSequence = -1;
442 
443                 public void processPacket(Packet packet) {
444                     // get data packet extension
445                     DataPacketExtension data = (DataPacketExtension) packet.getExtension(
446                                     DataPacketExtension.ELEMENT_NAME,
447                                     InBandBytestreamManager.NAMESPACE);
448 
449                     /*
450                      * check if sequence was not used already (see XEP-0047 Section 2.2)
451                      */
452                     if (data.getSeq() <= this.lastSequence) {
453                         IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
454                                         XMPPError.Condition.unexpected_request));
455                         connection.sendPacket(unexpectedRequest);
456                         return;
457 
458                     }
459 
460                     // check if encoded data is valid (see XEP-0047 Section 2.2)
461                     if (data.getDecodedData() == null) {
462                         // data is invalid; respond with bad-request error
463                         IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
464                                         XMPPError.Condition.bad_request));
465                         connection.sendPacket(badRequest);
466                         return;
467                     }
468 
469                     // data is valid; add to data queue
470                     dataQueue.offer(data);
471 
472                     // confirm IQ
473                     IQ confirmData = IQ.createResultIQ((IQ) packet);
474                     connection.sendPacket(confirmData);
475 
476                     // set last seen sequence
477                     this.lastSequence = data.getSeq();
478                     if (this.lastSequence == 65535) {
479                         this.lastSequence = -1;
480                     }
481 
482                 }
483 
484             };
485         }
486 
getDataPacketFilter()487         protected PacketFilter getDataPacketFilter() {
488             /*
489              * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
490              * data packet extension, matching session ID and recipient
491              */
492             return new AndFilter(new PacketTypeFilter(Data.class), new IBBDataPacketFilter());
493         }
494 
495     }
496 
497     /**
498      * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
499      * encapsulating the data packets.
500      */
501     private class MessageIBBInputStream extends IBBInputStream {
502 
getDataPacketListener()503         protected PacketListener getDataPacketListener() {
504             return new PacketListener() {
505 
506                 public void processPacket(Packet packet) {
507                     // get data packet extension
508                     DataPacketExtension data = (DataPacketExtension) packet.getExtension(
509                                     DataPacketExtension.ELEMENT_NAME,
510                                     InBandBytestreamManager.NAMESPACE);
511 
512                     // check if encoded data is valid
513                     if (data.getDecodedData() == null) {
514                         /*
515                          * TODO once a majority of XMPP server implementation support XEP-0079
516                          * Advanced Message Processing the invalid message could be answered with an
517                          * appropriate error. For now we just ignore the packet. Subsequent packets
518                          * with an increased sequence will cause the input stream to close the
519                          * stream/session.
520                          */
521                         return;
522                     }
523 
524                     // data is valid; add to data queue
525                     dataQueue.offer(data);
526 
527                     // TODO confirm packet once XMPP servers support XEP-0079
528                 }
529 
530             };
531         }
532 
533         @Override
534         protected PacketFilter getDataPacketFilter() {
535             /*
536              * filter all message stanzas containing a data packet extension, matching session ID
537              * and recipient
538              */
539             return new AndFilter(new PacketTypeFilter(Message.class), new IBBDataPacketFilter());
540         }
541 
542     }
543 
544     /**
545      * IBBDataPacketFilter class filters all packets from the remote peer of this session,
546      * containing an In-Band Bytestream data packet extension whose session ID matches this sessions
547      * ID.
548      */
549     private class IBBDataPacketFilter implements PacketFilter {
550 
551         public boolean accept(Packet packet) {
552             // sender equals remote peer
553             if (!packet.getFrom().equalsIgnoreCase(remoteJID)) {
554                 return false;
555             }
556 
557             // stanza contains data packet extension
558             PacketExtension packetExtension = packet.getExtension(DataPacketExtension.ELEMENT_NAME,
559                             InBandBytestreamManager.NAMESPACE);
560             if (packetExtension == null || !(packetExtension instanceof DataPacketExtension)) {
561                 return false;
562             }
563 
564             // session ID equals this session ID
565             DataPacketExtension data = (DataPacketExtension) packetExtension;
566             if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
567                 return false;
568             }
569 
570             return true;
571         }
572 
573     }
574 
575     /**
576      * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
577      * Subclasses of this output stream must provide a method to send data over XMPP stream.
578      */
579     private abstract class IBBOutputStream extends OutputStream {
580 
581         /* buffer with the size of this sessions block size */
582         protected final byte[] buffer;
583 
584         /* pointer to next byte to write to buffer */
585         protected int bufferPointer = 0;
586 
587         /* data packet sequence (range from 0 to 65535) */
588         protected long seq = 0;
589 
590         /* flag to indicate if output stream is closed */
591         protected boolean isClosed = false;
592 
593         /**
594          * Constructor.
595          */
596         public IBBOutputStream() {
597             this.buffer = new byte[(byteStreamRequest.getBlockSize()/4)*3];
598         }
599 
600         /**
601          * Writes the given data packet to the XMPP stream.
602          *
603          * @param data the data packet
604          * @throws IOException if an I/O error occurred while sending or if the stream is closed
605          */
606         protected abstract void writeToXML(DataPacketExtension data) throws IOException;
607 
608         public synchronized void write(int b) throws IOException {
609             if (this.isClosed) {
610                 throw new IOException("Stream is closed");
611             }
612 
613             // if buffer is full flush buffer
614             if (bufferPointer >= buffer.length) {
615                 flushBuffer();
616             }
617 
618             buffer[bufferPointer++] = (byte) b;
619         }
620 
621         public synchronized void write(byte b[], int off, int len) throws IOException {
622             if (b == null) {
623                 throw new NullPointerException();
624             }
625             else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
626                             || ((off + len) < 0)) {
627                 throw new IndexOutOfBoundsException();
628             }
629             else if (len == 0) {
630                 return;
631             }
632 
633             if (this.isClosed) {
634                 throw new IOException("Stream is closed");
635             }
636 
637             // is data to send greater than buffer size
638             if (len >= buffer.length) {
639 
640                 // "byte" off the first chunk to write out
641                 writeOut(b, off, buffer.length);
642 
643                 // recursively call this method with the lesser amount
644                 write(b, off + buffer.length, len - buffer.length);
645             }
646             else {
647                 writeOut(b, off, len);
648             }
649         }
650 
651         public synchronized void write(byte[] b) throws IOException {
652             write(b, 0, b.length);
653         }
654 
655         /**
656          * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
657          * capacity has been reached. This method is only called from this class so it is assured
658          * that the amount of data to send is <= buffer capacity
659          *
660          * @param b the data
661          * @param off the data
662          * @param len the number of bytes to write
663          * @throws IOException if an I/O error occurred while sending or if the stream is closed
664          */
665         private synchronized void writeOut(byte b[], int off, int len) throws IOException {
666             if (this.isClosed) {
667                 throw new IOException("Stream is closed");
668             }
669 
670             // set to 0 in case the next 'if' block is not executed
671             int available = 0;
672 
673             // is data to send greater that buffer space left
674             if (len > buffer.length - bufferPointer) {
675                 // fill buffer to capacity and send it
676                 available = buffer.length - bufferPointer;
677                 System.arraycopy(b, off, buffer, bufferPointer, available);
678                 bufferPointer += available;
679                 flushBuffer();
680             }
681 
682             // copy the data left to buffer
683             System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
684             bufferPointer += len - available;
685         }
686 
687         public synchronized void flush() throws IOException {
688             if (this.isClosed) {
689                 throw new IOException("Stream is closed");
690             }
691             flushBuffer();
692         }
693 
694         private synchronized void flushBuffer() throws IOException {
695 
696             // do nothing if no data to send available
697             if (bufferPointer == 0) {
698                 return;
699             }
700 
701             // create data packet
702             String enc = StringUtils.encodeBase64(buffer, 0, bufferPointer, false);
703             DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
704                             this.seq, enc);
705 
706             // write to XMPP stream
707             writeToXML(data);
708 
709             // reset buffer pointer
710             bufferPointer = 0;
711 
712             // increment sequence, considering sequence overflow
713             this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);
714 
715         }
716 
717         public void close() throws IOException {
718             if (isClosed) {
719                 return;
720             }
721             InBandBytestreamSession.this.closeByLocal(false);
722         }
723 
724         /**
725          * Sets the close flag and optionally flushes the stream.
726          *
727          * @param flush if <code>true</code> flushes the stream
728          */
729         protected void closeInternal(boolean flush) {
730             if (this.isClosed) {
731                 return;
732             }
733             this.isClosed = true;
734 
735             try {
736                 if (flush) {
737                     flushBuffer();
738                 }
739             }
740             catch (IOException e) {
741                 /*
742                  * ignore, because writeToXML() will not throw an exception if stream is already
743                  * closed
744                  */
745             }
746         }
747 
748     }
749 
750     /**
751      * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
752      * the data packets.
753      */
754     private class IQIBBOutputStream extends IBBOutputStream {
755 
756         @Override
757         protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
758             // create IQ stanza containing data packet
759             IQ iq = new Data(data);
760             iq.setTo(remoteJID);
761 
762             try {
763                 SyncPacketSend.getReply(connection, iq);
764             }
765             catch (XMPPException e) {
766                 // close session unless it is already closed
767                 if (!this.isClosed) {
768                     InBandBytestreamSession.this.close();
769                     throw new IOException("Error while sending Data: " + e.getMessage());
770                 }
771             }
772 
773         }
774 
775     }
776 
777     /**
778      * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
779      * encapsulating the data packets.
780      */
781     private class MessageIBBOutputStream extends IBBOutputStream {
782 
783         @Override
784         protected synchronized void writeToXML(DataPacketExtension data) {
785             // create message stanza containing data packet
786             Message message = new Message(remoteJID);
787             message.addExtension(data);
788 
789             connection.sendPacket(message);
790 
791         }
792 
793     }
794 
795 }
796