1 /** 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * SPDX-License-Identifier: Apache-2.0. 4 */ 5 package software.amazon.awssdk.crt.io; 6 7 import java.util.concurrent.CompletableFuture; 8 import software.amazon.awssdk.crt.CrtResource; 9 import software.amazon.awssdk.crt.CrtRuntimeException; 10 import software.amazon.awssdk.crt.Log; 11 12 /** 13 * This class wraps the aws_event_loop_group from aws-c-io to provide 14 * access to an event loop for the MQTT protocol stack in the AWS Common 15 * Runtime. 16 */ 17 public final class EventLoopGroup extends CrtResource { 18 19 private final CompletableFuture<Void> shutdownComplete = new CompletableFuture<>(); 20 21 /** 22 * Creates a new event loop group for the I/O subsystem to use to run non-blocking I/O requests 23 * @param numThreads The number of threads that the event loop group may run tasks across. Usually 1. 24 * @throws CrtRuntimeException If the system is unable to allocate space for a native event loop group 25 */ EventLoopGroup(int numThreads)26 public EventLoopGroup(int numThreads) throws CrtRuntimeException { 27 acquireNativeHandle(eventLoopGroupNew(this, numThreads)); 28 } 29 30 /** 31 * Creates a new event loop group for the I/O subsystem to use to run non-blocking I/O requests. When using this 32 * constructor, the threads will be pinned to a particular cpuGroup (e.g. a particular NUMA node). 33 * @param cpuGroup the index of the cpu group to bind to (for example NUMA node 0 would be 0, NUMA node 1 would be 1 etc...) 34 * @param numThreads The number of threads that the event loop group may run tasks across. Usually 1. 35 * @throws CrtRuntimeException If the system is unable to allocate space for a native event loop group 36 */ EventLoopGroup(int cpuGroup, int numThreads)37 public EventLoopGroup(int cpuGroup, int numThreads) throws CrtRuntimeException { 38 acquireNativeHandle(eventLoopGroupNewPinnedToCpuGroup(this, cpuGroup, numThreads)); 39 } 40 41 /** 42 * Determines whether a resource releases its dependencies at the same time the native handle is released or if it waits. 43 * Resources that wait are responsible for calling releaseReferences() manually. 44 */ 45 @Override canReleaseReferencesImmediately()46 protected boolean canReleaseReferencesImmediately() { return false; } 47 48 /** 49 * Stops the event loop group's tasks and frees all resources associated with the the group. This should be called 50 * after all other clients/connections and other resources are cleaned up, or else they will not clean up completely. 51 */ 52 @Override releaseNativeHandle()53 protected void releaseNativeHandle() { 54 if (!isNull()) { 55 eventLoopGroupDestroy(getNativeHandle()); 56 } 57 } 58 59 /** 60 * Called from Native when the asynchronous cleanup process needed for event loop groups has completed. 61 */ onCleanupComplete()62 private void onCleanupComplete() { 63 Log.log(Log.LogLevel.Trace, Log.LogSubject.IoEventLoop, "EventLoopGroup.onCleanupComplete"); 64 65 releaseReferences(); 66 67 this.shutdownComplete.complete(null); 68 } 69 getShutdownCompleteFuture()70 public CompletableFuture<Void> getShutdownCompleteFuture() { return shutdownComplete; } 71 72 73 /* 74 * Static interface for access to a default, lazily-created event loop group for users who don't 75 * want to deal with the associated resource management. Client bootstraps will use this event loop 76 * group if they are passed a null value. 77 */ 78 79 /** 80 * Sets the number of threads for the static default event loop group, should it ever be created. Has no 81 * effect if the static default event loop group has already been created. 82 * 83 * @param numThreads number of threads for the static default event loop group 84 */ setStaticDefaultNumThreads(int numThreads)85 public static void setStaticDefaultNumThreads(int numThreads) { 86 synchronized (EventLoopGroup.class) { 87 staticDefaultNumThreads = Math.max(1, numThreads); 88 } 89 } 90 91 /** 92 * Closes the static EventLoopGroup, if it exists. Primarily intended for tests that use the static 93 * default EventLoopGroup, before they call waitForNoResources(). 94 */ closeStaticDefault()95 public static void closeStaticDefault() { 96 synchronized (EventLoopGroup.class) { 97 if (staticDefaultEventLoopGroup != null) { 98 staticDefaultEventLoopGroup.close(); 99 } 100 staticDefaultEventLoopGroup = null; 101 } 102 } 103 104 /** 105 * Gets the static default EventLoopGroup, creating it if necessary. 106 * 107 * This default will be used when a EventLoopGroup is not explicitly passed but is needed 108 * to allow the process to function. An example of this would be in the MQTT connection creation workflow. 109 * 110 * The EventLoopGroup will automatically pick a default number of threads based on the system. 111 * 112 * The default EventLoopGroup will be automatically managed and released by the API handle when it's 113 * resources are being freed, not requiring any manual memory management. 114 * @return the static default event loop group 115 */ getOrCreateStaticDefault()116 static EventLoopGroup getOrCreateStaticDefault() { 117 EventLoopGroup elg = null; 118 synchronized (EventLoopGroup.class) { 119 if (staticDefaultEventLoopGroup == null) { 120 staticDefaultEventLoopGroup = new EventLoopGroup(staticDefaultNumThreads); 121 } 122 123 elg = staticDefaultEventLoopGroup; 124 } 125 126 return elg; 127 } 128 129 private static int staticDefaultNumThreads = Math.max(1, Runtime.getRuntime().availableProcessors()); 130 private static EventLoopGroup staticDefaultEventLoopGroup; 131 132 /******************************************************************************* 133 * native methods 134 ******************************************************************************/ eventLoopGroupNew(EventLoopGroup thisObj, int numThreads)135 private static native long eventLoopGroupNew(EventLoopGroup thisObj, int numThreads) throws CrtRuntimeException; eventLoopGroupNewPinnedToCpuGroup(EventLoopGroup thisObj, int cpuGroup, int numThreads)136 private static native long eventLoopGroupNewPinnedToCpuGroup(EventLoopGroup thisObj, int cpuGroup, int numThreads) throws CrtRuntimeException; 137 eventLoopGroupDestroy(long elg)138 private static native void eventLoopGroupDestroy(long elg); 139 }; 140