• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 package com.example.mqtt5;
6 
7 import java.util.ArrayList;
8 import java.util.List;
9 import java.util.UUID;
10 import java.util.concurrent.CompletableFuture;
11 import java.util.concurrent.TimeUnit;
12 import java.util.function.Consumer;
13 
14 import software.amazon.awssdk.crt.CrtResource;
15 import software.amazon.awssdk.crt.io.TlsContext;
16 import software.amazon.awssdk.crt.io.TlsContextOptions;
17 import software.amazon.awssdk.crt.mqtt5.*;
18 import software.amazon.awssdk.crt.mqtt5.packets.*;
19 
20 import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket.ConnectPacketBuilder;
21 import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket.DisconnectPacketBuilder;
22 import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket.DisconnectReasonCode;
23 import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket.PublishPacketBuilder;
24 import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket.PayloadFormatIndicator;
25 import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket.SubscribePacketBuilder;
26 import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket.RetainHandlingType;
27 import software.amazon.awssdk.crt.mqtt5.packets.UnsubscribePacket.UnsubscribePacketBuilder;
28 
29 public class Mqtt5Sample {
30 
31     static String endpoint = "localhost";
32     static Long port = 1883L;
33     static String clientID = "MQTT5_Sample_Java_" + UUID.randomUUID().toString();
34     static boolean useWebsockets = false;
35     static boolean useTls = false;
36     static boolean showHelp = false;
37 
printUsage()38     static void printUsage() {
39         System.out.println(
40             "Usage:\n" +
41             "  --help            This message\n"+
42             "  --endpoint        MQTT5 endpoint hostname (optional, default=localhost)\n"+
43             "  --port            MQTT5 endpoint port to use (optional, default=1883)\n"+
44             "  --clientID        The ClientID to connect with (optional, default=MQTT5_Sample_Java_<UUID>)\n"+
45             "  --use_websockets  If defined, websockets will be used (optional)\n"+
46             "  --use_tls         If defined, TLS will be used (optional)"
47         );
48     }
49 
parseCommandLine(String[] args)50     static void parseCommandLine(String[] args) {
51         for (int idx = 0; idx < args.length; ++idx) {
52             switch (args[idx]) {
53                 case "--help":
54                     showHelp = true;
55                     break;
56                 case "--endpoint":
57                     if (idx + 1 < args.length) {
58                         endpoint = args[++idx];
59                     }
60                     break;
61                 case "--port":
62                     if (idx + 1 < args.length) {
63                         port = Long.parseLong(args[++idx]);
64                     }
65                     break;
66                 case "--clientID":
67                     if (idx + 1 < args.length) {
68                         clientID = args[++idx];
69                     }
70                     break;
71                 case "--use_websockets":
72                     useWebsockets = true;
73                     break;
74                 case "--use_tls":
75                     useTls = true;
76                     break;
77                 default:
78                     System.out.println("Unrecognized argument: " + args[idx]);
79             }
80         }
81     }
82 
83 
84     static final class SampleLifecycleEvents implements Mqtt5ClientOptions.LifecycleEvents {
85         CompletableFuture<Void> connectedFuture = new CompletableFuture<>();
86         CompletableFuture<Void> stopFuture = new CompletableFuture<>();
87 
88         @Override
onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn)89         public void onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn) {
90             System.out.println("[Lifecycle event] Client attempting connection...");
91         }
92 
93         @Override
onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn)94         public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn) {
95             System.out.println("[Lifecycle event] Client connection success...");
96             connectedFuture.complete(null);
97         }
98 
99         @Override
onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn)100         public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) {
101             System.out.println("[Lifecycle event] Client connection failed...");
102             connectedFuture.completeExceptionally(new Exception("Connection failure"));
103         }
104 
105         @Override
onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn)106         public void onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn) {
107             System.out.println("[Lifecycle event] Client disconnected...");
108         }
109 
110         @Override
onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn)111         public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) {
112             System.out.println("[Lifecycle event] Client stopped...");
113             stopFuture.complete(null);
114         }
115     }
116 
117     static final class SamplePublishEvents implements Mqtt5ClientOptions.PublishEvents {
118         @Override
onMessageReceived(Mqtt5Client client, PublishReturn publishReturn)119         public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
120             PublishPacket publishPacket = publishReturn.getPublishPacket();
121             System.out.println(
122                 "Message received:\n"+
123                 "  Topic: " + publishPacket.getTopic() + "\n" +
124                 "  Payload: " + new String(publishPacket.getPayload())
125             );
126             if (publishPacket.getUserProperties() != null) {
127                 List<UserProperty> userProperties = publishPacket.getUserProperties();
128                 for (int i = 0; i < userProperties.size(); i++) {
129                     System.out.println("  Property: " + userProperties.get(i).key + " - " + userProperties.get(i).value);
130                 }
131             }
132         }
133     }
134 
main(String[] args)135     public static void main(String[] args) {
136 
137         parseCommandLine(args);
138         if (showHelp) {
139             printUsage();
140             return;
141         }
142 
143         SampleLifecycleEvents sampleLifecycleEvents = new SampleLifecycleEvents();
144         SamplePublishEvents samplePublishEvents = new SamplePublishEvents();
145         Consumer<Mqtt5WebsocketHandshakeTransformArgs> websocketTransform = null;
146         TlsContext tlsContext = null;
147 
148         try {
149             Mqtt5ClientOptions.Mqtt5ClientOptionsBuilder optionsBuilder = new Mqtt5ClientOptions.Mqtt5ClientOptionsBuilder(endpoint, port);
150 
151             optionsBuilder.withLifecycleEvents(sampleLifecycleEvents);
152             optionsBuilder.withPublishEvents(samplePublishEvents);
153 
154             if (useWebsockets == true) {
155                 websocketTransform = new Consumer<Mqtt5WebsocketHandshakeTransformArgs>() {
156                     @Override
157                     public void accept(Mqtt5WebsocketHandshakeTransformArgs t) {
158                         t.complete(t.getHttpRequest());
159                     }
160                 };
161                 optionsBuilder.withWebsocketHandshakeTransform(websocketTransform);
162             }
163             if (useTls == true) {
164                 try (TlsContextOptions tlsOptions = TlsContextOptions.createDefaultClient()) {
165                     tlsOptions.withVerifyPeer(false);
166                     tlsContext = new TlsContext(tlsOptions);
167                     optionsBuilder.withTlsContext(tlsContext);
168                 }
169             }
170 
171             ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder();
172             connectBuilder.withClientId(clientID);
173             // Add a will
174             PublishPacketBuilder willBuilder = new PublishPacketBuilder();
175             willBuilder.withTopic("test/topic/will");
176             willBuilder.withPayload("Goodbye".getBytes());
177             willBuilder.withQOS(QOS.AT_MOST_ONCE);
178             connectBuilder.withWill(willBuilder.build());
179             // Add the connection options
180             optionsBuilder.withConnectOptions(connectBuilder.build());
181 
182             try (Mqtt5Client client = new Mqtt5Client(optionsBuilder.build())) {
183                 // Connect and make sure it is successful
184                 try {
185                     client.start();
186                     sampleLifecycleEvents.connectedFuture.get(60, TimeUnit.SECONDS);
187                     System.out.println("Connection status: " + client.getIsConnected());
188                 } catch (Exception ex) {
189                     System.out.println("Could not connect! Exception: " + ex.toString());
190                     System.exit(1);
191                     return;
192                 }
193 
194                 // Subscribe
195                 SubscribePacketBuilder subscribePacketBuilder = new SubscribePacketBuilder();
196                 subscribePacketBuilder.withSubscription("test/topic", QOS.AT_LEAST_ONCE);
197                 // Make sure it is successful
198                 try {
199                     SubAckPacket subAckPacket = client.subscribe(subscribePacketBuilder.build()).get(60, TimeUnit.SECONDS);
200                     if (subAckPacket.getReasonCodes().get(0) != SubAckPacket.SubAckReasonCode.GRANTED_QOS_1) {
201                         System.out.println("Could not subscribe! Error code: " + subAckPacket.getReasonCodes().get(0).toString());
202                         System.exit(1);
203                         return;
204                     }
205                 } catch (Exception ex) {
206                     System.out.println("Could not subscribe! Exception: " + ex.toString());
207                     System.exit(1);
208                     return;
209                 }
210 
211                 // Publish
212                 PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder();
213                 publishPacketBuilder.withPayload("Hello World!".getBytes());
214                 publishPacketBuilder.withQOS(QOS.AT_LEAST_ONCE);
215                 publishPacketBuilder.withTopic("test/topic");
216                 // Add user properties
217                 List<UserProperty> publishProperties = new ArrayList<UserProperty>();
218                 publishProperties.add(new UserProperty("Red", "Blue"));
219                 publishProperties.add(new UserProperty("key", "value"));
220                 publishPacketBuilder.withUserProperties(publishProperties);
221                 // Publish 10 times and make sure they are successful
222                 try {
223                     for (int i = 0; i < 10; i++) {
224                         client.publish(publishPacketBuilder.build()).get(60, TimeUnit.SECONDS);
225                         Thread.sleep(100);
226                     }
227                 } catch (Exception ex) {
228                     System.out.println("Could not publish! Exception: " + ex.toString());
229                     System.exit(1);
230                     return;
231                 }
232 
233                 // Unsubscribe
234                 UnsubscribePacketBuilder unsubscribePacketBuilder = new UnsubscribePacketBuilder();
235                 unsubscribePacketBuilder.withSubscription("test/topic");
236                 // Make sure it is successful
237                 try {
238                     client.unsubscribe(unsubscribePacketBuilder.build()).get(60, TimeUnit.SECONDS);
239                 } catch (Exception ex) {
240                     System.out.println("Could not unsubscribe! Exception: " + ex.toString());
241                     System.exit(1);
242                     return;
243                 }
244 
245                 // Disconnect
246                 DisconnectPacketBuilder disconnectPacketBuilder = new DisconnectPacketBuilder();
247                 disconnectPacketBuilder.withReasonCode(DisconnectReasonCode.NORMAL_DISCONNECTION);
248                 // Make sure it is successful
249                 try {
250                     client.stop(disconnectPacketBuilder.build());
251                     sampleLifecycleEvents.stopFuture.get(60, TimeUnit.SECONDS);
252                     System.out.println("Connection status: " + client.getIsConnected());
253                 } catch (Exception ex) {
254                     System.out.println("Could not stop client! Exception: " + ex.toString());
255                     System.exit(1);
256                     return;
257                 }
258             }
259         } finally {}
260 
261         if (tlsContext != null) {
262             tlsContext.close();
263         }
264 
265         System.out.println("Waiting for no resources...");
266         CrtResource.waitForNoResources();
267 
268         System.out.println("Finished sample!");
269         System.exit(0);
270     }
271 }
272