1
2
3 #include <sys/types.h>
4 #include <sys/socket.h>
5 #include <sys/param.h>
6 #include <sys/time.h>
7 #include <sys/select.h>
8 #include <netinet/in.h>
9 #include <netinet/tcp.h>
10 #include <arpa/inet.h>
11 #include <netdb.h>
12 #include <stdio.h>
13 #include <unistd.h>
14 #include <errno.h>
15 #include <fcntl.h>
16
17 #include <stdlib.h>
18 #include <string.h>
19 #include <signal.h>
20
21 #include "MQTTClient.h"
22 //#include "FP.cpp"
23
// Default stack size sentinel. Not referenced anywhere in the visible part of
// this file. The replacement list is parenthesised so the negative literal
// always expands as a single expression.
#define DEFAULT_STACK_SIZE (-1)
25
26
/**
 * Minimal blocking TCP transport built on BSD sockets.
 *
 * Exposes connect/read/write/disconnect on a single socket descriptor.
 * Only IPv4 addresses are used. The object does not close its socket on
 * destruction; call disconnect() explicitly.
 */
class IPStack
{
public:
    /** Creates a transport that owns no socket yet. */
    IPStack() : mysock(-1)
    {
    }

    /**
     * Reports a failed socket operation on stdout.
     *
     * Transient conditions (EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK) are not
     * printed, nor are ENOTCONN/ECONNRESET when the operation is "shutdown".
     *
     * @param aString name of the operation that failed, used in the message
     * @return the errno value that was current when this method was entered
     */
    int Socket_error(const char* aString)
    {
        // Capture errno before printing: printf() is allowed to modify it.
        const int error = errno;

        if (error != EINTR && error != EAGAIN && error != EINPROGRESS && error != EWOULDBLOCK)
        {
            if (strcmp(aString, "shutdown") != 0 || (error != ENOTCONN && error != ECONNRESET))
                printf("Socket error %s in %s for socket %d\n", strerror(error), aString, mysock);
        }
        return error;
    }

    /**
     * Resolves hostname and opens a blocking TCP connection to it.
     *
     * Any socket still held from an earlier connect() is closed first.
     *
     * @param hostname host name or numeric address to resolve
     * @param port     TCP port in host byte order
     * @return 0 on success; the non-zero getaddrinfo() code if resolution
     *         fails; -1 if no IPv4 address was found, the socket could not be
     *         created, or the connection attempt failed
     */
    int connect(const char* hostname, int port)
    {
        struct sockaddr_in address;
        struct addrinfo hints;
        struct addrinfo* candidates = NULL;

        // Do not leak a descriptor left over from a previous connection.
        if (mysock != -1)
        {
            ::close(mysock);
            mysock = -1;
        }

        memset(&address, 0, sizeof(address));
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = AF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_protocol = IPPROTO_TCP;

        int rc = getaddrinfo(hostname, NULL, &hints, &candidates);
        if (rc != 0)
            return rc;

        /* use the first IPv4 address in the result list */
        bool found = false;
        for (const struct addrinfo* entry = candidates; entry != NULL; entry = entry->ai_next)
        {
            if (entry->ai_family == AF_INET)
            {
                address.sin_family = AF_INET;
                address.sin_port = htons((unsigned short)port);
                address.sin_addr = ((const struct sockaddr_in*)(entry->ai_addr))->sin_addr;
                found = true;
                break;
            }
        }

        // Always release the list from its head so that no node is leaked.
        freeaddrinfo(candidates);

        if (!found)
            return -1;

        mysock = ::socket(AF_INET, SOCK_STREAM, 0);
        if (mysock == -1)
        {
            Socket_error("socket");
            return -1;
        }

        //int opt = 1;
        //if (setsockopt(mysock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
        //    printf("Could not set SO_NOSIGPIPE for socket %d", mysock);

        rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address));
        if (rc != 0)
        {
            Socket_error("connect");
            // A failed connection must not leave an open descriptor behind.
            ::close(mysock);
            mysock = -1;
        }

        return rc;
    }

    /**
     * Receives up to len bytes, waiting at most timeout_ms milliseconds.
     *
     * @param buffer     destination for the received bytes
     * @param len        maximum number of bytes to receive
     * @param timeout_ms receive timeout in milliseconds; values <= 0 are
     *                   replaced by a 100 microsecond timeout
     * @return the value returned by recv(): bytes received, or -1 on error
     */
    int read(unsigned char* buffer, int len, int timeout_ms)
    {
        struct timeval interval = to_timeval(timeout_ms);

        setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval));

        //printf("reading %d bytes\n", len);
        int rc = ::recv(mysock, buffer, (size_t)len, 0);
        if (rc == -1)
            Socket_error("read");
        //printf("read %d bytes\n", rc);
        return rc;
    }

    /**
     * Sends up to len bytes, waiting at most timeout milliseconds.
     *
     * @param buffer  bytes to send
     * @param len     number of bytes to send
     * @param timeout send timeout in milliseconds; values <= 0 are replaced
     *                by a 100 microsecond timeout
     * @return the value returned by write(): bytes written, or -1 on error
     */
    int write(unsigned char* buffer, int len, int timeout)
    {
        struct timeval interval = to_timeval(timeout);

        // The send timeout is SO_SNDTIMEO; SO_RCVTIMEO only affects receives.
        setsockopt(mysock, SOL_SOCKET, SO_SNDTIMEO, (char *)&interval, sizeof(struct timeval));

        int rc = ::write(mysock, buffer, (size_t)len);
        if (rc == -1)
            Socket_error("write");
        //printf("write rc %d\n", rc);
        return rc;
    }

    /**
     * Closes the socket, if one is open.
     *
     * @return the result of close(), or -1 when no socket is open
     */
    int disconnect()
    {
        if (mysock == -1)
            return -1;

        int rc = ::close(mysock);
        mysock = -1;   // never close the same descriptor twice
        return rc;
    }

