• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 package canary.mqtt5;
6 
7 import java.io.PrintWriter;
8 import java.net.SocketOption;
9 import java.time.temporal.ChronoUnit;
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.Random;
13 import java.util.UUID;
14 import java.util.concurrent.CompletableFuture;
15 import java.util.concurrent.TimeUnit;
16 import java.util.function.Consumer;
17 
18 import software.amazon.awssdk.crt.CRT;
19 import software.amazon.awssdk.crt.CrtResource;
20 import software.amazon.awssdk.crt.Log;
21 import software.amazon.awssdk.crt.Log.LogLevel;
22 import software.amazon.awssdk.crt.Log.LogSubject;
23 import software.amazon.awssdk.crt.io.ClientBootstrap;
24 import software.amazon.awssdk.crt.io.EventLoopGroup;
25 import software.amazon.awssdk.crt.io.HostResolver;
26 import software.amazon.awssdk.crt.io.SocketOptions;
27 import software.amazon.awssdk.crt.io.TlsContext;
28 import software.amazon.awssdk.crt.io.TlsContextOptions;
29 import software.amazon.awssdk.crt.io.ExponentialBackoffRetryOptions.JitterMode;
30 import software.amazon.awssdk.crt.mqtt5.*;
31 import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.Mqtt5ClientOptionsBuilder;
32 import software.amazon.awssdk.crt.mqtt5.packets.*;
33 
34 import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket.ConnectPacketBuilder;
35 import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket.DisconnectPacketBuilder;
36 import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket.DisconnectReasonCode;
37 import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket.PublishPacketBuilder;
38 import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket.PayloadFormatIndicator;
39 import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket.SubscribePacketBuilder;
40 import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket.RetainHandlingType;
41 import software.amazon.awssdk.crt.mqtt5.packets.UnsubscribePacket.UnsubscribePacketBuilder;
42 
43 public class Mqtt5Canary {
    // Connection settings. The defaults below match the usage text; parseCommandLine overwrites them.
    static String configEndpoint = "localhost";
    static Long configPort = 1883L;
    static String configCaFile = null;
    static String configCertFile = null;
    static String configKeyFile = null;
    // Base client ID; setupClients appends "_<client index>" to it for every client it creates.
    static String configClientID = "MQTT5_Sample_Java_" + UUID.randomUUID().toString();
    static boolean configUseWebsockets = false;
    static boolean configUseTls = false;
    // Canary sizing and run-time settings.
    static Integer configThreads = 8;
    static Integer configClients = 3;
    // NOTE(review): the usage text describes this as "seconds to wait", but parseCommandLine stores
    // 1000 / <--tps value> here, which looks like a per-operation delay in milliseconds - confirm
    // against the code that consumes it.
    static Integer configTps = 12;
    static Long configSeconds = 600L;
    static boolean configShowHelp = false;
    // Logging sinks consulted by PrintLog.
    static boolean configLogStdout = true;
    static boolean configLogAWS = false;
    static LogLevel configLogAWSLevel = LogLevel.Debug;
    static String configLogFile;
    // When non-null, PrintLog and the Operation* exception handlers also write to this printer.
    static PrintWriter configFilePrinter;

    // clients and clientsData are filled in lockstep by setupClients, so index i of one matches index i of the other.
    static List<Mqtt5Client> clients = new ArrayList<Mqtt5Client>();
    static List<ClientsData> clientsData = new ArrayList<ClientsData>();
    // Shared resources created by setupClients and handed to the client options builder.
    // clientsContext and clientsWebsocketTransform stay null unless TLS / websockets are enabled.
    static TlsContext clientsContext = null;
    static EventLoopGroup clientsEventLoopGroup;
    static HostResolver clientsHostResolver;
    static ClientBootstrap clientsBootstrap;
    static SocketOptions clientsSocketOptions;
    static Consumer<Mqtt5WebsocketHandshakeTransformArgs> clientsWebsocketTransform = null;
    static CanaryLifecycleEvents clientsLifecycleEvents = new CanaryLifecycleEvents();
    static CanaryPublishEvents clientsPublishEvents = new CanaryPublishEvents();

    // Source of randomness for choosing operations and generating publish payloads.
    static Random random = new Random();
    static java.time.LocalDateTime startDateTime;

    // Timeout, in seconds, for every blocking future wait performed by the Operation* methods.
    static int operationFutureWaitTime = 30;

    // 65535 is UINT16_MAX. OperationPublish passes this to Random.nextInt as the exclusive upper
    // bound, so generated payloads are 0..65534 bytes long.
    private static final int MAX_PAYLOAD_SIZE = 65535;
80 
printUsage()81     static void printUsage() {
82         System.out.println(
83                 "Usage:\n" +
84                         "  --help            This message\n"+
85                         "  --endpoint        MQTT5 endpoint hostname (optional, default=localhost)\n"+
86                         "  --port            MQTT5 endpoint port to use (optional, default=1883)\n"+
87                         "  --ca_file         A path to a CA certificate file (optional)\n"+
88                         "  --cert            A path to a certificate file (optional, will use mTLS if defined)\n" +
89                         "  --key             A path to a private key file (optional, will use mTLS if defined)\n" +
90                         "  --clientID        The ClientID to connect with (optional, default=MQTT5_Sample_Java_<UUID>)\n"+
91                         "  --use_websockets  If defined, websockets will be used (optional)\n"+
92                         "  --use_tls         If defined, TLS (or mTLS) will be used (optional)\n"+
93                         "\n"+
94                         " --threads          The number of EventLoop group threads to use (optional, default=8)\n"+
95                         " --clients          The number of clients to use (optional, default=3, max=50)\n"+
96                         " --tps              The number of seconds to wait after performing an operation (optional, default=12)\n"+
97                         " --seconds          The number of seconds to run the Canary test (optional, default=600)\n"+
98                         " --log_console      If defined, logging will print to stdout (optional, default=true, type=boolean)\n"+
99                         " --log_aws          If defined, logging will occur using the AWS Logger (optional, type=boolean) \n"+
100                         " --log_aws_level    If defined, logging to AWS Logger will use that log level (optional, default=Debug)\n"+
101                         " --log_file         If defined, logging will be written to this file (optional)"
102         );
103     }
104 
parseCommandLine(String[] args)105     static void parseCommandLine(String[] args) {
106         for (int idx = 0; idx < args.length; ++idx) {
107             switch (args[idx]) {
108                 case "--help":
109                     configShowHelp = true;
110                     break;
111                 case "--endpoint":
112                     if (idx + 1 < args.length) {
113                         configEndpoint = args[++idx];
114                     }
115                     break;
116                 case "--port":
117                     if (idx + 1 < args.length) {
118                         configPort = Long.parseLong(args[++idx]);
119                     }
120                     break;
121                 case "--ca_file":
122                     if (idx + 1 < args.length) {
123                         configCaFile = args[++idx];
124                     }
125                     break;
126                 case "--cert":
127                     if (idx + 1 < args.length) {
128                         configCertFile = args[++idx];
129                     }
130                     break;
131                 case "--key":
132                     if (idx + 1 < args.length) {
133                         configKeyFile = args[++idx];
134                     }
135                     break;
136                 case "--clientID":
137                     if (idx + 1 < args.length) {
138                         configClientID = args[++idx];
139                     }
140                     break;
141                 case "--use_websockets":
142                     configUseWebsockets = true;
143                     break;
144                 case "--use_tls":
145                     configUseTls = true;
146                     break;
147                 case "--threads":
148                     if (idx + 1 < args.length) {
149                         configThreads = Integer.parseInt(args[++idx]);
150                     }
151                     break;
152                 case "--clients":
153                     if (idx + 1 < args.length) {
154                         configClients = Integer.parseInt(args[++idx]);
155                     }
156                     break;
157                 case "--tps":
158                     if (idx + 1 < args.length) {
159                         int tps = Integer.parseInt(args[++idx]);
160                         if (tps == 0) {
161                             configTps = 0;
162                         } else {
163                             configTps = 1000 / tps;
164                         }
165                     }
166                     break;
167                 case "--seconds":
168                     if (idx + 1 < args.length) {
169                         configSeconds = Long.parseLong(args[++idx]);
170                     }
171                     break;
172                 case "--log_console":
173                     if (idx + 1 < args.length) {
174                         configLogStdout = Boolean.parseBoolean(args[++idx]);
175                     }
176                     break;
177                 case "--log_aws":
178                     if (idx + 1 < args.length) {
179                         configLogAWS = Boolean.parseBoolean(args[++idx]);
180                     }
181                     break;
182                 case "--log_aws_level":
183                     if (idx + 1 < args.length) {
184                         configLogAWSLevel = LogLevel.valueOf(args[++idx]);
185                     }
186                     break;
187                 case "--log_file":
188                     if (idx + 1 < args.length) {
189                         configLogFile = args[++idx];
190                     }
191                     break;
192                 default:
193                     System.out.println("Unrecognized argument: " + args[idx]);
194             }
195         }
196     }
197 
PrintLog(String message)198     static void PrintLog(String message) {
199         if (configLogStdout == true) {
200             System.out.println("[CANARY] " + message);
201         }
202         if (configFilePrinter != null) {
203             configFilePrinter.println(message);
204         }
205         if (configLogAWS == true) {
206             Log.log(configLogAWSLevel, LogSubject.MqttClient, "[CANARY] " + message);
207         }
208     }
209 
exitWithError(int errorCode)210     static void exitWithError(int errorCode) {
211         if (configFilePrinter != null) {
212             configFilePrinter.close();
213         }
214         System.exit(errorCode);
215     }
216 
    // Mutable per-client bookkeeping. setupClients adds one instance per client, so the entry at
    // index i of clientsData belongs to the client at index i of clients.
    static final class ClientsData {
        // Completed by the lifecycle callbacks: normally on connection success, exceptionally on
        // connection failure. Replaced with a fresh future on disconnection and on stop.
        CompletableFuture<Void> connectedFuture = new CompletableFuture<>();
        // Completed by onStopped; replaced with a fresh future on every successful connection.
        CompletableFuture<Void> stopFuture = new CompletableFuture<>();
        // Topic used by the subscribe/unsubscribe and "shared topic" publish operations.
        String sharedTopic = "test/shared_topic";
        // Assigned client ID taken from the negotiated settings on connection success; the
        // subscribe and "subscribed topic" publish operations also use it as a topic name.
        String clientId = "";
        // Written by OperationSubscribe/OperationUnsubscribe; cleared on connection failure,
        // disconnection and stop.
        boolean subscribedToTopics = false;
        // Guard the Operation* methods use to skip a client that already has an operation running.
        boolean isWaitingForOperation = false;
    }
225 
    // Operations the canary can run against a client; PerformOperation dispatches on these values.
    enum CANARY_OPERATIONS {
        OPERATION_NULL,
        OPERATION_START,
        OPERATION_STOP,
        OPERATION_SUBSCRIBE,
        OPERATION_UNSUBSCRIBE,
        OPERATION_UNSUBSCRIBE_BAD,
        OPERATION_PUBLISH_QOS0,
        OPERATION_PUBLISH_QOS1,
        OPERATION_PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0,
        OPERATION_PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1,
        OPERATION_PUBLISH_TO_SHARED_TOPIC_QOS0,
        OPERATION_PUBLISH_TO_SHARED_TOPIC_QOS1,
    }
    // Pool that PerformRandomOperation draws from. setupOperations fills it with every operation
    // except OPERATION_NULL and OPERATION_START.
    static List<CANARY_OPERATIONS> clientsOperationsList = new ArrayList<CANARY_OPERATIONS>();
241 
242     static final class CanaryLifecycleEvents implements Mqtt5ClientOptions.LifecycleEvents {
243         @Override
onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn)244         public void onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn) {}
245 
246         @Override
onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn)247         public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn) {
248             ConnAckPacket connAckData = onConnectionSuccessReturn.getConnAckPacket();
249             NegotiatedSettings negotiatedSettings = onConnectionSuccessReturn.getNegotiatedSettings();
250             int clientIdx = clients.indexOf(client);
251 
252             if (connAckData.getReasonCode() == ConnAckPacket.ConnectReasonCode.SUCCESS) {
253                 PrintLog("[Lifecycle event] Client ID " + clientIdx + " connection success...");
254                 clientsData.get(clientIdx).clientId = negotiatedSettings.getAssignedClientID();
255                 clientsData.get(clientIdx).connectedFuture.complete(null);
256                 clientsData.get(clientIdx).stopFuture = new CompletableFuture<>();
257             } else {
258                 PrintLog("[Lifecycle event] Client ID " + clientIdx + " ConnAckPacket code: " + connAckData.getReasonCode().toString());
259                 clientsData.get(clientIdx).connectedFuture.completeExceptionally(new Exception("Connection failure"));
260             }
261         }
262 
263         @Override
onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn)264         public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) {
265             int clientIdx = clients.indexOf(client);
266             PrintLog("[Lifecycle event] Client ID " + clientIdx + " connection failed with errorCode : " + onConnectionFailureReturn.getErrorCode());
267             clientsData.get(clientIdx).connectedFuture.completeExceptionally(new Exception("Connection failure"));
268             clientsData.get(clientIdx).subscribedToTopics = false;
269         }
270 
271         @Override
onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn)272         public void onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn) {
273             int clientIdx = clients.indexOf(client);
274             PrintLog("[Lifecycle event] Client ID " + clientIdx + " connection disconnected...");
275             PrintLog("[Lifecycle event] Client ID " + clientIdx + " Disconnection error code: " + Integer.toString(onDisconnectionReturn.getErrorCode()));
276             clientsData.get(clientIdx).connectedFuture = new CompletableFuture<>();
277             clientsData.get(clientIdx).subscribedToTopics = false;
278         }
279 
280         @Override
onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn)281         public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) {
282             int clientIdx = clients.indexOf(client);
283             PrintLog("[Lifecycle event] Client ID " + clientIdx + " connection stopped...");
284             clientsData.get(clientIdx).connectedFuture = new CompletableFuture<>();
285             clientsData.get(clientIdx).stopFuture.complete(null);
286             clientsData.get(clientIdx).subscribedToTopics = false;
287         }
288     }
289 
290     static final class CanaryPublishEvents implements Mqtt5ClientOptions.PublishEvents {
291         @Override
onMessageReceived(Mqtt5Client client, PublishReturn publishReturn)292         public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
293             PublishPacket publishPacket = publishReturn.getPublishPacket();
294             int clientIdx = clients.indexOf(client);
295             PrintLog("[Publish event] Client ID " + clientIdx + " message received:\n" +
296                     "  Topic: " + publishPacket.getTopic() + "\n");
297         }
298     }
299 
300     // ================================================================================
301     // SETUP FUNCTIONS
302     // ================================================================================
303 
setupClients()304     public static void setupClients() {
305         // Create the builder
306         Mqtt5ClientOptionsBuilder clientOptionsBuilder = new Mqtt5ClientOptionsBuilder(configEndpoint, configPort);
307 
308         clientsEventLoopGroup = new EventLoopGroup(configThreads);
309         clientsHostResolver = new HostResolver(clientsEventLoopGroup);
310         clientsBootstrap = new ClientBootstrap(clientsEventLoopGroup, clientsHostResolver);
311         clientOptionsBuilder.withBootstrap(clientsBootstrap);
312 
313         clientsSocketOptions = new SocketOptions();
314         clientsSocketOptions.type = SocketOptions.SocketType.STREAM;
315         clientsSocketOptions.connectTimeoutMs = 60000;
316         clientsSocketOptions.keepAliveTimeoutSecs = 0;
317         clientsSocketOptions.keepAliveIntervalSecs = 0;
318         clientOptionsBuilder.withSocketOptions(clientsSocketOptions);
319 
320         clientOptionsBuilder.withSessionBehavior(Mqtt5ClientOptions.ClientSessionBehavior.CLEAN);
321         clientOptionsBuilder.withLifecycleEvents(clientsLifecycleEvents);
322         clientOptionsBuilder.withRetryJitterMode(JitterMode.None);
323         clientOptionsBuilder.withMinReconnectDelayMs(60000L);
324         clientOptionsBuilder.withMaxReconnectDelayMs(120000L);
325         clientOptionsBuilder.withMinConnectedTimeToResetReconnectDelayMs(30000L);
326         clientOptionsBuilder.withPingTimeoutMs(20000L);
327         clientOptionsBuilder.withPublishEvents(clientsPublishEvents);
328 
329         if (configUseTls == true || configCertFile != null || configKeyFile != null) {
330             TlsContextOptions tlsContextOptions = TlsContextOptions.createDefaultClient();
331             if (configCertFile != null || configKeyFile != null) {
332                 tlsContextOptions.withMtlsFromPath(configCertFile, configKeyFile);
333             } else {
334                 tlsContextOptions.withVerifyPeer(false);
335             }
336 
337             if (configCaFile != null) {
338                 tlsContextOptions.overrideDefaultTrustStoreFromPath(null, configCaFile);
339             }
340 
341             clientsContext = new TlsContext(tlsContextOptions);
342             clientOptionsBuilder.withTlsContext(clientsContext);
343         }
344 
345         if (configUseWebsockets == true) {
346             clientsWebsocketTransform = new Consumer<Mqtt5WebsocketHandshakeTransformArgs>() {
347                 @Override
348                 public void accept(Mqtt5WebsocketHandshakeTransformArgs t) {
349                     t.complete(t.getHttpRequest());
350                 }
351             };
352             clientOptionsBuilder.withWebsocketHandshakeTransform(clientsWebsocketTransform);
353         }
354 
355         ConnectPacketBuilder connectPacketBuilder = new ConnectPacketBuilder();
356         connectPacketBuilder.withKeepAliveIntervalSeconds(30L);
357         connectPacketBuilder.withMaximumPacketSizeBytes(128L * 1024L);
358         connectPacketBuilder.withReceiveMaximum(9L);
359         clientOptionsBuilder.withConnectOptions(connectPacketBuilder.build());
360 
361         for (int i = 0; i < configClients; i++) {
362             connectPacketBuilder.withClientId(configClientID + "_" + Integer.toString(i));
363             clientOptionsBuilder.withConnectOptions(connectPacketBuilder.build());
364             Mqtt5Client newClient = new Mqtt5Client(clientOptionsBuilder.build());
365             clients.add(newClient);
366             ClientsData newData = new ClientsData();
367             clientsData.add(newData);
368         }
369     }
370 
setupOperations()371     public static void setupOperations() {
372         // For now have everything evenly distributed
373         clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_STOP);
374         clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_SUBSCRIBE);
375         clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_UNSUBSCRIBE);
376         clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_UNSUBSCRIBE_BAD);
377         clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_PUBLISH_QOS0);
378         clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_PUBLISH_QOS1);
379         clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0);
380         clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1);
381         clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_PUBLISH_TO_SHARED_TOPIC_QOS0);
382         clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_PUBLISH_TO_SHARED_TOPIC_QOS1);
383     }
384 
385     // ================================================================================
386     // OPERATIONS
387     // ================================================================================
388 
OperationNull(int clientIdx)389     public static void OperationNull(int clientIdx) {
390         // Do nothing!
391         PrintLog("[OP] Null called for client ID " + clientIdx);
392         return;
393     }
394 
OperationStart(int clientIdx)395     public static void OperationStart(int clientIdx) {
396         Mqtt5Client client = clients.get(clientIdx);
397         if (clientsData.get(clientIdx).isWaitingForOperation == true) {
398             PrintLog("[OP] Start called for client ID " + clientIdx + " but already has operation...");
399             return;
400         }
401         if (client.getIsConnected() == true) {
402             PrintLog("[OP] Start called for client ID " + clientIdx + " but is already connected/started!");
403             return;
404         }
405 
406         clientsData.get(clientIdx).isWaitingForOperation = true;
407         PrintLog("[OP] About to start client ID " + clientIdx);
408         client.start();
409         try {
410             clientsData.get(clientIdx).connectedFuture.get(operationFutureWaitTime, TimeUnit.SECONDS);
411         } catch (Exception ex) {
412             PrintLog("[OP] Start had an exception! Exception: " + ex);
413             ex.printStackTrace();
414             if (configFilePrinter != null) {
415                 ex.printStackTrace(configFilePrinter);
416             }
417         }
418         PrintLog("[OP] Started client ID " + clientIdx);
419         clientsData.get(clientIdx).isWaitingForOperation = false;
420     }
421 
OperationStop(int clientIdx)422     public static void OperationStop(int clientIdx) {
423         Mqtt5Client client = clients.get(clientIdx);
424         if (clientsData.get(clientIdx).isWaitingForOperation == true) {
425             PrintLog("[OP] Stop called for client ID " + clientIdx + " but already has operation...");
426             return;
427         }
428         if (client.getIsConnected() == false) {
429             PrintLog("[OP] Stop called for client ID " + clientIdx + " but is already disconnected/stopped!");
430             return;
431         }
432 
433         clientsData.get(clientIdx).isWaitingForOperation = true;
434         PrintLog("[OP] About to stop client ID " + clientIdx);
435         client.stop();
436         try {
437             clientsData.get(clientIdx).stopFuture.get(operationFutureWaitTime, TimeUnit.SECONDS);
438         } catch (Exception ex) {
439             PrintLog("[OP] Stop had an exception! Exception: " + ex);
440             ex.printStackTrace();
441             if (configFilePrinter != null) {
442                 ex.printStackTrace(configFilePrinter);
443             }
444         }
445         PrintLog("[OP] Stopped client ID " + clientIdx);
446         clientsData.get(clientIdx).isWaitingForOperation = false;
447     }
448 
OperationSubscribe(int clientIdx)449     public static void OperationSubscribe(int clientIdx) {
450         Mqtt5Client client = clients.get(clientIdx);
451         if (clientsData.get(clientIdx).isWaitingForOperation == true) {
452             PrintLog("[OP] Subscribe called for client ID " + clientIdx + " but already has operation...");
453             return;
454         }
455         if (client.getIsConnected() == false) {
456             OperationStart(clientIdx);
457             return;
458         }
459         if (clientsData.get(clientIdx).subscribedToTopics == true) {
460             return;
461         }
462 
463         clientsData.get(clientIdx).isWaitingForOperation = true;
464         PrintLog("[OP] About to subscribe client ID " + clientIdx);
465         SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder();
466         subscribePacketBuilder.withSubscription(clientsData.get(clientIdx).clientId, QOS.AT_LEAST_ONCE);
467         subscribePacketBuilder.withSubscription(clientsData.get(clientIdx).sharedTopic, QOS.AT_LEAST_ONCE);
468         try {
469             client.subscribe(subscribePacketBuilder.build()).get(operationFutureWaitTime, TimeUnit.SECONDS);
470         } catch (Exception ex) {
471             PrintLog("[OP] Subscribe had an exception! Exception: " + ex);
472             ex.printStackTrace();
473             if (configFilePrinter != null) {
474                 ex.printStackTrace(configFilePrinter);
475             }
476         }
477         clientsData.get(clientIdx).subscribedToTopics = true;
478         PrintLog("[OP] Subscribed client ID " + clientIdx);
479         clientsData.get(clientIdx).isWaitingForOperation = false;
480     }
481 
OperationUnsubscribe(int clientIdx)482     public static void OperationUnsubscribe(int clientIdx) {
483         Mqtt5Client client = clients.get(clientIdx);
484         if (clientsData.get(clientIdx).isWaitingForOperation == true) {
485             PrintLog("[OP] Unsubscribe called for client ID " + clientIdx + " but already has operation...");
486             return;
487         }
488         if (client.getIsConnected() == false) {
489             OperationStart(clientIdx);
490             return;
491         }
492         if (clientsData.get(clientIdx).subscribedToTopics == false) {
493             return;
494         }
495 
496         clientsData.get(clientIdx).isWaitingForOperation = true;
497         PrintLog("[OP] About to unsubscribe client ID " + clientIdx);
498         UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder();
499         unsubscribePacketBuilder.withSubscription(clientsData.get(clientIdx).clientId);
500         unsubscribePacketBuilder.withSubscription(clientsData.get(clientIdx).sharedTopic);
501         try {
502             client.unsubscribe(unsubscribePacketBuilder.build()).get(operationFutureWaitTime, TimeUnit.SECONDS);
503         } catch (Exception ex) {
504             PrintLog("[OP] Unsubscribe had an exception! Exception: " + ex);
505             ex.printStackTrace();
506             if (configFilePrinter != null) {
507                 ex.printStackTrace(configFilePrinter);
508             }
509         }
510         clientsData.get(clientIdx).subscribedToTopics = false;
511         PrintLog("[OP] Unsubscribed client ID " + clientIdx);
512         clientsData.get(clientIdx).isWaitingForOperation = false;
513     }
514 
OperationUnsubscribeBad(int clientIdx)515     public static void OperationUnsubscribeBad(int clientIdx) {
516         Mqtt5Client client = clients.get(clientIdx);
517         if (clientsData.get(clientIdx).isWaitingForOperation == true) {
518             PrintLog("[OP] Unsubscribe bad called for client ID " + clientIdx + " but already has operation...");
519             return;
520         }
521         if (client.getIsConnected() == false) {
522             OperationStart(clientIdx);
523             return;
524         }
525 
526         clientsData.get(clientIdx).isWaitingForOperation = true;
527         PrintLog("[OP] About to unsubscribe (bad) client ID " + clientIdx);
528         UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder();
529         unsubscribePacketBuilder.withSubscription("Non_existent_topic_here");
530         try {
531             client.unsubscribe(unsubscribePacketBuilder.build()).get(operationFutureWaitTime, TimeUnit.SECONDS);
532         } catch (Exception ex) {
533             PrintLog("[OP] Unsubscribe (bad) had an exception! Exception: " + ex);
534             ex.printStackTrace();
535             if (configFilePrinter != null) {
536                 ex.printStackTrace(configFilePrinter);
537             }
538         }
539         PrintLog("[OP] Unsubscribed (bad) client ID " + clientIdx);
540         clientsData.get(clientIdx).isWaitingForOperation = false;
541     }
542 
543     // Note: Handles QoS 0, QoS 1, and topic filter based on passed-in input
OperationPublish(int clientIdx, QOS qos, String topic)544     public static void OperationPublish(int clientIdx, QOS qos, String topic) {
545         Mqtt5Client client = clients.get(clientIdx);
546         if (clientsData.get(clientIdx).isWaitingForOperation == true) {
547             PrintLog("[OP] Publish called for client ID " + clientIdx + " with QoS" + qos + " with topic " + topic + " - but already has operation...");
548             return;
549         }
550         if (client.getIsConnected() == false) {
551             OperationStart(clientIdx);
552             return;
553         }
554 
555         clientsData.get(clientIdx).isWaitingForOperation = true;
556         PrintLog("[OP] About to publish client ID " + clientIdx + " with QoS " + qos + " with topic " + topic);
557         PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder();
558         publishPacketBuilder.withQOS(qos);
559 
560         int payload_size = random.nextInt(MAX_PAYLOAD_SIZE);
561         byte[] payload_bytes = new byte[payload_size];
562         for (int i = 0; i < payload_size; i++) {
563             payload_bytes[i] = (byte)random.nextInt(128);
564         }
565         publishPacketBuilder.withPayload(payload_bytes);
566         publishPacketBuilder.withTopic(topic);
567 
568         // Add user properties!
569         List<UserProperty> propertyList = new ArrayList<UserProperty>();
570         propertyList.add(new UserProperty("key", "value"));
571         propertyList.add(new UserProperty("cat", "dog"));
572         propertyList.add(new UserProperty("red", "blue"));
573         publishPacketBuilder.withUserProperties(propertyList);
574 
575         try {
576             client.publish(publishPacketBuilder.build()).get(operationFutureWaitTime, TimeUnit.SECONDS);
577         } catch (Exception ex) {
578             PrintLog("[OP] Publish with QoS " + qos + " with topic " + topic + " had an exception! Exception: " + ex);
579             ex.printStackTrace();
580             if (configFilePrinter != null) {
581                 ex.printStackTrace(configFilePrinter);
582             }
583         }
584         PrintLog("[OP] Published client ID " + clientIdx + " with QoS " + qos + " with topic " + topic);
585         clientsData.get(clientIdx).isWaitingForOperation = false;
586     }
587 
OperationPublishQoS0(int clientIdx)588     public static void OperationPublishQoS0(int clientIdx) {
589         OperationPublish(clientIdx, QOS.AT_MOST_ONCE, "topic1");
590     }
591 
OperationPublishQoS1(int clientIdx)592     public static void OperationPublishQoS1(int clientIdx) {
593         OperationPublish(clientIdx, QOS.AT_LEAST_ONCE, "topic1");
594     }
595 
OperationPublishToSubscribedTopicQoS0(int clientIdx)596     public static void OperationPublishToSubscribedTopicQoS0(int clientIdx) {
597         OperationPublish(clientIdx, QOS.AT_MOST_ONCE, clientsData.get(clientIdx).clientId);
598     }
599 
OperationPublishToSubscribedTopicQoS1(int clientIdx)600     public static void OperationPublishToSubscribedTopicQoS1(int clientIdx) {
601         OperationPublish(clientIdx, QOS.AT_LEAST_ONCE, clientsData.get(clientIdx).clientId);
602     }
603 
OperationPublishToSharedTopicQoS0(int clientIdx)604     public static void OperationPublishToSharedTopicQoS0(int clientIdx) {
605         OperationPublish(clientIdx, QOS.AT_MOST_ONCE, clientsData.get(clientIdx).sharedTopic);
606     }
607 
OperationPublishToSharedTopicQoS1(int clientIdx)608     public static void OperationPublishToSharedTopicQoS1(int clientIdx) {
609         OperationPublish(clientIdx, QOS.AT_LEAST_ONCE, clientsData.get(clientIdx).sharedTopic);
610     }
611 
612     // ================================================================================
613     //  MAIN FUNCTIONS
614     // ================================================================================
615 
PerformRandomOperation()616     public static void PerformRandomOperation() {
617         int randomIdx = random.nextInt(clientsOperationsList.size());
618         for (int i = 0; i < clients.size(); i++) {
619             PerformOperation(clientsOperationsList.get(randomIdx), i);
620             randomIdx = random.nextInt(clientsOperationsList.size());
621         }
622     }
623 
PerformOperation(CANARY_OPERATIONS operation, int clientIdx)624     public static void PerformOperation(CANARY_OPERATIONS operation, int clientIdx) {
625         switch (operation) {
626             case OPERATION_NULL:
627                 OperationNull(clientIdx);
628                 break;
629             case OPERATION_START:
630                 OperationStart(clientIdx);
631                 break;
632             case OPERATION_STOP:
633                 OperationStop(clientIdx);
634                 break;
635             case OPERATION_SUBSCRIBE:
636                 OperationSubscribe(clientIdx);
637                 break;
638             case OPERATION_UNSUBSCRIBE:
639                 OperationUnsubscribe(clientIdx);
640                 break;
641             case OPERATION_UNSUBSCRIBE_BAD:
642                 OperationUnsubscribeBad(clientIdx);
643                 break;
644             case OPERATION_PUBLISH_QOS0:
645                 OperationPublishQoS0(clientIdx);
646                 break;
647             case OPERATION_PUBLISH_QOS1:
648                 OperationPublishQoS1(clientIdx);
649                 break;
650             case OPERATION_PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0:
651                 OperationPublishToSubscribedTopicQoS0(clientIdx);
652                 break;
653             case OPERATION_PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1:
654                 OperationPublishToSubscribedTopicQoS1(clientIdx);
655                 break;
656             case OPERATION_PUBLISH_TO_SHARED_TOPIC_QOS0:
657                 OperationPublishToSharedTopicQoS0(clientIdx);
658                 break;
659             case OPERATION_PUBLISH_TO_SHARED_TOPIC_QOS1:
660                 OperationPublishToSharedTopicQoS1(clientIdx);
661                 break;
662             default:
663                 PrintLog("Client ID " + clientIdx + " ERROR - Unknown operation! Performing null");
664                 OperationNull(clientIdx);
665                 break;
666         }
667     }
668 
main(String[] args)669     public static void main(String[] args) {
670 
671         System.out.println("Setting up Canary...");
672 
673         // Setup
674         // ====================
675         parseCommandLine(args);
676         if (configShowHelp == true) {
677             printUsage();
678             return;
679         }
680 
681         // Some option validation
682         if (configCertFile != null || configKeyFile != null) {
683             if (configCertFile == null) {
684                 System.out.println("Private key file defined but not the certificate!");
685                 exitWithError(1);
686             } else if (configKeyFile == null) {
687                 System.out.println("Certificate file defined but not the private key!");
688                 exitWithError(1);
689             }
690         }
691 
692         // Make all the clients
693         setupClients();
694 
695         // Make all the operations
696         setupOperations();
697 
698         // Capture the current time
699         startDateTime = java.time.LocalDateTime.now();
700 
701         if (configLogAWS == true) {
702             if (configLogFile == null) {
703                 Log.initLoggingToStdout(configLogAWSLevel);
704             } else {
705                 Log.initLoggingToFile(configLogAWSLevel, configLogFile);
706             }
707         }
708         if (configLogFile != null && configLogAWS == false) {
709             try {
710                 configFilePrinter = new PrintWriter(configLogFile, "UTF-8");
711             } catch (Exception ex) {
712                 System.out.println("Could not create non-AWS log file!");
713                 exitWithError(1);
714             }
715         }
716 
717         // Test loop
718         // ====================
719         PrintLog("Starting canary test loop...");
720 
721         boolean done = false;
722         java.time.LocalDateTime nowDateTime = java.time.LocalDateTime.now();
723         long secondsDifference = 0;
724         long operationsExecuted = 0;
725         while (!done) {
726             try {
727                 nowDateTime = java.time.LocalDateTime.now();
728                 secondsDifference = startDateTime.until(java.time.LocalDateTime.now(), ChronoUnit.SECONDS);
729                 if (secondsDifference >= configSeconds) {
730                     done = true;
731                 }
732             } catch (ArithmeticException ex) {
733                 // Time overflow - exit with an error!
734                 exitWithError(1);
735             }
736 
737             operationsExecuted += 1;
738             PerformRandomOperation();
739 
740             try {
741                 Thread.sleep(configTps);
742             } catch (Exception ex) {
743                 PrintLog("[OP] Could not sleep for " + (configTps) + " seconds due to exception! Exception: " + ex);
744                 exitWithError(1);
745             }
746         }
747 
748         PrintLog("Test loop operations complete: Total=" + (operationsExecuted * configClients) + " Cycles=" + operationsExecuted);
749 
750         // Stop all the clients and close them to clean their memory
751         for (int i = 0; i < clients.size(); i++) {
752             OperationStop(i);
753             clients.get(i).close();
754         }
755 
756         // Cleanup
757         // ====================
758         PrintLog("Cleaning up canary...");
759 
760         if (clientsContext != null) {
761             clientsContext.close();
762         }
763         if (clientsEventLoopGroup != null) {
764             clientsEventLoopGroup.close();
765         }
766         if (clientsHostResolver != null) {
767             clientsHostResolver.close();
768         }
769         if (clientsBootstrap != null) {
770             clientsBootstrap.close();
771         }
772         if (clientsSocketOptions != null) {
773             clientsSocketOptions.close();
774         }
775 
776         PrintLog("Waiting for no resources...");
777         CrtResource.waitForNoResources();
778 
779         PrintLog("Finished canary with no errors");
780         exitWithError(0); // exit with no error
781     }
782 }
783