1 /******************************************************************************
2  *
3  *  Copyright 2009-2012 Broadcom Corporation
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 /*****************************************************************************
20  *
21  *  Filename:      uipc.cc
22  *
23  *  Description:   UIPC implementation for fluoride
24  *
25  *****************************************************************************/
26 
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <poll.h>
30 #include <signal.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <sys/mman.h>
35 #include <sys/prctl.h>
36 #include <sys/select.h>
37 #include <sys/socket.h>
38 #include <sys/stat.h>
39 #include <sys/un.h>
40 #include <unistd.h>
41 #include <mutex>
42 #include <set>
43 
44 // Define before including log.h
45 #define LOG_TAG "uipc"
46 
47 #include "audio_a2dp_hw/include/audio_a2dp_hw.h"
48 #include "osi/include/log.h"
49 #include "osi/include/osi.h"
50 #include "osi/include/socket_utils/sockets.h"
51 #include "uipc.h"
52 
53 /*****************************************************************************
54  *  Constants & Macros
55  *****************************************************************************/
56 
/* Evaluates to the larger of a and b. Being a macro, the selected argument is
 * evaluated twice, so avoid arguments with side effects. */
#define MAX(a, b) ((a) > (b) ? (a) : (b))

/* Expands to a switch case that returns the case label's own spelling as a
 * string literal. */
#define CASE_RETURN_STR(const) \
  case const:                  \
    return #const;

/* Sentinel stored in a channel's srvfd/fd when no socket is open. */
#define UIPC_DISCONNECTED (-1)

/* FD_ISSET that evaluates to false for an fd of -1 instead of testing the
 * set. */
#define SAFE_FD_ISSET(fd, set) (((fd) == -1) ? false : FD_ISSET((fd), (set)))

/* Size in bytes of the scratch buffer used when draining a channel. */
#define UIPC_FLUSH_BUFFER_SIZE 1024
68 
69 /*****************************************************************************
70  *  Local type definitions
71  *****************************************************************************/
72 
/* Bit flags kept in a channel's task_evt_flags and serviced by the read
 * task. */