private:

    /**
     * Converts a millisecond timeout into a normalised timeval.
     *
     * Non-positive timeouts become 100 microseconds, because an all-zero
     * timeval passed to SO_RCVTIMEO/SO_SNDTIMEO disables the timeout.
     */
    static struct timeval to_timeval(int timeout_ms)
    {
        struct timeval interval;
        interval.tv_sec = timeout_ms / 1000;
        interval.tv_usec = (timeout_ms % 1000) * 1000;
        if (interval.tv_sec < 0 || (interval.tv_sec == 0 && interval.tv_usec <= 0))
        {
            interval.tv_sec = 0;
            interval.tv_usec = 100;
        }
        return interval;
    }

    int mysock;   // socket descriptor, or -1 when no socket is open

};
141
142
/**
 * Countdown timer based on gettimeofday().
 *
 * Stores an absolute end time and reports whether it has passed and how many
 * milliseconds remain.
 */
class Countdown
{
public:
    /** Creates a timer that is already expired until a countdown is started. */
    Countdown()
    {
        // Give end_time a defined value so expired()/left_ms() never read
        // an uninitialised member.
        timerclear(&end_time);
    }

    /**
     * Creates a timer that expires ms milliseconds from now.
     * @param ms duration in milliseconds
     */
    Countdown(int ms)
    {
        countdown_ms(ms);
    }


    /** @return true once the end time has been reached or passed */
    bool expired()
    {
        struct timeval res = remaining();
        //printf("left %d ms\n", (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000);
        return res.tv_sec < 0 || (res.tv_sec == 0 && res.tv_usec <= 0);
    }


    /**
     * Restarts the timer to expire ms milliseconds from now.
     * @param ms duration in milliseconds; negative values are treated as 0
     */
    void countdown_ms(int ms)
    {
        // A negative duration would produce a negative tv_usec, which
        // timeradd() does not normalise.
        if (ms < 0)
            ms = 0;

        struct timeval now;
        gettimeofday(&now, NULL);
        struct timeval interval = {ms / 1000, (ms % 1000) * 1000};
        //printf("interval %d %d\n", interval.tv_sec, interval.tv_usec);
        timeradd(&now, &interval, &end_time);
    }


