• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * $RCSfile$
3  * $Revision$
4  * $Date$
5  *
6  * Copyright 2003-2006 Jive Software.
7  *
8  * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 package org.jivesoftware.smackx.filetransfer;
21 
22 import org.jivesoftware.smack.PacketCollector;
23 import org.jivesoftware.smack.SmackConfiguration;
24 import org.jivesoftware.smack.Connection;
25 import org.jivesoftware.smack.XMPPException;
26 import org.jivesoftware.smack.filter.OrFilter;
27 import org.jivesoftware.smack.filter.PacketFilter;
28 import org.jivesoftware.smack.packet.Packet;
29 import org.jivesoftware.smackx.packet.StreamInitiation;
30 
31 import java.io.InputStream;
32 import java.io.OutputStream;
33 import java.util.concurrent.*;
34 import java.util.List;
35 import java.util.ArrayList;
36 
37 
38 /**
39  * The fault tolerant negotiator takes two stream negotiators, the primary and the secondary
40  * negotiator. If the primary negotiator fails during the stream negotiaton process, the second
41  * negotiator is used.
42  */
43 public class FaultTolerantNegotiator extends StreamNegotiator {
44 
45     private StreamNegotiator primaryNegotiator;
46     private StreamNegotiator secondaryNegotiator;
47     private Connection connection;
48     private PacketFilter primaryFilter;
49     private PacketFilter secondaryFilter;
50 
FaultTolerantNegotiator(Connection connection, StreamNegotiator primary, StreamNegotiator secondary)51     public FaultTolerantNegotiator(Connection connection, StreamNegotiator primary,
52             StreamNegotiator secondary) {
53         this.primaryNegotiator = primary;
54         this.secondaryNegotiator = secondary;
55         this.connection = connection;
56     }
57 
getInitiationPacketFilter(String from, String streamID)58     public PacketFilter getInitiationPacketFilter(String from, String streamID) {
59         if (primaryFilter == null || secondaryFilter == null) {
60             primaryFilter = primaryNegotiator.getInitiationPacketFilter(from, streamID);
61             secondaryFilter = secondaryNegotiator.getInitiationPacketFilter(from, streamID);
62         }
63         return new OrFilter(primaryFilter, secondaryFilter);
64     }
65 
negotiateIncomingStream(Packet streamInitiation)66     InputStream negotiateIncomingStream(Packet streamInitiation) throws XMPPException {
67         throw new UnsupportedOperationException("Negotiation only handled by create incoming " +
68                 "stream method.");
69     }
70 
initiateIncomingStream(Connection connection, StreamInitiation initiation)71     final Packet initiateIncomingStream(Connection connection, StreamInitiation initiation) {
72         throw new UnsupportedOperationException("Initiation handled by createIncomingStream " +
73                 "method");
74     }
75 
createIncomingStream(StreamInitiation initiation)76     public InputStream createIncomingStream(StreamInitiation initiation) throws XMPPException {
77         PacketCollector collector = connection.createPacketCollector(
78                 getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()));
79 
80         connection.sendPacket(super.createInitiationAccept(initiation, getNamespaces()));
81 
82         ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2);
83         CompletionService<InputStream> service
84                 = new ExecutorCompletionService<InputStream>(threadPoolExecutor);
85         List<Future<InputStream>> futures = new ArrayList<Future<InputStream>>();
86         InputStream stream = null;
87         XMPPException exception = null;
88         try {
89             futures.add(service.submit(new NegotiatorService(collector)));
90             futures.add(service.submit(new NegotiatorService(collector)));
91 
92             int i = 0;
93             while (stream == null && i < futures.size()) {
94                 Future<InputStream> future;
95                 try {
96                     i++;
97                     future = service.poll(10, TimeUnit.SECONDS);
98                 }
99                 catch (InterruptedException e) {
100                     continue;
101                 }
102 
103                 if (future == null) {
104                     continue;
105                 }
106 
107                 try {
108                     stream = future.get();
109                 }
110                 catch (InterruptedException e) {
111                     /* Do Nothing */
112                 }
113                 catch (ExecutionException e) {
114                     exception = new XMPPException(e.getCause());
115                 }
116             }
117         }
118         finally {
119             for (Future<InputStream> future : futures) {
120                 future.cancel(true);
121             }
122             collector.cancel();
123             threadPoolExecutor.shutdownNow();
124         }
125         if (stream == null) {
126             if (exception != null) {
127                 throw exception;
128             }
129             else {
130                 throw new XMPPException("File transfer negotiation failed.");
131             }
132         }
133 
134         return stream;
135     }
136 
determineNegotiator(Packet streamInitiation)137     private StreamNegotiator determineNegotiator(Packet streamInitiation) {
138         return primaryFilter.accept(streamInitiation) ? primaryNegotiator : secondaryNegotiator;
139     }
140 
createOutgoingStream(String streamID, String initiator, String target)141     public OutputStream createOutgoingStream(String streamID, String initiator, String target)
142             throws XMPPException {
143         OutputStream stream;
144         try {
145             stream = primaryNegotiator.createOutgoingStream(streamID, initiator, target);
146         }
147         catch (XMPPException ex) {
148             stream = secondaryNegotiator.createOutgoingStream(streamID, initiator, target);
149         }
150 
151         return stream;
152     }
153 
getNamespaces()154     public String[] getNamespaces() {
155         String[] primary = primaryNegotiator.getNamespaces();
156         String[] secondary = secondaryNegotiator.getNamespaces();
157 
158         String[] namespaces = new String[primary.length + secondary.length];
159         System.arraycopy(primary, 0, namespaces, 0, primary.length);
160         System.arraycopy(secondary, 0, namespaces, primary.length, secondary.length);
161 
162         return namespaces;
163     }
164 
cleanup()165     public void cleanup() {
166     }
167 
168     private class NegotiatorService implements Callable<InputStream> {
169 
170         private PacketCollector collector;
171 
NegotiatorService(PacketCollector collector)172         NegotiatorService(PacketCollector collector) {
173             this.collector = collector;
174         }
175 
call()176         public InputStream call() throws Exception {
177             Packet streamInitiation = collector.nextResult(
178                     SmackConfiguration.getPacketReplyTimeout() * 2);
179             if (streamInitiation == null) {
180                 throw new XMPPException("No response from remote client");
181             }
182             StreamNegotiator negotiator = determineNegotiator(streamInitiation);
183             return negotiator.negotiateIncomingStream(streamInitiation);
184         }
185     }
186 }
187