1 /*******************************************************************************
2 * Copyright (c) 2014 IBM Corp.
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * http://www.eclipse.org/legal/epl-v10.html
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Ian Craggs - initial API and implementation and/or initial documentation
15 * Sergio R. Caprile - clarifications and/or documentation extension
16 *******************************************************************************/
17
18 #include <stdio.h>
19 #include <string.h>
20 #include <stdlib.h>
21
22 #include "MQTTPacket.h"
23 #include "transport.h"
24
25 /* This is in order to get an asynchronous signal to stop the sample,
26 as the code loops waiting for msgs on the subscribed topic.
27 Your actual code will depend on your hw and approach*/
28 #include <signal.h>
29
/*
 * Stop flag shared between the signal handler and the main loop.
 * cfinish() stores to it and main() polls it. It is declared
 * volatile sig_atomic_t because that is the object type a handler
 * installed with signal() may portably write to.
 */
volatile sig_atomic_t toStop = 0;

/*
 * Signal handler: request a clean shutdown of the sample.
 *
 * sig is the number of the delivered signal; it is not used.
 *
 * SIGINT is put back to its default disposition (SIG_DFL, not a null
 * pointer) so that a second interrupt terminates the process even if the
 * main loop never gets around to testing the flag.
 */
void cfinish(int sig)
{
    (void)sig;
    signal(SIGINT, SIG_DFL);
    toStop = 1;
}
37
stop_init(void)38 void stop_init(void)
39 {
40 signal(SIGINT, cfinish);
41 signal(SIGTERM, cfinish);
42 }
43 /* */
44
main(int argc,char * argv[])45 int main(int argc, char *argv[])
46 {
47 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
48 int rc = 0;
49 int mysock = 0;
50 unsigned char buf[200];
51 int buflen = sizeof(buf);
52 int msgid = 1;
53 MQTTString topicString = MQTTString_initializer;
54 int req_qos = 0;
55 char* payload = "mypayload";
56 int payloadlen = strlen(payload);
57 int len = 0;
58 char *host = MQTT_HOST;
59 int port = MQTT_PORT;
60 MQTTTransport mytransport;
61
62 stop_init();
63 if (argc > 1)
64 host = argv[1];
65
66 if (argc > 2)
67 port = atoi(argv[2]);
68
69 mysock = transport_open(host, port);
70 if(mysock < 0)
71 return mysock;
72
73 printf("Sending to hostname %s port %d\n", host, port);
74
75 mytransport.sck = &mysock;
76 mytransport.getfn = transport_getdatanb;
77 mytransport.state = 0;
78 data.clientID.cstring = "me";
79 data.keepAliveInterval = 20;
80 data.cleansession = 1;
81 data.username.cstring = "testuser";
82 data.password.cstring = "testpassword";
83
84 len = MQTTSerialize_connect(buf, buflen, &data);
85 rc = transport_sendPacketBuffer(mysock, buf, len);
86
87 /* wait for connack */
88 if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK)
89 {
90 unsigned char sessionPresent, connack_rc;
91
92 if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
93 {
94 printf("Unable to connect, return code %d\n", connack_rc);
95 goto exit;
96 }
97 }
98 else
99 goto exit;
100
101 /* subscribe */
102 topicString.cstring = MQTT_TOPIC;
103 len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
104
105 rc = transport_sendPacketBuffer(mysock, buf, len);
106 do {
107 int frc;
108 if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == SUBACK) /* wait for suback */
109 {
110 unsigned short submsgid;
111 int subcount;
112 int granted_qos;
113
114 rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
115 if (granted_qos != 0)
116 {
117 printf("granted qos != 0, %d\n", granted_qos);
118 goto exit;
119 }
120 break;
121 }
122 else if (frc == -1)
123 goto exit;
124 } while (1); /* handle timeouts here */
125 /* loop getting msgs on subscribed topic */
126 topicString.cstring = "pubtopic";
127 while (!toStop)
128 {
129 /* handle timeouts */
130 if (MQTTPacket_readnb(buf, buflen, &mytransport) == PUBLISH)
131 {
132 unsigned char dup;
133 int qos;
134 unsigned char retained;
135 unsigned short msgid;
136 int payloadlen_in;
137 unsigned char* payload_in;
138 int rc;
139 MQTTString receivedTopic;
140
141 rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
142 &payload_in, &payloadlen_in, buf, buflen);
143 printf("message arrived %.*s\n", payloadlen_in, payload_in);
144 printf("publishing reading\n");
145 len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen);
146 rc = transport_sendPacketBuffer(mysock, buf, len);
147 }
148 }
149
150 printf("disconnecting\n");
151 len = MQTTSerialize_disconnect(buf, buflen);
152 rc = transport_sendPacketBuffer(mysock, buf, len);
153
154 exit:
155 transport_close(mysock);
156
157 return 0;
158 }
159