1 /*******************************************************************************
2 * Copyright (c) 2014 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * http://www.eclipse.org/legal/epl-v10.html
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Ian Craggs - initial API and implementation and/or initial documentation
15 * Sergio R. Caprile - port to the bare metal environment
16 *******************************************************************************/
17
18 #include <stdio.h>
19 #include <string.h>
20 #include <stdlib.h>
21
22 #include "MQTTPacket.h"
23 #include "transport.h"
24
25 /* This is in order to get an asynchronous signal to stop the sample,
26 as the code loops waiting for msgs on the subscribed topic.
27 Your actual code will depend on your hw and approach, but this sample can be
28 run on Linux so debugging of the non-hardware specific bare metal code is easier.
29 See at bottom of file for details */
30 #include <signal.h>
31
/* Stop request flag: set to 1 by the signal handler (cfinish) and polled by
   the main loop. It is shared with an asynchronous signal handler, so it must
   be a volatile sig_atomic_t for the access to be well defined. */
volatile sig_atomic_t toStop = 0;
33
34 void stop_init(void);
35 /* */
36
37 /* Same as above, we provide a set of functions to test/debug on a friendlier system;
38 the init() and close() actions on the serial are just for this, you will probably
39 handle this on whatever handles your media in your application */
40 void sampleserial_init(void);
41 void sampleserial_close(void);
42 int samplesend(unsigned char *address, unsigned int bytes);
43 int samplerecv(unsigned char *address, unsigned int maxbytes);
44 /* */
45
46 /* You will use your hardware specifics here, see transport.h. */
47 static transport_iofunctions_t iof = {samplesend, samplerecv};
48
49 enum states { READING, PUBLISHING };
50
main(int argc,char * argv[])51 int main(int argc, char *argv[])
52 {
53 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
54 int rc = 0;
55 int mysock = 0;
56 unsigned char buf[200];
57 int buflen = sizeof(buf);
58 int msgid = 1;
59 MQTTString topicString = MQTTString_initializer;
60 int req_qos = 0;
61 char* payload = "mypayload";
62 int payloadlen = strlen(payload);
63 int len = 0;
64 MQTTTransport mytransport;
65 int state = READING;
66
67 stop_init();
68 sampleserial_init();
69
70 mysock = transport_open(&iof);
71 if(mysock < 0)
72 return mysock;
73 /* You will (or already are) 'somehow' connect(ed) to host:port via your hardware specifics. E.g.:
74 you have a serial (RS-232/UART) link
75 you have a cell modem and you issue your AT+ magic
76 you have some TCP/IP which is not lwIP (nor a full-fledged socket compliant one)
77 and you TCP connect
78 */
79
80 mytransport.sck = &mysock;
81 mytransport.getfn = transport_getdatanb;
82 mytransport.state = 0;
83 data.clientID.cstring = "me";
84 data.keepAliveInterval = 20;
85 data.cleansession = 1;
86 data.username.cstring = "testuser";
87 data.password.cstring = "testpassword";
88
89 len = MQTTSerialize_connect(buf, buflen, &data);
90 /* This one blocks until it finishes sending, you will probably not want this in real life,
91 in such a case replace this call by a scheme similar to the one you'll see in the main loop */
92 rc = transport_sendPacketBuffer(mysock, buf, len);
93
94 printf("Sent MQTT connect\n");
95 /* wait for connack */
96 do {
97 int frc;
98 if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == CONNACK){
99 unsigned char sessionPresent, connack_rc;
100 if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0){
101 printf("Unable to connect, return code %d\n", connack_rc);
102 goto exit;
103 }
104 break;
105 }
106 else if (frc == -1)
107 goto exit;
108 } while (1); /* handle timeouts here */
109
110 printf("MQTT connected\n");
111 /* subscribe */
112 topicString.cstring = "substopic";
113 len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
114
115 /* This is equivalent to the one above, but using the non-blocking functions. You will probably not want this in real life,
116 in such a case replace this call by a scheme similar to the one you'll see in the main loop */
117 transport_sendPacketBuffernb_start(mysock, buf, len);
118 while((rc=transport_sendPacketBuffernb(mysock)) != TRANSPORT_DONE);
119 do {
120 int frc;
121 if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == SUBACK){ /* wait for suback */
122 unsigned short submsgid;
123 int subcount;
124 int granted_qos;
125
126 rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
127 if (granted_qos != 0){
128 printf("granted qos != 0, %d\n", granted_qos);
129 goto exit;
130 }
131 break;
132 }
133 else if (frc == -1)
134 goto exit;
135 } while (1); /* handle timeouts here */
136 printf("Subscribed\n");
137
138 /* loop getting msgs on subscribed topic */
139 topicString.cstring = "pubtopic";
140 state = READING;
141 while (!toStop) {
142 /* do other stuff here */
143 switch(state){
144 case READING:
145 if((rc=MQTTPacket_readnb(buf, buflen, &mytransport)) == PUBLISH){
146 unsigned char dup;
147 int qos;
148 unsigned char retained;
149 unsigned short msgid;
150 int payloadlen_in;
151 unsigned char* payload_in;
152 int rc;
153 MQTTString receivedTopic;
154
155 rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
156 &payload_in, &payloadlen_in, buf, buflen);
157 printf("message arrived %.*s\n", payloadlen_in, payload_in);
158 printf("publishing reading\n");
159 state = PUBLISHING;
160 len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen);
161 transport_sendPacketBuffernb_start(mysock, buf, len);
162 } else if(rc == -1){
163 /* handle I/O errors here */
164 goto exit;
165 } /* handle timeouts here */
166 break;
167 case PUBLISHING:
168 switch(transport_sendPacketBuffernb(mysock)){
169 case TRANSPORT_DONE:
170 printf("Published\n");
171 state = READING;
172 break;
173 case TRANSPORT_ERROR:
174 /* handle any I/O errors here */
175 goto exit;
176 break;
177 case TRANSPORT_AGAIN:
178 default:
179 /* handle timeouts here, not probable unless there is a hardware problem */
180 break;
181 }
182 break;
183 }
184 }
185
186 printf("disconnecting\n");
187 len = MQTTSerialize_disconnect(buf, buflen);
188 /* Same blocking related stuff here */
189 rc = transport_sendPacketBuffer(mysock, buf, len);
190
191 exit:
192 transport_close(mysock);
193
194 sampleserial_close();
195 return 0;
196 }
197
198
199 /* To stop the sample */
cfinish(int sig)200 void cfinish(int sig)
201 {
202 signal(SIGINT, NULL);
203 toStop = 1;
204 }
205
stop_init(void)206 void stop_init(void)
207 {
208 signal(SIGINT, cfinish);
209 signal(SIGTERM, cfinish);
210 }
211
212 /* Serial hack:
213 Simulate serial transfers on an established TCP connection
214 */
215 #include <unistd.h>
216 #include <errno.h>
217 #include <sys/types.h>
218 #include <sys/socket.h>
219 #include <netinet/in.h>
220 #include <arpa/inet.h>
221 #include <fcntl.h>
222
223 static int sockfd;
224
sampleserial_init(void)225 void sampleserial_init(void)
226 {
227 struct sockaddr_in serv_addr;
228
229
230 if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
231 perror(NULL);
232 exit(2);
233 }
234 serv_addr.sin_family = AF_INET;
235 serv_addr.sin_addr.s_addr = inet_addr("198.41.30.241");
236 serv_addr.sin_port = htons(1883);
237 if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
238 printf("ERROR connecting\n");
239 exit(-1);
240 }
241 printf("- TCP Connected to Eclipse\n");
242 /* set to non-blocking */
243 fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL) | O_NONBLOCK);
244 }
245
sampleserial_close(void)246 void sampleserial_close(void)
247 {
248 close(sockfd);
249 }
250
samplesend(unsigned char * address,unsigned int bytes)251 int samplesend(unsigned char *address, unsigned int bytes)
252 {
253 int len;
254
255 if(rand() > (RAND_MAX/2)) // 50% probability of being busy
256 return 0;
257 if(rand() > (RAND_MAX/2)){ // 50% probability of sending half the requested data (no room in buffer)
258 if(bytes > 1)
259 bytes /= 2;
260 }
261 if((len = write(sockfd, address, bytes)) >= 0)
262 return len;
263 if(errno == EAGAIN)
264 return 0;
265 return -1;
266 }
267
samplerecv(unsigned char * address,unsigned int maxbytes)268 int samplerecv(unsigned char *address, unsigned int maxbytes)
269 {
270 int len;
271
272 if(rand() > (RAND_MAX/2)) // 50% probability of no data
273 return 0;
274 if(rand() > (RAND_MAX/2)){ // 50% probability of getting half the requested data (not arrived yet)
275 if(maxbytes > 1){
276 maxbytes /= 2;
277 }
278 }
279 if((len = read(sockfd, address, maxbytes)) >= 0)
280 return len;
281 if(errno == EAGAIN)
282 return 0;
283 return -1;
284 }
285
286