/** * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ package software.amazon.awssdk.crt.mqtt5; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import software.amazon.awssdk.crt.CrtResource; import software.amazon.awssdk.crt.CrtRuntimeException; import software.amazon.awssdk.crt.http.HttpProxyOptions; import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.io.ClientBootstrap; import software.amazon.awssdk.crt.io.SocketOptions; import software.amazon.awssdk.crt.io.TlsContext; import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket; import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket; import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket; import software.amazon.awssdk.crt.mqtt5.packets.SubAckPacket; import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket; import software.amazon.awssdk.crt.mqtt5.packets.UnsubAckPacket; import software.amazon.awssdk.crt.mqtt5.packets.UnsubscribePacket; import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket.ConnectPacketBuilder; /** * This class wraps the aws-c-mqtt MQTT5 client to provide the basic MQTT5 pub/sub functionalities * via the AWS Common Runtime * * One Mqtt5Client class creates one connection. * */ public class Mqtt5Client extends CrtResource { /** * A private reference to the websocket handshake from the MQTT5 client options */ private Consumer websocketHandshakeTransform; /** * A boolean that holds whether the client's current state is connected or not */ private boolean isConnected; /** * A private config used to save config for mqtt3 connection creation */ private Mqtt5ClientOptions clientOptions; /** * Creates a Mqtt5Client instance using the provided Mqtt5ClientOptions. Once the Mqtt5Client is created, * changing the settings will not cause a change in already created Mqtt5Client's. * * @param options The Mqtt5Options class to use to configure the new Mqtt5Client. * @throws CrtRuntimeException If the system is unable to allocate space for a native MQTT5 client structure */ public Mqtt5Client(Mqtt5ClientOptions options) throws CrtRuntimeException { clientOptions = options; ClientBootstrap bootstrap = options.getBootstrap(); SocketOptions socketOptions = options.getSocketOptions(); TlsContext tlsContext = options.getTlsContext(); HttpProxyOptions proxyOptions = options.getHttpProxyOptions(); ConnectPacket connectionOptions = options.getConnectOptions(); this.websocketHandshakeTransform = options.getWebsocketHandshakeTransform(); if (bootstrap == null) { bootstrap = ClientBootstrap.getOrCreateStaticDefault(); } if (connectionOptions == null) { ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); connectionOptions = connectBuilder.build(); } acquireNativeHandle(mqtt5ClientNew( options, connectionOptions, bootstrap, this )); if (bootstrap != null) { addReferenceTo(bootstrap); } if (socketOptions != null) { addReferenceTo(socketOptions); } if (tlsContext != null) { addReferenceTo(tlsContext); } if (proxyOptions != null) { if (proxyOptions.getTlsContext() != null) { addReferenceTo(proxyOptions.getTlsContext()); } } isConnected = false; } /** * Cleans up the native resources associated with this client. The client is unusable after this call */ @Override protected void releaseNativeHandle() { if (!isNull()) { mqtt5ClientDestroy(getNativeHandle()); } } /** * Determines whether a resource releases its dependencies at the same time the native handle is released or if it waits. * Resources that wait are responsible for calling releaseReferences() manually. */ @Override protected boolean canReleaseReferencesImmediately() { return false; } /** * Notifies the Mqtt5Client that you want it maintain connectivity to the configured endpoint. * The client will attempt to stay connected using the properties of the reconnect-related parameters * in the Mqtt5Client configuration. * * This is an asynchronous operation. * * @throws CrtRuntimeException If the native client returns an error when starting */ public void start() throws CrtRuntimeException { mqtt5ClientInternalStart(getNativeHandle()); } /** * Notifies the Mqtt5Client that you want it to end connectivity to the configured endpoint, disconnecting any * existing connection and halting any reconnect attempts. * * This is an asynchronous operation. * * @param disconnectPacket (optional) Properties of a DISCONNECT packet to send as part of the shutdown process. When * disconnectPacket is null, no DISCONNECT packets will be sent. * @throws CrtRuntimeException If the native client is unable to initialize the stop process. */ public void stop(DisconnectPacket disconnectPacket) throws CrtRuntimeException { mqtt5ClientInternalStop(getNativeHandle(), disconnectPacket); } /** * Notifies the Mqtt5Client that you want it to end connectivity to the configured endpoint, disconnecting any * existing connection and halting any reconnect attempts. No DISCONNECT packets will be sent. * * This is an asynchronous operation. * * @throws CrtRuntimeException If the native client is unable to initialize the stop process. */ public void stop() throws CrtRuntimeException { stop(null); } /** * Tells the Mqtt5Client to attempt to send a PUBLISH packet. * * Will return a future containing a PublishPacket if the publish is successful. * The data in the PublishPacket varies depending on the QoS of the Publish. For QoS 0, the PublishPacket * will not contain data. For QoS 1, the PublishPacket will contain a PubAckPacket. * See PublishPacket class documentation for more info. * * @param publishPacket PUBLISH packet to send to the server * @return A future that will be rejected with an error or resolved with a PublishResult response */ public CompletableFuture publish(PublishPacket publishPacket) { CompletableFuture publishFuture = new CompletableFuture<>(); mqtt5ClientInternalPublish(getNativeHandle(), publishPacket, publishFuture); return publishFuture; } /** * Tells the Mqtt5Client to attempt to subscribe to one or more topic filters. * * @param subscribePacket SUBSCRIBE packet to send to the server * @return a future that will be rejected with an error or resolved with the SUBACK response */ public CompletableFuture subscribe(SubscribePacket subscribePacket) { CompletableFuture subscribeFuture = new CompletableFuture<>(); mqtt5ClientInternalSubscribe(getNativeHandle(), subscribePacket, subscribeFuture); return subscribeFuture; } /** * Tells the Mqtt5Client to attempt to unsubscribe from one or more topic filters. * * @param unsubscribePacket UNSUBSCRIBE packet to send to the server * @return a future that will be rejected with an error or resolved with the UNSUBACK response */ public CompletableFuture unsubscribe(UnsubscribePacket unsubscribePacket) { CompletableFuture unsubscribeFuture = new CompletableFuture<>(); mqtt5ClientInternalUnsubscribe(getNativeHandle(), unsubscribePacket, unsubscribeFuture); return unsubscribeFuture; } /** * Returns statistics about the current state of the Mqtt5Client's queue of operations. * @return Current state of the client's queue of operations. */ public Mqtt5ClientOperationStatistics getOperationStatistics() { return mqtt5ClientInternalGetOperationStatistics(getNativeHandle()); } /** * Returns the connectivity state for the Mqtt5Client. * @return True if the client is connected, false otherwise */ public synchronized boolean getIsConnected() { return isConnected; } /** * Sets the connectivity state of the Mqtt5Client. Is used by JNI. * @param connected The current connectivity state of the Mqtt5Client */ private synchronized void setIsConnected(boolean connected) { isConnected = connected; } /******************************************************************************* * Mqtt5 to Mqtt3 Adapter ******************************************************************************/ /** * Returns the Mqtt5ClientOptions used for the Mqtt5Client * * @return Mqtt5ClientOptions */ public Mqtt5ClientOptions getClientOptions() { return clientOptions; } /******************************************************************************* * websocket methods ******************************************************************************/ /** * Called from native when a websocket handshake request is being prepared. * @param handshakeRequest The HttpRequest being prepared * @param nativeUserData Native data */ private void onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserData) { CompletableFuture future = new CompletableFuture<>(); future.whenComplete((x, throwable) -> { mqtt5ClientInternalWebsocketHandshakeComplete(getNativeHandle(), x != null ? x.marshalForJni() : null, throwable, nativeUserData); }); Mqtt5WebsocketHandshakeTransformArgs args = new Mqtt5WebsocketHandshakeTransformArgs(this, handshakeRequest, future); Consumer transform = this.websocketHandshakeTransform; if (transform != null) { transform.accept(args); } else { args.complete(handshakeRequest); } } /******************************************************************************* * native methods ******************************************************************************/ private static native long mqtt5ClientNew( Mqtt5ClientOptions options, ConnectPacket connect_options, ClientBootstrap bootstrap, Mqtt5Client client ) throws CrtRuntimeException; private static native void mqtt5ClientDestroy(long client); private static native void mqtt5ClientInternalStart(long client); private static native void mqtt5ClientInternalStop(long client, DisconnectPacket disconnect_options); private static native void mqtt5ClientInternalPublish(long client, PublishPacket publish_options, CompletableFuture publish_result); private static native void mqtt5ClientInternalSubscribe(long client, SubscribePacket subscribe_options, CompletableFuture subscribe_suback); private static native void mqtt5ClientInternalUnsubscribe(long client, UnsubscribePacket unsubscribe_options, CompletableFuture unsubscribe_suback); private static native void mqtt5ClientInternalWebsocketHandshakeComplete(long connection, byte[] marshalledRequest, Throwable throwable, long nativeUserData) throws CrtRuntimeException; private static native Mqtt5ClientOperationStatistics mqtt5ClientInternalGetOperationStatistics(long client); }