/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

package org.tensorflow;

import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * An environment for executing TensorFlow operations eagerly.
 *
 * <p>Eager execution is an imperative programming environment that evaluates operations
 * immediately, without building graphs. Operations return concrete values instead of constructing a
 * computational graph to run later, as with {@link Graph}s and {@link Session}s.
 *
 * <p>This makes it easy to develop with TensorFlow and debug models, as it behaves more like a
 * standard programming library.
 *
 * <p>Instances of a {@code EagerSession} are thread-safe.
 */
public final class EagerSession implements ExecutionEnvironment, AutoCloseable {

  /**
   * Controls how to act when we try to run an operation on a given device but some input tensors
   * are not on that device.
   */
  public enum DevicePlacementPolicy {

    /** Running operations with input tensors on the wrong device will fail. */
    EXPLICIT(0),

    /** Copy the tensor to the right device but log a warning. */
    WARN(1),

    /**
     * Silently copy the tensor, which has a performance cost since the operation will be blocked
     * till the copy completes. This is the default placement policy.
     */
    SILENT(2),

    /** Placement policy which silently copies int32 tensors but not other dtypes. */
    SILENT_FOR_INT32(3);

    DevicePlacementPolicy(int code) {
      this.code = code;
    }

    /** Integer value handed to the native layer when the session is allocated. */
    private final int code;
  }

  /**
   * Controls how TensorFlow resources are cleaned up when they are no longer needed.
   *
   * <p>All resources allocated during an {@code EagerSession} are deleted when the session is
   * closed. To prevent out-of-memory errors, it is also strongly suggested to clean up those
   * resources during the session. For example, executing n operations in a loop of m iterations
   * will allocate a minimum of n*m resources while in most cases, only resources of the last
   * iteration are still being used.
   *
   * <p>{@code EagerSession} instances can be notified in different ways when TensorFlow objects are
   * no longer being referred, so they can proceed to the cleanup of any resources they owned.
   */
  public enum ResourceCleanupStrategy {

    /**
     * Monitor and delete unused resources from a new thread running in background.
     *
     * <p>This is the most reliable approach to cleanup TensorFlow resources, at the cost of
     * starting and running an additional thread dedicated to this task. Each {@code EagerSession}
     * instance has its own thread, which is stopped only when the session is closed.
     *
     * <p>This strategy is used by default.
     */
    IN_BACKGROUND,

    /**
     * Monitor and delete unused resources from existing threads, before or after they complete
     * another task.
     *
     * <p>Unused resources are released when a call to the TensorFlow library reaches a safe point
     * for cleanup. This is done synchronously and might briefly block the thread that triggered
     * that call.
     *
     * <p>This strategy should be used only if, for some reasons, no additional thread should be
     * allocated for cleanup. Otherwise, {@link #IN_BACKGROUND} should be preferred.
     */
    ON_SAFE_POINTS,

    /**
     * Only delete resources when the session is closed.
     *
     * <p>All resources allocated during the session will remain in memory until the session is
     * explicitly closed (or via the traditional {@code try-with-resources} technique). No extra
     * task for resource cleanup will be attempted.
     *
     * <p>This strategy can lead up to out-of-memory errors and its usage is not recommended, unless
     * the scope of the session is limited to execute only a small amount of operations.
     */
    ON_SESSION_CLOSE
  }

  /**
   * Collects the options used to build an {@code EagerSession}.
   *
   * <p>Instances are obtained from {@link EagerSession#options()}; each setter returns this object
   * so that calls can be chained before invoking {@link #build()}.
   */
  public static class Options {

    /**
     * Controls how operations dispatched are actually executed.
     *
     * <p>When set to true, each operation is executed asynchronously (in which case some
     * operations might return "non-ready" outputs). When set to false, all operations are executed
     * synchronously.
     *
     * <p>Synchronous execution is used by default.
     *
     * @param value true for asynchronous execution, false for synchronous.
     * @return this object, for chaining
     */
    public Options async(boolean value) {
      async = value;
      return this;
    }

    /**
     * Controls how to act when we try to run an operation on a given device but some input tensors
     * are not on that device.
     *
     * <p>{@link DevicePlacementPolicy#SILENT} is used by default.
     *
     * @param value policy to apply
     * @return this object, for chaining
     * @see DevicePlacementPolicy
     */
    public Options devicePlacementPolicy(DevicePlacementPolicy value) {
      devicePlacementPolicy = value;
      return this;
    }

