• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*******************************************************************************
2  * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
3  *
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v2.0
6  * and Eclipse Distribution License v1.0 which accompany this distribution.
7  *
8  * The Eclipse Public License is available at
9  *    https://www.eclipse.org/legal/epl-2.0/
10  * and the Eclipse Distribution License is available at
11  *   http://www.eclipse.org/org/documents/edl-v10.php.
12  *
13  * Contributors:
14  *    Ian Craggs - initial API and implementation and/or initial documentation
15  *    Ian Craggs, Allan Stockdill-Mander - SSL updates
16  *    Ian Craggs - fix for issue #244, issue #20
17  *******************************************************************************/
18 
19 /**
20  * @file
21  * \brief Socket buffering related functions
22  *
23  * Some other related functions are in the Socket module
24  */
25 #include "SocketBuffer.h"
26 #include "LinkedList.h"
27 #include "Log.h"
28 #include "Messages.h"
29 #include "StackTrace.h"
30 
31 #include <stdlib.h>
32 #include <stdio.h>
33 #include <string.h>
34 
35 #include "Heap.h"
36 
37 #if defined(_WIN32) || defined(_WIN64)
38 #define iov_len len
39 #define iov_base buf
40 #endif
41 
42 /**
43  * Default input queue buffer
44  */
45 static socket_queue* def_queue;
46 
47 /**
48  * List of queued input buffers
49  */
50 static List* queues;
51 
52 /**
53  * List of queued write buffers
54  */
55 static List writes;
56 
57 
58 int socketcompare(void* a, void* b);
59 int SocketBuffer_newDefQ(void);
60 void SocketBuffer_freeDefQ(void);
61 int pending_socketcompare(void* a, void* b);
62 
63 
64 /**
65  * List callback function for comparing socket_queues by socket
66  * @param a first integer value
67  * @param b second integer value
68  * @return boolean indicating whether a and b are equal
69  */
socketcompare(void * a,void * b)70 int socketcompare(void* a, void* b)
71 {
72 	return ((socket_queue*)a)->socket == *(int*)b;
73 }
74 
75 
76 /**
77  * Create a new default queue when one has just been used.
78  */
SocketBuffer_newDefQ(void)79 int SocketBuffer_newDefQ(void)
80 {
81 	int rc = PAHO_MEMORY_ERROR;
82 
83 	def_queue = malloc(sizeof(socket_queue));
84 	if (def_queue)
85 	{
86 		def_queue->buflen = 1000;
87 		def_queue->buf = malloc(def_queue->buflen);
88 		if (def_queue->buf)
89 		{
90 #if defined(ZERO_SOCK_FD_IS_INVALID)
91 			def_queue->socket = def_queue->index = 0;
92 #else
93 			def_queue->socket = -1;
94 			def_queue->index = 0;
95 #endif
96 			def_queue->buflen = def_queue->datalen = def_queue->headerlen = 0;
97 			rc = 0;
98 		}
99 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
100 		else
101 		{
102 			free(def_queue);
103 			def_queue = NULL;
104 		}
105 #endif
106 	}
107 	return rc;
108 }
109 
110 
111 /**
112  * Initialize the socketBuffer module
113  */
SocketBuffer_initialize(void)114 int SocketBuffer_initialize(void)
115 {
116 	int rc = 0;
117 
118 	FUNC_ENTRY;
119 	rc = SocketBuffer_newDefQ();
120 	if (rc == 0)
121 	{
122 		if ((queues = ListInitialize()) == NULL)
123 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
124 		{
125 			SocketBuffer_freeDefQ();
126 			rc = PAHO_MEMORY_ERROR;
127 		}
128 #else
129 			rc = PAHO_MEMORY_ERROR;
130 #endif
131 	}
132 	ListZero(&writes);
133 	FUNC_EXIT_RC(rc);
134 	return rc;
135 }
136 
137 
138 /**
139  * Free the default queue memory
140  */
SocketBuffer_freeDefQ(void)141 void SocketBuffer_freeDefQ(void)
142 {
143 	free(def_queue->buf);
144 	free(def_queue);
145 	def_queue = NULL;
146 }
147 
148 
149 /**
150  * Terminate the socketBuffer module
151  */
SocketBuffer_terminate(void)152 void SocketBuffer_terminate(void)
153 {
154 	ListElement* cur = NULL;
155 	ListEmpty(&writes);
156 
157 	FUNC_ENTRY;
158 	while (ListNextElement(queues, &cur))
159 		free(((socket_queue*)(cur->content))->buf);
160 	ListFree(queues);
161 	SocketBuffer_freeDefQ();
162 	FUNC_EXIT;
163 }
164 
165 
166 /**
167  * Cleanup any buffers for a specific socket
168  * @param socket the socket to clean up
169  */
SocketBuffer_cleanup(SOCKET socket)170 void SocketBuffer_cleanup(SOCKET socket)
171 {
172 	FUNC_ENTRY;
173 	SocketBuffer_writeComplete(socket); /* clean up write buffers */
174 	if (ListFindItem(queues, &socket, socketcompare))
175 	{
176 		free(((socket_queue*)(queues->current->content))->buf);
177 		ListRemove(queues, queues->current->content);
178 	}
179 	if (def_queue->socket == socket)
180 	{
181 #if defined(ZERO_SOCK_FD_IS_INVALID)
182 		def_queue->socket = def_queue->index = 0;
183 #else
184 		def_queue->socket = -1;
185 		def_queue->index = 0;
186 #endif
187 		def_queue->headerlen = def_queue->datalen = 0;
188 	}
189 	FUNC_EXIT;
190 }
191 
192 
193 /**
194  * Get any queued data for a specific socket
195  * @param socket the socket to get queued data for
196  * @param bytes the number of bytes of data to retrieve
197  * @param actual_len the actual length returned
198  * @return the actual data
199  */
SocketBuffer_getQueuedData(SOCKET socket,size_t bytes,size_t * actual_len)200 char* SocketBuffer_getQueuedData(SOCKET socket, size_t bytes, size_t* actual_len)
201 {
202 	socket_queue* queue = NULL;
203 
204 	FUNC_ENTRY;
205 	if (ListFindItem(queues, &socket, socketcompare))
206 	{  /* if there is queued data for this socket, add any data read to it */
207 		queue = (socket_queue*)(queues->current->content);
208 		*actual_len = queue->datalen;
209 	}
210 	else
211 	{
212 		*actual_len = 0;
213 		queue = def_queue;
214 	}
215 	if (bytes > queue->buflen)
216 	{
217 		if (queue->datalen > 0)
218 		{
219 			void* newmem = malloc(bytes);
220 			if (newmem)
221 			{
222 				memcpy(newmem, queue->buf, queue->datalen);
223 				free(queue->buf);
224 				queue->buf = newmem;
225 			}
226 			else
227 			{
228 #if !defined(IOT_CONNECT) && !defined(IOT_LITEOS_ADAPT)
229 				free(queue->buf);
230 				queue->buf = NULL;
231 #endif
232 				goto exit;
233 			}
234 		}
235 		else
236 			queue->buf = realloc(queue->buf, bytes);
237 		queue->buflen = bytes;
238 	}
239 exit:
240 	FUNC_EXIT;
241 	return queue->buf;
242 }
243 
244 
245 /**
246  * Get any queued character for a specific socket
247  * @param socket the socket to get queued data for
248  * @param c the character returned if any
249  * @return completion code
250  */
SocketBuffer_getQueuedChar(SOCKET socket,char * c)251 int SocketBuffer_getQueuedChar(SOCKET socket, char* c)
252 {
253 	int rc = SOCKETBUFFER_INTERRUPTED;
254 
255 	FUNC_ENTRY;
256 	if (ListFindItem(queues, &socket, socketcompare))
257 	{  /* if there is queued data for this socket, read that first */
258 		socket_queue* queue = (socket_queue*)(queues->current->content);
259 		if (queue->index < queue->headerlen)
260 		{
261 			*c = queue->fixed_header[(queue->index)++];
262 			Log(TRACE_MAX, -1, "index is now %d, headerlen %d", queue->index, (int)queue->headerlen);
263 			rc = SOCKETBUFFER_COMPLETE;
264 			goto exit;
265 		}
266 		else if (queue->index > 4)
267 		{
268 			Log(LOG_FATAL, -1, "header is already at full length");
269 			rc = SOCKET_ERROR;
270 			goto exit;
271 		}
272 	}
273 exit:
274 	FUNC_EXIT_RC(rc);
275 	return rc;  /* there was no queued char if rc is SOCKETBUFFER_INTERRUPTED*/
276 }
277 
278 
279 /**
280  * A socket read was interrupted so we need to queue data
281  * @param socket the socket to get queued data for
282  * @param actual_len the actual length of data that was read
283  */
SocketBuffer_interrupted(SOCKET socket,size_t actual_len)284 void SocketBuffer_interrupted(SOCKET socket, size_t actual_len)
285 {
286 	socket_queue* queue = NULL;
287 
288 	FUNC_ENTRY;
289 	if (ListFindItem(queues, &socket, socketcompare))
290 		queue = (socket_queue*)(queues->current->content);
291 	else /* new saved queue */
292 	{
293 		queue = def_queue;
294 		/* if SocketBuffer_queueChar() has not yet been called, then the socket number
295 		  in def_queue will not have been set.  Issue #244.
296 		  If actual_len == 0 then we may not need to do anything - I'll leave that
297 		  optimization for another time. */
298 		queue->socket = socket;
299 		ListAppend(queues, def_queue, sizeof(socket_queue)+def_queue->buflen);
300 		SocketBuffer_newDefQ();
301 	}
302 	queue->index = 0;
303 	queue->datalen = actual_len;
304 	FUNC_EXIT;
305 }
306 
307 
308 /**
309  * A socket read has now completed so we can get rid of the queue
310  * @param socket the socket for which the operation is now complete
311  * @return pointer to the default queue data
312  */
SocketBuffer_complete(SOCKET socket)313 char* SocketBuffer_complete(SOCKET socket)
314 {
315 	FUNC_ENTRY;
316 	if (ListFindItem(queues, &socket, socketcompare))
317 	{
318 		socket_queue* queue = (socket_queue*)(queues->current->content);
319 		SocketBuffer_freeDefQ();
320 		def_queue = queue;
321 		ListDetach(queues, queue);
322 	}
323 #if defined(ZERO_SOCK_FD_IS_INVALID)
324 	def_queue->socket = def_queue->index = 0;
325 #else
326 	def_queue->socket = -1;
327 	def_queue->index = 0;
328 #endif
329 	def_queue->headerlen = def_queue->datalen = 0;
330 	FUNC_EXIT;
331 	return def_queue->buf;
332 }
333 
334 
335 /**
336  * Queued a Charactor to a specific socket
337  * @param socket the socket for which to queue char for
338  * @param c the character to queue
339  */
SocketBuffer_queueChar(SOCKET socket,char c)340 void SocketBuffer_queueChar(SOCKET socket, char c)
341 {
342 	int error = 0;
343 	socket_queue* curq = def_queue;
344 
345 	FUNC_ENTRY;
346 	if (ListFindItem(queues, &socket, socketcompare))
347 		curq = (socket_queue*)(queues->current->content);
348 #if defined(ZERO_SOCK_FD_IS_INVALID)
349 	else if (def_queue->socket == 0)
350 #else
351 	else if (def_queue->socket == -1)
352 #endif
353 	{
354 		def_queue->socket = socket;
355 		def_queue->index = 0;
356 		def_queue->datalen = 0;
357 	}
358 	else if (def_queue->socket != socket)
359 	{
360 		Log(LOG_FATAL, -1, "attempt to reuse socket queue");
361 		error = 1;
362 	}
363 	if (curq->index > 4)
364 	{
365 		Log(LOG_FATAL, -1, "socket queue fixed_header field full");
366 		error = 1;
367 	}
368 	if (!error)
369 	{
370 		curq->fixed_header[(curq->index)++] = c;
371 		curq->headerlen = curq->index;
372 	}
373 	Log(TRACE_MAX, -1, "queueChar: index is now %d, headerlen %d", curq->index, (int)curq->headerlen);
374 	FUNC_EXIT;
375 }
376 
377 
378 /**
379  * A socket write was interrupted so store the remaining data
380  * @param socket the socket for which the write was interrupted
381  * @param count the number of iovec buffers
382  * @param iovecs buffer array
383  * @param frees a set of flags indicating which of the iovecs array should be freed
384  * @param total total data length to be written
385  * @param bytes actual data length that was written
386  */
387 #if defined(OPENSSL) || defined(MBEDTLS)
SocketBuffer_pendingWrite(SOCKET socket,SSL * ssl,int count,iobuf * iovecs,int * frees,size_t total,size_t bytes)388 int SocketBuffer_pendingWrite(SOCKET socket, SSL* ssl, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
389 #else
390 int SocketBuffer_pendingWrite(SOCKET socket, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
391 #endif
392 {
393 	int i = 0;
394 	pending_writes* pw = NULL;
395 	int rc = 0;
396 
397 	FUNC_ENTRY;
398 	/* store the buffers until the whole packet is written */
399 	if ((pw = malloc(sizeof(pending_writes))) == NULL)
400 	{
401 		rc = PAHO_MEMORY_ERROR;
402 		goto exit;
403 	}
404 	pw->socket = socket;
405 #if defined(OPENSSL) || defined(MBEDTLS)
406 	pw->ssl = ssl;
407 #endif
408 	pw->bytes = bytes;
409 	pw->total = total;
410 	pw->count = count;
411 	for (i = 0; i < count; i++)
412 	{
413 		pw->iovecs[i] = iovecs[i];
414 		pw->frees[i] = frees[i];
415 	}
416 	ListAppend(&writes, pw, sizeof(pw) + total);
417 exit:
418 	FUNC_EXIT_RC(rc);
419 	return rc;
420 }
421 
422 
423 /**
424  * List callback function for comparing pending_writes by socket
425  * @param a first integer value
426  * @param b second integer value
427  * @return boolean indicating whether a and b are equal
428  */
pending_socketcompare(void * a,void * b)429 int pending_socketcompare(void* a, void* b)
430 {
431 	return ((pending_writes*)a)->socket == *(int*)b;
432 }
433 
434 
435 /**
436  * Get any queued write data for a specific socket
437  * @param socket the socket to get queued data for
438  * @return pointer to the queued data or NULL
439  */
SocketBuffer_getWrite(SOCKET socket)440 pending_writes* SocketBuffer_getWrite(SOCKET socket)
441 {
442 	ListElement* le = ListFindItem(&writes, &socket, pending_socketcompare);
443 	return (le) ? (pending_writes*)(le->content) : NULL;
444 }
445 
446 
447 /**
448  * A socket write has now completed so we can get rid of the queue
449  * @param socket the socket for which the operation is now complete
450  * @return completion code, boolean - was the queue removed?
451  */
SocketBuffer_writeComplete(SOCKET socket)452 int SocketBuffer_writeComplete(SOCKET socket)
453 {
454 	return ListRemoveItem(&writes, &socket, pending_socketcompare);
455 }
456 
457 
458 /**
459  * Update the queued write data for a socket in the case of QoS 0 messages.
460  * @param socket the socket for which the operation is now complete
461  * @param topic the topic of the QoS 0 write
462  * @param payload the payload of the QoS 0 write
463  * @return pointer to the updated queued data structure, or NULL
464  */
SocketBuffer_updateWrite(SOCKET socket,char * topic,char * payload)465 pending_writes* SocketBuffer_updateWrite(SOCKET socket, char* topic, char* payload)
466 {
467 	pending_writes* pw = NULL;
468 	ListElement* le = NULL;
469 
470 	FUNC_ENTRY;
471 	if ((le = ListFindItem(&writes, &socket, pending_socketcompare)) != NULL)
472 	{
473 		pw = (pending_writes*)(le->content);
474 		if (pw->count == 4)
475 		{
476 			pw->iovecs[2].iov_base = topic;
477 			pw->iovecs[3].iov_base = payload;
478 		}
479 	}
480 
481 	FUNC_EXIT;
482 	return pw;
483 }
484