• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 
3 #include <sys/types.h>
4 #include <sys/socket.h>
5 #include <sys/param.h>
6 #include <sys/time.h>
7 #include <sys/select.h>
8 #include <netinet/in.h>
9 #include <netinet/tcp.h>
10 #include <arpa/inet.h>
11 #include <netdb.h>
12 #include <stdio.h>
13 #include <unistd.h>
14 #include <errno.h>
15 #include <fcntl.h>
16 
17 #include <stdlib.h>
18 #include <string.h>
19 #include <signal.h>
20 
21 #include "MQTTClient.h"
22 //#include "FP.cpp"
23 
24 #define DEFAULT_STACK_SIZE -1
25 
26 
/**
 * Blocking TCP transport built on POSIX sockets, exposing the
 * connect/read/write/disconnect calls that main() hands to MQTT::Client.
 *
 * The object holds a single socket descriptor; -1 means "no socket open".
 * It has no destructor, so the owner must call disconnect() explicitly.
 */
class IPStack
{
public:
    /** Creates a transport with no socket open. */
    IPStack() : mysock(-1)
    {
    }

    /**
     * Reports the current errno for a failed socket operation.
     *
     * Transient conditions (EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK) are not
     * printed, nor are ENOTCONN/ECONNRESET when aString is "shutdown".
     *
     * @param aString name of the operation that failed (must not be NULL)
     * @return the errno value observed on entry
     */
    int Socket_error(const char* aString)
    {
        // Snapshot errno first: printf() below is allowed to overwrite it.
        const int err = errno;

        if (err != EINTR && err != EAGAIN && err != EINPROGRESS && err != EWOULDBLOCK)
        {
            if (strcmp(aString, "shutdown") != 0 || (err != ENOTCONN && err != ECONNRESET))
                printf("Socket error %s in %s for socket %d\n", strerror(err), aString, mysock);
        }
        return err;
    }

    /**
     * Resolves hostname and opens a TCP connection to its first IPv4 address.
     *
     * Any socket already held by this object is closed first.
     *
     * @param hostname host name or numeric address to resolve
     * @param port     TCP port, 0..65535
     * @return 0 on success; the getaddrinfo() error code if resolution fails;
     *         -1 if the port is out of range, no IPv4 address was found, or
     *         socket()/connect() failed (errno is left describing the failure)
     */
    int connect(const char* hostname, int port)
    {
        if (port < 0 || port > 65535)
            return -1;

        if (mysock != -1)
            disconnect();

        struct addrinfo hints;
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = AF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_protocol = IPPROTO_TCP;

        struct addrinfo* result = NULL;
        int rc = getaddrinfo(hostname, NULL, &hints, &result);
        if (rc != 0)
            return rc;

        struct sockaddr_in address;
        memset(&address, 0, sizeof(address));
        bool found = false;

        /* only ip4 addresses are usable here: take the first one in the list */
        for (const struct addrinfo* res = result; res != NULL; res = res->ai_next)
        {
            if (res->ai_family == AF_INET)
            {
                address.sin_family = AF_INET;
                address.sin_port = htons((unsigned short)port);
                address.sin_addr = ((const struct sockaddr_in*)(res->ai_addr))->sin_addr;
                found = true;
                break;
            }
        }

        // Always release from the head of the list so no node is leaked.
        freeaddrinfo(result);

        if (!found)
            return -1;

        mysock = socket(AF_INET, SOCK_STREAM, 0);
        if (mysock == -1)
            return -1;

        //int opt = 1;
        //if (setsockopt(mysock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
        //    printf("Could not set SO_NOSIGPIPE for socket %d", mysock);

        rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address));
        if (rc != 0)
        {
            // Do not keep a half-open descriptor around; keep errno for the caller.
            const int err = errno;
            ::close(mysock);
            mysock = -1;
            errno = err;
        }

        return rc;
    }

    /**
     * Receives up to len bytes, waiting at most timeout_ms milliseconds.
     *
     * @param buffer     destination buffer of at least len bytes
     * @param len        maximum number of bytes to receive
     * @param timeout_ms receive timeout; values <= 0 are replaced by 100 microseconds
     * @return value returned by recv(): bytes received, or -1 on error/timeout
     */
    int read(unsigned char* buffer, int len, int timeout_ms)
    {
        struct timeval interval = toTimeval(timeout_ms);

        // Best effort: if the option cannot be set, recv() below still reports the real error.
        setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval));

        //printf("reading %d bytes\n", len);
        int rc = (int)::recv(mysock, buffer, (size_t)len, 0);
        if (rc == -1)
            Socket_error("read");
        //printf("read %d bytes\n", rc);
        return rc;
    }

    /**
     * Sends len bytes, waiting at most timeout milliseconds.
     *
     * @param buffer  data to send
     * @param len     number of bytes to send
     * @param timeout send timeout in milliseconds; values <= 0 are replaced
     *                by 100 microseconds, as in read()
     * @return value returned by write(): bytes written, or -1 on error/timeout
     */
    int write(unsigned char* buffer, int len, int timeout)
    {
        // Split into seconds/microseconds so tv_usec stays below one second,
        // and apply the timeout to sending (SO_SNDTIMEO), not receiving.
        struct timeval tv = toTimeval(timeout);

        setsockopt(mysock, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval));
        int rc = (int)::write(mysock, buffer, len);
        //printf("write rc %d\n", rc);
        return rc;
    }

    /**
     * Closes the socket and marks this object as having none.
     *
     * @return value returned by close(); -1 if no socket was open
     */
    int disconnect()
    {
        const int rc = ::close(mysock);
        mysock = -1;  // never leave a stale descriptor behind
        return rc;
    }

