/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */
package canary.mqtt5;

import java.io.PrintWriter;
import java.net.SocketOption;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.Log;
import software.amazon.awssdk.crt.Log.LogLevel;
import software.amazon.awssdk.crt.Log.LogSubject;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.EventLoopGroup;
import software.amazon.awssdk.crt.io.HostResolver;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
import software.amazon.awssdk.crt.io.TlsContextOptions;
import software.amazon.awssdk.crt.io.ExponentialBackoffRetryOptions.JitterMode;
import software.amazon.awssdk.crt.mqtt5.*;
import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.Mqtt5ClientOptionsBuilder;
import software.amazon.awssdk.crt.mqtt5.packets.*;

import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket.ConnectPacketBuilder;
import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket.DisconnectPacketBuilder;
import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket.DisconnectReasonCode;
import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket.PublishPacketBuilder;
import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket.PayloadFormatIndicator;
import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket.SubscribePacketBuilder;
import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket.RetainHandlingType;
import software.amazon.awssdk.crt.mqtt5.packets.UnsubscribePacket.UnsubscribePacketBuilder;

/**
 * MQTT5 canary: creates a configurable number of MQTT5 clients and, for a configurable
 * duration, repeatedly performs one randomly chosen operation (stop, subscribe, unsubscribe,
 * publish, ...) on every client. Operations that find their client disconnected start it
 * instead. On completion all clients and CRT resources are closed and the process exits.
 *
 * <p>All state is static. Operations run on the thread that calls {@link #main}; the lifecycle
 * and publish callbacks are registered on the clients and update the shared {@link ClientsData}.
 * NOTE(review): callbacks look up their client with {@code clients.indexOf(client)}; this assumes
 * a callback is only ever delivered for a client that has already been added to {@code clients}.
 */
public class Mqtt5Canary {
    static String configEndpoint = "localhost";
    static Long configPort = 1883L;
    static String configCaFile = null;
    static String configCertFile = null;
    static String configKeyFile = null;
    static String configClientID = "MQTT5_Sample_Java_" + UUID.randomUUID().toString();
    static boolean configUseWebsockets = false;
    static boolean configUseTls = false;
    static Integer configThreads = 8;
    static Integer configClients = 3;
    /**
     * Milliseconds to sleep between operation cycles. Despite the name this is NOT a rate:
     * the {@code --tps} argument is converted to {@code 1000 / tps} when parsed.
     */
    static Integer configTps = 12;
    static Long configSeconds = 600L;
    static boolean configShowHelp = false;
    static boolean configLogStdout = true;
    static boolean configLogAWS = false;
    static LogLevel configLogAWSLevel = LogLevel.Debug;
    static String configLogFile;
    /** Non-AWS log file writer; only created when a log file is given and AWS logging is off. */
    static PrintWriter configFilePrinter;

    static List<Mqtt5Client> clients = new ArrayList<Mqtt5Client>();
    /** Per-client state; index i belongs to {@code clients.get(i)}. */
    static List<ClientsData> clientsData = new ArrayList<ClientsData>();
    static TlsContext clientsContext = null;
    static EventLoopGroup clientsEventLoopGroup;
    static HostResolver clientsHostResolver;
    static ClientBootstrap clientsBootstrap;
    static SocketOptions clientsSocketOptions;
    static Consumer<Mqtt5WebsocketHandshakeTransformArgs> clientsWebsocketTransform = null;
    static CanaryLifecycleEvents clientsLifecycleEvents = new CanaryLifecycleEvents();
    static CanaryPublishEvents clientsPublishEvents = new CanaryPublishEvents();

    static Random random = new Random();
    static java.time.LocalDateTime startDateTime;

    /** Seconds to wait for any single operation's future before giving up. */
    static int operationFutureWaitTime = 30;

    /** Exclusive upper bound for random publish payload sizes (UINT16_MAX). */
    private static final int MAX_PAYLOAD_SIZE = 65535;

