1 /*******************************************************************************
2 * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
3 *
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v2.0
6 * and Eclipse Distribution License v1.0 which accompany this distribution.
7 *
8 * The Eclipse Public License is available at
9 * https://www.eclipse.org/legal/epl-2.0/
10 * and the Eclipse Distribution License is available at
11 * http://www.eclipse.org/org/documents/edl-v10.php.
12 *
13 * Contributors:
14 * Ian Craggs - initial implementation and documentation
15 * Ian Craggs - async client updates
16 * Ian Craggs - fix for bug 484496
17 * Juergen Kosel, Ian Craggs - fix for issue #135
18 * Ian Craggs - issue #217
19 * Ian Craggs - fix for issue #186
20 * Ian Craggs - remove StackTrace print debugging calls
21 *******************************************************************************/
22
23 /**
24 * @file
25 * \brief Socket related functions
26 *
27 * Some other related functions are in the SocketBuffer module
28 */
29
30
31 #include "Socket.h"
32 #include "Log.h"
33 #include "SocketBuffer.h"
34 #include "Messages.h"
35 #include "StackTrace.h"
36 #if defined(IOT_CONNECT)
37 #include "soc_socket.h"
38 #include "atiny_dns.h"
39 #include "securec.h"
40 #include "atiny_mqtt_commu.h"
41 #endif
42
43 #if defined(IOT_LITEOS_ADAPT)
44 #include "lwip/sockets.h"
45 #endif
46
47 #if defined(OPENSSL) || defined(MBEDTLS)
48 #include "SSLSocket.h"
49 #endif
50
51 #include <stdlib.h>
52 #include <string.h>
53 #include <signal.h>
54 #include <ctype.h>
55
56 #include "Heap.h"
57 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
58 #define RETRY_TIMES 2
59 #define RETRY_TIMEOUT_S 10
60 #endif
61
62 #if defined(USE_SELECT)
63 int isReady(int socket, fd_set* read_set, fd_set* write_set);
64 int Socket_continueWrites(fd_set* pwset, SOCKET* socket, mutex_type mutex);
65 #else
66 int isReady(int index);
67 int Socket_continueWrites(SOCKET* socket, mutex_type mutex);
68 #endif
69 int Socket_setnonblocking(SOCKET sock);
70 int Socket_error(char* aString, SOCKET sock);
71 int Socket_addSocket(SOCKET newSd);
72 int Socket_writev(SOCKET socket, iobuf* iovecs, int count, unsigned long* bytes);
73 int Socket_close_only(SOCKET socket);
74 int Socket_continueWrite(SOCKET socket);
75 char* Socket_getaddrname(struct sockaddr* sa, SOCKET sock);
76 int Socket_abortWrite(SOCKET socket);
77
78 #if defined(_WIN32) || defined(_WIN64)
79 #define iov_len len
80 #define iov_base buf
81 #define snprintf _snprintf
82 #endif
83
84 /**
85 * Structure to hold all socket data for this module
86 */
87 Sockets mod_s;
88 #if defined(USE_SELECT)
89 static fd_set wset;
90 #endif
91
92 extern mutex_type socket_mutex;
93
94 /**
95 * Set a socket non-blocking, OS independently
96 * @param sock the socket to set non-blocking
97 * @return TCP call error code
98 */
Socket_setnonblocking(SOCKET sock)99 int Socket_setnonblocking(SOCKET sock)
100 {
101 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
102 return 0;
103 #else
104 int rc;
105 #if defined(_WIN32) || defined(_WIN64)
106 u_long flag = 1L;
107
108 FUNC_ENTRY;
109 rc = ioctl(sock, FIONBIO, &flag);
110 #else
111 int flags;
112
113 FUNC_ENTRY;
114 if ((flags = fcntl(sock, F_GETFL, 0)))
115 flags = 0;
116 rc = fcntl(sock, F_SETFL, flags | O_NONBLOCK);
117 #endif
118 FUNC_EXIT_RC(rc);
119 return rc;
120 #endif
121 }
122
123
124 /**
125 * Gets the specific error corresponding to SOCKET_ERROR
126 * @param aString the function that was being used when the error occurred
127 * @param sock the socket on which the error occurred
128 * @return the specific TCP error code
129 */
Socket_error(char * aString,SOCKET sock)130 int Socket_error(char* aString, SOCKET sock)
131 {
132 int err;
133
134 #if defined(_WIN32) || defined(_WIN64)
135 err = WSAGetLastError();
136 #else
137 err = errno;
138 #endif
139 if (err != EINTR && err != EAGAIN && err != EINPROGRESS && err != EWOULDBLOCK)
140 {
141 if (strcmp(aString, "shutdown") != 0 || (err != ENOTCONN && err != ECONNRESET))
142 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
143 Log(TRACE_MINIMUM, -1, "Socket error (%d) in %s for socket %d", err, aString, sock);
144 #else
145 Log(TRACE_MINIMUM, -1, "Socket error %s(%d) in %s for socket %d", strerror(err), err, aString, sock);
146 #endif
147 }
148 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
149 return -1;
150 #else
151 return err;
152 #endif
153 }
154
155
156 /**
157 * Initialize the socket module
158 */
159 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
Socket_outInitialize(void)160 int Socket_outInitialize(void)
161 #else
162 void Socket_outInitialize(void)
163 #endif
164 {
165 #if defined(_WIN32) || defined(_WIN64)
166 WORD winsockVer = 0x0202;
167 WSADATA wsd;
168
169 FUNC_ENTRY;
170 WSAStartup(winsockVer, &wsd);
171 #else
172 FUNC_ENTRY;
173 #if !defined(IOT_CONNECT) && !defined(IOT_LITEOS_ADAPT)
174 signal(SIGPIPE, SIG_IGN);
175 #endif
176 #endif
177
178 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
179 int rc;
180 rc = SocketBuffer_initialize();
181 if (rc != 0)
182 goto exit;
183 #else
184 SocketBuffer_initialize();
185 #endif
186 mod_s.connect_pending = ListInitialize();
187 mod_s.write_pending = ListInitialize();
188
189 #if defined(USE_SELECT)
190 mod_s.clientsds = ListInitialize();
191 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
192 if ((mod_s.clientsds == NULL) || (mod_s.connect_pending == NULL) || (mod_s.write_pending == NULL))
193 {
194 SocketBuffer_terminate();
195 if (mod_s.clientsds != NULL)
196 {
197 ListFree(mod_s.clientsds);
198 mod_s.clientsds = NULL;
199 }
200 if (mod_s.connect_pending != NULL)
201 {
202 ListFree(mod_s.connect_pending);
203 mod_s.connect_pending = NULL;
204 }
205 if (mod_s.write_pending != NULL)
206 {
207 ListFree(mod_s.write_pending);
208 mod_s.write_pending = NULL;
209 }
210 rc = PAHO_MEMORY_ERROR;
211 goto exit;
212 }
213 #else
214 mod_s.cur_clientsds = NULL;
215 FD_ZERO(&(mod_s.rset)); /* Initialize the descriptor set */
216 FD_ZERO(&(mod_s.pending_wset));
217 mod_s.maxfdp1 = 0;
218 memcpy((void*)&(mod_s.rset_saved), (void*)&(mod_s.rset), sizeof(mod_s.rset_saved));
219 #endif
220 #else
221 mod_s.nfds = 0;
222 mod_s.fds_read = NULL;
223 mod_s.fds_write = NULL;
224
225 mod_s.saved.cur_fd = -1;
226 mod_s.saved.fds_write = NULL;
227 mod_s.saved.fds_read = NULL;
228 mod_s.saved.nfds = 0;
229 #endif
230 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
231 exit:
232 FUNC_EXIT_RC(rc);
233 return rc;
234 #else
235 FUNC_EXIT;
236 #endif
237 }
238
239
240 /**
241 * Terminate the socket module
242 */
Socket_outTerminate(void)243 void Socket_outTerminate(void)
244 {
245 FUNC_ENTRY;
246 ListFree(mod_s.connect_pending);
247 ListFree(mod_s.write_pending);
248 #if defined(USE_SELECT)
249 ListFree(mod_s.clientsds);
250 #else
251 if (mod_s.fds_read)
252 free(mod_s.fds_read);
253 if (mod_s.fds_write)
254 free(mod_s.fds_write);
255 if (mod_s.saved.fds_write)
256 free(mod_s.saved.fds_write);
257 if (mod_s.saved.fds_read)
258 free(mod_s.saved.fds_read);
259 #endif
260 SocketBuffer_terminate();
261 #if defined(_WIN32) || defined(_WIN64)
262 WSACleanup();
263 #endif
264 FUNC_EXIT;
265 }
266
267
268 #if defined(USE_SELECT)
269 /**
270 * Add a socket to the list of socket to check with select
271 * @param newSd the new socket to add
272 */
Socket_addSocket(SOCKET newSd)273 int Socket_addSocket(SOCKET newSd)
274 {
275 int rc = 0;
276
277 FUNC_ENTRY;
278 if (ListFindItem(mod_s.clientsds, &newSd, intcompare) == NULL) /* make sure we don't add the same socket twice */
279 {
280 if (mod_s.clientsds->count >= FD_SETSIZE)
281 {
282 Log(LOG_ERROR, -1, "addSocket: exceeded FD_SETSIZE %d", FD_SETSIZE);
283 rc = SOCKET_ERROR;
284 }
285 else
286 {
287 SOCKET* pnewSd = (SOCKET*)malloc(sizeof(newSd));
288
289 if (!pnewSd)
290 {
291 rc = PAHO_MEMORY_ERROR;
292 goto exit;
293 }
294 *pnewSd = newSd;
295 if (!ListAppend(mod_s.clientsds, pnewSd, sizeof(newSd)))
296 {
297 free(pnewSd);
298 rc = PAHO_MEMORY_ERROR;
299 goto exit;
300 }
301 FD_SET(newSd, &(mod_s.rset_saved));
302 mod_s.maxfdp1 = max(mod_s.maxfdp1, (int)newSd + 1);
303 rc = Socket_setnonblocking(newSd);
304 if (rc == SOCKET_ERROR)
305 Log(LOG_ERROR, -1, "addSocket: setnonblocking");
306 }
307 }
308 else
309 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
310 {
311 #endif
312 Log(LOG_ERROR, -1, "addSocket: socket %d already in the list", newSd);
313 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
314 rc = 1;
315 }
316 #endif
317
318 exit:
319 FUNC_EXIT_RC(rc);
320 return rc;
321 }
322 #else
/**
 * Ordering function for two pollfd entries, comparing their descriptors.
 * @param p1 pointer to the first struct pollfd
 * @param p2 pointer to the second struct pollfd
 * @return -1, 0 or 1 when the first descriptor is lower than, equal to or higher than the second
 */
