• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2009 Mike Cumings
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.kenai.jbosh;
18 
19 import com.kenai.jbosh.ComposableBody.Builder;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.LinkedList;
23 import java.util.List;
24 import java.util.Queue;
25 import java.util.Set;
26 import java.util.SortedSet;
27 import java.util.TreeSet;
28 import java.util.concurrent.CopyOnWriteArraySet;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.ScheduledFuture;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicReference;
35 import java.util.concurrent.locks.Condition;
36 import java.util.concurrent.locks.ReentrantLock;
37 import java.util.logging.Level;
38 import java.util.logging.Logger;
39 
40 /**
41  * BOSH Client session instance.  Each communication session with a remote
42  * connection manager is represented and handled by an instance of this
43  * class.  This is the main entry point for client-side communications.
44  * To create a new session, a client configuration must first be created
45  * and then used to create a client instance:
46  * <pre>
47  * BOSHClientConfig cfg = BOSHClientConfig.Builder.create(
48  *         "http://server:1234/httpbind", "jabber.org")
49  *     .setFrom("user@jabber.org")
50  *     .build();
51  * BOSHClient client = BOSHClient.create(cfg);
52  * </pre>
53  * Additional client configuration options are available.  See the
54  * {@code BOSHClientConfig.Builder} class for more information.
55  * <p/>
56  * Once a {@code BOSHClient} instance has been created, communication with
57  * the remote connection manager can begin.  No attempt will be made to
58  * establish a connection to the connection manager until the first call
59  * is made to the {@code send(ComposableBody)} method.  Note that it is
60  * possible to send an empty body to cause an immediate connection attempt
61  * to the connection manager.  Sending an empty message would look like
62  * the following:
63  * <pre>
64  * client.send(ComposableBody.builder().build());
65  * </pre>
66  * For more information on creating body messages with content, see the
67  * {@code ComposableBody.Builder} class documentation.
68  * <p/>
69  * Once a session has been successfully started, the client instance can be
70  * used to send arbitrary payload data.  All aspects of the BOSH
71  * protocol involving setting and processing attributes in the BOSH
72  * namespace will be handled by the client code transparently and behind the
73  * scenes.  The user of the client instance can therefore concentrate
74  * entirely on the content of the message payload, leaving the semantics of
75  * the BOSH protocol to the client implementation.
76  * <p/>
77  * To be notified of incoming messages from the remote connection manager,
78  * a {@code BOSHClientResponseListener} should be added to the client instance.
79  * All incoming messages will be published to all response listeners as they
80  * arrive and are processed.  As with the transmission of payload data via
81  * the {@code send(ComposableBody)} method, there is no need to worry about
82  * handling of the BOSH attributes, since this is handled behind the scenes.
83  * <p/>
84  * If the connection to the remote connection manager is terminated (either
85  * explicitly or due to a terminal condition of some sort), all connection
86  * listeners will be notified.  After the connection has been closed, the
87  * client instance is considered dead and a new one must be created in order
88  * to resume communications with the remote server.
89  * <p/>
90  * Instances of this class are thread-safe.
91  *
92  * @see BOSHClientConfig.Builder
93  * @see BOSHClientResponseListener
94  * @see BOSHClientConnListener
95  * @see ComposableBody.Builder
96  */
97 public final class BOSHClient {
98 
    /**
     * Logger for this class, named after the fully qualified class name.
     */
    private static final Logger LOG = Logger.getLogger(
            BOSHClient.class.getName());

    /**
     * Value of the 'type' attribute used for session termination.
     */
    private static final String TERMINATE = "terminate";

    /**
     * Value of the 'type' attribute used for recoverable errors.
     */
    private static final String ERROR = "error";

    /**
     * Log message used when a blocking wait is interrupted.
     */
    private static final String INTERRUPTED = "Interrupted";

    /**
     * Message used for unhandled exceptions.
     */
    private static final String UNHANDLED = "Unhandled Exception";
124 
125     /**
126      * Message used whena null listener is detected.
127      */
128     private static final String NULL_LISTENER = "Listener may not b enull";
129 
    /**
     * Default empty request delay, in milliseconds.  Used when the
     * {@code com.kenai.jbosh.BOSHClient.emptyRequestDelay} system property
     * is not set.
     */
    private static final int DEFAULT_EMPTY_REQUEST_DELAY = 100;

    /**
     * Amount of time to wait before sending an empty request, in
     * milliseconds.  Read once at class load time from the
     * {@code com.kenai.jbosh.BOSHClient.emptyRequestDelay} system property,
     * falling back to {@link #DEFAULT_EMPTY_REQUEST_DELAY}.
     */
    private static final int EMPTY_REQUEST_DELAY = Integer.getInteger(
            BOSHClient.class.getName() + ".emptyRequestDelay",
            DEFAULT_EMPTY_REQUEST_DELAY);

    /**
     * Default value for the pause margin, in milliseconds.  Used when the
     * {@code com.kenai.jbosh.BOSHClient.pauseMargin} system property is
     * not set.
     */
    private static final int DEFAULT_PAUSE_MARGIN = 500;

    /**
     * The amount of time in milliseconds which will be reserved as a
     * safety margin when scheduling empty requests against a maxpause
     * value.   This should give us enough time to build the message
     * and transport it to the remote host.  Read once at class load time
     * from the {@code com.kenai.jbosh.BOSHClient.pauseMargin} system
     * property, falling back to {@link #DEFAULT_PAUSE_MARGIN}.
     */
    private static final int PAUSE_MARGIN = Integer.getInteger(
            BOSHClient.class.getName() + ".pauseMargin",
            DEFAULT_PAUSE_MARGIN);

    /**
     * Flag indicating whether or not we want to perform assertions.
     * Assigned exactly once by the static initializer of this class.
     */
    private static final boolean ASSERTIONS;
162 
    /**
     * Connection listeners.  Held in a copy-on-write set so that listeners
     * can be added and removed without holding {@link #lock}.
     */
    private final Set<BOSHClientConnListener> connListeners =
            new CopyOnWriteArraySet<BOSHClientConnListener>();

    /**
     * Request listeners.  Held in a copy-on-write set so that listeners
     * can be added and removed without holding {@link #lock}.
     */
    private final Set<BOSHClientRequestListener> requestListeners =
            new CopyOnWriteArraySet<BOSHClientRequestListener>();

    /**
     * Response listeners.  Held in a copy-on-write set so that listeners
     * can be added and removed without holding {@link #lock}.
     */
    private final Set<BOSHClientResponseListener> responseListeners =
            new CopyOnWriteArraySet<BOSHClientResponseListener>();

    /**
     * Lock guarding the mutable session state of this instance.  The three
     * conditions below are all created from this lock.
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition indicating that there are messages to be exchanged.
     * Signalled when an exchange is queued and when the session is disposed.
     */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Condition indicating that there are available slots for sending
     * messages.  Senders blocked on the request limit wait on this.
     */
    private final Condition notFull = lock.newCondition();

    /**
     * Condition indicating that there are no outstanding connections.
     * Awaited by {@code drain()}.
     */
    private final Condition drained = lock.newCondition();

    /**
     * Session configuration supplied at construction time.
     */
    private final BOSHClientConfig cfg;

    /**
     * Processor thread runnable instance.  Executed by the receive thread
     * that {@code init()} starts.
     */
    private final Runnable procRunnable = new Runnable() {
        /**
         * Process incoming messages.
         */
        public void run() {
            processMessages();
        }
    };
218 
    /**
     * Runnable which sends an empty request when executed.
     */
    private final Runnable emptyRequestRunnable = new Runnable() {
        /**
         * Send an empty request by delegating to {@code sendEmptyRequest()}.
         */
        public void run() {
            sendEmptyRequest();
        }
    };
230 
    /**
     * HTTPSender instance used to transmit requests.  Initialized with the
     * session configuration in {@code init()} and destroyed in
     * {@code dispose(Throwable)}.
     */
    private final HTTPSender httpSender =
            new ApacheHTTPSender();

    /**
     * Storage for test hook implementation.  Holds {@code null} until an
     * interceptor is installed via {@code setExchangeInterceptor}.
     */
    private final AtomicReference<ExchangeInterceptor> exchInterceptor =
            new AtomicReference<ExchangeInterceptor>();

    /**
     * Request ID sequence to use for the session.
     */
    private final RequestIDSequence requestIDSeq = new RequestIDSequence();

    /**
     * ScheduledExecutor to use for deferred tasks.  Single-threaded; shut
     * down in {@code dispose(Throwable)}.
     */
    private final ScheduledExecutorService schedExec =
            Executors.newSingleThreadScheduledExecutor();
253 
    /************************************************************
     * The following vars must only be accessed while holding the
     * lock instance.
     */

    /**
     * Thread which is used to process responses from the connection
     * manager.  Becomes null when session is terminated; a non-null value
     * is what {@code isWorking()} treats as an active session.
     */
    private Thread procThread;
263 
264     /**
265      * Future for sending a deferred empty request, if needed.
266      */
267     private ScheduledFuture emptyRequestFuture;
268 
    /**
     * Connection Manager session parameters.  Only available when in a
     * connected state; reset to {@code null} when the session is disposed.
     */
    private CMSessionParams cmParams;

    /**
     * List of active/outstanding requests.  Set to {@code null} when the
     * session is disposed.
     */
    private Queue<HTTPExchange> exchanges = new LinkedList<HTTPExchange>();

    /**
     * Set of RIDs which have been received, for the purpose of sending
     * response acknowledgements.  Set to {@code null} when the session is
     * disposed.
     */
    private SortedSet<Long> pendingResponseAcks = new TreeSet<Long>();

    /**
     * The highest RID that we've already received a response for.  This value
     * is used to implement response acks.  Starts out at -1.
     */
    private Long responseAck = Long.valueOf(-1L);

    /**
     * List of requests which have been made but not yet acknowledged.  This
     * list remains unpopulated if the CM is not acking requests.  Set to
     * {@code null} when the session is disposed.
     */
    private List<ComposableBody> pendingRequestAcks =
            new ArrayList<ComposableBody>();
298 
299     ///////////////////////////////////////////////////////////////////////////
300     // Classes:
301 
    /**
     * Class used in testing to dynamically manipulate received exchanges
     * at test runtime.  An instance is installed on a client through
     * {@code setExchangeInterceptor}.
     */
    abstract static class ExchangeInterceptor {
        /**
         * Limit construction to subclasses within this package.
         */
        ExchangeInterceptor() {
            // Empty;
        }

        /**
         * Hook to manipulate an HTTPExchange as it is about to be processed.
         *
         * @param exch original exchange that would be processed
         * @return replacement exchange instance, or {@code null} to skip
         *  processing of this exchange
         */
        abstract HTTPExchange interceptExchange(final HTTPExchange exch);
    }
323 
324     ///////////////////////////////////////////////////////////////////////////
325     // Constructors:
326 
327     /**
328      * Determine whether or not we should perform assertions.  Assertions
329      * can be specified via system property explicitly, or defaulted to
330      * the JVM assertions status.
331      */
332     static {
333         final String prop =
334                 BOSHClient.class.getSimpleName() + ".assertionsEnabled";
335         boolean enabled = false;
336         if (System.getProperty(prop) == null) {
337             assert enabled = true;
338         } else {
339             enabled = Boolean.getBoolean(prop);
340         }
341         ASSERTIONS = enabled;
342     }
343 
344     /**
345      * Prevent direct construction.
346      */
BOSHClient(final BOSHClientConfig sessCfg)347     private BOSHClient(final BOSHClientConfig sessCfg) {
348         cfg = sessCfg;
349         init();
350     }
351 
352     ///////////////////////////////////////////////////////////////////////////
353     // Public methods:
354 
355     /**
356      * Create a new BOSH client session using the client configuration
357      * information provided.
358      *
359      * @param clientCfg session configuration
360      * @return BOSH session instance
361      */
create(final BOSHClientConfig clientCfg)362     public static BOSHClient create(final BOSHClientConfig clientCfg) {
363         if (clientCfg == null) {
364             throw(new IllegalArgumentException(
365                     "Client configuration may not be null"));
366         }
367         return new BOSHClient(clientCfg);
368     }
369 
    /**
     * Get the client configuration that was used to create this client
     * instance.
     *
     * @return client configuration, exactly as passed to
     *  {@code create(BOSHClientConfig)}
     */
    public BOSHClientConfig getBOSHClientConfig() {
        return cfg;
    }
379 
    /**
     * Adds a connection listener to the session.
     *
     * @param listener connection listener to add, if not already added
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void addBOSHClientConnListener(
            final BOSHClientConnListener listener) {
        if (listener == null) {
            throw(new IllegalArgumentException(NULL_LISTENER));
        }
        connListeners.add(listener);
    }
392 
    /**
     * Removes a connection listener from the session.
     *
     * @param listener connection listener to remove, if previously added
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void removeBOSHClientConnListener(
            final BOSHClientConnListener listener) {
        if (listener == null) {
            throw(new IllegalArgumentException(NULL_LISTENER));
        }
        connListeners.remove(listener);
    }
405 
    /**
     * Adds a request message listener to the session.
     *
     * @param listener request listener to add, if not already added
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void addBOSHClientRequestListener(
            final BOSHClientRequestListener listener) {
        if (listener == null) {
            throw(new IllegalArgumentException(NULL_LISTENER));
        }
        requestListeners.add(listener);
    }
418 
    /**
     * Removes a request message listener from the session, if previously
     * added.
     *
     * @param listener instance to remove
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void removeBOSHClientRequestListener(
            final BOSHClientRequestListener listener) {
        if (listener == null) {
            throw(new IllegalArgumentException(NULL_LISTENER));
        }
        requestListeners.remove(listener);
    }
432 
    /**
     * Adds a response message listener to the session.
     *
     * @param listener response listener to add, if not already added
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void addBOSHClientResponseListener(
            final BOSHClientResponseListener listener) {
        if (listener == null) {
            throw(new IllegalArgumentException(NULL_LISTENER));
        }
        responseListeners.add(listener);
    }
445 
    /**
     * Removes a response message listener from the session, if previously
     * added.
     *
     * @param listener instance to remove
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void removeBOSHClientResponseListener(
            final BOSHClientResponseListener listener) {
        if (listener == null) {
            throw(new IllegalArgumentException(NULL_LISTENER));
        }
        responseListeners.remove(listener);
    }
459 
460     /**
461      * Send the provided message data to the remote connection manager.  The
462      * provided message body does not need to have any BOSH-specific attribute
463      * information set.  It only needs to contain the actual message payload
464      * that should be delivered to the remote server.
465      * <p/>
466      * The first call to this method will result in a connection attempt
467      * to the remote connection manager.  Subsequent calls to this method
468      * will block until the underlying session state allows for the message
469      * to be transmitted.  In certain scenarios - such as when the maximum
470      * number of outbound connections has been reached - calls to this method
471      * will block for short periods of time.
472      *
473      * @param body message data to send to remote server
474      * @throws BOSHException on message transmission failure
475      */
send(final ComposableBody body)476     public void send(final ComposableBody body) throws BOSHException {
477         assertUnlocked();
478         if (body == null) {
479             throw(new IllegalArgumentException(
480                     "Message body may not be null"));
481         }
482 
483         HTTPExchange exch;
484         CMSessionParams params;
485         lock.lock();
486         try {
487             blockUntilSendable(body);
488             if (!isWorking() && !isTermination(body)) {
489                 throw(new BOSHException(
490                         "Cannot send message when session is closed"));
491             }
492 
493             long rid = requestIDSeq.getNextRID();
494             ComposableBody request = body;
495             params = cmParams;
496             if (params == null && exchanges.isEmpty()) {
497                 // This is the first message being sent
498                 request = applySessionCreationRequest(rid, body);
499             } else {
500                 request = applySessionData(rid, body);
501                 if (cmParams.isAckingRequests()) {
502                     pendingRequestAcks.add(request);
503                 }
504             }
505             exch = new HTTPExchange(request);
506             exchanges.add(exch);
507             notEmpty.signalAll();
508             clearEmptyRequest();
509         } finally {
510             lock.unlock();
511         }
512         AbstractBody finalReq = exch.getRequest();
513         HTTPResponse resp = httpSender.send(params, finalReq);
514         exch.setHTTPResponse(resp);
515         fireRequestSent(finalReq);
516     }
517 
518     /**
519      * Attempt to pause the current session.  When supported by the remote
520      * connection manager, pausing the session will result in the connection
521      * manager closing out all outstanding requests (including the pause
522      * request) and increases the inactivity timeout of the session.  The
523      * exact value of the temporary timeout is dependent upon the connection
524      * manager.  This method should be used if a client encounters an
525      * exceptional temporary situation during which it will be unable to send
526      * requests to the connection manager for a period of time greater than
527      * the maximum inactivity period.
528      *
529      * The session will revert back to it's normal, unpaused state when the
530      * client sends it's next message.
531      *
532      * @return {@code true} if the connection manager supports session pausing,
533      *  {@code false} if the connection manager does not support session
534      *  pausing or if the session has not yet been established
535      */
pause()536     public boolean pause() {
537         assertUnlocked();
538         lock.lock();
539         AttrMaxPause maxPause = null;
540         try {
541             if (cmParams == null) {
542                 return false;
543             }
544 
545             maxPause = cmParams.getMaxPause();
546             if (maxPause == null) {
547                 return false;
548             }
549         } finally {
550             lock.unlock();
551         }
552         try {
553             send(ComposableBody.builder()
554                     .setAttribute(Attributes.PAUSE, maxPause.toString())
555                     .build());
556         } catch (BOSHException boshx) {
557             LOG.log(Level.FINEST, "Could not send pause", boshx);
558         }
559         return true;
560     }
561 
562     /**
563      * End the BOSH session by disconnecting from the remote BOSH connection
564      * manager.
565      *
566      * @throws BOSHException when termination message cannot be sent
567      */
disconnect()568     public void disconnect() throws BOSHException {
569         disconnect(ComposableBody.builder().build());
570     }
571 
572     /**
573      * End the BOSH session by disconnecting from the remote BOSH connection
574      * manager, sending the provided content in the final connection
575      * termination message.
576      *
577      * @param msg final message to send
578      * @throws BOSHException when termination message cannot be sent
579      */
disconnect(final ComposableBody msg)580     public void disconnect(final ComposableBody msg) throws BOSHException {
581         if (msg == null) {
582             throw(new IllegalArgumentException(
583                     "Message body may not be null"));
584         }
585 
586         Builder builder = msg.rebuild();
587         builder.setAttribute(Attributes.TYPE, TERMINATE);
588         send(builder.build());
589     }
590 
591     /**
592      * Forcibly close this client session instance.  The preferred mechanism
593      * to close the connection is to send a disconnect message and wait for
594      * organic termination.  Calling this method simply shuts down the local
595      * session without sending a termination message, releasing all resources
596      * associated with the session.
597      */
close()598     public void close() {
599         dispose(new BOSHException("Session explicitly closed by caller"));
600     }
601 
602     ///////////////////////////////////////////////////////////////////////////
603     // Package-private methods:
604 
605     /**
606      * Get the current CM session params.
607      *
608      * @return current session params, or {@code null}
609      */
getCMSessionParams()610     CMSessionParams getCMSessionParams() {
611         lock.lock();
612         try {
613             return cmParams;
614         } finally {
615             lock.unlock();
616         }
617     }
618 
619     /**
620      * Wait until no more messages are waiting to be processed.
621      */
drain()622     void drain() {
623         lock.lock();
624         try {
625             LOG.finest("Waiting while draining...");
626             while (isWorking()
627                     && (emptyRequestFuture == null
628                     || emptyRequestFuture.isDone())) {
629                 try {
630                     drained.await();
631                 } catch (InterruptedException intx) {
632                     LOG.log(Level.FINEST, INTERRUPTED, intx);
633                 }
634             }
635             LOG.finest("Drained");
636         } finally {
637             lock.unlock();
638         }
639     }
640 
    /**
     * Test hook which installs the exchange interceptor for this client,
     * replacing any previously installed one.
     *
     * @param interceptor exchange interceptor
     */
    void setExchangeInterceptor(final ExchangeInterceptor interceptor) {
        exchInterceptor.set(interceptor);
    }
649 
650 
651     ///////////////////////////////////////////////////////////////////////////
652     // Private methods:
653 
654     /**
655      * Initialize the session.  This initializes the underlying HTTP
656      * transport implementation and starts the receive thread.
657      */
init()658     private void init() {
659         assertUnlocked();
660 
661         lock.lock();
662         try {
663             httpSender.init(cfg);
664             procThread = new Thread(procRunnable);
665             procThread.setDaemon(true);
666             procThread.setName(BOSHClient.class.getSimpleName()
667                     + "[" + System.identityHashCode(this)
668                     + "]: Receive thread");
669             procThread.start();
670         } finally {
671             lock.unlock();
672         }
673     }
674 
    /**
     * Destroy this session.  The teardown runs in four steps: mark the
     * session as no longer working, notify the connection listeners,
     * release the session state while waking up all waiting threads, and
     * finally shut down the HTTP transport and the scheduled executor.
     * Calling this method on an already disposed session does nothing.
     *
     * @param cause the reason for the session termination, or {@code null}
     *  for normal termination
     */
    private void dispose(final Throwable cause) {
        assertUnlocked();

        // Step 1: flip the "working" state; only the first caller proceeds
        lock.lock();
        try {
            if (procThread == null) {
                // Already disposed
                return;
            }
            procThread = null;
        } finally {
            lock.unlock();
        }

        // Step 2: notify listeners; this happens without holding the lock
        if (cause == null) {
            fireConnectionClosed();
        } else {
            fireConnectionClosedOnError(cause);
        }

        // Step 3: drop the session state and wake every waiter so that it
        // can observe that the session is no longer working
        lock.lock();
        try {
            clearEmptyRequest();
            exchanges = null;
            cmParams = null;
            pendingResponseAcks = null;
            pendingRequestAcks = null;
            notEmpty.signalAll();
            notFull.signalAll();
            drained.signalAll();
        } finally {
            lock.unlock();
        }

        // Step 4: release the transport and the deferred-task executor
        httpSender.destroy();
        schedExec.shutdownNow();
    }
718 
719     /**
720      * Determines if the message body specified indicates a request to
721      * pause the session.
722      *
723      * @param msg message to evaluate
724      * @return {@code true} if the message is a pause request, {@code false}
725      *  otherwise
726      */
isPause(final AbstractBody msg)727     private static boolean isPause(final AbstractBody msg) {
728         return msg.getAttribute(Attributes.PAUSE) != null;
729     }
730 
731     /**
732      * Determines if the message body specified indicates a termination of
733      * the session.
734      *
735      * @param msg message to evaluate
736      * @return {@code true} if the message is a session termination,
737      *  {@code false} otherwise
738      */
isTermination(final AbstractBody msg)739     private static boolean isTermination(final AbstractBody msg) {
740         return TERMINATE.equals(msg.getAttribute(Attributes.TYPE));
741     }
742 
743     /**
744      * Evaluates the HTTP response code and response message and returns the
745      * terminal binding condition that it describes, if any.
746      *
747      * @param respCode HTTP response code
748      * @param respBody response body
749      * @return terminal binding condition, or {@code null} if not a terminal
750      *  binding condition message
751      */
getTerminalBindingCondition( final int respCode, final AbstractBody respBody)752     private TerminalBindingCondition getTerminalBindingCondition(
753             final int respCode,
754             final AbstractBody respBody) {
755         assertLocked();
756 
757         if (isTermination(respBody)) {
758             String str = respBody.getAttribute(Attributes.CONDITION);
759             return TerminalBindingCondition.forString(str);
760         }
761         // Check for deprecated HTTP Error Conditions
762         if (cmParams != null && cmParams.getVersion() == null) {
763             return TerminalBindingCondition.forHTTPResponseCode(respCode);
764         }
765         return null;
766     }
767 
768     /**
769      * Determines if the message specified is immediately sendable or if it
770      * needs to block until the session state changes.
771      *
772      * @param msg message to evaluate
773      * @return {@code true} if the message can be immediately sent,
774      *  {@code false} otherwise
775      */
isImmediatelySendable(final AbstractBody msg)776     private boolean isImmediatelySendable(final AbstractBody msg) {
777         assertLocked();
778 
779         if (cmParams == null) {
780             // block if we're waiting for a response to our first request
781             return exchanges.isEmpty();
782         }
783 
784         AttrRequests requests = cmParams.getRequests();
785         if (requests == null) {
786             return true;
787         }
788         int maxRequests = requests.intValue();
789         if (exchanges.size() < maxRequests) {
790             return true;
791         }
792         if (exchanges.size() == maxRequests
793                 && (isTermination(msg) || isPause(msg))) {
794             // One additional terminate or pause message is allowed
795             return true;
796         }
797         return false;
798     }
799 
    /**
     * Determines whether or not the session is still active.  The session
     * counts as active for as long as the processing thread reference has
     * not been cleared.
     *
     * @return {@code true} if it is, {@code false} otherwise
     */
    private boolean isWorking() {
        assertLocked();

        return procThread != null;
    }
810 
811     /**
812      * Blocks until either the message provided becomes immediately
813      * sendable or until the session is terminated.
814      *
815      * @param msg message to evaluate
816      */
blockUntilSendable(final AbstractBody msg)817     private void blockUntilSendable(final AbstractBody msg) {
818         assertLocked();
819 
820         while (isWorking() && !isImmediatelySendable(msg)) {
821             try {
822                 notFull.await();
823             } catch (InterruptedException intx) {
824                 LOG.log(Level.FINEST, INTERRUPTED, intx);
825             }
826         }
827     }
828 
829     /**
830      * Modifies the specified body message such that it becomes a new
831      * BOSH session creation request.
832      *
833      * @param rid request ID to use
834      * @param orig original body to modify
835      * @return modified message which acts as a session creation request
836      */
applySessionCreationRequest( final long rid, final ComposableBody orig)837     private ComposableBody applySessionCreationRequest(
838             final long rid, final ComposableBody orig) throws BOSHException {
839         assertLocked();
840 
841         Builder builder = orig.rebuild();
842         builder.setAttribute(Attributes.TO, cfg.getTo());
843         builder.setAttribute(Attributes.XML_LANG, cfg.getLang());
844         builder.setAttribute(Attributes.VER,
845                 AttrVersion.getSupportedVersion().toString());
846         builder.setAttribute(Attributes.WAIT, "60");
847         builder.setAttribute(Attributes.HOLD, "1");
848         builder.setAttribute(Attributes.RID, Long.toString(rid));
849         applyRoute(builder);
850         applyFrom(builder);
851         builder.setAttribute(Attributes.ACK, "1");
852 
853         // Make sure the following are NOT present (i.e., during retries)
854         builder.setAttribute(Attributes.SID, null);
855         return builder.build();
856     }
857 
858     /**
859      * Applies routing information to the request message who's builder has
860      * been provided.
861      *
862      * @param builder builder instance to add routing information to
863      */
applyRoute(final Builder builder)864     private void applyRoute(final Builder builder) {
865         assertLocked();
866 
867         String route = cfg.getRoute();
868         if (route != null) {
869             builder.setAttribute(Attributes.ROUTE, route);
870         }
871     }
872 
873     /**
874      * Applies the local station ID information to the request message who's
875      * builder has been provided.
876      *
877      * @param builder builder instance to add station ID information to
878      */
applyFrom(final Builder builder)879     private void applyFrom(final Builder builder) {
880         assertLocked();
881 
882         String from = cfg.getFrom();
883         if (from != null) {
884             builder.setAttribute(Attributes.FROM, from);
885         }
886     }
887 
888     /**
889      * Applies existing session data to the outbound request, returning the
890      * modified request.
891      *
892      * This method assumes the lock is currently held.
893      *
894      * @param rid request ID to use
895      * @param orig original/raw request
896      * @return modified request with session information applied
897      */
applySessionData( final long rid, final ComposableBody orig)898     private ComposableBody applySessionData(
899             final long rid,
900             final ComposableBody orig) throws BOSHException {
901         assertLocked();
902 
903         Builder builder = orig.rebuild();
904         builder.setAttribute(Attributes.SID,
905                 cmParams.getSessionID().toString());
906         builder.setAttribute(Attributes.RID, Long.toString(rid));
907         applyResponseAcknowledgement(builder, rid);
908         return builder.build();
909     }
910 
911     /**
912      * Sets the 'ack' attribute of the request to the value of the highest
913      * 'rid' of a request for which it has already received a response in the
914      * case where it has also received all responses associated with lower
915      * 'rid' values.  The only exception is that, after its session creation
916      * request, the client SHOULD NOT include an 'ack' attribute in any request
917      * if it has received responses to all its previous requests.
918      *
919      * @param builder message builder
920      * @param rid current request RID
921      */
applyResponseAcknowledgement( final Builder builder, final long rid)922     private void applyResponseAcknowledgement(
923             final Builder builder,
924             final long rid) {
925         assertLocked();
926 
927         if (responseAck.equals(Long.valueOf(-1L))) {
928             // We have not received any responses yet
929             return;
930         }
931 
932         Long prevRID = Long.valueOf(rid - 1L);
933         if (responseAck.equals(prevRID)) {
934             // Implicit ack
935             return;
936         }
937 
938         builder.setAttribute(Attributes.ACK, responseAck.toString());
939     }
940 
941     /**
942      * While we are "connected", process received responses.
943      *
944      * This method is run in the processing thread.
945      */
processMessages()946     private void processMessages() {
947         LOG.log(Level.FINEST, "Processing thread starting");
948         try {
949             HTTPExchange exch;
950             do {
951                 exch = nextExchange();
952                 if (exch == null) {
953                     break;
954                 }
955 
956                 // Test hook to manipulate what the client sees:
957                 ExchangeInterceptor interceptor = exchInterceptor.get();
958                 if (interceptor != null) {
959                     HTTPExchange newExch = interceptor.interceptExchange(exch);
960                     if (newExch == null) {
961                         LOG.log(Level.FINE, "Discarding exchange on request "
962                                 + "of test hook: RID="
963                                 + exch.getRequest().getAttribute(
964                                     Attributes.RID));
965                         lock.lock();
966                         try {
967                             exchanges.remove(exch);
968                         } finally {
969                             lock.unlock();
970                         }
971                         continue;
972                     }
973                     exch = newExch;
974                 }
975 
976                 processExchange(exch);
977             } while (true);
978         } finally {
979             LOG.log(Level.FINEST, "Processing thread exiting");
980         }
981 
982     }
983 
984     /**
985      * Get the next message exchange to process, blocking until one becomes
986      * available if nothing is already waiting for processing.
987      *
988      * @return next available exchange to process, or {@code null} if no
989      *  exchanges are immediately available
990      */
nextExchange()991     private HTTPExchange nextExchange() {
992         assertUnlocked();
993 
994         final Thread thread = Thread.currentThread();
995         HTTPExchange exch = null;
996         lock.lock();
997         try {
998             do {
999                 if (!thread.equals(procThread)) {
1000                     break;
1001                 }
1002                 exch = exchanges.peek();
1003                 if (exch == null) {
1004                     try {
1005                         notEmpty.await();
1006                     } catch (InterruptedException intx) {
1007                         LOG.log(Level.FINEST, INTERRUPTED, intx);
1008                     }
1009                 }
1010             } while (exch == null);
1011         } finally {
1012             lock.unlock();
1013         }
1014         return exch;
1015     }
1016 
1017     /**
1018      * Process the next, provided exchange.  This is the main processing
1019      * method of the receive thread.
1020      *
1021      * @param exch message exchange to process
1022      */
processExchange(final HTTPExchange exch)1023     private void processExchange(final HTTPExchange exch) {
1024         assertUnlocked();
1025 
1026         HTTPResponse resp;
1027         AbstractBody body;
1028         int respCode;
1029         try {
1030             resp = exch.getHTTPResponse();
1031             body = resp.getBody();
1032             respCode = resp.getHTTPStatus();
1033         } catch (BOSHException boshx) {
1034             LOG.log(Level.FINEST, "Could not obtain response", boshx);
1035             dispose(boshx);
1036             return;
1037         } catch (InterruptedException intx) {
1038             LOG.log(Level.FINEST, INTERRUPTED, intx);
1039             dispose(intx);
1040             return;
1041         }
1042         fireResponseReceived(body);
1043 
1044         // Process the message with the current session state
1045         AbstractBody req = exch.getRequest();
1046         CMSessionParams params;
1047         List<HTTPExchange> toResend = null;
1048         lock.lock();
1049         try {
1050             // Check for session creation response info, if needed
1051             if (cmParams == null) {
1052                 cmParams = CMSessionParams.fromSessionInit(req, body);
1053 
1054                 // The following call handles the lock. It's not an escape.
1055                 fireConnectionEstablished();
1056             }
1057             params = cmParams;
1058 
1059             checkForTerminalBindingConditions(body, respCode);
1060             if (isTermination(body)) {
1061                 // Explicit termination
1062                 lock.unlock();
1063                 dispose(null);
1064                 return;
1065             }
1066 
1067             if (isRecoverableBindingCondition(body)) {
1068                 // Retransmit outstanding requests
1069                 if (toResend == null) {
1070                     toResend = new ArrayList<HTTPExchange>(exchanges.size());
1071                 }
1072                 for (HTTPExchange exchange : exchanges) {
1073                     HTTPExchange resendExch =
1074                             new HTTPExchange(exchange.getRequest());
1075                     toResend.add(resendExch);
1076                 }
1077                 for (HTTPExchange exchange : toResend) {
1078                     exchanges.add(exchange);
1079                 }
1080             } else {
1081                 // Process message as normal
1082                 processRequestAcknowledgements(req, body);
1083                 processResponseAcknowledgementData(req);
1084                 HTTPExchange resendExch =
1085                         processResponseAcknowledgementReport(body);
1086                 if (resendExch != null && toResend == null) {
1087                     toResend = new ArrayList<HTTPExchange>(1);
1088                     toResend.add(resendExch);
1089                     exchanges.add(resendExch);
1090                 }
1091             }
1092         } catch (BOSHException boshx) {
1093             LOG.log(Level.FINEST, "Could not process response", boshx);
1094             lock.unlock();
1095             dispose(boshx);
1096             return;
1097         } finally {
1098             if (lock.isHeldByCurrentThread()) {
1099                 try {
1100                     exchanges.remove(exch);
1101                     if (exchanges.isEmpty()) {
1102                         scheduleEmptyRequest(processPauseRequest(req));
1103                     }
1104                     notFull.signalAll();
1105                 } finally {
1106                     lock.unlock();
1107                 }
1108             }
1109         }
1110 
1111         if (toResend != null) {
1112             for (HTTPExchange resend : toResend) {
1113                 HTTPResponse response =
1114                         httpSender.send(params, resend.getRequest());
1115                 resend.setHTTPResponse(response);
1116                 fireRequestSent(resend.getRequest());
1117             }
1118         }
1119     }
1120 
1121     /**
1122      * Clears any scheduled empty requests.
1123      */
clearEmptyRequest()1124     private void clearEmptyRequest() {
1125         assertLocked();
1126 
1127         if (emptyRequestFuture != null) {
1128             emptyRequestFuture.cancel(false);
1129             emptyRequestFuture = null;
1130         }
1131     }
1132 
1133     /**
1134      * Calculates the default empty request delay/interval to use for the
1135      * active session.
1136      *
1137      * @return delay in milliseconds
1138      */
getDefaultEmptyRequestDelay()1139     private long getDefaultEmptyRequestDelay() {
1140         assertLocked();
1141 
1142         // Figure out how long we should wait before sending an empty request
1143         AttrPolling polling = cmParams.getPollingInterval();
1144         long delay;
1145         if (polling == null) {
1146             delay = EMPTY_REQUEST_DELAY;
1147         } else {
1148             delay = polling.getInMilliseconds();
1149         }
1150         return delay;
1151     }
1152 
1153     /**
1154      * Schedule an empty request to be sent if no other requests are
1155      * sent in a reasonable amount of time.
1156      */
scheduleEmptyRequest(long delay)1157     private void scheduleEmptyRequest(long delay) {
1158         assertLocked();
1159         if (delay < 0L) {
1160             throw(new IllegalArgumentException(
1161                     "Empty request delay must be >= 0 (was: " + delay + ")"));
1162         }
1163 
1164         clearEmptyRequest();
1165         if (!isWorking()) {
1166             return;
1167         }
1168 
1169         // Schedule the transmission
1170         if (LOG.isLoggable(Level.FINER)) {
1171             LOG.finer("Scheduling empty request in " + delay + "ms");
1172         }
1173         try {
1174             emptyRequestFuture = schedExec.schedule(emptyRequestRunnable,
1175                     delay, TimeUnit.MILLISECONDS);
1176         } catch (RejectedExecutionException rex) {
1177             LOG.log(Level.FINEST, "Could not schedule empty request", rex);
1178         }
1179         drained.signalAll();
1180     }
1181 
1182     /**
1183      * Sends an empty request to maintain session requirements.  If a request
1184      * is sent within a reasonable time window, the empty request transmission
1185      * will be cancelled.
1186      */
sendEmptyRequest()1187     private void sendEmptyRequest() {
1188         assertUnlocked();
1189         // Send an empty request
1190         LOG.finest("Sending empty request");
1191         try {
1192             send(ComposableBody.builder().build());
1193         } catch (BOSHException boshx) {
1194             dispose(boshx);
1195         }
1196     }
1197 
1198     /**
1199      * Assert that the internal lock is held.
1200      */
assertLocked()1201     private void assertLocked() {
1202         if (ASSERTIONS) {
1203             if (!lock.isHeldByCurrentThread()) {
1204                 throw(new AssertionError("Lock is not held by current thread"));
1205             }
1206             return;
1207         }
1208     }
1209 
1210     /**
1211      * Assert that the internal lock is *not* held.
1212      */
assertUnlocked()1213     private void assertUnlocked() {
1214         if (ASSERTIONS) {
1215             if (lock.isHeldByCurrentThread()) {
1216                 throw(new AssertionError("Lock is held by current thread"));
1217             }
1218             return;
1219         }
1220     }
1221 
1222     /**
1223      * Checks to see if the response indicates a terminal binding condition
1224      * (as per XEP-0124 section 17).  If it does, an exception is thrown.
1225      *
1226      * @param body response body to evaluate
1227      * @param code HTTP response code
1228      * @throws BOSHException if a terminal binding condition is detected
1229      */
checkForTerminalBindingConditions( final AbstractBody body, final int code)1230     private void checkForTerminalBindingConditions(
1231             final AbstractBody body,
1232             final int code)
1233             throws BOSHException {
1234         TerminalBindingCondition cond =
1235                 getTerminalBindingCondition(code, body);
1236         if (cond != null) {
1237             throw(new BOSHException(
1238                     "Terminal binding condition encountered: "
1239                     + cond.getCondition() + "  ("
1240                     + cond.getMessage() + ")"));
1241         }
1242     }
1243 
1244     /**
1245      * Determines whether or not the response indicates a recoverable
1246      * binding condition (as per XEP-0124 section 17).
1247      *
1248      * @param resp response body
1249      * @return {@code true} if it does, {@code false} otherwise
1250      */
isRecoverableBindingCondition( final AbstractBody resp)1251     private static boolean isRecoverableBindingCondition(
1252             final AbstractBody resp) {
1253         return ERROR.equals(resp.getAttribute(Attributes.TYPE));
1254     }
1255 
1256     /**
1257      * Process the request to determine if the empty request delay
1258      * can be determined by looking to see if the request is a pause
1259      * request.  If it can, the request's delay is returned, otherwise
1260      * the default delay is returned.
1261      *
1262      * @return delay in milliseconds that should elapse prior to an
1263      *  empty message being sent
1264      */
processPauseRequest( final AbstractBody req)1265     private long processPauseRequest(
1266             final AbstractBody req) {
1267         assertLocked();
1268 
1269         if (cmParams != null && cmParams.getMaxPause() != null) {
1270             try {
1271                 AttrPause pause = AttrPause.createFromString(
1272                         req.getAttribute(Attributes.PAUSE));
1273                 if (pause != null) {
1274                     long delay = pause.getInMilliseconds() - PAUSE_MARGIN;
1275                     if (delay < 0) {
1276                         delay = EMPTY_REQUEST_DELAY;
1277                     }
1278                     return delay;
1279                 }
1280             } catch (BOSHException boshx) {
1281                 LOG.log(Level.FINEST, "Could not extract", boshx);
1282             }
1283         }
1284 
1285         return getDefaultEmptyRequestDelay();
1286     }
1287 
1288     /**
1289      * Check the response for request acknowledgements and take appropriate
1290      * action.
1291      *
1292      * This method assumes the lock is currently held.
1293      *
1294      * @param req request
1295      * @param resp response
1296      */
processRequestAcknowledgements( final AbstractBody req, final AbstractBody resp)1297     private void processRequestAcknowledgements(
1298             final AbstractBody req, final AbstractBody resp) {
1299         assertLocked();
1300 
1301         if (!cmParams.isAckingRequests()) {
1302             return;
1303         }
1304 
1305         // If a report or time attribute is set, we aren't acking anything
1306         if (resp.getAttribute(Attributes.REPORT) != null) {
1307             return;
1308         }
1309 
1310         // Figure out what the highest acked RID is
1311         String acked = resp.getAttribute(Attributes.ACK);
1312         Long ackUpTo;
1313         if (acked == null) {
1314             // Implicit ack of all prior requests up until RID
1315             ackUpTo = Long.parseLong(req.getAttribute(Attributes.RID));
1316         } else {
1317             ackUpTo = Long.parseLong(acked);
1318         }
1319 
1320         // Remove the acked requests from the list
1321         if (LOG.isLoggable(Level.FINEST)) {
1322             LOG.finest("Removing pending acks up to: " + ackUpTo);
1323         }
1324         Iterator<ComposableBody> iter = pendingRequestAcks.iterator();
1325         while (iter.hasNext()) {
1326             AbstractBody pending = iter.next();
1327             Long pendingRID = Long.parseLong(
1328                     pending.getAttribute(Attributes.RID));
1329             if (pendingRID.compareTo(ackUpTo) <= 0) {
1330                 iter.remove();
1331             }
1332         }
1333     }
1334 
1335     /**
1336      * Process the response in order to update the response acknowlegement
1337      * data.
1338      *
1339      * This method assumes the lock is currently held.
1340      *
1341      * @param req request
1342      */
processResponseAcknowledgementData( final AbstractBody req)1343     private void processResponseAcknowledgementData(
1344             final AbstractBody req) {
1345         assertLocked();
1346 
1347         Long rid = Long.parseLong(req.getAttribute(Attributes.RID));
1348         if (responseAck.equals(Long.valueOf(-1L))) {
1349             // This is the first request
1350             responseAck = rid;
1351         } else {
1352             pendingResponseAcks.add(rid);
1353             // Remove up until the first missing response (or end of queue)
1354             Long whileVal = responseAck;
1355             while (whileVal.equals(pendingResponseAcks.first())) {
1356                 responseAck = whileVal;
1357                 pendingResponseAcks.remove(whileVal);
1358                 whileVal = Long.valueOf(whileVal.longValue() + 1);
1359             }
1360         }
1361     }
1362 
1363     /**
1364      * Process the response in order to check for and respond to any potential
1365      * ack reports.
1366      *
1367      * This method assumes the lock is currently held.
1368      *
1369      * @param resp response
1370      * @return exchange to transmit if a resend is to be performed, or
1371      *  {@code null} if no resend is necessary
1372      * @throws BOSHException when a a retry is needed but cannot be performed
1373      */
processResponseAcknowledgementReport( final AbstractBody resp)1374     private HTTPExchange processResponseAcknowledgementReport(
1375             final AbstractBody resp)
1376             throws BOSHException {
1377         assertLocked();
1378 
1379         String reportStr = resp.getAttribute(Attributes.REPORT);
1380         if (reportStr == null) {
1381             // No report on this message
1382             return null;
1383         }
1384 
1385         Long report = Long.parseLong(reportStr);
1386         Long time = Long.parseLong(resp.getAttribute(Attributes.TIME));
1387         if (LOG.isLoggable(Level.FINE)) {
1388             LOG.fine("Received report of missing request (RID="
1389                     + report + ", time=" + time + "ms)");
1390         }
1391 
1392         // Find the missing request
1393         Iterator<ComposableBody> iter = pendingRequestAcks.iterator();
1394         AbstractBody req = null;
1395         while (iter.hasNext() && req == null) {
1396             AbstractBody pending = iter.next();
1397             Long pendingRID = Long.parseLong(
1398                     pending.getAttribute(Attributes.RID));
1399             if (report.equals(pendingRID)) {
1400                 req = pending;
1401             }
1402         }
1403 
1404         if (req == null) {
1405             throw(new BOSHException("Report of missing message with RID '"
1406                     + reportStr
1407                     + "' but local copy of that request was not found"));
1408         }
1409 
1410         // Resend the missing request
1411         HTTPExchange exch = new HTTPExchange(req);
1412         exchanges.add(exch);
1413         notEmpty.signalAll();
1414         return exch;
1415     }
1416 
1417     /**
1418      * Notifies all request listeners that the specified request is being
1419      * sent.
1420      *
1421      * @param request request being sent
1422      */
fireRequestSent(final AbstractBody request)1423     private void fireRequestSent(final AbstractBody request) {
1424         assertUnlocked();
1425 
1426         BOSHMessageEvent event = null;
1427         for (BOSHClientRequestListener listener : requestListeners) {
1428             if (event == null) {
1429                 event = BOSHMessageEvent.createRequestSentEvent(this, request);
1430             }
1431             try {
1432                 listener.requestSent(event);
1433             } catch (Exception ex) {
1434                 LOG.log(Level.WARNING, UNHANDLED, ex);
1435             }
1436         }
1437     }
1438 
1439     /**
1440      * Notifies all response listeners that the specified response has been
1441      * received.
1442      *
1443      * @param response response received
1444      */
fireResponseReceived(final AbstractBody response)1445     private void fireResponseReceived(final AbstractBody response) {
1446         assertUnlocked();
1447 
1448         BOSHMessageEvent event = null;
1449         for (BOSHClientResponseListener listener : responseListeners) {
1450             if (event == null) {
1451                 event = BOSHMessageEvent.createResponseReceivedEvent(
1452                         this, response);
1453             }
1454             try {
1455                 listener.responseReceived(event);
1456             } catch (Exception ex) {
1457                 LOG.log(Level.WARNING, UNHANDLED, ex);
1458             }
1459         }
1460     }
1461 
1462     /**
1463      * Notifies all connection listeners that the session has been successfully
1464      * established.
1465      */
fireConnectionEstablished()1466     private void fireConnectionEstablished() {
1467         final boolean hadLock = lock.isHeldByCurrentThread();
1468         if (hadLock) {
1469             lock.unlock();
1470         }
1471         try {
1472             BOSHClientConnEvent event = null;
1473             for (BOSHClientConnListener listener : connListeners) {
1474                 if (event == null) {
1475                     event = BOSHClientConnEvent
1476                             .createConnectionEstablishedEvent(this);
1477                 }
1478                 try {
1479                     listener.connectionEvent(event);
1480                 } catch (Exception ex) {
1481                     LOG.log(Level.WARNING, UNHANDLED, ex);
1482                 }
1483             }
1484         } finally {
1485             if (hadLock) {
1486                 lock.lock();
1487             }
1488         }
1489     }
1490 
1491     /**
1492      * Notifies all connection listeners that the session has been
1493      * terminated normally.
1494      */
fireConnectionClosed()1495     private void fireConnectionClosed() {
1496         assertUnlocked();
1497 
1498         BOSHClientConnEvent event = null;
1499         for (BOSHClientConnListener listener : connListeners) {
1500             if (event == null) {
1501                 event = BOSHClientConnEvent.createConnectionClosedEvent(this);
1502             }
1503             try {
1504                 listener.connectionEvent(event);
1505             } catch (Exception ex) {
1506                 LOG.log(Level.WARNING, UNHANDLED, ex);
1507             }
1508         }
1509     }
1510 
1511     /**
1512      * Notifies all connection listeners that the session has been
1513      * terminated due to the exceptional condition provided.
1514      *
1515      * @param cause cause of the termination
1516      */
fireConnectionClosedOnError( final Throwable cause)1517     private void fireConnectionClosedOnError(
1518             final Throwable cause) {
1519         assertUnlocked();
1520 
1521         BOSHClientConnEvent event = null;
1522         for (BOSHClientConnListener listener : connListeners) {
1523             if (event == null) {
1524                 event = BOSHClientConnEvent
1525                         .createConnectionClosedOnErrorEvent(
1526                         this, pendingRequestAcks, cause);
1527             }
1528             try {
1529                 listener.connectionEvent(event);
1530             } catch (Exception ex) {
1531                 LOG.log(Level.WARNING, UNHANDLED, ex);
1532             }
1533         }
1534     }
1535 
1536 }
1537