1 /******************************************************************************* 2 * Copyright (c) 2009, 2022 IBM Corp. and others 3 * 4 * All rights reserved. This program and the accompanying materials 5 * are made available under the terms of the Eclipse Public License v2.0 6 * and Eclipse Distribution License v1.0 which accompany this distribution. 7 * 8 * The Eclipse Public License is available at 9 * https://www.eclipse.org/legal/epl-2.0/ 10 * and the Eclipse Distribution License is available at 11 * http://www.eclipse.org/org/documents/edl-v10.php. 12 * 13 * Contributors: 14 * Ian Craggs - initial implementation and documentation 15 *******************************************************************************/ 16 17 #if !defined(MQTTASYNCUTILS_H_) 18 #define MQTTASYNCUTILS_H_ 19 20 #include "MQTTPacket.h" 21 #include "Thread.h" 22 23 #define URI_TCP "tcp://" 24 #define URI_MQTT "mqtt://" 25 #define URI_WS "ws://" 26 #define URI_WSS "wss://" 27 28 enum MQTTAsync_threadStates 29 { 30 STOPPED, STARTING, RUNNING, STOPPING 31 }; 32 33 typedef struct 34 { 35 MQTTAsync_message* msg; 36 char* topicName; 37 int topicLen; 38 unsigned int seqno; /* only used on restore */ 39 } qEntry; 40 41 typedef struct 42 { 43 int type; 44 MQTTAsync_onSuccess* onSuccess; 45 MQTTAsync_onFailure* onFailure; 46 MQTTAsync_onSuccess5* onSuccess5; 47 MQTTAsync_onFailure5* onFailure5; 48 MQTTAsync_token token; 49 void* context; 50 START_TIME_TYPE start_time; 51 MQTTProperties properties; 52 union 53 { 54 struct 55 { 56 int count; 57 char** topics; 58 int* qoss; 59 MQTTSubscribe_options opts; 60 MQTTSubscribe_options* optlist; 61 } sub; 62 struct 63 { 64 int count; 65 char** topics; 66 } unsub; 67 struct 68 { 69 char* destinationName; 70 int payloadlen; 71 void* payload; 72 int qos; 73 int retained; 74 } pub; 75 struct 76 { 77 int internal; 78 int timeout; 79 enum MQTTReasonCodes reasonCode; 80 } dis; 81 struct 82 { 83 int currentURI; 84 int MQTTVersion; /**< current MQTT version being used to connect */ 85 } conn; 86 } details; 87 } MQTTAsync_command; 88 89 typedef struct MQTTAsync_struct 90 { 91 char* serverURI; 92 int ssl; 93 int websocket; 94 Clients* c; 95 96 /* "Global", to the client, callback definitions */ 97 MQTTAsync_connectionLost* cl; 98 MQTTAsync_messageArrived* ma; 99 MQTTAsync_deliveryComplete* dc; 100 void* clContext; /* the context to be associated with the conn lost callback*/ 101 void* maContext; /* the context to be associated with the msg arrived callback*/ 102 void* dcContext; /* the context to be associated with the deliv complete callback*/ 103 104 MQTTAsync_connected* connected; 105 void* connected_context; /* the context to be associated with the connected callback*/ 106 107 MQTTAsync_disconnected* disconnected; 108 void* disconnected_context; /* the context to be associated with the disconnected callback*/ 109 110 MQTTAsync_updateConnectOptions* updateConnectOptions; 111 void* updateConnectOptions_context; 112 113 /* Each time connect is called, we store the options that were used. These are reused in 114 any call to reconnect, or an automatic reconnect attempt */ 115 MQTTAsync_command connect; /* Connect operation properties */ 116 MQTTAsync_command disconnect; /* Disconnect operation properties */ 117 MQTTAsync_command* pending_write; /* Is there a socket write pending? */ 118 119 List* responses; 120 unsigned int command_seqno; 121 122 MQTTPacket* pack; 123 124 /* added for offline buffering */ 125 MQTTAsync_createOptions* createOptions; 126 int shouldBeConnected; 127 int noBufferedMessages; /* the current number of buffered (publish) messages for this client */ 128 129 /* added for automatic reconnect */ 130 int automaticReconnect; 131 int minRetryInterval; 132 int maxRetryInterval; 133 int serverURIcount; 134 char** serverURIs; 135 int connectTimeout; 136 137 int currentInterval; 138 int currentIntervalBase; 139 START_TIME_TYPE lastConnectionFailedTime; 140 int retrying; 141 int reconnectNow; 142 143 /* MQTT V5 properties */ 144 MQTTProperties* connectProps; 145 MQTTProperties* willProps; 146 147 } MQTTAsyncs; 148 149 typedef struct 150 { 151 MQTTAsync_command command; 152 MQTTAsyncs* client; 153 unsigned int seqno; /* only used on restore */ 154 int not_restored; 155 char* key; /* if not_restored, this holds the key */ 156 } MQTTAsync_queuedCommand; 157 158 void MQTTAsync_lock_mutex(mutex_type amutex); 159 void MQTTAsync_unlock_mutex(mutex_type amutex); 160 void MQTTAsync_terminate(void); 161 #if !defined(NO_PERSISTENCE) 162 int MQTTAsync_restoreCommands(MQTTAsyncs* client); 163 #endif 164 int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size); 165 void MQTTAsync_emptyMessageQueue(Clients* client); 166 void MQTTAsync_freeResponses(MQTTAsyncs* m); 167 void MQTTAsync_freeCommands(MQTTAsyncs* m); 168 int MQTTAsync_unpersistCommandsAndMessages(Clients* c); 169 void MQTTAsync_closeSession(Clients* client, enum MQTTReasonCodes reasonCode, MQTTProperties* props); 170 int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions* options, int internal); 171 int MQTTAsync_assignMsgId(MQTTAsyncs* m); 172 int MQTTAsync_getNoBufferedMessages(MQTTAsyncs* m); 173 void MQTTAsync_writeContinue(SOCKET socket); 174 void MQTTAsync_writeComplete(SOCKET socket, int rc); 175 void setRetryLoopInterval(int keepalive); 176 void MQTTAsync_NULLPublishResponses(MQTTAsyncs* m); 177 void MQTTAsync_NULLPublishCommands(MQTTAsyncs* m); 178 179 #if defined(_WIN32) || defined(_WIN64) 180 #else 181 #define WINAPI 182 #endif 183 184 thread_return_type WINAPI MQTTAsync_sendThread(void* n); 185 thread_return_type WINAPI MQTTAsync_receiveThread(void* n); 186 187 #endif /* MQTTASYNCUTILS_H_ */ 188