• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 
6  package software.amazon.awssdk.crt.test;
7 
8 import static org.junit.Assert.*;
9 
10 import software.amazon.awssdk.crt.CRT;
11 import software.amazon.awssdk.crt.auth.credentials.CredentialsProvider;
12 import software.amazon.awssdk.crt.auth.signing.AwsSigningConfig;
13 import software.amazon.awssdk.crt.http.HttpProxyOptions;
14 import software.amazon.awssdk.crt.io.ClientBootstrap;
15 import software.amazon.awssdk.crt.io.EventLoopGroup;
16 import software.amazon.awssdk.crt.io.HostResolver;
17 import software.amazon.awssdk.crt.io.TlsContext;
18 import software.amazon.awssdk.crt.mqtt.*;
19 
20 import java.nio.file.Path;
21 import java.util.UUID;
22 import java.util.concurrent.CompletableFuture;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 
/**
 * Unchecked exception that carries only a caller-supplied message.
 *
 * NOTE(review): judging by the name, presumably thrown when a test cannot find
 * the credentials it needs - confirm against the call sites.
 */
class MissingCredentialsException extends RuntimeException {

    /**
     * @param message text returned later by {@link #getMessage()}
     */
    MissingCredentialsException(String message) {
        super(message);
    }
}
31 
32  public class MqttClientConnectionFixture extends CrtTestFixture {
33 
34     MqttClientConnection connection = null;
35     private boolean disconnecting = false;
36 
37     private CompletableFuture<OnConnectionSuccessReturn> onConnectionSuccessFuture = new CompletableFuture<OnConnectionSuccessReturn>();
38     private CompletableFuture<OnConnectionFailureReturn> onConnectionFailureFuture = new CompletableFuture<OnConnectionFailureReturn>();
39     private CompletableFuture<OnConnectionClosedReturn> onConnectionClosedFuture = new CompletableFuture<OnConnectionClosedReturn>();
40 
// ---------------------------------------------------------------------------------------
// Test configuration, read once from JVM system properties when the class is initialized.
// System.getProperty returns null for a property that is not set, so every String
// constant below may be null.
// ---------------------------------------------------------------------------------------

// True when the AWS_TEST_IS_CI property is defined at all; its value is not inspected.
static final boolean AWS_TEST_IS_CI = System.getProperty("AWS_TEST_IS_CI") != null;
// Note: the constant is spelled ROOTCA while the property name is ROOT_CA.
static final String AWS_TEST_MQTT311_ROOTCA = System.getProperty("AWS_TEST_MQTT311_ROOT_CA");
// Static credential related
static final String AWS_TEST_MQTT311_ROLE_CREDENTIAL_ACCESS_KEY = System.getProperty("AWS_TEST_MQTT311_ROLE_CREDENTIAL_ACCESS_KEY");
static final String AWS_TEST_MQTT311_ROLE_CREDENTIAL_SECRET_ACCESS_KEY = System.getProperty("AWS_TEST_MQTT311_ROLE_CREDENTIAL_SECRET_ACCESS_KEY");
static final String AWS_TEST_MQTT311_ROLE_CREDENTIAL_SESSION_TOKEN = System.getProperty("AWS_TEST_MQTT311_ROLE_CREDENTIAL_SESSION_TOKEN");
// Custom Key Ops
static final String AWS_TEST_MQTT311_CUSTOM_KEY_OPS_KEY = System.getProperty("AWS_TEST_MQTT311_CUSTOM_KEY_OPS_KEY");
static final String AWS_TEST_MQTT311_CUSTOM_KEY_OPS_CERT = System.getProperty("AWS_TEST_MQTT311_CUSTOM_KEY_OPS_CERT");
// MQTT311 Cognito
static final String AWS_TEST_MQTT311_COGNITO_ENDPOINT = System.getProperty("AWS_TEST_MQTT311_COGNITO_ENDPOINT");
static final String AWS_TEST_MQTT311_COGNITO_IDENTITY = System.getProperty("AWS_TEST_MQTT311_COGNITO_IDENTITY");
// MQTT311 Codebuild/Direct connections data
// Ports are kept as the raw property strings; nothing in this class parses them.
static final String AWS_TEST_MQTT311_DIRECT_MQTT_HOST = System.getProperty("AWS_TEST_MQTT311_DIRECT_MQTT_HOST");
static final String AWS_TEST_MQTT311_DIRECT_MQTT_PORT = System.getProperty("AWS_TEST_MQTT311_DIRECT_MQTT_PORT");
static final String AWS_TEST_MQTT311_DIRECT_MQTT_BASIC_AUTH_HOST = System.getProperty("AWS_TEST_MQTT311_DIRECT_MQTT_BASIC_AUTH_HOST");
static final String AWS_TEST_MQTT311_DIRECT_MQTT_BASIC_AUTH_PORT = System.getProperty("AWS_TEST_MQTT311_DIRECT_MQTT_BASIC_AUTH_PORT");
static final String AWS_TEST_MQTT311_DIRECT_MQTT_TLS_HOST = System.getProperty("AWS_TEST_MQTT311_DIRECT_MQTT_TLS_HOST");
static final String AWS_TEST_MQTT311_DIRECT_MQTT_TLS_PORT = System.getProperty("AWS_TEST_MQTT311_DIRECT_MQTT_TLS_PORT");
// MQTT311 Codebuild/Websocket connections data
static final String AWS_TEST_MQTT311_WS_MQTT_HOST = System.getProperty("AWS_TEST_MQTT311_WS_MQTT_HOST");
static final String AWS_TEST_MQTT311_WS_MQTT_PORT = System.getProperty("AWS_TEST_MQTT311_WS_MQTT_PORT");
static final String AWS_TEST_MQTT311_WS_MQTT_BASIC_AUTH_HOST = System.getProperty("AWS_TEST_MQTT311_WS_MQTT_BASIC_AUTH_HOST");
static final String AWS_TEST_MQTT311_WS_MQTT_BASIC_AUTH_PORT = System.getProperty("AWS_TEST_MQTT311_WS_MQTT_BASIC_AUTH_PORT");
static final String AWS_TEST_MQTT311_WS_MQTT_TLS_HOST = System.getProperty("AWS_TEST_MQTT311_WS_MQTT_TLS_HOST");
static final String AWS_TEST_MQTT311_WS_MQTT_TLS_PORT = System.getProperty("AWS_TEST_MQTT311_WS_MQTT_TLS_PORT");
// MQTT311 Codebuild misc connections data
static final String AWS_TEST_MQTT311_BASIC_AUTH_USERNAME = System.getProperty("AWS_TEST_MQTT311_BASIC_AUTH_USERNAME");
static final String AWS_TEST_MQTT311_BASIC_AUTH_PASSWORD = System.getProperty("AWS_TEST_MQTT311_BASIC_AUTH_PASSWORD");
static final String AWS_TEST_MQTT311_CERTIFICATE_FILE = System.getProperty("AWS_TEST_MQTT311_CERTIFICATE_FILE");
static final String AWS_TEST_MQTT311_KEY_FILE = System.getProperty("AWS_TEST_MQTT311_KEY_FILE");
// MQTT311 IoT Endpoint, Key, Cert
static final String AWS_TEST_MQTT311_IOT_CORE_HOST = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_HOST");
static final String AWS_TEST_MQTT311_IOT_CORE_RSA_CERT = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT");
static final String AWS_TEST_MQTT311_IOT_CORE_RSA_KEY = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_RSA_KEY");
static final String AWS_TEST_MQTT311_IOT_CORE_ECC_CERT = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_ECC_CERT");
static final String AWS_TEST_MQTT311_IOT_CORE_ECC_KEY = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_ECC_KEY");
// MQTT311 Proxy
static final String AWS_TEST_MQTT311_PROXY_HOST = System.getProperty("AWS_TEST_MQTT311_PROXY_HOST");
static final String AWS_TEST_MQTT311_PROXY_PORT = System.getProperty("AWS_TEST_MQTT311_PROXY_PORT");
// MQTT311 Keystore
static final String AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_FORMAT = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_FORMAT");
static final String AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_FILE = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_FILE");
static final String AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_PASSWORD = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_PASSWORD");
static final String AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_CERT_ALIAS = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_CERT_ALIAS");
static final String AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_CERT_PASSWORD = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_KEYSTORE_CERT_PASSWORD");
// MQTT311 PKCS12
static final String AWS_TEST_MQTT311_IOT_CORE_PKCS12_KEY = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_PKCS12_KEY");
static final String AWS_TEST_MQTT311_IOT_CORE_PKCS12_KEY_PASSWORD = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_PKCS12_KEY_PASSWORD");
// MQTT311 PKCS11
// Note: unlike the other groups, these property names carry no MQTT311_IOT_CORE infix
// (e.g. AWS_TEST_PKCS11_LIB), although the constant names do.
static final String AWS_TEST_MQTT311_IOT_CORE_PKCS11_LIB = System.getProperty("AWS_TEST_PKCS11_LIB");
static final String AWS_TEST_MQTT311_IOT_CORE_PKCS11_TOKEN_LABEL = System.getProperty("AWS_TEST_PKCS11_TOKEN_LABEL");
static final String AWS_TEST_MQTT311_IOT_CORE_PKCS11_PIN = System.getProperty("AWS_TEST_PKCS11_PIN");
static final String AWS_TEST_MQTT311_IOT_CORE_PKCS11_PKEY_LABEL = System.getProperty("AWS_TEST_PKCS11_PKEY_LABEL");
static final String AWS_TEST_MQTT311_IOT_CORE_PKCS11_CERT_FILE = System.getProperty("AWS_TEST_PKCS11_CERT_FILE");
// MQTT311 X509
static final String AWS_TEST_MQTT311_IOT_CORE_X509_CERT = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_X509_CERT");
static final String AWS_TEST_MQTT311_IOT_CORE_X509_KEY = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_X509_KEY");
static final String AWS_TEST_MQTT311_IOT_CORE_X509_ENDPOINT = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_X509_ENDPOINT");
static final String AWS_TEST_MQTT311_IOT_CORE_X509_ROLE_ALIAS = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_X509_ROLE_ALIAS");
static final String AWS_TEST_MQTT311_IOT_CORE_X509_THING_NAME = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_X509_THING_NAME");
// MQTT311 Windows Cert Store
static final String AWS_TEST_MQTT311_IOT_CORE_WINDOWS_PFX_CERT_NO_PASS = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_WINDOWS_PFX_CERT_NO_PASS");
static final String AWS_TEST_MQTT311_IOT_CORE_WINDOWS_CERT_STORE = System.getProperty("AWS_TEST_MQTT311_IOT_CORE_WINDOWS_CERT_STORE");

// MQTT5 Custom Key Ops (so we don't have to make a new file just for a single test)
static final String AWS_TEST_MQTT5_IOT_CORE_HOST = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_HOST");
static final String AWS_TEST_MQTT5_CUSTOM_KEY_OPS_CERT = System.getProperty("AWS_TEST_MQTT5_CUSTOM_KEY_OPS_CERT");
static final String AWS_TEST_MQTT5_CUSTOM_KEY_OPS_KEY = System.getProperty("AWS_TEST_MQTT5_CUSTOM_KEY_OPS_KEY");

// Fixed values shared by the tests. Within this class only TEST_CLIENTID and TEST_REGION
// are used: the connect methods append a random UUID to TEST_CLIENTID, and the websocket
// connect method passes TEST_REGION to the signing config.
static final short TEST_PORT = 8883;
static final short TEST_PORT_ALPN = 443;
static final String TEST_CLIENTID = "aws-crt-java-";
static final String TEST_REGION = "us-east-1";

// Per-instance credential locations and contents. Nothing in this class assigns or reads
// them; presumably subclasses or tests populate them - TODO confirm.
Path pathToCert = null;
Path pathToKey = null;
Path pathToCa = null;

String certificatePem = null;
String privateKeyPem = null;
String caRoot = null;
String iotEndpoint = null;
124 
/**
 * Plain counters recording how many times each connection callback has fired.
 * All counters start at zero (Java's default for int fields).
 */
class ConnectionEventsStatistics {
    /** Number of onConnectionSuccess callbacks observed. */
    int onConnectionSuccessCalled;
    /** Number of onConnectionFailure callbacks observed. */
    int onConnectionFailureCalled;
    /** Number of onConnectionResumed callbacks observed. */
    int onConnectionResumedCalled;
    /** Number of onConnectionInterrupted callbacks observed. */
    int onConnectionInterruptedCalled;
    /** Number of onConnectionClosed callbacks observed. */
    int onConnectionClosedCalled;
}
132 
133     ConnectionEventsStatistics connectionEventsStatistics = new ConnectionEventsStatistics();
134 
135     Consumer<MqttConnectionConfig> connectionConfigTransformer = null;
setConnectionConfigTransformer(Consumer<MqttConnectionConfig> connectionConfigTransformer)136     protected void setConnectionConfigTransformer(Consumer<MqttConnectionConfig> connectionConfigTransformer) {
137         this.connectionConfigTransformer = connectionConfigTransformer;
138     }
139 
140     Consumer<MqttMessage> connectionMessageTransfomer = null;
setConnectionMessageTransformer(Consumer<MqttMessage> connectionMessageTransfomer)141     protected void setConnectionMessageTransformer(Consumer<MqttMessage> connectionMessageTransfomer) {
142         this.connectionMessageTransfomer = connectionMessageTransfomer;
143     }
144 
MqttClientConnectionFixture()145     MqttClientConnectionFixture() {
146     }
147 
connectDirectWithConfig(TlsContext tlsContext, String endpoint, int port, String username, String password, HttpProxyOptions httpProxyOptions)148     boolean connectDirectWithConfig(TlsContext tlsContext, String endpoint, int port, String username, String password, HttpProxyOptions httpProxyOptions)
149     {
150         try {
151             return connectDirectWithConfigThrows(tlsContext, endpoint, port, username, password, httpProxyOptions);
152         } catch (Exception ex) {
153             fail("Exception during connect: " + ex.toString());
154         }
155         return false;
156     }
157 
connectDirectWithConfigThrows(TlsContext tlsContext, String endpoint, int port, String username, String password, HttpProxyOptions httpProxyOptions)158     boolean connectDirectWithConfigThrows(TlsContext tlsContext, String endpoint, int port, String username, String password, HttpProxyOptions httpProxyOptions) throws Exception
159     {
160         try(EventLoopGroup elg = new EventLoopGroup(1);
161             HostResolver hr = new HostResolver(elg);
162             ClientBootstrap bootstrap = new ClientBootstrap(elg, hr);) {
163 
164             // Connection callback events
165             MqttClientConnectionEvents events = new MqttClientConnectionEvents() {
166                 @Override
167                 public void onConnectionResumed(boolean sessionPresent) {
168                     connectionEventsStatistics.onConnectionResumedCalled++;
169                     System.out.println("Connection resumed");
170                 }
171 
172                 @Override
173                 public void onConnectionInterrupted(int errorCode) {
174                     connectionEventsStatistics.onConnectionInterruptedCalled++;
175                     if (!disconnecting) {
176                         System.out.println(
177                                 "Connection interrupted: error: " + errorCode + " " + CRT.awsErrorString(errorCode));
178                     }
179                 }
180 
181                 @Override
182                 public void onConnectionFailure(OnConnectionFailureReturn data) {
183                     System.out.println("Connection failed with error: " + data.getErrorCode() + " " + CRT.awsErrorString(data.getErrorCode()));
184                     connectionEventsStatistics.onConnectionFailureCalled++;
185                     onConnectionFailureFuture.complete(data);
186                 }
187 
188                 @Override
189                 public void onConnectionSuccess(OnConnectionSuccessReturn data) {
190                     System.out.println("Connection success. Session present: " + data.getSessionPresent());
191                     connectionEventsStatistics.onConnectionSuccessCalled++;
192                     onConnectionSuccessFuture.complete(data);
193                 }
194 
195                 @Override
196                 public void onConnectionClosed(OnConnectionClosedReturn data) {
197                     System.out.println("Connection disconnected successfully");
198                     connectionEventsStatistics.onConnectionClosedCalled++;
199                     onConnectionClosedFuture.complete(data);
200                 }
201             };
202 
203             // Default settings
204             boolean cleanSession = true; // only true is supported right now
205             int keepAliveSecs = 0;
206             int protocolOperationTimeout = 60000;
207             String clientId = TEST_CLIENTID + (UUID.randomUUID()).toString();
208 
209             try (MqttConnectionConfig config = new MqttConnectionConfig()) {
210 
211                 MqttClient client = null;
212                 if (tlsContext != null)
213                 {
214                     client = new MqttClient(bootstrap, tlsContext);
215                 }
216                 else
217                 {
218                     client = new MqttClient(bootstrap);
219                 }
220 
221                 config.setMqttClient(client);
222                 config.setClientId(clientId);
223                 config.setEndpoint(endpoint);
224                 config.setPort(port);
225                 config.setCleanSession(cleanSession);
226                 config.setKeepAliveSecs(keepAliveSecs);
227                 config.setProtocolOperationTimeoutMs(protocolOperationTimeout);
228                 config.setConnectionCallbacks(events);
229 
230                 if (httpProxyOptions != null) {
231                     config.setHttpProxyOptions(httpProxyOptions);
232                 }
233                 if (username != null) {
234                     config.setUsername(username);
235                 }
236                 if (password != null)
237                 {
238                     config.setPassword(password);
239                 }
240 
241                 if (connectionConfigTransformer != null) {
242                     connectionConfigTransformer.accept(config);
243                 }
244 
245                 try {
246                     connection = new MqttClientConnection(config);
247                     if (connectionMessageTransfomer != null) {
248                         connection.onMessage(connectionMessageTransfomer);
249                     }
250                     CompletableFuture<Boolean> connected = connection.connect();
251                     connected.get();
252                 } finally {
253                     client.close();
254                 }
255                 return true;
256             }
257         }
258     }
259 
connectWebsocketsWithCredentialsProvider(CredentialsProvider credentialsProvider, String endpoint, int port, TlsContext tlsContext, String username, String password, HttpProxyOptions httpProxyOptions)260     boolean connectWebsocketsWithCredentialsProvider(CredentialsProvider credentialsProvider, String endpoint, int port, TlsContext tlsContext, String username, String password, HttpProxyOptions httpProxyOptions)
261     {
262         // Return result
263         boolean result = false;
264         // Default settings
265         boolean cleanSession = true; // only true is supported right now
266         int keepAliveSecs = 0;
267         int protocolOperationTimeout = 60000;
268         String clientId = TEST_CLIENTID + (UUID.randomUUID()).toString();
269 
270         // Connection callback events
271         MqttClientConnectionEvents events = new MqttClientConnectionEvents() {
272             @Override
273             public void onConnectionResumed(boolean sessionPresent) {
274                 System.out.println("Connection resumed");
275             }
276 
277             @Override
278             public void onConnectionInterrupted(int errorCode) {
279                 if (!disconnecting) {
280                     System.out.println(
281                             "Connection interrupted: error: " + errorCode + " " + CRT.awsErrorString(errorCode));
282                 }
283             }
284 
285             @Override
286             public void onConnectionFailure(OnConnectionFailureReturn data) {
287                 System.out.println("Connection failed with error: " + data.getErrorCode() + " " + CRT.awsErrorString(data.getErrorCode()));
288                 onConnectionFailureFuture.complete(data);
289             }
290 
291             @Override
292             public void onConnectionSuccess(OnConnectionSuccessReturn data) {
293                 System.out.println("Connection success. Session present: " + data.getSessionPresent());
294                 onConnectionSuccessFuture.complete(data);
295             }
296 
297             @Override
298             public void onConnectionClosed(OnConnectionClosedReturn data) {
299                 System.out.println("Connection disconnected successfully");
300                 onConnectionClosedFuture.complete(data);
301             }
302         };
303 
304         try (EventLoopGroup elg = new EventLoopGroup(1);
305             HostResolver hr = new HostResolver(elg);
306             ClientBootstrap bootstrap = new ClientBootstrap(elg, hr);)
307         {
308             try (MqttConnectionConfig config = new MqttConnectionConfig();
309                  AwsSigningConfig signingConfig = new AwsSigningConfig();) {
310 
311                 MqttClient client = null;
312                 if (tlsContext != null)
313                 {
314                     client = new MqttClient(bootstrap, tlsContext);
315                 }
316                 else
317                 {
318                     client = new MqttClient(bootstrap);
319                 }
320 
321                 config.setMqttClient(client);
322                 config.setClientId(clientId);
323                 config.setEndpoint(endpoint);
324                 config.setPort(port);
325                 config.setCleanSession(cleanSession);
326                 config.setKeepAliveSecs(keepAliveSecs);
327                 config.setProtocolOperationTimeoutMs(protocolOperationTimeout);
328                 config.setUseWebsockets(true);
329                 config.setConnectionCallbacks(events);
330 
331                 if (username != null) {
332                     config.setUsername(username);
333                 }
334                 if (password != null) {
335                     config.setPassword(password);
336                 }
337                 if (httpProxyOptions != null) {
338                     config.setHttpProxyOptions(httpProxyOptions);
339                 }
340 
341                 if (connectionConfigTransformer != null) {
342                     connectionConfigTransformer.accept(config);
343                 }
344 
345                 // Make the websocket transformer
346                 if (credentialsProvider != null) {
347                     signingConfig.setAlgorithm(AwsSigningConfig.AwsSigningAlgorithm.SIGV4);
348                     // NOTE: Missing a credentials provider gives a non-helpful error. This needs to be changed in Java V2...
349                     signingConfig.setCredentialsProvider(credentialsProvider);
350                 }
351                 signingConfig.setSignatureType(AwsSigningConfig.AwsSignatureType.HTTP_REQUEST_VIA_QUERY_PARAMS);
352                 signingConfig.setRegion(TEST_REGION);
353                 signingConfig.setService("iotdevicegateway");
354                 signingConfig.setOmitSessionToken(true);
355                 try (MqttClientConnectionSigv4HandshakeTransformer transformer = new MqttClientConnectionSigv4HandshakeTransformer(signingConfig);)
356                 {
357                     config.setWebsocketHandshakeTransform(transformer);
358                     connection = new MqttClientConnection(config);
359                     if (connectionMessageTransfomer != null) {
360                         connection.onMessage(connectionMessageTransfomer);
361                     }
362                     CompletableFuture<Boolean> connected = connection.connect();
363                     connected.get();
364                     result = true;
365                 }
366                 finally {
367                     client.close();
368                 }
369 
370             } catch (Exception ex) {
371                 fail("Exception during connect: " + ex.toString());
372             }
373         }
374         return result;
375     }
376 
disconnect()377     void disconnect() {
378         disconnecting = true;
379         try {
380             CompletableFuture<Void> disconnected = connection.disconnect();
381             disconnected.get();
382         } catch (Exception ex) {
383             fail("Exception during disconnect: " + ex.getMessage());
384         }
385 
386     }
387 
close()388     void close() {
389         connection.close();
390     }
391 
publish(String topic, byte[] payload, QualityOfService qos)392     CompletableFuture<Integer> publish(String topic, byte[] payload, QualityOfService qos) {
393         try {
394             MqttMessage messageToSend = new MqttMessage(topic, payload, qos);
395             return connection.publish(messageToSend);
396         } catch (Exception ex) {
397             fail("Exception during publish: " + ex.getMessage());
398         }
399         return null;
400     }
401 
checkOperationStatistics( long expectedIncompleteOperationCount, long expectedIncompleteOperationSize)402     void checkOperationStatistics(
403         long expectedIncompleteOperationCount, long expectedIncompleteOperationSize) {
404         try {
405             MqttClientConnectionOperationStatistics statistics = connection.getOperationStatistics();
406 
407             long incomplete_ops_count = statistics.getIncompleteOperationCount();
408             long incomplete_ops_size = statistics.getIncompleteOperationSize();
409 
410             if (incomplete_ops_count != expectedIncompleteOperationCount) {
411                 fail("Incomplete operations count:" + incomplete_ops_count + " did not equal expected value:" + expectedIncompleteOperationCount);
412             }
413 
414             if (incomplete_ops_size != expectedIncompleteOperationSize) {
415                 fail("Incomplete operations size:" + incomplete_ops_size + " did not equal expected value:" + expectedIncompleteOperationSize);
416             }
417 
418             // TODO - check unacked operations once we have a way to consistently check them. Part of the issue right now
419             // is that unacked operations are heavily dependent on the socket and when incomplete operations get to the socket
420             // and so the speed of the socket can mess up the test.
421 
422         } catch (Exception ex) {
423             fail("Exception during operation statistics check: " + ex.getMessage());
424         }
425     }
426 
sleepForMilliseconds(long secondsToSleep)427     void sleepForMilliseconds(long secondsToSleep) {
428         try {
429             Thread.sleep(secondsToSleep);
430         } catch (Exception ex) {
431             fail("Exception during sleep: " + ex.getMessage());
432         }
433     }
434 
waitForConnectSuccess()435     OnConnectionSuccessReturn waitForConnectSuccess() throws Exception {
436         return onConnectionSuccessFuture.get(60, TimeUnit.SECONDS);
437     }
438 
waitForConnectFailure()439     OnConnectionFailureReturn waitForConnectFailure() throws Exception {
440         return onConnectionFailureFuture.get(60, TimeUnit.SECONDS);
441     }
442 
waitForConnectClose()443     OnConnectionClosedReturn waitForConnectClose() throws Exception {
444         return onConnectionClosedFuture.get(60, TimeUnit.SECONDS);
445     }
446 }
447 
448