private:

    /**
     * Converts a millisecond timeout to a timeval, substituting 100
     * microseconds for zero or negative values (a zero timeval would mean
     * "wait forever" to SO_RCVTIMEO/SO_SNDTIMEO).
     */
    static struct timeval toTimeval(int timeout_ms)
    {
        struct timeval tv;
        tv.tv_sec = timeout_ms / 1000;
        tv.tv_usec = (timeout_ms % 1000) * 1000;
        if (tv.tv_sec < 0 || (tv.tv_sec == 0 && tv.tv_usec <= 0))
        {
            tv.tv_sec = 0;
            tv.tv_usec = 100;
        }
        return tv;
    }

    int mysock;  // socket descriptor, or -1 when none is open

};
141 
142 
/**
 * Wall-clock countdown timer based on gettimeofday().
 *
 * A timer is armed with countdown()/countdown_ms() and then polled with
 * expired()/left_ms(). A default-constructed timer counts as already expired.
 */
class Countdown
{
public:
    /** Creates a timer that is already expired (deadline at time zero). */
    Countdown()
    {
        end_time.tv_sec = 0;
        end_time.tv_usec = 0;
    }

    /** Creates a timer that expires ms milliseconds from now. */
    Countdown(int ms)
    {
        countdown_ms(ms);
    }


    /** @return true once the deadline has been reached or passed. */
    bool expired() const
    {
        const struct timeval res = remaining();
        return res.tv_sec < 0 || (res.tv_sec == 0 && res.tv_usec <= 0);
    }


    /**
     * Arms the timer to expire ms milliseconds from now.
     * Negative values are treated as 0 (expire immediately).
     */
    void countdown_ms(int ms)
    {
        if (ms < 0)
            ms = 0;  // keeps tv_usec within its normal non-negative range
        arm(ms / 1000, (ms % 1000) * 1000);
    }


    /**
     * Arms the timer to expire the given number of seconds from now.
     * Negative values are treated as 0 (expire immediately).
     */
    void countdown(int seconds)
    {
        if (seconds < 0)
            seconds = 0;
        arm(seconds, 0);
    }


    /** @return whole milliseconds left until the deadline, or 0 if it has passed. */
    int left_ms() const
    {
        const struct timeval res = remaining();
        return (res.tv_sec < 0) ? 0 : (int)(res.tv_sec * 1000 + res.tv_usec / 1000);
    }

private:

