1 /* 2 * Copyright (c) 2011 jMonkeyEngine 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are 7 * met: 8 * 9 * * Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 12 * * Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * * Neither the name of 'jMonkeyEngine' nor the names of its contributors 17 * may be used to endorse or promote products derived from this software 18 * without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 22 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 27 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 28 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 29 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 package com.jme3.network.kernel.udp; 34 35 import com.jme3.network.Filter; 36 import com.jme3.network.kernel.*; 37 import java.io.IOException; 38 import java.net.*; 39 import java.nio.ByteBuffer; 40 import java.util.Map; 41 import java.util.concurrent.ConcurrentHashMap; 42 import java.util.concurrent.ExecutorService; 43 import java.util.concurrent.Executors; 44 import java.util.concurrent.atomic.AtomicBoolean; 45 import java.util.logging.Level; 46 import java.util.logging.Logger; 47 48 /** 49 * A Kernel implementation using UDP packets. 50 * 51 * @version $Revision: 8944 $ 52 * @author Paul Speed 53 */ 54 public class UdpKernel extends AbstractKernel 55 { 56 static Logger log = Logger.getLogger(UdpKernel.class.getName()); 57 58 private InetSocketAddress address; 59 private HostThread thread; 60 61 private ExecutorService writer; 62 63 // The nature of UDP means that even through a firewall, 64 // a user would have to have a unique address+port since UDP 65 // can't really be NAT'ed. 66 private Map<SocketAddress,UdpEndpoint> socketEndpoints = new ConcurrentHashMap<SocketAddress,UdpEndpoint>(); 67 UdpKernel( InetAddress host, int port )68 public UdpKernel( InetAddress host, int port ) 69 { 70 this( new InetSocketAddress(host, port) ); 71 } 72 UdpKernel( int port )73 public UdpKernel( int port ) throws IOException 74 { 75 this( new InetSocketAddress(port) ); 76 } 77 UdpKernel( InetSocketAddress address )78 public UdpKernel( InetSocketAddress address ) 79 { 80 this.address = address; 81 } 82 createHostThread()83 protected HostThread createHostThread() 84 { 85 return new HostThread(); 86 } 87 initialize()88 public void initialize() 89 { 90 if( thread != null ) 91 throw new IllegalStateException( "Kernel already initialized." ); 92 93 writer = Executors.newFixedThreadPool(2, new NamedThreadFactory(toString() + "-writer")); 94 95 thread = createHostThread(); 96 97 try { 98 thread.connect(); 99 thread.start(); 100 } catch( IOException e ) { 101 throw new KernelException( "Error hosting:" + address, e ); 102 } 103 } 104 terminate()105 public void terminate() throws InterruptedException 106 { 107 if( thread == null ) 108 throw new IllegalStateException( "Kernel not initialized." ); 109 110 try { 111 thread.close(); 112 writer.shutdown(); 113 thread = null; 114 } catch( IOException e ) { 115 throw new KernelException( "Error closing host connection:" + address, e ); 116 } 117 } 118 119 /** 120 * Dispatches the data to all endpoints managed by the 121 * kernel. 'routing' is currently ignored. 122 */ broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable, boolean copy )123 public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable, 124 boolean copy ) 125 { 126 if( reliable ) 127 throw new UnsupportedOperationException( "Reliable send not supported by this kernel." ); 128 129 if( copy ) { 130 // Copy the data just once 131 byte[] temp = new byte[data.remaining()]; 132 System.arraycopy(data.array(), data.position(), temp, 0, data.remaining()); 133 data = ByteBuffer.wrap(temp); 134 } 135 136 // Hand it to all of the endpoints that match our routing 137 for( UdpEndpoint p : socketEndpoints.values() ) { 138 // Does it match the filter? 139 if( filter != null && !filter.apply(p) ) 140 continue; 141 142 // Send the data 143 p.send( data ); 144 } 145 } 146 getEndpoint( SocketAddress address, boolean create )147 protected Endpoint getEndpoint( SocketAddress address, boolean create ) 148 { 149 UdpEndpoint p = socketEndpoints.get(address); 150 if( p == null && create ) { 151 p = new UdpEndpoint( this, nextEndpointId(), address, thread.getSocket() ); 152 socketEndpoints.put( address, p ); 153 154 // Add an event for it. 155 addEvent( EndpointEvent.createAdd( this, p ) ); 156 } 157 return p; 158 } 159 160 /** 161 * Called by the endpoints when they need to be closed. 162 */ closeEndpoint( UdpEndpoint p )163 protected void closeEndpoint( UdpEndpoint p ) throws IOException 164 { 165 // Just book-keeping to do here. 166 if( socketEndpoints.remove( p.getRemoteAddress() ) == null ) 167 return; 168 169 log.log( Level.INFO, "Closing endpoint:{0}.", p ); 170 log.log( Level.FINE, "Socket endpoints size:{0}", socketEndpoints.size() ); 171 172 addEvent( EndpointEvent.createRemove( this, p ) ); 173 174 // If there are no pending messages then add one so that the 175 // kernel-user knows to wake up if it is only listening for 176 // envelopes. 177 if( !hasEnvelopes() ) { 178 // Note: this is not really a race condition. At worst, our 179 // event has already been handled by now and it does no harm 180 // to check again. 181 addEnvelope( EVENTS_PENDING ); 182 } 183 } 184 newData( DatagramPacket packet )185 protected void newData( DatagramPacket packet ) 186 { 187 // So the tricky part here is figuring out the endpoint and 188 // whether it's new or not. In these UDP schemes, firewalls have 189 // to be ported back to a specific machine so we will consider 190 // the address + port (ie: SocketAddress) the defacto unique 191 // ID. 192 Endpoint p = getEndpoint( packet.getSocketAddress(), true ); 193 194 // We'll copy the data to trim it. 195 byte[] data = new byte[packet.getLength()]; 196 System.arraycopy(packet.getData(), 0, data, 0, data.length); 197 198 Envelope env = new Envelope( p, data, false ); 199 addEnvelope( env ); 200 } 201 enqueueWrite( Endpoint endpoint, DatagramPacket packet )202 protected void enqueueWrite( Endpoint endpoint, DatagramPacket packet ) 203 { 204 writer.execute( new MessageWriter(endpoint, packet) ); 205 } 206 207 protected class MessageWriter implements Runnable 208 { 209 private Endpoint endpoint; 210 private DatagramPacket packet; 211 MessageWriter( Endpoint endpoint, DatagramPacket packet )212 public MessageWriter( Endpoint endpoint, DatagramPacket packet ) 213 { 214 this.endpoint = endpoint; 215 this.packet = packet; 216 } 217 run()218 public void run() 219 { 220 // Not guaranteed to always work but an extra datagram 221 // to a dead connection isn't so big of a deal. 222 if( !endpoint.isConnected() ) { 223 return; 224 } 225 226 try { 227 thread.getSocket().send(packet); 228 } catch( Exception e ) { 229 KernelException exc = new KernelException( "Error sending datagram to:" + address, e ); 230 exc.fillInStackTrace(); 231 reportError(exc); 232 } 233 } 234 } 235 236 protected class HostThread extends Thread 237 { 238 private DatagramSocket socket; 239 private AtomicBoolean go = new AtomicBoolean(true); 240 241 private byte[] buffer = new byte[65535]; // slightly bigger than needed. 242 HostThread()243 public HostThread() 244 { 245 setName( "UDP Host@" + address ); 246 setDaemon(true); 247 } 248 getSocket()249 protected DatagramSocket getSocket() 250 { 251 return socket; 252 } 253 connect()254 public void connect() throws IOException 255 { 256 socket = new DatagramSocket( address ); 257 log.log( Level.INFO, "Hosting UDP connection:{0}.", address ); 258 } 259 close()260 public void close() throws IOException, InterruptedException 261 { 262 // Set the thread to stop 263 go.set(false); 264 265 // Make sure the channel is closed 266 socket.close(); 267 268 // And wait for it 269 join(); 270 } 271 run()272 public void run() 273 { 274 log.log( Level.INFO, "Kernel started for connection:{0}.", address ); 275 276 // An atomic is safest and costs almost nothing 277 while( go.get() ) { 278 try { 279 // Could reuse the packet but I don't see the 280 // point and it may lead to subtle bugs if not properly 281 // reset. 282 DatagramPacket packet = new DatagramPacket( buffer, buffer.length ); 283 socket.receive(packet); 284 285 newData( packet ); 286 } catch( IOException e ) { 287 if( !go.get() ) 288 return; 289 reportError( e ); 290 } 291 } 292 } 293 } 294 } 295