• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2014 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    http://www.eclipse.org/legal/epl-v10.html
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Sergio R. Caprile
15  *******************************************************************************/
16 
17 #include <stdio.h>
18 #include <string.h>
19 #include <stdlib.h>
20 
21 #include "MQTTPacket.h"
22 #include "transport.h"
23 
24 #define KEEPALIVE_INTERVAL 20
25 
26 /* This is to get a timebase in seconds to test the sample */
27 #include <time.h>
28 time_t old_t;
start_ping_timer(void)29 void start_ping_timer(void)
30 {
31 	time(&old_t);
32 	old_t += KEEPALIVE_INTERVAL/2 + 1;
33 }
34 
time_to_ping(void)35 int time_to_ping(void)
36 {
37 time_t t;
38 
39 	time(&t);
40 	if(t >= old_t)
41 	  	return 1;
42 	return 0;
43 }
44 
/* This provides an asynchronous signal to stop the sample, as the code
loops indefinitely exchanging pings with the broker.
Your actual code will depend on your hardware and approach, but this sample can be
run on Linux, so debugging the non-hardware-specific bare metal code is easier.
See the bottom of this file for details. */
50 #include <signal.h>
51 
/*
 * Stop request flag: set to 1 by the signal handler cfinish() and polled by
 * the main loop.
 *
 * It is written from a signal handler and read from normal code, so it is
 * declared volatile sig_atomic_t. That keeps the store atomic with respect to
 * the interrupted code and stops the compiler from caching the value across
 * loop iterations.
 */
volatile sig_atomic_t toStop = 0;

/* Installs the signal handlers that set toStop; defined later in this file. */
void stop_init(void);
55 /* */
56 
/* Same as above: we provide a set of functions to test/debug on a friendlier system.
The init() and close() actions on the serial link exist only for this purpose; you will
probably handle them in whatever manages the media in your application. */
60 void sampleserial_init(void);
61 void sampleserial_close(void);
62 int samplesend(unsigned char *address, unsigned int bytes);
63 int samplerecv(unsigned char *address, unsigned int maxbytes);
64 /* */
65 
66 /* You will use your hardware specifics here, see transport.h. */
67 static transport_iofunctions_t iof = {samplesend, samplerecv};
68 
69 enum states { IDLE, SENDPING, GETPONG };
70 
main(int argc,char * argv[])71 int main(int argc, char *argv[])
72 {
73 	MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
74 	int rc = 0;
75 	int mysock = 0;
76 	unsigned char buf[200];
77 	int buflen = sizeof(buf);
78 	int len = 0;
79 	MQTTTransport mytransport;
80 	int state;
81 
82 	stop_init();
83 	sampleserial_init();
84 
85 	mysock = transport_open(&iof);
86 	if(mysock < 0)
87 		return mysock;
88 	/* You will (or already are) 'somehow' connect(ed) to host:port via your hardware specifics. E.g.:
89 		you have a serial (RS-232/UART) link
90 		you have a cell modem and you issue your AT+ magic
91 		you have some TCP/IP which is not lwIP (nor a full-fledged socket compliant one)
92 		 and you TCP connect
93 	*/
94 
95 	mytransport.sck = &mysock;
96 	mytransport.getfn = transport_getdatanb;
97 	mytransport.state = 0;
98 	data.clientID.cstring = "me";
99 	data.keepAliveInterval = KEEPALIVE_INTERVAL;
100 	data.cleansession = 1;
101 	data.username.cstring = "testuser";
102 	data.password.cstring = "testpassword";
103 
104 	len = MQTTSerialize_connect(buf, buflen, &data);
105 	/* This one blocks until it finishes sending, you will probably not want this in real life,
106 	in such a case replace this call by a scheme similar to the one you'll see in the main loop */
107 	rc = transport_sendPacketBuffer(mysock, buf, len);
108 
109 	printf("Sent MQTT connect\n");
110 	/* wait for connack */
111 	do {
112 		int frc;
113 		if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == CONNACK){
114 			unsigned char sessionPresent, connack_rc;
115 			if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0){
116 				printf("Unable to connect, return code %d\n", connack_rc);
117 				goto exit;
118 			}
119 			break;
120 		}
121 		else if (frc == -1)
122 			goto exit;
123 	} while (1); /* handle timeouts here */
124 
125 	printf("MQTT connected\n");
126 	start_ping_timer();
127 	state = IDLE;
128 	while (!toStop)	{
129 		switch(state){
130 		case IDLE:
131 			if(time_to_ping()){
132 				len = MQTTSerialize_pingreq(buf, buflen);
133 				transport_sendPacketBuffernb_start(mysock, buf, len);
134 				state = SENDPING;
135 			}
136 			break;
137 		case SENDPING:
138 			switch(transport_sendPacketBuffernb(mysock)){
139 			case TRANSPORT_DONE:
140 				printf("Ping...");
141 				start_ping_timer();
142 				state = GETPONG;
143 				break;
144 			case TRANSPORT_ERROR:
145 				/* handle any I/O errors here */
146 				goto exit;
147 				break;
148 			case TRANSPORT_AGAIN:
149 			default:
150 				/* handle timeouts here, not probable unless there is a hardware problem */
151 				break;
152 			}
153 			break;
154 		case GETPONG:
155 			if((rc=MQTTPacket_readnb(buf, buflen, &mytransport)) == PINGRESP){
156 				printf("Pong\n");
157 				start_ping_timer();
158 				state = IDLE;
159 			} else if(rc == -1){
160 				/* handle I/O errors here */
161 				printf("OOPS\n");
162 				goto exit;
163 			}	/* handle timeouts here */
164 			break;
165 		}
166 	}
167 
168 	printf("disconnecting\n");
169 	len = MQTTSerialize_disconnect(buf, buflen);
170 	/* Same blocking related stuff here */
171 	rc = transport_sendPacketBuffer(mysock, buf, len);
172 
173 exit:
174 	transport_close(mysock);
175 
176 	sampleserial_close();
177 	return 0;
178 }
179 
180 
181 /* To stop the sample */
cfinish(int sig)182 void cfinish(int sig)
183 {
184 	signal(SIGINT, NULL);
185 	toStop = 1;
186 }
187 
stop_init(void)188 void stop_init(void)
189 {
190 	signal(SIGINT, cfinish);
191 	signal(SIGTERM, cfinish);
192 }
193 
194 /* Serial hack:
195 Simulate serial transfers on an established TCP connection
196  */
197 #include <unistd.h>
198 #include <errno.h>
199 #include <sys/types.h>
200 #include <sys/socket.h>
201 #include <netinet/in.h>
202 #include <arpa/inet.h>
203 #include <fcntl.h>
204 
/* Descriptor of the TCP socket that stands in for the serial port. */
static int sockfd;

