1 /*******************************************************************************
2 * Copyright (c) 2012, 2013 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * http://www.eclipse.org/legal/epl-v10.html
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Ian Craggs - initial contribution
15 * Ian Craggs - change delimiter option from char to string
16 *******************************************************************************/
17
18 /*
19
20 stdout subscriber
21
22 compulsory parameters:
23
24 topic to subscribe to
25
26 defaulted parameters:
27
28 --host localhost
29 --port 1883
30 --qos 2
31 --delimiter \n
32 --clientid stdout-subscriber
33
34 --username none
35 --password none
36
37 */
38 #include <stdio.h>
39 #include <memory.h>
40 #define MQTT_DEBUG 1
41 #include "MQTTClient.h"
42
43 #define DEFAULT_STACK_SIZE -1
44
45 #include "linux.cpp"
46
47 #include <signal.h>
48 #include <sys/time.h>
49 #include <stdlib.h>
50
51
/*
 * Stop request flag: set to 1 by the cfinish() signal handler and polled by
 * the main loop. Declared as volatile sig_atomic_t because that is the integer
 * type that may safely be written from an asynchronous signal handler.
 */
volatile sig_atomic_t toStop = 0;
53
54
/**
 * Print the command-line help to stdout and terminate the process with
 * exit status -1. This function does not return.
 *
 * The defaults quoted here mirror the initial values of the global opts
 * structure.
 */
void usage()
{
    printf("MQTT stdout subscriber\n");
    printf("Usage: stdoutsub topicname <options>, where options are:\n");
    printf(" --host <hostname> (default is localhost)\n");
    printf(" --port <port> (default is 1883)\n");
    printf(" --qos <qos> (default is 2)\n");
    printf(" --delimiter <delim> (default is \\n)\n");
    printf(" --clientid <clientid> (default is stdout-subscriber)\n");
    printf(" --username <username> (default is none)\n");
    printf(" --password <password> (default is none)\n");
    printf(" --showtopics <on or off> (default is on if the topic has a wildcard, else off)\n");
    exit(-1);
}
69
70
cfinish(int sig)71 void cfinish(int sig)
72 {
73 signal(SIGINT, NULL);
74 toStop = 1;
75 }
76
77
78 struct opts_struct
79 {
80 char* clientid;
81 int nodelimiter;
82 char* delimiter;
83 MQTT::QoS qos;
84 char* username;
85 char* password;
86 char* host;
87 int port;
88 int showtopics;
89 } opts =
90 {
91 (char*)"stdout-subscriber", 0, (char*)"\n", MQTT::QOS2, NULL, NULL, (char*)"localhost", 1883, 0
92 };
93
94
getopts(int argc,char ** argv)95 void getopts(int argc, char** argv)
96 {
97 int count = 2;
98
99 while (count < argc)
100 {
101 if (strcmp(argv[count], "--qos") == 0)
102 {
103 if (++count < argc)
104 {
105 if (strcmp(argv[count], "0") == 0)
106 opts.qos = MQTT::QOS0;
107 else if (strcmp(argv[count], "1") == 0)
108 opts.qos = MQTT::QOS1;
109 else if (strcmp(argv[count], "2") == 0)
110 opts.qos = MQTT::QOS2;
111 else
112 usage();
113 }
114 else
115 usage();
116 }
117 else if (strcmp(argv[count], "--host") == 0)
118 {
119 if (++count < argc)
120 opts.host = argv[count];
121 else
122 usage();
123 }
124 else if (strcmp(argv[count], "--port") == 0)
125 {
126 if (++count < argc)
127 opts.port = atoi(argv[count]);
128 else
129 usage();
130 }
131 else if (strcmp(argv[count], "--clientid") == 0)
132 {
133 if (++count < argc)
134 opts.clientid = argv[count];
135 else
136 usage();
137 }
138 else if (strcmp(argv[count], "--username") == 0)
139 {
140 if (++count < argc)
141 opts.username = argv[count];
142 else
143 usage();
144 }
145 else if (strcmp(argv[count], "--password") == 0)
146 {
147 if (++count < argc)
148 opts.password = argv[count];
149 else
150 usage();
151 }
152 else if (strcmp(argv[count], "--delimiter") == 0)
153 {
154 if (++count < argc)
155 opts.delimiter = argv[count];
156 else
157 opts.nodelimiter = 1;
158 }
159 else if (strcmp(argv[count], "--showtopics") == 0)
160 {
161 if (++count < argc)
162 {
163 if (strcmp(argv[count], "on") == 0)
164 opts.showtopics = 1;
165 else if (strcmp(argv[count], "off") == 0)
166 opts.showtopics = 0;
167 else
168 usage();
169 }
170 else
171 usage();
172 }
173 count++;
174 }
175
176 }
177
178
myconnect(IPStack & ipstack,MQTT::Client<IPStack,Countdown,1000> & client,MQTTPacket_connectData & data)179 void myconnect(IPStack& ipstack, MQTT::Client<IPStack, Countdown, 1000>& client, MQTTPacket_connectData& data)
180 {
181 printf("Connecting to %s:%d\n", opts.host, opts.port);
182 int rc = ipstack.connect(opts.host, opts.port);
183 if (rc != 0)
184 printf("rc from TCP connect is %d\n", rc);
185
186 rc = client.connect(data);
187 if (rc != 0)
188 {
189 printf("Failed to connect, return code %d\n", rc);
190 exit(-1);
191 }
192 printf("Connected\n");
193 }
194
195
messageArrived(MQTT::MessageData & md)196 void messageArrived(MQTT::MessageData& md)
197 {
198 MQTT::Message &message = md.message;
199
200 if (opts.showtopics)
201 printf("%.*s\t", md.topicName.lenstring.len, md.topicName.lenstring.data);
202 if (opts.nodelimiter)
203 printf("%.*s", (int)message.payloadlen, (char*)message.payload);
204 else
205 printf("%.*s%s", (int)message.payloadlen, (char*)message.payload, opts.delimiter);
206 fflush(stdout);
207 }
208
209
main(int argc,char ** argv)210 int main(int argc, char** argv)
211 {
212 int rc = 0;
213
214 if (argc < 2)
215 usage();
216
217 char* topic = argv[1];
218
219 if (strchr(topic, '#') || strchr(topic, '+'))
220 opts.showtopics = 1;
221 if (opts.showtopics)
222 printf("topic is %s\n", topic);
223
224 getopts(argc, argv);
225
226 IPStack ipstack = IPStack();
227 MQTT::Client<IPStack, Countdown, 1000> client = MQTT::Client<IPStack, Countdown, 1000>(ipstack);
228
229 signal(SIGINT, cfinish);
230 signal(SIGTERM, cfinish);
231
232 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
233 data.willFlag = 0;
234 data.MQTTVersion = 3;
235 data.clientID.cstring = opts.clientid;
236 data.username.cstring = opts.username;
237 data.password.cstring = opts.password;
238
239 data.keepAliveInterval = 10;
240 data.cleansession = 1;
241 printf("will flag %d\n", data.willFlag);
242
243 myconnect(ipstack, client, data);
244
245 rc = client.subscribe(topic, opts.qos, messageArrived);
246 printf("Subscribed %d\n", rc);
247
248 while (!toStop)
249 {
250 client.yield(1000);
251
252 //if (!client.isconnected)
253 // myconnect(ipstack, client, data);
254 }
255
256 printf("Stopping\n");
257
258 rc = client.disconnect();
259
260 ipstack.disconnect();
261
262 return 0;
263 }
264
265
266