static int cmpfds(const void *p1, const void *p2)
{
    const struct pollfd* lhs = (const struct pollfd*)p1;
    const struct pollfd* rhs = (const struct pollfd*)p2;

    if (lhs->fd == rhs->fd)
        return 0;
    if (lhs->fd < rhs->fd)
        return -1;
    return 1;
}
330
331
cmpsockfds(const void * p1,const void * p2)332 static int cmpsockfds(const void *p1, const void *p2)
333 {
334 int key1 = *(int*)p1;
335 SOCKET key2 = ((struct pollfd*)p2)->fd;
336
337 return (key1 == key2) ? 0 : ((key1 < key2) ? -1 : 1);
338 }
339
340
341 /**
342 * Add a socket to the list of socket to check with select
343 * @param newSd the new socket to add
344 */
Socket_addSocket(SOCKET newSd)345 int Socket_addSocket(SOCKET newSd)
346 {
347 int rc = 0;
348
349 FUNC_ENTRY;
350 Thread_lock_mutex(socket_mutex);
351 mod_s.nfds++;
352 if (mod_s.fds_read)
353 mod_s.fds_read = realloc(mod_s.fds_read, mod_s.nfds * sizeof(mod_s.fds_read[0]));
354 else
355 mod_s.fds_read = malloc(mod_s.nfds * sizeof(mod_s.fds_read[0]));
356 if (!mod_s.fds_read)
357 {
358 rc = PAHO_MEMORY_ERROR;
359 goto exit;
360 }
361 if (mod_s.fds_write)
362 mod_s.fds_write = realloc(mod_s.fds_write, mod_s.nfds * sizeof(mod_s.fds_write[0]));
363 else
364 mod_s.fds_write = malloc(mod_s.nfds * sizeof(mod_s.fds_write[0]));
365 if (!mod_s.fds_read)
366 {
367 rc = PAHO_MEMORY_ERROR;
368 goto exit;
369 }
370
371 mod_s.fds_read[mod_s.nfds - 1].fd = newSd;
372 mod_s.fds_write[mod_s.nfds - 1].fd = newSd;
373 #if defined(_WIN32) || defined(_WIN64)
374 mod_s.fds_read[mod_s.nfds - 1].events = POLLIN;
375 mod_s.fds_write[mod_s.nfds - 1].events = POLLOUT;
376 #else
377 mod_s.fds_read[mod_s.nfds - 1].events = POLLIN | POLLNVAL;
378 mod_s.fds_write[mod_s.nfds - 1].events = POLLOUT;
379 #endif
380
381 /* sort the poll fds array by socket number */
382 qsort(mod_s.fds_read, (size_t)mod_s.nfds, sizeof(mod_s.fds_read[0]), cmpfds);
383 qsort(mod_s.fds_write, (size_t)mod_s.nfds, sizeof(mod_s.fds_write[0]), cmpfds);
384
385 rc = Socket_setnonblocking(newSd);
386 if (rc == SOCKET_ERROR)
387 Log(LOG_ERROR, -1, "addSocket: setnonblocking");
388
389 exit:
390 Thread_unlock_mutex(socket_mutex);
391 FUNC_EXIT_RC(rc);
392 return rc;
393 }
394 #endif
395
396
397 #if defined(USE_SELECT)
398 /**
399 * Don't accept work from a client unless it is accepting work back, i.e. its socket is writeable
400 * this seems like a reasonable form of flow control, and practically, seems to work.
401 * @param socket the socket to check
402 * @param read_set the socket read set (see select doc)
403 * @param write_set the socket write set (see select doc)
404 * @return boolean - is the socket ready to go?
405 */
isReady(int socket,fd_set * read_set,fd_set * write_set)406 int isReady(int socket, fd_set* read_set, fd_set* write_set)
407 {
408 int rc = 1;
409
410 FUNC_ENTRY;
411 if (ListFindItem(mod_s.connect_pending, &socket, intcompare) && FD_ISSET(socket, write_set))
412 ListRemoveItem(mod_s.connect_pending, &socket, intcompare);
413 else
414 rc = FD_ISSET(socket, read_set) && FD_ISSET(socket, write_set) && Socket_noPendingWrites(socket);
415 FUNC_EXIT_RC(rc);
416 return rc;
417 }
418 #else
419 /**
420 * Don't accept work from a client unless it is accepting work back, i.e. its socket is writeable
421 * this seems like a reasonable form of flow control, and practically, seems to work.
422 * @param index the socket index to check
423 * @return boolean - is the socket ready to go?
424 */
isReady(int index)425 int isReady(int index)
426 {
427 int rc = 1;
428 SOCKET* socket = &mod_s.saved.fds_write[index].fd;
429
430 FUNC_ENTRY;
431
432 if ((mod_s.saved.fds_read[index].revents & POLLHUP) || (mod_s.saved.fds_read[index].revents & POLLNVAL))
433 ; /* signal work to be done if there is an error on the socket */
434 else if (ListFindItem(mod_s.connect_pending, socket, intcompare) &&
435 (mod_s.saved.fds_write[index].revents & POLLOUT))
436 ListRemoveItem(mod_s.connect_pending, socket, intcompare);
437 else
438 rc = (mod_s.saved.fds_read[index].revents & POLLIN) &&
439 (mod_s.saved.fds_write[index].revents & POLLOUT) &&
440 Socket_noPendingWrites(*socket);
441
442 FUNC_EXIT_RC(rc);
443 return rc;
444 }
445 #endif
446
447 #if defined(USE_SELECT)
448 /**
449 * Returns the next socket ready for communications as indicated by select
450 * @param more_work flag to indicate more work is waiting, and thus a timeout value of 0 should
451 * be used for the select
452 * @param timeout the timeout to be used for the select, unless overridden
453 * @param rc a value other than 0 indicates an error of the returned socket
454 * @return the socket next ready, or 0 if none is ready
455 */
Socket_getReadySocket(int more_work,int timeout,mutex_type mutex,int * rc)456 SOCKET Socket_getReadySocket(int more_work, int timeout, mutex_type mutex, int* rc)
457 {
458 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
459 SOCKET sock = SOCKET_ERROR;
460 #else
461 SOCKET sock = 0;
462 #endif
463 *rc = 0;
464 int timeout_ms = 1000;
465
466 FUNC_ENTRY;
467 Thread_lock_mutex(mutex);
468 if (mod_s.clientsds->count == 0)
469 goto exit;
470
471 if (more_work)
472 timeout_ms = 0;
473 else if (timeout >= 0)
474 timeout_ms = timeout;
475
476 while (mod_s.cur_clientsds != NULL)
477 {
478 if (isReady(*((int*)(mod_s.cur_clientsds->content)), &(mod_s.rset), &wset))
479 break;
480 ListNextElement(mod_s.clientsds, &mod_s.cur_clientsds);
481 }
482
483 if (mod_s.cur_clientsds == NULL)
484 {
485 static struct timeval zero = {0L, 0L}; /* 0 seconds */
486 int rc1, maxfdp1_saved;
487 fd_set pwset;
488 struct timeval timeout_tv = {0L, 0L};
489
490 if (timeout_ms > 0L)
491 {
492 timeout_tv.tv_sec = timeout_ms / 1000;
493 timeout_tv.tv_usec = (timeout_ms % 1000) * 1000; /* this field is microseconds! */
494 }
495
496 memcpy((void*)&(mod_s.rset), (void*)&(mod_s.rset_saved), sizeof(mod_s.rset));
497 memcpy((void*)&(pwset), (void*)&(mod_s.pending_wset), sizeof(pwset));
498 maxfdp1_saved = mod_s.maxfdp1;
499
500 if (maxfdp1_saved == 0)
501 {
502 sock = 0;
503 goto exit; /* no work to do */
504 }
505 /* Prevent performance issue by unlocking the socket_mutex while waiting for a ready socket. */
506 Thread_unlock_mutex(mutex);
507 #if defined(IOT_CONNECT)
508 *rc = mqtt_select(maxfdp1_saved, &(mod_s.rset), &pwset, NULL, &timeout_tv);
509 #else
510 *rc = select(maxfdp1_saved, &(mod_s.rset), &pwset, NULL, &timeout_tv);
511 #endif
512 Thread_lock_mutex(mutex);
513 if (*rc == SOCKET_ERROR)
514 {
515 Socket_error("read select", 0);
516 goto exit;
517 }
518 Log(TRACE_MAX, -1, "Return code %d from read select", *rc);
519 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
520 rc1 = Socket_continueWrites(&pwset, &sock, mutex);
521 #if defined(SEND_MAX_LEN)
522 if (rc1 == SOCKET_ERROR || rc1 == EXT_SOCKET_RET_MESSAGE_TOO_LONG)
523 #else
524 if (rc1 == SOCKET_ERROR)
525 #endif
526 {
527 *rc = rc1;
528 goto exit;
529 }
530 #else
531 if (Socket_continueWrites(&pwset, &sock, mutex) == SOCKET_ERROR)
532 {
533 *rc = SOCKET_ERROR;
534 goto exit;
535 }
536 #endif
537
538 memcpy((void*)&wset, (void*)&(mod_s.rset_saved), sizeof(wset));
539 #if defined(IOT_CONNECT)
540 if ((rc1 = mqtt_select(mod_s.maxfdp1, NULL, &(wset), NULL, &zero)) == SOCKET_ERROR)
541 #else
542 if ((rc1 = select(mod_s.maxfdp1, NULL, &(wset), NULL, &zero)) == SOCKET_ERROR)
543 #endif
544 {
545 Socket_error("write select", 0);
546 *rc = rc1;
547 goto exit;
548 }
549 Log(TRACE_MAX, -1, "Return code %d from write select", rc1);
550
551 if (*rc == 0 && rc1 == 0)
552 {
553 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
554 sock = SOCKET_ERROR;
555 #else
556 sock = 0;
557 #endif
558 goto exit; /* no work to do */
559 }
560
561 mod_s.cur_clientsds = mod_s.clientsds->first;
562 while (mod_s.cur_clientsds != NULL)
563 {
564 int cursock = *((int*)(mod_s.cur_clientsds->content));
565 if (isReady(cursock, &(mod_s.rset), &wset))
566 break;
567 ListNextElement(mod_s.clientsds, &mod_s.cur_clientsds);
568 }
569 }
570
571 *rc = 0;
572 if (mod_s.cur_clientsds == NULL)
573 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
574 {
575 sock = SOCKET_ERROR;
576 }
577 #else
578 sock = 0;
579 #endif
580 else
581 {
582 sock = *((int*)(mod_s.cur_clientsds->content));
583 ListNextElement(mod_s.clientsds, &mod_s.cur_clientsds);
584 }
585 exit:
586 Thread_unlock_mutex(mutex);
587 FUNC_EXIT_RC(sock);
588 return sock;
589 } /* end getReadySocket */
590 #else
591 /**
592 * Returns the next socket ready for communications as indicated by select
593 * @param more_work flag to indicate more work is waiting, and thus a timeout value of 0 should
594 * be used for the select
595 * @param timeout the timeout to be used in ms
596 * @param rc a value other than 0 indicates an error of the returned socket
597 * @return the socket next ready, or 0 if none is ready
598 */
Socket_getReadySocket(int more_work,int timeout,mutex_type mutex,int * rc)599 SOCKET Socket_getReadySocket(int more_work, int timeout, mutex_type mutex, int* rc)
600 {
601 SOCKET sock = 0;
602 *rc = 0;
603 int timeout_ms = 1000;
604
605 FUNC_ENTRY;
606 Thread_lock_mutex(mutex);
607 if (mod_s.nfds == 0 && mod_s.saved.nfds == 0)
608 goto exit;
609
610 if (more_work)
611 timeout_ms = 0;
612 else if (timeout >= 0)
613 timeout_ms = timeout;
614
615 while (mod_s.saved.cur_fd != -1)
616 {
617 if (isReady(mod_s.saved.cur_fd))
618 break;
619 mod_s.saved.cur_fd = (mod_s.saved.cur_fd == mod_s.saved.nfds - 1) ? -1 : mod_s.saved.cur_fd + 1;
620 }
621
622 if (mod_s.saved.cur_fd == -1)
623 {
624 int rc1 = 0;
625
626 if (mod_s.nfds != mod_s.saved.nfds)
627 {
628 mod_s.saved.nfds = mod_s.nfds;
629 if (mod_s.saved.fds_read)
630 mod_s.saved.fds_read = realloc(mod_s.saved.fds_read, mod_s.nfds * sizeof(struct pollfd));
631 else
632 mod_s.saved.fds_read = malloc(mod_s.nfds * sizeof(struct pollfd));
633 if (mod_s.saved.fds_write)
634 mod_s.saved.fds_write = realloc(mod_s.saved.fds_write, mod_s.nfds * sizeof(struct pollfd));
635 else
636 mod_s.saved.fds_write = malloc(mod_s.nfds * sizeof(struct pollfd));
637 }
638 memcpy(mod_s.saved.fds_read, mod_s.fds_read, mod_s.nfds * sizeof(struct pollfd));
639 memcpy(mod_s.saved.fds_write, mod_s.fds_write, mod_s.nfds * sizeof(struct pollfd));
640
641 if (mod_s.saved.nfds == 0)
642 {
643 sock = 0;
644 goto exit; /* no work to do */
645 }
646
647 /* Check pending write set for writeable sockets */
648 rc1 = poll(mod_s.saved.fds_write, mod_s.saved.nfds, 0);
649 if (rc1 > 0 && Socket_continueWrites(&sock, mutex) == SOCKET_ERROR)
650 {
651 *rc = SOCKET_ERROR;
652 goto exit;
653 }
654
655 /* Prevent performance issue by unlocking the socket_mutex while waiting for a ready socket. */
656 Thread_unlock_mutex(mutex);
657 *rc = poll(mod_s.saved.fds_read, mod_s.saved.nfds, timeout_ms);
658 Thread_lock_mutex(mutex);
659 if (*rc == SOCKET_ERROR)
660 {
661 Socket_error("poll", 0);
662 goto exit;
663 }
664 Log(TRACE_MAX, -1, "Return code %d from poll", *rc);
665
666 if (rc1 == 0 && *rc == 0)
667 {
668 sock = 0;
669 goto exit; /* no work to do */
670 }
671
672 mod_s.saved.cur_fd = 0;
673 while (mod_s.saved.cur_fd != -1)
674 {
675 if (isReady(mod_s.saved.cur_fd))
676 break;
677 mod_s.saved.cur_fd = (mod_s.saved.cur_fd == mod_s.saved.nfds - 1) ? -1 : mod_s.saved.cur_fd + 1;
678 }
679 }
680
681 *rc = 0;
682 if (mod_s.saved.cur_fd == -1)
683 sock = 0;
684 else
685 {
686 sock = mod_s.saved.fds_read[mod_s.saved.cur_fd].fd;
687 mod_s.saved.cur_fd = (mod_s.saved.cur_fd == mod_s.saved.nfds - 1) ? -1 : mod_s.saved.cur_fd + 1;
688 }
689 exit:
690 Thread_unlock_mutex(mutex);
691 FUNC_EXIT_RC(sock);
692 return sock;
693 } /* end getReadySocket */
694 #endif
695
696
697 /**
698 * Reads one byte from a socket
699 * @param socket the socket to read from
700 * @param c the character read, returned
701 * @return completion code
702 */
Socket_getch(SOCKET socket,char * c)703 int Socket_getch(SOCKET socket, char* c)
704 {
705 int rc = SOCKET_ERROR;
706
707 FUNC_ENTRY;
708 if ((rc = SocketBuffer_getQueuedChar(socket, c)) != SOCKETBUFFER_INTERRUPTED)
709 goto exit;
710 #if defined(IOT_CONNECT)
711 if ((rc = adapt_recv(socket, c, (size_t)1, 0)) == SOCKET_ERROR)
712 #elif defined(IOT_LITEOS_ADAPT)
713 if ((rc = lwip_recv(socket, c, (size_t)1, 0)) == SOCKET_ERROR)
714 #else
715 if ((rc = recv(socket, c, (size_t)1, 0)) == SOCKET_ERROR)
716 #endif
717 {
718 int err = Socket_error("recv - getch", socket);
719 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
720 if (err == EWOULDBLOCK)
721 #else
722 if (err == EWOULDBLOCK || err == EAGAIN)
723 #endif
724 {
725 rc = TCPSOCKET_INTERRUPTED;
726 SocketBuffer_interrupted(socket, 0);
727 }
728 }
729 else if (rc == 0)
730 rc = SOCKET_ERROR; /* The return value from recv is 0 when the peer has performed an orderly shutdown. */
731 else if (rc == 1)
732 {
733 SocketBuffer_queueChar(socket, *c);
734 rc = TCPSOCKET_COMPLETE;
735 }
736 exit:
737 FUNC_EXIT_RC(rc);
738 return rc;
739 }
740
741
742 /**
743 * Attempts to read a number of bytes from a socket, non-blocking. If a previous read did not
744 * finish, then retrieve that data.
745 * @param socket the socket to read from
746 * @param bytes the number of bytes to read
747 * @param actual_len the actual number of bytes read
748 * @return completion code
749 */
Socket_getdata(SOCKET socket,size_t bytes,size_t * actual_len,int * rc)750 char *Socket_getdata(SOCKET socket, size_t bytes, size_t* actual_len, int *rc)
751 {
752 char* buf;
753
754 FUNC_ENTRY;
755 if (bytes == 0)
756 {
757 buf = SocketBuffer_complete(socket);
758 goto exit;
759 }
760
761 buf = SocketBuffer_getQueuedData(socket, bytes, actual_len);
762
763 #if defined(IOT_CONNECT)
764 if ((*rc = adapt_recv(socket, buf + (*actual_len), (int)(bytes - (*actual_len)), 0)) == SOCKET_ERROR)
765 #elif defined(IOT_LITEOS_ADAPT)
766 if ((*rc = lwip_recv(socket, buf + (*actual_len), (int)(bytes - (*actual_len)), 0)) == SOCKET_ERROR)
767 #else
768 if ((*rc = recv(socket, buf + (*actual_len), (int)(bytes - (*actual_len)), 0)) == SOCKET_ERROR)
769 #endif
770 {
771 *rc = Socket_error("recv - getdata", socket);
772 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
773 if (*rc != EWOULDBLOCK)
774 #else
775 if (*rc != EAGAIN && *rc != EWOULDBLOCK)
776 #endif
777 {
778 buf = NULL;
779 goto exit;
780 }
781 }
782 else if (*rc == 0) /* rc 0 means the other end closed the socket, albeit "gracefully" */
783 {
784 buf = NULL;
785 goto exit;
786 }
787 else
788 *actual_len += *rc;
789
790 if (*actual_len == bytes)
791 SocketBuffer_complete(socket);
792 else /* we didn't read the whole packet */
793 {
794 SocketBuffer_interrupted(socket, *actual_len);
795 Log(TRACE_MAX, -1, "%d bytes expected but %d bytes now received", (int)bytes, (int)*actual_len);
796 }
797 exit:
798 FUNC_EXIT;
799 return buf;
800 }
801
802
803 /**
804 * Indicate whether any data is pending outbound for a socket.
805 * @return boolean - true == no pending data.
806 */
Socket_noPendingWrites(SOCKET socket)807 int Socket_noPendingWrites(SOCKET socket)
808 {
809 SOCKET cursock = socket;
810 return ListFindItem(mod_s.write_pending, &cursock, intcompare) == NULL;
811 }
812
813
814 /**
815 * Attempts to write a series of iovec buffers to a socket in *one* system call so that
816 * they are sent as one packet.
817 * @param socket the socket to write to
818 * @param iovecs an array of buffers to write
819 * @param count number of buffers in iovecs
820 * @param bytes number of bytes actually written returned
821 * @return completion code, especially TCPSOCKET_INTERRUPTED
822 */
Socket_writev(SOCKET socket,iobuf * iovecs,int count,unsigned long * bytes)823 int Socket_writev(SOCKET socket, iobuf* iovecs, int count, unsigned long* bytes)
824 {
825 int rc;
826
827 FUNC_ENTRY;
828 *bytes = 0L;
829 #if defined(_WIN32) || defined(_WIN64)
830 rc = WSASend(socket, iovecs, count, (LPDWORD)bytes, 0, NULL, NULL);
831 if (rc == SOCKET_ERROR)
832 {
833 int err = Socket_error("WSASend - putdatas", socket);
834 if (err == EWOULDBLOCK || err == EAGAIN)
835 rc = TCPSOCKET_INTERRUPTED;
836 }
837 #else
838 /*#define TCPSOCKET_INTERRUPTED_TESTING
839 This section forces the occasional return of TCPSOCKET_INTERRUPTED,
840 for testing purposes only!
841 */
842 #if defined(TCPSOCKET_INTERRUPTED_TESTING)
843 static int i = 0;
844 if (++i >= 10 && i < 21)
845 {
846 if (1)
847 {
848 printf("Deliberately simulating TCPSOCKET_INTERRUPTED\n");
849 rc = TCPSOCKET_INTERRUPTED; /* simulate a network wait */
850 }
851 else
852 {
853 printf("Deliberately simulating SOCKET_ERROR\n");
854 rc = SOCKET_ERROR;
855 }
856 /* should *bytes always be 0? */
857 if (i == 20)
858 {
859 printf("Shutdown socket\n");
860 shutdown(socket, SHUT_WR);
861 }
862 }
863 else
864 {
865 #endif
866 #if defined(IOT_CONNECT)
867 rc = adapt_writev(socket, iovecs, count);
868 #elif defined(IOT_LITEOS_ADAPT)
869 rc = lwip_writev(socket, iovecs, count);
870 #else
871 rc = writev(socket, iovecs, count);
872 #endif
873 #if defined(SEND_MAX_LEN)
874 if (rc == SOCKET_ERROR || rc == EXT_SOCKET_RET_MESSAGE_TOO_LONG)
875 #else
876 if (rc == SOCKET_ERROR)
877 #endif
878 {
879 int err = Socket_error("writev - putdatas", socket);
880 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
881 if (err == EWOULDBLOCK)
882 #else
883 if (err == EWOULDBLOCK || err == EAGAIN)
884 #endif
885 rc = TCPSOCKET_INTERRUPTED;
886 }
887 else
888 *bytes = rc;
889 #if defined(TCPSOCKET_INTERRUPTED_TESTING)
890 }
891 #endif
892 #endif
893 FUNC_EXIT_RC(rc);
894 return rc;
895 }
896
897
898 /**
899 * Attempts to write a series of buffers to a socket in *one* system call so that they are
900 * sent as one packet.
901 * @param socket the socket to write to
902 * @param buf0 the first buffer
903 * @param buf0len the length of data in the first buffer
904 * @param count number of buffers
905 * @param buffers an array of buffers to write
906 * @param buflens an array of corresponding buffer lengths
907 * @return completion code, especially TCPSOCKET_INTERRUPTED
908 */
Socket_putdatas(SOCKET socket,char * buf0,size_t buf0len,PacketBuffers bufs)909 int Socket_putdatas(SOCKET socket, char* buf0, size_t buf0len, PacketBuffers bufs)
910 {
911 unsigned long bytes = 0L;
912 iobuf iovecs[5];
913 int frees1[5];
914 int rc = TCPSOCKET_INTERRUPTED, i;
915 size_t total = buf0len;
916
917 FUNC_ENTRY;
918 if (!Socket_noPendingWrites(socket))
919 {
920 Log(LOG_SEVERE, -1, "Trying to write to socket %d for which there is already pending output", socket);
921 rc = SOCKET_ERROR;
922 goto exit;
923 }
924
925 for (i = 0; i < bufs.count; i++)
926 total += bufs.buflens[i];
927
928 iovecs[0].iov_base = buf0;
929 iovecs[0].iov_len = (ULONG)buf0len;
930 frees1[0] = 1; /* this buffer should be freed by SocketBuffer if the write is interrupted */
931 for (i = 0; i < bufs.count; i++)
932 {
933 iovecs[i+1].iov_base = bufs.buffers[i];
934 iovecs[i+1].iov_len = (ULONG)bufs.buflens[i];
935 frees1[i+1] = bufs.frees[i];
936 }
937
938 #if defined(IOT_CONNECT)
939 rc = Socket_writev(socket, iovecs, bufs.count+1, &bytes);
940 #if defined(SEND_MAX_LEN)
941 if (rc != SOCKET_ERROR && rc != EXT_SOCKET_RET_MESSAGE_TOO_LONG && rc != EXT_SOCKET_RET_SLIDING_WINDOW_FULL)
942 #else
943 if (rc != SOCKET_ERROR && rc != EXT_SOCKET_RET_SLIDING_WINDOW_FULL)
944 #endif
945 #else
946 if ((rc = Socket_writev(socket, iovecs, bufs.count+1, &bytes)) != SOCKET_ERROR)
947 #endif
948 {
949 if (bytes == total)
950 rc = TCPSOCKET_COMPLETE;
951 else
952 {
953 SOCKET* sockmem = (SOCKET*)malloc(sizeof(SOCKET));
954
955 if (!sockmem)
956 {
957 rc = PAHO_MEMORY_ERROR;
958 goto exit;
959 }
960 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
961 Log(TRACE_MIN, -1, "Partial write: %lu bytes of %lu actually written on socket %d",
962 bytes, (unsigned long)total, socket);
963 #else
964 Log(TRACE_MIN, -1, "Partial write: %lu bytes of %lu actually written on socket %d",
965 bytes, total, socket);
966 #endif
967 #if defined(OPENSSL) || defined(MBEDTLS)
968 SocketBuffer_pendingWrite(socket, NULL, bufs.count+1, iovecs, frees1, total, bytes);
969 #else
970 SocketBuffer_pendingWrite(socket, bufs.count+1, iovecs, frees1, total, bytes);
971 #endif
972 *sockmem = socket;
973 if (!ListAppend(mod_s.write_pending, sockmem, sizeof(int)))
974 {
975 free(sockmem);
976 rc = PAHO_MEMORY_ERROR;
977 goto exit;
978 }
979 #if defined(USE_SELECT)
980 FD_SET(socket, &(mod_s.pending_wset));
981 #endif
982 rc = TCPSOCKET_INTERRUPTED;
983 }
984 }
985 exit:
986 FUNC_EXIT_RC(rc);
987 return rc;
988 }
989
990
991 /**
992 * Add a socket to the pending write list, so that it is checked for writing in select. This is used
993 * in connect processing when the TCP connect is incomplete, as we need to check the socket for both
994 * ready to read and write states.
995 * @param socket the socket to add
996 */
Socket_addPendingWrite(SOCKET socket)997 void Socket_addPendingWrite(SOCKET socket)
998 {
999 #if defined(USE_SELECT)
1000 FD_SET(socket, &(mod_s.pending_wset));
1001 #endif
1002 }
1003
1004
1005 /**
1006 * Clear a socket from the pending write list - if one was added with Socket_addPendingWrite
1007 * @param socket the socket to remove
1008 */
Socket_clearPendingWrite(SOCKET socket)1009 void Socket_clearPendingWrite(SOCKET socket)
1010 {
1011 #if defined(USE_SELECT)
1012 if (FD_ISSET(socket, &(mod_s.pending_wset)))
1013 FD_CLR(socket, &(mod_s.pending_wset));
1014 #endif
1015 }
1016
1017
1018 /**
1019 * Close a socket without removing it from the select list.
1020 * @param socket the socket to close
1021 * @return completion code
1022 */
Socket_close_only(SOCKET socket)1023 int Socket_close_only(SOCKET socket)
1024 {
1025 int rc;
1026
1027 FUNC_ENTRY;
1028 #if defined(_WIN32) || defined(_WIN64)
1029 if (shutdown(socket, SD_BOTH) == SOCKET_ERROR)
1030 Socket_error("shutdown", socket);
1031 if ((rc = closesocket(socket)) == SOCKET_ERROR)
1032 Socket_error("close", socket);
1033 #else
1034 #if !defined(IOT_CONNECT) && !defined(IOT_LITEOS_ADAPT)
1035 if (shutdown(socket, SHUT_WR) == SOCKET_ERROR)
1036 Socket_error("shutdown", socket);
1037 #endif
1038 #if defined(IOT_CONNECT)
1039 if ((rc = adapt_recv(socket, NULL, (size_t)0, 0)) == SOCKET_ERROR)
1040 #elif defined(IOT_LITEOS_ADAPT)
1041 if ((rc = lwip_recv(socket, NULL, (size_t)0, 0)) == SOCKET_ERROR)
1042 #else
1043 if ((rc = recv(socket, NULL, (size_t)0, 0)) == SOCKET_ERROR)
1044 #endif
1045 Socket_error("shutdown", socket);
1046 #if defined(IOT_CONNECT)
1047 if ((rc = closesocket(socket, SOCK_CLOSE_FORCE_MODE)) == SOCKET_ERROR)
1048 #elif defined(IOT_LITEOS_ADAPT)
1049 if ((rc = lwip_close(socket)) == SOCKET_ERROR)
1050 #else
1051 if ((rc = close(socket)) == SOCKET_ERROR)
1052 #endif
1053 Socket_error("close", socket);
1054 #endif
1055 FUNC_EXIT_RC(rc);
1056 return rc;
1057 }
1058
1059 #if defined(USE_SELECT)
1060 /**
1061 * Close a socket and remove it from the select list.
1062 * @param socket the socket to close
1063 * @return completion code
1064 */
Socket_close(SOCKET socket)1065 int Socket_close(SOCKET socket)
1066 {
1067 int rc = 0;
1068
1069 FUNC_ENTRY;
1070 Socket_close_only(socket);
1071 FD_CLR(socket, &(mod_s.rset_saved));
1072 if (FD_ISSET(socket, &(mod_s.pending_wset)))
1073 FD_CLR(socket, &(mod_s.pending_wset));
1074 if (mod_s.cur_clientsds != NULL && *(int*)(mod_s.cur_clientsds->content) == socket)
1075 mod_s.cur_clientsds = mod_s.cur_clientsds->next;
1076 Socket_abortWrite(socket);
1077 SocketBuffer_cleanup(socket);
1078 ListRemoveItem(mod_s.connect_pending, &socket, intcompare);
1079 ListRemoveItem(mod_s.write_pending, &socket, intcompare);
1080
1081 if (ListRemoveItem(mod_s.clientsds, &socket, intcompare))
1082 Log(TRACE_MIN, -1, "Removed socket %d", socket);
1083 else
1084 {
1085 Log(LOG_ERROR, -1, "Failed to remove socket %d", socket);
1086 rc = SOCKET_ERROR;
1087 goto exit;
1088 }
1089 if (socket + 1 >= mod_s.maxfdp1)
1090 {
1091 /* now we have to reset mod_s.maxfdp1 */
1092 ListElement* cur_clientsds = NULL;
1093
1094 mod_s.maxfdp1 = 0;
1095 while (ListNextElement(mod_s.clientsds, &cur_clientsds))
1096 mod_s.maxfdp1 = max(*((int*)(cur_clientsds->content)), mod_s.maxfdp1);
1097 ++(mod_s.maxfdp1);
1098 Log(TRACE_MAX, -1, "Reset max fdp1 to %d", mod_s.maxfdp1);
1099 }
1100 exit:
1101 FUNC_EXIT_RC(rc);
1102 return rc;
1103 }
1104 #else
1105 /**
1106 * Close a socket and remove it from the select list.
1107 * @param socket the socket to close
1108 * @return completion code
1109 */
Socket_close(SOCKET socket)1110 int Socket_close(SOCKET socket)
1111 {
1112 struct pollfd* fd;
1113 int rc = 0;
1114
1115 FUNC_ENTRY;
1116 Socket_close_only(socket);
1117 Socket_abortWrite(socket);
1118 SocketBuffer_cleanup(socket);
1119 ListRemoveItem(mod_s.connect_pending, &socket, intcompare);
1120 ListRemoveItem(mod_s.write_pending, &socket, intcompare);
1121
1122 if (mod_s.nfds == 0)
1123 goto exit;
1124
1125 fd = bsearch(&socket, mod_s.fds_read, (size_t)mod_s.nfds, sizeof(mod_s.fds_read[0]), cmpsockfds);
1126 if (fd)
1127 {
1128 struct pollfd* last_fd = &mod_s.fds_read[mod_s.nfds - 1];
1129
1130 if (--mod_s.nfds == 0)
1131 {
1132 free(mod_s.fds_read);
1133 mod_s.fds_read = NULL;
1134 }
1135 else
1136 {
1137 if (fd != last_fd)
1138 {
1139 /* shift array to remove the socket in question */
1140 memmove(fd, fd + 1, (mod_s.nfds - (fd - mod_s.fds_read)) * sizeof(mod_s.fds_read[0]));
1141 }
1142 mod_s.fds_read = realloc(mod_s.fds_read, sizeof(mod_s.fds_read[0]) * mod_s.nfds);
1143 if (mod_s.fds_read == NULL)
1144 {
1145 rc = PAHO_MEMORY_ERROR;
1146 goto exit;
1147 }
1148 }
1149 Log(TRACE_MIN, -1, "Removed socket %d", socket);
1150 }
1151 else
1152 Log(LOG_ERROR, -1, "Failed to remove socket %d", socket);
1153
1154 fd = bsearch(&socket, mod_s.fds_write, (size_t)(mod_s.nfds+1), sizeof(mod_s.fds_write[0]), cmpsockfds);
1155 if (fd)
1156 {
1157 struct pollfd* last_fd = &mod_s.fds_write[mod_s.nfds];
1158
1159 if (mod_s.nfds == 0)
1160 {
1161 free(mod_s.fds_write);
1162 mod_s.fds_write = NULL;
1163 }
1164 else
1165 {
1166 if (fd != last_fd)
1167 {
1168 /* shift array to remove the socket in question */
1169 memmove(fd, fd + 1, (mod_s.nfds - (fd - mod_s.fds_write)) * sizeof(mod_s.fds_write[0]));
1170 }
1171 mod_s.fds_write = realloc(mod_s.fds_write, sizeof(mod_s.fds_write[0]) * mod_s.nfds);
1172 if (mod_s.fds_write == NULL)
1173 {
1174 rc = PAHO_MEMORY_ERROR;
1175 goto exit;
1176 }
1177 }
1178 Log(TRACE_MIN, -1, "Removed socket %d", socket);
1179 }
1180 else
1181 Log(LOG_ERROR, -1, "Failed to remove socket %d", socket);
1182 exit:
1183 FUNC_EXIT_RC(rc);
1184 return rc;
1185 }
1186 #endif
1187
1188 /**
1189 * Create a new socket and TCP connect to an address/port
1190 * @param addr the address string
1191 * @param port the TCP port
1192 * @param sock returns the new socket
1193 * @param timeout the timeout in milliseconds
1194 * @return completion code 0=good, SOCKET_ERROR=fail
1195 */
1196 #if !defined(IOT_CONNECT)
1197 #if defined(__GNUC__) && defined(__linux__) && !defined(__LITEOS__)
Socket_new(const char * addr,size_t addr_len,int port,SOCKET * sock,long timeout)1198 int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock, long timeout)
1199 #else
1200 int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock)
1201 #endif
1202 {
1203 int type = SOCK_STREAM;
1204 char *addr_mem;
1205 struct sockaddr_in address;
1206 #if defined(AF_INET6)
1207 struct sockaddr_in6 address6;
1208 #endif
1209 int rc = SOCKET_ERROR;
1210 #if defined(_WIN32) || defined(_WIN64)
1211 short family;
1212 #else
1213 sa_family_t family = AF_INET;
1214 #endif
1215 struct addrinfo *result = NULL;
1216 struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
1217
1218 FUNC_ENTRY;
1219 *sock = SOCKET_ERROR;
1220 memset(&address6, '\0', sizeof(address6));
1221
1222 if (addr[0] == '[')
1223 {
1224 ++addr;
1225 --addr_len;
1226 }
1227
1228 if ((addr_mem = malloc( addr_len + 1u )) == NULL)
1229 {
1230 rc = PAHO_MEMORY_ERROR;
1231 goto exit;
1232 }
1233 memcpy( addr_mem, addr, addr_len );
1234 addr_mem[addr_len] = '\0';
1235
1236 #if 0 /*defined(__GNUC__) && defined(__linux__)*/
1237 /* Commented out because the CI tests get intermittent ECONNABORTED return values
1238 * and I don't know why yet.
1239 */
1240 /* set getaddrinfo timeout if available */
1241 struct gaicb ar = {addr_mem, NULL, &hints, NULL};
1242 struct gaicb *reqs[] = {&ar};
1243
1244 unsigned long int seconds = timeout / 1000L;
1245 unsigned long int nanos = (timeout - (seconds * 1000L)) * 1000000L;
1246 struct timespec timeoutspec = {seconds, nanos};
1247
1248 rc = getaddrinfo_a(GAI_NOWAIT, reqs, 1, NULL);
1249 if (rc == 0)
1250 rc = gai_suspend((const struct gaicb* const *) reqs, 1, &timeoutspec);
1251
1252 if (rc == 0)
1253 {
1254 rc = gai_error(reqs[0]);
1255 result = ar.ar_result;
1256 }
1257 #else
1258 rc = getaddrinfo(addr_mem, NULL, &hints, &result);
1259 #endif
1260
1261 if (rc == 0)
1262 {
1263 struct addrinfo* res = result;
1264
1265 while (res)
1266 { /* prefer ip4 addresses */
1267 if (res->ai_family == AF_INET || res->ai_next == NULL)
1268 break;
1269 res = res->ai_next;
1270 }
1271
1272 if (res == NULL)
1273 rc = SOCKET_ERROR;
1274 else
1275 #if defined(AF_INET6)
1276 if (res->ai_family == AF_INET6)
1277 {
1278 address6.sin6_port = htons(port);
1279 address6.sin6_family = family = AF_INET6;
1280 memcpy(&address6.sin6_addr, &((struct sockaddr_in6*)(res->ai_addr))->sin6_addr, sizeof(address6.sin6_addr));
1281 }
1282 else
1283 #endif
1284 if (res->ai_family == AF_INET)
1285 {
1286 memset(&address.sin_zero, 0, sizeof(address.sin_zero));
1287 address.sin_port = htons(port);
1288 address.sin_family = family = AF_INET;
1289 address.sin_addr = ((struct sockaddr_in*)(res->ai_addr))->sin_addr;
1290 }
1291 else
1292 rc = SOCKET_ERROR;
1293
1294 freeaddrinfo(result);
1295 }
1296 else
1297 {
1298 Log(LOG_ERROR, -1, "getaddrinfo failed for addr %s with rc %d", addr_mem, rc);
1299 rc = SOCKET_ERROR;
1300 }
1301
1302 if (rc != 0)
1303 Log(LOG_ERROR, -1, "%s is not a valid IP address", addr_mem);
1304 else
1305 {
1306 *sock = socket(family, type, 0);
1307 if (*sock == INVALID_SOCKET)
1308 rc = Socket_error("socket", *sock);
1309 else
1310 {
1311 #if defined(NOSIGPIPE)
1312 int opt = 1;
1313
1314 if (setsockopt(*sock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
1315 Log(LOG_ERROR, -1, "Could not set SO_NOSIGPIPE for socket %d", *sock);
1316 #endif
1317 /*#define SMALL_TCP_BUFFER_TESTING
1318 This section sets the TCP send buffer to a small amount to provoke TCPSOCKET_INTERRUPTED
1319 return codes from send, for testing only!
1320 */
1321 #if defined(SMALL_TCP_BUFFER_TESTING)
1322 if (1)
1323 {
1324 int optsend = 100; //2 * 1440;
1325 printf("Setting optsend to %d\n", optsend);
1326 if (setsockopt(*sock, SOL_SOCKET, SO_SNDBUF, (void*)&optsend, sizeof(optsend)) != 0)
1327 Log(LOG_ERROR, -1, "Could not set SO_SNDBUF for socket %d", *sock);
1328 }
1329 #endif
1330 Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr, port);
1331 if (Socket_addSocket(*sock) == SOCKET_ERROR)
1332 rc = Socket_error("addSocket", *sock);
1333 else
1334 {
1335 /* this could complete immediately, even though we are non-blocking */
1336 if (family == AF_INET)
1337 rc = connect(*sock, (struct sockaddr*)&address, sizeof(address));
1338 #if defined(AF_INET6)
1339 else
1340 rc = connect(*sock, (struct sockaddr*)&address6, sizeof(address6));
1341 #endif
1342 if (rc == SOCKET_ERROR)
1343 rc = Socket_error("connect", *sock);
1344 if (rc == EINPROGRESS || rc == EWOULDBLOCK)
1345 {
1346 SOCKET* pnewSd = (SOCKET*)malloc(sizeof(SOCKET));
1347 ListElement* result = NULL;
1348
1349 if (!pnewSd)
1350 {
1351 rc = PAHO_MEMORY_ERROR;
1352 goto exit;
1353 }
1354 *pnewSd = *sock;
1355 Thread_lock_mutex(socket_mutex);
1356 result = ListAppend(mod_s.connect_pending, pnewSd, sizeof(SOCKET));
1357 Thread_unlock_mutex(socket_mutex);
1358 if (!result)
1359 {
1360 free(pnewSd);
1361 rc = PAHO_MEMORY_ERROR;
1362 goto exit;
1363 }
1364 Log(TRACE_MIN, 15, "Connect pending");
1365 }
1366 }
1367 /* Prevent socket leak by closing unusable sockets,
1368 as reported in https://github.com/eclipse/paho.mqtt.c/issues/135 */
1369 if (rc != 0 && (rc != EINPROGRESS) && (rc != EWOULDBLOCK))
1370 {
1371 Thread_lock_mutex(socket_mutex);
1372 Socket_close(*sock); /* close socket and remove from our list of sockets */
1373 Thread_unlock_mutex(socket_mutex);
1374 *sock = SOCKET_ERROR; /* as initialized before */
1375 }
1376 }
1377 }
1378
1379 exit:
1380 if (addr_mem)
1381 free(addr_mem);
1382
1383 FUNC_EXIT_RC(rc);
1384 return rc;
1385 }
1386 #else /* IOT_CONNECT */
/*
 * IOT_CONNECT variant of Socket_new: the host is first tried as a literal
 * address with ipaddr_aton() and, if that fails, passed to atiny_dns_parse()
 * with RETRY_TIMES attempts and a RETRY_TIMEOUT_S wait timeout.
 * Only the first addr_len characters of addr are used as the host, and a
 * leading '[' is skipped.
 * Returns 0 on success, SOCKET_ERROR on failure, EINPROGRESS/EWOULDBLOCK while
 * the connect is pending, or PAHO_MEMORY_ERROR.
 */
int Socket_new(const char* addr, size_t addr_len, int port, SOCKET* sock)
{
	int mem_ret = -1;
	int type = SOCK_STREAM;
	char *addr_mem = NULL;
	struct sockaddr_in address;
	int rc = SOCKET_ERROR;
	sa_family_t family = AF_INET;

	FUNC_ENTRY;
	*sock = SOCKET_ERROR;
	mem_ret = memset_s(&address, sizeof(address), '\0', sizeof(address));
	if (mem_ret != 0)
	{
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}

	/* skip the opening bracket of an IPv6 literal; the length check keeps
	   addr_len from wrapping around when the caller passes a length of 0 */
	if (addr_len > 0 && addr[0] == '[')
	{
		++addr;
		--addr_len;
	}

	/* private, NUL-terminated copy of just the host part */
	if ((addr_mem = malloc( addr_len + 1u )) == NULL)
	{
		rc = PAHO_MEMORY_ERROR;
		goto exit;
	}
	memcpy( addr_mem, addr, addr_len );
	addr_mem[addr_len] = '\0';

	address.sin_family = family = AF_INET;
	if (ipaddr_aton(addr_mem, (ip_addr_t *)&address.sin_addr) == 1)
		rc = 0;
	dns_attempts_cfg_t dns_attempt;
	dns_attempt.max_attempts = RETRY_TIMES;
	dns_attempt.wait_timeout = RETRY_TIMEOUT_S;
	if (rc != 0 && atiny_dns_parse(addr_mem, &address.sin_addr, &dns_attempt) == 0)
		rc = 0;
	/* NOTE(review): stored without htons(), unlike the non-IOT_CONNECT variant -
	   confirm that the adaptation layer expects host byte order */
	address.sin_port = port;

	if (rc != 0)
		Log(LOG_ERROR, -1, "%s is not a valid IP address", addr_mem);
	else
	{
		if (address.sin_addr.type == IPADDR_TYPE_V6) {
			address.sin_family = family = AF_INET6;
		}

		*sock = (int)socket(family, type, TCP);
		if (*sock < 0)
			rc = Socket_error("socket", *sock);
		else
		{
			/* log the extracted host, not the caller's raw string, which can
			   extend beyond addr_len */
			Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr_mem, port);
			rc = Socket_addSocket(*sock);
			if (rc < 0)
			{
				rc = Socket_error("addSocket", *sock);
			}
			else if (rc > 0)
			{
				/* positive result from Socket_addSocket: close without the list handling of Socket_close */
				Socket_close_only(*sock);
				*sock = -1;
				rc = SOCKET_ERROR;
				goto exit;
			}
			else
			{
				if (bind_callbacks(*sock, family) != 0)
					Log(TRACE_MIN, -1, "%s,%d, bind fail.", __func__, __LINE__);
				/* this could complete immediately, even though we are non-blocking */
				rc = connect(*sock, (struct sockaddr*)&address, sizeof(address));
				if (rc == 0)
					rc = wait_connected(*sock);
				else
					rc = SOCKET_ERROR;
				if (rc == SOCKET_ERROR)
					rc = Socket_error("connect", *sock);
				if (rc == EINPROGRESS || rc == EWOULDBLOCK)
				{
					/* remember the socket so that completion of the connect can be detected later */
					SOCKET* pnewSd = (SOCKET*)malloc(sizeof(SOCKET));
					ListElement* result = NULL;

					if (!pnewSd)
					{
						rc = PAHO_MEMORY_ERROR;
						goto exit;
					}
					*pnewSd = *sock;
					Thread_lock_mutex(socket_mutex);
					result = ListAppend(mod_s.connect_pending, pnewSd, sizeof(SOCKET));
					Thread_unlock_mutex(socket_mutex);
					if (!result)
					{
						free(pnewSd);
						rc = PAHO_MEMORY_ERROR;
						goto exit;
					}
					Log(TRACE_MIN, 15, "Connect pending");
				}
			}
			/* Prevent socket leak by closing unusable sockets,
			   as reported in https://github.com/eclipse/paho.mqtt.c/issues/135 */
			if (rc != 0 && (rc != EINPROGRESS) && (rc != EWOULDBLOCK))
			{
				Thread_lock_mutex(socket_mutex);
				Socket_close(*sock); /* close socket and remove from our list of sockets */
				Thread_unlock_mutex(socket_mutex);
				*sock = SOCKET_ERROR; /* as initialized before */
			}
		}
	}

exit:
	if (addr_mem)
		free(addr_mem);

	FUNC_EXIT_RC(rc);
	return rc;
}
1509 #endif
1510
/**
 * Callback that Socket_continueWrites() invokes, with the socket as argument,
 * for each pending socket it visits whose result code is still 0
 * (no write attempted, or write still incomplete). NULL means no callback.
 */