typedef enum {
  /* Set by uipc_close_locked to request that the read task close the
   * channel. */
  UIPC_TASK_FLAG_DISCONNECT_CHAN = 0x1,
} tUIPC_TASK_FLAGS;
76 
77 /*****************************************************************************
78  *  Static functions
79  *****************************************************************************/
80 static int uipc_close_ch_locked(tUIPC_STATE& uipc, tUIPC_CH_ID ch_id);
81 
82 /*****************************************************************************
83  *  Externs
84  *****************************************************************************/
85 
86 /*****************************************************************************
87  *   Helper functions
88  *****************************************************************************/
89 
dump_uipc_event(tUIPC_EVENT event)90 const char* dump_uipc_event(tUIPC_EVENT event) {
91   switch (event) {
92     CASE_RETURN_STR(UIPC_OPEN_EVT)
93     CASE_RETURN_STR(UIPC_CLOSE_EVT)
94     CASE_RETURN_STR(UIPC_RX_DATA_EVT)
95     CASE_RETURN_STR(UIPC_RX_DATA_READY_EVT)
96     CASE_RETURN_STR(UIPC_TX_DATA_READY_EVT)
97     default:
98       return "UNKNOWN MSG ID";
99   }
100 }
101 
102 /*****************************************************************************
103  *   socket helper functions
104  ****************************************************************************/
105 
create_server_socket(const char * name)106 static inline int create_server_socket(const char* name) {
107   int s = socket(AF_LOCAL, SOCK_STREAM, 0);
108   if (s < 0) return -1;
109 
110   LOG_DEBUG("create_server_socket %s", name);
111 
112   if (osi_socket_local_server_bind(s, name,
113 #ifdef __ANDROID__
114                                    ANDROID_SOCKET_NAMESPACE_ABSTRACT
115 #else   // !__ANDROID__
116                                    ANDROID_SOCKET_NAMESPACE_FILESYSTEM
117 #endif  // __ANDROID__
118                                    ) < 0) {
119     LOG_DEBUG("socket failed to create (%s)", strerror(errno));
120     close(s);
121     return -1;
122   }
123 
124   if (listen(s, 5) < 0) {
125     LOG_DEBUG("listen failed: %s", strerror(errno));
126     close(s);
127     return -1;
128   }
129 
130   LOG_DEBUG("created socket fd %d", s);
131   return s;
132 }
133 
accept_server_socket(int sfd)134 static int accept_server_socket(int sfd) {
135   struct sockaddr_un remote;
136   struct pollfd pfd;
137   int fd;
138   socklen_t len = sizeof(struct sockaddr_un);
139 
140   LOG_DEBUG("accept fd %d", sfd);
141 
142   /* make sure there is data to process */
143   pfd.fd = sfd;
144   pfd.events = POLLIN;
145 
146   int poll_ret;
147   OSI_NO_INTR(poll_ret = poll(&pfd, 1, 0));
148   if (poll_ret == 0) {
149     LOG_WARN("accept poll timeout");
150     return -1;
151   }
152 
153   OSI_NO_INTR(fd = accept(sfd, (struct sockaddr*)&remote, &len));
154   if (fd == -1) {
155     LOG_ERROR("sock accept failed (%s)", strerror(errno));
156     return -1;
157   }
158 
159   // match socket buffer size option with client
160   const int size = AUDIO_STREAM_OUTPUT_BUFFER_SZ;
161   int ret =
162       setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char*)&size, (int)sizeof(size));
163   if (ret < 0) {
164     LOG_ERROR("setsockopt failed (%s)", strerror(errno));
165   }
166 
167   return fd;
168 }
169 
170 /*****************************************************************************
171  *
172  *   uipc helper functions
173  *
174  ****************************************************************************/
175 
uipc_main_init(tUIPC_STATE & uipc)176 static int uipc_main_init(tUIPC_STATE& uipc) {
177   int i;
178 
179   LOG_DEBUG("### uipc_main_init ###");
180 
181   uipc.tid = 0;
182   uipc.running = 0;
183   memset(&uipc.active_set, 0, sizeof(uipc.active_set));
184   memset(&uipc.read_set, 0, sizeof(uipc.read_set));
185   uipc.max_fd = 0;
186   memset(&uipc.signal_fds, 0, sizeof(uipc.signal_fds));
187   memset(&uipc.ch, 0, sizeof(uipc.ch));
188 
189   /* setup interrupt socket pair */
190   if (socketpair(AF_UNIX, SOCK_STREAM, 0, uipc.signal_fds) < 0) {
191     return -1;
192   }
193 
194   FD_SET(uipc.signal_fds[0], &uipc.active_set);
195   uipc.max_fd = MAX(uipc.max_fd, uipc.signal_fds[0]);
196 
197   for (i = 0; i < UIPC_CH_NUM; i++) {
198     tUIPC_CHAN* p = &uipc.ch[i];
199     p->srvfd = UIPC_DISCONNECTED;
200     p->fd = UIPC_DISCONNECTED;
201     p->task_evt_flags = 0;
202     p->cback = NULL;
203   }
204 
205   return 0;
206 }
207 
uipc_main_cleanup(tUIPC_STATE & uipc)208 void uipc_main_cleanup(tUIPC_STATE& uipc) {
209   int i;
210 
211   LOG_DEBUG("uipc_main_cleanup");
212 
213   close(uipc.signal_fds[0]);
214   close(uipc.signal_fds[1]);
215 
216   /* close any open channels */
217   for (i = 0; i < UIPC_CH_NUM; i++) uipc_close_ch_locked(uipc, i);
218 }
219 
220 /* check pending events in read task */
uipc_check_task_flags_locked(tUIPC_STATE & uipc)221 static void uipc_check_task_flags_locked(tUIPC_STATE& uipc) {
222   int i;
223 
224   for (i = 0; i < UIPC_CH_NUM; i++) {
225     if (uipc.ch[i].task_evt_flags & UIPC_TASK_FLAG_DISCONNECT_CHAN) {
226       uipc.ch[i].task_evt_flags &= ~UIPC_TASK_FLAG_DISCONNECT_CHAN;
227       uipc_close_ch_locked(uipc, i);
228     }
229 
230     /* add here */
231   }
232 }
233 
uipc_check_fd_locked(tUIPC_STATE & uipc,tUIPC_CH_ID ch_id)234 static int uipc_check_fd_locked(tUIPC_STATE& uipc, tUIPC_CH_ID ch_id) {
235   if (ch_id >= UIPC_CH_NUM) return -1;
236 
237   if (SAFE_FD_ISSET(uipc.ch[ch_id].srvfd, &uipc.read_set)) {
238     LOG_DEBUG("INCOMING CONNECTION ON CH %d", ch_id);
239 
240     // Close the previous connection
241     if (uipc.ch[ch_id].fd != UIPC_DISCONNECTED) {
242       LOG_DEBUG("CLOSE CONNECTION (FD %d)", uipc.ch[ch_id].fd);
243       close(uipc.ch[ch_id].fd);
244       FD_CLR(uipc.ch[ch_id].fd, &uipc.active_set);
245       uipc.ch[ch_id].fd = UIPC_DISCONNECTED;
246     }
247 
248     uipc.ch[ch_id].fd = accept_server_socket(uipc.ch[ch_id].srvfd);
249 
250     LOG_DEBUG("NEW FD %d", uipc.ch[ch_id].fd);
251 
252     if ((uipc.ch[ch_id].fd >= 0) && uipc.ch[ch_id].cback) {
253       /*  if we have a callback we should add this fd to the active set
254           and notify user with callback event */
255       LOG_DEBUG("ADD FD %d TO ACTIVE SET", uipc.ch[ch_id].fd);
256       FD_SET(uipc.ch[ch_id].fd, &uipc.active_set);
257       uipc.max_fd = MAX(uipc.max_fd, uipc.ch[ch_id].fd);
258     }
259 
260     if (uipc.ch[ch_id].fd < 0) {
261       LOG_ERROR("FAILED TO ACCEPT CH %d", ch_id);
262       return -1;
263     }
264 
265     if (uipc.ch[ch_id].cback) uipc.ch[ch_id].cback(ch_id, UIPC_OPEN_EVT);
266   }
267 
268   if (SAFE_FD_ISSET(uipc.ch[ch_id].fd, &uipc.read_set)) {
269 
270     if (uipc.ch[ch_id].cback)
271       uipc.ch[ch_id].cback(ch_id, UIPC_RX_DATA_READY_EVT);
272   }
273   return 0;
274 }
275 
uipc_check_interrupt_locked(tUIPC_STATE & uipc)276 static void uipc_check_interrupt_locked(tUIPC_STATE& uipc) {
277   if (SAFE_FD_ISSET(uipc.signal_fds[0], &uipc.read_set)) {
278     char sig_recv = 0;
279     OSI_NO_INTR(
280         recv(uipc.signal_fds[0], &sig_recv, sizeof(sig_recv), MSG_WAITALL));
281   }
282 }
283 
uipc_wakeup_locked(tUIPC_STATE & uipc)284 static inline void uipc_wakeup_locked(tUIPC_STATE& uipc) {
285   char sig_on = 1;
286   LOG_DEBUG("UIPC SEND WAKE UP");
287 
288   OSI_NO_INTR(send(uipc.signal_fds[1], &sig_on, sizeof(sig_on), 0));
289 }
290 
uipc_setup_server_locked(tUIPC_STATE & uipc,tUIPC_CH_ID ch_id,const char * name,tUIPC_RCV_CBACK * cback)291 static int uipc_setup_server_locked(tUIPC_STATE& uipc, tUIPC_CH_ID ch_id,
292                                     const char* name, tUIPC_RCV_CBACK* cback) {
293   int fd;
294 
295   LOG_DEBUG("SETUP CHANNEL SERVER %d", ch_id);
296 
297   if (ch_id >= UIPC_CH_NUM) return -1;
298 
299   std::lock_guard<std::recursive_mutex> guard(uipc.mutex);
300 
301   fd = create_server_socket(name);
302 
303   if (fd < 0) {
304     LOG_ERROR("failed to setup %s: %s", name, strerror(errno));
305     return -1;
306   }
307 
308   LOG_DEBUG("ADD SERVER FD TO ACTIVE SET %d", fd);
309   FD_SET(fd, &uipc.active_set);
310   uipc.max_fd = MAX(uipc.max_fd, fd);
311 
312   uipc.ch[ch_id].srvfd = fd;
313   uipc.ch[ch_id].cback = cback;
314   uipc.ch[ch_id].read_poll_tmo_ms = DEFAULT_READ_POLL_TMO_MS;
315 
316   /* trigger main thread to update read set */
317   uipc_wakeup_locked(uipc);
318 
319   return 0;
320 }
321 
uipc_flush_ch_locked(tUIPC_STATE & uipc,tUIPC_CH_ID ch_id)322 static void uipc_flush_ch_locked(tUIPC_STATE& uipc, tUIPC_CH_ID ch_id) {
323   char buf[UIPC_FLUSH_BUFFER_SIZE];
324   struct pollfd pfd;
325 
326   pfd.events = POLLIN;
327   pfd.fd = uipc.ch[ch_id].fd;
328 
329   if (uipc.ch[ch_id].fd == UIPC_DISCONNECTED) {
330     LOG_DEBUG("%s() - fd disconnected. Exiting", __func__);
331     return;
332   }
333 
334   while (1) {
335     int ret;
336     OSI_NO_INTR(ret = poll(&pfd, 1, 1));
337     if (ret == 0) {
338       LOG_VERBOSE("%s(): poll() timeout - nothing to do. Exiting", __func__);
339       return;
340     }
341     if (ret < 0) {
342       LOG_WARN("%s() - poll() failed: return %d errno %d (%s). Exiting",
343                __func__, ret, errno, strerror(errno));
344       return;
345     }
346     LOG_VERBOSE("%s() - polling fd %d, revents: 0x%x, ret %d", __func__, pfd.fd,
347                 pfd.revents, ret);
348     if (pfd.revents & (POLLERR | POLLHUP)) {
349       LOG_WARN("%s() - POLLERR or POLLHUP. Exiting", __func__);
350       return;
351     }
352 
353     /* read sufficiently large buffer to ensure flush empties socket faster than
354        it is getting refilled */
355     (void)read(pfd.fd, &buf, UIPC_FLUSH_BUFFER_SIZE);
356   }
357 }
358 
uipc_flush_locked(tUIPC_STATE & uipc,tUIPC_CH_ID ch_id)359 static void uipc_flush_locked(tUIPC_STATE& uipc, tUIPC_CH_ID ch_id) {
360   if (ch_id >= UIPC_CH_NUM) return;
361 
362   switch (ch_id) {
363     case UIPC_CH_ID_AV_CTRL:
364       uipc_flush_ch_locked(uipc, UIPC_CH_ID_AV_CTRL);
365       break;
366 
367     case UIPC_CH_ID_AV_AUDIO:
368       uipc_flush_ch_locked(uipc, UIPC_CH_ID_AV_AUDIO);
369       break;
370   }
371 }
372 
uipc_close_ch_locked(tUIPC_STATE & uipc,tUIPC_CH_ID ch_id)373 static int uipc_close_ch_locked(tUIPC_STATE& uipc, tUIPC_CH_ID ch_id) {
374   int wakeup = 0;
375 
376   LOG_DEBUG("CLOSE CHANNEL %d", ch_id);
377 
378   if (ch_id >= UIPC_CH_NUM) return -1;
379 
380   if (uipc.ch[ch_id].srvfd != UIPC_DISCONNECTED) {
381     LOG_DEBUG("CLOSE SERVER (FD %d)", uipc.ch[ch_id].srvfd);
382     close(uipc.ch[ch_id].srvfd);
383     FD_CLR(uipc.ch[ch_id].srvfd, &uipc.active_set);
384     uipc.ch[ch_id].srvfd = UIPC_DISCONNECTED;
385     wakeup = 1;
386   }
387 
388   if (uipc.ch[ch_id].fd != UIPC_DISCONNECTED) {
389     LOG_DEBUG("CLOSE CONNECTION (FD %d)", uipc.ch[ch_id].fd);
390     close(uipc.ch[ch_id].fd);
391     FD_CLR(uipc.ch[ch_id].fd, &uipc.active_set);
392     uipc.ch[ch_id].fd = UIPC_DISCONNECTED;
393     wakeup = 1;
394   }
395 
396   /* notify this connection is closed */
397   if (uipc.ch[ch_id].cback) uipc.ch[ch_id].cback(ch_id, UIPC_CLOSE_EVT);
398 
399   /* trigger main thread update if something was updated */
400   if (wakeup) uipc_wakeup_locked(uipc);
401 
402   return 0;
403 }
404 
uipc_close_locked(tUIPC_STATE & uipc,tUIPC_CH_ID ch_id)405 void uipc_close_locked(tUIPC_STATE& uipc, tUIPC_CH_ID ch_id) {
406   if (uipc.ch[ch_id].srvfd == UIPC_DISCONNECTED) {
407     LOG_DEBUG("CHANNEL %d ALREADY CLOSED", ch_id);
408     return;
409   }
410 
411   /* schedule close on this channel */
412   uipc.ch[ch_id].task_evt_flags |= UIPC_TASK_FLAG_DISCONNECT_CHAN;
413   uipc_wakeup_locked(uipc);
414 }
415 
uipc_read_task(void * arg)416 static void* uipc_read_task(void* arg) {
417   tUIPC_STATE& uipc = *((tUIPC_STATE*)arg);
418   int ch_id;
419   int result;
420 
421   prctl(PR_SET_NAME, (unsigned long)"uipc-main", 0, 0, 0);
422 
423   while (uipc.running) {
424     uipc.read_set = uipc.active_set;
425 
426     result = select(uipc.max_fd + 1, &uipc.read_set, NULL, NULL, NULL);
427 
428     if (result == 0) {
429       LOG_DEBUG("select timeout");
430       continue;
431     }
432     if (result < 0) {
433       if (errno != EINTR) {
434         LOG_DEBUG("select failed %s", strerror(errno));
435       }
436       continue;
437     }
438 
439     {
440       std::lock_guard<std::recursive_mutex> guard(uipc.mutex);
441 
442       /* clear any wakeup interrupt */
443       uipc_check_interrupt_locked(uipc);
444 
445       /* check pending task events */
446       uipc_check_task_flags_locked(uipc);
447 
448       /* make sure we service audio channel first */
449       uipc_check_fd_locked(uipc, UIPC_CH_ID_AV_AUDIO);
450 
451       /* check for other connections */
452       for (ch_id = 0; ch_id < UIPC_CH_NUM; ch_id++) {
453         if (ch_id != UIPC_CH_ID_AV_AUDIO) uipc_check_fd_locked(uipc, ch_id);
454       }
455     }
456   }
457 
458   LOG_DEBUG("UIPC READ THREAD EXITING");
459 
460   uipc_main_cleanup(uipc);
461 
462   uipc.tid = 0;
463 
464   LOG_DEBUG("UIPC READ THREAD DONE");
465 
466   return nullptr;
467 }
468 
uipc_start_main_server_thread(tUIPC_STATE & uipc)469 int uipc_start_main_server_thread(tUIPC_STATE& uipc) {
470   uipc.running = 1;
471 
472   if (pthread_create(&uipc.tid, (const pthread_attr_t*)NULL, uipc_read_task,
473                      &uipc) != 0) {
474     LOG_ERROR("uipc_thread_create pthread_create failed:%d", errno);
475     return -1;
476   }
477 
478   return 0;
479 }
480 
481 /* blocking call */
uipc_stop_main_server_thread(tUIPC_STATE & uipc)482 void uipc_stop_main_server_thread(tUIPC_STATE& uipc) {
483   /* request shutdown of read thread */
484   {
485     std::lock_guard<std::recursive_mutex> lock(uipc.mutex);
486     uipc.running = 0;
487     uipc_wakeup_locked(uipc);
488   }
489 
490   /* wait until read thread is fully terminated */
491   /* tid might hold pointer value where it's value
492      is negative vaule with singed bit is set, so
493      corrected the logic to check zero or non zero */
494   if (uipc.tid) pthread_join(uipc.tid, NULL);
495 }
496 
/*******************************************************************************
 **
 ** Function         UIPC_Init
 **
 ** Description      Initialize UIPC module
 **
 ** Returns          owning pointer to the newly created UIPC state
 **
 ******************************************************************************/