    /**
     * Restarts the timer to expire the given number of seconds from now.
     * @param seconds duration in seconds
     */
    void countdown(int seconds)
    {
        struct timeval now;
        gettimeofday(&now, NULL);
        struct timeval interval = {seconds, 0};
        timeradd(&now, &interval, &end_time);
    }


    /** @return whole milliseconds left before expiry, or 0 once expired */
    int left_ms()
    {
        struct timeval res = remaining();
        //printf("left %d ms\n", (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000);
        return (res.tv_sec < 0) ? 0 : (int)(res.tv_sec * 1000 + res.tv_usec / 1000);
    }

private:

    /** Returns end_time minus the current time; tv_sec < 0 once the end time has passed. */
    struct timeval remaining()
    {
        struct timeval now, res;
        gettimeofday(&now, NULL);
        timersub(&end_time, &now, &res);
        return res;
    }

    struct timeval end_time;   // absolute time at which the countdown expires
};
201
202
203 int arrivedcount = 0;
204
messageArrived(MQTT::MessageData & md)205 void messageArrived(MQTT::MessageData& md)
206 {
207 MQTT::Message &message = md.message;
208
209 printf("Message %d arrived: qos %d, retained %d, dup %d, packetid %d\n",
210 ++arrivedcount, message.qos, message.retained, message.dup, message.id);
211 printf("Payload %.*s\n", message.payloadlen, (char*)message.payload);
212 }
213
214
main(int argc,char * argv[])215 int main(int argc, char* argv[])
216 {
217 IPStack ipstack = IPStack();
218 float version = 0.3;
219 const char* topic = "mbed-sample";
220
221 printf("Version is %f\n", version);
222
223 MQTT::Client<IPStack, Countdown> client = MQTT::Client<IPStack, Countdown>(ipstack);
224
225 const char* hostname = "localhost"; //"m2m.eclipse.org";
226 int port = 1883;
227 printf("Connecting to %s:%d\n", hostname, port);
228 int rc = ipstack.connect(hostname, port);
229 if (rc != 0)
230 printf("rc from TCP connect is %d\n", rc);
231
232 printf("MQTT connecting\n");
233 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
234 data.MQTTVersion = 3;
235 data.clientID.cstring = (char*)"mbed-icraggs";
236 rc = client.connect(&data);
237 if (rc != 0)
238 printf("rc from MQTT connect is %d\n", rc);
239 printf("MQTT connected\n");
240
241 rc = client.subscribe("+", MQTT::QOS2, messageArrived);
242 if (rc != 0)
243 printf("rc from MQTT subscribe is %d\n", rc);
244
245 MQTT::Message message;
246
247 // QoS 0
248 char buf[100];
249 sprintf(buf, "Hello World! QoS 0 message from app version %f", version);
250 message.qos = MQTT::QOS0;
251 message.retained = false;
252 message.dup = false;
253 message.payload = (void*)buf;
254 message.payloadlen = strlen(buf)+1;
255 rc = client.publish(topic, &message);
256 while (arrivedcount == 0)
257 client.yield(100);
258
259 // QoS 1
260 printf("Now QoS 1\n");
261 sprintf(buf, "Hello World! QoS 1 message from app version %f", version);
262 message.qos = MQTT::QOS1;
263 message.payloadlen = strlen(buf)+1;
264 rc = client.publish(topic, &message);
265 while (arrivedcount == 1)
266 client.yield(100);
267
268 // QoS 2
269 sprintf(buf, "Hello World! QoS 2 message from app version %f", version);
270 message.qos = MQTT::QOS2;
271 message.payloadlen = strlen(buf)+1;
272 rc = client.publish(topic, &message);
273 while (arrivedcount == 2)
274 client.yield(100);
275
276 rc = client.unsubscribe(topic);
277 if (rc != 0)
278 printf("rc from unsubscribe was %d\n", rc);
279
280 rc = client.disconnect();
281 if (rc != 0)
282 printf("rc from disconnect was %d\n", rc);
283
284 ipstack.disconnect();
285
286 printf("Finishing with %d messages received\n", arrivedcount);
287
288 return 0;
289 }
290
291
292