    /**
     * Controls how TensorFlow resources are cleaned up when no longer needed.
     *
     * <p>{@link ResourceCleanupStrategy#IN_BACKGROUND} is used by default.
     *
     * @param value strategy to use
     * @return this object, for chaining
     * @see ResourceCleanupStrategy
     */
    public Options resourceCleanupStrategy(ResourceCleanupStrategy value) {
      resourceCleanupStrategy = value;
      return this;
    }

    /**
     * Configures the session based on the data found in the provided buffer, which is serialized
     * TensorFlow config proto.
     *
     * <p>Warning: the support of this feature is subject to changes since TensorFlow protos might
     * not be supported on public endpoints in the future.
     *
     * <p>See also: <a
     * href="https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/protobuf/config.proto">config.proto</a>
     *
     * @param value a serialized config proto
     * @return this object, for chaining
     */
    public Options config(byte[] value) {
      config = value;
      return this;
    }

    /** Builds an eager session with the selected options. */
    public EagerSession build() {
      return new EagerSession(this, new ReferenceQueue<Object>());
    }

    // For garbage-collection tests only
    EagerSession buildForGcTest(ReferenceQueue<Object> gcQueue) {
      return new EagerSession(this, gcQueue);
    }

    private boolean async;
    private DevicePlacementPolicy devicePlacementPolicy;
    private ResourceCleanupStrategy resourceCleanupStrategy;
    private byte[] config;

    private Options() {
      async = false;
      devicePlacementPolicy = DevicePlacementPolicy.SILENT;
      resourceCleanupStrategy = ResourceCleanupStrategy.IN_BACKGROUND;
      config = null;
    }
  }

  /**
   * Initializes the default eager session, which remains active for the lifetime of the
   * application.
   *
   * <p>This method is implicitly invoked on the first call to {@link #getDefault()}, but can also
   * be invoked explicitly to override default options.
   *
   * <p>Note that calling this method more than once will throw an {@code IllegalStateException}
   * as the default session cannot be modified once it has been created. Therefore, it is important
   * to explicitly initialize it before {@link #getDefault()} is invoked for the first time from any
   * thread.
   *
   * <p>Example usage:
   *
   * <pre>{@code
   * // Initializing default session to override default options is valid but
   * // is optional
   * EagerSession.initDefault(EagerSession.options().async(true));
   *
   * // Starting to build eager operations using default session, by calling
   * // EagerSession.getDefault() implicitly
   * Ops tf = Ops.create();
   *
   * // Initializing default session more than once or after using it is not
   * // permitted and throws an exception
   * EagerSession.initDefault(EagerSession.options().async(true));  // throws
   * }</pre>
   *
   * @param options options to use to build default session
   * @return default eager session
   * @throws IllegalStateException if the default session is already initialized
   * @see #getDefault()
   */
  public static EagerSession initDefault(Options options) {
    synchronized (EagerSession.class) {
      if (defaultSession != null) {
        throw new IllegalStateException("Default eager session is already initialized");
      }
      defaultSession = options.build();
    }
    return defaultSession;
  }

  /**
   * Returns the default eager session.
   *
   * <p>Once initialized, the default eager session remains active for the whole life of the
   * application, as opposed to sessions obtained from {@link #create()} or {@link Options#build()}
   * which should be closed after their usage.
   *
   * <p>The default set of {@link Options} is used to initialize the session on the first call. To
   * override this behavior, it is possible to invoke {@link #initDefault(Options)} with a different
   * set of options prior to this first call.
   *
   * <p>Example usage:
   *
   * <pre>{@code
   * // Starting to build eager operations using default session, by calling
   * // EagerSession.getDefault() implicitly
   * Ops tf = Ops.create();
   *
   * // Starting to build eager operations using default session, by calling
   * // EagerSession.getDefault() explicitly
   * Ops tf = Ops.create(EagerSession.getDefault());
   * }</pre>
   *
   * @return default eager session
   * @see #initDefault
   */
  public static EagerSession getDefault() {
    if (defaultSession == null) {
      synchronized (EagerSession.class) {
        // Re-check under the class lock: another thread may have initialized the session since
        // the unsynchronized test above.
        if (defaultSession == null) {
          defaultSession = options().build();
        }
      }
    }
    return defaultSession;
  }

  /**
   * Returns an {@code EagerSession} configured with default options.
   *
   * <p><b>WARNING:</b> Instances of {@code EagerSession} returned by this method must be
   * explicitly freed by invoking {@link #close()} when they are no longer needed. This could be
   * achieved using the {@code try-with-resources} technique.
   *
   * <p>Example usage:
   *
   * <pre>{@code
   * try (EagerSession session = EagerSession.create()) {
   *   Ops tf = Ops.create(session);
   *   // build execute operations eagerly...
   * }
   * }</pre>
   */
  public static EagerSession create() {
    return options().build();
  }

