1 /*******************************************************************************
2 * Copyright (c) 2014 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * http://www.eclipse.org/legal/epl-v10.html
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Sergio R. Caprile
15 *******************************************************************************/
16
17 #include <stdio.h>
18 #include <string.h>
19 #include <stdlib.h>
20
21 #include "MQTTPacket.h"
22 #include "transport.h"
23
24 #define KEEPALIVE_INTERVAL 20
25
26 /* This is to get a timebase in seconds to test the sample */
27 #include <time.h>
28 time_t old_t;
start_ping_timer(void)29 void start_ping_timer(void)
30 {
31 time(&old_t);
32 old_t += KEEPALIVE_INTERVAL/2 + 1;
33 }
34
time_to_ping(void)35 int time_to_ping(void)
36 {
37 time_t t;
38
39 time(&t);
40 if(t >= old_t)
41 return 1;
42 return 0;
43 }
44
45 /* This is in order to get an asynchronous signal to stop the sample,
46 as the code loops waiting for msgs on the subscribed topic.
47 Your actual code will depend on your hw and approach, but this sample can be
48 run on Linux so debugging of the non-hardware specific bare metal code is easier.
49 See at bottom of file for details */
50 #include <signal.h>
51
52 int toStop = 0;
53
54 void stop_init(void);
55 /* */
56
57 /* Same as above, we provide a set of functions to test/debug on a friendlier system;
58 the init() and close() actions on the serial are just for this, you will probably
59 handle this on whatever handles your media in your application */
60 void sampleserial_init(void);
61 void sampleserial_close(void);
62 int samplesend(unsigned char *address, unsigned int bytes);
63 int samplerecv(unsigned char *address, unsigned int maxbytes);
64 /* */
65
66 /* You will use your hardware specifics here, see transport.h. */
67 static transport_iofunctions_t iof = {samplesend, samplerecv};
68
69 enum states { IDLE, SENDPING, GETPONG };
70
main(int argc,char * argv[])71 int main(int argc, char *argv[])
72 {
73 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
74 int rc = 0;
75 int mysock = 0;
76 unsigned char buf[200];
77 int buflen = sizeof(buf);
78 int len = 0;
79 MQTTTransport mytransport;
80 int state;
81
82 stop_init();
83 sampleserial_init();
84
85 mysock = transport_open(&iof);
86 if(mysock < 0)
87 return mysock;
88 /* You will (or already are) 'somehow' connect(ed) to host:port via your hardware specifics. E.g.:
89 you have a serial (RS-232/UART) link
90 you have a cell modem and you issue your AT+ magic
91 you have some TCP/IP which is not lwIP (nor a full-fledged socket compliant one)
92 and you TCP connect
93 */
94
95 mytransport.sck = &mysock;
96 mytransport.getfn = transport_getdatanb;
97 mytransport.state = 0;
98 data.clientID.cstring = "me";
99 data.keepAliveInterval = KEEPALIVE_INTERVAL;
100 data.cleansession = 1;
101 data.username.cstring = "testuser";
102 data.password.cstring = "testpassword";
103
104 len = MQTTSerialize_connect(buf, buflen, &data);
105 /* This one blocks until it finishes sending, you will probably not want this in real life,
106 in such a case replace this call by a scheme similar to the one you'll see in the main loop */
107 rc = transport_sendPacketBuffer(mysock, buf, len);
108
109 printf("Sent MQTT connect\n");
110 /* wait for connack */
111 do {
112 int frc;
113 if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == CONNACK){
114 unsigned char sessionPresent, connack_rc;
115 if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0){
116 printf("Unable to connect, return code %d\n", connack_rc);
117 goto exit;
118 }
119 break;
120 }
121 else if (frc == -1)
122 goto exit;
123 } while (1); /* handle timeouts here */
124
125 printf("MQTT connected\n");
126 start_ping_timer();
127 state = IDLE;
128 while (!toStop) {
129 switch(state){
130 case IDLE:
131 if(time_to_ping()){
132 len = MQTTSerialize_pingreq(buf, buflen);
133 transport_sendPacketBuffernb_start(mysock, buf, len);
134 state = SENDPING;
135 }
136 break;
137 case SENDPING:
138 switch(transport_sendPacketBuffernb(mysock)){
139 case TRANSPORT_DONE:
140 printf("Ping...");
141 start_ping_timer();
142 state = GETPONG;
143 break;
144 case TRANSPORT_ERROR:
145 /* handle any I/O errors here */
146 goto exit;
147 break;
148 case TRANSPORT_AGAIN:
149 default:
150 /* handle timeouts here, not probable unless there is a hardware problem */
151 break;
152 }
153 break;
154 case GETPONG:
155 if((rc=MQTTPacket_readnb(buf, buflen, &mytransport)) == PINGRESP){
156 printf("Pong\n");
157 start_ping_timer();
158 state = IDLE;
159 } else if(rc == -1){
160 /* handle I/O errors here */
161 printf("OOPS\n");
162 goto exit;
163 } /* handle timeouts here */
164 break;
165 }
166 }
167
168 printf("disconnecting\n");
169 len = MQTTSerialize_disconnect(buf, buflen);
170 /* Same blocking related stuff here */
171 rc = transport_sendPacketBuffer(mysock, buf, len);
172
173 exit:
174 transport_close(mysock);
175
176 sampleserial_close();
177 return 0;
178 }
179
180
181 /* To stop the sample */
cfinish(int sig)182 void cfinish(int sig)
183 {
184 signal(SIGINT, NULL);
185 toStop = 1;
186 }
187
stop_init(void)188 void stop_init(void)
189 {
190 signal(SIGINT, cfinish);
191 signal(SIGTERM, cfinish);
192 }
193
194 /* Serial hack:
195 Simulate serial transfers on an established TCP connection
196 */
197 #include <unistd.h>
198 #include <errno.h>
199 #include <sys/types.h>
200 #include <sys/socket.h>
201 #include <netinet/in.h>
202 #include <arpa/inet.h>
203 #include <fcntl.h>
204
205 static int sockfd;
206
sampleserial_init(void)207 void sampleserial_init(void)
208 {
209 struct sockaddr_in serv_addr;
210
211
212 if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
213 perror(NULL);
214 exit(2);
215 }
216 serv_addr.sin_family = AF_INET;
217 serv_addr.sin_addr.s_addr = inet_addr("198.41.30.241");
218 serv_addr.sin_port = htons(1883);
219 if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
220 printf("ERROR connecting\n");
221 exit(-1);
222 }
223 printf("- TCP Connected to Eclipse\n");
224 /* set to non-blocking */
225 fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL) | O_NONBLOCK);
226 }
227
sampleserial_close(void)228 void sampleserial_close(void)
229 {
230 close(sockfd);
231 }
232
samplesend(unsigned char * address,unsigned int bytes)233 int samplesend(unsigned char *address, unsigned int bytes)
234 {
235 int len;
236
237 if(rand() > (RAND_MAX/2)) // 50% probability of being busy
238 return 0;
239 if(rand() > (RAND_MAX/2)){ // 50% probability of sending half the requested data (no room in buffer)
240 if(bytes > 1)
241 bytes /= 2;
242 }
243 if((len = write(sockfd, address, bytes)) >= 0)
244 return len;
245 if(errno == EAGAIN)
246 return 0;
247 return -1;
248 }
249
samplerecv(unsigned char * address,unsigned int maxbytes)250 int samplerecv(unsigned char *address, unsigned int maxbytes)
251 {
252 int len;
253
254 if(rand() > (RAND_MAX/2)) // 50% probability of no data
255 return 0;
256 if(rand() > (RAND_MAX/2)){ // 50% probability of getting half the requested data (not arrived yet)
257 if(maxbytes > 1){
258 maxbytes /= 2;
259 }
260 }
261 if((len = read(sockfd, address, maxbytes)) >= 0)
262 return len;
263 if(errno == EAGAIN)
264 return 0;
265 return -1;
266 }
267
268