• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2009, 2022 IBM Corp. and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Ian Craggs - initial implementation and documentation
15  *******************************************************************************/
16 
17 #if !defined(MQTTASYNCUTILS_H_)
18 #define MQTTASYNCUTILS_H_
19 
20 #include "MQTTPacket.h"
21 #include "Thread.h"
22 
23 #define URI_TCP  "tcp://"
24 #define URI_MQTT "mqtt://"
25 #define URI_WS   "ws://"
26 #define URI_WSS  "wss://"
27 
28 enum MQTTAsync_threadStates
29 {
30 	STOPPED, STARTING, RUNNING, STOPPING
31 };
32 
33 typedef struct
34 {
35 	MQTTAsync_message* msg;
36 	char* topicName;
37 	int topicLen;
38 	unsigned int seqno; /* only used on restore */
39 } qEntry;
40 
41 typedef struct
42 {
43 	int type;
44 	MQTTAsync_onSuccess* onSuccess;
45 	MQTTAsync_onFailure* onFailure;
46 	MQTTAsync_onSuccess5* onSuccess5;
47 	MQTTAsync_onFailure5* onFailure5;
48 	MQTTAsync_token token;
49 	void* context;
50 	START_TIME_TYPE start_time;
51 	MQTTProperties properties;
52 	union
53 	{
54 		struct
55 		{
56 			int count;
57 			char** topics;
58 			int* qoss;
59 			MQTTSubscribe_options opts;
60 			MQTTSubscribe_options* optlist;
61 		} sub;
62 		struct
63 		{
64 			int count;
65 			char** topics;
66 		} unsub;
67 		struct
68 		{
69 			char* destinationName;
70 			int payloadlen;
71 			void* payload;
72 			int qos;
73 			int retained;
74 		} pub;
75 		struct
76 		{
77 			int internal;
78 			int timeout;
79 			enum MQTTReasonCodes reasonCode;
80 		} dis;
81 		struct
82 		{
83 			int currentURI;
84 			int MQTTVersion; /**< current MQTT version being used to connect */
85 		} conn;
86 	} details;
87 } MQTTAsync_command;
88 
89 typedef struct MQTTAsync_struct
90 {
91 	char* serverURI;
92 	int ssl;
93 	int websocket;
94 	Clients* c;
95 
96 	/* "Global", to the client, callback definitions */
97 	MQTTAsync_connectionLost* cl;
98 	MQTTAsync_messageArrived* ma;
99 	MQTTAsync_deliveryComplete* dc;
100 	void* clContext; /* the context to be associated with the conn lost callback*/
101 	void* maContext; /* the context to be associated with the msg arrived callback*/
102 	void* dcContext; /* the context to be associated with the deliv complete callback*/
103 
104 	MQTTAsync_connected* connected;
105 	void* connected_context; /* the context to be associated with the connected callback*/
106 
107 	MQTTAsync_disconnected* disconnected;
108 	void* disconnected_context; /* the context to be associated with the disconnected callback*/
109 
110 	MQTTAsync_updateConnectOptions* updateConnectOptions;
111 	void* updateConnectOptions_context;
112 
113 	/* Each time connect is called, we store the options that were used.  These are reused in
114 	   any call to reconnect, or an automatic reconnect attempt */
115 	MQTTAsync_command connect;		/* Connect operation properties */
116 	MQTTAsync_command disconnect;		/* Disconnect operation properties */
117 	MQTTAsync_command* pending_write;       /* Is there a socket write pending? */
118 
119 	List* responses;
120 	unsigned int command_seqno;
121 
122 	MQTTPacket* pack;
123 
124 	/* added for offline buffering */
125 	MQTTAsync_createOptions* createOptions;
126 	int shouldBeConnected;
127 	int noBufferedMessages; /* the current number of buffered (publish) messages for this client */
128 
129 	/* added for automatic reconnect */
130 	int automaticReconnect;
131 	int minRetryInterval;
132 	int maxRetryInterval;
133 	int serverURIcount;
134 	char** serverURIs;
135 	int connectTimeout;
136 
137 	int currentInterval;
138 	int currentIntervalBase;
139 	START_TIME_TYPE lastConnectionFailedTime;
140 	int retrying;
141 	int reconnectNow;
142 
143 	/* MQTT V5 properties */
144 	MQTTProperties* connectProps;
145 	MQTTProperties* willProps;
146 
147 } MQTTAsyncs;
148 
149 typedef struct
150 {
151 	MQTTAsync_command command;
152 	MQTTAsyncs* client;
153 	unsigned int seqno; /* only used on restore */
154 	int not_restored;
155 	char* key; /* if not_restored, this holds the key */
156 } MQTTAsync_queuedCommand;
157 
158 void MQTTAsync_lock_mutex(mutex_type amutex);
159 void MQTTAsync_unlock_mutex(mutex_type amutex);
160 void MQTTAsync_terminate(void);
161 #if !defined(NO_PERSISTENCE)
162 int MQTTAsync_restoreCommands(MQTTAsyncs* client);
163 #endif
164 int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size);
165 void MQTTAsync_emptyMessageQueue(Clients* client);
166 void MQTTAsync_freeResponses(MQTTAsyncs* m);
167 void MQTTAsync_freeCommands(MQTTAsyncs* m);
168 int MQTTAsync_unpersistCommandsAndMessages(Clients* c);
169 void MQTTAsync_closeSession(Clients* client, enum MQTTReasonCodes reasonCode, MQTTProperties* props);
170 int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions* options, int internal);
171 int MQTTAsync_assignMsgId(MQTTAsyncs* m);
172 int MQTTAsync_getNoBufferedMessages(MQTTAsyncs* m);
173 void MQTTAsync_writeContinue(SOCKET socket);
174 void MQTTAsync_writeComplete(SOCKET socket, int rc);
175 void setRetryLoopInterval(int keepalive);
176 void MQTTAsync_NULLPublishResponses(MQTTAsyncs* m);
177 void MQTTAsync_NULLPublishCommands(MQTTAsyncs* m);
178 
179 #if defined(_WIN32) || defined(_WIN64)
180 #else
181 #define WINAPI
182 #endif
183 
184 thread_return_type WINAPI MQTTAsync_sendThread(void* n);
185 thread_return_type WINAPI MQTTAsync_receiveThread(void* n);
186 
187 #endif /* MQTTASYNCUTILS_H_ */
188