/*******************************************************************************
 * Copyright (c) 2014 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Ian Craggs - initial API and implementation and/or initial documentation
 *    Sergio R. Caprile - port to the bare metal environment
 *******************************************************************************/

18 #include <stdio.h>
19 #include <string.h>
20 #include <stdlib.h>
21 
22 #include "MQTTPacket.h"
23 #include "transport.h"
24 
/* This is in order to get an asynchronous signal to stop the sample,
as the code loops waiting for msgs on the subscribed topic.
Your actual code will depend on your hw and approach, but this sample can be
run on Linux so debugging of the non-hardware specific bare metal code is easier.
See the signal handling functions further down in this file for details */
#include <signal.h>

/* Stop request flag: set to 1 by the signal handler and polled by main().
It is written from a signal handler, hence volatile sig_atomic_t. */
volatile sig_atomic_t toStop = 0;

/* Installs the signal handlers that raise toStop. */
void stop_init(void);

37 /* Same as above, we provide a set of functions to test/debug on a friendlier system;
38 the init() and  close() actions on the serial are just for this, you will probably
39 handle this on whatever handles your media in your application */
40 void sampleserial_init(void);
41 void sampleserial_close(void);
42 int samplesend(unsigned char *address, unsigned int bytes);
43 int samplerecv(unsigned char *address, unsigned int maxbytes);
44 /* */
45 
46 /* You will use your hardware specifics here, see transport.h. */
47 static transport_iofunctions_t iof = {samplesend, samplerecv};
48 
49 enum states { READING, PUBLISHING };
50 
main(int argc,char * argv[])51 int main(int argc, char *argv[])
52 {
53 	MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
54 	int rc = 0;
55 	int mysock = 0;
56 	unsigned char buf[200];
57 	int buflen = sizeof(buf);
58 	int msgid = 1;
59 	MQTTString topicString = MQTTString_initializer;
60 	int req_qos = 0;
61 	char* payload = "mypayload";
62 	int payloadlen = strlen(payload);
63 	int len = 0;
64 	MQTTTransport mytransport;
65 	int state = READING;
66 
67 	stop_init();
68 	sampleserial_init();
69 
70 	mysock = transport_open(&iof);
71 	if(mysock < 0)
72 		return mysock;
73 	/* You will (or already are) 'somehow' connect(ed) to host:port via your hardware specifics. E.g.:
74 		you have a serial (RS-232/UART) link
75 		you have a cell modem and you issue your AT+ magic
76 		you have some TCP/IP which is not lwIP (nor a full-fledged socket compliant one)
77 		 and you TCP connect
78 	*/
79 
80 	mytransport.sck = &mysock;
81 	mytransport.getfn = transport_getdatanb;
82 	mytransport.state = 0;
83 	data.clientID.cstring = "me";
84 	data.keepAliveInterval = 20;
85 	data.cleansession = 1;
86 	data.username.cstring = "testuser";
87 	data.password.cstring = "testpassword";
88 
89 	len = MQTTSerialize_connect(buf, buflen, &data);
90 	/* This one blocks until it finishes sending, you will probably not want this in real life,
91 	in such a case replace this call by a scheme similar to the one you'll see in the main loop */
92 	rc = transport_sendPacketBuffer(mysock, buf, len);
93 
94 	printf("Sent MQTT connect\n");
95 	/* wait for connack */
96 	do {
97 		int frc;
98 		if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == CONNACK){
99 			unsigned char sessionPresent, connack_rc;
100 			if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0){
101 				printf("Unable to connect, return code %d\n", connack_rc);
102 				goto exit;
103 			}
104 			break;
105 		}
106 		else if (frc == -1)
107 			goto exit;
108 	} while (1); /* handle timeouts here */
109 
110 	printf("MQTT connected\n");
111 	/* subscribe */
112 	topicString.cstring = "substopic";
113 	len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
114 
115 	/* This is equivalent to the one above, but using the non-blocking functions. You will probably not want this in real life,
116 	in such a case replace this call by a scheme similar to the one you'll see in the main loop */
117 	transport_sendPacketBuffernb_start(mysock, buf, len);
118 	while((rc=transport_sendPacketBuffernb(mysock)) != TRANSPORT_DONE);
119 	do {
120 		int frc;
121 		if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == SUBACK){ /* wait for suback */
122 			unsigned short submsgid;
123 			int subcount;
124 			int granted_qos;
125 
126 			rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
127 			if (granted_qos != 0){
128 				printf("granted qos != 0, %d\n", granted_qos);
129 				goto exit;
130 			}
131 			break;
132 		}
133 		else if (frc == -1)
134 			goto exit;
135 	} while (1); /* handle timeouts here */
136 	printf("Subscribed\n");
137 
138 	/* loop getting msgs on subscribed topic */
139 	topicString.cstring = "pubtopic";
140 	state = READING;
141 	while (!toStop)	{
142 		/* do other stuff here */
143 		switch(state){
144 		case READING:
145 			if((rc=MQTTPacket_readnb(buf, buflen, &mytransport)) == PUBLISH){
146 				unsigned char dup;
147 				int qos;
148 				unsigned char retained;
149 				unsigned short msgid;
150 				int payloadlen_in;
151 				unsigned char* payload_in;
152 				int rc;
153 				MQTTString receivedTopic;
154 
155 				rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
156 						&payload_in, &payloadlen_in, buf, buflen);
157 				printf("message arrived %.*s\n", payloadlen_in, payload_in);
158 				printf("publishing reading\n");
159 				state = PUBLISHING;
160 				len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen);
161 				transport_sendPacketBuffernb_start(mysock, buf, len);
162 			} else if(rc == -1){
163 				/* handle I/O errors here */
164 				goto exit;
165 			}	/* handle timeouts here */
166 			break;
167 		case PUBLISHING:
168 			switch(transport_sendPacketBuffernb(mysock)){
169 			case TRANSPORT_DONE:
170 				printf("Published\n");
171 				state = READING;
172 				break;
173 			case TRANSPORT_ERROR:
174 				/* handle any I/O errors here */
175 				goto exit;
176 				break;
177 			case TRANSPORT_AGAIN:
178 			default:
179 				/* handle timeouts here, not probable unless there is a hardware problem */
180 				break;
181 			}
182 			break;
183 		}
184 	}
185 
186 	printf("disconnecting\n");
187 	len = MQTTSerialize_disconnect(buf, buflen);
188 	/* Same blocking related stuff here */
189 	rc = transport_sendPacketBuffer(mysock, buf, len);
190 
191 exit:
192 	transport_close(mysock);
193 
194 	sampleserial_close();
195 	return 0;
196 }
197 
198 
199 /* To stop the sample */
cfinish(int sig)200 void cfinish(int sig)
201 {
202 	signal(SIGINT, NULL);
203 	toStop = 1;
204 }
205 
stop_init(void)206 void stop_init(void)
207 {
208 	signal(SIGINT, cfinish);
209 	signal(SIGTERM, cfinish);
210 }
211 
/* Serial hack:
Simulate serial transfers on an established TCP connection
 */
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>

