1 /*******************************************************************************
2 * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v2.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * https://www.eclipse.org/legal/epl-2.0/
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Ian Craggs - initial API and implementation and/or initial documentation
15 * Ian Craggs, Allan Stockdill-Mander - SSL updates
16 * Ian Craggs - fix for issue #244, issue #20
17 *******************************************************************************/
18
19 /**
20 * @file
21 * \brief Socket buffering related functions
22 *
23 * Some other related functions are in the Socket module
24 */
25 #include "SocketBuffer.h"
26 #include "LinkedList.h"
27 #include "Log.h"
28 #include "Messages.h"
29 #include "StackTrace.h"
30
31 #include <stdlib.h>
32 #include <stdio.h>
33 #include <string.h>
34
35 #include "Heap.h"
36
37 #if defined(_WIN32) || defined(_WIN64)
38 #define iov_len len
39 #define iov_base buf
40 #endif
41
42 /**
43 * Default input queue buffer
44 */
45 static socket_queue* def_queue;
46
47 /**
48 * List of queued input buffers
49 */
50 static List* queues;
51
52 /**
53 * List of queued write buffers
54 */
55 static List writes;
56
57
58 int socketcompare(void* a, void* b);
59 int SocketBuffer_newDefQ(void);
60 void SocketBuffer_freeDefQ(void);
61 int pending_socketcompare(void* a, void* b);
62
63
64 /**
65 * List callback function for comparing socket_queues by socket
66 * @param a first integer value
67 * @param b second integer value
68 * @return boolean indicating whether a and b are equal
69 */
socketcompare(void * a,void * b)70 int socketcompare(void* a, void* b)
71 {
72 return ((socket_queue*)a)->socket == *(int*)b;
73 }
74
75
76 /**
77 * Create a new default queue when one has just been used.
78 */
SocketBuffer_newDefQ(void)79 int SocketBuffer_newDefQ(void)
80 {
81 int rc = PAHO_MEMORY_ERROR;
82
83 def_queue = malloc(sizeof(socket_queue));
84 if (def_queue)
85 {
86 def_queue->buflen = 1000;
87 def_queue->buf = malloc(def_queue->buflen);
88 if (def_queue->buf)
89 {
90 #if defined(ZERO_SOCK_FD_IS_INVALID)
91 def_queue->socket = def_queue->index = 0;
92 #else
93 def_queue->socket = -1;
94 def_queue->index = 0;
95 #endif
96 def_queue->buflen = def_queue->datalen = def_queue->headerlen = 0;
97 rc = 0;
98 }
99 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
100 else
101 {
102 free(def_queue);
103 def_queue = NULL;
104 }
105 #endif
106 }
107 return rc;
108 }
109
110
111 /**
112 * Initialize the socketBuffer module
113 */
SocketBuffer_initialize(void)114 int SocketBuffer_initialize(void)
115 {
116 int rc = 0;
117
118 FUNC_ENTRY;
119 rc = SocketBuffer_newDefQ();
120 if (rc == 0)
121 {
122 if ((queues = ListInitialize()) == NULL)
123 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
124 {
125 SocketBuffer_freeDefQ();
126 rc = PAHO_MEMORY_ERROR;
127 }
128 #else
129 rc = PAHO_MEMORY_ERROR;
130 #endif
131 }
132 ListZero(&writes);
133 FUNC_EXIT_RC(rc);
134 return rc;
135 }
136
137
138 /**
139 * Free the default queue memory
140 */
SocketBuffer_freeDefQ(void)141 void SocketBuffer_freeDefQ(void)
142 {
143 free(def_queue->buf);
144 free(def_queue);
145 def_queue = NULL;
146 }
147
148
149 /**
150 * Terminate the socketBuffer module
151 */
SocketBuffer_terminate(void)152 void SocketBuffer_terminate(void)
153 {
154 ListElement* cur = NULL;
155 ListEmpty(&writes);
156
157 FUNC_ENTRY;
158 while (ListNextElement(queues, &cur))
159 free(((socket_queue*)(cur->content))->buf);
160 ListFree(queues);
161 SocketBuffer_freeDefQ();
162 FUNC_EXIT;
163 }
164
165
166 /**
167 * Cleanup any buffers for a specific socket
168 * @param socket the socket to clean up
169 */
SocketBuffer_cleanup(SOCKET socket)170 void SocketBuffer_cleanup(SOCKET socket)
171 {
172 FUNC_ENTRY;
173 SocketBuffer_writeComplete(socket); /* clean up write buffers */
174 if (ListFindItem(queues, &socket, socketcompare))
175 {
176 free(((socket_queue*)(queues->current->content))->buf);
177 ListRemove(queues, queues->current->content);
178 }
179 if (def_queue->socket == socket)
180 {
181 #if defined(ZERO_SOCK_FD_IS_INVALID)
182 def_queue->socket = def_queue->index = 0;
183 #else
184 def_queue->socket = -1;
185 def_queue->index = 0;
186 #endif
187 def_queue->headerlen = def_queue->datalen = 0;
188 }
189 FUNC_EXIT;
190 }
191
192
193 /**
194 * Get any queued data for a specific socket
195 * @param socket the socket to get queued data for
196 * @param bytes the number of bytes of data to retrieve
197 * @param actual_len the actual length returned
198 * @return the actual data
199 */
SocketBuffer_getQueuedData(SOCKET socket,size_t bytes,size_t * actual_len)200 char* SocketBuffer_getQueuedData(SOCKET socket, size_t bytes, size_t* actual_len)
201 {
202 socket_queue* queue = NULL;
203
204 FUNC_ENTRY;
205 if (ListFindItem(queues, &socket, socketcompare))
206 { /* if there is queued data for this socket, add any data read to it */
207 queue = (socket_queue*)(queues->current->content);
208 *actual_len = queue->datalen;
209 }
210 else
211 {
212 *actual_len = 0;
213 queue = def_queue;
214 }
215 if (bytes > queue->buflen)
216 {
217 if (queue->datalen > 0)
218 {
219 void* newmem = malloc(bytes);
220 if (newmem)
221 {
222 memcpy(newmem, queue->buf, queue->datalen);
223 free(queue->buf);
224 queue->buf = newmem;
225 }
226 else
227 {
228 #if !defined(IOT_CONNECT) && !defined(IOT_LITEOS_ADAPT)
229 free(queue->buf);
230 queue->buf = NULL;
231 #endif
232 goto exit;
233 }
234 }
235 else
236 queue->buf = realloc(queue->buf, bytes);
237 queue->buflen = bytes;
238 }
239 exit:
240 FUNC_EXIT;
241 return queue->buf;
242 }
243
244
245 /**
246 * Get any queued character for a specific socket
247 * @param socket the socket to get queued data for
248 * @param c the character returned if any
249 * @return completion code
250 */
SocketBuffer_getQueuedChar(SOCKET socket,char * c)251 int SocketBuffer_getQueuedChar(SOCKET socket, char* c)
252 {
253 int rc = SOCKETBUFFER_INTERRUPTED;
254
255 FUNC_ENTRY;
256 if (ListFindItem(queues, &socket, socketcompare))
257 { /* if there is queued data for this socket, read that first */
258 socket_queue* queue = (socket_queue*)(queues->current->content);
259 if (queue->index < queue->headerlen)
260 {
261 *c = queue->fixed_header[(queue->index)++];
262 Log(TRACE_MAX, -1, "index is now %d, headerlen %d", queue->index, (int)queue->headerlen);
263 rc = SOCKETBUFFER_COMPLETE;
264 goto exit;
265 }
266 else if (queue->index > 4)
267 {
268 Log(LOG_FATAL, -1, "header is already at full length");
269 rc = SOCKET_ERROR;
270 goto exit;
271 }
272 }
273 exit:
274 FUNC_EXIT_RC(rc);
275 return rc; /* there was no queued char if rc is SOCKETBUFFER_INTERRUPTED*/
276 }
277
278
279 /**
280 * A socket read was interrupted so we need to queue data
281 * @param socket the socket to get queued data for
282 * @param actual_len the actual length of data that was read
283 */
SocketBuffer_interrupted(SOCKET socket,size_t actual_len)284 void SocketBuffer_interrupted(SOCKET socket, size_t actual_len)
285 {
286 socket_queue* queue = NULL;
287
288 FUNC_ENTRY;
289 if (ListFindItem(queues, &socket, socketcompare))
290 queue = (socket_queue*)(queues->current->content);
291 else /* new saved queue */
292 {
293 queue = def_queue;
294 /* if SocketBuffer_queueChar() has not yet been called, then the socket number
295 in def_queue will not have been set. Issue #244.
296 If actual_len == 0 then we may not need to do anything - I'll leave that
297 optimization for another time. */
298 queue->socket = socket;
299 ListAppend(queues, def_queue, sizeof(socket_queue)+def_queue->buflen);
300 SocketBuffer_newDefQ();
301 }
302 queue->index = 0;
303 queue->datalen = actual_len;
304 FUNC_EXIT;
305 }
306
307
308 /**
309 * A socket read has now completed so we can get rid of the queue
310 * @param socket the socket for which the operation is now complete
311 * @return pointer to the default queue data
312 */
SocketBuffer_complete(SOCKET socket)313 char* SocketBuffer_complete(SOCKET socket)
314 {
315 FUNC_ENTRY;
316 if (ListFindItem(queues, &socket, socketcompare))
317 {
318 socket_queue* queue = (socket_queue*)(queues->current->content);
319 SocketBuffer_freeDefQ();
320 def_queue = queue;
321 ListDetach(queues, queue);
322 }
323 #if defined(ZERO_SOCK_FD_IS_INVALID)
324 def_queue->socket = def_queue->index = 0;
325 #else
326 def_queue->socket = -1;
327 def_queue->index = 0;
328 #endif
329 def_queue->headerlen = def_queue->datalen = 0;
330 FUNC_EXIT;
331 return def_queue->buf;
332 }
333
334
335 /**
336 * Queued a Charactor to a specific socket
337 * @param socket the socket for which to queue char for
338 * @param c the character to queue
339 */
SocketBuffer_queueChar(SOCKET socket,char c)340 void SocketBuffer_queueChar(SOCKET socket, char c)
341 {
342 int error = 0;
343 socket_queue* curq = def_queue;
344
345 FUNC_ENTRY;
346 if (ListFindItem(queues, &socket, socketcompare))
347 curq = (socket_queue*)(queues->current->content);
348 #if defined(ZERO_SOCK_FD_IS_INVALID)
349 else if (def_queue->socket == 0)
350 #else
351 else if (def_queue->socket == -1)
352 #endif
353 {
354 def_queue->socket = socket;
355 def_queue->index = 0;
356 def_queue->datalen = 0;
357 }
358 else if (def_queue->socket != socket)
359 {
360 Log(LOG_FATAL, -1, "attempt to reuse socket queue");
361 error = 1;
362 }
363 if (curq->index > 4)
364 {
365 Log(LOG_FATAL, -1, "socket queue fixed_header field full");
366 error = 1;
367 }
368 if (!error)
369 {
370 curq->fixed_header[(curq->index)++] = c;
371 curq->headerlen = curq->index;
372 }
373 Log(TRACE_MAX, -1, "queueChar: index is now %d, headerlen %d", curq->index, (int)curq->headerlen);
374 FUNC_EXIT;
375 }
376
377
378 /**
379 * A socket write was interrupted so store the remaining data
380 * @param socket the socket for which the write was interrupted
381 * @param count the number of iovec buffers
382 * @param iovecs buffer array
383 * @param frees a set of flags indicating which of the iovecs array should be freed
384 * @param total total data length to be written
385 * @param bytes actual data length that was written
386 */
387 #if defined(OPENSSL) || defined(MBEDTLS)
SocketBuffer_pendingWrite(SOCKET socket,SSL * ssl,int count,iobuf * iovecs,int * frees,size_t total,size_t bytes)388 int SocketBuffer_pendingWrite(SOCKET socket, SSL* ssl, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
389 #else
390 int SocketBuffer_pendingWrite(SOCKET socket, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
391 #endif
392 {
393 int i = 0;
394 pending_writes* pw = NULL;
395 int rc = 0;
396
397 FUNC_ENTRY;
398 /* store the buffers until the whole packet is written */
399 if ((pw = malloc(sizeof(pending_writes))) == NULL)
400 {
401 rc = PAHO_MEMORY_ERROR;
402 goto exit;
403 }
404 pw->socket = socket;
405 #if defined(OPENSSL) || defined(MBEDTLS)
406 pw->ssl = ssl;
407 #endif
408 pw->bytes = bytes;
409 pw->total = total;
410 pw->count = count;
411 for (i = 0; i < count; i++)
412 {
413 pw->iovecs[i] = iovecs[i];
414 pw->frees[i] = frees[i];
415 }
416 ListAppend(&writes, pw, sizeof(pw) + total);
417 exit:
418 FUNC_EXIT_RC(rc);
419 return rc;
420 }
421
422
423 /**
424 * List callback function for comparing pending_writes by socket
425 * @param a first integer value
426 * @param b second integer value
427 * @return boolean indicating whether a and b are equal
428 */
pending_socketcompare(void * a,void * b)429 int pending_socketcompare(void* a, void* b)
430 {
431 return ((pending_writes*)a)->socket == *(int*)b;
432 }
433
434
435 /**
436 * Get any queued write data for a specific socket
437 * @param socket the socket to get queued data for
438 * @return pointer to the queued data or NULL
439 */
SocketBuffer_getWrite(SOCKET socket)440 pending_writes* SocketBuffer_getWrite(SOCKET socket)
441 {
442 ListElement* le = ListFindItem(&writes, &socket, pending_socketcompare);
443 return (le) ? (pending_writes*)(le->content) : NULL;
444 }
445
446
447 /**
448 * A socket write has now completed so we can get rid of the queue
449 * @param socket the socket for which the operation is now complete
450 * @return completion code, boolean - was the queue removed?
451 */
SocketBuffer_writeComplete(SOCKET socket)452 int SocketBuffer_writeComplete(SOCKET socket)
453 {
454 return ListRemoveItem(&writes, &socket, pending_socketcompare);
455 }
456
457
458 /**
459 * Update the queued write data for a socket in the case of QoS 0 messages.
460 * @param socket the socket for which the operation is now complete
461 * @param topic the topic of the QoS 0 write
462 * @param payload the payload of the QoS 0 write
463 * @return pointer to the updated queued data structure, or NULL
464 */
SocketBuffer_updateWrite(SOCKET socket,char * topic,char * payload)465 pending_writes* SocketBuffer_updateWrite(SOCKET socket, char* topic, char* payload)
466 {
467 pending_writes* pw = NULL;
468 ListElement* le = NULL;
469
470 FUNC_ENTRY;
471 if ((le = ListFindItem(&writes, &socket, pending_socketcompare)) != NULL)
472 {
473 pw = (pending_writes*)(le->content);
474 if (pw->count == 4)
475 {
476 pw->iovecs[2].iov_base = topic;
477 pw->iovecs[3].iov_base = payload;
478 }
479 }
480
481 FUNC_EXIT;
482 return pw;
483 }
484