• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */
5 package software.amazon.awssdk.crt.io;
6 
7 import java.util.concurrent.CompletableFuture;
8 import software.amazon.awssdk.crt.CrtResource;
9 import software.amazon.awssdk.crt.CrtRuntimeException;
10 import software.amazon.awssdk.crt.Log;
11 
12 /**
13  * This class wraps the aws_event_loop_group from aws-c-io to provide
14  * access to an event loop for the MQTT protocol stack in the AWS Common
15  * Runtime.
16  */
17 public final class EventLoopGroup extends CrtResource {
18 
19     private final CompletableFuture<Void> shutdownComplete = new CompletableFuture<>();
20 
21     /**
22      * Creates a new event loop group for the I/O subsystem to use to run non-blocking I/O requests
23      * @param numThreads The number of threads that the event loop group may run tasks across. Usually 1.
24      * @throws CrtRuntimeException If the system is unable to allocate space for a native event loop group
25      */
EventLoopGroup(int numThreads)26     public EventLoopGroup(int numThreads) throws CrtRuntimeException {
27         acquireNativeHandle(eventLoopGroupNew(this, numThreads));
28     }
29 
30     /**
31      * Creates a new event loop group for the I/O subsystem to use to run non-blocking I/O requests. When using this
32      * constructor, the threads will be pinned to a particular cpuGroup (e.g. a particular NUMA node).
33      * @param cpuGroup the index of the cpu group to bind to (for example NUMA node 0 would be 0, NUMA node 1 would be 1 etc...)
34      * @param numThreads The number of threads that the event loop group may run tasks across. Usually 1.
35      * @throws CrtRuntimeException If the system is unable to allocate space for a native event loop group
36      */
EventLoopGroup(int cpuGroup, int numThreads)37     public EventLoopGroup(int cpuGroup, int numThreads) throws CrtRuntimeException {
38         acquireNativeHandle(eventLoopGroupNewPinnedToCpuGroup(this, cpuGroup, numThreads));
39     }
40 
41     /**
42      * Determines whether a resource releases its dependencies at the same time the native handle is released or if it waits.
43      * Resources that wait are responsible for calling releaseReferences() manually.
44      */
45     @Override
canReleaseReferencesImmediately()46     protected boolean canReleaseReferencesImmediately() { return false; }
47 
48     /**
49      * Stops the event loop group's tasks and frees all resources associated with the the group. This should be called
50      * after all other clients/connections and other resources are cleaned up, or else they will not clean up completely.
51      */
52     @Override
releaseNativeHandle()53     protected void releaseNativeHandle() {
54         if (!isNull()) {
55             eventLoopGroupDestroy(getNativeHandle());
56         }
57     }
58 
59     /**
60      * Called from Native when the asynchronous cleanup process needed for event loop groups has completed.
61      */
onCleanupComplete()62     private void onCleanupComplete() {
63         Log.log(Log.LogLevel.Trace, Log.LogSubject.IoEventLoop, "EventLoopGroup.onCleanupComplete");
64 
65         releaseReferences();
66 
67         this.shutdownComplete.complete(null);
68     }
69 
getShutdownCompleteFuture()70     public CompletableFuture<Void> getShutdownCompleteFuture() { return shutdownComplete; }
71 
72 
73     /*
74      * Static interface for access to a default, lazily-created event loop group for users who don't
75      * want to deal with the associated resource management.  Client bootstraps will use this event loop
76      * group if they are passed a null value.
77      */
78 
79     /**
80      * Sets the number of threads for the static default event loop group, should it ever be created.  Has no
81      * effect if the static default event loop group has already been created.
82      *
83      * @param numThreads number of threads for the static default event loop group
84      */
setStaticDefaultNumThreads(int numThreads)85     public static void setStaticDefaultNumThreads(int numThreads) {
86         synchronized (EventLoopGroup.class) {
87             staticDefaultNumThreads = Math.max(1, numThreads);
88         }
89     }
90 
91     /**
92      * Closes the static EventLoopGroup, if it exists.  Primarily intended for tests that use the static
93      * default EventLoopGroup, before they call waitForNoResources().
94      */
closeStaticDefault()95     public static void closeStaticDefault() {
96         synchronized (EventLoopGroup.class) {
97             if (staticDefaultEventLoopGroup != null) {
98                 staticDefaultEventLoopGroup.close();
99             }
100             staticDefaultEventLoopGroup = null;
101         }
102     }
103 
104     /**
105      * Gets the static default EventLoopGroup, creating it if necessary.
106      *
107      * This default will be used when a EventLoopGroup is not explicitly passed but is needed
108      * to allow the process to function. An example of this would be in the MQTT connection creation workflow.
109      *
110      * The EventLoopGroup will automatically pick a default number of threads based on the system.
111      *
112      * The default EventLoopGroup will be automatically managed and released by the API handle when it's
113      * resources are being freed, not requiring any manual memory management.
114      * @return the static default event loop group
115      */
getOrCreateStaticDefault()116     static EventLoopGroup getOrCreateStaticDefault() {
117         EventLoopGroup elg = null;
118         synchronized (EventLoopGroup.class) {
119             if (staticDefaultEventLoopGroup == null) {
120                 staticDefaultEventLoopGroup = new EventLoopGroup(staticDefaultNumThreads);
121             }
122 
123             elg = staticDefaultEventLoopGroup;
124         }
125 
126         return elg;
127     }
128 
129     private static int staticDefaultNumThreads = Math.max(1, Runtime.getRuntime().availableProcessors());
130     private static EventLoopGroup staticDefaultEventLoopGroup;
131 
132     /*******************************************************************************
133      * native methods
134      ******************************************************************************/
eventLoopGroupNew(EventLoopGroup thisObj, int numThreads)135     private static native long eventLoopGroupNew(EventLoopGroup thisObj, int numThreads) throws CrtRuntimeException;
eventLoopGroupNewPinnedToCpuGroup(EventLoopGroup thisObj, int cpuGroup, int numThreads)136     private static native long eventLoopGroupNewPinnedToCpuGroup(EventLoopGroup thisObj, int cpuGroup, int numThreads) throws CrtRuntimeException;
137 
eventLoopGroupDestroy(long elg)138     private static native void eventLoopGroupDestroy(long elg);
139 };
140