/* Descriptor of the TCP socket standing in for the serial port.
Starts at -1 (no descriptor) so that nothing can act on fd 0 by accident
before sampleserial_init() has run. */
static int sockfd = -1;

sampleserial_init(void)225 void sampleserial_init(void)
226 {
227 struct sockaddr_in serv_addr;
228 
229 
230 	if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
231 	  perror(NULL);
232 	  exit(2);
233 	}
234 	serv_addr.sin_family = AF_INET;
235 	serv_addr.sin_addr.s_addr = inet_addr("198.41.30.241");
236 	serv_addr.sin_port = htons(1883);
237 	if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
238 		printf("ERROR connecting\n");
239 		exit(-1);
240 	}
241 	printf("- TCP Connected to Eclipse\n");
242         /* set to non-blocking */
243 	fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL) | O_NONBLOCK);
244 }
245 
sampleserial_close(void)246 void sampleserial_close(void)
247 {
248 	close(sockfd);
249 }
250 
samplesend(unsigned char * address,unsigned int bytes)251 int samplesend(unsigned char *address, unsigned int bytes)
252 {
253 int len;
254 
255 	if(rand() > (RAND_MAX/2))	// 50% probability of being busy
256 		return 0;
257 	if(rand() > (RAND_MAX/2)){	// 50% probability of sending half the requested data (no room in buffer)
258 		if(bytes > 1)
259 			bytes /= 2;
260 	}
261 	if((len = write(sockfd, address, bytes)) >= 0)
262 		return len;
263 	if(errno == EAGAIN)
264 		return 0;
265 	return -1;
266 }
267 
samplerecv(unsigned char * address,unsigned int maxbytes)268 int samplerecv(unsigned char *address, unsigned int maxbytes)
269 {
270 int len;
271 
272 	if(rand() > (RAND_MAX/2))	// 50% probability of no data
273 		return 0;
274 	if(rand() > (RAND_MAX/2)){	// 50% probability of getting half the requested data (not arrived yet)
275 		if(maxbytes > 1){
276 			maxbytes /= 2;
277 		}
278 	}
279 	if((len = read(sockfd, address, maxbytes)) >= 0)
280 		return len;
281 	if(errno == EAGAIN)
282 		return 0;
283 	return -1;
284 }
285 
286