• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2011 jMonkeyEngine
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 *
 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
 *   may be used to endorse or promote products derived from this software
 *   without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
32 
33 package com.jme3.network.kernel.udp;
34 
35 import com.jme3.network.Filter;
36 import com.jme3.network.kernel.*;
37 import java.io.IOException;
38 import java.net.*;
39 import java.nio.ByteBuffer;
40 import java.util.Map;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.logging.Level;
46 import java.util.logging.Logger;
47 
48 /**
49  *  A Kernel implementation using UDP packets.
50  *
51  *  @version   $Revision: 8944 $
52  *  @author    Paul Speed
53  */
54 public class UdpKernel extends AbstractKernel
55 {
56     static Logger log = Logger.getLogger(UdpKernel.class.getName());
57 
58     private InetSocketAddress address;
59     private HostThread thread;
60 
61     private ExecutorService writer;
62 
63     // The nature of UDP means that even through a firewall,
64     // a user would have to have a unique address+port since UDP
65     // can't really be NAT'ed.
66     private Map<SocketAddress,UdpEndpoint> socketEndpoints = new ConcurrentHashMap<SocketAddress,UdpEndpoint>();
67 
UdpKernel( InetAddress host, int port )68     public UdpKernel( InetAddress host, int port )
69     {
70         this( new InetSocketAddress(host, port) );
71     }
72 
UdpKernel( int port )73     public UdpKernel( int port ) throws IOException
74     {
75         this( new InetSocketAddress(port) );
76     }
77 
UdpKernel( InetSocketAddress address )78     public UdpKernel( InetSocketAddress address )
79     {
80         this.address = address;
81     }
82 
createHostThread()83     protected HostThread createHostThread()
84     {
85         return new HostThread();
86     }
87 
initialize()88     public void initialize()
89     {
90         if( thread != null )
91             throw new IllegalStateException( "Kernel already initialized." );
92 
93         writer = Executors.newFixedThreadPool(2, new NamedThreadFactory(toString() + "-writer"));
94 
95         thread = createHostThread();
96 
97         try {
98             thread.connect();
99             thread.start();
100         } catch( IOException e ) {
101             throw new KernelException( "Error hosting:" + address, e );
102         }
103     }
104 
terminate()105     public void terminate() throws InterruptedException
106     {
107         if( thread == null )
108             throw new IllegalStateException( "Kernel not initialized." );
109 
110         try {
111             thread.close();
112             writer.shutdown();
113             thread = null;
114         } catch( IOException e ) {
115             throw new KernelException( "Error closing host connection:" + address, e );
116         }
117     }
118 
119     /**
120      *  Dispatches the data to all endpoints managed by the
121      *  kernel.  'routing' is currently ignored.
122      */
broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable, boolean copy )123     public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
124                            boolean copy )
125     {
126         if( reliable )
127             throw new UnsupportedOperationException( "Reliable send not supported by this kernel." );
128 
129         if( copy ) {
130             // Copy the data just once
131             byte[] temp = new byte[data.remaining()];
132             System.arraycopy(data.array(), data.position(), temp, 0, data.remaining());
133             data = ByteBuffer.wrap(temp);
134         }
135 
136         // Hand it to all of the endpoints that match our routing
137         for( UdpEndpoint p : socketEndpoints.values() ) {
138             // Does it match the filter?
139             if( filter != null && !filter.apply(p) )
140                 continue;
141 
142             // Send the data
143             p.send( data );
144         }
145     }
146 
getEndpoint( SocketAddress address, boolean create )147     protected Endpoint getEndpoint( SocketAddress address, boolean create )
148     {
149         UdpEndpoint p = socketEndpoints.get(address);
150         if( p == null && create ) {
151             p = new UdpEndpoint( this, nextEndpointId(), address, thread.getSocket() );
152             socketEndpoints.put( address, p );
153 
154             // Add an event for it.
155             addEvent( EndpointEvent.createAdd( this, p ) );
156         }
157         return p;
158     }
159 
160     /**
161      *  Called by the endpoints when they need to be closed.
162      */
closeEndpoint( UdpEndpoint p )163     protected void closeEndpoint( UdpEndpoint p ) throws IOException
164     {
165         // Just book-keeping to do here.
166         if( socketEndpoints.remove( p.getRemoteAddress() ) == null )
167             return;
168 
169         log.log( Level.INFO, "Closing endpoint:{0}.", p );
170         log.log( Level.FINE, "Socket endpoints size:{0}", socketEndpoints.size() );
171 
172         addEvent( EndpointEvent.createRemove( this, p ) );
173 
174         // If there are no pending messages then add one so that the
175         // kernel-user knows to wake up if it is only listening for
176         // envelopes.
177         if( !hasEnvelopes() ) {
178             // Note: this is not really a race condition.  At worst, our
179             // event has already been handled by now and it does no harm
180             // to check again.
181             addEnvelope( EVENTS_PENDING );
182         }
183     }
184 
newData( DatagramPacket packet )185     protected void newData( DatagramPacket packet )
186     {
187         // So the tricky part here is figuring out the endpoint and
188         // whether it's new or not.  In these UDP schemes, firewalls have
189         // to be ported back to a specific machine so we will consider
190         // the address + port (ie: SocketAddress) the defacto unique
191         // ID.
192         Endpoint p = getEndpoint( packet.getSocketAddress(), true );
193 
194         // We'll copy the data to trim it.
195         byte[] data = new byte[packet.getLength()];
196         System.arraycopy(packet.getData(), 0, data, 0, data.length);
197 
198         Envelope env = new Envelope( p, data, false );
199         addEnvelope( env );
200     }
201 
enqueueWrite( Endpoint endpoint, DatagramPacket packet )202     protected void enqueueWrite( Endpoint endpoint, DatagramPacket packet )
203     {
204         writer.execute( new MessageWriter(endpoint, packet) );
205     }
206 
207     protected class MessageWriter implements Runnable
208     {
209         private Endpoint endpoint;
210         private DatagramPacket packet;
211 
MessageWriter( Endpoint endpoint, DatagramPacket packet )212         public MessageWriter( Endpoint endpoint, DatagramPacket packet )
213         {
214             this.endpoint = endpoint;
215             this.packet = packet;
216         }
217 
run()218         public void run()
219         {
220             // Not guaranteed to always work but an extra datagram
221             // to a dead connection isn't so big of a deal.
222             if( !endpoint.isConnected() ) {
223                 return;
224             }
225 
226             try {
227                 thread.getSocket().send(packet);
228             } catch( Exception e ) {
229                 KernelException exc = new KernelException( "Error sending datagram to:" + address, e );
230                 exc.fillInStackTrace();
231                 reportError(exc);
232             }
233         }
234     }
235 
236     protected class HostThread extends Thread
237     {
238         private DatagramSocket socket;
239         private AtomicBoolean go = new AtomicBoolean(true);
240 
241         private byte[] buffer = new byte[65535]; // slightly bigger than needed.
242 
HostThread()243         public HostThread()
244         {
245             setName( "UDP Host@" + address );
246             setDaemon(true);
247         }
248 
getSocket()249         protected DatagramSocket getSocket()
250         {
251             return socket;
252         }
253 
connect()254         public void connect() throws IOException
255         {
256             socket = new DatagramSocket( address );
257             log.log( Level.INFO, "Hosting UDP connection:{0}.", address );
258         }
259 
close()260         public void close() throws IOException, InterruptedException
261         {
262             // Set the thread to stop
263             go.set(false);
264 
265             // Make sure the channel is closed
266             socket.close();
267 
268             // And wait for it
269             join();
270         }
271 
run()272         public void run()
273         {
274             log.log( Level.INFO, "Kernel started for connection:{0}.", address );
275 
276             // An atomic is safest and costs almost nothing
277             while( go.get() ) {
278                 try {
279                     // Could reuse the packet but I don't see the
280                     // point and it may lead to subtle bugs if not properly
281                     // reset.
282                     DatagramPacket packet = new DatagramPacket( buffer, buffer.length );
283                     socket.receive(packet);
284 
285                     newData( packet );
286                 } catch( IOException e ) {
287                     if( !go.get() )
288                         return;
289                     reportError( e );
290                 }
291             }
292         }
293     }
294 }
295