/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */
5 package software.amazon.awssdk.crt.mqtt5;
6 
7 import java.util.concurrent.CompletableFuture;
8 import java.util.function.Consumer;
9 
10 import software.amazon.awssdk.crt.CrtResource;
11 import software.amazon.awssdk.crt.CrtRuntimeException;
12 import software.amazon.awssdk.crt.http.HttpProxyOptions;
13 import software.amazon.awssdk.crt.http.HttpRequest;
14 import software.amazon.awssdk.crt.io.ClientBootstrap;
15 import software.amazon.awssdk.crt.io.SocketOptions;
16 import software.amazon.awssdk.crt.io.TlsContext;
17 import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket;
18 import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket;
19 import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket;
20 import software.amazon.awssdk.crt.mqtt5.packets.SubAckPacket;
21 import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket;
22 import software.amazon.awssdk.crt.mqtt5.packets.UnsubAckPacket;
23 import software.amazon.awssdk.crt.mqtt5.packets.UnsubscribePacket;
24 import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket.ConnectPacketBuilder;
25 
26 
27  /**
28  * This class wraps the aws-c-mqtt MQTT5 client to provide the basic MQTT5 pub/sub functionalities
29  * via the AWS Common Runtime
30  *
31  * One Mqtt5Client class creates one connection.
32  *
33  */
34 public class Mqtt5Client extends CrtResource {
35 
36     /**
37      * A private reference to the websocket handshake from the MQTT5 client options
38      */
39     private Consumer<Mqtt5WebsocketHandshakeTransformArgs> websocketHandshakeTransform;
40 
41     /**
42      * A boolean that holds whether the client's current state is connected or not
43      */
44     private boolean isConnected;
45 
46     /**
47      * A private config used to save config for mqtt3 connection creation
48      */
49     private Mqtt5ClientOptions clientOptions;
50 
51     /**
52      * Creates a Mqtt5Client instance using the provided Mqtt5ClientOptions. Once the Mqtt5Client is created,
53      * changing the settings will not cause a change in already created Mqtt5Client's.
54      *
55      * @param options The Mqtt5Options class to use to configure the new Mqtt5Client.
56      * @throws CrtRuntimeException If the system is unable to allocate space for a native MQTT5 client structure
57      */
Mqtt5Client(Mqtt5ClientOptions options)58     public Mqtt5Client(Mqtt5ClientOptions options) throws CrtRuntimeException {
59         clientOptions = options;
60         ClientBootstrap bootstrap = options.getBootstrap();
61         SocketOptions socketOptions = options.getSocketOptions();
62         TlsContext tlsContext = options.getTlsContext();
63         HttpProxyOptions proxyOptions = options.getHttpProxyOptions();
64         ConnectPacket connectionOptions = options.getConnectOptions();
65         this.websocketHandshakeTransform = options.getWebsocketHandshakeTransform();
66 
67         if (bootstrap == null) {
68             bootstrap = ClientBootstrap.getOrCreateStaticDefault();
69         }
70 
71         if (connectionOptions == null) {
72             ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder();
73             connectionOptions = connectBuilder.build();
74         }
75 
76         acquireNativeHandle(mqtt5ClientNew(
77             options,
78             connectionOptions,
79             bootstrap,
80             this
81         ));
82 
83         if (bootstrap != null) {
84             addReferenceTo(bootstrap);
85         }
86         if (socketOptions != null) {
87             addReferenceTo(socketOptions);
88         }
89         if (tlsContext != null) {
90             addReferenceTo(tlsContext);
91         }
92         if (proxyOptions != null) {
93             if (proxyOptions.getTlsContext() != null) {
94                 addReferenceTo(proxyOptions.getTlsContext());
95             }
96         }
97         isConnected = false;
98     }
99 
100     /**
101      * Cleans up the native resources associated with this client. The client is unusable after this call
102      */
103     @Override
releaseNativeHandle()104     protected void releaseNativeHandle() {
105         if (!isNull()) {
106             mqtt5ClientDestroy(getNativeHandle());
107         }
108     }
109 
110     /**
111      * Determines whether a resource releases its dependencies at the same time the native handle is released or if it waits.
112      * Resources that wait are responsible for calling releaseReferences() manually.
113      */
114     @Override
canReleaseReferencesImmediately()115     protected boolean canReleaseReferencesImmediately() { return false; }
116 
117     /**
118      * Notifies the Mqtt5Client that you want it maintain connectivity to the configured endpoint.
119      * The client will attempt to stay connected using the properties of the reconnect-related parameters
120      * in the Mqtt5Client configuration.
121      *
122      * This is an asynchronous operation.
123      *
124      * @throws CrtRuntimeException If the native client returns an error when starting
125      */
start()126     public void start() throws CrtRuntimeException {
127         mqtt5ClientInternalStart(getNativeHandle());
128     }
129 
130     /**
131      * Notifies the Mqtt5Client that you want it to end connectivity to the configured endpoint, disconnecting any
132      * existing connection and halting any reconnect attempts.
133      *
134      * This is an asynchronous operation.
135      *
136      * @param disconnectPacket (optional) Properties of a DISCONNECT packet to send as part of the shutdown process. When
137      * disconnectPacket is null, no DISCONNECT packets will be sent.
138      * @throws CrtRuntimeException If the native client is unable to initialize the stop process.
139      */
stop(DisconnectPacket disconnectPacket)140     public void stop(DisconnectPacket disconnectPacket) throws CrtRuntimeException {
141         mqtt5ClientInternalStop(getNativeHandle(), disconnectPacket);
142     }
143 
144     /**
145      * Notifies the Mqtt5Client that you want it to end connectivity to the configured endpoint, disconnecting any
146      * existing connection and halting any reconnect attempts. No DISCONNECT packets will be sent.
147      *
148      * This is an asynchronous operation.
149      *
150      * @throws CrtRuntimeException If the native client is unable to initialize the stop process.
151      */
stop()152     public void stop() throws CrtRuntimeException {
153         stop(null);
154     }
155 
156     /**
157      * Tells the Mqtt5Client to attempt to send a PUBLISH packet.
158      *
159      * Will return a future containing a PublishPacket if the publish is successful.
160      * The data in the PublishPacket varies depending on the QoS of the Publish. For QoS 0, the PublishPacket
161      * will not contain data. For QoS 1, the PublishPacket will contain a PubAckPacket.
162      * See PublishPacket class documentation for more info.
163      *
164      * @param publishPacket PUBLISH packet to send to the server
165      * @return A future that will be rejected with an error or resolved with a PublishResult response
166      */
publish(PublishPacket publishPacket)167     public CompletableFuture<PublishResult> publish(PublishPacket publishPacket) {
168         CompletableFuture<PublishResult> publishFuture = new CompletableFuture<>();
169         mqtt5ClientInternalPublish(getNativeHandle(), publishPacket, publishFuture);
170         return publishFuture;
171     }
172 
173     /**
174      * Tells the Mqtt5Client to attempt to subscribe to one or more topic filters.
175      *
176      * @param subscribePacket SUBSCRIBE packet to send to the server
177      * @return a future that will be rejected with an error or resolved with the SUBACK response
178      */
subscribe(SubscribePacket subscribePacket)179     public CompletableFuture<SubAckPacket> subscribe(SubscribePacket subscribePacket) {
180         CompletableFuture<SubAckPacket> subscribeFuture = new CompletableFuture<>();
181         mqtt5ClientInternalSubscribe(getNativeHandle(), subscribePacket, subscribeFuture);
182         return subscribeFuture;
183     }
184 
185     /**
186      * Tells the Mqtt5Client to attempt to unsubscribe from one or more topic filters.
187      *
188      * @param unsubscribePacket UNSUBSCRIBE packet to send to the server
189      * @return a future that will be rejected with an error or resolved with the UNSUBACK response
190      */
unsubscribe(UnsubscribePacket unsubscribePacket)191     public CompletableFuture<UnsubAckPacket> unsubscribe(UnsubscribePacket unsubscribePacket) {
192         CompletableFuture<UnsubAckPacket> unsubscribeFuture = new CompletableFuture<>();
193         mqtt5ClientInternalUnsubscribe(getNativeHandle(), unsubscribePacket, unsubscribeFuture);
194         return unsubscribeFuture;
195     }
196 
197     /**
198      * Returns statistics about the current state of the Mqtt5Client's queue of operations.
199      * @return Current state of the client's queue of operations.
200      */
getOperationStatistics()201     public Mqtt5ClientOperationStatistics getOperationStatistics() {
202         return mqtt5ClientInternalGetOperationStatistics(getNativeHandle());
203     }
204 
205     /**
206      * Returns the connectivity state for the Mqtt5Client.
207      * @return True if the client is connected, false otherwise
208      */
getIsConnected()209     public synchronized boolean getIsConnected() {
210         return isConnected;
211     }
212 
213     /**
214      * Sets the connectivity state of the Mqtt5Client. Is used by JNI.
215      * @param connected The current connectivity state of the Mqtt5Client
216      */
setIsConnected(boolean connected)217     private synchronized void setIsConnected(boolean connected) {
218         isConnected = connected;
219     }
220 
221 
222     /*******************************************************************************
223      * Mqtt5 to Mqtt3 Adapter
224      ******************************************************************************/
225 
226     /**
227      * Returns the Mqtt5ClientOptions used for the Mqtt5Client
228      *
229      * @return Mqtt5ClientOptions
230      */
getClientOptions()231     public Mqtt5ClientOptions getClientOptions()
232     {
233         return clientOptions;
234     }
235 
236     /*******************************************************************************
237      * websocket methods
238      ******************************************************************************/
239 
240     /**
241      * Called from native when a websocket handshake request is being prepared.
242      * @param handshakeRequest The HttpRequest being prepared
243      * @param nativeUserData Native data
244      */
onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserData)245     private void onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserData) {
246         CompletableFuture<HttpRequest> future = new CompletableFuture<>();
247         future.whenComplete((x, throwable) -> {
248             mqtt5ClientInternalWebsocketHandshakeComplete(getNativeHandle(), x != null ? x.marshalForJni() : null,
249                     throwable, nativeUserData);
250         });
251 
252         Mqtt5WebsocketHandshakeTransformArgs args = new Mqtt5WebsocketHandshakeTransformArgs(this, handshakeRequest, future);
253 
254         Consumer<Mqtt5WebsocketHandshakeTransformArgs> transform = this.websocketHandshakeTransform;
255         if (transform != null) {
256             transform.accept(args);
257         } else {
258             args.complete(handshakeRequest);
259         }
260     }
261 
262 
263     /*******************************************************************************
264      * native methods
265      ******************************************************************************/
mqtt5ClientNew( Mqtt5ClientOptions options, ConnectPacket connect_options, ClientBootstrap bootstrap, Mqtt5Client client )266     private static native long mqtt5ClientNew(
267         Mqtt5ClientOptions options,
268         ConnectPacket connect_options,
269         ClientBootstrap bootstrap,
270         Mqtt5Client client
271     ) throws CrtRuntimeException;
mqtt5ClientDestroy(long client)272     private static native void mqtt5ClientDestroy(long client);
mqtt5ClientInternalStart(long client)273     private static native void mqtt5ClientInternalStart(long client);
mqtt5ClientInternalStop(long client, DisconnectPacket disconnect_options)274     private static native void mqtt5ClientInternalStop(long client, DisconnectPacket disconnect_options);
mqtt5ClientInternalPublish(long client, PublishPacket publish_options, CompletableFuture<PublishResult> publish_result)275     private static native void mqtt5ClientInternalPublish(long client, PublishPacket publish_options, CompletableFuture<PublishResult> publish_result);
mqtt5ClientInternalSubscribe(long client, SubscribePacket subscribe_options, CompletableFuture<SubAckPacket> subscribe_suback)276     private static native void mqtt5ClientInternalSubscribe(long client, SubscribePacket subscribe_options, CompletableFuture<SubAckPacket> subscribe_suback);
mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture<UnsubAckPacket> unsubscribe_suback)277     private static native void mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture<UnsubAckPacket> unsubscribe_suback);
mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData)278     private static native void mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException;
mqtt5ClientInternalGetOperationStatistics(long client)279     private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client);
280 }
281