• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 /**
3  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
4  * SPDX-License-Identifier: Apache-2.0.
5  */
6 package software.amazon.awssdk.crt.mqtt;
7 
8 import software.amazon.awssdk.crt.AsyncCallback;
9 import software.amazon.awssdk.crt.CrtResource;
10 import software.amazon.awssdk.crt.CrtRuntimeException;
11 import software.amazon.awssdk.crt.http.HttpHeader;
12 import software.amazon.awssdk.crt.http.HttpProxyOptions;
13 import software.amazon.awssdk.crt.http.HttpRequest;
14 import software.amazon.awssdk.crt.io.SocketOptions;
15 import software.amazon.awssdk.crt.io.TlsContext;
16 import software.amazon.awssdk.crt.mqtt.MqttConnectionConfig;
17 import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;
18 import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions;
19 import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket;
20 
21 import java.util.concurrent.CompletableFuture;
22 import java.util.function.Consumer;
23 
24 /**
25  * This class wraps aws-c-mqtt to provide the basic MQTT pub/sub functionality
26  * via the AWS Common Runtime
27  *
28  * MqttClientConnection represents a single connection from one MqttClient to an
29  * MQTT service endpoint
30  */
31 public class MqttClientConnection extends CrtResource {
32 
    // Inclusive upper bound accepted for the configured port (see the range
    // checks in the constructors and in connect()).
    private static final int MAX_PORT = 65535;

    // Configuration backing this connection; assigned at the end of SetupConfig()
    // and read by connect() and the connection-event callbacks.
    private MqttConnectionConfig config;

    // Callback for the connect() call currently in flight; set by connect() and
    // reset to null by onConnectionComplete() once it has been notified.
    private AsyncCallback connectAck;
38 
39     /**
40      * Wraps the handler provided by the user so that an MqttMessage can be
41      * constructed from the published buffer and topic
42      */
43     private class MessageHandler {
44         Consumer<MqttMessage> callback;
45 
MessageHandler(Consumer<MqttMessage> callback)46         private MessageHandler(Consumer<MqttMessage> callback) {
47             this.callback = callback;
48         }
49 
50         /* called from native when a message is delivered */
deliver(String topic, byte[] payload, boolean dup, int qos, boolean retain)51         void deliver(String topic, byte[] payload, boolean dup, int qos, boolean retain) {
52             QualityOfService qosEnum = QualityOfService.getEnumValueFromInteger(qos);
53             callback.accept(new MqttMessage(topic, payload, qosEnum, retain, dup));
54         }
55     }
56 
57     /**
58      * Static help function to create a MqttConnectionConfig from a
59      * Mqtt5ClientOptions
60      */
s_toMqtt3ConnectionConfig(Mqtt5ClientOptions mqtt5options)61     private static MqttConnectionConfig s_toMqtt3ConnectionConfig(Mqtt5ClientOptions mqtt5options) {
62         MqttConnectionConfig options = new MqttConnectionConfig();
63         options.setEndpoint(mqtt5options.getHostName());
64         options.setPort(mqtt5options.getPort() != null ? Math.toIntExact(mqtt5options.getPort()) : 0);
65         options.setSocketOptions(mqtt5options.getSocketOptions());
66         if (mqtt5options.getConnectOptions() != null) {
67             options.setClientId(mqtt5options.getConnectOptions().getClientId());
68             options.setKeepAliveSecs(
69                     mqtt5options.getConnectOptions().getKeepAliveIntervalSeconds() != null
70                             ? Math.toIntExact(mqtt5options.getConnectOptions().getKeepAliveIntervalSeconds())
71                             : 0);
72         }
73         options.setCleanSession(
74                 mqtt5options.getSessionBehavior().compareTo(Mqtt5ClientOptions.ClientSessionBehavior.CLEAN) <= 0);
75         options.setPingTimeoutMs(
76                 mqtt5options.getPingTimeoutMs() != null ? Math.toIntExact(mqtt5options.getPingTimeoutMs()) : 0);
77         options.setProtocolOperationTimeoutMs(mqtt5options.getAckTimeoutSeconds() != null
78                 ? Math.toIntExact(mqtt5options.getAckTimeoutSeconds()) * 1000
79                 : 0);
80         return options;
81     }
82 
83     /**
84      * Constructs a new MqttClientConnection. Connections are reusable after being
85      * disconnected.
86      *
87      * @param config Configuration to use
88      * @throws MqttException If mqttClient is null
89      */
MqttClientConnection(MqttConnectionConfig config)90     public MqttClientConnection(MqttConnectionConfig config) throws MqttException {
91         if (config.getMqttClient() == null) {
92             throw new MqttException("mqttClient must not be null");
93         }
94         if (config.getClientId() == null) {
95             throw new MqttException("clientId must not be null");
96         }
97         if (config.getEndpoint() == null) {
98             throw new MqttException("endpoint must not be null");
99         }
100         if (config.getPort() <= 0 || config.getPort() > MAX_PORT) {
101             throw new MqttException("port must be a positive integer between 1 and 65535");
102         }
103 
104         try {
105             acquireNativeHandle(mqttClientConnectionNewFrom311Client(config.getMqttClient().getNativeHandle(), this));
106             SetupConfig(config);
107 
108         } catch (CrtRuntimeException ex) {
109             throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage());
110         }
111     }
112 
113     /**
114      * Constructs a new MqttClientConnection from a Mqtt5Client. Connections are
115      * reusable after being
116      * disconnected.
117      *
118      * @param mqtt5client the mqtt5 client to setup from
119      * @param callbacks   connection callbacks triggered when receive connection
120      *                    events
121      *
122      * @throws MqttException If mqttClient is null
123      */
MqttClientConnection(Mqtt5Client mqtt5client, MqttClientConnectionEvents callbacks)124     public MqttClientConnection(Mqtt5Client mqtt5client, MqttClientConnectionEvents callbacks) throws MqttException {
125         if (mqtt5client == null) {
126             throw new MqttException("mqttClient must not be null");
127         }
128 
129         try (MqttConnectionConfig config = s_toMqtt3ConnectionConfig(mqtt5client.getClientOptions())) {
130             config.setMqtt5Client(mqtt5client);
131             if (callbacks != null) {
132                 config.setConnectionCallbacks(callbacks);
133             }
134 
135             if (config.getClientId() == null) {
136                 throw new MqttException("clientId must not be null");
137             }
138             if (config.getEndpoint() == null) {
139                 throw new MqttException("endpoint must not be null");
140             }
141             if (config.getPort() <= 0 || config.getPort() > MAX_PORT) {
142                 throw new MqttException("port must be a positive integer between 1 and 65535");
143             }
144 
145             try {
146                 acquireNativeHandle(
147                         mqttClientConnectionNewFrom5Client(config.getMqtt5Client().getNativeHandle(), this));
148                 SetupConfig(config);
149 
150             } catch (CrtRuntimeException ex) {
151                 throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage());
152             }
153         } catch (Exception e) {
154             throw new MqttException("Failed to setup mqtt3 connection : " + e.getMessage());
155         }
156 
157     }
158 
SetupConfig(MqttConnectionConfig config)159     private void SetupConfig(MqttConnectionConfig config) throws MqttException {
160         try {
161             if (config.getUsername() != null) {
162                 mqttClientConnectionSetLogin(getNativeHandle(), config.getUsername(), config.getPassword());
163             }
164 
165             if (config.getMinReconnectTimeoutSecs() != 0L && config.getMaxReconnectTimeoutSecs() != 0L) {
166                 mqttClientConnectionSetReconnectTimeout(getNativeHandle(), config.getMinReconnectTimeoutSecs(),
167                         config.getMaxReconnectTimeoutSecs());
168             }
169 
170             MqttMessage message = config.getWillMessage();
171             if (message != null) {
172                 mqttClientConnectionSetWill(getNativeHandle(), message.getTopic(), message.getQos().getValue(),
173                         message.getRetain(), message.getPayload());
174             }
175 
176             if (config.getUseWebsockets()) {
177                 mqttClientConnectionUseWebsockets(getNativeHandle());
178             }
179 
180             if (config.getHttpProxyOptions() != null) {
181                 HttpProxyOptions options = config.getHttpProxyOptions();
182                 TlsContext proxyTlsContext = options.getTlsContext();
183                 mqttClientConnectionSetHttpProxyOptions(getNativeHandle(),
184                         options.getConnectionType().getValue(),
185                         options.getHost(),
186                         options.getPort(),
187                         proxyTlsContext != null ? proxyTlsContext.getNativeHandle() : 0,
188                         options.getAuthorizationType().getValue(),
189                         options.getAuthorizationUsername(),
190                         options.getAuthorizationPassword());
191             }
192 
193             addReferenceTo(config);
194             this.config = config;
195 
196         } catch (CrtRuntimeException ex) {
197             throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage());
198         }
199     }
200 
201     /**
202      * Frees native resources associated with this connection.
203      */
204     @Override
releaseNativeHandle()205     protected void releaseNativeHandle() {
206         mqttClientConnectionDestroy(getNativeHandle());
207     }
208 
209     /**
210      * Determines whether a resource releases its dependencies at the same time the
211      * native handle is released or if it waits. Resources that wait are responsible
212      * for calling releaseReferences() manually.
213      */
214     @Override
canReleaseReferencesImmediately()215     protected boolean canReleaseReferencesImmediately() {
216         return false;
217     }
218 
219     // Called from native when the connection is established the first time
onConnectionComplete(int errorCode, boolean sessionPresent)220     private void onConnectionComplete(int errorCode, boolean sessionPresent) {
221         if (connectAck != null) {
222             if (errorCode == 0) {
223                 connectAck.onSuccess(sessionPresent);
224             } else {
225                 connectAck.onFailure(new MqttException(errorCode));
226             }
227             connectAck = null;
228         }
229     }
230 
231     // Called when the connection drops or is disconnected. If errorCode == 0, the
232     // disconnect was intentional.
onConnectionInterrupted(int errorCode, AsyncCallback callback)233     private void onConnectionInterrupted(int errorCode, AsyncCallback callback) {
234         if (callback != null) {
235             if (errorCode == 0) {
236                 callback.onSuccess();
237             } else {
238                 callback.onFailure(new MqttException(errorCode));
239             }
240         }
241         MqttClientConnectionEvents callbacks = config.getConnectionCallbacks();
242         if (callbacks != null) {
243             callbacks.onConnectionInterrupted(errorCode);
244         }
245     }
246 
247     // called when the connection or reconnection is successful
onConnectionSuccess(boolean sessionPresent)248     private void onConnectionSuccess(boolean sessionPresent) {
249         MqttClientConnectionEvents callbacks = config.getConnectionCallbacks();
250         if (callbacks != null) {
251             OnConnectionSuccessReturn returnData = new OnConnectionSuccessReturn(sessionPresent);
252             callbacks.onConnectionSuccess(returnData);
253         }
254     }
255 
256     // called when the connection drops
onConnectionFailure(int errorCode)257     private void onConnectionFailure(int errorCode) {
258         MqttClientConnectionEvents callbacks = config.getConnectionCallbacks();
259         if (callbacks != null) {
260             OnConnectionFailureReturn returnData = new OnConnectionFailureReturn(errorCode);
261             callbacks.onConnectionFailure(returnData);
262         }
263     }
264 
265     // Called when a reconnect succeeds, and also on initial connection success.
onConnectionResumed(boolean sessionPresent)266     private void onConnectionResumed(boolean sessionPresent) {
267         MqttClientConnectionEvents callbacks = config.getConnectionCallbacks();
268         if (callbacks != null) {
269             callbacks.onConnectionResumed(sessionPresent);
270             OnConnectionSuccessReturn returnData = new OnConnectionSuccessReturn(sessionPresent);
271             callbacks.onConnectionSuccess(returnData);
272         }
273     }
274 
275     // Called when the connection is disconnected successfully and intentionally.
onConnectionClosed()276     private void onConnectionClosed() {
277         if (config != null) {
278             MqttClientConnectionEvents callbacks = config.getConnectionCallbacks();
279             if (callbacks != null) {
280                 OnConnectionClosedReturn returnData = new OnConnectionClosedReturn();
281                 callbacks.onConnectionClosed(returnData);
282             }
283         }
284     }
285 
286     /**
287      * Connect to the service endpoint and start a session
288      *
289      * @return Future result is true if resuming a session, false if clean session
290      * @throws MqttException If the port is out of range
291      */
connect()292     public CompletableFuture<Boolean> connect() throws MqttException {
293 
294         TlsContext tls = null;
295         if (config.getMqttClient() != null) {
296             tls = config.getMqttClient().getTlsContext();
297         } else if (config.getMqtt5Client() != null) {
298             tls = config.getMqtt5Client().getClientOptions().getTlsContext();
299         }
300 
301         // Just clamp the pingTimeout, no point in throwing
302         short pingTimeout = (short) Math.max(0, Math.min(config.getPingTimeoutMs(), Short.MAX_VALUE));
303 
304         int port = config.getPort();
305         if (port > MAX_PORT || port <= 0) {
306             throw new MqttException("Port must be betweeen 0 and 65535");
307         }
308         CompletableFuture<Boolean> future = new CompletableFuture<>();
309         connectAck = AsyncCallback.wrapFuture(future, null);
310         SocketOptions socketOptions = config.getSocketOptions();
311 
312         try {
313             mqttClientConnectionConnect(getNativeHandle(), config.getEndpoint(), port,
314                     socketOptions != null ? socketOptions.getNativeHandle() : 0,
315                     tls != null ? tls.getNativeHandle() : 0, config.getClientId(), config.getCleanSession(),
316                     config.getKeepAliveSecs(), pingTimeout, config.getProtocolOperationTimeoutMs());
317 
318         } catch (CrtRuntimeException ex) {
319             future.completeExceptionally(ex);
320         }
321         return future;
322     }
323 
324     /**
325      * Disconnects the current session
326      *
327      * @return When this future completes, the disconnection is complete
328      */
disconnect()329     public CompletableFuture<Void> disconnect() {
330         CompletableFuture<Void> future = new CompletableFuture<>();
331         if (isNull()) {
332             future.complete(null);
333             return future;
334         }
335         AsyncCallback disconnectAck = AsyncCallback.wrapFuture(future, null);
336         mqttClientConnectionDisconnect(getNativeHandle(), disconnectAck);
337         return future;
338     }
339 
340     /**
341      * Subscribes to a topic
342      *
343      * @param topic   The topic to subscribe to
344      * @param qos     {@link QualityOfService} for this subscription
345      * @param handler A handler which can receive an MqttMessage when a message is
346      *                published to the topic
347      * @return Future result is the packet/message id associated with the subscribe
348      *         operation
349      */
subscribe(String topic, QualityOfService qos, Consumer<MqttMessage> handler)350     public CompletableFuture<Integer> subscribe(String topic, QualityOfService qos, Consumer<MqttMessage> handler) {
351         CompletableFuture<Integer> future = new CompletableFuture<>();
352         if (isNull()) {
353             future.completeExceptionally(new MqttException("Invalid connection during subscribe"));
354             return future;
355         }
356 
357         AsyncCallback subAck = AsyncCallback.wrapFuture(future, 0);
358         try {
359             int packetId = mqttClientConnectionSubscribe(getNativeHandle(), topic, qos.getValue(),
360                     handler != null ? new MessageHandler(handler) : null, subAck);
361             // When the future completes, complete the returned future with the packetId
362             return future.thenApply(unused -> packetId);
363         } catch (CrtRuntimeException ex) {
364             future.completeExceptionally(ex);
365             return future;
366         }
367     }
368 
369     /**
370      * Subscribes to a topic without a handler (messages will only be delivered to
371      * the OnMessage handler)
372      *
373      * @param topic The topic to subscribe to
374      * @param qos   {@link QualityOfService} for this subscription
375      * @return Future result is the packet/message id associated with the subscribe
376      *         operation
377      */
subscribe(String topic, QualityOfService qos)378     public CompletableFuture<Integer> subscribe(String topic, QualityOfService qos) {
379         return subscribe(topic, qos, null);
380     }
381 
382     /**
383      * Sets a handler to be invoked whenever a message arrives, subscription or not
384      *
385      * @param handler A handler which can receive any MqttMessage
386      */
onMessage(Consumer<MqttMessage> handler)387     public void onMessage(Consumer<MqttMessage> handler) {
388         mqttClientConnectionOnMessage(getNativeHandle(), new MessageHandler(handler));
389     }
390 
391     /**
392      * Unsubscribes from a topic
393      *
394      * @param topic The topic to unsubscribe from
395      * @return Future result is the packet/message id associated with the
396      *         unsubscribe operation
397      */
unsubscribe(String topic)398     public CompletableFuture<Integer> unsubscribe(String topic) {
399         CompletableFuture<Integer> future = new CompletableFuture<>();
400         if (isNull()) {
401             future.completeExceptionally(new MqttException("Invalid connection during unsubscribe"));
402             return future;
403         }
404 
405         AsyncCallback unsubAck = AsyncCallback.wrapFuture(future, 0);
406         int packetId = mqttClientConnectionUnsubscribe(getNativeHandle(), topic, unsubAck);
407         // When the future completes, complete the returned future with the packetId
408         return future.thenApply(unused -> packetId);
409     }
410 
411     /**
412      * Publishes a message to a topic.
413      *
414      * @param message The message to publish.
415      *
416      * @return Future value is the packet/message id associated with the publish
417      *         operation
418      */
publish(MqttMessage message)419     public CompletableFuture<Integer> publish(MqttMessage message) {
420         CompletableFuture<Integer> future = new CompletableFuture<>();
421         if (isNull()) {
422             future.completeExceptionally(new MqttException("Invalid connection during publish"));
423         }
424 
425         AsyncCallback pubAck = AsyncCallback.wrapFuture(future, 0);
426         try {
427             int packetId = mqttClientConnectionPublish(getNativeHandle(), message.getTopic(),
428                     message.getQos().getValue(), message.getRetain(), message.getPayload(), pubAck);
429             // When the future completes, complete the returned future with the packetId
430             return future.thenApply(unused -> packetId);
431         } catch (CrtRuntimeException ex) {
432             future.completeExceptionally(ex);
433             return future;
434         }
435     }
436 
437     @Deprecated
publish(MqttMessage message, QualityOfService qos, boolean retain)438     public CompletableFuture<Integer> publish(MqttMessage message, QualityOfService qos, boolean retain) {
439         return publish(new MqttMessage(message.getTopic(), message.getPayload(), qos, retain));
440     }
441 
442     // Called from native when a websocket handshake request is being prepared.
onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserData)443     private void onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserData) {
444         CompletableFuture<HttpRequest> future = new CompletableFuture<>();
445         future.whenComplete((x, throwable) -> {
446             mqttClientConnectionWebsocketHandshakeComplete(getNativeHandle(), x != null ? x.marshalForJni() : null,
447                     throwable, nativeUserData);
448         });
449 
450         WebsocketHandshakeTransformArgs args = new WebsocketHandshakeTransformArgs(this, handshakeRequest, future);
451 
452         Consumer<WebsocketHandshakeTransformArgs> transform = config.getWebsocketHandshakeTransform();
453         if (transform != null) {
454             transform.accept(args);
455         } else {
456             args.complete(handshakeRequest);
457         }
458     }
459 
460     /**
461      * Returns statistics about the current state of the MqttClientConnection's
462      * queue of operations.
463      *
464      * @return Current state of the connection's queue of operations.
465      */
getOperationStatistics()466     public MqttClientConnectionOperationStatistics getOperationStatistics() {
467         return mqttClientConnectionGetOperationStatistics(getNativeHandle());
468     }
469 
    /*******************************************************************************
     * Native methods
     *
     * The "connection" argument of each method is the value of getNativeHandle()
     * for this resource.
     ******************************************************************************/
    // Creates the native connection from an MqttClient's native handle; the
    // result is passed to acquireNativeHandle() by the MqttConnectionConfig
    // constructor.
    private static native long mqttClientConnectionNewFrom311Client(long client, MqttClientConnection thisObj)
            throws CrtRuntimeException;

    // Creates the native connection from an Mqtt5Client's native handle; used by
    // the Mqtt5Client constructor.
    private static native long mqttClientConnectionNewFrom5Client(long client, MqttClientConnection thisObj)
            throws CrtRuntimeException;

    // Called from releaseNativeHandle().
    private static native void mqttClientConnectionDestroy(long connection);

    // Called from connect(). socketOptions and tlsContext are native handles,
    // with 0 passed when absent.
    // NOTE(review): the parameter is named keepAliveMs but connect() passes
    // config.getKeepAliveSecs() — confirm the expected unit.
    private static native void mqttClientConnectionConnect(long connection, String endpoint, int port,
            long socketOptions, long tlsContext, String clientId, boolean cleanSession, int keepAliveMs,
            short pingTimeoutMs, int protocolOperationTimeoutMs) throws CrtRuntimeException;

    // Called from disconnect(); ack wraps the future returned to the caller.
    private static native void mqttClientConnectionDisconnect(long connection, AsyncCallback ack);

    // Called from subscribe(); handler may be null. The short return value is
    // used by subscribe() as the packet id.
    private static native short mqttClientConnectionSubscribe(long connection, String topic, int qos,
            MessageHandler handler, AsyncCallback ack) throws CrtRuntimeException;

    // Called from onMessage() to register the catch-all message handler.
    private static native void mqttClientConnectionOnMessage(long connection, MessageHandler handler)
            throws CrtRuntimeException;

    // Called from unsubscribe(); the short return value is used as the packet id.
    private static native short mqttClientConnectionUnsubscribe(long connection, String topic, AsyncCallback ack);

    // Called from publish(); the short return value is used as the packet id.
    private static native short mqttClientConnectionPublish(long connection, String topic, int qos, boolean retain,
            byte[] payload, AsyncCallback ack) throws CrtRuntimeException;

    // Called from SetupConfig() when a will message is configured; the boolean
    // result is not inspected there.
    private static native boolean mqttClientConnectionSetWill(long connection, String topic, int qos, boolean retain,
            byte[] payload) throws CrtRuntimeException;

    // Called from SetupConfig() when a username is configured.
    private static native void mqttClientConnectionSetLogin(long connection, String username, String password)
            throws CrtRuntimeException;

    // Called from SetupConfig() with the config's min/max reconnect timeouts
    // (getMinReconnectTimeoutSecs / getMaxReconnectTimeoutSecs).
    private static native void mqttClientConnectionSetReconnectTimeout(long connection, long minTimeout,
            long maxTimeout)
            throws CrtRuntimeException;

    // Called from SetupConfig() when the config requests websockets.
    private static native void mqttClientConnectionUseWebsockets(long connection) throws CrtRuntimeException;

    // Called from onWebsocketHandshake() once the handshake transform completes;
    // marshalledRequest is null when the transform produced no request.
    private static native void mqttClientConnectionWebsocketHandshakeComplete(long connection, byte[] marshalledRequest,
            Throwable throwable,
            long nativeUserData) throws CrtRuntimeException;

    // Called from SetupConfig() when HTTP proxy options are configured;
    // proxyTlsContext is a native handle, 0 when the proxy has no TLS context.
    private static native void mqttClientConnectionSetHttpProxyOptions(long connection,
            int proxyConnectionType,
            String proxyHost,
            int proxyPort,
            long proxyTlsContext,
            int proxyAuthorizationType,
            String proxyAuthorizationUsername,
            String proxyAuthorizationPassword) throws CrtRuntimeException;

    // Called from getOperationStatistics().
    private static native MqttClientConnectionOperationStatistics mqttClientConnectionGetOperationStatistics(
            long connection);
525 
526 };
527