• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2014 IBM Corp.
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    http://www.eclipse.org/legal/epl-v10.html
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Ian Craggs - initial API and implementation and/or initial documentation
15  *    Sergio R. Caprile - clarifications and/or documentation extension
16  *******************************************************************************/
17 
18 #include <stdio.h>
19 #include <string.h>
20 #include <stdlib.h>
21 
22 #include "MQTTPacket.h"
23 #include "transport.h"
24 
25 /* A signal handler provides an asynchronous way to stop the sample,
26 because the code loops waiting for messages on the subscribed topic.
27 Your actual code will depend on your hardware and approach. */
28 #include <signal.h>
29 
/*
 * Stop flag: set to 1 by cfinish() and polled by main()'s receive loop.
 * It is written from a signal handler, so it must be volatile sig_atomic_t:
 * that is the only object type a handler may portably store to, and
 * volatile keeps the polling loop from caching the value.
 */
volatile sig_atomic_t toStop = 0;

/**
 * Signal handler that requests a clean shutdown of the sample.
 *
 * Restores the default disposition for SIGINT (so a second SIGINT is no
 * longer routed here) and raises the toStop flag.
 *
 * @param sig number of the signal being delivered (unused)
 */
void cfinish(int sig)
{
	(void)sig;

	/* SIG_DFL, not NULL: signal() expects a handler, SIG_DFL or SIG_IGN.
	 * NULL only works where SIG_DFL happens to be a null pointer. */
	signal(SIGINT, SIG_DFL);
	toStop = 1;
}
37 
stop_init(void)38 void stop_init(void)
39 {
40 	signal(SIGINT, cfinish);
41 	signal(SIGTERM, cfinish);
42 }
43 /* */
44 
main(int argc,char * argv[])45 int main(int argc, char *argv[])
46 {
47 	MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
48 	int rc = 0;
49 	int mysock = 0;
50 	unsigned char buf[200];
51 	int buflen = sizeof(buf);
52 	int msgid = 1;
53 	MQTTString topicString = MQTTString_initializer;
54 	int req_qos = 0;
55 	char* payload = "mypayload";
56 	int payloadlen = strlen(payload);
57 	int len = 0;
58 	char *host = MQTT_HOST;
59 	int port = MQTT_PORT;
60 	MQTTTransport mytransport;
61 
62 	stop_init();
63 	if (argc > 1)
64 		host = argv[1];
65 
66 	if (argc > 2)
67 		port = atoi(argv[2]);
68 
69 	mysock = transport_open(host, port);
70 	if(mysock < 0)
71 		return mysock;
72 
73 	printf("Sending to hostname %s port %d\n", host, port);
74 
75 	mytransport.sck = &mysock;
76 	mytransport.getfn = transport_getdatanb;
77 	mytransport.state = 0;
78 	data.clientID.cstring = "me";
79 	data.keepAliveInterval = 20;
80 	data.cleansession = 1;
81 	data.username.cstring = "testuser";
82 	data.password.cstring = "testpassword";
83 
84 	len = MQTTSerialize_connect(buf, buflen, &data);
85 	rc = transport_sendPacketBuffer(mysock, buf, len);
86 
87 	/* wait for connack */
88 	if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK)
89 	{
90 		unsigned char sessionPresent, connack_rc;
91 
92 		if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
93 		{
94 			printf("Unable to connect, return code %d\n", connack_rc);
95 			goto exit;
96 		}
97 	}
98 	else
99 		goto exit;
100 
101 	/* subscribe */
102 	topicString.cstring = MQTT_TOPIC;
103 	len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
104 
105 	rc = transport_sendPacketBuffer(mysock, buf, len);
106 	do {
107 		int frc;
108 		if ((frc=MQTTPacket_readnb(buf, buflen, &mytransport)) == SUBACK) /* wait for suback */
109 		{
110 			unsigned short submsgid;
111 			int subcount;
112 			int granted_qos;
113 
114 			rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
115 			if (granted_qos != 0)
116 			{
117 				printf("granted qos != 0, %d\n", granted_qos);
118 				goto exit;
119 			}
120 			break;
121 		}
122 		else if (frc == -1)
123 			goto exit;
124 	} while (1); /* handle timeouts here */
125 	/* loop getting msgs on subscribed topic */
126 	topicString.cstring = "pubtopic";
127 	while (!toStop)
128 	{
129 		/* handle timeouts */
130 		if (MQTTPacket_readnb(buf, buflen, &mytransport) == PUBLISH)
131 		{
132 			unsigned char dup;
133 			int qos;
134 			unsigned char retained;
135 			unsigned short msgid;
136 			int payloadlen_in;
137 			unsigned char* payload_in;
138 			int rc;
139 			MQTTString receivedTopic;
140 
141 			rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
142 					&payload_in, &payloadlen_in, buf, buflen);
143 			printf("message arrived %.*s\n", payloadlen_in, payload_in);
144 			printf("publishing reading\n");
145 			len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen);
146 			rc = transport_sendPacketBuffer(mysock, buf, len);
147 		}
148 	}
149 
150 	printf("disconnecting\n");
151 	len = MQTTSerialize_disconnect(buf, buflen);
152 	rc = transport_sendPacketBuffer(mysock, buf, len);
153 
154 exit:
155 	transport_close(mysock);
156 
157 	return 0;
158 }
159