    /** Sets the deadline to "now + sec seconds + usec microseconds". */
    void arm(long sec, long usec)
    {
        struct timeval now;
        struct timeval interval;
        gettimeofday(&now, NULL);
        interval.tv_sec = sec;
        interval.tv_usec = usec;
        timeradd(&now, &interval, &end_time);
    }

    /** Returns deadline minus current time; tv_sec is negative once the deadline has passed. */
    struct timeval remaining() const
    {
        struct timeval now, res;
        gettimeofday(&now, NULL);
        timersub(&end_time, &now, &res);
        return res;
    }

    struct timeval end_time;  // absolute deadline
};
201 
202 
203 int arrivedcount = 0;
204 
messageArrived(MQTT::MessageData & md)205 void messageArrived(MQTT::MessageData& md)
206 {
207     MQTT::Message &message = md.message;
208 
209     printf("Message %d arrived: qos %d, retained %d, dup %d, packetid %d\n",
210         ++arrivedcount, message.qos, message.retained, message.dup, message.id);
211     printf("Payload %.*s\n", message.payloadlen, (char*)message.payload);
212 }
213 
214 
main(int argc,char * argv[])215 int main(int argc, char* argv[])
216 {
217     IPStack ipstack = IPStack();
218     float version = 0.3;
219     const char* topic = "mbed-sample";
220 
221     printf("Version is %f\n", version);
222 
223     MQTT::Client<IPStack, Countdown> client = MQTT::Client<IPStack, Countdown>(ipstack);
224 
225     const char* hostname = "localhost"; //"m2m.eclipse.org";
226     int port = 1883;
227     printf("Connecting to %s:%d\n", hostname, port);
228     int rc = ipstack.connect(hostname, port);
229     if (rc != 0)
230         printf("rc from TCP connect is %d\n", rc);
231 
232     printf("MQTT connecting\n");
233     MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
234     data.MQTTVersion = 3;
235     data.clientID.cstring = (char*)"mbed-icraggs";
236     rc = client.connect(&data);
237     if (rc != 0)
238         printf("rc from MQTT connect is %d\n", rc);
239     printf("MQTT connected\n");
240 
241     rc = client.subscribe("+", MQTT::QOS2, messageArrived);
242     if (rc != 0)
243         printf("rc from MQTT subscribe is %d\n", rc);
244 
245     MQTT::Message message;
246 
247     // QoS 0
248     char buf[100];
249     sprintf(buf, "Hello World!  QoS 0 message from app version %f", version);
250     message.qos = MQTT::QOS0;
251     message.retained = false;
252     message.dup = false;
253     message.payload = (void*)buf;
254     message.payloadlen = strlen(buf)+1;
255     rc = client.publish(topic, &message);
256     while (arrivedcount == 0)
257         client.yield(100);
258 
259     // QoS 1
260     printf("Now QoS 1\n");
261     sprintf(buf, "Hello World!  QoS 1 message from app version %f", version);
262     message.qos = MQTT::QOS1;
263     message.payloadlen = strlen(buf)+1;
264     rc = client.publish(topic, &message);
265     while (arrivedcount == 1)
266         client.yield(100);
267 
268     // QoS 2
269     sprintf(buf, "Hello World!  QoS 2 message from app version %f", version);
270     message.qos = MQTT::QOS2;
271     message.payloadlen = strlen(buf)+1;
272     rc = client.publish(topic, &message);
273     while (arrivedcount == 2)
274         client.yield(100);
275 
276     rc = client.unsubscribe(topic);
277     if (rc != 0)
278         printf("rc from unsubscribe was %d\n", rc);
279 
280     rc = client.disconnect();
281     if (rc != 0)
282         printf("rc from disconnect was %d\n", rc);
283 
284     ipstack.disconnect();
285 
286     printf("Finishing with %d messages received\n", arrivedcount);
287 
288     return 0;
289 }
290 
291 
292