/** * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ package software.amazon.awssdk.crt.mqtt; import java.util.function.Consumer; import software.amazon.awssdk.crt.CrtResource; import software.amazon.awssdk.crt.http.HttpProxyOptions; import software.amazon.awssdk.crt.io.ClientTlsContext; import software.amazon.awssdk.crt.io.SocketOptions; import software.amazon.awssdk.crt.mqtt5.Mqtt5Client; /** * Encapsulates all per-mqtt-connection configuration */ public final class MqttConnectionConfig extends CrtResource { /* connection */ private String endpoint; private int port; private SocketOptions socketOptions; /* mqtt */ private MqttClient mqttClient; private Mqtt5Client mqtt5Client; private String clientId; private String username; private String password; private MqttClientConnectionEvents connectionCallbacks; private int keepAliveSecs = 0; private int pingTimeoutMs = 0; private long minReconnectTimeoutSecs = 0L; private long maxReconnectTimeoutSecs = 0L; private int protocolOperationTimeoutMs = 0; private boolean cleanSession = true; /* will */ private MqttMessage willMessage; private QualityOfService deprecatedWillQos; private Boolean deprecatedWillRetain; /* websockets */ private boolean useWebsockets = false; private HttpProxyOptions proxyOptions; private Consumer websocketHandshakeTransform; public MqttConnectionConfig() {} /** * Required override method that must begin the release process of the acquired native handle */ @Override protected void releaseNativeHandle() {} /** * Override that determines whether a resource releases its dependencies at the same time the native handle is released or if it waits. * Resources with asynchronous shutdown processes should override this with false, and establish a callback from native code that * invokes releaseReferences() when the asynchronous shutdown process has completed. See HttpClientConnectionManager for an example. */ @Override protected boolean canReleaseReferencesImmediately() { return true; } /** * Configures the connection-related callbacks for a connection * * @param connectionCallbacks connection event callbacks to use */ public void setConnectionCallbacks(MqttClientConnectionEvents connectionCallbacks) { this.connectionCallbacks = connectionCallbacks; } /** * Queries the connection-related callbacks for a connection * * @return the connection event callbacks to use */ public MqttClientConnectionEvents getConnectionCallbacks() { return connectionCallbacks; } /** * Configures the client_id to use with a connection * * @param clientId The client id for a connection. Needs to be unique across * all devices/clients.this.credentialsProvider */ public void setClientId(String clientId) { this.clientId = clientId; } /** * Queries the client_id being used by a connection * * @return The client id for a connection. */ public String getClientId() { return clientId; } /** * Configures the IoT endpoint for a connection * * @param endpoint The IoT endpoint to connect to */ public void setEndpoint(String endpoint) { this.endpoint = endpoint; } /** * Queries the IoT endpoint used by a connection * * @return The IoT endpoint used by a connection */ public String getEndpoint() { return endpoint; } /** * Configures the port to connect to. * * @param port The port to connect to. Usually 8883 for MQTT, or 443 for websockets */ public void setPort(int port) { this.port = port; } /** * Queries the port to connect to. * * @return The port to connect to */ public int getPort() { return port; } /** * Configures the common settings to use for a connection's socket * * @param socketOptions The socket settings */ public void setSocketOptions(SocketOptions socketOptions) { swapReferenceTo(this.socketOptions, socketOptions); this.socketOptions = socketOptions; } /** * Queries the common settings to use for a connection's socket * * @return The socket settings */ public SocketOptions getSocketOptions() { return socketOptions; } /** * Configures whether or not the service should try to resume prior subscriptions, if it has any * * @param cleanSession true if the session should drop prior subscriptions when * a connection is established, false to resume the session */ public void setCleanSession(boolean cleanSession) { this.cleanSession = cleanSession; } /** * Queries whether or not the service should try to resume prior subscriptions, if it has any * * @return true if the session should drop prior subscriptions when * a connection is established, false to resume the session */ public boolean getCleanSession() { return cleanSession; } /** * @deprecated Configures MQTT keep-alive via PING messages. Note that this is not TCP * keepalive. Please use setKeepAliveSecs instead. * * @param keepAliveMs How often in milliseconds to send an MQTT PING message to the * service to keep a connection alive * */ @Deprecated public void setKeepAliveMs(int keepAliveMs) { this.keepAliveSecs = keepAliveMs/1000; } /** * @deprecated Queries the MQTT keep-alive via PING messages. Please use * getKeepAliveSecs instead. * * @return How often in milliseconds to send an MQTT PING message to the * service to keep a connection alive */ @Deprecated public int getKeepAliveMs() { return keepAliveSecs*1000; } /** * Configures MQTT keep-alive via PING messages. Note that this is not TCP * keepalive. Note: AWS IoT Core only allows 30-1200 Secs. Anything larger than * 65535 will be capped. * * @param keepAliveSecs How often in seconds to send an MQTT PING message to the * service to keep a connection alive * */ public void setKeepAliveSecs(int keepAliveSecs) { this.keepAliveSecs = keepAliveSecs; } /** * Queries the MQTT keep-alive via PING messages. * * @return How often in seconds to send an MQTT PING message to the * service to keep a connection alive */ public int getKeepAliveSecs() { return keepAliveSecs; } /** * Configures ping timeout value. If a response is not received within this * interval, the connection will be reestablished. * * @param pingTimeoutMs How long to wait for a ping response (in milliseconds) before resetting the connection */ public void setPingTimeoutMs(int pingTimeoutMs) { this.pingTimeoutMs = pingTimeoutMs; } /** * Queries ping timeout value. If a response is not received within this * interval, the connection will be reestablished. * * @return How long to wait for a ping response before resetting the connection */ public int getPingTimeoutMs() { return pingTimeoutMs; } /** * Configures the minimum and maximum reconnect timeouts. * * The time between reconnect attempts will start at min and multiply by 2 until max is reached. * Default value for min is 1, for max 128. Set either one to zero will use the default setting. * * @param minTimeoutSecs The timeout to start with * @param maxTimeoutSecs The highest allowable wait time between reconnect attempts */ public void setReconnectTimeoutSecs(long minTimeoutSecs, long maxTimeoutSecs) { this.minReconnectTimeoutSecs = minTimeoutSecs; this.maxReconnectTimeoutSecs = maxTimeoutSecs; } /** * Return the minimum reconnect timeout. * * @return The timeout to start with */ public long getMinReconnectTimeoutSecs() { return minReconnectTimeoutSecs; } /** * Return the maximum reconnect timeout. * * @return The highest allowable wait time between reconnect attempts */ public long getMaxReconnectTimeoutSecs() { return maxReconnectTimeoutSecs; } /** * Configures timeout value for requests that response is required on healthy connection. * If a response is not received within this interval, the request will fail as server not receiving it. * Applied to publish (QoS>0) and unsubscribe * * @param protocolOperationTimeoutMs How long to wait for a request response (in milliseconds) before failing */ public void setProtocolOperationTimeoutMs(int protocolOperationTimeoutMs) { this.protocolOperationTimeoutMs = protocolOperationTimeoutMs; } /** * Queries timeout value for requests that response is required on healthy connection. * If a response is not received within this interval, the request will fail as server not receiving it. * Applied to publish (QoS>0) and unsubscribe * * @return How long to wait for a request response (in milliseconds) before failing */ public int getProtocolOperationTimeoutMs() { return protocolOperationTimeoutMs; } /** * Configures the mqtt client to use for a connection * * @param mqttClient the mqtt client to use */ public void setMqttClient(MqttClient mqttClient) { swapReferenceTo(this.mqttClient, mqttClient); this.mqttClient = mqttClient; } /** * Queries the mqtt client to use for a connection * * @return the mqtt client to use */ public MqttClient getMqttClient() { return mqttClient; } /** * Configures the mqtt5 client to use for a connection * * @param mqtt5Client the mqtt client to use */ public void setMqtt5Client(Mqtt5Client mqtt5Client) { swapReferenceTo(this.mqtt5Client, mqtt5Client); this.mqtt5Client = mqtt5Client; } /** * Queries the mqtt5 client to use for a connection * * @return the mqtt5 client to use */ public Mqtt5Client getMqtt5Client() { return mqtt5Client; } /** * Sets the login credentials for a connection. * * @param user Login username * @param pass Login password */ public void setLogin(String user, String pass) throws MqttException { this.username = user; this.password = pass; } /** * Configures the username to use as part of the CONNECT attempt * * @param username username to use for the mqtt connect operation */ public void setUsername(String username) { this.username = username; } /** * Queries the username to use as part of the CONNECT attempt * * @return username to use for the mqtt connect operation */ public String getUsername() { return username; } /** * Configures the password to use as part of the CONNECT attempt * * @param password password to use for the mqtt connect operation */ public void setPassword(String password) { this.password = password; } /** * Queries the password to use as part of the CONNECT attempt * * @return password to use for the mqtt connect operation */ public String getPassword() { return password; } /** * Configures the last will and testament message to be delivered to a topic when a connection disconnects * * @param willMessage the message to publish as the will */ public void setWillMessage(MqttMessage willMessage) { this.willMessage = willMessage; } /** * Queries the last will and testament message to be delivered to a topic when a connection disconnects * * @return the message to publish as the will */ public MqttMessage getWillMessage() { if (willMessage == null || (deprecatedWillRetain == null && deprecatedWillQos == null)) { return willMessage; } QualityOfService qos = deprecatedWillQos == null ? willMessage.getQos() : deprecatedWillQos; boolean retain = deprecatedWillRetain == null ? willMessage.getRetain() : deprecatedWillRetain; return new MqttMessage(willMessage.getTopic(), willMessage.getPayload(), qos, retain); } /** * @deprecated Set QoS directly on the will's {@link MqttMessage}. * @param qos Quality of Service */ @Deprecated public void setWillQos(QualityOfService qos) { this.deprecatedWillQos = qos; } /** * @deprecated Query QoS directly from the will's {@link MqttMessage}. * @return Quality of Service */ @Deprecated public QualityOfService getWillQos() { if (deprecatedWillQos != null) { return deprecatedWillQos; } if (willMessage != null) { return willMessage.getQos(); } return null; } /** * @deprecated Set retain directly on the will's {@link MqttMessage}. * @param retain whether will's should be sent with retain property set */ @Deprecated public void setWillRetain(boolean retain) { this.deprecatedWillRetain = retain; } /** * @deprecated Query retain directly from the will's {@link MqttMessage}. * @return whether will will be sent with retain property set */ @Deprecated public boolean getWillRetain() { if (deprecatedWillRetain != null) { return deprecatedWillRetain; } if (willMessage != null) { return willMessage.getRetain(); } return false; } /** * Configures whether or not to use websockets for the mqtt connection * * @param useWebsockets whether or not to use websockets */ public void setUseWebsockets(boolean useWebsockets) { this.useWebsockets = useWebsockets; } /** * Queries whether or not to use websockets for the mqtt connection * * @return whether or not to use websockets */ public boolean getUseWebsockets() { return useWebsockets; } /** * @deprecated use setHttpProxyOptions instead * Configures proxy options for a websocket-based mqtt connection * * @param proxyOptions proxy options to use for the base http connection */ public void setWebsocketProxyOptions(HttpProxyOptions proxyOptions) { this.proxyOptions = proxyOptions; } /** * @deprecated use getHttpProxyOptions instead * Queries proxy options for a websocket-based mqtt connection * * @return proxy options to use for the base http connection */ public HttpProxyOptions getWebsocketProxyOptions() { return proxyOptions; } /** * Configures proxy options for the mqtt connection * * @param proxyOptions proxy options to use for the connection */ public void setHttpProxyOptions(HttpProxyOptions proxyOptions) { this.proxyOptions = proxyOptions; } /** * Queries proxy options for an mqtt connection * * @return proxy options to use for the connection */ public HttpProxyOptions getHttpProxyOptions() { return proxyOptions; } /** * Set a transform operation to use on each websocket handshake http request. * The transform may modify the http request before it is sent to the server. * The transform MUST call handshakeTransform.complete() or handshakeTransform.completeExceptionally() * when the transform is complete, failure to do so will stall the mqtt connection indefinitely. * The transform operation may be asynchronous. * * The default websocket handshake http request uses path "/mqtt". * All required headers for a websocket handshake are present, * plus the optional header "Sec-WebSocket-Protocol: mqtt". * * This is only applicable to websocket-based mqtt connections. * * @param handshakeTransform http request handshake transform */ public void setWebsocketHandshakeTransform(Consumer handshakeTransform) { this.websocketHandshakeTransform = handshakeTransform; } /** * Queries the handshake http request transform to use when upgrading the connection * * @return http request handshake transform */ public Consumer getWebsocketHandshakeTransform() { return websocketHandshakeTransform; } /** * Creates a (shallow) clone of this config object * * @return shallow clone of this config object */ public MqttConnectionConfig clone() { try (MqttConnectionConfig clone = new MqttConnectionConfig()) { clone.setEndpoint(getEndpoint()); clone.setPort(getPort()); clone.setSocketOptions(getSocketOptions()); clone.setMqttClient(getMqttClient()); clone.setMqtt5Client(getMqtt5Client()); clone.setClientId(getClientId()); clone.setUsername(getUsername()); clone.setPassword(getPassword()); clone.setConnectionCallbacks(getConnectionCallbacks()); clone.setKeepAliveSecs(getKeepAliveSecs()); clone.setPingTimeoutMs(getPingTimeoutMs()); clone.setProtocolOperationTimeoutMs(getProtocolOperationTimeoutMs()); clone.setCleanSession(getCleanSession()); clone.setWillMessage(getWillMessage()); clone.setUseWebsockets(getUseWebsockets()); clone.setHttpProxyOptions(getHttpProxyOptions()); clone.setWebsocketHandshakeTransform(getWebsocketHandshakeTransform()); clone.setReconnectTimeoutSecs(getMinReconnectTimeoutSecs(), getMaxReconnectTimeoutSecs()); // success, bump up the ref count so we can escape the try-with-resources block clone.addRef(); return clone; } } }