/*
 * Copyright (c) 2011 jMonkeyEngine
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 *
 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
 *   may be used to endorse or promote products derived from this software
 *   without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package com.jme3.network.kernel.tcp;

import com.jme3.network.Filter;
import com.jme3.network.kernel.*;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;


/**
 *  A Kernel implementation based on NIO selectors.
 *
 *  @version   $Revision: 8944 $
 *  @author    Paul Speed
 */
public class SelectorKernel extends AbstractKernel
{
    static Logger log = Logger.getLogger(SelectorKernel.class.getName());

    private InetSocketAddress address;
    private SelectorThread thread;

    private Map<Long,NioEndpoint> endpoints = new ConcurrentHashMap<Long,NioEndpoint>();

    public SelectorKernel( InetAddress host, int port )
    {
        this( new InetSocketAddress(host, port) );
    }

    public SelectorKernel( int port ) throws IOException
    {
        this( new InetSocketAddress(port) );
    }

    public SelectorKernel( InetSocketAddress address )
    {
        this.address = address;
    }

    protected SelectorThread createSelectorThread()
    {
        return new SelectorThread();
    }

    public void initialize()
    {
        if( thread != null )
            throw new IllegalStateException( "Kernel already initialized." );

        thread = createSelectorThread();

        try {
            thread.connect();
            thread.start();
        } catch( IOException e ) {
            throw new KernelException( "Error hosting:" + address, e );
        }
    }

    public void terminate() throws InterruptedException
    {
        if( thread == null )
            throw new IllegalStateException( "Kernel not initialized." );

        try {
            thread.close();
            thread = null;
        } catch( IOException e ) {
            throw new KernelException( "Error closing host connection:" + address, e );
        }
    }
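
    // Illustrative lifecycle sketch (added for clarity; not part of the
    // original source).  The envelope-consuming side is assumed to live
    // elsewhere, e.g. in a server built on top of this kernel:
    //
    //     SelectorKernel kernel = new SelectorKernel( 5110 );  // port is arbitrary here
    //     kernel.initialize();   // binds the address, starts the selector thread
    //     ...                    // consume envelopes, broadcast(), etc.
    //     kernel.terminate();    // closes the server channel and joins the thread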

    public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
                           boolean copy )
    {
        if( !reliable )
            throw new UnsupportedOperationException( "Unreliable send not supported by this kernel." );

        if( copy ) {
            // Copy the data just once
            byte[] temp = new byte[data.remaining()];
            System.arraycopy(data.array(), data.position(), temp, 0, data.remaining());
            data = ByteBuffer.wrap(temp);
        }

        // Hand it to all of the endpoints that match our routing
        for( NioEndpoint p : endpoints.values() ) {
            // Does it match the filter?
            if( filter != null && !filter.apply(p) )
                continue;

            // Give it the data... but let each endpoint track its
            // own completion over the shared array of bytes by
            // duplicating it
            p.send( data.duplicate(), false, false );
        }

        // Wake up the selector so it can reinitialize its
        // state accordingly.
        wakeupSelector();
    }
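
    // Why duplicate() above (illustrative note, not from the original
    // source): duplicates share the same backing bytes but carry their
    // own position/limit, so each endpoint can drain its view without
    // disturbing the others.
    //
    //     ByteBuffer shared = ByteBuffer.wrap( new byte[]{ 1, 2, 3 } );
    //     ByteBuffer view = shared.duplicate();
    //     view.get();                       // advances view's position only
    //     assert shared.position() == 0;    // the original view is untouched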

    protected NioEndpoint addEndpoint( SocketChannel c )
    {
        // Note: we purposely do NOT put the key in the endpoint.
        //       SelectionKeys are dangerous outside the selector thread
        //       and this is safer.
        NioEndpoint p = new NioEndpoint( this, nextEndpointId(), c );

        endpoints.put( p.getId(), p );

        // Enqueue an endpoint event for the listeners
        addEvent( EndpointEvent.createAdd( this, p ) );

        return p;
    }

    protected void removeEndpoint( NioEndpoint p, SocketChannel c )
    {
        endpoints.remove( p.getId() );
        log.log( Level.FINE, "Endpoints size:{0}", endpoints.size() );

        // Enqueue an endpoint event for the listeners
        addEvent( EndpointEvent.createRemove( this, p ) );

        // If there are no pending messages then add one so that the
        // kernel-user knows to wake up if it is only listening for
        // envelopes.
        if( !hasEnvelopes() ) {
            // Note: this is not really a race condition.  At worst, our
            // event has already been handled by now and it does no harm
            // to check again.
            addEnvelope( EVENTS_PENDING );
        }
    }
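
    // Sketch of the consumer this wake-up serves (hypothetical usage; the
    // blocking read() lives on the Kernel interface, outside this file):
    //
    //     Envelope env = kernel.read();     // blocks until data or events arrive
    //     if( env == EVENTS_PENDING ) {
    //         // no message data... drain the endpoint add/remove events instead
    //     }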

    /**
     *  Called by the endpoints when they need to be closed.
     */
    protected void closeEndpoint( NioEndpoint p ) throws IOException
    {
        //log.log( Level.INFO, "Closing endpoint:{0}.", p );

        thread.cancel(p);
    }

    /**
     *  Used internally by the endpoints to wake up the selector
     *  when they have data to send.
     */
    protected void wakeupSelector()
    {
        thread.wakeupSelector();
    }

    protected void newData( NioEndpoint p, SocketChannel c, ByteBuffer shared, int size )
    {
        // Note: if ever desirable, it would be possible to accumulate
        //       data per source channel and only 'finalize' it when
        //       asked for more envelopes than were ready.  I just don't
        //       think it will be an issue in practice.  The busier the
        //       server, the more the buffers will fill before we get to them.
        //       And if the server isn't busy, who cares if we chop things up
        //       smaller... the network is still likely to deliver things in
        //       bulk anyway.

        // Must copy the shared data before we use it... the shared
        // working buffer is cleared and refilled on the next read.
        byte[] dataCopy = new byte[size];
        System.arraycopy(shared.array(), 0, dataCopy, 0, size);

        Envelope env = new Envelope( p, dataCopy, true );
        addEnvelope( env );
    }

    /**
     *  This class is purposely tucked neatly away because
     *  messing with the selector from other threads for any
     *  reason is very bad.  This is the safest architecture.
     */
    protected class SelectorThread extends Thread
    {
        private ServerSocketChannel serverChannel;
        private Selector selector;
        private AtomicBoolean go = new AtomicBoolean(true);
        private ByteBuffer working = ByteBuffer.allocate( 8192 );

        /**
         *  Because we want to keep the keys to ourselves, we'll do
         *  the endpoint -> key mapping internally.
         */
        private Map<NioEndpoint,SelectionKey> endpointKeys = new ConcurrentHashMap<NioEndpoint,SelectionKey>();

        public SelectorThread()
        {
            setName( "Selector@" + address );
            setDaemon(true);
        }

        public void connect() throws IOException
        {
            // Create a new selector
            this.selector = SelectorProvider.provider().openSelector();

            // Create a new non-blocking server socket channel
            this.serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);

            // Bind the server socket to the specified address and port
            serverChannel.socket().bind(address);

            // Register the server socket channel, indicating an interest in
            // accepting new connections
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            log.log( Level.INFO, "Hosting TCP connection:{0}.", address );
        }

        public void close() throws IOException, InterruptedException
        {
            // Set the thread to stop
            go.set(false);

            // Make sure the channel is closed
            serverChannel.close();

            // Force the selector to stop blocking
            wakeupSelector();

            // And wait for it
            join();
        }

        protected void wakeupSelector()
        {
            selector.wakeup();
        }

        protected void setupSelectorOptions()
        {
            // For now, selection keys will either be in OP_READ
            // or OP_WRITE.  So while we are writing a buffer, we
            // will not be reading.  This is way simpler and less
            // error prone... it can always be changed when everything
            // else works if we are looking to micro-optimize.

            // Setup options based on the current state of
            // the endpoints.  This could potentially be more
            // efficiently done as change requests... or simply
            // keeping a thread-safe set of endpoints with pending
            // writes.  For most cases, it shouldn't matter.
            for( Map.Entry<NioEndpoint,SelectionKey> e : endpointKeys.entrySet() ) {
                if( e.getKey().hasPending() ) {
                    e.getValue().interestOps(SelectionKey.OP_WRITE);
                }
            }
        }

        protected void accept( SelectionKey key ) throws IOException
        {
            // Would only get accepts on a server channel
            ServerSocketChannel serverChan = (ServerSocketChannel)key.channel();

            // Setup the connection to be non-blocking
            SocketChannel remoteChan = serverChan.accept();
            remoteChan.configureBlocking(false);

            // And disable Nagle's buffering algorithm... we want
            // data to go when we put it there.
            Socket sock = remoteChan.socket();
            sock.setTcpNoDelay(true);

            // Let the selector know we're interested in reading
            // data from the channel
            SelectionKey endKey = remoteChan.register( selector, SelectionKey.OP_READ );

            // And now create a new endpoint
            NioEndpoint p = addEndpoint( remoteChan );
            endKey.attach(p);
            endpointKeys.put(p, endKey);
        }

        protected void cancel( NioEndpoint p ) throws IOException
        {
            SelectionKey key = endpointKeys.remove(p);
            if( key == null ) {
                //log.log( Level.INFO, "Endpoint already closed:{0}.", p );
                return;  // already closed it
            }
            log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() );

            log.log( Level.INFO, "Closing endpoint:{0}.", p );
            SocketChannel c = (SocketChannel)key.channel();

            // Note: key.cancel() is specifically thread safe.  One of
            //       the few things one can do with a key from another
            //       thread.
            key.cancel();
            c.close();
            removeEndpoint( p, c );
        }

        protected void cancel( SelectionKey key, SocketChannel c ) throws IOException
        {
            NioEndpoint p = (NioEndpoint)key.attachment();
            log.log( Level.INFO, "Closing channel endpoint:{0}.", p );
            endpointKeys.remove(p);

            log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() );

            key.cancel();
            c.close();
            removeEndpoint( p, c );
        }

        protected void read( SelectionKey key ) throws IOException
        {
            NioEndpoint p = (NioEndpoint)key.attachment();
            SocketChannel c = (SocketChannel)key.channel();
            working.clear();

            int size;
            try {
                size = c.read(working);
            } catch( IOException e ) {
                // The remote end forcibly closed the connection...
                // close out our end and cancel the key
                cancel( key, c );
                return;
            }

            if( size == -1 ) {
                // The remote end shut down cleanly...
                // close out our end and cancel the key
                cancel( key, c );
                return;
            }

            newData( p, c, working, size );
        }

        protected void write( SelectionKey key ) throws IOException
        {
            NioEndpoint p = (NioEndpoint)key.attachment();
            SocketChannel c = (SocketChannel)key.channel();

            // We will send what we can and move on.
            ByteBuffer current = p.peekPending();
            if( current == NioEndpoint.CLOSE_MARKER ) {
                // This connection wants to be closed now
                closeEndpoint(p);

                // Nothing more to do
                return;
            }

            c.write( current );

            // If we wrote all of that packet then we need to remove it
            if( current.remaining() == 0 ) {
                p.removePending();
            }

            // If we happened to empty the pending queue then let's read
            // again.
            if( !p.hasPending() ) {
                key.interestOps( SelectionKey.OP_READ );
            }
        }

        protected void select() throws IOException
        {
            selector.select();

            for( Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext(); ) {
                SelectionKey key = i.next();
                i.remove();

                if( !key.isValid() ) {
                    // When does this happen?
                    log.log( Level.INFO, "Key is not valid:{0}.", key );
                    continue;
                }

                try {
                    if( key.isAcceptable() )
                        accept(key);
                    else if( key.isWritable() )
                        write(key);
                    else if( key.isReadable() )
                        read(key);
                } catch( IOException e ) {
                    if( !go.get() )
                        return;  // error likely due to shutting down
                    reportError( e );

                    // And at this level, errors likely mean the key is now
                    // dead and it doesn't hurt to kick them anyway.  If we
                    // find IOExceptions that are not fatal, this can be
                    // readdressed.
                    cancel( key, (SocketChannel)key.channel() );
                }
            }
        }

        public void run()
        {
            log.log( Level.INFO, "Kernel started for connection:{0}.", address );

            // An atomic is safest and costs almost nothing
            while( go.get() ) {
                // Setup any queued option changes
                setupSelectorOptions();

                // Check for available keys and process them
                try {
                    select();
                } catch( ClosedSelectorException e ) {
                    if( !go.get() )
                        return;  // it's because we're shutting down
                    throw new KernelException( "Premature selector closing", e );
                } catch( IOException e ) {
                    if( !go.get() )
                        return;  // error likely due to shutting down
                    reportError( e );
                }
            }
        }
    }
}