    /** Prints the command line usage to stdout. */
    static void printUsage() {
        System.out.println(
            "Usage:\n" +
            "  --help            This message\n" +
            "  --endpoint        MQTT5 endpoint hostname (optional, default=localhost)\n" +
            "  --port            MQTT5 endpoint port to use (optional, default=1883)\n" +
            "  --ca_file         A path to a CA certificate file (optional)\n" +
            "  --cert            A path to a certificate file (optional, will use mTLS if defined)\n" +
            "  --key             A path to a private key file (optional, will use mTLS if defined)\n" +
            "  --clientID        The ClientID to connect with (optional, default=MQTT5_Sample_Java_<UUID>)\n" +
            "  --use_websockets  If defined, websockets will be used (optional)\n" +
            "  --use_tls         If defined, TLS (or mTLS) will be used (optional)\n" +
            "\n" +
            "  --threads         The number of EventLoop group threads to use (optional, default=8)\n" +
            "  --clients         The number of clients to use (optional, default=3, max=50)\n" +
            "  --tps             The number of operation cycles to run per second; the canary sleeps 1000/tps milliseconds between cycles (optional, default is a 12 millisecond sleep)\n" +
            "  --seconds         The number of seconds to run the Canary test (optional, default=600)\n" +
            "  --log_console     If defined, logging will print to stdout (optional, default=true, type=boolean)\n" +
            "  --log_aws         If defined, logging will occur using the AWS Logger (optional, type=boolean) \n" +
            "  --log_aws_level   If defined, logging to AWS Logger will use that log level (optional, default=Debug)\n" +
            "  --log_file        If defined, logging will be written to this file (optional)"
        );
    }

    /**
     * Reports whether the option at {@code idx} is followed by a value, printing a warning
     * when it is not so that a truncated command line does not go unnoticed.
     */
    private static boolean hasValue(String[] args, int idx) {
        if (idx + 1 < args.length) {
            return true;
        }
        System.out.println("Missing value for argument: " + args[idx]);
        return false;
    }

    /**
     * Parses the command line into the static {@code config*} fields.
     * Unknown arguments and options missing their value are reported on stdout and skipped.
     *
     * @param args the raw command line arguments
     * @throws NumberFormatException if a numeric option has a non-numeric value
     * @throws IllegalArgumentException if {@code --log_aws_level} is not a {@code LogLevel} name
     */
    static void parseCommandLine(String[] args) {
        for (int idx = 0; idx < args.length; ++idx) {
            switch (args[idx]) {
                case "--help":
                    configShowHelp = true;
                    break;
                case "--endpoint":
                    if (hasValue(args, idx)) {
                        configEndpoint = args[++idx];
                    }
                    break;
                case "--port":
                    if (hasValue(args, idx)) {
                        configPort = Long.parseLong(args[++idx]);
                    }
                    break;
                case "--ca_file":
                    if (hasValue(args, idx)) {
                        configCaFile = args[++idx];
                    }
                    break;
                case "--cert":
                    if (hasValue(args, idx)) {
                        configCertFile = args[++idx];
                    }
                    break;
                case "--key":
                    if (hasValue(args, idx)) {
                        configKeyFile = args[++idx];
                    }
                    break;
                case "--clientID":
                    if (hasValue(args, idx)) {
                        configClientID = args[++idx];
                    }
                    break;
                case "--use_websockets":
                    configUseWebsockets = true;
                    break;
                case "--use_tls":
                    configUseTls = true;
                    break;
                case "--threads":
                    if (hasValue(args, idx)) {
                        configThreads = Integer.parseInt(args[++idx]);
                    }
                    break;
                case "--clients":
                    if (hasValue(args, idx)) {
                        configClients = Integer.parseInt(args[++idx]);
                    }
                    break;
                case "--tps":
                    if (hasValue(args, idx)) {
                        int tps = Integer.parseInt(args[++idx]);
                        if (tps < 0) {
                            // A negative rate would become a negative sleep time, which
                            // Thread.sleep rejects; treat it like 0 (no sleep).
                            System.out.println("Negative --tps value " + tps + " is invalid, using 0 (no sleep)");
                        }
                        // Convert the rate into the millisecond sleep between cycles.
                        configTps = (tps <= 0) ? 0 : 1000 / tps;
                    }
                    break;
                case "--seconds":
                    if (hasValue(args, idx)) {
                        configSeconds = Long.parseLong(args[++idx]);
                    }
                    break;
                case "--log_console":
                    if (hasValue(args, idx)) {
                        configLogStdout = Boolean.parseBoolean(args[++idx]);
                    }
                    break;
                case "--log_aws":
                    if (hasValue(args, idx)) {
                        configLogAWS = Boolean.parseBoolean(args[++idx]);
                    }
                    break;
                case "--log_aws_level":
                    if (hasValue(args, idx)) {
                        configLogAWSLevel = LogLevel.valueOf(args[++idx]);
                    }
                    break;
                case "--log_file":
                    if (hasValue(args, idx)) {
                        configLogFile = args[++idx];
                    }
                    break;
                default:
                    System.out.println("Unrecognized argument: " + args[idx]);
            }
        }
    }

