• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 /**
3  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
4  * SPDX-License-Identifier: Apache-2.0.
5  */
6 package software.amazon.awssdk.crt.mqtt;
7 
8 import java.util.function.Consumer;
9 
10 import software.amazon.awssdk.crt.CrtResource;
11 import software.amazon.awssdk.crt.http.HttpProxyOptions;
12 import software.amazon.awssdk.crt.io.ClientTlsContext;
13 import software.amazon.awssdk.crt.io.SocketOptions;
14 import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;
15 
16 /**
17  * Encapsulates all per-mqtt-connection configuration
18  */
19 public final class MqttConnectionConfig extends CrtResource {
20     /* connection */
21     private String endpoint;
22     private int port;
23     private SocketOptions socketOptions;
24 
25     /* mqtt */
26     private MqttClient mqttClient;
27     private Mqtt5Client mqtt5Client;
28     private String clientId;
29     private String username;
30     private String password;
31     private MqttClientConnectionEvents connectionCallbacks;
32     private int keepAliveSecs = 0;
33     private int pingTimeoutMs = 0;
34     private long minReconnectTimeoutSecs = 0L;
35     private long maxReconnectTimeoutSecs = 0L;
36     private int protocolOperationTimeoutMs = 0;
37     private boolean cleanSession = true;
38 
39     /* will */
40     private MqttMessage willMessage;
41     private QualityOfService deprecatedWillQos;
42     private Boolean deprecatedWillRetain;
43 
44     /* websockets */
45     private boolean useWebsockets = false;
46     private HttpProxyOptions proxyOptions;
47     private Consumer<WebsocketHandshakeTransformArgs> websocketHandshakeTransform;
48 
MqttConnectionConfig()49     public MqttConnectionConfig() {}
50 
51 
52     /**
53      * Required override method that must begin the release process of the acquired native handle
54      */
55     @Override
releaseNativeHandle()56     protected void releaseNativeHandle() {}
57 
58     /**
59      * Override that determines whether a resource releases its dependencies at the same time the native handle is released or if it waits.
60      * Resources with asynchronous shutdown processes should override this with false, and establish a callback from native code that
61      * invokes releaseReferences() when the asynchronous shutdown process has completed.  See HttpClientConnectionManager for an example.
62      */
63     @Override
canReleaseReferencesImmediately()64     protected boolean canReleaseReferencesImmediately() { return true; }
65 
66     /**
67      * Configures the connection-related callbacks for a connection
68      *
69      * @param connectionCallbacks connection event callbacks to use
70      */
setConnectionCallbacks(MqttClientConnectionEvents connectionCallbacks)71     public void setConnectionCallbacks(MqttClientConnectionEvents connectionCallbacks) {
72         this.connectionCallbacks = connectionCallbacks;
73     }
74 
75     /**
76      * Queries the connection-related callbacks for a connection
77      *
78      * @return the connection event callbacks to use
79      */
getConnectionCallbacks()80     public MqttClientConnectionEvents getConnectionCallbacks() {
81         return connectionCallbacks;
82     }
83 
84     /**
85      * Configures the client_id to use with a connection
86      *
87      * @param clientId The client id for a connection. Needs to be unique across
88      *                  all devices/clients.this.credentialsProvider
89      */
setClientId(String clientId)90     public void setClientId(String clientId) {
91         this.clientId = clientId;
92     }
93 
94     /**
95      * Queries the client_id being used by a connection
96      *
97      * @return The client id for a connection.
98      */
getClientId()99     public String getClientId() {
100         return clientId;
101     }
102 
103     /**
104      * Configures the IoT endpoint for a connection
105      *
106      * @param endpoint The IoT endpoint to connect to
107      */
setEndpoint(String endpoint)108     public void setEndpoint(String endpoint) {
109         this.endpoint = endpoint;
110     }
111 
112     /**
113      * Queries the IoT endpoint used by a connection
114      *
115      * @return The IoT endpoint used by a connection
116      */
getEndpoint()117     public String getEndpoint() {
118         return endpoint;
119     }
120 
121     /**
122      * Configures the port to connect to.
123      *
124      * @param port The port to connect to. Usually 8883 for MQTT, or 443 for websockets
125      */
setPort(int port)126     public void setPort(int port) {
127         this.port = port;
128     }
129 
130     /**
131      * Queries the port to connect to.
132      *
133      * @return The port to connect to
134      */
getPort()135     public int getPort() {
136         return port;
137     }
138 
139     /**
140      * Configures the common settings to use for a connection's socket
141      *
142      * @param socketOptions The socket settings
143      */
setSocketOptions(SocketOptions socketOptions)144     public void setSocketOptions(SocketOptions socketOptions) {
145         swapReferenceTo(this.socketOptions, socketOptions);
146         this.socketOptions = socketOptions;
147     }
148 
149     /**
150      * Queries the common settings to use for a connection's socket
151      *
152      * @return The socket settings
153      */
getSocketOptions()154     public SocketOptions getSocketOptions() {
155         return socketOptions;
156     }
157 
158     /**
159      * Configures whether or not the service should try to resume prior subscriptions, if it has any
160      *
161      * @param cleanSession true if the session should drop prior subscriptions when
162      *                     a connection is established, false to resume the session
163      */
setCleanSession(boolean cleanSession)164     public void setCleanSession(boolean cleanSession) {
165         this.cleanSession = cleanSession;
166     }
167 
168     /**
169      * Queries whether or not the service should try to resume prior subscriptions, if it has any
170      *
171      * @return true if the session should drop prior subscriptions when
172      *                     a connection is established, false to resume the session
173      */
getCleanSession()174     public boolean getCleanSession() {
175         return cleanSession;
176     }
177 
178     /**
179      * @deprecated Configures MQTT keep-alive via PING messages. Note that this is not TCP
180      * keepalive. Please use setKeepAliveSecs instead.
181      *
182      * @param keepAliveMs How often in milliseconds to send an MQTT PING message to the
183      *                   service to keep a connection alive
184      *
185      */
186     @Deprecated
setKeepAliveMs(int keepAliveMs)187     public void setKeepAliveMs(int keepAliveMs) {
188         this.keepAliveSecs = keepAliveMs/1000;
189     }
190 
191     /**
192      * @deprecated Queries the MQTT keep-alive via PING messages. Please use
193      * getKeepAliveSecs instead.
194      *
195      * @return How often in milliseconds to send an MQTT PING message to the
196      *                   service to keep a connection alive
197      */
198     @Deprecated
getKeepAliveMs()199     public int getKeepAliveMs() {
200         return keepAliveSecs*1000;
201     }
202 
203     /**
204      * Configures MQTT keep-alive via PING messages. Note that this is not TCP
205      * keepalive. Note: AWS IoT Core only allows 30-1200 Secs. Anything larger than
206      * 65535 will be capped.
207      *
208      * @param keepAliveSecs How often in seconds to send an MQTT PING message to the
209      *                   service to keep a connection alive
210      *
211      */
setKeepAliveSecs(int keepAliveSecs)212     public void setKeepAliveSecs(int keepAliveSecs) {
213         this.keepAliveSecs = keepAliveSecs;
214     }
215 
216     /**
217      * Queries the MQTT keep-alive via PING messages.
218      *
219      * @return How often in seconds to send an MQTT PING message to the
220      *                   service to keep a connection alive
221      */
getKeepAliveSecs()222     public int getKeepAliveSecs() {
223         return keepAliveSecs;
224     }
225 
226     /**
227      * Configures ping timeout value.  If a response is not received within this
228      * interval, the connection will be reestablished.
229      *
230      * @param pingTimeoutMs How long to wait for a ping response (in milliseconds) before resetting the connection
231      */
setPingTimeoutMs(int pingTimeoutMs)232     public void setPingTimeoutMs(int pingTimeoutMs) {
233         this.pingTimeoutMs = pingTimeoutMs;
234     }
235 
236     /**
237      * Queries ping timeout value.  If a response is not received within this
238      * interval, the connection will be reestablished.
239      *
240      * @return How long to wait for a ping response before resetting the connection
241      */
getPingTimeoutMs()242     public int getPingTimeoutMs() {
243         return pingTimeoutMs;
244     }
245 
246     /**
247      * Configures the minimum and maximum reconnect timeouts.
248      *
249      * The time between reconnect attempts will start at min and multiply by 2 until max is reached.
250      * Default value for min is 1, for max 128. Set either one to zero will use the default setting.
251      *
252      * @param minTimeoutSecs The timeout to start with
253      * @param maxTimeoutSecs The highest allowable wait time between reconnect attempts
254      */
setReconnectTimeoutSecs(long minTimeoutSecs, long maxTimeoutSecs)255     public void setReconnectTimeoutSecs(long minTimeoutSecs, long maxTimeoutSecs) {
256         this.minReconnectTimeoutSecs = minTimeoutSecs;
257         this.maxReconnectTimeoutSecs = maxTimeoutSecs;
258     }
259 
260     /**
261      * Return the minimum reconnect timeout.
262      *
263      * @return The timeout to start with
264      */
getMinReconnectTimeoutSecs()265     public long getMinReconnectTimeoutSecs() {
266         return minReconnectTimeoutSecs;
267     }
268 
269     /**
270      * Return the maximum reconnect timeout.
271      *
272      * @return The highest allowable wait time between reconnect attempts
273      */
getMaxReconnectTimeoutSecs()274     public long getMaxReconnectTimeoutSecs() {
275         return maxReconnectTimeoutSecs;
276     }
277 
278     /**
279      * Configures timeout value for requests that response is required on healthy connection.
280      * If a response is not received within this interval, the request will fail as server not receiving it.
281      * Applied to publish (QoS&gt;0) and unsubscribe
282      *
283      * @param protocolOperationTimeoutMs How long to wait for a request response (in milliseconds) before failing
284      */
setProtocolOperationTimeoutMs(int protocolOperationTimeoutMs)285     public void setProtocolOperationTimeoutMs(int protocolOperationTimeoutMs) {
286         this.protocolOperationTimeoutMs = protocolOperationTimeoutMs;
287     }
288 
289     /**
290      * Queries timeout value for requests that response is required on healthy connection.
291      * If a response is not received within this interval, the request will fail as server not receiving it.
292      * Applied to publish (QoS&gt;0) and unsubscribe
293      *
294      * @return How long to wait for a request response (in milliseconds) before failing
295      */
getProtocolOperationTimeoutMs()296     public int getProtocolOperationTimeoutMs() {
297         return protocolOperationTimeoutMs;
298     }
299 
300     /**
301      * Configures the mqtt client to use for a connection
302      *
303      * @param mqttClient the mqtt client to use
304      */
setMqttClient(MqttClient mqttClient)305     public void setMqttClient(MqttClient mqttClient) {
306         swapReferenceTo(this.mqttClient, mqttClient);
307         this.mqttClient = mqttClient;
308     }
309 
310     /**
311      * Queries the mqtt client to use for a connection
312      *
313      * @return the mqtt client to use
314      */
getMqttClient()315     public MqttClient getMqttClient() {
316         return mqttClient;
317     }
318 
319     /**
320      * Configures the mqtt5 client to use for a connection
321      *
322      * @param mqtt5Client the mqtt client to use
323      */
setMqtt5Client(Mqtt5Client mqtt5Client)324     public void setMqtt5Client(Mqtt5Client mqtt5Client) {
325         swapReferenceTo(this.mqtt5Client, mqtt5Client);
326         this.mqtt5Client = mqtt5Client;
327     }
328 
329     /**
330      * Queries the mqtt5 client to use for a connection
331      *
332      * @return the mqtt5 client to use
333      */
getMqtt5Client()334     public Mqtt5Client getMqtt5Client() {
335         return mqtt5Client;
336     }
337 
338     /**
339      * Sets the login credentials for a connection.
340      *
341      * @param user Login username
342      * @param pass Login password
343      */
setLogin(String user, String pass)344     public void setLogin(String user, String pass) throws MqttException {
345         this.username = user;
346         this.password = pass;
347     }
348 
349     /**
350      * Configures the username to use as part of the CONNECT attempt
351      *
352      * @param username username to use for the mqtt connect operation
353      */
setUsername(String username)354     public void setUsername(String username) {
355         this.username = username;
356     }
357 
358     /**
359      * Queries the username to use as part of the CONNECT attempt
360      *
361      * @return username to use for the mqtt connect operation
362      */
getUsername()363     public String getUsername() {
364         return username;
365     }
366 
367     /**
368      * Configures the password to use as part of the CONNECT attempt
369      *
370      * @param password password to use for the mqtt connect operation
371      */
setPassword(String password)372     public void setPassword(String password) {
373         this.password = password;
374     }
375 
376     /**
377      * Queries the password to use as part of the CONNECT attempt
378      *
379      * @return password to use for the mqtt connect operation
380      */
getPassword()381     public String getPassword() {
382         return password;
383     }
384 
385     /**
386      * Configures the last will and testament message to be delivered to a topic when a connection disconnects
387      *
388      * @param willMessage the message to publish as the will
389      */
setWillMessage(MqttMessage willMessage)390     public void setWillMessage(MqttMessage willMessage) {
391         this.willMessage = willMessage;
392     }
393 
394     /**
395      * Queries the last will and testament message to be delivered to a topic when a connection disconnects
396      *
397      * @return the message to publish as the will
398      */
getWillMessage()399     public MqttMessage getWillMessage() {
400         if (willMessage == null || (deprecatedWillRetain == null && deprecatedWillQos == null)) {
401             return willMessage;
402         }
403         QualityOfService qos = deprecatedWillQos == null ? willMessage.getQos() : deprecatedWillQos;
404         boolean retain = deprecatedWillRetain == null ? willMessage.getRetain() : deprecatedWillRetain;
405         return new MqttMessage(willMessage.getTopic(), willMessage.getPayload(), qos, retain);
406     }
407 
408     /**
409      * @deprecated Set QoS directly on the will's {@link MqttMessage}.
410      * @param qos Quality of Service
411      */
412     @Deprecated
setWillQos(QualityOfService qos)413     public void setWillQos(QualityOfService qos) {
414         this.deprecatedWillQos = qos;
415     }
416 
417     /**
418      * @deprecated Query QoS directly from the will's {@link MqttMessage}.
419      * @return Quality of Service
420      */
421     @Deprecated
getWillQos()422     public QualityOfService getWillQos() {
423         if (deprecatedWillQos != null) {
424             return deprecatedWillQos;
425         }
426         if (willMessage != null) {
427             return willMessage.getQos();
428         }
429         return null;
430     }
431 
432     /**
433      * @deprecated Set retain directly on the will's {@link MqttMessage}.
434      * @param retain whether will's should be sent with retain property set
435      */
436     @Deprecated
setWillRetain(boolean retain)437     public void setWillRetain(boolean retain) {
438         this.deprecatedWillRetain = retain;
439     }
440 
441     /**
442      * @deprecated Query retain directly from the will's {@link MqttMessage}.
443      * @return whether will will be sent with retain property set
444      */
445     @Deprecated
getWillRetain()446     public boolean getWillRetain() {
447         if (deprecatedWillRetain != null) {
448             return deprecatedWillRetain;
449         }
450         if (willMessage != null) {
451             return willMessage.getRetain();
452         }
453         return false;
454     }
455 
456     /**
457      * Configures whether or not to use websockets for the mqtt connection
458      *
459      * @param useWebsockets whether or not to use websockets
460      */
setUseWebsockets(boolean useWebsockets)461     public void setUseWebsockets(boolean useWebsockets) {
462         this.useWebsockets = useWebsockets;
463     }
464 
465     /**
466      * Queries whether or not to use websockets for the mqtt connection
467      *
468      * @return whether or not to use websockets
469      */
getUseWebsockets()470     public boolean getUseWebsockets() {
471         return useWebsockets;
472     }
473 
474     /**
475      * @deprecated use setHttpProxyOptions instead
476      * Configures proxy options for a websocket-based mqtt connection
477      *
478      * @param proxyOptions proxy options to use for the base http connection
479      */
setWebsocketProxyOptions(HttpProxyOptions proxyOptions)480     public void setWebsocketProxyOptions(HttpProxyOptions proxyOptions) {
481         this.proxyOptions = proxyOptions;
482     }
483 
484     /**
485      * @deprecated use getHttpProxyOptions instead
486      * Queries proxy options for a websocket-based mqtt connection
487      *
488      * @return proxy options to use for the base http connection
489      */
getWebsocketProxyOptions()490     public HttpProxyOptions getWebsocketProxyOptions() {
491         return proxyOptions;
492     }
493 
494 
495     /**
496      * Configures proxy options for the mqtt connection
497      *
498      * @param proxyOptions proxy options to use for the connection
499      */
setHttpProxyOptions(HttpProxyOptions proxyOptions)500     public void setHttpProxyOptions(HttpProxyOptions proxyOptions) {
501         this.proxyOptions = proxyOptions;
502     }
503 
504     /**
505      * Queries proxy options for an mqtt connection
506      *
507      * @return proxy options to use for the connection
508      */
getHttpProxyOptions()509     public HttpProxyOptions getHttpProxyOptions() {
510         return proxyOptions;
511     }
512 
513     /**
514      * Set a transform operation to use on each websocket handshake http request.
515      * The transform may modify the http request before it is sent to the server.
516      * The transform MUST call handshakeTransform.complete() or handshakeTransform.completeExceptionally()
517      * when the transform is complete, failure to do so will stall the mqtt connection indefinitely.
518      * The transform operation may be asynchronous.
519      *
520      * The default websocket handshake http request uses path "/mqtt".
521      * All required headers for a websocket handshake are present,
522      * plus the optional header "Sec-WebSocket-Protocol: mqtt".
523      *
524      * This is only applicable to websocket-based mqtt connections.
525      *
526      * @param handshakeTransform http request handshake transform
527      */
setWebsocketHandshakeTransform(Consumer<WebsocketHandshakeTransformArgs> handshakeTransform)528     public void setWebsocketHandshakeTransform(Consumer<WebsocketHandshakeTransformArgs> handshakeTransform) {
529         this.websocketHandshakeTransform = handshakeTransform;
530     }
531 
532     /**
533      * Queries the handshake http request transform to use when upgrading the connection
534      *
535      * @return http request handshake transform
536      */
getWebsocketHandshakeTransform()537     public Consumer<WebsocketHandshakeTransformArgs> getWebsocketHandshakeTransform() {
538         return websocketHandshakeTransform;
539     }
540 
541     /**
542      * Creates a (shallow) clone of this config object
543      *
544      * @return shallow clone of this config object
545      */
clone()546     public MqttConnectionConfig clone() {
547         try (MqttConnectionConfig clone = new MqttConnectionConfig()) {
548             clone.setEndpoint(getEndpoint());
549             clone.setPort(getPort());
550             clone.setSocketOptions(getSocketOptions());
551 
552             clone.setMqttClient(getMqttClient());
553             clone.setMqtt5Client(getMqtt5Client());
554             clone.setClientId(getClientId());
555             clone.setUsername(getUsername());
556             clone.setPassword(getPassword());
557             clone.setConnectionCallbacks(getConnectionCallbacks());
558             clone.setKeepAliveSecs(getKeepAliveSecs());
559             clone.setPingTimeoutMs(getPingTimeoutMs());
560             clone.setProtocolOperationTimeoutMs(getProtocolOperationTimeoutMs());
561             clone.setCleanSession(getCleanSession());
562 
563             clone.setWillMessage(getWillMessage());
564 
565             clone.setUseWebsockets(getUseWebsockets());
566             clone.setHttpProxyOptions(getHttpProxyOptions());
567             clone.setWebsocketHandshakeTransform(getWebsocketHandshakeTransform());
568 
569             clone.setReconnectTimeoutSecs(getMinReconnectTimeoutSecs(), getMaxReconnectTimeoutSecs());
570 
571             // success, bump up the ref count so we can escape the try-with-resources block
572             clone.addRef();
573             return clone;
574         }
575     }
576 }
577