1 2 /** 3 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 4 * SPDX-License-Identifier: Apache-2.0. 5 */ 6 package software.amazon.awssdk.crt.mqtt; 7 8 import java.util.function.Consumer; 9 10 import software.amazon.awssdk.crt.CrtResource; 11 import software.amazon.awssdk.crt.http.HttpProxyOptions; 12 import software.amazon.awssdk.crt.io.ClientTlsContext; 13 import software.amazon.awssdk.crt.io.SocketOptions; 14 import software.amazon.awssdk.crt.mqtt5.Mqtt5Client; 15 16 /** 17 * Encapsulates all per-mqtt-connection configuration 18 */ 19 public final class MqttConnectionConfig extends CrtResource { 20 /* connection */ 21 private String endpoint; 22 private int port; 23 private SocketOptions socketOptions; 24 25 /* mqtt */ 26 private MqttClient mqttClient; 27 private Mqtt5Client mqtt5Client; 28 private String clientId; 29 private String username; 30 private String password; 31 private MqttClientConnectionEvents connectionCallbacks; 32 private int keepAliveSecs = 0; 33 private int pingTimeoutMs = 0; 34 private long minReconnectTimeoutSecs = 0L; 35 private long maxReconnectTimeoutSecs = 0L; 36 private int protocolOperationTimeoutMs = 0; 37 private boolean cleanSession = true; 38 39 /* will */ 40 private MqttMessage willMessage; 41 private QualityOfService deprecatedWillQos; 42 private Boolean deprecatedWillRetain; 43 44 /* websockets */ 45 private boolean useWebsockets = false; 46 private HttpProxyOptions proxyOptions; 47 private Consumer<WebsocketHandshakeTransformArgs> websocketHandshakeTransform; 48 MqttConnectionConfig()49 public MqttConnectionConfig() {} 50 51 52 /** 53 * Required override method that must begin the release process of the acquired native handle 54 */ 55 @Override releaseNativeHandle()56 protected void releaseNativeHandle() {} 57 58 /** 59 * Override that determines whether a resource releases its dependencies at the same time the native handle is released or if it waits. 60 * Resources with asynchronous shutdown processes should override this with false, and establish a callback from native code that 61 * invokes releaseReferences() when the asynchronous shutdown process has completed. See HttpClientConnectionManager for an example. 62 */ 63 @Override canReleaseReferencesImmediately()64 protected boolean canReleaseReferencesImmediately() { return true; } 65 66 /** 67 * Configures the connection-related callbacks for a connection 68 * 69 * @param connectionCallbacks connection event callbacks to use 70 */ setConnectionCallbacks(MqttClientConnectionEvents connectionCallbacks)71 public void setConnectionCallbacks(MqttClientConnectionEvents connectionCallbacks) { 72 this.connectionCallbacks = connectionCallbacks; 73 } 74 75 /** 76 * Queries the connection-related callbacks for a connection 77 * 78 * @return the connection event callbacks to use 79 */ getConnectionCallbacks()80 public MqttClientConnectionEvents getConnectionCallbacks() { 81 return connectionCallbacks; 82 } 83 84 /** 85 * Configures the client_id to use with a connection 86 * 87 * @param clientId The client id for a connection. Needs to be unique across 88 * all devices/clients.this.credentialsProvider 89 */ setClientId(String clientId)90 public void setClientId(String clientId) { 91 this.clientId = clientId; 92 } 93 94 /** 95 * Queries the client_id being used by a connection 96 * 97 * @return The client id for a connection. 98 */ getClientId()99 public String getClientId() { 100 return clientId; 101 } 102 103 /** 104 * Configures the IoT endpoint for a connection 105 * 106 * @param endpoint The IoT endpoint to connect to 107 */ setEndpoint(String endpoint)108 public void setEndpoint(String endpoint) { 109 this.endpoint = endpoint; 110 } 111 112 /** 113 * Queries the IoT endpoint used by a connection 114 * 115 * @return The IoT endpoint used by a connection 116 */ getEndpoint()117 public String getEndpoint() { 118 return endpoint; 119 } 120 121 /** 122 * Configures the port to connect to. 123 * 124 * @param port The port to connect to. Usually 8883 for MQTT, or 443 for websockets 125 */ setPort(int port)126 public void setPort(int port) { 127 this.port = port; 128 } 129 130 /** 131 * Queries the port to connect to. 132 * 133 * @return The port to connect to 134 */ getPort()135 public int getPort() { 136 return port; 137 } 138 139 /** 140 * Configures the common settings to use for a connection's socket 141 * 142 * @param socketOptions The socket settings 143 */ setSocketOptions(SocketOptions socketOptions)144 public void setSocketOptions(SocketOptions socketOptions) { 145 swapReferenceTo(this.socketOptions, socketOptions); 146 this.socketOptions = socketOptions; 147 } 148 149 /** 150 * Queries the common settings to use for a connection's socket 151 * 152 * @return The socket settings 153 */ getSocketOptions()154 public SocketOptions getSocketOptions() { 155 return socketOptions; 156 } 157 158 /** 159 * Configures whether or not the service should try to resume prior subscriptions, if it has any 160 * 161 * @param cleanSession true if the session should drop prior subscriptions when 162 * a connection is established, false to resume the session 163 */ setCleanSession(boolean cleanSession)164 public void setCleanSession(boolean cleanSession) { 165 this.cleanSession = cleanSession; 166 } 167 168 /** 169 * Queries whether or not the service should try to resume prior subscriptions, if it has any 170 * 171 * @return true if the session should drop prior subscriptions when 172 * a connection is established, false to resume the session 173 */ getCleanSession()174 public boolean getCleanSession() { 175 return cleanSession; 176 } 177 178 /** 179 * @deprecated Configures MQTT keep-alive via PING messages. Note that this is not TCP 180 * keepalive. Please use setKeepAliveSecs instead. 181 * 182 * @param keepAliveMs How often in milliseconds to send an MQTT PING message to the 183 * service to keep a connection alive 184 * 185 */ 186 @Deprecated setKeepAliveMs(int keepAliveMs)187 public void setKeepAliveMs(int keepAliveMs) { 188 this.keepAliveSecs = keepAliveMs/1000; 189 } 190 191 /** 192 * @deprecated Queries the MQTT keep-alive via PING messages. Please use 193 * getKeepAliveSecs instead. 194 * 195 * @return How often in milliseconds to send an MQTT PING message to the 196 * service to keep a connection alive 197 */ 198 @Deprecated getKeepAliveMs()199 public int getKeepAliveMs() { 200 return keepAliveSecs*1000; 201 } 202 203 /** 204 * Configures MQTT keep-alive via PING messages. Note that this is not TCP 205 * keepalive. Note: AWS IoT Core only allows 30-1200 Secs. Anything larger than 206 * 65535 will be capped. 207 * 208 * @param keepAliveSecs How often in seconds to send an MQTT PING message to the 209 * service to keep a connection alive 210 * 211 */ setKeepAliveSecs(int keepAliveSecs)212 public void setKeepAliveSecs(int keepAliveSecs) { 213 this.keepAliveSecs = keepAliveSecs; 214 } 215 216 /** 217 * Queries the MQTT keep-alive via PING messages. 218 * 219 * @return How often in seconds to send an MQTT PING message to the 220 * service to keep a connection alive 221 */ getKeepAliveSecs()222 public int getKeepAliveSecs() { 223 return keepAliveSecs; 224 } 225 226 /** 227 * Configures ping timeout value. If a response is not received within this 228 * interval, the connection will be reestablished. 229 * 230 * @param pingTimeoutMs How long to wait for a ping response (in milliseconds) before resetting the connection 231 */ setPingTimeoutMs(int pingTimeoutMs)232 public void setPingTimeoutMs(int pingTimeoutMs) { 233 this.pingTimeoutMs = pingTimeoutMs; 234 } 235 236 /** 237 * Queries ping timeout value. If a response is not received within this 238 * interval, the connection will be reestablished. 239 * 240 * @return How long to wait for a ping response before resetting the connection 241 */ getPingTimeoutMs()242 public int getPingTimeoutMs() { 243 return pingTimeoutMs; 244 } 245 246 /** 247 * Configures the minimum and maximum reconnect timeouts. 248 * 249 * The time between reconnect attempts will start at min and multiply by 2 until max is reached. 250 * Default value for min is 1, for max 128. Set either one to zero will use the default setting. 251 * 252 * @param minTimeoutSecs The timeout to start with 253 * @param maxTimeoutSecs The highest allowable wait time between reconnect attempts 254 */ setReconnectTimeoutSecs(long minTimeoutSecs, long maxTimeoutSecs)255 public void setReconnectTimeoutSecs(long minTimeoutSecs, long maxTimeoutSecs) { 256 this.minReconnectTimeoutSecs = minTimeoutSecs; 257 this.maxReconnectTimeoutSecs = maxTimeoutSecs; 258 } 259 260 /** 261 * Return the minimum reconnect timeout. 262 * 263 * @return The timeout to start with 264 */ getMinReconnectTimeoutSecs()265 public long getMinReconnectTimeoutSecs() { 266 return minReconnectTimeoutSecs; 267 } 268 269 /** 270 * Return the maximum reconnect timeout. 271 * 272 * @return The highest allowable wait time between reconnect attempts 273 */ getMaxReconnectTimeoutSecs()274 public long getMaxReconnectTimeoutSecs() { 275 return maxReconnectTimeoutSecs; 276 } 277 278 /** 279 * Configures timeout value for requests that response is required on healthy connection. 280 * If a response is not received within this interval, the request will fail as server not receiving it. 281 * Applied to publish (QoS>0) and unsubscribe 282 * 283 * @param protocolOperationTimeoutMs How long to wait for a request response (in milliseconds) before failing 284 */ setProtocolOperationTimeoutMs(int protocolOperationTimeoutMs)285 public void setProtocolOperationTimeoutMs(int protocolOperationTimeoutMs) { 286 this.protocolOperationTimeoutMs = protocolOperationTimeoutMs; 287 } 288 289 /** 290 * Queries timeout value for requests that response is required on healthy connection. 291 * If a response is not received within this interval, the request will fail as server not receiving it. 292 * Applied to publish (QoS>0) and unsubscribe 293 * 294 * @return How long to wait for a request response (in milliseconds) before failing 295 */ getProtocolOperationTimeoutMs()296 public int getProtocolOperationTimeoutMs() { 297 return protocolOperationTimeoutMs; 298 } 299 300 /** 301 * Configures the mqtt client to use for a connection 302 * 303 * @param mqttClient the mqtt client to use 304 */ setMqttClient(MqttClient mqttClient)305 public void setMqttClient(MqttClient mqttClient) { 306 swapReferenceTo(this.mqttClient, mqttClient); 307 this.mqttClient = mqttClient; 308 } 309 310 /** 311 * Queries the mqtt client to use for a connection 312 * 313 * @return the mqtt client to use 314 */ getMqttClient()315 public MqttClient getMqttClient() { 316 return mqttClient; 317 } 318 319 /** 320 * Configures the mqtt5 client to use for a connection 321 * 322 * @param mqtt5Client the mqtt client to use 323 */ setMqtt5Client(Mqtt5Client mqtt5Client)324 public void setMqtt5Client(Mqtt5Client mqtt5Client) { 325 swapReferenceTo(this.mqtt5Client, mqtt5Client); 326 this.mqtt5Client = mqtt5Client; 327 } 328 329 /** 330 * Queries the mqtt5 client to use for a connection 331 * 332 * @return the mqtt5 client to use 333 */ getMqtt5Client()334 public Mqtt5Client getMqtt5Client() { 335 return mqtt5Client; 336 } 337 338 /** 339 * Sets the login credentials for a connection. 340 * 341 * @param user Login username 342 * @param pass Login password 343 */ setLogin(String user, String pass)344 public void setLogin(String user, String pass) throws MqttException { 345 this.username = user; 346 this.password = pass; 347 } 348 349 /** 350 * Configures the username to use as part of the CONNECT attempt 351 * 352 * @param username username to use for the mqtt connect operation 353 */ setUsername(String username)354 public void setUsername(String username) { 355 this.username = username; 356 } 357 358 /** 359 * Queries the username to use as part of the CONNECT attempt 360 * 361 * @return username to use for the mqtt connect operation 362 */ getUsername()363 public String getUsername() { 364 return username; 365 } 366 367 /** 368 * Configures the password to use as part of the CONNECT attempt 369 * 370 * @param password password to use for the mqtt connect operation 371 */ setPassword(String password)372 public void setPassword(String password) { 373 this.password = password; 374 } 375 376 /** 377 * Queries the password to use as part of the CONNECT attempt 378 * 379 * @return password to use for the mqtt connect operation 380 */ getPassword()381 public String getPassword() { 382 return password; 383 } 384 385 /** 386 * Configures the last will and testament message to be delivered to a topic when a connection disconnects 387 * 388 * @param willMessage the message to publish as the will 389 */ setWillMessage(MqttMessage willMessage)390 public void setWillMessage(MqttMessage willMessage) { 391 this.willMessage = willMessage; 392 } 393 394 /** 395 * Queries the last will and testament message to be delivered to a topic when a connection disconnects 396 * 397 * @return the message to publish as the will 398 */ getWillMessage()399 public MqttMessage getWillMessage() { 400 if (willMessage == null || (deprecatedWillRetain == null && deprecatedWillQos == null)) { 401 return willMessage; 402 } 403 QualityOfService qos = deprecatedWillQos == null ? willMessage.getQos() : deprecatedWillQos; 404 boolean retain = deprecatedWillRetain == null ? willMessage.getRetain() : deprecatedWillRetain; 405 return new MqttMessage(willMessage.getTopic(), willMessage.getPayload(), qos, retain); 406 } 407 408 /** 409 * @deprecated Set QoS directly on the will's {@link MqttMessage}. 410 * @param qos Quality of Service 411 */ 412 @Deprecated setWillQos(QualityOfService qos)413 public void setWillQos(QualityOfService qos) { 414 this.deprecatedWillQos = qos; 415 } 416 417 /** 418 * @deprecated Query QoS directly from the will's {@link MqttMessage}. 419 * @return Quality of Service 420 */ 421 @Deprecated getWillQos()422 public QualityOfService getWillQos() { 423 if (deprecatedWillQos != null) { 424 return deprecatedWillQos; 425 } 426 if (willMessage != null) { 427 return willMessage.getQos(); 428 } 429 return null; 430 } 431 432 /** 433 * @deprecated Set retain directly on the will's {@link MqttMessage}. 434 * @param retain whether will's should be sent with retain property set 435 */ 436 @Deprecated setWillRetain(boolean retain)437 public void setWillRetain(boolean retain) { 438 this.deprecatedWillRetain = retain; 439 } 440 441 /** 442 * @deprecated Query retain directly from the will's {@link MqttMessage}. 443 * @return whether will will be sent with retain property set 444 */ 445 @Deprecated getWillRetain()446 public boolean getWillRetain() { 447 if (deprecatedWillRetain != null) { 448 return deprecatedWillRetain; 449 } 450 if (willMessage != null) { 451 return willMessage.getRetain(); 452 } 453 return false; 454 } 455 456 /** 457 * Configures whether or not to use websockets for the mqtt connection 458 * 459 * @param useWebsockets whether or not to use websockets 460 */ setUseWebsockets(boolean useWebsockets)461 public void setUseWebsockets(boolean useWebsockets) { 462 this.useWebsockets = useWebsockets; 463 } 464 465 /** 466 * Queries whether or not to use websockets for the mqtt connection 467 * 468 * @return whether or not to use websockets 469 */ getUseWebsockets()470 public boolean getUseWebsockets() { 471 return useWebsockets; 472 } 473 474 /** 475 * @deprecated use setHttpProxyOptions instead 476 * Configures proxy options for a websocket-based mqtt connection 477 * 478 * @param proxyOptions proxy options to use for the base http connection 479 */ setWebsocketProxyOptions(HttpProxyOptions proxyOptions)480 public void setWebsocketProxyOptions(HttpProxyOptions proxyOptions) { 481 this.proxyOptions = proxyOptions; 482 } 483 484 /** 485 * @deprecated use getHttpProxyOptions instead 486 * Queries proxy options for a websocket-based mqtt connection 487 * 488 * @return proxy options to use for the base http connection 489 */ getWebsocketProxyOptions()490 public HttpProxyOptions getWebsocketProxyOptions() { 491 return proxyOptions; 492 } 493 494 495 /** 496 * Configures proxy options for the mqtt connection 497 * 498 * @param proxyOptions proxy options to use for the connection 499 */ setHttpProxyOptions(HttpProxyOptions proxyOptions)500 public void setHttpProxyOptions(HttpProxyOptions proxyOptions) { 501 this.proxyOptions = proxyOptions; 502 } 503 504 /** 505 * Queries proxy options for an mqtt connection 506 * 507 * @return proxy options to use for the connection 508 */ getHttpProxyOptions()509 public HttpProxyOptions getHttpProxyOptions() { 510 return proxyOptions; 511 } 512 513 /** 514 * Set a transform operation to use on each websocket handshake http request. 515 * The transform may modify the http request before it is sent to the server. 516 * The transform MUST call handshakeTransform.complete() or handshakeTransform.completeExceptionally() 517 * when the transform is complete, failure to do so will stall the mqtt connection indefinitely. 518 * The transform operation may be asynchronous. 519 * 520 * The default websocket handshake http request uses path "/mqtt". 521 * All required headers for a websocket handshake are present, 522 * plus the optional header "Sec-WebSocket-Protocol: mqtt". 523 * 524 * This is only applicable to websocket-based mqtt connections. 525 * 526 * @param handshakeTransform http request handshake transform 527 */ setWebsocketHandshakeTransform(Consumer<WebsocketHandshakeTransformArgs> handshakeTransform)528 public void setWebsocketHandshakeTransform(Consumer<WebsocketHandshakeTransformArgs> handshakeTransform) { 529 this.websocketHandshakeTransform = handshakeTransform; 530 } 531 532 /** 533 * Queries the handshake http request transform to use when upgrading the connection 534 * 535 * @return http request handshake transform 536 */ getWebsocketHandshakeTransform()537 public Consumer<WebsocketHandshakeTransformArgs> getWebsocketHandshakeTransform() { 538 return websocketHandshakeTransform; 539 } 540 541 /** 542 * Creates a (shallow) clone of this config object 543 * 544 * @return shallow clone of this config object 545 */ clone()546 public MqttConnectionConfig clone() { 547 try (MqttConnectionConfig clone = new MqttConnectionConfig()) { 548 clone.setEndpoint(getEndpoint()); 549 clone.setPort(getPort()); 550 clone.setSocketOptions(getSocketOptions()); 551 552 clone.setMqttClient(getMqttClient()); 553 clone.setMqtt5Client(getMqtt5Client()); 554 clone.setClientId(getClientId()); 555 clone.setUsername(getUsername()); 556 clone.setPassword(getPassword()); 557 clone.setConnectionCallbacks(getConnectionCallbacks()); 558 clone.setKeepAliveSecs(getKeepAliveSecs()); 559 clone.setPingTimeoutMs(getPingTimeoutMs()); 560 clone.setProtocolOperationTimeoutMs(getProtocolOperationTimeoutMs()); 561 clone.setCleanSession(getCleanSession()); 562 563 clone.setWillMessage(getWillMessage()); 564 565 clone.setUseWebsockets(getUseWebsockets()); 566 clone.setHttpProxyOptions(getHttpProxyOptions()); 567 clone.setWebsocketHandshakeTransform(getWebsocketHandshakeTransform()); 568 569 clone.setReconnectTimeoutSecs(getMinReconnectTimeoutSecs(), getMaxReconnectTimeoutSecs()); 570 571 // success, bump up the ref count so we can escape the try-with-resources block 572 clone.addRef(); 573 return clone; 574 } 575 } 576 } 577