• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2011 jMonkeyEngine
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  * * Redistributions of source code must retain the above copyright
10  *   notice, this list of conditions and the following disclaimer.
11  *
12  * * Redistributions in binary form must reproduce the above copyright
13  *   notice, this list of conditions and the following disclaimer in the
14  *   documentation and/or other materials provided with the distribution.
15  *
16  * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
17  *   may be used to endorse or promote products derived from this software
18  *   without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
22  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 package com.jme3.network.base;
34 
35 import com.jme3.network.*;
36 import com.jme3.network.kernel.Endpoint;
37 import com.jme3.network.kernel.Kernel;
38 import com.jme3.network.message.ChannelInfoMessage;
39 import com.jme3.network.message.ClientRegistrationMessage;
40 import com.jme3.network.message.DisconnectMessage;
41 import java.io.IOException;
42 import java.nio.ByteBuffer;
43 import java.util.*;
44 import java.util.concurrent.ConcurrentHashMap;
45 import java.util.concurrent.CopyOnWriteArrayList;
46 import java.util.concurrent.atomic.AtomicInteger;
47 import java.util.logging.Level;
48 import java.util.logging.Logger;
49 
50 /**
51  *  A default implementation of the Server interface that delegates
52  *  its network connectivity to kernel.Kernel.
53  *
54  *  @version   $Revision: 9114 $
55  *  @author    Paul Speed
56  */
57 public class DefaultServer implements Server
58 {
59     static Logger log = Logger.getLogger(DefaultServer.class.getName());
60 
61     // First two channels are reserved for reliable and
62     // unreliable
63     private static final int CH_RELIABLE = 0;
64     private static final int CH_UNRELIABLE = 1;
65     private static final int CH_FIRST = 2;
66 
67     private boolean isRunning = false;
68     private AtomicInteger nextId = new AtomicInteger(0);
69     private String gameName;
70     private int version;
71     private KernelFactory kernelFactory = KernelFactory.DEFAULT;
72     private KernelAdapter reliableAdapter;
73     private KernelAdapter fastAdapter;
74     private List<KernelAdapter> channels = new ArrayList<KernelAdapter>();
75     private List<Integer> alternatePorts = new ArrayList<Integer>();
76     private Redispatch dispatcher = new Redispatch();
77     private Map<Integer,HostedConnection> connections = new ConcurrentHashMap<Integer,HostedConnection>();
78     private Map<Endpoint,HostedConnection> endpointConnections
79                             = new ConcurrentHashMap<Endpoint,HostedConnection>();
80 
81     // Keeps track of clients for whom we've only received the UDP
82     // registration message
83     private Map<Long,Connection> connecting = new ConcurrentHashMap<Long,Connection>();
84 
85     private MessageListenerRegistry<HostedConnection> messageListeners
86                             = new MessageListenerRegistry<HostedConnection>();
87     private List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
88 
DefaultServer( String gameName, int version, Kernel reliable, Kernel fast )89     public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast )
90     {
91         if( reliable == null )
92             throw new IllegalArgumentException( "Default server reqiures a reliable kernel instance." );
93 
94         this.gameName = gameName;
95         this.version = version;
96 
97         reliableAdapter = new KernelAdapter( this, reliable, dispatcher, true );
98         channels.add( reliableAdapter );
99         if( fast != null ) {
100             fastAdapter = new KernelAdapter( this, fast, dispatcher, false );
101             channels.add( fastAdapter );
102         }
103     }
104 
getGameName()105     public String getGameName()
106     {
107         return gameName;
108     }
109 
getVersion()110     public int getVersion()
111     {
112         return version;
113     }
114 
addChannel( int port )115     public int addChannel( int port )
116     {
117         if( isRunning )
118             throw new IllegalStateException( "Channels cannot be added once server is started." );
119 
120         // Note: it does bug me that channels aren't 100% universal and
121         // setup externally but it requires a more invasive set of changes
122         // for "connection types" and some kind of registry of kernel and
123         // connector factories.  This really would be the best approach and
124         // would allow all kinds of channel customization maybe... but for
125         // now, we hard-code the standard connections and treat the +2 extras
126         // differently.
127 
128         // Check for consistency with the channels list
129         if( channels.size() - CH_FIRST != alternatePorts.size() )
130             throw new IllegalStateException( "Channel and port lists do not match." );
131 
132         try {
133             int result = alternatePorts.size();
134             alternatePorts.add(port);
135 
136             Kernel kernel = kernelFactory.createKernel(result, port);
137             channels.add( new KernelAdapter(this, kernel, dispatcher, true) );
138 
139             return result;
140         } catch( IOException e ) {
141             throw new RuntimeException( "Error adding channel for port:" + port, e );
142         }
143     }
144 
checkChannel( int channel )145     protected void checkChannel( int channel )
146     {
147         if( channel < 0 || channel >= alternatePorts.size() )
148             throw new IllegalArgumentException( "Channel is undefined:" + channel );
149     }
150 
start()151     public void start()
152     {
153         if( isRunning )
154             throw new IllegalStateException( "Server is already started." );
155 
156         // Initialize the kernels
157         for( KernelAdapter ka : channels ) {
158             ka.initialize();
159         }
160 
161         // Start em up
162         for( KernelAdapter ka : channels ) {
163             ka.start();
164         }
165 
166         isRunning = true;
167     }
168 
isRunning()169     public boolean isRunning()
170     {
171         return isRunning;
172     }
173 
close()174     public void close()
175     {
176         if( !isRunning )
177             throw new IllegalStateException( "Server is not started." );
178 
179         try {
180             // Kill the adpaters, they will kill the kernels
181             for( KernelAdapter ka : channels ) {
182                 ka.close();
183             }
184 
185             isRunning = false;
186         } catch( InterruptedException e ) {
187             throw new RuntimeException( "Interrupted while closing", e );
188         }
189     }
190 
broadcast( Message message )191     public void broadcast( Message message )
192     {
193         broadcast( null, message );
194     }
195 
broadcast( Filter<? super HostedConnection> filter, Message message )196     public void broadcast( Filter<? super HostedConnection> filter, Message message )
197     {
198         if( connections.isEmpty() )
199             return;
200 
201         ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
202 
203         FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
204 
205         if( message.isReliable() || fastAdapter == null ) {
206             // Don't need to copy the data because message protocol is already
207             // giving us a fresh buffer
208             reliableAdapter.broadcast( adapter, buffer, true, false );
209         } else {
210             fastAdapter.broadcast( adapter, buffer, false, false );
211         }
212     }
213 
broadcast( int channel, Filter<? super HostedConnection> filter, Message message )214     public void broadcast( int channel, Filter<? super HostedConnection> filter, Message message )
215     {
216         if( connections.isEmpty() )
217             return;
218 
219         checkChannel(channel);
220 
221         ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
222 
223         FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
224 
225         channels.get(channel+CH_FIRST).broadcast( adapter, buffer, true, false );
226     }
227 
getConnection( int id )228     public HostedConnection getConnection( int id )
229     {
230         return connections.get(id);
231     }
232 
hasConnections()233     public boolean hasConnections()
234     {
235         return !connections.isEmpty();
236     }
237 
getConnections()238     public Collection<HostedConnection> getConnections()
239     {
240         return Collections.unmodifiableCollection((Collection<HostedConnection>)connections.values());
241     }
242 
addConnectionListener( ConnectionListener listener )243     public void addConnectionListener( ConnectionListener listener )
244     {
245         connectionListeners.add(listener);
246     }
247 
removeConnectionListener( ConnectionListener listener )248     public void removeConnectionListener( ConnectionListener listener )
249     {
250         connectionListeners.remove(listener);
251     }
252 
addMessageListener( MessageListener<? super HostedConnection> listener )253     public void addMessageListener( MessageListener<? super HostedConnection> listener )
254     {
255         messageListeners.addMessageListener( listener );
256     }
257 
addMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )258     public void addMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
259     {
260         messageListeners.addMessageListener( listener, classes );
261     }
262 
removeMessageListener( MessageListener<? super HostedConnection> listener )263     public void removeMessageListener( MessageListener<? super HostedConnection> listener )
264     {
265         messageListeners.removeMessageListener( listener );
266     }
267 
removeMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )268     public void removeMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
269     {
270         messageListeners.removeMessageListener( listener, classes );
271     }
272 
dispatch( HostedConnection source, Message m )273     protected void dispatch( HostedConnection source, Message m )
274     {
275         if( source == null ) {
276             messageListeners.messageReceived( source, m );
277         } else {
278 
279             // A semi-heavy handed way to make sure the listener
280             // doesn't get called at the same time from two different
281             // threads for the same hosted connection.
282             synchronized( source ) {
283                 messageListeners.messageReceived( source, m );
284             }
285         }
286     }
287 
fireConnectionAdded( HostedConnection conn )288     protected void fireConnectionAdded( HostedConnection conn )
289     {
290         for( ConnectionListener l : connectionListeners ) {
291             l.connectionAdded( this, conn );
292         }
293     }
294 
fireConnectionRemoved( HostedConnection conn )295     protected void fireConnectionRemoved( HostedConnection conn )
296     {
297         for( ConnectionListener l : connectionListeners ) {
298             l.connectionRemoved( this, conn );
299         }
300     }
301 
getChannel( KernelAdapter ka )302     protected int getChannel( KernelAdapter ka )
303     {
304         return channels.indexOf(ka);
305     }
306 
registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m )307     protected void registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m )
308     {
309         Connection addedConnection = null;
310 
311         // generally this will only be called by one thread but it's
312         // important enough I won't take chances
313         synchronized( this ) {
314             // Grab the random ID that the client created when creating
315             // its two registration messages
316             long tempId = m.getId();
317 
318             // See if we already have one
319             Connection c = connecting.remove(tempId);
320             if( c == null ) {
321                 c = new Connection(channels.size());
322                 log.log( Level.FINE, "Registering client for endpoint, pass 1:{0}.", p );
323             } else {
324                 log.log( Level.FINE, "Refining client registration for endpoint:{0}.", p );
325             }
326 
327             // Fill in what we now know
328             int channel = getChannel(ka);
329             c.setChannel(channel, p);
330             log.log( Level.FINE, "Setting up channel:{0}", channel );
331 
332             // If it's channel 0 then this is the initial connection
333             // and we will send the connection information
334             if( channel == CH_RELIABLE ) {
335                 // Validate the name and version which is only sent
336                 // over the reliable connection at this point.
337                 if( !getGameName().equals(m.getGameName())
338                     || getVersion() != m.getVersion() ) {
339 
340                     log.log( Level.INFO, "Kicking client due to name/version mismatch:{0}.", c );
341 
342                     // Need to kick them off... I may regret doing this from within
343                     // the sync block but the alternative is more code
344                     c.close( "Server client mismatch, server:" + getGameName() + " v" + getVersion()
345                              + "  client:" + m.getGameName() + " v" + m.getVersion() );
346                     return;
347                 }
348 
349                 // Else send the extra channel information to the client
350                 if( !alternatePorts.isEmpty() ) {
351                     ChannelInfoMessage cim = new ChannelInfoMessage( m.getId(), alternatePorts );
352                     c.send(cim);
353                 }
354             }
355 
356             if( c.isComplete() ) {
357                 // Then we are fully connected
358                 if( connections.put( c.getId(), c ) == null ) {
359 
360                     for( Endpoint cp : c.channels ) {
361                         if( cp == null )
362                             continue;
363                         endpointConnections.put( cp, c );
364                     }
365 
366                     addedConnection = c;
367                 }
368             } else {
369                 // Need to keep getting channels so we'll keep it in
370                 // the map
371                 connecting.put(tempId, c);
372             }
373         }
374 
375         // Best to do this outside of the synch block to avoid
376         // over synchronizing which is the path to deadlocks
377         if( addedConnection != null ) {
378             log.log( Level.INFO, "Client registered:{0}.", addedConnection );
379 
380             // Send the ID back to the client letting it know it's
381             // fully connected.
382             m = new ClientRegistrationMessage();
383             m.setId( addedConnection.getId() );
384             m.setReliable(true);
385             addedConnection.send(m);
386 
387             // Now we can notify the listeners about the
388             // new connection.
389             fireConnectionAdded( addedConnection );
390         }
391     }
392 
getConnection( Endpoint endpoint )393     protected HostedConnection getConnection( Endpoint endpoint )
394     {
395         return endpointConnections.get(endpoint);
396     }
397 
connectionClosed( Endpoint p )398     protected void connectionClosed( Endpoint p )
399     {
400         if( p.isConnected() ) {
401             log.log( Level.INFO, "Connection closed:{0}.", p );
402         } else {
403             log.log( Level.FINE, "Connection closed:{0}.", p );
404         }
405 
406         // Try to find the endpoint in all ways that it might
407         // exist.  Note: by this point the raw network channel is
408         // closed already.
409 
410         // Also note: this method will be called multiple times per
411         // HostedConnection if it has multiple endpoints.
412 
413         Connection removed = null;
414         synchronized( this ) {
415             // Just in case the endpoint was still connecting
416             connecting.values().remove(p);
417 
418             // And the regular management
419             removed = (Connection)endpointConnections.remove(p);
420             if( removed != null ) {
421                 connections.remove( removed.getId() );
422             }
423 
424             log.log( Level.FINE, "Connections size:{0}", connections.size() );
425             log.log( Level.FINE, "Endpoint mappings size:{0}", endpointConnections.size() );
426         }
427 
428         // Better not to fire events while we hold a lock
429         // so always do this outside the synch block.
430         // Note: checking removed.closed just to avoid spurious log messages
431         //       since in general we are called back for every endpoint closing.
432         if( removed != null && !removed.closed ) {
433 
434             log.log( Level.INFO, "Client closed:{0}.", removed );
435 
436             removed.closeConnection();
437         }
438     }
439 
440     protected class Connection implements HostedConnection
441     {
442         private int id;
443         private boolean closed;
444         private Endpoint[] channels;
445         private int setChannelCount = 0;
446 
447         private Map<String,Object> sessionData = new ConcurrentHashMap<String,Object>();
448 
Connection( int channelCount )449         public Connection( int channelCount )
450         {
451             id = nextId.getAndIncrement();
452             channels = new Endpoint[channelCount];
453         }
454 
setChannel( int channel, Endpoint p )455         void setChannel( int channel, Endpoint p )
456         {
457             if( channels[channel] != null && channels[channel] != p ) {
458                 throw new RuntimeException( "Channel has already been set:" + channel
459                                             + " = " + channels[channel] + ", cannot be set to:" + p );
460             }
461             channels[channel] = p;
462             if( p != null )
463                 setChannelCount++;
464         }
465 
isComplete()466         boolean isComplete()
467         {
468             return setChannelCount == channels.length;
469         }
470 
getServer()471         public Server getServer()
472         {
473             return DefaultServer.this;
474         }
475 
getId()476         public int getId()
477         {
478             return id;
479         }
480 
getAddress()481         public String getAddress()
482         {
483             return channels[CH_RELIABLE] == null ? null : channels[CH_RELIABLE].getAddress();
484         }
485 
send( Message message )486         public void send( Message message )
487         {
488             ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
489             if( message.isReliable() || channels[CH_UNRELIABLE] == null ) {
490                 channels[CH_RELIABLE].send( buffer );
491             } else {
492                 channels[CH_UNRELIABLE].send( buffer );
493             }
494         }
495 
send( int channel, Message message )496         public void send( int channel, Message message )
497         {
498             checkChannel(channel);
499             ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
500             channels[channel+CH_FIRST].send(buffer);
501         }
502 
closeConnection()503         protected void closeConnection()
504         {
505             if( closed )
506                 return;
507             closed = true;
508 
509             // Make sure all endpoints are closed.  Note: reliable
510             // should always already be closed through all paths that I
511             // can conceive... but it doesn't hurt to be sure.
512             for( Endpoint p : channels ) {
513                 if( p == null )
514                     continue;
515                 p.close();
516             }
517 
518             fireConnectionRemoved( this );
519         }
520 
close( String reason )521         public void close( String reason )
522         {
523             // Send a reason
524             DisconnectMessage m = new DisconnectMessage();
525             m.setType( DisconnectMessage.KICK );
526             m.setReason( reason );
527             m.setReliable( true );
528             send( m );
529 
530             // Just close the reliable endpoint
531             // fast will be cleaned up as a side-effect
532             // when closeConnection() is called by the
533             // connectionClosed() endpoint callback.
534             if( channels[CH_RELIABLE] != null ) {
535                 // Close with flush so we make sure our
536                 // message gets out
537                 channels[CH_RELIABLE].close(true);
538             }
539         }
540 
setAttribute( String name, Object value )541         public Object setAttribute( String name, Object value )
542         {
543             if( value == null )
544                 return sessionData.remove(name);
545             return sessionData.put(name, value);
546         }
547 
548         @SuppressWarnings("unchecked")
getAttribute( String name )549         public <T> T getAttribute( String name )
550         {
551             return (T)sessionData.get(name);
552         }
553 
attributeNames()554         public Set<String> attributeNames()
555         {
556             return Collections.unmodifiableSet(sessionData.keySet());
557         }
558 
toString()559         public String toString()
560         {
561             return "Connection[ id=" + id + ", reliable=" + channels[CH_RELIABLE]
562                                      + ", fast=" + channels[CH_UNRELIABLE] + " ]";
563         }
564     }
565 
566     protected class Redispatch implements MessageListener<HostedConnection>
567     {
messageReceived( HostedConnection source, Message m )568         public void messageReceived( HostedConnection source, Message m )
569         {
570             dispatch( source, m );
571         }
572     }
573 
574     protected class FilterAdapter implements Filter<Endpoint>
575     {
576         private Filter<? super HostedConnection> delegate;
577 
FilterAdapter( Filter<? super HostedConnection> delegate )578         public FilterAdapter( Filter<? super HostedConnection> delegate )
579         {
580             this.delegate = delegate;
581         }
582 
apply( Endpoint input )583         public boolean apply( Endpoint input )
584         {
585             HostedConnection conn = getConnection( input );
586             if( conn == null )
587                 return false;
588             return delegate.apply(conn);
589         }
590     }
591 }
592