1 /** 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * SPDX-License-Identifier: Apache-2.0. 4 */ 5 package software.amazon.awssdk.crt.mqtt5; 6 7 import java.util.concurrent.CompletableFuture; 8 import java.util.function.Consumer; 9 10 import software.amazon.awssdk.crt.CrtResource; 11 import software.amazon.awssdk.crt.CrtRuntimeException; 12 import software.amazon.awssdk.crt.http.HttpProxyOptions; 13 import software.amazon.awssdk.crt.http.HttpRequest; 14 import software.amazon.awssdk.crt.io.ClientBootstrap; 15 import software.amazon.awssdk.crt.io.SocketOptions; 16 import software.amazon.awssdk.crt.io.TlsContext; 17 import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket; 18 import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket; 19 import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket; 20 import software.amazon.awssdk.crt.mqtt5.packets.SubAckPacket; 21 import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket; 22 import software.amazon.awssdk.crt.mqtt5.packets.UnsubAckPacket; 23 import software.amazon.awssdk.crt.mqtt5.packets.UnsubscribePacket; 24 import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket.ConnectPacketBuilder; 25 26 27 /** 28 * This class wraps the aws-c-mqtt MQTT5 client to provide the basic MQTT5 pub/sub functionalities 29 * via the AWS Common Runtime 30 * 31 * One Mqtt5Client class creates one connection. 32 * 33 */ 34 public class Mqtt5Client extends CrtResource { 35 36 /** 37 * A private reference to the websocket handshake from the MQTT5 client options 38 */ 39 private Consumer<Mqtt5WebsocketHandshakeTransformArgs> websocketHandshakeTransform; 40 41 /** 42 * A boolean that holds whether the client's current state is connected or not 43 */ 44 private boolean isConnected; 45 46 /** 47 * A private config used to save config for mqtt3 connection creation 48 */ 49 private Mqtt5ClientOptions clientOptions; 50 51 /** 52 * Creates a Mqtt5Client instance using the provided Mqtt5ClientOptions. Once the Mqtt5Client is created, 53 * changing the settings will not cause a change in already created Mqtt5Client's. 54 * 55 * @param options The Mqtt5Options class to use to configure the new Mqtt5Client. 56 * @throws CrtRuntimeException If the system is unable to allocate space for a native MQTT5 client structure 57 */ Mqtt5Client(Mqtt5ClientOptions options)58 public Mqtt5Client(Mqtt5ClientOptions options) throws CrtRuntimeException { 59 clientOptions = options; 60 ClientBootstrap bootstrap = options.getBootstrap(); 61 SocketOptions socketOptions = options.getSocketOptions(); 62 TlsContext tlsContext = options.getTlsContext(); 63 HttpProxyOptions proxyOptions = options.getHttpProxyOptions(); 64 ConnectPacket connectionOptions = options.getConnectOptions(); 65 this.websocketHandshakeTransform = options.getWebsocketHandshakeTransform(); 66 67 if (bootstrap == null) { 68 bootstrap = ClientBootstrap.getOrCreateStaticDefault(); 69 } 70 71 if (connectionOptions == null) { 72 ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); 73 connectionOptions = connectBuilder.build(); 74 } 75 76 acquireNativeHandle(mqtt5ClientNew( 77 options, 78 connectionOptions, 79 bootstrap, 80 this 81 )); 82 83 if (bootstrap != null) { 84 addReferenceTo(bootstrap); 85 } 86 if (socketOptions != null) { 87 addReferenceTo(socketOptions); 88 } 89 if (tlsContext != null) { 90 addReferenceTo(tlsContext); 91 } 92 if (proxyOptions != null) { 93 if (proxyOptions.getTlsContext() != null) { 94 addReferenceTo(proxyOptions.getTlsContext()); 95 } 96 } 97 isConnected = false; 98 } 99 100 /** 101 * Cleans up the native resources associated with this client. The client is unusable after this call 102 */ 103 @Override releaseNativeHandle()104 protected void releaseNativeHandle() { 105 if (!isNull()) { 106 mqtt5ClientDestroy(getNativeHandle()); 107 } 108 } 109 110 /** 111 * Determines whether a resource releases its dependencies at the same time the native handle is released or if it waits. 112 * Resources that wait are responsible for calling releaseReferences() manually. 113 */ 114 @Override canReleaseReferencesImmediately()115 protected boolean canReleaseReferencesImmediately() { return false; } 116 117 /** 118 * Notifies the Mqtt5Client that you want it maintain connectivity to the configured endpoint. 119 * The client will attempt to stay connected using the properties of the reconnect-related parameters 120 * in the Mqtt5Client configuration. 121 * 122 * This is an asynchronous operation. 123 * 124 * @throws CrtRuntimeException If the native client returns an error when starting 125 */ start()126 public void start() throws CrtRuntimeException { 127 mqtt5ClientInternalStart(getNativeHandle()); 128 } 129 130 /** 131 * Notifies the Mqtt5Client that you want it to end connectivity to the configured endpoint, disconnecting any 132 * existing connection and halting any reconnect attempts. 133 * 134 * This is an asynchronous operation. 135 * 136 * @param disconnectPacket (optional) Properties of a DISCONNECT packet to send as part of the shutdown process. When 137 * disconnectPacket is null, no DISCONNECT packets will be sent. 138 * @throws CrtRuntimeException If the native client is unable to initialize the stop process. 139 */ stop(DisconnectPacket disconnectPacket)140 public void stop(DisconnectPacket disconnectPacket) throws CrtRuntimeException { 141 mqtt5ClientInternalStop(getNativeHandle(), disconnectPacket); 142 } 143 144 /** 145 * Notifies the Mqtt5Client that you want it to end connectivity to the configured endpoint, disconnecting any 146 * existing connection and halting any reconnect attempts. No DISCONNECT packets will be sent. 147 * 148 * This is an asynchronous operation. 149 * 150 * @throws CrtRuntimeException If the native client is unable to initialize the stop process. 151 */ stop()152 public void stop() throws CrtRuntimeException { 153 stop(null); 154 } 155 156 /** 157 * Tells the Mqtt5Client to attempt to send a PUBLISH packet. 158 * 159 * Will return a future containing a PublishPacket if the publish is successful. 160 * The data in the PublishPacket varies depending on the QoS of the Publish. For QoS 0, the PublishPacket 161 * will not contain data. For QoS 1, the PublishPacket will contain a PubAckPacket. 162 * See PublishPacket class documentation for more info. 163 * 164 * @param publishPacket PUBLISH packet to send to the server 165 * @return A future that will be rejected with an error or resolved with a PublishResult response 166 */ publish(PublishPacket publishPacket)167 public CompletableFuture<PublishResult> publish(PublishPacket publishPacket) { 168 CompletableFuture<PublishResult> publishFuture = new CompletableFuture<>(); 169 mqtt5ClientInternalPublish(getNativeHandle(), publishPacket, publishFuture); 170 return publishFuture; 171 } 172 173 /** 174 * Tells the Mqtt5Client to attempt to subscribe to one or more topic filters. 175 * 176 * @param subscribePacket SUBSCRIBE packet to send to the server 177 * @return a future that will be rejected with an error or resolved with the SUBACK response 178 */ subscribe(SubscribePacket subscribePacket)179 public CompletableFuture<SubAckPacket> subscribe(SubscribePacket subscribePacket) { 180 CompletableFuture<SubAckPacket> subscribeFuture = new CompletableFuture<>(); 181 mqtt5ClientInternalSubscribe(getNativeHandle(), subscribePacket, subscribeFuture); 182 return subscribeFuture; 183 } 184 185 /** 186 * Tells the Mqtt5Client to attempt to unsubscribe from one or more topic filters. 187 * 188 * @param unsubscribePacket UNSUBSCRIBE packet to send to the server 189 * @return a future that will be rejected with an error or resolved with the UNSUBACK response 190 */ unsubscribe(UnsubscribePacket unsubscribePacket)191 public CompletableFuture<UnsubAckPacket> unsubscribe(UnsubscribePacket unsubscribePacket) { 192 CompletableFuture<UnsubAckPacket> unsubscribeFuture = new CompletableFuture<>(); 193 mqtt5ClientInternalUnsubscribe(getNativeHandle(), unsubscribePacket, unsubscribeFuture); 194 return unsubscribeFuture; 195 } 196 197 /** 198 * Returns statistics about the current state of the Mqtt5Client's queue of operations. 199 * @return Current state of the client's queue of operations. 200 */ getOperationStatistics()201 public Mqtt5ClientOperationStatistics getOperationStatistics() { 202 return mqtt5ClientInternalGetOperationStatistics(getNativeHandle()); 203 } 204 205 /** 206 * Returns the connectivity state for the Mqtt5Client. 207 * @return True if the client is connected, false otherwise 208 */ getIsConnected()209 public synchronized boolean getIsConnected() { 210 return isConnected; 211 } 212 213 /** 214 * Sets the connectivity state of the Mqtt5Client. Is used by JNI. 215 * @param connected The current connectivity state of the Mqtt5Client 216 */ setIsConnected(boolean connected)217 private synchronized void setIsConnected(boolean connected) { 218 isConnected = connected; 219 } 220 221 222 /******************************************************************************* 223 * Mqtt5 to Mqtt3 Adapter 224 ******************************************************************************/ 225 226 /** 227 * Returns the Mqtt5ClientOptions used for the Mqtt5Client 228 * 229 * @return Mqtt5ClientOptions 230 */ getClientOptions()231 public Mqtt5ClientOptions getClientOptions() 232 { 233 return clientOptions; 234 } 235 236 /******************************************************************************* 237 * websocket methods 238 ******************************************************************************/ 239 240 /** 241 * Called from native when a websocket handshake request is being prepared. 242 * @param handshakeRequest The HttpRequest being prepared 243 * @param nativeUserData Native data 244 */ onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserData)245 private void onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserData) { 246 CompletableFuture<HttpRequest> future = new CompletableFuture<>(); 247 future.whenComplete((x, throwable) -> { 248 mqtt5ClientInternalWebsocketHandshakeComplete(getNativeHandle(), x != null ? x.marshalForJni() : null, 249 throwable, nativeUserData); 250 }); 251 252 Mqtt5WebsocketHandshakeTransformArgs args = new Mqtt5WebsocketHandshakeTransformArgs(this, handshakeRequest, future); 253 254 Consumer<Mqtt5WebsocketHandshakeTransformArgs> transform = this.websocketHandshakeTransform; 255 if (transform != null) { 256 transform.accept(args); 257 } else { 258 args.complete(handshakeRequest); 259 } 260 } 261 262 263 /******************************************************************************* 264 * native methods 265 ******************************************************************************/ mqtt5ClientNew( Mqtt5ClientOptions options, ConnectPacket connect_options, ClientBootstrap bootstrap, Mqtt5Client client )266 private static native long mqtt5ClientNew( 267 Mqtt5ClientOptions options, 268 ConnectPacket connect_options, 269 ClientBootstrap bootstrap, 270 Mqtt5Client client 271 ) throws CrtRuntimeException; mqtt5ClientDestroy(long client)272 private static native void mqtt5ClientDestroy(long client); mqtt5ClientInternalStart(long client)273 private static native void mqtt5ClientInternalStart(long client); mqtt5ClientInternalStop(long client, DisconnectPacket disconnect_options)274 private static native void mqtt5ClientInternalStop(long client, DisconnectPacket disconnect_options); mqtt5ClientInternalPublish(long client, PublishPacket publish_options, CompletableFuture<PublishResult> publish_result)275 private static native void mqtt5ClientInternalPublish(long client, PublishPacket publish_options, CompletableFuture<PublishResult> publish_result); mqtt5ClientInternalSubscribe(long client, SubscribePacket subscribe_options, CompletableFuture<SubAckPacket> subscribe_suback)276 private static native void mqtt5ClientInternalSubscribe(long client, SubscribePacket subscribe_options, CompletableFuture<SubAckPacket> subscribe_suback); mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture<UnsubAckPacket> unsubscribe_suback)277 private static native void mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture<UnsubAckPacket> unsubscribe_suback); mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData)278 private static native void mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException; mqtt5ClientInternalGetOperationStatistics(long client)279 private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client); 280 } 281