• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2011 jMonkeyEngine
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  * * Redistributions of source code must retain the above copyright
10  *   notice, this list of conditions and the following disclaimer.
11  *
12  * * Redistributions in binary form must reproduce the above copyright
13  *   notice, this list of conditions and the following disclaimer in the
14  *   documentation and/or other materials provided with the distribution.
15  *
16  * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
17  *   may be used to endorse or promote products derived from this software
18  *   without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
22  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 package com.jme3.network.base;
34 
35 import com.jme3.network.ErrorListener;
36 import com.jme3.network.Message;
37 import com.jme3.network.MessageListener;
38 import com.jme3.network.kernel.Connector;
39 import com.jme3.network.kernel.ConnectorException;
40 import java.nio.ByteBuffer;
41 import java.util.concurrent.ArrayBlockingQueue;
42 import java.util.concurrent.BlockingQueue;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 
45 /**
46  *  Wraps a single Connector and forwards new messages
47  *  to the supplied message dispatcher.  This is used
48  *  by DefaultClient to manage its connector objects.
49  *  This is only responsible for message reading and provides
50  *  no support for buffering writes.
51  *
52  *  <p>This adapter assumes a simple protocol where two
53  *  bytes define a (short) object size with the object data
54  *  to follow.  Note: this limits the size of serialized
55  *  objects to 32676 bytes... even though, for example,
56  *  datagram packets can hold twice that. :P</p>
57  *
58  *  @version   $Revision: 8944 $
59  *  @author    Paul Speed
60  */
61 public class ConnectorAdapter extends Thread
62 {
63     private static final int OUTBOUND_BACKLOG = 16000;
64 
65     private Connector connector;
66     private MessageListener<Object> dispatcher;
67     private ErrorListener<Object> errorHandler;
68     private AtomicBoolean go = new AtomicBoolean(true);
69 
70     private BlockingQueue<ByteBuffer> outbound;
71 
72     // Writes messages out on a background thread
73     private WriterThread writer;
74 
75     // Marks the messages as reliable or not if they came
76     // through this connector.
77     private boolean reliable;
78 
ConnectorAdapter( Connector connector, MessageListener<Object> dispatcher, ErrorListener<Object> errorHandler, boolean reliable )79     public ConnectorAdapter( Connector connector, MessageListener<Object> dispatcher,
80                              ErrorListener<Object> errorHandler, boolean reliable )
81     {
82         super( String.valueOf(connector) );
83         this.connector = connector;
84         this.dispatcher = dispatcher;
85         this.errorHandler = errorHandler;
86         this.reliable = reliable;
87         setDaemon(true);
88 
89         // The backlog makes sure that the outbound channel blocks once
90         // a certain backlog level is reached.  It is set high so that it
91         // is only reached in the worst cases... which are usually things like
92         // raw throughput tests.  Technically, a saturated TCP channel could
93         // back up quite a bit if the buffers are full and the socket has
94         // stalled but 16,000 messages is still a big backlog.
95         outbound = new ArrayBlockingQueue<ByteBuffer>(OUTBOUND_BACKLOG);
96 
97         // Note: this technically adds a potential deadlock case
98         // with the above code where there wasn't one before.  For example,
99         // if a TCP outbound queue fills to capacity and a client sends
100         // in such a way that they block TCP message handling then if the HostedConnection
101         // on the server is similarly blocked then the TCP network buffers may
102         // all get full and no outbound messages move and we forever block
103         // on the queue.
104         // However, in practice this can't really happen... or at least it's
105         // the sign of other really bad things.
106         // First, currently the server-side outbound queues are all unbounded and
107         // so won't ever block the handling of messages if the outbound channel is full.
108         // Second, there would have to be a huge amount of data backlog for this
109         // to ever occur anyway.
110         // Third, it's a sign of a really poor architecture if 16,000 messages
111         // can go out in a way that blocks reads.
112 
113         writer = new WriterThread();
114         writer.start();
115     }
116 
close()117     public void close()
118     {
119         go.set(false);
120 
121         // Kill the writer service
122         writer.shutdown();
123 
124         if( connector.isConnected() )
125             {
126             // Kill the connector
127             connector.close();
128             }
129     }
130 
dispatch( Message m )131     protected void dispatch( Message m )
132     {
133         dispatcher.messageReceived( null, m );
134     }
135 
write( ByteBuffer data )136     public void write( ByteBuffer data )
137     {
138         try {
139             outbound.put( data );
140         } catch( InterruptedException e ) {
141             throw new RuntimeException( "Interrupted while waiting for queue to drain", e );
142         }
143     }
144 
handleError( Exception e )145     protected void handleError( Exception e )
146     {
147         if( !go.get() )
148             return;
149 
150         errorHandler.handleError( this, e );
151     }
152 
run()153     public void run()
154     {
155         MessageProtocol protocol = new MessageProtocol();
156 
157         try {
158             while( go.get() ) {
159                 ByteBuffer buffer = connector.read();
160                 if( buffer == null ) {
161                     if( go.get() ) {
162                         throw new ConnectorException( "Connector closed." );
163                     } else {
164                         // Just dump out because a null buffer is expected
165                         // from a closed/closing connector
166                         break;
167                     }
168                 }
169 
170                 protocol.addBuffer( buffer );
171 
172                 Message m = null;
173                 while( (m = protocol.getMessage()) != null ) {
174                     m.setReliable( reliable );
175                     dispatch( m );
176                 }
177             }
178         } catch( Exception e ) {
179             handleError( e );
180         }
181     }
182 
183     protected class WriterThread extends Thread
184     {
WriterThread()185         public WriterThread()
186         {
187             super( String.valueOf(connector) + "-writer" );
188         }
189 
shutdown()190         public void shutdown()
191         {
192             interrupt();
193         }
194 
write( ByteBuffer data )195         private void write( ByteBuffer data )
196         {
197             try {
198                 connector.write(data);
199             } catch( Exception e ) {
200                 handleError( e );
201             }
202         }
203 
run()204         public void run()
205         {
206             while( go.get() ) {
207                 try {
208                     ByteBuffer data = outbound.take();
209                     write(data);
210                 } catch( InterruptedException e ) {
211                     if( !go.get() )
212                         return;
213                     throw new RuntimeException( "Interrupted waiting for data", e );
214                 }
215             }
216         }
217     }
218 }
219