  /**
   * Returns an object that configures and builds a {@code EagerSession} with custom options.
   *
   * <p><b>WARNING:</b> Instances of {@code EagerSession} returned by this method must be
   * explicitly freed by invoking {@link #close()} when they are no longer needed. This could be
   * achieved using the {@code try-with-resources} technique.
   *
   * <p>Example usage:
   *
   * <pre>{@code
   * try (EagerSession session = EagerSession.options().async(true).build()) {
   *   Ops tf = Ops.create(session);
   *   // build execute operations eagerly and asynchronously...
   * }
   * }</pre>
   */
  public static EagerSession.Options options() {
    return new Options();
  }

  /**
   * Releases every native resource still attached to this session, then the session itself.
   *
   * <p>Calling this method on a session that is already closed has no effect.
   *
   * @throws IllegalStateException if this session is the default eager session
   */
  @Override
  public synchronized void close() {
    if (this == defaultSession) {
      throw new IllegalStateException("Default eager session cannot be closed");
    }
    if (nativeHandle != 0L) {
      if (resourceCleanupStrategy == ResourceCleanupStrategy.IN_BACKGROUND) {
        nativeResources.stopCleanupThread();
      }
      nativeResources.deleteAll();
      delete(nativeHandle);
      nativeHandle = 0L; // marks the session as closed, see checkSession()
    }
  }

  /**
   * Returns a builder for an operation executed eagerly in this session.
   *
   * <p>When the {@link ResourceCleanupStrategy#ON_SAFE_POINTS} strategy is in use, unused native
   * resources are released first.
   *
   * @throws IllegalStateException if this session has been closed
   */
  @Override
  public OperationBuilder opBuilder(String type, String name) {
    if (resourceCleanupStrategy == ResourceCleanupStrategy.ON_SAFE_POINTS) {
      nativeResources.tryCleanup();
    }
    checkSession();
    return new EagerOperationBuilder(this, type, name);
  }

  /**
   * Returns the handle of the native session.
   *
   * @throws IllegalStateException if this session has been closed
   */
  long nativeHandle() {
    checkSession();
    return nativeHandle;
  }

  /** Returns the strategy this session was built with for cleaning up native resources. */
  ResourceCleanupStrategy resourceCleanupStrategy() {
    return resourceCleanupStrategy;
  }

  /**
   * A reference to one or more allocated native resources.
   *
   * <p>Any Java objects owning native resources must declare a reference to those resources in a
   * subclass that extends from {@code NativeReference}. When {@link NativeReference#delete()} is
   * invoked, the resources must be freed. For example:
   *
   * <pre>{@code
   * private static class NativeReference extends EagerSession.NativeReference {
   *
   *    NativeReference(EagerSession session, MyClass referent, long handle) {
   *        super(session, referent);
   *        this.handle = handle;
   *    }
   *
   *    @Override
   *    void delete() {
   *        MyClass.nativeDelete(handle);
   *    }
   *
   *    private final long handle;
   * }
   * }</pre>
   *
   * A Java object "owns" a native resource if this resource should not survive beyond the lifetime
   * of this object.
   *
   * <p><b>IMPORTANT</b>: All nested subclasses of {@code NativeReference} must be declared as
   * static, otherwise their instances will hold an implicit reference to their enclosing object,
   * preventing the garbage collector to release them when they are no longer needed.
   */
  abstract static class NativeReference extends PhantomReference<Object> {

    /**
     * Attach a new phantom reference of {@code referent} to {@code session}.
     *
     * @throws IllegalStateException if {@code session} has been closed
     */
    public NativeReference(EagerSession session, Object referent) {
      super(referent, session.nativeResources.garbageQueue);
      session.checkSession();
      nativeResources = session.nativeResources;
      nativeResources.attach(this);
    }

    /**
     * Detach this reference from its current session.
     *
     * <p>Clearing a NativeReference does not invoke {@link #delete()}, thus won't release the
     * native resources it refers to. It can be used when passing the ownership of those resources
     * to another object.
     *
     * <p>If native resources needs to be deleted as well, call {@link #delete()} explicitly.
     */
    @Override
    public void clear() {
      nativeResources.detach(this);
      super.clear();
    }

    /** Releases all native resources owned by the referred object, now deleted. */
    abstract void delete();

    private final NativeResourceCollector nativeResources;
  }

  /**
   * Collects native references attached to this session and releases their resources if they are no
   * longer needed.
   */
  private static class NativeResourceCollector {