    /**
     * Writes a message to every enabled log sink: stdout, the non-AWS log file, and the AWS logger.
     * The file sink receives the message without the {@code [CANARY]} prefix.
     *
     * @param message the message to log
     */
    static void PrintLog(String message) {
        if (configLogStdout) {
            System.out.println("[CANARY] " + message);
        }
        if (configFilePrinter != null) {
            configFilePrinter.println(message);
        }
        if (configLogAWS) {
            Log.log(configLogAWSLevel, LogSubject.MqttClient, "[CANARY] " + message);
        }
    }

    /**
     * Closes the log file (if open) and terminates the JVM.
     *
     * @param errorCode the process exit code; 0 means success
     */
    static void exitWithError(int errorCode) {
        if (configFilePrinter != null) {
            configFilePrinter.close();
        }
        System.exit(errorCode);
    }

    /**
     * Logs a failed operation to all sinks, including the stack trace, and restores the
     * interrupt flag when the failure was an interruption.
     *
     * @param description prefix identifying the operation, e.g. {@code "[OP] Start"}
     * @param ex the exception the operation raised
     */
    private static void logOperationFailure(String description, Exception ex) {
        if (ex instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        PrintLog(description + " had an exception! Exception: " + ex);
        ex.printStackTrace();
        if (configFilePrinter != null) {
            ex.printStackTrace(configFilePrinter);
        }
    }

    /**
     * Mutable per-client state. Fields are written by the lifecycle callbacks and read by the
     * operation methods, so they are volatile to make those writes visible across threads.
     */
    static final class ClientsData {
        /** Completed when the client connects; replaced with a fresh future on disconnect/stop. */
        volatile CompletableFuture<Void> connectedFuture = new CompletableFuture<>();
        /** Completed when the client reports it has stopped; replaced on each successful connect. */
        volatile CompletableFuture<Void> stopFuture = new CompletableFuture<>();
        volatile String sharedTopic = "test/shared_topic";
        /** Client ID taken from the negotiated settings; also used as this client's own topic. */
        volatile String clientId = "";
        volatile boolean subscribedToTopics = false;
        /** Guards against starting an operation while another is still waiting on its future. */
        volatile boolean isWaitingForOperation = false;
    }

    /** The operations the canary can perform on a client. */
    enum CANARY_OPERATIONS {
        OPERATION_NULL,
        OPERATION_START,
        OPERATION_STOP,
        OPERATION_SUBSCRIBE,
        OPERATION_UNSUBSCRIBE,
        OPERATION_UNSUBSCRIBE_BAD,
        OPERATION_PUBLISH_QOS0,
        OPERATION_PUBLISH_QOS1,
        OPERATION_PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0,
        OPERATION_PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1,
        OPERATION_PUBLISH_TO_SHARED_TOPIC_QOS0,
        OPERATION_PUBLISH_TO_SHARED_TOPIC_QOS1,
    }

    /** Pool the random operation is drawn from; filled by {@link #setupOperations()}. */
    static List<CANARY_OPERATIONS> clientsOperationsList = new ArrayList<CANARY_OPERATIONS>();

    /** Lifecycle callbacks that keep each client's {@link ClientsData} in sync with its connection state. */
    static final class CanaryLifecycleEvents implements Mqtt5ClientOptions.LifecycleEvents {
        @Override
        public void onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn) {}

        @Override
        public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn) {
            ConnAckPacket connAckData = onConnectionSuccessReturn.getConnAckPacket();
            NegotiatedSettings negotiatedSettings = onConnectionSuccessReturn.getNegotiatedSettings();
            int clientIdx = clients.indexOf(client);
            ClientsData data = clientsData.get(clientIdx);

            if (connAckData.getReasonCode() == ConnAckPacket.ConnectReasonCode.SUCCESS) {
                PrintLog("[Lifecycle event] Client ID " + clientIdx + " connection success...");
                data.clientId = negotiatedSettings.getAssignedClientID();
                data.connectedFuture.complete(null);
                // A new connection needs a new future for the next stop request to wait on.
                data.stopFuture = new CompletableFuture<>();
            } else {
                PrintLog("[Lifecycle event] Client ID " + clientIdx + " ConnAckPacket code: " + connAckData.getReasonCode().toString());
                data.connectedFuture.completeExceptionally(new Exception("Connection failure"));
            }
        }

