• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2012, 2013 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *   http://www.eclipse.org/legal/epl-v10.html
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Ian Craggs - initial contribution
15  *    Ian Craggs - change delimiter option from char to string
16  *******************************************************************************/
17 
18 /*
19 
20  stdout subscriber
21 
22  compulsory parameters:
23 
24   topic to subscribe to
25 
26  defaulted parameters:
27 
28 	--host localhost
29 	--port 1883
30 	--qos 2
31 	--delimiter \n
32 	--clientid stdout-subscriber
33 
34 	--username none
35 	--password none
36 
37 */
38 #include <stdio.h>
39 #include <memory.h>
40 #define MQTT_DEBUG 1
41 #include "MQTTClient.h"
42 
43 #define DEFAULT_STACK_SIZE -1
44 
45 #include "linux.cpp"
46 
47 #include <signal.h>
48 #include <sys/time.h>
49 #include <stdlib.h>
50 
51 
/*
 * Shutdown flag: written by the cfinish signal handler and polled by the
 * main loop. sig_atomic_t is the integer type that may safely be written
 * from a signal handler.
 */
volatile sig_atomic_t toStop = 0;
53 
54 
/*
 * Print the command line help text to stdout and terminate the process.
 *
 * This function never returns: it always ends with exit(-1), which is why
 * the option parser uses it as its error path.
 */
