1 /** 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * SPDX-License-Identifier: Apache-2.0. 4 */ 5 6 package software.amazon.awssdk.crt.test; 7 8 import static org.junit.Assert.*; 9 10 import software.amazon.awssdk.crt.CRT; 11 import software.amazon.awssdk.crt.auth.credentials.CredentialsProvider; 12 import software.amazon.awssdk.crt.auth.signing.AwsSigningConfig; 13 import software.amazon.awssdk.crt.http.HttpProxyOptions; 14 import software.amazon.awssdk.crt.io.ClientBootstrap; 15 import software.amazon.awssdk.crt.io.EventLoopGroup; 16 import software.amazon.awssdk.crt.io.HostResolver; 17 import software.amazon.awssdk.crt.io.TlsContext; 18 import software.amazon.awssdk.crt.mqtt.*; 19 20 import java.nio.file.Path; 21 import java.util.UUID; 22 import java.util.concurrent.CompletableFuture; 23 import java.util.concurrent.TimeUnit; 24 import java.util.function.Consumer; 25 26 class MissingCredentialsException extends RuntimeException { MissingCredentialsException(String message)27 MissingCredentialsException(String message) { 28 super(message); 29 } 30 } 31 32 public class MqttClientConnectionFixture extends CrtTestFixture { 33 34 MqttClientConnection connection = null; 35 private boolean disconnecting = false; 36 37 private CompletableFuture<OnConnectionSuccessReturn> onConnectionSuccessFuture = new CompletableFuture<OnConnectionSuccessReturn>(); 38 private CompletableFuture<OnConnectionFailureReturn> onConnectionFailureFuture = new CompletableFuture<OnConnectionFailureReturn>(); 39 private CompletableFuture<OnConnectionClosedReturn> onConnectionClosedFuture = new CompletableFuture<OnConnectionClosedReturn>(); 40 41 static final boolean AWS_TEST_IS_CI = System.getProperty("AWS_TEST_IS_CI") != null; 42 static final String AWS_TEST_MQTT311_ROOTCA = System.getProperty("AWS_TEST_MQTT311_ROOT_CA"); 43 // Static credential related 44 static final String AWS_TEST_MQTT311_ROLE_CREDENTIAL_ACCESS_KEY = System.getProperty("AWS_TEST_MQTT311_ROLE_CREDENTIAL_ACCESS_KEY"); 45 static final String AWS_TEST_MQTT311_ROLE_CREDENTIAL_SECRET_ACCESS_KEY = System.getProperty("AWS_TEST_MQTT311_ROLE_CREDENTIAL_SECRET_ACCESS_KEY"); 46 static final String AWS_TEST_MQTT311_ROLE_CREDENTIAL_SESSION_TOKEN = System.getProperty("AWS_TEST_MQTT311_ROLE_CREDENTIAL_SESSION_TOKEN"); 47 // Custom Key Ops 48 static final String AWS_TEST_MQTT311_CUSTOM_KEY_OPS_KEY = System.getProperty("AWS_TEST_MQTT311_CUSTOM_KEY_OPS_KEY"); 49 static final String AWS_TEST_MQTT311_CUSTOM_KEY_OPS_CERT = System.getProperty("AWS_TEST_MQTT311_CUSTOM_KEY_OPS_CERT"); 50 // MQTT311 Cognito 51 static final String AWS_TEST_MQTT311_COGNITO_ENDPOINT = System.getProperty("AWS_TEST_MQTT311_COGNITO_ENDPOINT"); 52 static final String AWS_TEST_MQTT311_COGNITO_IDENTITY = System.getProperty("AWS_TEST_MQTT311_COGNITO_IDENTITY"); 53 // MQTT311 Codebuild/Direct connections data 54 static final String AWS_TEST_MQTT311_DIRECT_MQTT_HOST = System.getProperty("AWS_TEST_MQTT311_DIRECT_MQTT_HOST"); 55 static final String AWS_TEST_MQTT311_DIRECT_MQTT_PORT = System.getProperty("AWS_TEST_MQTT311_DIRECT_MQTT_PORT"); 56 static final String AWS_TEST_MQTT311_DIRECT_MQTT_BASIC_AUTH_HOST = System.getProperty("AWS_TEST_MQTT311_DIRECT_MQTT_BASIC_AUTH_HOST"); 57 static final String AWS_TEST_MQTT311_DIRECT_MQTT_BASIC_AUTH_PORT = System.getProperty("AWS_TEST_MQTT311_DIRECT_MQTT_BASIC_AUTH_PORT"); 58 static final String AWS_TEST_MQTT311_DIRECT_MQTT_TLS_HOST = System.getProperty("AWS_TEST_MQTT311_DIRECT_MQTT_TLS_HOST"); 59 static final String AWS_TEST_MQTT311_DIRECT_MQTT_TLS_PORT = System.getProperty("AWS_TEST_MQTT311_DIRECT_MQTT_TLS_PORT"); 60 // MQTT311 Codebuild/Websocket connections data 61 static final String AWS_TEST_MQTT311_WS_MQTT_HOST = System.getProperty("AWS_TEST_MQTT311_WS_MQTT_HOST"); 62 static final String AWS_TEST_MQTT311_WS_MQTT_PORT = System.getProperty("AWS_TEST_MQTT311_WS_MQTT_PORT"); 63 static final String AWS_TEST_MQTT311_WS_MQTT_BASIC_AUTH_HOST = System.getProperty("AWS_TEST_MQTT311_WS_MQTT_BASIC_AUTH_HOST"); 64 static final String AWS_TEST_MQTT311_WS_MQTT_BASIC_AUTH_PORT = System.getProperty("AWS_TEST_MQTT311_WS_MQTT_BASIC_AUTH_PORT"); 65 static final String AWS_TEST_MQTT311_WS_MQTT_TLS_HOST = System.getProperty("AWS_TEST_MQTT311_WS_MQTT_TLS_HOST"); 66 static final String AWS_TEST_MQTT311_WS_MQTT_TLS_PORT = System.getProperty("AWS_TEST_MQTT311_WS_MQTT_TLS_PORT"); 67 // MQTT311 Codebuild misc connections data 68 static final String AWS_TEST_MQTT311_BASIC_AUTH_USERNAME = System.getProperty("AWS_TEST_MQTT311_BASIC_AUTH_USERNAME"); 69 static final String AWS_TEST_MQTT311_BASIC_AUTH_PASSWORD = System.getProperty("AWS_TEST_MQTT311_BASIC_AUTH_PASSWORD"); 70 static final String AWS_TEST_MQTT311_CERTIFICATE_FILE = System.getProperty("AWS_TEST_MQTT311_CERTIFICATE_FILE"); 71 static final String AWS_TEST_MQTT311_KEY_FILE = System.getProperty("AWS_TEST_MQTT311_KEY_FILE"); 72 // MQTT311 IoT Endpoint, Key, Cert 73 static final String AWS_TEST_MQTT311_IOT_CORE_HOST = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_HOST"); 74 static final String AWS_TEST_MQTT311_IOT_CORE_RSA_CERT = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT"); 75 static final String AWS_TEST_MQTT311_IOT_CORE_RSA_KEY = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_RSA_KEY"); 76 static final String AWS_TEST_MQTT311_IOT_CORE_ECC_CERT = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_ECC_CERT"); 77 static final String AWS_TEST_MQTT311_IOT_CORE_ECC_KEY = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_ECC_KEY"); 78 // MQTT311 Proxy 79 static final String AWS_TEST_MQTT311_PROXY_HOST = System.getProperty("AWS_TEST_MQTT311_PROXY_HOST"); 80 static final String AWS_TEST_MQTT311_PROXY_PORT = System.getProperty("AWS_TEST_MQTT311_PROXY_PORT"); 81 // MQTT311 Keystore 82 static final String AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_FORMAT = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_FORMAT"); 83 static final String AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_FILE = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_FILE"); 84 static final String AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_PASSWORD = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_PASSWORD"); 85 static final String AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_CERT_ALIAS = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_CERT_ALIAS"); 86 static final String AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_CERT_PASSWORD = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_CERT_PASSWORD"); 87 // MQTT311 PKCS12 88 static final String AWS_TEST_MQTT311_IOT_CORE_PKCS12_KEY = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_PKCS12_KEY"); 89 static final String AWS_TEST_MQTT311_IOT_CORE_PKCS12_KEY_PASSWORD = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_PKCS12_KEY_PASSWORD"); 90 // MQTT311 PKCS11 91 static final String AWS_TEST_MQTT311_IOT_CORE_PKCS11_LIB = System.getProperty("AWS_TEST_PKCS11_LIB"); 92 static final String AWS_TEST_MQTT311_IOT_CORE_PKCS11_TOKEN_LABEL = System.getProperty("AWS_TEST_PKCS11_TOKEN_LABEL"); 93 static final String AWS_TEST_MQTT311_IOT_CORE_PKCS11_PIN = System.getProperty("AWS_TEST_PKCS11_PIN"); 94 static final String AWS_TEST_MQTT311_IOT_CORE_PKCS11_PKEY_LABEL = System.getProperty("AWS_TEST_PKCS11_PKEY_LABEL"); 95 static final String AWS_TEST_MQTT311_IOT_CORE_PKCS11_CERT_FILE = System.getProperty("AWS_TEST_PKCS11_CERT_FILE"); 96 // MQTT311 X509 97 static final String AWS_TEST_MQTT311_IOT_CORE_X509_CERT = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_X509_CERT"); 98 static final String AWS_TEST_MQTT311_IOT_CORE_X509_KEY = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_X509_KEY"); 99 static final String AWS_TEST_MQTT311_IOT_CORE_X509_ENDPOINT = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_X509_ENDPOINT"); 100 static final String AWS_TEST_MQTT311_IOT_CORE_X509_ROLE_ALIAS = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_X509_ROLE_ALIAS"); 101 static final String AWS_TEST_MQTT311_IOT_CORE_X509_THING_NAME = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_X509_THING_NAME"); 102 // MQTT311 Windows Cert Store 103 static final String AWS_TEST_MQTT311_IOT_CORE_WINDOWS_PFX_CERT_NO_PASS = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_WINDOWS_PFX_CERT_NO_PASS"); 104 static final String AWS_TEST_MQTT311_IOT_CORE_WINDOWS_CERT_STORE = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_WINDOWS_CERT_STORE"); 105 106 // MQTT5 Custom Key Ops (so we don't have to make a new file just for a single test) 107 static final String AWS_TEST_MQTT5_IOT_CORE_HOST = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_HOST"); 108 static final String AWS_TEST_MQTT5_CUSTOM_KEY_OPS_CERT = System.getProperty("AWS_TEST_MQTT5_CUSTOM_KEY_OPS_CERT"); 109 static final String AWS_TEST_MQTT5_CUSTOM_KEY_OPS_KEY = System.getProperty("AWS_TEST_MQTT5_CUSTOM_KEY_OPS_KEY"); 110 111 static final short TEST_PORT = 8883; 112 static final short TEST_PORT_ALPN = 443; 113 static final String TEST_CLIENTID = "aws-crt-java-"; 114 static final String TEST_REGION = "us-east-1"; 115 116 Path pathToCert = null; 117 Path pathToKey = null; 118 Path pathToCa = null; 119 120 String certificatePem = null; 121 String privateKeyPem = null; 122 String caRoot = null; 123 String iotEndpoint = null; 124 125 class ConnectionEventsStatistics { 126 int onConnectionSuccessCalled; 127 int onConnectionFailureCalled; 128 int onConnectionResumedCalled; 129 int onConnectionInterruptedCalled; 130 int onConnectionClosedCalled; 131 }; 132 133 ConnectionEventsStatistics connectionEventsStatistics = new ConnectionEventsStatistics(); 134 135 Consumer<MqttConnectionConfig> connectionConfigTransformer = null; setConnectionConfigTransformer(Consumer<MqttConnectionConfig> connectionConfigTransformer)136 protected void setConnectionConfigTransformer(Consumer<MqttConnectionConfig> connectionConfigTransformer) { 137 this.connectionConfigTransformer = connectionConfigTransformer; 138 } 139 140 Consumer<MqttMessage> connectionMessageTransfomer = null; setConnectionMessageTransformer(Consumer<MqttMessage> connectionMessageTransfomer)141 protected void setConnectionMessageTransformer(Consumer<MqttMessage> connectionMessageTransfomer) { 142 this.connectionMessageTransfomer = connectionMessageTransfomer; 143 } 144 MqttClientConnectionFixture()145 MqttClientConnectionFixture() { 146 } 147 connectDirectWithConfig(TlsContext tlsContext, String endpoint, int port, String username, String password, HttpProxyOptions httpProxyOptions)148 boolean connectDirectWithConfig(TlsContext tlsContext, String endpoint, int port, String username, String password, HttpProxyOptions httpProxyOptions) 149 { 150 try { 151 return connectDirectWithConfigThrows(tlsContext, endpoint, port, username, password, httpProxyOptions); 152 } catch (Exception ex) { 153 fail("Exception during connect: " + ex.toString()); 154 } 155 return false; 156 } 157 connectDirectWithConfigThrows(TlsContext tlsContext, String endpoint, int port, String username, String password, HttpProxyOptions httpProxyOptions)158 boolean connectDirectWithConfigThrows(TlsContext tlsContext, String endpoint, int port, String username, String password, HttpProxyOptions httpProxyOptions) throws Exception 159 { 160 try(EventLoopGroup elg = new EventLoopGroup(1); 161 HostResolver hr = new HostResolver(elg); 162 ClientBootstrap bootstrap = new ClientBootstrap(elg, hr);) { 163 164 // Connection callback events 165 MqttClientConnectionEvents events = new MqttClientConnectionEvents() { 166 @Override 167 public void onConnectionResumed(boolean sessionPresent) { 168 connectionEventsStatistics.onConnectionResumedCalled++; 169 System.out.println("Connection resumed"); 170 } 171 172 @Override 173 public void onConnectionInterrupted(int errorCode) { 174 connectionEventsStatistics.onConnectionInterruptedCalled++; 175 if (!disconnecting) { 176 System.out.println( 177 "Connection interrupted: error: " + errorCode + " " + CRT.awsErrorString(errorCode)); 178 } 179 } 180 181 @Override 182 public void onConnectionFailure(OnConnectionFailureReturn data) { 183 System.out.println("Connection failed with error: " + data.getErrorCode() + " " + CRT.awsErrorString(data.getErrorCode())); 184 connectionEventsStatistics.onConnectionFailureCalled++; 185 onConnectionFailureFuture.complete(data); 186 } 187 188 @Override 189 public void onConnectionSuccess(OnConnectionSuccessReturn data) { 190 System.out.println("Connection success. Session present: " + data.getSessionPresent()); 191 connectionEventsStatistics.onConnectionSuccessCalled++; 192 onConnectionSuccessFuture.complete(data); 193 } 194 195 @Override 196 public void onConnectionClosed(OnConnectionClosedReturn data) { 197 System.out.println("Connection disconnected successfully"); 198 connectionEventsStatistics.onConnectionClosedCalled++; 199 onConnectionClosedFuture.complete(data); 200 } 201 }; 202 203 // Default settings 204 boolean cleanSession = true; // only true is supported right now 205 int keepAliveSecs = 0; 206 int protocolOperationTimeout = 60000; 207 String clientId = TEST_CLIENTID + (UUID.randomUUID()).toString(); 208 209 try (MqttConnectionConfig config = new MqttConnectionConfig()) { 210 211 MqttClient client = null; 212 if (tlsContext != null) 213 { 214 client = new MqttClient(bootstrap, tlsContext); 215 } 216 else 217 { 218 client = new MqttClient(bootstrap); 219 } 220 221 config.setMqttClient(client); 222 config.setClientId(clientId); 223 config.setEndpoint(endpoint); 224 config.setPort(port); 225 config.setCleanSession(cleanSession); 226 config.setKeepAliveSecs(keepAliveSecs); 227 config.setProtocolOperationTimeoutMs(protocolOperationTimeout); 228 config.setConnectionCallbacks(events); 229 230 if (httpProxyOptions != null) { 231 config.setHttpProxyOptions(httpProxyOptions); 232 } 233 if (username != null) { 234 config.setUsername(username); 235 } 236 if (password != null) 237 { 238 config.setPassword(password); 239 } 240 241 if (connectionConfigTransformer != null) { 242 connectionConfigTransformer.accept(config); 243 } 244 245 try { 246 connection = new MqttClientConnection(config); 247 if (connectionMessageTransfomer != null) { 248 connection.onMessage(connectionMessageTransfomer); 249 } 250 CompletableFuture<Boolean> connected = connection.connect(); 251 connected.get(); 252 } finally { 253 client.close(); 254 } 255 return true; 256 } 257 } 258 } 259 connectWebsocketsWithCredentialsProvider(CredentialsProvider credentialsProvider, String endpoint, int port, TlsContext tlsContext, String username, String password, HttpProxyOptions httpProxyOptions)260 boolean connectWebsocketsWithCredentialsProvider(CredentialsProvider credentialsProvider, String endpoint, int port, TlsContext tlsContext, String username, String password, HttpProxyOptions httpProxyOptions) 261 { 262 // Return result 263 boolean result = false; 264 // Default settings 265 boolean cleanSession = true; // only true is supported right now 266 int keepAliveSecs = 0; 267 int protocolOperationTimeout = 60000; 268 String clientId = TEST_CLIENTID + (UUID.randomUUID()).toString(); 269 270 // Connection callback events 271 MqttClientConnectionEvents events = new MqttClientConnectionEvents() { 272 @Override 273 public void onConnectionResumed(boolean sessionPresent) { 274 System.out.println("Connection resumed"); 275 } 276 277 @Override 278 public void onConnectionInterrupted(int errorCode) { 279 if (!disconnecting) { 280 System.out.println( 281 "Connection interrupted: error: " + errorCode + " " + CRT.awsErrorString(errorCode)); 282 } 283 } 284 285 @Override 286 public void onConnectionFailure(OnConnectionFailureReturn data) { 287 System.out.println("Connection failed with error: " + data.getErrorCode() + " " + CRT.awsErrorString(data.getErrorCode())); 288 onConnectionFailureFuture.complete(data); 289 } 290 291 @Override 292 public void onConnectionSuccess(OnConnectionSuccessReturn data) { 293 System.out.println("Connection success. Session present: " + data.getSessionPresent()); 294 onConnectionSuccessFuture.complete(data); 295 } 296 297 @Override 298 public void onConnectionClosed(OnConnectionClosedReturn data) { 299 System.out.println("Connection disconnected successfully"); 300 onConnectionClosedFuture.complete(data); 301 } 302 }; 303 304 try (EventLoopGroup elg = new EventLoopGroup(1); 305 HostResolver hr = new HostResolver(elg); 306 ClientBootstrap bootstrap = new ClientBootstrap(elg, hr);) 307 { 308 try (MqttConnectionConfig config = new MqttConnectionConfig(); 309 AwsSigningConfig signingConfig = new AwsSigningConfig();) { 310 311 MqttClient client = null; 312 if (tlsContext != null) 313 { 314 client = new MqttClient(bootstrap, tlsContext); 315 } 316 else 317 { 318 client = new MqttClient(bootstrap); 319 } 320 321 config.setMqttClient(client); 322 config.setClientId(clientId); 323 config.setEndpoint(endpoint); 324 config.setPort(port); 325 config.setCleanSession(cleanSession); 326 config.setKeepAliveSecs(keepAliveSecs); 327 config.setProtocolOperationTimeoutMs(protocolOperationTimeout); 328 config.setUseWebsockets(true); 329 config.setConnectionCallbacks(events); 330 331 if (username != null) { 332 config.setUsername(username); 333 } 334 if (password != null) { 335 config.setPassword(password); 336 } 337 if (httpProxyOptions != null) { 338 config.setHttpProxyOptions(httpProxyOptions); 339 } 340 341 if (connectionConfigTransformer != null) { 342 connectionConfigTransformer.accept(config); 343 } 344 345 // Make the websocket transformer 346 if (credentialsProvider != null) { 347 signingConfig.setAlgorithm(AwsSigningConfig.AwsSigningAlgorithm.SIGV4); 348 // NOTE: Missing a credentials provider gives a non-helpful error. This needs to be changed in Java V2... 349 signingConfig.setCredentialsProvider(credentialsProvider); 350 } 351 signingConfig.setSignatureType(AwsSigningConfig.AwsSignatureType.HTTP_REQUEST_VIA_QUERY_PARAMS); 352 signingConfig.setRegion(TEST_REGION); 353 signingConfig.setService("iotdevicegateway"); 354 signingConfig.setOmitSessionToken(true); 355 try (MqttClientConnectionSigv4HandshakeTransformer transformer = new MqttClientConnectionSigv4HandshakeTransformer(signingConfig);) 356 { 357 config.setWebsocketHandshakeTransform(transformer); 358 connection = new MqttClientConnection(config); 359 if (connectionMessageTransfomer != null) { 360 connection.onMessage(connectionMessageTransfomer); 361 } 362 CompletableFuture<Boolean> connected = connection.connect(); 363 connected.get(); 364 result = true; 365 } 366 finally { 367 client.close(); 368 } 369 370 } catch (Exception ex) { 371 fail("Exception during connect: " + ex.toString()); 372 } 373 } 374 return result; 375 } 376 disconnect()377 void disconnect() { 378 disconnecting = true; 379 try { 380 CompletableFuture<Void> disconnected = connection.disconnect(); 381 disconnected.get(); 382 } catch (Exception ex) { 383 fail("Exception during disconnect: " + ex.getMessage()); 384 } 385 386 } 387 close()388 void close() { 389 connection.close(); 390 } 391 publish(String topic, byte[] payload, QualityOfService qos)392 CompletableFuture<Integer> publish(String topic, byte[] payload, QualityOfService qos) { 393 try { 394 MqttMessage messageToSend = new MqttMessage(topic, payload, qos); 395 return connection.publish(messageToSend); 396 } catch (Exception ex) { 397 fail("Exception during publish: " + ex.getMessage()); 398 } 399 return null; 400 } 401 checkOperationStatistics( long expectedIncompleteOperationCount, long expectedIncompleteOperationSize)402 void checkOperationStatistics( 403 long expectedIncompleteOperationCount, long expectedIncompleteOperationSize) { 404 try { 405 MqttClientConnectionOperationStatistics statistics = connection.getOperationStatistics(); 406 407 long incomplete_ops_count = statistics.getIncompleteOperationCount(); 408 long incomplete_ops_size = statistics.getIncompleteOperationSize(); 409 410 if (incomplete_ops_count != expectedIncompleteOperationCount) { 411 fail("Incomplete operations count:" + incomplete_ops_count + " did not equal expected value:" + expectedIncompleteOperationCount); 412 } 413 414 if (incomplete_ops_size != expectedIncompleteOperationSize) { 415 fail("Incomplete operations size:" + incomplete_ops_size + " did not equal expected value:" + expectedIncompleteOperationSize); 416 } 417 418 // TODO - check unacked operations once we have a way to consistently check them. Part of the issue right now 419 // is that unacked operations are heavily dependent on the socket and when incomplete operations get to the socket 420 // and so the speed of the socket can mess up the test. 421 422 } catch (Exception ex) { 423 fail("Exception during operation statistics check: " + ex.getMessage()); 424 } 425 } 426 sleepForMilliseconds(long secondsToSleep)427 void sleepForMilliseconds(long secondsToSleep) { 428 try { 429 Thread.sleep(secondsToSleep); 430 } catch (Exception ex) { 431 fail("Exception during sleep: " + ex.getMessage()); 432 } 433 } 434 waitForConnectSuccess()435 OnConnectionSuccessReturn waitForConnectSuccess() throws Exception { 436 return onConnectionSuccessFuture.get(60, TimeUnit.SECONDS); 437 } 438 waitForConnectFailure()439 OnConnectionFailureReturn waitForConnectFailure() throws Exception { 440 return onConnectionFailureFuture.get(60, TimeUnit.SECONDS); 441 } 442 waitForConnectClose()443 OnConnectionClosedReturn waitForConnectClose() throws Exception { 444 return onConnectionClosedFuture.get(60, TimeUnit.SECONDS); 445 } 446 } 447 448