1 2 /** 3 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 4 * SPDX-License-Identifier: Apache-2.0. 5 */ 6 package software.amazon.awssdk.crt.mqtt; 7 8 import software.amazon.awssdk.crt.AsyncCallback; 9 import software.amazon.awssdk.crt.CrtResource; 10 import software.amazon.awssdk.crt.CrtRuntimeException; 11 import software.amazon.awssdk.crt.http.HttpHeader; 12 import software.amazon.awssdk.crt.http.HttpProxyOptions; 13 import software.amazon.awssdk.crt.http.HttpRequest; 14 import software.amazon.awssdk.crt.io.SocketOptions; 15 import software.amazon.awssdk.crt.io.TlsContext; 16 import software.amazon.awssdk.crt.mqtt.MqttConnectionConfig; 17 import software.amazon.awssdk.crt.mqtt5.Mqtt5Client; 18 import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions; 19 import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket; 20 21 import java.util.concurrent.CompletableFuture; 22 import java.util.function.Consumer; 23 24 /** 25 * This class wraps aws-c-mqtt to provide the basic MQTT pub/sub functionality 26 * via the AWS Common Runtime 27 * 28 * MqttClientConnection represents a single connection from one MqttClient to an 29 * MQTT service endpoint 30 */ 31 public class MqttClientConnection extends CrtResource { 32 33 private static final int MAX_PORT = 65535; 34 35 private MqttConnectionConfig config; 36 37 private AsyncCallback connectAck; 38 39 /** 40 * Wraps the handler provided by the user so that an MqttMessage can be 41 * constructed from the published buffer and topic 42 */ 43 private class MessageHandler { 44 Consumer<MqttMessage> callback; 45 MessageHandler(Consumer<MqttMessage> callback)46 private MessageHandler(Consumer<MqttMessage> callback) { 47 this.callback = callback; 48 } 49 50 /* called from native when a message is delivered */ deliver(String topic, byte[] payload, boolean dup, int qos, boolean retain)51 void deliver(String topic, byte[] payload, boolean dup, int qos, boolean retain) { 52 QualityOfService qosEnum = QualityOfService.getEnumValueFromInteger(qos); 53 callback.accept(new MqttMessage(topic, payload, qosEnum, retain, dup)); 54 } 55 } 56 57 /** 58 * Static help function to create a MqttConnectionConfig from a 59 * Mqtt5ClientOptions 60 */ s_toMqtt3ConnectionConfig(Mqtt5ClientOptions mqtt5options)61 private static MqttConnectionConfig s_toMqtt3ConnectionConfig(Mqtt5ClientOptions mqtt5options) { 62 MqttConnectionConfig options = new MqttConnectionConfig(); 63 options.setEndpoint(mqtt5options.getHostName()); 64 options.setPort(mqtt5options.getPort() != null ? Math.toIntExact(mqtt5options.getPort()) : 0); 65 options.setSocketOptions(mqtt5options.getSocketOptions()); 66 if (mqtt5options.getConnectOptions() != null) { 67 options.setClientId(mqtt5options.getConnectOptions().getClientId()); 68 options.setKeepAliveSecs( 69 mqtt5options.getConnectOptions().getKeepAliveIntervalSeconds() != null 70 ? Math.toIntExact(mqtt5options.getConnectOptions().getKeepAliveIntervalSeconds()) 71 : 0); 72 } 73 options.setCleanSession( 74 mqtt5options.getSessionBehavior().compareTo(Mqtt5ClientOptions.ClientSessionBehavior.CLEAN) <= 0); 75 options.setPingTimeoutMs( 76 mqtt5options.getPingTimeoutMs() != null ? Math.toIntExact(mqtt5options.getPingTimeoutMs()) : 0); 77 options.setProtocolOperationTimeoutMs(mqtt5options.getAckTimeoutSeconds() != null 78 ? Math.toIntExact(mqtt5options.getAckTimeoutSeconds()) * 1000 79 : 0); 80 return options; 81 } 82 83 /** 84 * Constructs a new MqttClientConnection. Connections are reusable after being 85 * disconnected. 86 * 87 * @param config Configuration to use 88 * @throws MqttException If mqttClient is null 89 */ MqttClientConnection(MqttConnectionConfig config)90 public MqttClientConnection(MqttConnectionConfig config) throws MqttException { 91 if (config.getMqttClient() == null) { 92 throw new MqttException("mqttClient must not be null"); 93 } 94 if (config.getClientId() == null) { 95 throw new MqttException("clientId must not be null"); 96 } 97 if (config.getEndpoint() == null) { 98 throw new MqttException("endpoint must not be null"); 99 } 100 if (config.getPort() <= 0 || config.getPort() > MAX_PORT) { 101 throw new MqttException("port must be a positive integer between 1 and 65535"); 102 } 103 104 try { 105 acquireNativeHandle(mqttClientConnectionNewFrom311Client(config.getMqttClient().getNativeHandle(), this)); 106 SetupConfig(config); 107 108 } catch (CrtRuntimeException ex) { 109 throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage()); 110 } 111 } 112 113 /** 114 * Constructs a new MqttClientConnection from a Mqtt5Client. Connections are 115 * reusable after being 116 * disconnected. 117 * 118 * @param mqtt5client the mqtt5 client to setup from 119 * @param callbacks connection callbacks triggered when receive connection 120 * events 121 * 122 * @throws MqttException If mqttClient is null 123 */ MqttClientConnection(Mqtt5Client mqtt5client, MqttClientConnectionEvents callbacks)124 public MqttClientConnection(Mqtt5Client mqtt5client, MqttClientConnectionEvents callbacks) throws MqttException { 125 if (mqtt5client == null) { 126 throw new MqttException("mqttClient must not be null"); 127 } 128 129 try (MqttConnectionConfig config = s_toMqtt3ConnectionConfig(mqtt5client.getClientOptions())) { 130 config.setMqtt5Client(mqtt5client); 131 if (callbacks != null) { 132 config.setConnectionCallbacks(callbacks); 133 } 134 135 if (config.getClientId() == null) { 136 throw new MqttException("clientId must not be null"); 137 } 138 if (config.getEndpoint() == null) { 139 throw new MqttException("endpoint must not be null"); 140 } 141 if (config.getPort() <= 0 || config.getPort() > MAX_PORT) { 142 throw new MqttException("port must be a positive integer between 1 and 65535"); 143 } 144 145 try { 146 acquireNativeHandle( 147 mqttClientConnectionNewFrom5Client(config.getMqtt5Client().getNativeHandle(), this)); 148 SetupConfig(config); 149 150 } catch (CrtRuntimeException ex) { 151 throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage()); 152 } 153 } catch (Exception e) { 154 throw new MqttException("Failed to setup mqtt3 connection : " + e.getMessage()); 155 } 156 157 } 158 SetupConfig(MqttConnectionConfig config)159 private void SetupConfig(MqttConnectionConfig config) throws MqttException { 160 try { 161 if (config.getUsername() != null) { 162 mqttClientConnectionSetLogin(getNativeHandle(), config.getUsername(), config.getPassword()); 163 } 164 165 if (config.getMinReconnectTimeoutSecs() != 0L && config.getMaxReconnectTimeoutSecs() != 0L) { 166 mqttClientConnectionSetReconnectTimeout(getNativeHandle(), config.getMinReconnectTimeoutSecs(), 167 config.getMaxReconnectTimeoutSecs()); 168 } 169 170 MqttMessage message = config.getWillMessage(); 171 if (message != null) { 172 mqttClientConnectionSetWill(getNativeHandle(), message.getTopic(), message.getQos().getValue(), 173 message.getRetain(), message.getPayload()); 174 } 175 176 if (config.getUseWebsockets()) { 177 mqttClientConnectionUseWebsockets(getNativeHandle()); 178 } 179 180 if (config.getHttpProxyOptions() != null) { 181 HttpProxyOptions options = config.getHttpProxyOptions(); 182 TlsContext proxyTlsContext = options.getTlsContext(); 183 mqttClientConnectionSetHttpProxyOptions(getNativeHandle(), 184 options.getConnectionType().getValue(), 185 options.getHost(), 186 options.getPort(), 187 proxyTlsContext != null ? proxyTlsContext.getNativeHandle() : 0, 188 options.getAuthorizationType().getValue(), 189 options.getAuthorizationUsername(), 190 options.getAuthorizationPassword()); 191 } 192 193 addReferenceTo(config); 194 this.config = config; 195 196 } catch (CrtRuntimeException ex) { 197 throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage()); 198 } 199 } 200 201 /** 202 * Frees native resources associated with this connection. 203 */ 204 @Override releaseNativeHandle()205 protected void releaseNativeHandle() { 206 mqttClientConnectionDestroy(getNativeHandle()); 207 } 208 209 /** 210 * Determines whether a resource releases its dependencies at the same time the 211 * native handle is released or if it waits. Resources that wait are responsible 212 * for calling releaseReferences() manually. 213 */ 214 @Override canReleaseReferencesImmediately()215 protected boolean canReleaseReferencesImmediately() { 216 return false; 217 } 218 219 // Called from native when the connection is established the first time onConnectionComplete(int errorCode, boolean sessionPresent)220 private void onConnectionComplete(int errorCode, boolean sessionPresent) { 221 if (connectAck != null) { 222 if (errorCode == 0) { 223 connectAck.onSuccess(sessionPresent); 224 } else { 225 connectAck.onFailure(new MqttException(errorCode)); 226 } 227 connectAck = null; 228 } 229 } 230 231 // Called when the connection drops or is disconnected. If errorCode == 0, the 232 // disconnect was intentional. onConnectionInterrupted(int errorCode, AsyncCallback callback)233 private void onConnectionInterrupted(int errorCode, AsyncCallback callback) { 234 if (callback != null) { 235 if (errorCode == 0) { 236 callback.onSuccess(); 237 } else { 238 callback.onFailure(new MqttException(errorCode)); 239 } 240 } 241 MqttClientConnectionEvents callbacks = config.getConnectionCallbacks(); 242 if (callbacks != null) { 243 callbacks.onConnectionInterrupted(errorCode); 244 } 245 } 246 247 // called when the connection or reconnection is successful onConnectionSuccess(boolean sessionPresent)248 private void onConnectionSuccess(boolean sessionPresent) { 249 MqttClientConnectionEvents callbacks = config.getConnectionCallbacks(); 250 if (callbacks != null) { 251 OnConnectionSuccessReturn returnData = new OnConnectionSuccessReturn(sessionPresent); 252 callbacks.onConnectionSuccess(returnData); 253 } 254 } 255 256 // called when the connection drops onConnectionFailure(int errorCode)257 private void onConnectionFailure(int errorCode) { 258 MqttClientConnectionEvents callbacks = config.getConnectionCallbacks(); 259 if (callbacks != null) { 260 OnConnectionFailureReturn returnData = new OnConnectionFailureReturn(errorCode); 261 callbacks.onConnectionFailure(returnData); 262 } 263 } 264 265 // Called when a reconnect succeeds, and also on initial connection success. onConnectionResumed(boolean sessionPresent)266 private void onConnectionResumed(boolean sessionPresent) { 267 MqttClientConnectionEvents callbacks = config.getConnectionCallbacks(); 268 if (callbacks != null) { 269 callbacks.onConnectionResumed(sessionPresent); 270 OnConnectionSuccessReturn returnData = new OnConnectionSuccessReturn(sessionPresent); 271 callbacks.onConnectionSuccess(returnData); 272 } 273 } 274 275 // Called when the connection is disconnected successfully and intentionally. onConnectionClosed()276 private void onConnectionClosed() { 277 if (config != null) { 278 MqttClientConnectionEvents callbacks = config.getConnectionCallbacks(); 279 if (callbacks != null) { 280 OnConnectionClosedReturn returnData = new OnConnectionClosedReturn(); 281 callbacks.onConnectionClosed(returnData); 282 } 283 } 284 } 285 286 /** 287 * Connect to the service endpoint and start a session 288 * 289 * @return Future result is true if resuming a session, false if clean session 290 * @throws MqttException If the port is out of range 291 */ connect()292 public CompletableFuture<Boolean> connect() throws MqttException { 293 294 TlsContext tls = null; 295 if (config.getMqttClient() != null) { 296 tls = config.getMqttClient().getTlsContext(); 297 } else if (config.getMqtt5Client() != null) { 298 tls = config.getMqtt5Client().getClientOptions().getTlsContext(); 299 } 300 301 // Just clamp the pingTimeout, no point in throwing 302 short pingTimeout = (short) Math.max(0, Math.min(config.getPingTimeoutMs(), Short.MAX_VALUE)); 303 304 int port = config.getPort(); 305 if (port > MAX_PORT || port <= 0) { 306 throw new MqttException("Port must be betweeen 0 and 65535"); 307 } 308 CompletableFuture<Boolean> future = new CompletableFuture<>(); 309 connectAck = AsyncCallback.wrapFuture(future, null); 310 SocketOptions socketOptions = config.getSocketOptions(); 311 312 try { 313 mqttClientConnectionConnect(getNativeHandle(), config.getEndpoint(), port, 314 socketOptions != null ? socketOptions.getNativeHandle() : 0, 315 tls != null ? tls.getNativeHandle() : 0, config.getClientId(), config.getCleanSession(), 316 config.getKeepAliveSecs(), pingTimeout, config.getProtocolOperationTimeoutMs()); 317 318 } catch (CrtRuntimeException ex) { 319 future.completeExceptionally(ex); 320 } 321 return future; 322 } 323 324 /** 325 * Disconnects the current session 326 * 327 * @return When this future completes, the disconnection is complete 328 */ disconnect()329 public CompletableFuture<Void> disconnect() { 330 CompletableFuture<Void> future = new CompletableFuture<>(); 331 if (isNull()) { 332 future.complete(null); 333 return future; 334 } 335 AsyncCallback disconnectAck = AsyncCallback.wrapFuture(future, null); 336 mqttClientConnectionDisconnect(getNativeHandle(), disconnectAck); 337 return future; 338 } 339 340 /** 341 * Subscribes to a topic 342 * 343 * @param topic The topic to subscribe to 344 * @param qos {@link QualityOfService} for this subscription 345 * @param handler A handler which can receive an MqttMessage when a message is 346 * published to the topic 347 * @return Future result is the packet/message id associated with the subscribe 348 * operation 349 */ subscribe(String topic, QualityOfService qos, Consumer<MqttMessage> handler)350 public CompletableFuture<Integer> subscribe(String topic, QualityOfService qos, Consumer<MqttMessage> handler) { 351 CompletableFuture<Integer> future = new CompletableFuture<>(); 352 if (isNull()) { 353 future.completeExceptionally(new MqttException("Invalid connection during subscribe")); 354 return future; 355 } 356 357 AsyncCallback subAck = AsyncCallback.wrapFuture(future, 0); 358 try { 359 int packetId = mqttClientConnectionSubscribe(getNativeHandle(), topic, qos.getValue(), 360 handler != null ? new MessageHandler(handler) : null, subAck); 361 // When the future completes, complete the returned future with the packetId 362 return future.thenApply(unused -> packetId); 363 } catch (CrtRuntimeException ex) { 364 future.completeExceptionally(ex); 365 return future; 366 } 367 } 368 369 /** 370 * Subscribes to a topic without a handler (messages will only be delivered to 371 * the OnMessage handler) 372 * 373 * @param topic The topic to subscribe to 374 * @param qos {@link QualityOfService} for this subscription 375 * @return Future result is the packet/message id associated with the subscribe 376 * operation 377 */ subscribe(String topic, QualityOfService qos)378 public CompletableFuture<Integer> subscribe(String topic, QualityOfService qos) { 379 return subscribe(topic, qos, null); 380 } 381 382 /** 383 * Sets a handler to be invoked whenever a message arrives, subscription or not 384 * 385 * @param handler A handler which can receive any MqttMessage 386 */ onMessage(Consumer<MqttMessage> handler)387 public void onMessage(Consumer<MqttMessage> handler) { 388 mqttClientConnectionOnMessage(getNativeHandle(), new MessageHandler(handler)); 389 } 390 391 /** 392 * Unsubscribes from a topic 393 * 394 * @param topic The topic to unsubscribe from 395 * @return Future result is the packet/message id associated with the 396 * unsubscribe operation 397 */ unsubscribe(String topic)398 public CompletableFuture<Integer> unsubscribe(String topic) { 399 CompletableFuture<Integer> future = new CompletableFuture<>(); 400 if (isNull()) { 401 future.completeExceptionally(new MqttException("Invalid connection during unsubscribe")); 402 return future; 403 } 404 405 AsyncCallback unsubAck = AsyncCallback.wrapFuture(future, 0); 406 int packetId = mqttClientConnectionUnsubscribe(getNativeHandle(), topic, unsubAck); 407 // When the future completes, complete the returned future with the packetId 408 return future.thenApply(unused -> packetId); 409 } 410 411 /** 412 * Publishes a message to a topic. 413 * 414 * @param message The message to publish. 415 * 416 * @return Future value is the packet/message id associated with the publish 417 * operation 418 */ publish(MqttMessage message)419 public CompletableFuture<Integer> publish(MqttMessage message) { 420 CompletableFuture<Integer> future = new CompletableFuture<>(); 421 if (isNull()) { 422 future.completeExceptionally(new MqttException("Invalid connection during publish")); 423 } 424 425 AsyncCallback pubAck = AsyncCallback.wrapFuture(future, 0); 426 try { 427 int packetId = mqttClientConnectionPublish(getNativeHandle(), message.getTopic(), 428 message.getQos().getValue(), message.getRetain(), message.getPayload(), pubAck); 429 // When the future completes, complete the returned future with the packetId 430 return future.thenApply(unused -> packetId); 431 } catch (CrtRuntimeException ex) { 432 future.completeExceptionally(ex); 433 return future; 434 } 435 } 436 437 @Deprecated publish(MqttMessage message, QualityOfService qos, boolean retain)438 public CompletableFuture<Integer> publish(MqttMessage message, QualityOfService qos, boolean retain) { 439 return publish(new MqttMessage(message.getTopic(), message.getPayload(), qos, retain)); 440 } 441 442 // Called from native when a websocket handshake request is being prepared. onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserData)443 private void onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserData) { 444 CompletableFuture<HttpRequest> future = new CompletableFuture<>(); 445 future.whenComplete((x, throwable) -> { 446 mqttClientConnectionWebsocketHandshakeComplete(getNativeHandle(), x != null ? x.marshalForJni() : null, 447 throwable, nativeUserData); 448 }); 449 450 WebsocketHandshakeTransformArgs args = new WebsocketHandshakeTransformArgs(this, handshakeRequest, future); 451 452 Consumer<WebsocketHandshakeTransformArgs> transform = config.getWebsocketHandshakeTransform(); 453 if (transform != null) { 454 transform.accept(args); 455 } else { 456 args.complete(handshakeRequest); 457 } 458 } 459 460 /** 461 * Returns statistics about the current state of the MqttClientConnection's 462 * queue of operations. 463 * 464 * @return Current state of the connection's queue of operations. 465 */ getOperationStatistics()466 public MqttClientConnectionOperationStatistics getOperationStatistics() { 467 return mqttClientConnectionGetOperationStatistics(getNativeHandle()); 468 } 469 470 /******************************************************************************* 471 * Native methods 472 ******************************************************************************/ mqttClientConnectionNewFrom311Client(long client, MqttClientConnection thisObj)473 private static native long mqttClientConnectionNewFrom311Client(long client, MqttClientConnection thisObj) 474 throws CrtRuntimeException; 475 mqttClientConnectionNewFrom5Client(long client, MqttClientConnection thisObj)476 private static native long mqttClientConnectionNewFrom5Client(long client, MqttClientConnection thisObj) 477 throws CrtRuntimeException; 478 mqttClientConnectionDestroy(long connection)479 private static native void mqttClientConnectionDestroy(long connection); 480 mqttClientConnectionConnect(long connection, String endpoint, int port, long socketOptions, long tlsContext, String clientId, boolean cleanSession, int keepAliveMs, short pingTimeoutMs, int protocolOperationTimeoutMs)481 private static native void mqttClientConnectionConnect(long connection, String endpoint, int port, 482 long socketOptions, long tlsContext, String clientId, boolean cleanSession, int keepAliveMs, 483 short pingTimeoutMs, int protocolOperationTimeoutMs) throws CrtRuntimeException; 484 mqttClientConnectionDisconnect(long connection, AsyncCallback ack)485 private static native void mqttClientConnectionDisconnect(long connection, AsyncCallback ack); 486 mqttClientConnectionSubscribe(long connection, String topic, int qos, MessageHandler handler, AsyncCallback ack)487 private static native short mqttClientConnectionSubscribe(long connection, String topic, int qos, 488 MessageHandler handler, AsyncCallback ack) throws CrtRuntimeException; 489 mqttClientConnectionOnMessage(long connection, MessageHandler handler)490 private static native void mqttClientConnectionOnMessage(long connection, MessageHandler handler) 491 throws CrtRuntimeException; 492 mqttClientConnectionUnsubscribe(long connection, String topic, AsyncCallback ack)493 private static native short mqttClientConnectionUnsubscribe(long connection, String topic, AsyncCallback ack); 494 mqttClientConnectionPublish(long connection, String topic, int qos, boolean retain, byte[] payload, AsyncCallback ack)495 private static native short mqttClientConnectionPublish(long connection, String topic, int qos, boolean retain, 496 byte[] payload, AsyncCallback ack) throws CrtRuntimeException; 497 mqttClientConnectionSetWill(long connection, String topic, int qos, boolean retain, byte[] payload)498 private static native boolean mqttClientConnectionSetWill(long connection, String topic, int qos, boolean retain, 499 byte[] payload) throws CrtRuntimeException; 500 mqttClientConnectionSetLogin(long connection, String username, String password)501 private static native void mqttClientConnectionSetLogin(long connection, String username, String password) 502 throws CrtRuntimeException; 503 mqttClientConnectionSetReconnectTimeout(long connection, long minTimeout, long maxTimeout)504 private static native void mqttClientConnectionSetReconnectTimeout(long connection, long minTimeout, 505 long maxTimeout) 506 throws CrtRuntimeException; 507 mqttClientConnectionUseWebsockets(long connection)508 private static native void mqttClientConnectionUseWebsockets(long connection) throws CrtRuntimeException; 509 mqttClientConnectionWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData)510 private static native void mqttClientConnectionWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, 511 Throwable throwable, 512 long nativeUserData) throws CrtRuntimeException; 513 mqttClientConnectionSetHttpProxyOptions(long connection, int proxyConnectionType, String proxyHost, int proxyPort, long proxyTlsContext, int proxyAuthorizationType, String proxyAuthorizationUsername, String proxyAuthorizationPassword)514 private static native void mqttClientConnectionSetHttpProxyOptions(long connection, 515 int proxyConnectionType, 516 String proxyHost, 517 int proxyPort, 518 long proxyTlsContext, 519 int proxyAuthorizationType, 520 String proxyAuthorizationUsername, 521 String proxyAuthorizationPassword) throws CrtRuntimeException; 522 mqttClientConnectionGetOperationStatistics( long connection)523 private static native MqttClientConnectionOperationStatistics mqttClientConnectionGetOperationStatistics( 524 long connection); 525 526 }; 527