UIPC_Init()506 std::unique_ptr<tUIPC_STATE> UIPC_Init() {
507   std::unique_ptr<tUIPC_STATE> uipc = std::make_unique<tUIPC_STATE>();
508   LOG_DEBUG("UIPC_Init");
509 
510   std::lock_guard<std::recursive_mutex> lock(uipc->mutex);
511 
512   uipc_main_init(*uipc);
513   uipc_start_main_server_thread(*uipc);
514 
515   return uipc;
516 }
517 
518 /*******************************************************************************
519  **
520  ** Function         UIPC_Open
521  **
522  ** Description      Open UIPC interface
523  **
524  ** Returns          true in case of success, false in case of failure.
525  **
526  ******************************************************************************/
UIPC_Open(tUIPC_STATE & uipc,tUIPC_CH_ID ch_id,tUIPC_RCV_CBACK * p_cback,const char * socket_path)527 bool UIPC_Open(tUIPC_STATE& uipc, tUIPC_CH_ID ch_id, tUIPC_RCV_CBACK* p_cback,
528                const char* socket_path) {
529   LOG_DEBUG("UIPC_Open : ch_id %d", ch_id);
530 
531   std::lock_guard<std::recursive_mutex> lock(uipc.mutex);
532 
533   if (ch_id >= UIPC_CH_NUM) {
534     return false;
535   }
536 
537   if (uipc.ch[ch_id].srvfd != UIPC_DISCONNECTED) {
538     LOG_DEBUG("CHANNEL %d ALREADY OPEN", ch_id);
539     return 0;
540   }
541 
542   uipc_setup_server_locked(uipc, ch_id, socket_path, p_cback);
543 
544   return true;
545 }
546 
547 /*******************************************************************************
548  **
549  ** Function         UIPC_Close
550  **
551  ** Description      Close UIPC interface
552  **
553  ** Returns          void
554  **
555  ******************************************************************************/
UIPC_Close(tUIPC_STATE & uipc,tUIPC_CH_ID ch_id)556 void UIPC_Close(tUIPC_STATE& uipc, tUIPC_CH_ID ch_id) {
557   LOG_DEBUG("UIPC_Close : ch_id %d", ch_id);
558 
559   /* special case handling uipc shutdown */
560   if (ch_id != UIPC_CH_ID_ALL) {
561     std::lock_guard<std::recursive_mutex> lock(uipc.mutex);
562     uipc_close_locked(uipc, ch_id);
563     return;
564   }
565 
566   LOG_DEBUG("UIPC_Close : waiting for shutdown to complete");
567   uipc_stop_main_server_thread(uipc);
568   LOG_DEBUG("UIPC_Close : shutdown complete");
569 }
570 
571 /*******************************************************************************
572  **
573  ** Function         UIPC_Send
574  **
575  ** Description      Called to transmit a message over UIPC.
576  **
577  ** Returns          true in case of success, false in case of failure.
578  **
579  ******************************************************************************/
UIPC_Send(tUIPC_STATE & uipc,tUIPC_CH_ID ch_id,UNUSED_ATTR uint16_t msg_evt,const uint8_t * p_buf,uint16_t msglen)580 bool UIPC_Send(tUIPC_STATE& uipc, tUIPC_CH_ID ch_id,
581                UNUSED_ATTR uint16_t msg_evt, const uint8_t* p_buf,
582                uint16_t msglen) {
583   LOG_DEBUG("UIPC_Send : ch_id:%d %d bytes", ch_id, msglen);
584 
585   std::lock_guard<std::recursive_mutex> lock(uipc.mutex);
586 
587   ssize_t ret;
588   OSI_NO_INTR(ret = write(uipc.ch[ch_id].fd, p_buf, msglen));
589   if (ret < 0) {
590     LOG_ERROR("failed to write (%s)", strerror(errno));
591     return false;
592   }
593 
594   return true;
595 }
596 
597 /*******************************************************************************
598  **
599  ** Function         UIPC_Read
600  **
601  ** Description      Called to read a message from UIPC.
602  **
603  ** Returns          return the number of bytes read.
604  **
605  ******************************************************************************/
606 
UIPC_Read(tUIPC_STATE & uipc,tUIPC_CH_ID ch_id,uint8_t * p_buf,uint32_t len)607 uint32_t UIPC_Read(tUIPC_STATE& uipc, tUIPC_CH_ID ch_id, uint8_t* p_buf,
608                    uint32_t len) {
609   if (ch_id >= UIPC_CH_NUM) {
610     LOG_ERROR("UIPC_Read : invalid ch id %d", ch_id);
611     return 0;
612   }
613 
614   int n_read = 0;
615   int fd = uipc.ch[ch_id].fd;
616   struct pollfd pfd;
617 
618   if (fd == UIPC_DISCONNECTED) {
619     LOG_ERROR("UIPC_Read : channel %d closed", ch_id);
620     return 0;
621   }
622 
623   while (n_read < (int)len) {
624     pfd.fd = fd;
625     pfd.events = POLLIN | POLLHUP;
626 
627     /* make sure there is data prior to attempting read to avoid blocking
628        a read for more than poll timeout */
629 
630     int poll_ret;
631     OSI_NO_INTR(poll_ret = poll(&pfd, 1, uipc.ch[ch_id].read_poll_tmo_ms));
632     if (poll_ret == 0) {
633       LOG_WARN("poll timeout (%d ms)", uipc.ch[ch_id].read_poll_tmo_ms);
634       break;
635     }
636     if (poll_ret < 0) {
637       LOG_ERROR("%s(): poll() failed: return %d errno %d (%s)", __func__,
638                 poll_ret, errno, strerror(errno));
639       break;
640     }
641 
642     if (pfd.revents & (POLLHUP | POLLNVAL)) {
643       LOG_WARN("poll : channel detached remotely");
644       std::lock_guard<std::recursive_mutex> lock(uipc.mutex);
645       uipc_close_locked(uipc, ch_id);
646       return 0;
647     }
648 
649     ssize_t n;
650     OSI_NO_INTR(n = recv(fd, p_buf + n_read, len - n_read, 0));
651 
652     if (n == 0) {
653       LOG_WARN("UIPC_Read : channel detached remotely");
654       std::lock_guard<std::recursive_mutex> lock(uipc.mutex);
655       uipc_close_locked(uipc, ch_id);
656       return 0;
657     }
658 
659     if (n < 0) {
660       LOG_WARN("UIPC_Read : read failed (%s)", strerror(errno));
661       return 0;
662     }
663 
664     n_read += n;
665   }
666 
667   return n_read;
668 }
669 
/*******************************************************************************
 *
 * Function         UIPC_Ioctl
 *
 * Description      Called to control UIPC.
 *
 * Returns          false in all cases
 *
 ******************************************************************************/
