1 /*******************************************************************************
2 * Copyright (c) 2012, 2016 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * http://www.eclipse.org/legal/epl-v10.html
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Ian Craggs - initial contribution
15 * Ian Craggs - change delimiter option from char to string
16 * Al Stockdill-Mander - Version using the embedded C client
17 * Ian Craggs - update MQTTClient function names
18 *******************************************************************************/
19
20 /*
21
22 stdout subscriber
23
24 compulsory parameters:
25
26 topic to subscribe to
27
28 defaulted parameters:
29
30 --host localhost
31 --port 1883
32 --qos 2
33 --delimiter \n
		--clientid stdout-subscriber

		--username none
37 --password none
38
39 for example:
40
41 stdoutsub topic/of/interest --host iot.eclipse.org
42
43 */
#include <stdio.h>
#include <memory.h>
#include "MQTTClient.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

#include <sys/time.h>
52
53
/*
 * Stop request flag: set to 1 by the cfinish() signal handler and polled by
 * the main loop. Declared volatile sig_atomic_t because that is the only
 * object type a signal handler may portably write.
 */
volatile sig_atomic_t toStop = 0;
55
56
/**
 * Print the command-line help text to stdout and terminate the process.
 *
 * Does not return: it always ends with exit(-1).
 */
void usage(void)
{
	printf("MQTT stdout subscriber\n");
	printf("Usage: stdoutsub topicname <options>, where options are:\n");
	printf(" --host <hostname> (default is localhost)\n");
	printf(" --port <port> (default is 1883)\n");
	printf(" --qos <qos> (default is 2)\n");
	printf(" --delimiter <delim> (default is \\n)\n");
	/* Matches the built-in default stored in opts.clientid. */
	printf(" --clientid <clientid> (default is stdout-subscriber)\n");
	printf(" --username none\n");
	printf(" --password none\n");
	printf(" --showtopics <on or off> (default is on if the topic has a wildcard, else off)\n");
	exit(-1);
}
71
72
cfinish(int sig)73 void cfinish(int sig)
74 {
75 signal(SIGINT, NULL);
76 toStop = 1;
77 }
78
79
80 struct opts_struct
81 {
82 char* clientid;
83 int nodelimiter;
84 char* delimiter;
85 enum QoS qos;
86 char* username;
87 char* password;
88 char* host;
89 int port;
90 int showtopics;
91 } opts =
92 {
93 (char*)"stdout-subscriber", 0, (char*)"\n", QOS2, NULL, NULL, (char*)"localhost", 1883, 0
94 };
95
96
getopts(int argc,char ** argv)97 void getopts(int argc, char** argv)
98 {
99 int count = 2;
100
101 while (count < argc)
102 {
103 if (strcmp(argv[count], "--qos") == 0)
104 {
105 if (++count < argc)
106 {
107 if (strcmp(argv[count], "0") == 0)
108 opts.qos = QOS0;
109 else if (strcmp(argv[count], "1") == 0)
110 opts.qos = QOS1;
111 else if (strcmp(argv[count], "2") == 0)
112 opts.qos = QOS2;
113 else
114 usage();
115 }
116 else
117 usage();
118 }
119 else if (strcmp(argv[count], "--host") == 0)
120 {
121 if (++count < argc)
122 opts.host = argv[count];
123 else
124 usage();
125 }
126 else if (strcmp(argv[count], "--port") == 0)
127 {
128 if (++count < argc)
129 opts.port = atoi(argv[count]);
130 else
131 usage();
132 }
133 else if (strcmp(argv[count], "--clientid") == 0)
134 {
135 if (++count < argc)
136 opts.clientid = argv[count];
137 else
138 usage();
139 }
140 else if (strcmp(argv[count], "--username") == 0)
141 {
142 if (++count < argc)
143 opts.username = argv[count];
144 else
145 usage();
146 }
147 else if (strcmp(argv[count], "--password") == 0)
148 {
149 if (++count < argc)
150 opts.password = argv[count];
151 else
152 usage();
153 }
154 else if (strcmp(argv[count], "--delimiter") == 0)
155 {
156 if (++count < argc)
157 opts.delimiter = argv[count];
158 else
159 opts.nodelimiter = 1;
160 }
161 else if (strcmp(argv[count], "--showtopics") == 0)
162 {
163 if (++count < argc)
164 {
165 if (strcmp(argv[count], "on") == 0)
166 opts.showtopics = 1;
167 else if (strcmp(argv[count], "off") == 0)
168 opts.showtopics = 0;
169 else
170 usage();
171 }
172 else
173 usage();
174 }
175 count++;
176 }
177
178 }
179
180
/**
 * Message handler registered with MQTTSubscribe(): writes one received
 * message to stdout.
 *
 * When opts.showtopics is set, the topic name and a tab come first. The
 * payload follows, and after it opts.delimiter unless opts.nodelimiter is
 * set. stdout is not flushed here.
 *
 * @param md the received message together with its topic name
 */
void messageArrived(MessageData* md)
{
	MQTTMessage* msg = md->message;
	const int payload_len = (int)msg->payloadlen;
	const char* payload = (char*)msg->payload;

	if (opts.showtopics)
		printf("%.*s\t", md->topicName->lenstring.len, md->topicName->lenstring.data);

	printf("%.*s", payload_len, payload);
	if (!opts.nodelimiter)
		printf("%s", opts.delimiter);
}
193
194
main(int argc,char ** argv)195 int main(int argc, char** argv)
196 {
197 int rc = 0;
198 unsigned char buf[100];
199 unsigned char readbuf[100];
200
201 if (argc < 2)
202 usage();
203
204 char* topic = argv[1];
205
206 if (strchr(topic, '#') || strchr(topic, '+'))
207 opts.showtopics = 1;
208 if (opts.showtopics)
209 printf("topic is %s\n", topic);
210
211 getopts(argc, argv);
212
213 Network n;
214 MQTTClient c;
215
216 signal(SIGINT, cfinish);
217 signal(SIGTERM, cfinish);
218
219 NetworkInit(&n);
220 NetworkConnect(&n, opts.host, opts.port);
221 MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);
222
223 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
224 data.willFlag = 0;
225 data.MQTTVersion = 3;
226 data.clientID.cstring = opts.clientid;
227 data.username.cstring = opts.username;
228 data.password.cstring = opts.password;
229
230 data.keepAliveInterval = 10;
231 data.cleansession = 1;
232 printf("Connecting to %s %d\n", opts.host, opts.port);
233
234 rc = MQTTConnect(&c, &data);
235 printf("Connected %d\n", rc);
236
237 printf("Subscribing to %s\n", topic);
238 rc = MQTTSubscribe(&c, topic, opts.qos, messageArrived);
239 printf("Subscribed %d\n", rc);
240
241 while (!toStop)
242 {
243 MQTTYield(&c, 1000);
244 }
245
246 printf("Stopping\n");
247
248 MQTTDisconnect(&c);
249 NetworkDisconnect(&n);
250
251 return 0;
252 }
253
254
255