    NativeResourceCollector(ReferenceQueue<Object> garbageQueue) {
      this.garbageQueue = garbageQueue;
    }

    /** Starts tracking {@code nativeRef} so its resources are released with the session. */
    void attach(NativeReference nativeRef) {
      synchronized (nativeRefs) {
        nativeRefs.put(nativeRef, null);
      }
    }

    /** Stops tracking {@code nativeRef} without releasing its resources. */
    void detach(NativeReference nativeRef) {
      synchronized (nativeRefs) {
        nativeRefs.remove(nativeRef);
      }
    }

    /** Releases the resources of {@code nativeRef}, unless it is not (or no longer) tracked. */
    void delete(NativeReference nativeRef) {
      synchronized (nativeRefs) {
        if (!nativeRefs.keySet().remove(nativeRef)) {
          return; // safety check
        }
      }
      nativeRef.delete();
    }

    /** Releases the resources of every tracked reference and forgets them all. */
    void deleteAll() {
      synchronized (nativeRefs) {
        for (NativeReference nativeRef : nativeRefs.keySet()) {
          nativeRef.delete();
        }
        nativeRefs.clear();
      }
    }

    /** Releases, without blocking, the references currently enqueued in the garbage queue. */
    void tryCleanup() {
      Reference<?> nativeRef;
      synchronized (nativeRefs) {
        while ((nativeRef = garbageQueue.poll()) != null) {
          delete((NativeReference) nativeRef);
        }
      }
    }

    /**
     * Submits the task that waits on the garbage queue and releases each reference it receives.
     *
     * <p>Does nothing if the task has already been started.
     */
    synchronized void startCleanupThread() {
      if (cleanupInBackground) {
        return; // ignore if cleanup thread is already running
      }
      try {
        cleanupInBackground = true;
        cleanupService.execute(
            new Runnable() {
              @Override
              public void run() {
                try {
                  while (cleanupInBackground) {
                    NativeReference nativeRef = (NativeReference) garbageQueue.remove();
                    delete(nativeRef);
                  }
                } catch (InterruptedException e) {
                  // Interrupted by stopCleanupThread(): keep the interrupt status set and exit.
                  Thread.currentThread().interrupt();
                }
              }
            });
      } catch (Exception e) {
        cleanupInBackground = false;
        throw e;
      }
    }

    /** Asks the cleanup task to stop and interrupts it if it is waiting on the garbage queue. */
    void stopCleanupThread() {
      cleanupInBackground = false;
      cleanupService.shutdownNow(); // returns without waiting for the thread to stop
    }

    /**
     * Creates the cleanup thread as a daemon.
     *
     * <p>The cleanup task blocks on the garbage queue until it is interrupted by {@link
     * #stopCleanupThread()}, which only happens when the session is closed. A session that is never
     * closed (the default session cannot be) would otherwise keep a non-daemon thread alive and
     * prevent the JVM from exiting.
     */
    private static final ThreadFactory DAEMON_THREAD_FACTORY =
        new ThreadFactory() {
          @Override
          public Thread newThread(Runnable task) {
            Thread thread = Executors.defaultThreadFactory().newThread(task);
            thread.setDaemon(true);
            return thread;
          }
        };

    private final ExecutorService cleanupService =
        Executors.newSingleThreadExecutor(DAEMON_THREAD_FACTORY);
    private final Map<NativeReference, Void> nativeRefs = new IdentityHashMap<>();
    private final ReferenceQueue<Object> garbageQueue;
    private volatile boolean cleanupInBackground = false;
  }

  private static volatile EagerSession defaultSession = null;

  private final NativeResourceCollector nativeResources;
  private final ResourceCleanupStrategy resourceCleanupStrategy;
  private long nativeHandle;

  private EagerSession(Options options, ReferenceQueue<Object> garbageQueue) {
    this.nativeResources = new NativeResourceCollector(garbageQueue);
    this.nativeHandle = allocate(options.async, options.devicePlacementPolicy.code, options.config);
    this.resourceCleanupStrategy = options.resourceCleanupStrategy;

    if (resourceCleanupStrategy == ResourceCleanupStrategy.IN_BACKGROUND) {
      nativeResources.startCleanupThread();
    }
  }

  /** Fails if this session has been closed, i.e. if its native handle has been reset. */
  private void checkSession() {
    if (nativeHandle == 0L) {
      throw new IllegalStateException("Eager session has been closed");
    }
  }

  private static native long allocate(boolean async, int devicePlacementPolicy, byte[] config);

  private static native void delete(long handle);

  static {
    TensorFlow.init();
  }
}