/*
 * Open the simulated serial link.
 *
 * Creates a TCP socket, connects it to the hard-coded broker address
 * 198.41.30.241:1883 and switches it to non-blocking mode so that
 * samplesend()/samplerecv() never stall the caller.
 *
 * Any failure is fatal: a diagnostic is printed and the process exits
 * (status 2 for socket()/fcntl() failures, -1 for connect()).
 */
void sampleserial_init(void)
{
	/* zero-filled so the padding bytes of the address are not left as stack garbage */
	struct sockaddr_in serv_addr = {0};
	int flags;

	if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
		perror(NULL);
		exit(2);
	}
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_addr.s_addr = inet_addr("198.41.30.241");
	serv_addr.sin_port = htons(1883);
	if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
		printf("ERROR connecting\n");
		close(sockfd);
		exit(-1);
	}
	printf("- TCP Connected to Eclipse\n");
	/* set to non-blocking */
	flags = fcntl(sockfd, F_GETFL);
	/* a failed F_GETFL returns -1, which must not be OR-ed into the new flags */
	if (flags < 0 || fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) < 0) {
		perror("fcntl");
		close(sockfd);
		exit(2);
	}
}
227 
sampleserial_close(void)228 void sampleserial_close(void)
229 {
230 	close(sockfd);
231 }
232 
samplesend(unsigned char * address,unsigned int bytes)233 int samplesend(unsigned char *address, unsigned int bytes)
234 {
235 int len;
236 
237 	if(rand() > (RAND_MAX/2))	// 50% probability of being busy
238 		return 0;
239 	if(rand() > (RAND_MAX/2)){	// 50% probability of sending half the requested data (no room in buffer)
240 		if(bytes > 1)
241 			bytes /= 2;
242 	}
243 	if((len = write(sockfd, address, bytes)) >= 0)
244 		return len;
245 	if(errno == EAGAIN)
246 		return 0;
247 	return -1;
248 }
249 
samplerecv(unsigned char * address,unsigned int maxbytes)250 int samplerecv(unsigned char *address, unsigned int maxbytes)
251 {
252 int len;
253 
254 	if(rand() > (RAND_MAX/2))	// 50% probability of no data
255 		return 0;
256 	if(rand() > (RAND_MAX/2)){	// 50% probability of getting half the requested data (not arrived yet)
257 		if(maxbytes > 1){
258 			maxbytes /= 2;
259 		}
260 	}
261 	if((len = read(sockfd, address, maxbytes)) >= 0)
262 		return len;
263 	if(errno == EAGAIN)
264 		return 0;
265 	return -1;
266 }
267 
268