void usage()
{
	printf("MQTT stdout subscriber\n");
	printf("Usage: stdoutsub topicname <options>, where options are:\n");
	printf("  --host <hostname> (default is localhost)\n");
	printf("  --port <port> (default is 1883)\n");
	printf("  --qos <qos> (default is 2)\n");
	printf("  --delimiter <delim> (default is \\n)\n");
	/* matches the default client id in the opts initializer */
	printf("  --clientid <clientid> (default is stdout-subscriber)\n");
	printf("  --username <username> (default is none)\n");
	printf("  --password <password> (default is none)\n");
	printf("  --showtopics <on or off> (default is on if the topic has a wildcard, else off)\n");
	exit(-1);
}
69 
70 
cfinish(int sig)71 void cfinish(int sig)
72 {
73 	signal(SIGINT, NULL);
74 	toStop = 1;
75 }
76 
77 
78 struct opts_struct
79 {
80 	char* clientid;
81 	int nodelimiter;
82 	char* delimiter;
83 	MQTT::QoS qos;
84 	char* username;
85 	char* password;
86 	char* host;
87 	int port;
88 	int showtopics;
89 } opts =
90 {
91 	(char*)"stdout-subscriber", 0, (char*)"\n", MQTT::QOS2, NULL, NULL, (char*)"localhost", 1883, 0
92 };
93 
94 
getopts(int argc,char ** argv)95 void getopts(int argc, char** argv)
96 {
97 	int count = 2;
98 
99 	while (count < argc)
100 	{
101 		if (strcmp(argv[count], "--qos") == 0)
102 		{
103 			if (++count < argc)
104 			{
105 				if (strcmp(argv[count], "0") == 0)
106 					opts.qos = MQTT::QOS0;
107 				else if (strcmp(argv[count], "1") == 0)
108 					opts.qos = MQTT::QOS1;
109 				else if (strcmp(argv[count], "2") == 0)
110 					opts.qos = MQTT::QOS2;
111 				else
112 					usage();
113 			}
114 			else
115 				usage();
116 		}
117 		else if (strcmp(argv[count], "--host") == 0)
118 		{
119 			if (++count < argc)
120 				opts.host = argv[count];
121 			else
122 				usage();
123 		}
124 		else if (strcmp(argv[count], "--port") == 0)
125 		{
126 			if (++count < argc)
127 				opts.port = atoi(argv[count]);
128 			else
129 				usage();
130 		}
131 		else if (strcmp(argv[count], "--clientid") == 0)
132 		{
133 			if (++count < argc)
134 				opts.clientid = argv[count];
135 			else
136 				usage();
137 		}
138 		else if (strcmp(argv[count], "--username") == 0)
139 		{
140 			if (++count < argc)
141 				opts.username = argv[count];
142 			else
143 				usage();
144 		}
145 		else if (strcmp(argv[count], "--password") == 0)
146 		{
147 			if (++count < argc)
148 				opts.password = argv[count];
149 			else
150 				usage();
151 		}
152 		else if (strcmp(argv[count], "--delimiter") == 0)
153 		{
154 			if (++count < argc)
155 				opts.delimiter = argv[count];
156 			else
157 				opts.nodelimiter = 1;
158 		}
159 		else if (strcmp(argv[count], "--showtopics") == 0)
160 		{
161 			if (++count < argc)
162 			{
163 				if (strcmp(argv[count], "on") == 0)
164 					opts.showtopics = 1;
165 				else if (strcmp(argv[count], "off") == 0)
166 					opts.showtopics = 0;
167 				else
168 					usage();
169 			}
170 			else
171 				usage();
172 		}
173 		count++;
174 	}
175 
176 }
177 
178 
myconnect(IPStack & ipstack,MQTT::Client<IPStack,Countdown,1000> & client,MQTTPacket_connectData & data)179 void myconnect(IPStack& ipstack, MQTT::Client<IPStack, Countdown, 1000>& client, MQTTPacket_connectData& data)
180 {
181 	printf("Connecting to %s:%d\n", opts.host, opts.port);
182 	int rc = ipstack.connect(opts.host, opts.port);
183 	if (rc != 0)
184 	    printf("rc from TCP connect is %d\n", rc);
185 
186 	rc = client.connect(data);
187 	if (rc != 0)
188 	{
189 		printf("Failed to connect, return code %d\n", rc);
190 		exit(-1);
191 	}
192 	printf("Connected\n");
193 }
194 
195 
messageArrived(MQTT::MessageData & md)196 void messageArrived(MQTT::MessageData& md)
197 {
198 	MQTT::Message &message = md.message;
199 
200 	if (opts.showtopics)
201 		printf("%.*s\t", md.topicName.lenstring.len, md.topicName.lenstring.data);
202 	if (opts.nodelimiter)
203 		printf("%.*s", (int)message.payloadlen, (char*)message.payload);
204 	else
205 		printf("%.*s%s", (int)message.payloadlen, (char*)message.payload, opts.delimiter);
206 	fflush(stdout);
207 }
208 
209 
main(int argc,char ** argv)210 int main(int argc, char** argv)
211 {
212 	int rc = 0;
213 
214 	if (argc < 2)
215 		usage();
216 
217 	char* topic = argv[1];
218 
219 	if (strchr(topic, '#') || strchr(topic, '+'))
220 		opts.showtopics = 1;
221 	if (opts.showtopics)
222 		printf("topic is %s\n", topic);
223 
224 	getopts(argc, argv);
225 
226 	IPStack ipstack = IPStack();
227 	MQTT::Client<IPStack, Countdown, 1000> client = MQTT::Client<IPStack, Countdown, 1000>(ipstack);
228 
229 	signal(SIGINT, cfinish);
230 	signal(SIGTERM, cfinish);
231 
232 	MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
233 	data.willFlag = 0;
234 	data.MQTTVersion = 3;
235 	data.clientID.cstring = opts.clientid;
236 	data.username.cstring = opts.username;
237 	data.password.cstring = opts.password;
238 
239 	data.keepAliveInterval = 10;
240 	data.cleansession = 1;
241 	printf("will flag %d\n", data.willFlag);
242 
243 	myconnect(ipstack, client, data);
244 
245 	rc = client.subscribe(topic, opts.qos, messageArrived);
246 	printf("Subscribed %d\n", rc);
247 
248 	while (!toStop)
249 	{
250 		client.yield(1000);
251 
252 		//if (!client.isconnected)
253 		//	myconnect(ipstack, client, data);
254 	}
255 
256 	printf("Stopping\n");
257 
258 	rc = client.disconnect();
259 
260 	ipstack.disconnect();
261 
262 	return 0;
263 }
264 
265 
266