static Socket_writeContinue* writecontinue = NULL;

/**
 * Register the write-continue callback.
 * @param mywritecontinue the callback to store; replaces any previous one,
 * NULL disables the notification
 */
void Socket_setWriteContinueCallback(Socket_writeContinue* mywritecontinue)
{
	writecontinue = mywritecontinue;
}
1517
/**
 * Callback that Socket_continueWrites() invokes with the socket and the result
 * code once Socket_continueWrite() returned non-zero for that socket. The mutex
 * passed to Socket_continueWrites() is unlocked around the call.
 * NULL means no callback.
 */
static Socket_writeComplete* writecomplete = NULL;

/**
 * Register the write-complete callback.
 * @param mywritecomplete the callback to store; replaces any previous one,
 * NULL disables the notification
 */
void Socket_setWriteCompleteCallback(Socket_writeComplete* mywritecomplete)
{
	writecomplete = mywritecomplete;
}
1524
/**
 * Callback that Socket_continueWrites() invokes, with the socket as argument,
 * when a pending write finished with a positive result code.
 * NULL means no callback.
 */
static Socket_writeAvailable* writeAvailable = NULL;

/**
 * Register the write-available callback.
 * @param mywriteavailable the callback to store; replaces any previous one,
 * NULL disables the notification
 */