        @Override
        public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) {
            int clientIdx = clients.indexOf(client);
            ClientsData data = clientsData.get(clientIdx);
            PrintLog("[Lifecycle event] Client ID " + clientIdx + " connection failed with errorCode : " + onConnectionFailureReturn.getErrorCode());
            data.connectedFuture.completeExceptionally(new Exception("Connection failure"));
            data.subscribedToTopics = false;
        }

        @Override
        public void onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn) {
            int clientIdx = clients.indexOf(client);
            ClientsData data = clientsData.get(clientIdx);
            PrintLog("[Lifecycle event] Client ID " + clientIdx + " connection disconnected...");
            PrintLog("[Lifecycle event] Client ID " + clientIdx + " Disconnection error code: " + Integer.toString(onDisconnectionReturn.getErrorCode()));
            data.connectedFuture = new CompletableFuture<>();
            data.subscribedToTopics = false;
        }

        @Override
        public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) {
            int clientIdx = clients.indexOf(client);
            ClientsData data = clientsData.get(clientIdx);
            PrintLog("[Lifecycle event] Client ID " + clientIdx + " connection stopped...");
            data.connectedFuture = new CompletableFuture<>();
            data.stopFuture.complete(null);
            data.subscribedToTopics = false;
        }
    }

    /** Publish callback that logs the topic of every received message. */
    static final class CanaryPublishEvents implements Mqtt5ClientOptions.PublishEvents {
        @Override
        public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
            PublishPacket publishPacket = publishReturn.getPublishPacket();
            int clientIdx = clients.indexOf(client);
            PrintLog("[Publish event] Client ID " + clientIdx + " message received:\n" +
                    "  Topic: " + publishPacket.getTopic() + "\n");
        }
    }

    // ================================================================================
    // SETUP FUNCTIONS
    // ================================================================================

    /**
     * Creates the shared CRT resources (event loop group, host resolver, bootstrap, socket
     * options, optional TLS context) and then {@code configClients} MQTT5 clients, each with
     * its own client ID suffix and a matching {@link ClientsData} entry.
     */
    public static void setupClients() {
        // Create the builder
        Mqtt5ClientOptionsBuilder clientOptionsBuilder = new Mqtt5ClientOptionsBuilder(configEndpoint, configPort);

        clientsEventLoopGroup = new EventLoopGroup(configThreads);
        clientsHostResolver = new HostResolver(clientsEventLoopGroup);
        clientsBootstrap = new ClientBootstrap(clientsEventLoopGroup, clientsHostResolver);
        clientOptionsBuilder.withBootstrap(clientsBootstrap);

        clientsSocketOptions = new SocketOptions();
        clientsSocketOptions.type = SocketOptions.SocketType.STREAM;
        clientsSocketOptions.connectTimeoutMs = 60000;
        clientsSocketOptions.keepAliveTimeoutSecs = 0;
        clientsSocketOptions.keepAliveIntervalSecs = 0;
        clientOptionsBuilder.withSocketOptions(clientsSocketOptions);

        clientOptionsBuilder.withSessionBehavior(Mqtt5ClientOptions.ClientSessionBehavior.CLEAN);
        clientOptionsBuilder.withLifecycleEvents(clientsLifecycleEvents);
        clientOptionsBuilder.withRetryJitterMode(JitterMode.None);
        clientOptionsBuilder.withMinReconnectDelayMs(60000L);
        clientOptionsBuilder.withMaxReconnectDelayMs(120000L);
        clientOptionsBuilder.withMinConnectedTimeToResetReconnectDelayMs(30000L);
        clientOptionsBuilder.withPingTimeoutMs(20000L);
        clientOptionsBuilder.withPublishEvents(clientsPublishEvents);

        if (configUseTls || configCertFile != null || configKeyFile != null) {
            // The options object is itself a CRT resource: close it once the context has been
            // built from it, otherwise it is still outstanding when main() waits for no resources.
            try (TlsContextOptions tlsContextOptions = TlsContextOptions.createDefaultClient()) {
                if (configCertFile != null || configKeyFile != null) {
                    tlsContextOptions.withMtlsFromPath(configCertFile, configKeyFile);
                } else {
                    // NOTE(review): peer verification is disabled for plain TLS without a
                    // client certificate; acceptable for a canary, not for production use.
                    tlsContextOptions.withVerifyPeer(false);
                }

                if (configCaFile != null) {
                    tlsContextOptions.overrideDefaultTrustStoreFromPath(null, configCaFile);
                }

                clientsContext = new TlsContext(tlsContextOptions);
            }
            clientOptionsBuilder.withTlsContext(clientsContext);
        }

        if (configUseWebsockets) {
            // Pass the handshake request through unmodified.
            clientsWebsocketTransform = new Consumer<Mqtt5WebsocketHandshakeTransformArgs>() {
                @Override
                public void accept(Mqtt5WebsocketHandshakeTransformArgs t) {
                    t.complete(t.getHttpRequest());
                }
            };
            clientOptionsBuilder.withWebsocketHandshakeTransform(clientsWebsocketTransform);
        }

        ConnectPacketBuilder connectPacketBuilder = new ConnectPacketBuilder();
        connectPacketBuilder.withKeepAliveIntervalSeconds(30L);
        connectPacketBuilder.withMaximumPacketSizeBytes(128L * 1024L);
        connectPacketBuilder.withReceiveMaximum(9L);

        for (int i = 0; i < configClients; i++) {
            // The builders are reused; only the client ID differs between clients.
            connectPacketBuilder.withClientId(configClientID + "_" + Integer.toString(i));
            clientOptionsBuilder.withConnectOptions(connectPacketBuilder.build());
            clients.add(new Mqtt5Client(clientOptionsBuilder.build()));
            clientsData.add(new ClientsData());
        }
    }

    /** Fills the operation pool; each operation appears once, so all are equally likely. */
    public static void setupOperations() {
        // For now have everything evenly distributed
        clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_STOP);
        clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_SUBSCRIBE);
        clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_UNSUBSCRIBE);
        clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_UNSUBSCRIBE_BAD);
        clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_PUBLISH_QOS0);
        clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_PUBLISH_QOS1);
        clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0);
        clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1);
        clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_PUBLISH_TO_SHARED_TOPIC_QOS0);
        clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_PUBLISH_TO_SHARED_TOPIC_QOS1);
    }

    // ================================================================================
    // OPERATIONS
    // ================================================================================

    /**
     * Does nothing except log the call.
     *
     * @param clientIdx index of the client in {@code clients}
     */
    public static void OperationNull(int clientIdx) {
        // Do nothing!
        PrintLog("[OP] Null called for client ID " + clientIdx);
    }

    /**
     * Starts the client and waits up to {@code operationFutureWaitTime} seconds for it to connect.
     * Skipped when the client is busy with another operation or already connected.
     *
     * @param clientIdx index of the client in {@code clients}
     */
    public static void OperationStart(int clientIdx) {
        Mqtt5Client client = clients.get(clientIdx);
        ClientsData data = clientsData.get(clientIdx);
        if (data.isWaitingForOperation) {
            PrintLog("[OP] Start called for client ID " + clientIdx + " but already has operation...");
            return;
        }
        if (client.getIsConnected()) {
            PrintLog("[OP] Start called for client ID " + clientIdx + " but is already connected/started!");
            return;
        }

        data.isWaitingForOperation = true;
        try {
            PrintLog("[OP] About to start client ID " + clientIdx);
            client.start();
            data.connectedFuture.get(operationFutureWaitTime, TimeUnit.SECONDS);
            PrintLog("[OP] Started client ID " + clientIdx);
        } catch (Exception ex) {
            logOperationFailure("[OP] Start", ex);
        } finally {
            data.isWaitingForOperation = false;
        }
    }

    /**
     * Stops the client and waits up to {@code operationFutureWaitTime} seconds for the stop to finish.
     * Skipped when the client is busy with another operation or not connected.
     *
     * @param clientIdx index of the client in {@code clients}
     */
    public static void OperationStop(int clientIdx) {
        Mqtt5Client client = clients.get(clientIdx);
        ClientsData data = clientsData.get(clientIdx);
        if (data.isWaitingForOperation) {
            PrintLog("[OP] Stop called for client ID " + clientIdx + " but already has operation...");
            return;
        }
        if (!client.getIsConnected()) {
            PrintLog("[OP] Stop called for client ID " + clientIdx + " but is already disconnected/stopped!");
            return;
        }

        data.isWaitingForOperation = true;
        try {
            PrintLog("[OP] About to stop client ID " + clientIdx);
            client.stop();
            data.stopFuture.get(operationFutureWaitTime, TimeUnit.SECONDS);
            PrintLog("[OP] Stopped client ID " + clientIdx);
        } catch (Exception ex) {
            logOperationFailure("[OP] Stop", ex);
        } finally {
            data.isWaitingForOperation = false;
        }
    }

    /**
     * Subscribes the client to its own topic (its client ID) and to the shared topic.
     * A disconnected client is started instead; an already subscribed client is left alone.
     * The client is only marked as subscribed when the subscribe call succeeds.
     *
     * @param clientIdx index of the client in {@code clients}
     */
    public static void OperationSubscribe(int clientIdx) {
        Mqtt5Client client = clients.get(clientIdx);
        ClientsData data = clientsData.get(clientIdx);
        if (data.isWaitingForOperation) {
            PrintLog("[OP] Subscribe called for client ID " + clientIdx + " but already has operation...");
            return;
        }
        if (!client.getIsConnected()) {
            OperationStart(clientIdx);
            return;
        }
        if (data.subscribedToTopics) {
            return;
        }

        data.isWaitingForOperation = true;
        try {
            PrintLog("[OP] About to subscribe client ID " + clientIdx);
            SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder();
            subscribePacketBuilder.withSubscription(data.clientId, QOS.AT_LEAST_ONCE);
            subscribePacketBuilder.withSubscription(data.sharedTopic, QOS.AT_LEAST_ONCE);
            client.subscribe(subscribePacketBuilder.build()).get(operationFutureWaitTime, TimeUnit.SECONDS);
            data.subscribedToTopics = true;
            PrintLog("[OP] Subscribed client ID " + clientIdx);
        } catch (Exception ex) {
            logOperationFailure("[OP] Subscribe", ex);
        } finally {
            data.isWaitingForOperation = false;
        }
    }

    /**
     * Unsubscribes the client from its own topic and the shared topic.
     * A disconnected client is started instead; a client that is not subscribed is left alone.
     * The client is only marked as unsubscribed when the unsubscribe call succeeds.
     *
     * @param clientIdx index of the client in {@code clients}
     */
    public static void OperationUnsubscribe(int clientIdx) {
        Mqtt5Client client = clients.get(clientIdx);
        ClientsData data = clientsData.get(clientIdx);
        if (data.isWaitingForOperation) {
            PrintLog("[OP] Unsubscribe called for client ID " + clientIdx + " but already has operation...");
            return;
        }
        if (!client.getIsConnected()) {
            OperationStart(clientIdx);
            return;
        }
        if (!data.subscribedToTopics) {
            return;
        }

        data.isWaitingForOperation = true;
        try {
            PrintLog("[OP] About to unsubscribe client ID " + clientIdx);
            UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder();
            unsubscribePacketBuilder.withSubscription(data.clientId);
            unsubscribePacketBuilder.withSubscription(data.sharedTopic);
            client.unsubscribe(unsubscribePacketBuilder.build()).get(operationFutureWaitTime, TimeUnit.SECONDS);
            data.subscribedToTopics = false;
            PrintLog("[OP] Unsubscribed client ID " + clientIdx);
        } catch (Exception ex) {
            logOperationFailure("[OP] Unsubscribe", ex);
        } finally {
            data.isWaitingForOperation = false;
        }
    }

    /**
     * Unsubscribes from a topic the client never subscribed to, exercising the failure path.
     * A disconnected client is started instead.
     *
     * @param clientIdx index of the client in {@code clients}
     */
    public static void OperationUnsubscribeBad(int clientIdx) {
        Mqtt5Client client = clients.get(clientIdx);
        ClientsData data = clientsData.get(clientIdx);
        if (data.isWaitingForOperation) {
            PrintLog("[OP] Unsubscribe bad called for client ID " + clientIdx + " but already has operation...");
            return;
        }
        if (!client.getIsConnected()) {
            OperationStart(clientIdx);
            return;
        }

        data.isWaitingForOperation = true;
        try {
            PrintLog("[OP] About to unsubscribe (bad) client ID " + clientIdx);
            UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder();
            unsubscribePacketBuilder.withSubscription("Non_existent_topic_here");
            client.unsubscribe(unsubscribePacketBuilder.build()).get(operationFutureWaitTime, TimeUnit.SECONDS);
            PrintLog("[OP] Unsubscribed (bad) client ID " + clientIdx);
        } catch (Exception ex) {
            logOperationFailure("[OP] Unsubscribe (bad)", ex);
        } finally {
            data.isWaitingForOperation = false;
        }
    }

    /**
     * Publishes a random payload (0 to {@code MAX_PAYLOAD_SIZE - 1} bytes, each byte in 0..127)
     * with three fixed user properties. A disconnected client is started instead.
     * Note: Handles QoS 0, QoS 1, and topic filter based on passed-in input.
     *
     * @param clientIdx index of the client in {@code clients}
     * @param qos the QoS to publish with
     * @param topic the topic to publish to
     */
    public static void OperationPublish(int clientIdx, QOS qos, String topic) {
        Mqtt5Client client = clients.get(clientIdx);
        ClientsData data = clientsData.get(clientIdx);
        if (data.isWaitingForOperation) {
            PrintLog("[OP] Publish called for client ID " + clientIdx + " with QoS" + qos + " with topic " + topic + " - but already has operation...");
            return;
        }
        if (!client.getIsConnected()) {
            OperationStart(clientIdx);
            return;
        }

        data.isWaitingForOperation = true;
        try {
            PrintLog("[OP] About to publish client ID " + clientIdx + " with QoS " + qos + " with topic " + topic);
            PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder();
            publishPacketBuilder.withQOS(qos);

            int payloadSize = random.nextInt(MAX_PAYLOAD_SIZE);
            byte[] payloadBytes = new byte[payloadSize];
            for (int i = 0; i < payloadSize; i++) {
                payloadBytes[i] = (byte) random.nextInt(128);
            }
            publishPacketBuilder.withPayload(payloadBytes);
            publishPacketBuilder.withTopic(topic);

            // Add user properties!
            List<UserProperty> propertyList = new ArrayList<UserProperty>();
            propertyList.add(new UserProperty("key", "value"));
            propertyList.add(new UserProperty("cat", "dog"));
            propertyList.add(new UserProperty("red", "blue"));
            publishPacketBuilder.withUserProperties(propertyList);

            client.publish(publishPacketBuilder.build()).get(operationFutureWaitTime, TimeUnit.SECONDS);
            PrintLog("[OP] Published client ID " + clientIdx + " with QoS " + qos + " with topic " + topic);
        } catch (Exception ex) {
            logOperationFailure("[OP] Publish with QoS " + qos + " with topic " + topic, ex);
        } finally {
            data.isWaitingForOperation = false;
        }
    }

    /** Publishes with QoS 0 to the fixed topic {@code topic1}. */
    public static void OperationPublishQoS0(int clientIdx) {
        OperationPublish(clientIdx, QOS.AT_MOST_ONCE, "topic1");
    }

    /** Publishes with QoS 1 to the fixed topic {@code topic1}. */
    public static void OperationPublishQoS1(int clientIdx) {
        OperationPublish(clientIdx, QOS.AT_LEAST_ONCE, "topic1");
    }

    /** Publishes with QoS 0 to the client's own topic (its client ID). */
    public static void OperationPublishToSubscribedTopicQoS0(int clientIdx) {
        OperationPublish(clientIdx, QOS.AT_MOST_ONCE, clientsData.get(clientIdx).clientId);
    }

    /** Publishes with QoS 1 to the client's own topic (its client ID). */
    public static void OperationPublishToSubscribedTopicQoS1(int clientIdx) {
        OperationPublish(clientIdx, QOS.AT_LEAST_ONCE, clientsData.get(clientIdx).clientId);
    }

    /** Publishes with QoS 0 to the shared topic. */
    public static void OperationPublishToSharedTopicQoS0(int clientIdx) {
        OperationPublish(clientIdx, QOS.AT_MOST_ONCE, clientsData.get(clientIdx).sharedTopic);
    }

    /** Publishes with QoS 1 to the shared topic. */
    public static void OperationPublishToSharedTopicQoS1(int clientIdx) {
        OperationPublish(clientIdx, QOS.AT_LEAST_ONCE, clientsData.get(clientIdx).sharedTopic);
    }

    // ================================================================================
    // MAIN FUNCTIONS
    // ================================================================================

    /** Runs one cycle: an independently chosen random operation on every client, in index order. */
    public static void PerformRandomOperation() {
        for (int i = 0; i < clients.size(); i++) {
            int randomIdx = random.nextInt(clientsOperationsList.size());
            PerformOperation(clientsOperationsList.get(randomIdx), i);
        }
    }

    /**
     * Dispatches a single operation to the matching {@code Operation*} method.
     *
     * @param operation the operation to perform
     * @param clientIdx index of the client in {@code clients}
     */
    public static void PerformOperation(CANARY_OPERATIONS operation, int clientIdx) {
        switch (operation) {
            case OPERATION_NULL:
                OperationNull(clientIdx);
                break;
            case OPERATION_START:
                OperationStart(clientIdx);
                break;
            case OPERATION_STOP:
                OperationStop(clientIdx);
                break;
            case OPERATION_SUBSCRIBE:
                OperationSubscribe(clientIdx);
                break;
            case OPERATION_UNSUBSCRIBE:
                OperationUnsubscribe(clientIdx);
                break;
            case OPERATION_UNSUBSCRIBE_BAD:
                OperationUnsubscribeBad(clientIdx);
                break;
            case OPERATION_PUBLISH_QOS0:
                OperationPublishQoS0(clientIdx);
                break;
            case OPERATION_PUBLISH_QOS1:
                OperationPublishQoS1(clientIdx);
                break;
            case OPERATION_PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0:
                OperationPublishToSubscribedTopicQoS0(clientIdx);
                break;
            case OPERATION_PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1:
                OperationPublishToSubscribedTopicQoS1(clientIdx);
                break;
            case OPERATION_PUBLISH_TO_SHARED_TOPIC_QOS0:
                OperationPublishToSharedTopicQoS0(clientIdx);
                break;
            case OPERATION_PUBLISH_TO_SHARED_TOPIC_QOS1:
                OperationPublishToSharedTopicQoS1(clientIdx);
                break;
            default:
                PrintLog("Client ID " + clientIdx + " ERROR - Unknown operation! Performing null");
                OperationNull(clientIdx);
                break;
        }
    }

    /**
     * Entry point: parses arguments, builds the clients, runs operation cycles until
     * {@code configSeconds} have elapsed, then stops and closes everything and exits.
     * Exits with code 1 on invalid certificate/key options, log file creation failure,
     * time arithmetic overflow, or an interrupted sleep; exits with code 0 on success.
     *
     * @param args command line arguments, see {@link #printUsage()}
     */
    public static void main(String[] args) {

        System.out.println("Setting up Canary...");

        // Setup
        // ====================
        parseCommandLine(args);
        if (configShowHelp) {
            printUsage();
            return;
        }

        // Some option validation: mTLS needs both the certificate and the private key.
        if (configCertFile == null && configKeyFile != null) {
            System.out.println("Private key file defined but not the certificate!");
            exitWithError(1);
        } else if (configCertFile != null && configKeyFile == null) {
            System.out.println("Certificate file defined but not the private key!");
            exitWithError(1);
        }

        // Make all the clients
        setupClients();

        // Make all the operations
        setupOperations();

        // Capture the current time
        startDateTime = java.time.LocalDateTime.now();

        if (configLogAWS) {
            if (configLogFile == null) {
                Log.initLoggingToStdout(configLogAWSLevel);
            } else {
                Log.initLoggingToFile(configLogAWSLevel, configLogFile);
            }
        }
        // The log file is owned by the AWS logger when AWS logging is on, so only open
        // our own writer when it is off.
        if (configLogFile != null && !configLogAWS) {
            try {
                configFilePrinter = new PrintWriter(configLogFile, "UTF-8");
            } catch (Exception ex) {
                System.out.println("Could not create non-AWS log file!");
                exitWithError(1);
            }
        }

        // Test loop
        // ====================
        PrintLog("Starting canary test loop...");

        boolean done = false;
        long operationsExecuted = 0;
        while (!done) {
            try {
                long secondsDifference = startDateTime.until(java.time.LocalDateTime.now(), ChronoUnit.SECONDS);
                // The cycle that notices the deadline still runs, so at least one cycle always executes.
                if (secondsDifference >= configSeconds) {
                    done = true;
                }
            } catch (ArithmeticException ex) {
                // Time overflow - exit with an error!
                exitWithError(1);
            }

            operationsExecuted += 1;
            PerformRandomOperation();

            try {
                Thread.sleep(configTps);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                PrintLog("[OP] Could not sleep for " + (configTps) + " milliseconds due to exception! Exception: " + ex);
                exitWithError(1);
            }
        }

        PrintLog("Test loop operations complete: Total=" + (operationsExecuted * configClients) + " Cycles=" + operationsExecuted);

        // Stop all the clients and close them to clean their memory
        for (int i = 0; i < clients.size(); i++) {
            OperationStop(i);
            clients.get(i).close();
        }

        // Cleanup
        // ====================
        PrintLog("Cleaning up canary...");

        if (clientsContext != null) {
            clientsContext.close();
        }
        if (clientsEventLoopGroup != null) {
            clientsEventLoopGroup.close();
        }
        if (clientsHostResolver != null) {
            clientsHostResolver.close();
        }
        if (clientsBootstrap != null) {
            clientsBootstrap.close();
        }
        if (clientsSocketOptions != null) {
            clientsSocketOptions.close();
        }

        PrintLog("Waiting for no resources...");
        CrtResource.waitForNoResources();

        PrintLog("Finished canary with no errors");
        exitWithError(0); // exit with no error
    }
}