1 /** 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * SPDX-License-Identifier: Apache-2.0. 4 */ 5 package com.example.mqtt5; 6 7 import java.util.ArrayList; 8 import java.util.List; 9 import java.util.UUID; 10 import java.util.concurrent.CompletableFuture; 11 import java.util.concurrent.TimeUnit; 12 import java.util.function.Consumer; 13 14 import software.amazon.awssdk.crt.CrtResource; 15 import software.amazon.awssdk.crt.io.TlsContext; 16 import software.amazon.awssdk.crt.io.TlsContextOptions; 17 import software.amazon.awssdk.crt.mqtt5.*; 18 import software.amazon.awssdk.crt.mqtt5.packets.*; 19 20 import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket.ConnectPacketBuilder; 21 import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket.DisconnectPacketBuilder; 22 import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket.DisconnectReasonCode; 23 import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket.PublishPacketBuilder; 24 import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket.PayloadFormatIndicator; 25 import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket.SubscribePacketBuilder; 26 import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket.RetainHandlingType; 27 import software.amazon.awssdk.crt.mqtt5.packets.UnsubscribePacket.UnsubscribePacketBuilder; 28 29 public class Mqtt5Sample { 30 31 static String endpoint = "localhost"; 32 static Long port = 1883L; 33 static String clientID = "MQTT5_Sample_Java_" + UUID.randomUUID().toString(); 34 static boolean useWebsockets = false; 35 static boolean useTls = false; 36 static boolean showHelp = false; 37 printUsage()38 static void printUsage() { 39 System.out.println( 40 "Usage:\n" + 41 " --help This message\n"+ 42 " --endpoint MQTT5 endpoint hostname (optional, default=localhost)\n"+ 43 " --port MQTT5 endpoint port to use (optional, default=1883)\n"+ 44 " --clientID The ClientID to connect with (optional, default=MQTT5_Sample_Java_<UUID>)\n"+ 45 " --use_websockets If defined, websockets will be used (optional)\n"+ 46 " --use_tls If defined, TLS will be used (optional)" 47 ); 48 } 49 parseCommandLine(String[] args)50 static void parseCommandLine(String[] args) { 51 for (int idx = 0; idx < args.length; ++idx) { 52 switch (args[idx]) { 53 case "--help": 54 showHelp = true; 55 break; 56 case "--endpoint": 57 if (idx + 1 < args.length) { 58 endpoint = args[++idx]; 59 } 60 break; 61 case "--port": 62 if (idx + 1 < args.length) { 63 port = Long.parseLong(args[++idx]); 64 } 65 break; 66 case "--clientID": 67 if (idx + 1 < args.length) { 68 clientID = args[++idx]; 69 } 70 break; 71 case "--use_websockets": 72 useWebsockets = true; 73 break; 74 case "--use_tls": 75 useTls = true; 76 break; 77 default: 78 System.out.println("Unrecognized argument: " + args[idx]); 79 } 80 } 81 } 82 83 84 static final class SampleLifecycleEvents implements Mqtt5ClientOptions.LifecycleEvents { 85 CompletableFuture<Void> connectedFuture = new CompletableFuture<>(); 86 CompletableFuture<Void> stopFuture = new CompletableFuture<>(); 87 88 @Override onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn)89 public void onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn) { 90 System.out.println("[Lifecycle event] Client attempting connection..."); 91 } 92 93 @Override onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn)94 public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn) { 95 System.out.println("[Lifecycle event] Client connection success..."); 96 connectedFuture.complete(null); 97 } 98 99 @Override onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn)100 public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) { 101 System.out.println("[Lifecycle event] Client connection failed..."); 102 connectedFuture.completeExceptionally(new Exception("Connection failure")); 103 } 104 105 @Override onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn)106 public void onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn) { 107 System.out.println("[Lifecycle event] Client disconnected..."); 108 } 109 110 @Override onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn)111 public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) { 112 System.out.println("[Lifecycle event] Client stopped..."); 113 stopFuture.complete(null); 114 } 115 } 116 117 static final class SamplePublishEvents implements Mqtt5ClientOptions.PublishEvents { 118 @Override onMessageReceived(Mqtt5Client client, PublishReturn publishReturn)119 public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { 120 PublishPacket publishPacket = publishReturn.getPublishPacket(); 121 System.out.println( 122 "Message received:\n"+ 123 " Topic: " + publishPacket.getTopic() + "\n" + 124 " Payload: " + new String(publishPacket.getPayload()) 125 ); 126 if (publishPacket.getUserProperties() != null) { 127 List<UserProperty> userProperties = publishPacket.getUserProperties(); 128 for (int i = 0; i < userProperties.size(); i++) { 129 System.out.println(" Property: " + userProperties.get(i).key + " - " + userProperties.get(i).value); 130 } 131 } 132 } 133 } 134 main(String[] args)135 public static void main(String[] args) { 136 137 parseCommandLine(args); 138 if (showHelp) { 139 printUsage(); 140 return; 141 } 142 143 SampleLifecycleEvents sampleLifecycleEvents = new SampleLifecycleEvents(); 144 SamplePublishEvents samplePublishEvents = new SamplePublishEvents(); 145 Consumer<Mqtt5WebsocketHandshakeTransformArgs> websocketTransform = null; 146 TlsContext tlsContext = null; 147 148 try { 149 Mqtt5ClientOptions.Mqtt5ClientOptionsBuilder optionsBuilder = new Mqtt5ClientOptions.Mqtt5ClientOptionsBuilder(endpoint, port); 150 151 optionsBuilder.withLifecycleEvents(sampleLifecycleEvents); 152 optionsBuilder.withPublishEvents(samplePublishEvents); 153 154 if (useWebsockets == true) { 155 websocketTransform = new Consumer<Mqtt5WebsocketHandshakeTransformArgs>() { 156 @Override 157 public void accept(Mqtt5WebsocketHandshakeTransformArgs t) { 158 t.complete(t.getHttpRequest()); 159 } 160 }; 161 optionsBuilder.withWebsocketHandshakeTransform(websocketTransform); 162 } 163 if (useTls == true) { 164 try (TlsContextOptions tlsOptions = TlsContextOptions.createDefaultClient()) { 165 tlsOptions.withVerifyPeer(false); 166 tlsContext = new TlsContext(tlsOptions); 167 optionsBuilder.withTlsContext(tlsContext); 168 } 169 } 170 171 ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder(); 172 connectBuilder.withClientId(clientID); 173 // Add a will 174 PublishPacketBuilder willBuilder = new PublishPacketBuilder(); 175 willBuilder.withTopic("test/topic/will"); 176 willBuilder.withPayload("Goodbye".getBytes()); 177 willBuilder.withQOS(QOS.AT_MOST_ONCE); 178 connectBuilder.withWill(willBuilder.build()); 179 // Add the connection options 180 optionsBuilder.withConnectOptions(connectBuilder.build()); 181 182 try (Mqtt5Client client = new Mqtt5Client(optionsBuilder.build())) { 183 // Connect and make sure it is successful 184 try { 185 client.start(); 186 sampleLifecycleEvents.connectedFuture.get(60, TimeUnit.SECONDS); 187 System.out.println("Connection status: " + client.getIsConnected()); 188 } catch (Exception ex) { 189 System.out.println("Could not connect! Exception: " + ex.toString()); 190 System.exit(1); 191 return; 192 } 193 194 // Subscribe 195 SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder(); 196 subscribePacketBuilder.withSubscription("test/topic", QOS.AT_LEAST_ONCE); 197 // Make sure it is successful 198 try { 199 SubAckPacket subAckPacket = client.subscribe(subscribePacketBuilder.build()).get(60, TimeUnit.SECONDS); 200 if (subAckPacket.getReasonCodes().get(0) != SubAckPacket.SubAckReasonCode.GRANTED_QOS_1) { 201 System.out.println("Could not subscribe! Error code: " + subAckPacket.getReasonCodes().get(0).toString()); 202 System.exit(1); 203 return; 204 } 205 } catch (Exception ex) { 206 System.out.println("Could not subscribe! Exception: " + ex.toString()); 207 System.exit(1); 208 return; 209 } 210 211 // Publish 212 PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(); 213 publishPacketBuilder.withPayload("Hello World!".getBytes()); 214 publishPacketBuilder.withQOS(QOS.AT_LEAST_ONCE); 215 publishPacketBuilder.withTopic("test/topic"); 216 // Add user properties 217 List<UserProperty> publishProperties = new ArrayList<UserProperty>(); 218 publishProperties.add(new UserProperty("Red", "Blue")); 219 publishProperties.add(new UserProperty("key", "value")); 220 publishPacketBuilder.withUserProperties(publishProperties); 221 // Publish 10 times and make sure they are successful 222 try { 223 for (int i = 0; i < 10; i++) { 224 client.publish(publishPacketBuilder.build()).get(60, TimeUnit.SECONDS); 225 Thread.sleep(100); 226 } 227 } catch (Exception ex) { 228 System.out.println("Could not publish! Exception: " + ex.toString()); 229 System.exit(1); 230 return; 231 } 232 233 // Unsubscribe 234 UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder(); 235 unsubscribePacketBuilder.withSubscription("test/topic"); 236 // Make sure it is successful 237 try { 238 client.unsubscribe(unsubscribePacketBuilder.build()).get(60, TimeUnit.SECONDS); 239 } catch (Exception ex) { 240 System.out.println("Could not unsubscribe! Exception: " + ex.toString()); 241 System.exit(1); 242 return; 243 } 244 245 // Disconnect 246 DisconnectPacketBuilder disconnectPacketBuilder = new DisconnectPacketBuilder(); 247 disconnectPacketBuilder.withReasonCode(DisconnectReasonCode.NORMAL_DISCONNECTION); 248 // Make sure it is successful 249 try { 250 client.stop(disconnectPacketBuilder.build()); 251 sampleLifecycleEvents.stopFuture.get(60, TimeUnit.SECONDS); 252 System.out.println("Connection status: " + client.getIsConnected()); 253 } catch (Exception ex) { 254 System.out.println("Could not stop client! Exception: " + ex.toString()); 255 System.exit(1); 256 return; 257 } 258 } 259 } finally {} 260 261 if (tlsContext != null) { 262 tlsContext.close(); 263 } 264 265 System.out.println("Waiting for no resources..."); 266 CrtResource.waitForNoResources(); 267 268 System.out.println("Finished sample!"); 269 System.exit(0); 270 } 271 } 272