void Socket_setWriteAvailableCallback(Socket_writeAvailable* mywriteavailable)
{
	writeAvailable = mywriteavailable;
}
1531
1532 /**
1533 * Continue an outstanding write for a particular socket
1534 * @param socket that socket
1535 * @return completion code: 0=incomplete, 1=complete, -1=socket error
1536 */
Socket_continueWrite(SOCKET socket)1537 int Socket_continueWrite(SOCKET socket)
1538 {
1539 int rc = 0;
1540 pending_writes* pw;
1541 unsigned long curbuflen = 0L, /* cumulative total of buffer lengths */
1542 bytes = 0L;
1543 int curbuf = -1, i;
1544 iobuf iovecs1[5];
1545
1546 FUNC_ENTRY;
1547 pw = SocketBuffer_getWrite(socket);
1548
1549 #if defined(OPENSSL) || defined(MBEDTLS)
1550 if (pw->ssl)
1551 {
1552 rc = SSLSocket_continueWrite(pw);
1553 goto exit;
1554 }
1555 #endif
1556
1557 for (i = 0; i < pw->count; ++i)
1558 {
1559 if (pw->bytes <= curbuflen)
1560 { /* if previously written length is less than the buffer we are currently looking at,
1561 add the whole buffer */
1562 iovecs1[++curbuf].iov_len = pw->iovecs[i].iov_len;
1563 iovecs1[curbuf].iov_base = pw->iovecs[i].iov_base;
1564 }
1565 else if (pw->bytes < curbuflen + pw->iovecs[i].iov_len)
1566 { /* if previously written length is in the middle of the buffer we are currently looking at,
1567 add some of the buffer */
1568 size_t offset = pw->bytes - curbuflen;
1569 iovecs1[++curbuf].iov_len = pw->iovecs[i].iov_len - (ULONG)offset;
1570 iovecs1[curbuf].iov_base = (char*)pw->iovecs[i].iov_base + offset;
1571 }
1572 curbuflen += pw->iovecs[i].iov_len;
1573 }
1574
1575 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1576 rc = Socket_writev(socket, iovecs1, curbuf+1, &bytes);
1577 #if defined(SEND_MAX_LEN)
1578 if (rc != SOCKET_ERROR && rc != EXT_SOCKET_RET_MESSAGE_TOO_LONG)
1579 #else
1580 if (rc != SOCKET_ERROR)
1581 #endif
1582 #else
1583 if ((rc = Socket_writev(socket, iovecs1, curbuf+1, &bytes)) != SOCKET_ERROR)
1584 #endif
1585 {
1586 pw->bytes += bytes;
1587 if ((rc = (pw->bytes == pw->total)))
1588 { /* topic and payload buffers are freed elsewhere, when all references to them have been removed */
1589 for (i = 0; i < pw->count; i++)
1590 {
1591 if (pw->frees[i])
1592 {
1593 free(pw->iovecs[i].iov_base);
1594 pw->iovecs[i].iov_base = NULL;
1595 }
1596 }
1597 rc = 1; /* signal complete */
1598 Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket);
1599 }
1600 else
1601 {
1602 rc = 0; /* signal not complete */
1603 Log(TRACE_MIN, -1, "ContinueWrite wrote +%lu bytes on socket %d", bytes, socket);
1604 }
1605 }
1606 else /* if we got SOCKET_ERROR we need to clean up anyway - a partial write is no good anymore */
1607 {
1608 for (i = 0; i < pw->count; i++)
1609 {
1610 if (pw->frees[i])
1611 {
1612 free(pw->iovecs[i].iov_base);
1613 pw->iovecs[i].iov_base = NULL;
1614 }
1615 }
1616 }
1617 #if defined(OPENSSL) || defined(MBEDTLS)
1618 exit:
1619 #endif
1620 FUNC_EXIT_RC(rc);
1621 return rc;
1622 }
1623
1624
1625
1626 /**
1627 * Continue an outstanding write for a particular socket
1628 * @param socket that socket
1629 * @return completion code: 0=incomplete, 1=complete, -1=socket error
1630 */
Socket_abortWrite(SOCKET socket)1631 int Socket_abortWrite(SOCKET socket)
1632 {
1633 int i = -1, rc = 0;
1634 pending_writes* pw;
1635
1636 FUNC_ENTRY;
1637 if ((pw = SocketBuffer_getWrite(socket)) == NULL)
1638 goto exit;
1639
1640 #if defined(OPENSSL) || defined(MBEDTLS)
1641 if (pw->ssl)
1642 {
1643 #if !defined(IOT_CONNECT) && !defined(IOT_LITEOS_ADAPT)
1644 rc = SSLSocket_abortWrite(pw);
1645 #endif
1646 goto exit;
1647 }
1648 #endif
1649
1650 for (i = 0; i < pw->count; i++)
1651 {
1652 if (pw->frees[i])
1653 {
1654 Log(TRACE_MIN, -1, "Cleaning in abortWrite for socket %d", socket);
1655 free(pw->iovecs[i].iov_base);
1656 }
1657 }
1658 exit:
1659 FUNC_EXIT_RC(rc);
1660 return rc;
1661 }
1662
1663
1664 #if defined(USE_SELECT)
1665 /**
1666 * Continue any outstanding writes for a socket set
1667 * @param pwset the set of sockets
1668 * @param sock in case of a socket error contains the affected socket
1669 * @return completion code, 0 or SOCKET_ERROR
1670 */
Socket_continueWrites(fd_set * pwset,SOCKET * sock,mutex_type mutex)1671 int Socket_continueWrites(fd_set* pwset, SOCKET* sock, mutex_type mutex)
1672 #else
1673 /**
1674 * Continue any outstanding socket writes
1675
1676 * @param sock in case of a socket error contains the affected socket
1677 * @return completion code, 0 or SOCKET_ERROR
1678 */
1679 int Socket_continueWrites(SOCKET* sock, mutex_type mutex)
1680 #endif
1681 {
1682 int rc1 = 0;
1683 ListElement* curpending = mod_s.write_pending->first;
1684
1685 FUNC_ENTRY;
1686 while (curpending && curpending->content)
1687 {
1688 int socket = *(int*)(curpending->content);
1689 int rc = 0;
1690 #if defined(USE_SELECT)
1691
1692 if (FD_ISSET(socket, pwset) && ((rc = Socket_continueWrite(socket)) != 0))
1693 #else
1694 struct pollfd* fd;
1695
1696 /* find the socket in the fds structure */
1697 fd = bsearch(&socket, mod_s.saved.fds_write, (size_t)mod_s.saved.nfds, sizeof(mod_s.saved.fds_write[0]), cmpsockfds);
1698
1699 if ((fd->revents & POLLOUT) && ((rc = Socket_continueWrite(socket)) != 0))
1700 #endif
1701 {
1702 if (!SocketBuffer_writeComplete(socket))
1703 Log(LOG_SEVERE, -1, "Failed to remove pending write from socket buffer list");
1704 #if defined(USE_SELECT)
1705 FD_CLR(socket, &(mod_s.pending_wset));
1706 #endif
1707 if (!ListRemove(mod_s.write_pending, curpending->content))
1708 {
1709 Log(LOG_SEVERE, -1, "Failed to remove pending write from list");
1710 ListNextElement(mod_s.write_pending, &curpending);
1711 }
1712 curpending = mod_s.write_pending->current;
1713
1714 if (writeAvailable && rc > 0)
1715 (*writeAvailable)(socket);
1716
1717 if (writecomplete)
1718 {
1719 Thread_unlock_mutex(mutex);
1720 (*writecomplete)(socket, rc);
1721 Thread_lock_mutex(mutex);
1722 }
1723 }
1724 else
1725 ListNextElement(mod_s.write_pending, &curpending);
1726
1727 if (writecontinue && rc == 0)
1728 (*writecontinue)(socket);
1729
1730 #if defined(SEND_MAX_LEN)
1731 if (rc == SOCKET_ERROR || rc == EXT_SOCKET_RET_MESSAGE_TOO_LONG)
1732 #else
1733 if(rc == SOCKET_ERROR)
1734 #endif
1735 {
1736 *sock = socket;
1737 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1738 rc1 = rc;
1739 #else
1740 rc1 = SOCKET_ERROR;
1741 #endif
1742 }
1743 }
1744 FUNC_EXIT_RC(rc1);
1745 return rc1;
1746 }
1747
1748
1749 /**
1750 * Convert a numeric address to character string
1751 * @param sa socket numerical address
1752 * @param sock socket
1753 * @return the peer information
1754 */
Socket_getaddrname(struct sockaddr * sa,SOCKET sock)1755 char* Socket_getaddrname(struct sockaddr* sa, SOCKET sock)
1756 {
1757 /**
1758 * maximum length of the address string
1759 */
1760 #define ADDRLEN INET6_ADDRSTRLEN+1
1761 /**
1762 * maximum length of the port string
1763 */
1764 #define PORTLEN 10
1765 static char addr_string[ADDRLEN + PORTLEN];
1766
1767 #if defined(_WIN32) || defined(_WIN64)
1768 int buflen = ADDRLEN*2;
1769 wchar_t buf[ADDRLEN*2];
1770 if (WSAAddressToStringW(sa, sizeof(struct sockaddr_in6), NULL, buf, (LPDWORD)&buflen) == SOCKET_ERROR)
1771 Socket_error("WSAAddressToString", sock);
1772 else
1773 wcstombs(addr_string, buf, sizeof(addr_string));
1774 /* TODO: append the port information - format: [00:00:00::]:port */
1775 /* strcpy(&addr_string[strlen(addr_string)], "what?"); */
1776 #else
1777 struct sockaddr_in *sin = (struct sockaddr_in *)sa;
1778 size_t buflen = sizeof(addr_string) - strlen(addr_string);
1779
1780 inet_ntop(sin->sin_family, &sin->sin_addr, addr_string, ADDRLEN);
1781 if (snprintf(&addr_string[strlen(addr_string)], buflen, ":%d", ntohs(sin->sin_port)) >= buflen)
1782 addr_string[sizeof(addr_string)-1] = '\0'; /* just in case of snprintf buffer filling */
1783 #endif
1784 return addr_string;
1785 }
1786
1787
1788 /**
1789 * Get information about the other end connected to a socket
1790 * @param sock the socket to inquire on
1791 * @return the peer information
1792 */
Socket_getpeer(SOCKET sock)1793 char* Socket_getpeer(SOCKET sock)
1794 {
1795 #if defined(IOT_CONNECT) || defined(IOT_LITEOS_ADAPT)
1796 return "unknown";
1797 #else
1798 struct sockaddr_in6 sa;
1799 socklen_t sal = sizeof(sa);
1800
1801 if (getpeername(sock, (struct sockaddr*)&sa, &sal) == SOCKET_ERROR)
1802 {
1803 Socket_error("getpeername", sock);
1804 return "unknown";
1805 }
1806
1807 return Socket_getaddrname((struct sockaddr*)&sa, sock);
1808 #endif
1809 }
1810
1811
1812 #if defined(Socket_TEST)
1813
/*
 * Manual smoke test, built only when Socket_TEST is defined: tries
 * Socket_connect() on port 1883 against a numeric address, "localhost" and a
 * host name that presumably does not resolve. The results are not checked.
 */
int main(int argc, char *argv[])
{
	char* hosts[] = { "127.0.0.1", "localhost", "loadsadsacalhost" };
	int i;

	for (i = 0; i < 3; ++i)
		Socket_connect(hosts[i], 1883);

	return 0;
}
1820
1821 #endif
1822