/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv-common.h"
#include "uv_log.h"
#include "uv_trace.h"

#if !defined(_WIN32)
# include "unix/internal.h"
#else
# include "win/internal.h"
#endif

#include <stdlib.h>
#ifdef USE_FFRT
#include <assert.h>
#include "ffrt_inner.h"
#endif
#include <stdio.h>
#ifdef ASYNC_STACKTRACE
#include "dfx/async_stack/libuv_async_stack.h"
#endif

#define MAX_THREADPOOL_SIZE 1024
#define UV_TRACE_NAME "UV_TRACE"

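/* Process-wide thread pool state. `mutex` and `cond` protect `wq`,
 * `idle_threads` and the slow-I/O bookkeeping; `exit_message` is the
 * sentinel queued by uv__threadpool_cleanup() to make the workers exit. */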
static uv_rwlock_t g_closed_uv_loop_rwlock;
static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int idle_threads;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
static struct uv__queue exit_message;
static struct uv__queue wq;
static struct uv__queue run_slow_work_message;
static struct uv__queue slow_io_pending_wq;


#ifdef UV_STATISTIC
#define MAX_DUMP_QUEUE_SIZE 200
static uv_mutex_t dump_queue_mutex;
static QUEUE dump_queue;
static unsigned int dump_queue_size;

static int statistic_idle;
static uv_mutex_t statistic_mutex;
static QUEUE statistic_works;
static uv_cond_t dump_cond;
static uv_thread_t dump_thread;

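/* Dedicated statistics thread: drains `statistic_works`, applying each
 * recorded state transition to its dump info, and exits once the
 * `exit_message` sentinel is dequeued. */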
static void uv_dump_worker(void* arg) {
  struct uv__statistic_work* w;
  struct uv__queue* q;
  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;
  uv_mutex_lock(&statistic_mutex);
  for (;;) {
    while (uv__queue_empty(&statistic_works)) {
      statistic_idle = 1;
      uv_cond_wait(&dump_cond, &statistic_mutex);
      statistic_idle = 0;
    }
    q = uv__queue_head(&statistic_works);
    if (q == &exit_message) {
      uv_cond_signal(&dump_cond);
      uv_mutex_unlock(&statistic_mutex);
      break;
    }
    uv__queue_remove(q);
    uv__queue_init(q);
    uv_mutex_unlock(&statistic_mutex);
    w = uv__queue_data(q, struct uv__statistic_work, wq);
    w->work(w);
    free(w);
    uv_mutex_lock(&statistic_mutex);
  }
}

static void post_statistic_work(struct uv__queue* q) {
  uv_mutex_lock(&statistic_mutex);
  uv__queue_insert_tail(&statistic_works, q);
  if (statistic_idle)
    uv_cond_signal(&dump_cond);
  uv_mutex_unlock(&statistic_mutex);
}

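/* Insert a work item's dump info at the head of `dump_queue`. When the queue
 * would exceed MAX_DUMP_QUEUE_SIZE, completed (DONE_END) entries are evicted
 * first; if the queue is still full, too many requests are outstanding and
 * the process aborts. */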
static void uv__queue_work_info(struct uv__statistic_work* work) {
  uv_mutex_lock(&dump_queue_mutex);
  if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) { /* release work items that are already done */
    struct uv__queue* q;
    uv__queue_foreach(q, &dump_queue) {
      struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
      if (info->state == DONE_END) {
        uv__queue_remove(q);
        free(info);
        dump_queue_size--;
      }
    }
    if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) {
      abort(); /* too many unfinished work items. */
    }
  }

  uv__queue_insert_head(&dump_queue, &work->info->wq);
  dump_queue_size++;
  uv_mutex_unlock(&dump_queue_mutex);
}

static void uv__update_work_info(struct uv__statistic_work* work) {
  uv_mutex_lock(&dump_queue_mutex);
  if (work != NULL && work->info != NULL) {
    work->info->state = work->state;
    switch (work->state) {
      case WAITING:
        work->info->queue_time = work->time;
        break;
      case WORK_EXECUTING:
        work->info->execute_start_time = work->time;
        break;
      case WORK_END:
        work->info->execute_end_time = work->time;
        break;
      case DONE_EXECUTING:
        work->info->done_start_time = work->time;
        break;
      case DONE_END:
        work->info->done_end_time = work->time;
        break;
      default:
        break;
    }
  }
  uv_mutex_unlock(&dump_queue_mutex);
}


/* Return the current timestamp in milliseconds. */
static uint64_t uv__now_timestamp(void) {
  uv_timeval64_t tv;
  int r = uv_gettimeofday(&tv);
  if (r != 0) {
    return 0;
  }
  return (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
}


static void uv__post_statistic_work(struct uv__work* w, enum uv_work_state state) {
  struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
  if (dump_work == NULL) {
    return;
  }
  dump_work->info = w->info;
  dump_work->work = uv__update_work_info;
  dump_work->time = uv__now_timestamp();
  dump_work->state = state;
  uv__queue_init(&dump_work->wq);
  post_statistic_work(&dump_work->wq);
}


static void init_work_dump_queue(void)
{
  if (uv_mutex_init(&dump_queue_mutex))
    abort();
  uv_mutex_lock(&dump_queue_mutex);
  uv__queue_init(&dump_queue);
  dump_queue_size = 0;
  uv_mutex_unlock(&dump_queue_mutex);

  /* init dump thread */
  statistic_idle = 1;
  if (uv_mutex_init(&statistic_mutex))
    abort();
  uv__queue_init(&statistic_works);
  uv_sem_t sem;
  if (uv_cond_init(&dump_cond))
    abort();
  if (uv_sem_init(&sem, 0))
    abort();
  if (uv_thread_create(&dump_thread, uv_dump_worker, &sem))
    abort();
  uv_sem_wait(&sem);
  uv_sem_destroy(&sem);
}


void uv_init_dump_info(struct uv_work_dump_info* info, struct uv__work* w) {
  if (info == NULL)
    return;
  info->queue_time = 0;
  info->state = WAITING;
  info->execute_start_time = 0;
  info->execute_end_time = 0;
  info->done_start_time = 0;
  info->done_end_time = 0;
  info->work = w;
  uv__queue_init(&info->wq);
}


void uv_queue_statics(struct uv_work_dump_info* info) {
  struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
  if (dump_work == NULL) {
    abort();
  }
  dump_work->info = info;
  dump_work->work = uv__queue_work_info;
  info->queue_time = uv__now_timestamp();
  dump_work->state = WAITING;
  uv__queue_init(&dump_work->wq);
  post_statistic_work(&dump_work->wq);
}


uv_worker_info_t* uv_dump_work_queue(int* size) {
#ifdef UV_STATISTIC
  uv_mutex_lock(&dump_queue_mutex);
  if (uv__queue_empty(&dump_queue)) {
    uv_mutex_unlock(&dump_queue_mutex);
    *size = 0;
    return NULL;
  }
  *size = dump_queue_size;
  uv_worker_info_t* dump_info = (uv_worker_info_t*) malloc(sizeof(uv_worker_info_t) * dump_queue_size);
  if (dump_info == NULL) {
    uv_mutex_unlock(&dump_queue_mutex);
    *size = 0;
    return NULL;
  }
  struct uv__queue* q;
  int i = 0;
  uv__queue_foreach(q, &dump_queue) {
    struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
    dump_info[i].queue_time = info->queue_time;
    dump_info[i].builtin_return_address[0] = info->builtin_return_address[0];
    dump_info[i].builtin_return_address[1] = info->builtin_return_address[1];
    dump_info[i].builtin_return_address[2] = info->builtin_return_address[2];
    switch (info->state) {
      case WAITING:
        strcpy(dump_info[i].state, "waiting");
        break;
      case WORK_EXECUTING:
        strcpy(dump_info[i].state, "work_executing");
        break;
      case WORK_END:
        strcpy(dump_info[i].state, "work_end");
        break;
      case DONE_EXECUTING:
        strcpy(dump_info[i].state, "done_executing");
        break;
      case DONE_END:
        strcpy(dump_info[i].state, "done_end");
        break;
      default:
        break;
    }
    dump_info[i].execute_start_time = info->execute_start_time;
    dump_info[i].execute_end_time = info->execute_end_time;
    dump_info[i].done_start_time = info->done_start_time;
    dump_info[i].done_end_time = info->done_end_time;
    ++i;
  }
  uv_mutex_unlock(&dump_queue_mutex);
  return dump_info;
#else
  *size = 0;
  return NULL;
#endif
}
#endif


static void init_closed_uv_loop_rwlock_once(void) {
  uv_rwlock_init(&g_closed_uv_loop_rwlock);
}


void rdlock_closed_uv_loop_rwlock(void) {
  uv_rwlock_rdlock(&g_closed_uv_loop_rwlock);
}


void rdunlock_closed_uv_loop_rwlock(void) {
  uv_rwlock_rdunlock(&g_closed_uv_loop_rwlock);
}


int is_uv_loop_good_magic(const uv_loop_t* loop) {
  if (loop->magic == UV_LOOP_MAGIC) {
    return 1;
  }
  UV_LOGE("loop:(%{public}zu:%{public}#x) invalid", (size_t)loop, loop->magic);
  return 0;
}


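/* Mark the loop as closed: flip its magic number under the write lock so
 * that pool threads holding the read lock never observe a partially
 * destroyed loop. The time spent closing is logged. */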
void on_uv_loop_close(uv_loop_t* loop) {
  time_t t1, t2;
  time(&t1);
  uv_start_trace(UV_TRACE_TAG, "Get Write Lock");
  uv_rwlock_wrlock(&g_closed_uv_loop_rwlock);
  uv_end_trace(UV_TRACE_TAG);
  loop->magic = ~UV_LOOP_MAGIC;
  uv_start_trace(UV_TRACE_TAG, "Release Write Lock");
  uv_rwlock_wrunlock(&g_closed_uv_loop_rwlock);
  uv_end_trace(UV_TRACE_TAG);
  time(&t2);
  UV_LOGI("loop:(%{public}zu) closed in %{public}zds", (size_t)loop, (ssize_t)(t2 - t1));
}


static void uv__cancelled(struct uv__work* w) {
  abort();
}


#ifndef USE_FFRT
static unsigned int slow_io_work_running;

static unsigned int slow_work_thread_threshold(void) {
  return (nthreads + 1) / 2;
}


/* To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  struct uv__queue* q;
  int is_slow_work;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (uv__queue_empty(&wq) ||
           (uv__queue_head(&wq) == &run_slow_work_message &&
            uv__queue_next(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = uv__queue_head(&wq);
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    uv__queue_remove(q);
    uv__queue_init(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        uv__queue_insert_tail(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (uv__queue_empty(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      q = uv__queue_head(&slow_io_pending_wq);
      uv__queue_remove(q);
      uv__queue_init(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!uv__queue_empty(&slow_io_pending_wq)) {
        uv__queue_insert_tail(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }

    uv_mutex_unlock(&mutex);

    w = uv__queue_data(q, struct uv__work, wq);
#ifdef UV_STATISTIC
    uv__post_statistic_work(w, WORK_EXECUTING);
#endif
#ifdef ASYNC_STACKTRACE
    uv_work_t* req = container_of(w, uv_work_t, work_req);
    LibuvSetStackId((uint64_t)req->reserved[3]);
#endif
    w->work(w);
#ifdef UV_STATISTIC
    uv__post_statistic_work(w, WORK_END);
#endif
    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    uv__queue_insert_tail(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}
#endif


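/* Called from the loop thread with a filled-in uv__work: queue the request
 * on the global work queue and wake an idle worker. Slow I/O requests go to
 * `slow_io_pending_wq`, represented in `wq` by the single
 * `run_slow_work_message` marker so they cannot starve CPU/fast-I/O work. */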
static void post(struct uv__queue* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  if (kind == UV__WORK_SLOW_IO) {
    /* Insert into a separate queue. */
    uv__queue_insert_tail(&slow_io_pending_wq, q);
    if (!uv__queue_empty(&run_slow_work_message)) {
      /* Running slow I/O tasks is already scheduled => Nothing to do here.
         The worker that runs said other task will schedule this one as well. */
      uv_mutex_unlock(&mutex);
      return;
    }
    q = &run_slow_work_message;
  }

  uv__queue_insert_tail(&wq, q);
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}


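/* Tear down the pool at process exit: post the exit sentinel, join every
 * worker, release a heap-allocated thread array and destroy the global
 * mutex/condition. With UV_STATISTIC the dump thread is stopped as well. */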
#ifdef __MVS__
/* TODO(itodorov) - zos: revisit when Woz compiler is available. */
__attribute__((destructor))
#endif
void uv__threadpool_cleanup(void) {
  unsigned int i;

  if (nthreads == 0)
    return;

#ifndef __MVS__
  /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
  post(&exit_message, UV__WORK_CPU);
#endif

  for (i = 0; i < nthreads; i++)
    if (uv_thread_join(threads + i))
      abort();

  if (threads != default_threads)
    uv__free(threads);

  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cond);

  threads = NULL;
  nthreads = 0;
#ifdef UV_STATISTIC
  post_statistic_work(&exit_message);
  uv_thread_join(&dump_thread);
  uv_mutex_destroy(&statistic_mutex);
  uv_cond_destroy(&dump_cond);
#endif
}


#ifndef USE_FFRT
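/* Spin up the worker pool: honour UV_THREADPOOL_SIZE (clamped to
 * [1, MAX_THREADPOOL_SIZE]), fall back to the 4-slot static array when the
 * heap allocation fails, and wait on a semaphore until every worker has
 * started before returning. */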
static void init_threads(void) {
  uv_thread_options_t config;
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  uv__queue_init(&wq);
  uv__queue_init(&slow_io_pending_wq);
  uv__queue_init(&run_slow_work_message);

  if (uv_sem_init(&sem, 0))
    abort();

  config.flags = UV_THREAD_HAS_STACK_SIZE;
  config.stack_size = 8u << 20;  /* 8 MB */

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create_ex(threads + i, &config, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}


#ifndef _WIN32
static void reset_once(void) {
  uv_once_t child_once = UV_ONCE_INIT;
  memcpy(&once, &child_once, sizeof(child_once));
}
#endif


static void init_once(void) {
#ifndef _WIN32
  /* Re-initialize the threadpool after fork.
   * Note that this discards the global mutex and condition as well
   * as the work queue.
   */
  if (pthread_atfork(NULL, NULL, &reset_once))
    abort();
#endif
  init_closed_uv_loop_rwlock_once();
#ifdef UV_STATISTIC
  init_work_dump_queue();
#endif
  init_threads();
}


void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}
#endif


#ifdef USE_FFRT
static void uv__task_done_wrapper(void* work, int status) {
  struct uv__work* w = (struct uv__work*)work;
  w->done(w, status);
}
#endif


/* TODO(bnoordhuis) teach libuv how to cancel file operations
 * that go through io_uring instead of the thread pool.
 */
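/* Cancellation only succeeds while the request is still queued, i.e. its
 * queue node is non-empty and its work callback has not started (FFRT builds
 * additionally require ffrt_executor_task_cancel() to succeed); otherwise
 * UV_EBUSY is returned. A cancelled request is completed on the loop side
 * with its work pointer set to uv__cancelled so the done callback receives
 * UV_ECANCELED. */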
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

  rdlock_closed_uv_loop_rwlock();
  if (!is_uv_loop_good_magic(w->loop)) {
    rdunlock_closed_uv_loop_rwlock();
    return 0;
  }

#ifndef USE_FFRT
  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);

  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
  if (cancelled)
    uv__queue_remove(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);
#else
  uv_mutex_lock(&w->loop->wq_mutex);
  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL
    && ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[0]);
  uv_mutex_unlock(&w->loop->wq_mutex);
#endif

  if (!cancelled) {
    rdunlock_closed_uv_loop_rwlock();
    return UV_EBUSY;
  }

  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
#ifndef USE_FFRT
  uv__queue_insert_tail(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
#else
  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(w->loop);
  int qos = (ffrt_qos_t)(intptr_t)req->reserved[0];

  if (uv_check_data_valid((struct uv_loop_data*)(w->loop->data)) == 0) {
    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)w->loop->data -
      (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
  } else {
    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
    uv_async_send(&loop->wq_async);
  }
#endif
  uv_mutex_unlock(&loop->wq_mutex);
  rdunlock_closed_uv_loop_rwlock();

  return 0;
}


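/* Loop-thread half of the pool: runs when `wq_async` fires, drains the
 * loop's completed-work queue (in FFRT builds the per-QoS sub-queues,
 * highest QoS first) and invokes each request's done callback. */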
void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  struct uv__queue* q;
  struct uv__queue wq;
  int err;
  int nevents;

  loop = container_of(handle, uv_loop_t, wq_async);
  rdlock_closed_uv_loop_rwlock();
  if (!is_uv_loop_good_magic(loop)) {
    rdunlock_closed_uv_loop_rwlock();
    return;
  }
  rdunlock_closed_uv_loop_rwlock();

  uv_mutex_lock(&loop->wq_mutex);
#ifndef USE_FFRT
  uv__queue_move(&loop->wq, &wq);
#else
  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
  int i;
  uv__queue_init(&wq);
  for (i = 3; i >= 0; i--) {
    if (!uv__queue_empty(&lfields->wq_sub[i])) {
      uv__queue_append(&lfields->wq_sub[i], &wq);
    }
  }
#endif
  uv_mutex_unlock(&loop->wq_mutex);

  nevents = 0;
  uv_start_trace(UV_TRACE_TAG, UV_TRACE_NAME);
  while (!uv__queue_empty(&wq)) {
    q = uv__queue_head(&wq);
    uv__queue_remove(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
#ifdef UV_STATISTIC
    uv__post_statistic_work(w, DONE_EXECUTING);
    struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
    if (dump_work == NULL) {
      UV_LOGE("malloc(%{public}zu) failed: %{public}s", sizeof(struct uv__statistic_work), strerror(errno));
      break;
    }
    dump_work->info = w->info;
    dump_work->work = uv__update_work_info;
#endif
#ifdef ASYNC_STACKTRACE
    uv_work_t* req = container_of(w, uv_work_t, work_req);
    LibuvSetStackId((uint64_t)req->reserved[3]);
#endif
    w->done(w, err);
    nevents++;
#ifdef UV_STATISTIC
    dump_work->time = uv__now_timestamp();
    dump_work->state = DONE_END;
    QUEUE_INIT(&dump_work->wq);
    post_statistic_work(&dump_work->wq);
#endif
  }
  uv_end_trace(UV_TRACE_TAG);

  /* This check accomplishes 2 things:
   * 1. Even if the queue was empty, the call to uv__work_done() should count
   *    as an event. Which will have been added by the event loop when
   *    calling this callback.
   * 2. Prevents accidental wrap around in case nevents == 0 events == 0.
   */
  if (nevents > 1) {
    /* Subtract 1 to counter the call to uv__work_done(). */
    uv__metrics_inc_events(loop, nevents - 1);
    if (uv__get_internal_fields(loop)->current_timeout == 0)
      uv__metrics_inc_events_waiting(loop, nevents - 1);
  }
}


static void uv__queue_work(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);

  req->work_cb(req);
}


static void uv__queue_done(struct uv__work* w, int err) {
  uv_work_t* req;

  if (w == NULL) {
    UV_LOGE("uv_work_t is NULL");
    return;
  }

  req = container_of(w, uv_work_t, work_req);
  uv__req_unregister(req->loop, req);

  if (req->after_work_cb == NULL)
    return;

  req->after_work_cb(req, err);
}


#ifdef USE_FFRT
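/* FFRT executor entry point: runs the work callback on an FFRT worker, then
 * hands completion back to the loop thread, either through the
 * application-registered post_task_func or by queueing the request on the
 * loop's per-QoS wq and signalling wq_async. Bails out if the loop was
 * already closed (magic number invalidated). */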
void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos)
{
  struct uv__work* w = (struct uv__work *)data;
  uv_loop_t* loop = w->loop;
#ifdef UV_STATISTIC
  uv__post_statistic_work(w, WORK_EXECUTING);
#endif
  uv_work_t* req = container_of(w, uv_work_t, work_req);
#ifdef ASYNC_STACKTRACE
  LibuvSetStackId((uint64_t)req->reserved[3]);
#endif
  w->work(w);
#ifdef UV_STATISTIC
  uv__post_statistic_work(w, WORK_END);
#endif
  rdlock_closed_uv_loop_rwlock();
  if (loop->magic != UV_LOOP_MAGIC) {
    rdunlock_closed_uv_loop_rwlock();
    UV_LOGE("uv_loop(%{public}zu:%{public}#x) in task(%p:%p) is invalid",
            (size_t)loop, loop->magic, req->work_cb, req->after_work_cb);
    return;
  }

  uv_mutex_lock(&loop->wq_mutex);
  w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */

  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) == 0) {
    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data -
      (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
  } else {
    uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
    uv_async_send(&loop->wq_async);
  }
  uv_mutex_unlock(&loop->wq_mutex);
  rdunlock_closed_uv_loop_rwlock();
}

static void init_once(void)
{
  init_closed_uv_loop_rwlock_once();
  /* init uv work statistics queue */
#ifdef UV_STATISTIC
  init_work_dump_queue();
#endif
  ffrt_executor_task_register_func(uv__ffrt_work, ffrt_uv_task);
}


/* ffrt uv__work_submit */
void uv__work_submit(uv_loop_t* loop,
                     uv_req_t* req,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  ffrt_task_attr_t attr;
  ffrt_task_attr_init(&attr);

  switch (kind) {
    case UV__WORK_CPU:
      ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
      break;
    case UV__WORK_FAST_IO:
      ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
      break;
    case UV__WORK_SLOW_IO:
      ffrt_task_attr_set_qos(&attr, ffrt_qos_background);
      break;
    default:
#ifdef USE_OHOS_DFX
      UV_LOGI("Unknown work kind");
#endif
      ffrt_task_attr_destroy(&attr);  /* release the attr before bailing out */
      return;
  }

  w->loop = loop;
  w->work = work;
  w->done = done;

  req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
  ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
  ffrt_task_attr_destroy(&attr);
}


/* ffrt uv__work_submit */
void uv__work_submit_with_qos(uv_loop_t* loop,
                              uv_req_t* req,
                              struct uv__work* w,
                              ffrt_qos_t qos,
                              void (*work)(struct uv__work* w),
                              void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  ffrt_task_attr_t attr;
  ffrt_task_attr_init(&attr);
  ffrt_task_attr_set_qos(&attr, qos);

  w->loop = loop;
  w->work = work;
  w->done = done;

  req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
  ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
  ffrt_task_attr_destroy(&attr);
}
#endif


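/* Public API: run work_cb on a pool (or FFRT) thread and after_work_cb on
 * the loop thread when it finishes. A minimal usage sketch (callback names
 * are illustrative, not part of this file):
 *
 *   void work_cb(uv_work_t* req);               // runs off the loop thread
 *   void after_cb(uv_work_t* req, int status);  // status == UV_ECANCELED if cancelled
 *
 *   uv_work_t req;
 *   uv_queue_work(uv_default_loop(), &req, work_cb, after_cb);
 */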
int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;

#ifdef UV_STATISTIC
  struct uv_work_dump_info* info = (struct uv_work_dump_info*) malloc(sizeof(struct uv_work_dump_info));
  if (info == NULL) {
    abort();
  }
  uv_init_dump_info(info, &req->work_req);
  info->builtin_return_address[0] = __builtin_return_address(0);
  info->builtin_return_address[1] = __builtin_return_address(1);
  info->builtin_return_address[2] = __builtin_return_address(2);
  (req->work_req).info = info;
#endif
#ifdef ASYNC_STACKTRACE
  req->reserved[3] = (void*)LibuvCollectAsyncStack();
#endif
  uv__work_submit(loop,
#ifdef USE_FFRT
                  (uv_req_t*)req,
#endif
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done);
#ifdef UV_STATISTIC
  uv_queue_statics(info);
#endif
  return 0;
}


int uv_queue_work_with_qos(uv_loop_t* loop,
                           uv_work_t* req,
                           uv_work_cb work_cb,
                           uv_after_work_cb after_work_cb,
                           uv_qos_t qos) {
#ifdef USE_FFRT
  if (work_cb == NULL)
    return UV_EINVAL;

  STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
  STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
  STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
  STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
  if (qos < ffrt_qos_background || qos > ffrt_qos_user_initiated) {
    return UV_EINVAL;
  }

  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
#ifdef UV_STATISTIC
  struct uv_work_dump_info* info = (struct uv_work_dump_info*)malloc(sizeof(struct uv_work_dump_info));
  if (info == NULL) {
    abort();
  }
  uv_init_dump_info(info, &req->work_req);
  info->builtin_return_address[0] = __builtin_return_address(0);
  info->builtin_return_address[1] = __builtin_return_address(1);
  info->builtin_return_address[2] = __builtin_return_address(2);
  (req->work_req).info = info;
#endif
  uv__work_submit_with_qos(loop,
                           (uv_req_t*)req,
                           &req->work_req,
                           (ffrt_qos_t)qos,
                           uv__queue_work,
                           uv__queue_done);
#ifdef UV_STATISTIC
  uv_queue_statics(info);
#endif
  return 0;
#else
  return uv_queue_work(loop, req, work_cb, after_work_cb);
#endif
}


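/* Map a cancellable request type to its embedded uv__work and forward to
 * uv__work_cancel(). Returns UV_EBUSY if the work is already running and
 * UV_EINVAL for request types that cannot be cancelled. */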
int uv_cancel(uv_req_t* req) {
  struct uv__work* wreq;
  uv_loop_t* loop;

  switch (req->type) {
  case UV_FS:
    loop = ((uv_fs_t*) req)->loop;
    wreq = &((uv_fs_t*) req)->work_req;
    break;
  case UV_GETADDRINFO:
    loop = ((uv_getaddrinfo_t*) req)->loop;
    wreq = &((uv_getaddrinfo_t*) req)->work_req;
    break;
  case UV_GETNAMEINFO:
    loop = ((uv_getnameinfo_t*) req)->loop;
    wreq = &((uv_getnameinfo_t*) req)->work_req;
    break;
  case UV_RANDOM:
    loop = ((uv_random_t*) req)->loop;
    wreq = &((uv_random_t*) req)->work_req;
    break;
  case UV_WORK:
    loop = ((uv_work_t*) req)->loop;
    wreq = &((uv_work_t*) req)->work_req;
    break;
  default:
    return UV_EINVAL;
  }

  return uv__work_cancel(loop, req, wreq);
}