• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2011 jMonkeyEngine
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 *
 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
 *   may be used to endorse or promote products derived from this software
 *   without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
32 
33 package com.jme3.network.base;
34 
35 import com.jme3.network.Filter;
36 import com.jme3.network.HostedConnection;
37 import com.jme3.network.Message;
38 import com.jme3.network.MessageListener;
39 import com.jme3.network.kernel.Endpoint;
40 import com.jme3.network.kernel.EndpointEvent;
41 import com.jme3.network.kernel.Envelope;
42 import com.jme3.network.kernel.Kernel;
43 import com.jme3.network.message.ClientRegistrationMessage;
44 import java.nio.ByteBuffer;
45 import java.util.Map;
46 import java.util.concurrent.ConcurrentHashMap;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.logging.Level;
49 import java.util.logging.Logger;
50 
51 /**
52  *  Wraps a single Kernel and forwards new messages
53  *  to the supplied message dispatcher and new endpoint
54  *  events to the connection dispatcher.  This is used
55  *  by DefaultServer to manage its kernel objects.
56  *
57  *  <p>This adapter assumes a simple protocol where two
58  *  bytes define a (short) object size with the object data
59  *  to follow.  Note: this limits the size of serialized
60  *  objects to 32676 bytes... even though, for example,
61  *  datagram packets can hold twice that. :P</p>
62  *
63  *  @version   $Revision: 8944 $
64  *  @author    Paul Speed
65  */
66 public class KernelAdapter extends Thread
67 {
68     static Logger log = Logger.getLogger(KernelAdapter.class.getName());
69 
70     private DefaultServer server; // this is unfortunate
71     private Kernel kernel;
72     private MessageListener<HostedConnection> messageDispatcher;
73     private AtomicBoolean go = new AtomicBoolean(true);
74 
75     // Keeps track of the in-progress messages that are received
76     // on reliable connections
77     private Map<Endpoint, MessageProtocol> messageBuffers = new ConcurrentHashMap<Endpoint,MessageProtocol>();
78 
79     // Marks the messages as reliable or not if they came
80     // through this connector.
81     private boolean reliable;
82 
KernelAdapter( DefaultServer server, Kernel kernel, MessageListener<HostedConnection> messageDispatcher, boolean reliable )83     public KernelAdapter( DefaultServer server, Kernel kernel, MessageListener<HostedConnection> messageDispatcher,
84                           boolean reliable )
85     {
86         super( String.valueOf(kernel) );
87         this.server = server;
88         this.kernel = kernel;
89         this.messageDispatcher = messageDispatcher;
90         this.reliable = reliable;
91         setDaemon(true);
92     }
93 
getKernel()94     public Kernel getKernel()
95     {
96         return kernel;
97     }
98 
initialize()99     public void initialize()
100     {
101         kernel.initialize();
102     }
103 
broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable, boolean copy )104     public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
105                            boolean copy )
106     {
107         kernel.broadcast( filter, data, reliable, copy );
108     }
109 
close()110     public void close() throws InterruptedException
111     {
112         go.set(false);
113 
114         // Kill the kernel
115         kernel.terminate();
116     }
117 
reportError( Endpoint p, Object context, Exception e )118     protected void reportError( Endpoint p, Object context, Exception e )
119     {
120         // Should really be queued up so the outer thread can
121         // retrieve them.  For now we'll just log it.  FIXME
122         log.log( Level.SEVERE, "Unhandled error, endpoint:" + p + ", context:" + context, e );
123 
124         // In lieu of other options, at least close the endpoint
125         p.close();
126     }
127 
getConnection( Endpoint p )128     protected HostedConnection getConnection( Endpoint p )
129     {
130         return server.getConnection(p);
131     }
132 
connectionClosed( Endpoint p )133     protected void connectionClosed( Endpoint p )
134     {
135         // Remove any message buffer we've been accumulating
136         // on behalf of this endpoing
137         messageBuffers.remove(p);
138 
139         log.log( Level.FINE, "Buffers size:{0}", messageBuffers.size() );
140 
141         server.connectionClosed(p);
142     }
143 
144     /**
145      *  Note on threading for those writing their own server
146      *  or adapter implementations.  The rule that a single connection be
147      *  processed by only one thread at a time is more about ensuring that
148      *  the messages are delivered in the order that they are received
149      *  than for any user-code safety.  99% of the time the user code should
150      *  be writing for multithreaded access anyway.
151      *
152      *  <p>The issue with the messages is that if a an implementation is
153      *  using a general thread pool then it would be possible for a
154      *  naive implementation to have one thread grab an Envelope from
155      *  connection 1's and another grab the next Envelope.  Since an Envelope
156      *  may contain several messages, delivering the second thread's messages
157      *  before or during the first's would be really confusing and hard
158      *  to code for in user code.</p>
159      *
160      *  <p>And that's why this note is here.  DefaultServer does a rudimentary
161      *  per-connection locking but it couldn't possibly guard against
162      *  out of order Envelope processing.</p>
163      */
dispatch( Endpoint p, Message m )164     protected void dispatch( Endpoint p, Message m )
165     {
166         // Because this class is the only one with the information
167         // to do it... we need to pull of the registration message
168         // here.
169         if( m instanceof ClientRegistrationMessage ) {
170             server.registerClient( this, p, (ClientRegistrationMessage)m );
171             return;
172         }
173 
174         try {
175             HostedConnection source = getConnection(p);
176             if( source == null ) {
177                 if( reliable ) {
178                     // If it's a reliable connection then it's slightly more
179                     // concerning but this can happen all the time for a UDP endpoint.
180                     log.log( Level.WARNING, "Recieved message from unconnected endpoint:" + p + "  message:" + m );
181                 }
182                 return;
183             }
184             messageDispatcher.messageReceived( source, m );
185         } catch( Exception e ) {
186             reportError(p, m, e);
187         }
188     }
189 
getMessageBuffer( Endpoint p )190     protected MessageProtocol getMessageBuffer( Endpoint p )
191     {
192         if( !reliable ) {
193             // Since UDP comes in packets and they aren't split
194             // up, there is no reason to buffer.  In fact, there would
195             // be a down side because there is no way for us to reliably
196             // clean these up later since we'd create another one for
197             // any random UDP packet that comes to the port.
198             return new MessageProtocol();
199         } else {
200             // See if we already have one
201             MessageProtocol result = messageBuffers.get(p);
202             if( result == null ) {
203                 result = new MessageProtocol();
204                 messageBuffers.put(p, result);
205             }
206             return result;
207         }
208     }
209 
createAndDispatch( Envelope env )210     protected void createAndDispatch( Envelope env )
211     {
212         MessageProtocol protocol = getMessageBuffer(env.getSource());
213 
214         byte[] data = env.getData();
215         ByteBuffer buffer = ByteBuffer.wrap(data);
216 
217         int count = protocol.addBuffer( buffer );
218         if( count == 0 ) {
219             // This can happen if there was only a partial message
220             // received.  However, this should never happen for unreliable
221             // connections.
222             if( !reliable ) {
223                 // Log some additional information about the packet.
224                 int len = Math.min( 10, data.length );
225                 StringBuilder sb = new StringBuilder();
226                 for( int i = 0; i < len; i++ ) {
227                     sb.append( "[" + Integer.toHexString(data[i]) + "]" );
228                 }
229                 log.log( Level.INFO, "First 10 bytes of incomplete nessage:" + sb );
230                 throw new RuntimeException( "Envelope contained incomplete data:" + env );
231             }
232         }
233 
234         // Should be complete... and maybe we should check but we don't
235         Message m = null;
236         while( (m = protocol.getMessage()) != null ) {
237             m.setReliable(reliable);
238             dispatch( env.getSource(), m );
239         }
240     }
241 
createAndDispatch( EndpointEvent event )242     protected void createAndDispatch( EndpointEvent event )
243     {
244         // Only need to tell the server about disconnects
245         if( event.getType() == EndpointEvent.Type.REMOVE ) {
246             connectionClosed( event.getEndpoint() );
247         }
248     }
249 
flushEvents()250     protected void flushEvents()
251     {
252         EndpointEvent event;
253         while( (event = kernel.nextEvent()) != null ) {
254             try {
255                 createAndDispatch( event );
256             } catch( Exception e ) {
257                 reportError(event.getEndpoint(), event, e);
258             }
259         }
260     }
261 
run()262     public void run()
263     {
264         while( go.get() ) {
265 
266             try {
267                 // Check for pending events
268                 flushEvents();
269 
270                 // Grab the next envelope
271                 Envelope e = kernel.read();
272                 if( e == Kernel.EVENTS_PENDING )
273                     continue; // We'll catch it up above
274 
275                 // Check for pending events that might have
276                 // come in while we were blocking.  This is usually
277                 // when the connection add events come through
278                 flushEvents();
279 
280                 try {
281                     createAndDispatch( e );
282                 } catch( Exception ex ) {
283                     reportError(e.getSource(), e, ex);
284                 }
285 
286             } catch( InterruptedException ex ) {
287                 if( !go.get() )
288                     return;
289                 throw new RuntimeException( "Unexpected interruption", ex );
290             }
291         }
292     }
293 
294 }
295 
296 
297