679 
UIPC_Ioctl(tUIPC_STATE & uipc,tUIPC_CH_ID ch_id,uint32_t request,void * param)680 bool UIPC_Ioctl(tUIPC_STATE& uipc, tUIPC_CH_ID ch_id, uint32_t request,
681                 void* param) {
682   LOG_DEBUG("#### UIPC_Ioctl : ch_id %d, request %d ####", ch_id, request);
683   std::lock_guard<std::recursive_mutex> lock(uipc.mutex);
684 
685   switch (request) {
686     case UIPC_REQ_RX_FLUSH:
687       uipc_flush_locked(uipc, ch_id);
688       break;
689 
690     case UIPC_REG_REMOVE_ACTIVE_READSET:
691       /* user will read data directly and not use select loop */
692       if (uipc.ch[ch_id].fd != UIPC_DISCONNECTED) {
693         /* remove this channel from active set */
694         FD_CLR(uipc.ch[ch_id].fd, &uipc.active_set);
695 
696         /* refresh active set */
697         uipc_wakeup_locked(uipc);
698       }
699       break;
700 
701     case UIPC_SET_READ_POLL_TMO:
702       uipc.ch[ch_id].read_poll_tmo_ms = (intptr_t)param;
703       LOG_DEBUG("UIPC_SET_READ_POLL_TMO : CH %d, TMO %d ms", ch_id,
704                 uipc.ch[ch_id].read_poll_tmo_ms);
705       break;
706 
707     default:
708       LOG_DEBUG("UIPC_Ioctl : request not handled (%d)", request);
709       break;
710   }
711 
712   return false;
713 }
714