/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv-common.h"
#include "uv_log.h"
#include "uv_trace.h"

#if !defined(_WIN32)
# include "unix/internal.h"
#else
# include "win/internal.h"
#endif

#include <stdlib.h>
#ifdef USE_FFRT
#include <assert.h>
#include "ffrt_inner.h"
#endif
#include <stdio.h>
#ifdef ASYNC_STACKTRACE
#include "dfx/async_stack/libuv_async_stack.h"
#endif

#define MAX_THREADPOOL_SIZE 1024
#define UV_TRACE_NAME "UV_TRACE"

#ifdef USE_OHOS_DFX
#define MIN_REQS_THRESHOLD 100
#define MAX_REQS_THRESHOLD 300
#define CURSOR 5
#endif

static uv_rwlock_t g_closed_uv_loop_rwlock;
static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int idle_threads;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
static struct uv__queue exit_message;
static struct uv__queue wq;
static struct uv__queue run_slow_work_message;
static struct uv__queue slow_io_pending_wq;


#ifdef UV_STATISTIC
#define MAX_DUMP_QUEUE_SIZE 200
static uv_mutex_t dump_queue_mutex;
static struct uv__queue dump_queue;
static unsigned int dump_queue_size;

static int statistic_idle;
static uv_mutex_t statistic_mutex;
static struct uv__queue statistic_works;
static uv_cond_t dump_cond;
static uv_thread_t dump_thread;

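/* Dump worker thread: drains statistic_works, runs each record's callback to
 * update the dump queue, and frees the record. Exits when it sees
 * exit_message. */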
static void uv_dump_worker(void* arg) {
  struct uv__statistic_work* w;
  struct uv__queue* q;
  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;
  uv_mutex_lock(&statistic_mutex);
  for (;;) {
    while (uv__queue_empty(&statistic_works)) {
      statistic_idle = 1;
      uv_cond_wait(&dump_cond, &statistic_mutex);
      statistic_idle = 0;
    }
    q = uv__queue_head(&statistic_works);
    if (q == &exit_message) {
      uv_cond_signal(&dump_cond);
      uv_mutex_unlock(&statistic_mutex);
      break;
    }
    uv__queue_remove(q);
    uv__queue_init(q);
    uv_mutex_unlock(&statistic_mutex);
    w = uv__queue_data(q, struct uv__statistic_work, wq);
    w->work(w);
    free(w);
    uv_mutex_lock(&statistic_mutex);
  }
}

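/* Queue a statistic record for the dump thread and wake it if it is idle. */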
static void post_statistic_work(struct uv__queue* q) {
  uv_mutex_lock(&statistic_mutex);
  uv__queue_insert_tail(&statistic_works, q);
  if (statistic_idle)
    uv_cond_signal(&dump_cond);
  uv_mutex_unlock(&statistic_mutex);
}

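/* Add a work's dump info to dump_queue. When the queue is full, entries that
 * have reached DONE_END are released first; if it is still full afterwards,
 * the process aborts. */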
static void uv__queue_work_info(struct uv__statistic_work *work) {
  uv_mutex_lock(&dump_queue_mutex);
  if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) { /* release works already done */
    struct uv__queue* q;
    struct uv__queue* next;
    for (q = uv__queue_head(&dump_queue); q != &dump_queue; q = next) {
      /* Save the successor first: removing a DONE_END entry frees its node. */
      next = uv__queue_next(q);
      struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
      if (info->state == DONE_END) {
        uv__queue_remove(q);
        free(info);
        dump_queue_size--;
      }
    }
    if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) {
      abort(); /* too many works not done. */
    }
  }

  uv__queue_insert_head(&dump_queue, &work->info->wq);
  dump_queue_size++;
  uv_mutex_unlock(&dump_queue_mutex);
}

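/* Record the timestamp of a state transition on the work's dump info. */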
static void uv__update_work_info(struct uv__statistic_work *work) {
  uv_mutex_lock(&dump_queue_mutex);
  if (work != NULL && work->info != NULL) {
    work->info->state = work->state;
    switch (work->state) {
      case WAITING:
        work->info->queue_time = work->time;
        break;
      case WORK_EXECUTING:
        work->info->execute_start_time = work->time;
        break;
      case WORK_END:
        work->info->execute_end_time = work->time;
        break;
      case DONE_EXECUTING:
        work->info->done_start_time = work->time;
        break;
      case DONE_END:
        work->info->done_end_time = work->time;
        break;
      default:
        break;
    }
  }
  uv_mutex_unlock(&dump_queue_mutex);
}


/* Return the current timestamp in milliseconds. */
static uint64_t uv__now_timestamp(void) {
  uv_timeval64_t tv;
  int r = uv_gettimeofday(&tv);
  if (r != 0) {
    return 0;
  }
  return (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
}


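/* Allocate a state-update record for the work and hand it to the dump
 * thread. On allocation failure the update is silently dropped. */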
static void uv__post_statistic_work(struct uv__work *w, enum uv_work_state state) {
  struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
  if (dump_work == NULL) {
    return;
  }
  dump_work->info = w->info;
  dump_work->work = uv__update_work_info;
  dump_work->time = uv__now_timestamp();
  dump_work->state = state;
  uv__queue_init(&dump_work->wq);
  post_statistic_work(&dump_work->wq);
}


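/* Initialize the dump queue and the statistic work queue, then start the
 * dump thread and wait until it has signalled that it is running. */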
static void init_work_dump_queue(void)
{
  if (uv_mutex_init(&dump_queue_mutex))
    abort();
  uv_mutex_lock(&dump_queue_mutex);
  uv__queue_init(&dump_queue);
  dump_queue_size = 0;
  uv_mutex_unlock(&dump_queue_mutex);

  /* init dump thread */
  statistic_idle = 1;
  if (uv_mutex_init(&statistic_mutex))
    abort();
  uv__queue_init(&statistic_works);
  uv_sem_t sem;
  if (uv_cond_init(&dump_cond))
    abort();
  if (uv_sem_init(&sem, 0))
    abort();
  if (uv_thread_create(&dump_thread, uv_dump_worker, &sem))
    abort();
  uv_sem_wait(&sem);
  uv_sem_destroy(&sem);
}


void uv_init_dump_info(struct uv_work_dump_info* info, struct uv__work* w) {
  if (info == NULL)
    return;
  info->queue_time = 0;
  info->state = WAITING;
  info->execute_start_time = 0;
  info->execute_end_time = 0;
  info->done_start_time = 0;
  info->done_end_time = 0;
  info->work = w;
  uv__queue_init(&info->wq);
}


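/* Register a freshly initialized dump info with the dump thread and stamp
 * its queueing time. */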
void uv_queue_statics(struct uv_work_dump_info* info) {
  struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
  if (dump_work == NULL) {
    abort();
  }
  dump_work->info = info;
  dump_work->work = uv__queue_work_info;
  info->queue_time = uv__now_timestamp();
  dump_work->state = WAITING;
  uv__queue_init(&dump_work->wq);
  post_statistic_work(&dump_work->wq);
}


uv_worker_info_t* uv_dump_work_queue(int* size) {
#ifdef UV_STATISTIC
  uv_mutex_lock(&dump_queue_mutex);
  if (uv__queue_empty(&dump_queue)) {
    uv_mutex_unlock(&dump_queue_mutex);
    return NULL;
  }
  *size = dump_queue_size;
  uv_worker_info_t* dump_info = (uv_worker_info_t*) malloc(sizeof(uv_worker_info_t) * dump_queue_size);
  if (dump_info == NULL) {
    uv_mutex_unlock(&dump_queue_mutex);
    return NULL;
  }
  struct uv__queue* q;
  int i = 0;
  uv__queue_foreach(q, &dump_queue) {
    struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
    dump_info[i].queue_time = info->queue_time;
    dump_info[i].builtin_return_address[0] = info->builtin_return_address[0];
    dump_info[i].builtin_return_address[1] = info->builtin_return_address[1];
    dump_info[i].builtin_return_address[2] = info->builtin_return_address[2];
    switch (info->state) {
      case WAITING:
        strcpy(dump_info[i].state, "waiting");
        break;
      case WORK_EXECUTING:
        strcpy(dump_info[i].state, "work_executing");
        break;
      case WORK_END:
        strcpy(dump_info[i].state, "work_end");
        break;
      case DONE_EXECUTING:
        strcpy(dump_info[i].state, "done_executing");
        break;
      case DONE_END:
        strcpy(dump_info[i].state, "done_end");
        break;
      default:
        break;
    }
    dump_info[i].execute_start_time = info->execute_start_time;
    dump_info[i].execute_end_time = info->execute_end_time;
    dump_info[i].done_start_time = info->done_start_time;
    dump_info[i].done_end_time = info->done_end_time;
    ++i;
  }
  uv_mutex_unlock(&dump_queue_mutex);
  return dump_info;
#else
  *size = 0;
  return NULL;
#endif
}
#endif


static void init_closed_uv_loop_rwlock_once(void) {
  uv_rwlock_init(&g_closed_uv_loop_rwlock);
}


void rdlock_closed_uv_loop_rwlock(void) {
  uv_rwlock_rdlock(&g_closed_uv_loop_rwlock);
}


void rdunlock_closed_uv_loop_rwlock(void) {
  uv_rwlock_rdunlock(&g_closed_uv_loop_rwlock);
}


int is_uv_loop_good_magic(const uv_loop_t* loop) {
  if (loop->magic == UV_LOOP_MAGIC) {
    return 1;
  }
  UV_LOGE("loop:(%{public}zu:%{public}#x) invalid", (size_t)loop, loop->magic);
  return 0;
}


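/* Invalidate the loop's magic under the global write lock so that threadpool
 * completions racing with loop teardown can detect the closed loop. */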
void on_uv_loop_close(uv_loop_t* loop) {
  time_t t1, t2;
  time(&t1);
  uv_start_trace(UV_TRACE_TAG, "Get Write Lock");
  uv_rwlock_wrlock(&g_closed_uv_loop_rwlock);
  uv_end_trace(UV_TRACE_TAG);
  loop->magic = ~UV_LOOP_MAGIC;
  uv_start_trace(UV_TRACE_TAG, "Release Write Lock");
  uv_rwlock_wrunlock(&g_closed_uv_loop_rwlock);
  uv_end_trace(UV_TRACE_TAG);
  time(&t2);
  UV_LOGI("loop:(%{public}zu) closed in %{public}zds", (size_t)loop, (ssize_t)(t2 - t1));
}


static void uv__cancelled(struct uv__work* w) {
  abort();
}


#ifndef USE_FFRT
static unsigned int slow_io_work_running;

static unsigned int slow_work_thread_threshold(void) {
  return (nthreads + 1) / 2;
}


/* To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  struct uv__queue* q;
  int is_slow_work;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (uv__queue_empty(&wq) ||
           (uv__queue_head(&wq) == &run_slow_work_message &&
            uv__queue_next(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = uv__queue_head(&wq);
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    uv__queue_remove(q);
    uv__queue_init(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        uv__queue_insert_tail(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (uv__queue_empty(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      q = uv__queue_head(&slow_io_pending_wq);
      uv__queue_remove(q);
      uv__queue_init(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!uv__queue_empty(&slow_io_pending_wq)) {
        uv__queue_insert_tail(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }

    uv_mutex_unlock(&mutex);

    w = uv__queue_data(q, struct uv__work, wq);
#ifdef UV_STATISTIC
    uv__post_statistic_work(w, WORK_EXECUTING);
#endif
#ifdef ASYNC_STACKTRACE
    uv_work_t* req = container_of(w, uv_work_t, work_req);
    LibuvSetStackId((uint64_t)req->reserved[3]);
#endif
    w->work(w);
#ifdef UV_STATISTIC
    uv__post_statistic_work(w, WORK_END);
#endif
    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    uv__queue_insert_tail(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}
#endif


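/* Post a work item to the global queue and wake an idle worker. Slow I/O
 * requests go to slow_io_pending_wq and are represented in the main queue
 * by a single run_slow_work_message marker. */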
static void post(struct uv__queue* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  if (kind == UV__WORK_SLOW_IO) {
    /* Insert into a separate queue. */
    uv__queue_insert_tail(&slow_io_pending_wq, q);
    if (!uv__queue_empty(&run_slow_work_message)) {
      /* Running slow I/O tasks is already scheduled => Nothing to do here.
         The worker that runs said other task will schedule this one as well. */
      uv_mutex_unlock(&mutex);
      return;
    }
    q = &run_slow_work_message;
  }

  uv__queue_insert_tail(&wq, q);
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}


#ifdef __MVS__
/* TODO(itodorov) - zos: revisit when Woz compiler is available. */
__attribute__((destructor))
#endif
void uv__threadpool_cleanup(void) {
  unsigned int i;

  if (nthreads == 0)
    return;

#ifndef __MVS__
  /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
  post(&exit_message, UV__WORK_CPU);
#endif

  for (i = 0; i < nthreads; i++)
    if (uv_thread_join(threads + i))
      abort();

  if (threads != default_threads)
    uv__free(threads);

  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cond);

  threads = NULL;
  nthreads = 0;
#ifdef UV_STATISTIC
  post_statistic_work(&exit_message);
  uv_thread_join(&dump_thread);
  uv_mutex_destroy(&statistic_mutex);
  uv_cond_destroy(&dump_cond);
#endif
}


#ifndef USE_FFRT
static void init_threads(void) {
  uv_thread_options_t config;
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  uv__queue_init(&wq);
  uv__queue_init(&slow_io_pending_wq);
  uv__queue_init(&run_slow_work_message);

  if (uv_sem_init(&sem, 0))
    abort();

  config.flags = UV_THREAD_HAS_STACK_SIZE;
  config.stack_size = 8u << 20;  /* 8 MB */

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create_ex(threads + i, &config, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}


#ifndef _WIN32
static void reset_once(void) {
  uv_once_t child_once = UV_ONCE_INIT;
  memcpy(&once, &child_once, sizeof(child_once));
}
#endif


static void init_once(void) {
#ifndef _WIN32
  /* Re-initialize the threadpool after fork.
   * Note that this discards the global mutex and condition as well
   * as the work queue.
   */
  if (pthread_atfork(NULL, NULL, &reset_once))
    abort();
#endif
  init_closed_uv_loop_rwlock_once();
#ifdef UV_STATISTIC
  init_work_dump_queue();
#endif
  init_threads();
}


void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}
#endif


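/* With USE_OHOS_DFX, log a warning when the loop's active request count
 * crosses one of the configured thresholds; otherwise a no-op. */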
static void uv__print_active_reqs(uv_loop_t* loop, const char* flag) {
#ifdef USE_OHOS_DFX
  unsigned int count = loop->active_reqs.count;
  if (count == MIN_REQS_THRESHOLD || count == MIN_REQS_THRESHOLD + CURSOR ||
      count == MAX_REQS_THRESHOLD || count == MAX_REQS_THRESHOLD + CURSOR) {
    UV_LOGW("loop:%{public}zu, flag:%{public}s, active reqs:%{public}u", (size_t)loop, flag, count);
  }
#else
  return;
#endif
}


#ifdef USE_FFRT
static void uv__task_done_wrapper(void* work, int status) {
  struct uv__work* w = (struct uv__work*)work;
  uv__print_active_reqs(w->loop, "complete");
  w->done(w, status);
}
#endif


/* TODO(bnoordhuis) teach libuv how to cancel file operations
 * that go through io_uring instead of the thread pool.
 */
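/* Attempt to cancel a queued work request. Without FFRT the request can only
 * be cancelled while it is still sitting in the global queue; with FFRT the
 * cancellation is delegated to ffrt_executor_task_cancel(). Returns UV_EBUSY
 * if the work could not be cancelled. */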
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

  rdlock_closed_uv_loop_rwlock();
  if (!is_uv_loop_good_magic(w->loop)) {
    rdunlock_closed_uv_loop_rwlock();
    return 0;
  }

#ifndef USE_FFRT
  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);

  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
  if (cancelled)
    uv__queue_remove(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);
#else
  uv_mutex_lock(&w->loop->wq_mutex);
  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL
    && ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[0]);
  uv_mutex_unlock(&w->loop->wq_mutex);
#endif

  if (!cancelled) {
    rdunlock_closed_uv_loop_rwlock();
    return UV_EBUSY;
  }

  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
#ifndef USE_FFRT
  uv__queue_insert_tail(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
#else
  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(w->loop);
  int qos = (ffrt_qos_t)(intptr_t)req->reserved[0];

  if (uv_check_data_valid((struct uv_loop_data*)(w->loop->data)) == 0) {
    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)w->loop->data -
      (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
  } else {
    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
    uv_async_send(&loop->wq_async);
  }
#endif
  uv_mutex_unlock(&loop->wq_mutex);
  rdunlock_closed_uv_loop_rwlock();

  return 0;
}


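/* Completion handler for the loop's wq_async: drains the finished work
 * queue (or, with FFRT, the per-QoS sub-queues from highest to lowest) and
 * runs each request's done callback on the loop thread. */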
void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  struct uv__queue* q;
  struct uv__queue wq;
  int err;
  int nevents;

  loop = container_of(handle, uv_loop_t, wq_async);
#ifdef USE_OHOS_DFX
  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) != 0)
    uv__print_active_reqs(loop, "complete");
#endif
  rdlock_closed_uv_loop_rwlock();
  if (!is_uv_loop_good_magic(loop)) {
    rdunlock_closed_uv_loop_rwlock();
    return;
  }
  rdunlock_closed_uv_loop_rwlock();

  uv_mutex_lock(&loop->wq_mutex);
#ifndef USE_FFRT
  uv__queue_move(&loop->wq, &wq);
#else
  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
  int i;
  uv__queue_init(&wq);
  for (i = 3; i >= 0; i--) {
    if (!uv__queue_empty(&lfields->wq_sub[i])) {
      uv__queue_append(&lfields->wq_sub[i], &wq);
    }
  }
#endif
  uv_mutex_unlock(&loop->wq_mutex);

  nevents = 0;
  uv_start_trace(UV_TRACE_TAG, UV_TRACE_NAME);
  while (!uv__queue_empty(&wq)) {
    q = uv__queue_head(&wq);
    uv__queue_remove(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
#ifdef UV_STATISTIC
    uv__post_statistic_work(w, DONE_EXECUTING);
    struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
    if (dump_work == NULL) {
      UV_LOGE("malloc(%{public}zu) failed: %{public}s", sizeof(struct uv__statistic_work), strerror(errno));
      break;
    }
    dump_work->info = w->info;
    dump_work->work = uv__update_work_info;
#endif
#ifdef ASYNC_STACKTRACE
    uv_work_t* req = container_of(w, uv_work_t, work_req);
    LibuvSetStackId((uint64_t)req->reserved[3]);
#endif
    w->done(w, err);
    nevents++;
#ifdef UV_STATISTIC
    dump_work->time = uv__now_timestamp();
    dump_work->state = DONE_END;
    uv__queue_init(&dump_work->wq);
    post_statistic_work(&dump_work->wq);
#endif
  }
  uv_end_trace(UV_TRACE_TAG);

  /* This check accomplishes 2 things:
   * 1. Even if the queue was empty, the call to uv__work_done() should count
   *    as an event. Which will have been added by the event loop when
   *    calling this callback.
   * 2. Prevents accidental wrap around in case nevents == 0 events == 0.
   */
  if (nevents > 1) {
    /* Subtract 1 to counter the call to uv__work_done(). */
    uv__metrics_inc_events(loop, nevents - 1);
    if (uv__get_internal_fields(loop)->current_timeout == 0)
      uv__metrics_inc_events_waiting(loop, nevents - 1);
  }
}


static void uv__queue_work(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);

  req->work_cb(req);
}


static void uv__queue_done(struct uv__work* w, int err) {
  uv_work_t* req;

  if (w == NULL) {
    UV_LOGE("uv_work_t is NULL");
    return;
  }

  req = container_of(w, uv_work_t, work_req);
  uv__req_unregister(req->loop, req);

  if (req->after_work_cb == NULL)
    return;

  req->after_work_cb(req, err);
}


#ifdef USE_FFRT
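/* FFRT task entry point, registered via ffrt_executor_task_register_func().
 * Runs the work callback on an FFRT worker, then hands completion back to
 * the owning loop: either through the registered post_task_func or by
 * queueing onto the loop's per-QoS completion queue and signalling
 * wq_async. */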
void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos)
{
  struct uv__work* w = (struct uv__work *)data;
  uv_loop_t* loop = w->loop;
#ifdef UV_STATISTIC
  uv__post_statistic_work(w, WORK_EXECUTING);
#endif
#ifdef ASYNC_STACKTRACE
  uv_work_t* req = container_of(w, uv_work_t, work_req);
  LibuvSetStackId((uint64_t)req->reserved[3]);
#endif
  w->work(w);
#ifdef UV_STATISTIC
  uv__post_statistic_work(w, WORK_END);
#endif
  rdlock_closed_uv_loop_rwlock();
  if (loop->magic != UV_LOOP_MAGIC) {
    rdunlock_closed_uv_loop_rwlock();
    UV_LOGE("uv_loop(%{public}zu:%{public}#x), task is invalid",
            (size_t)loop, loop->magic);
    return;
  }

  uv_mutex_lock(&loop->wq_mutex);
  w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */

  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) == 0) {
    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data -
      (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
  } else {
    uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
    uv_async_send(&loop->wq_async);
  }
  uv_mutex_unlock(&loop->wq_mutex);
  rdunlock_closed_uv_loop_rwlock();
}

static void init_once(void)
{
  init_closed_uv_loop_rwlock_once();
  /* init uv work statics queue */
#ifdef UV_STATISTIC
  init_work_dump_queue();
#endif
  ffrt_executor_task_register_func(uv__ffrt_work, ffrt_uv_task);
}


/* ffrt uv__work_submit */
void uv__work_submit(uv_loop_t* loop,
                     uv_req_t* req,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work *w),
                     void (*done)(struct uv__work *w, int status)) {
  uv_once(&once, init_once);
  ffrt_task_attr_t attr;
  ffrt_task_attr_init(&attr);

  switch (kind) {
    case UV__WORK_CPU:
      ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
      break;
    case UV__WORK_FAST_IO:
      ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
      break;
    case UV__WORK_SLOW_IO:
      ffrt_task_attr_set_qos(&attr, ffrt_qos_background);
      break;
    default:
#ifdef USE_OHOS_DFX
      UV_LOGI("Unknown work kind");
#endif
      ffrt_task_attr_destroy(&attr);  /* Avoid leaking the initialized attr. */
      return;
  }

  w->loop = loop;
  w->work = work;
  w->done = done;

  req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
  ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
  ffrt_task_attr_destroy(&attr);
}


/* ffrt uv__work_submit_with_qos */
void uv__work_submit_with_qos(uv_loop_t* loop,
                              uv_req_t* req,
                              struct uv__work* w,
                              ffrt_qos_t qos,
                              void (*work)(struct uv__work *w),
                              void (*done)(struct uv__work *w, int status)) {
  uv_once(&once, init_once);
  ffrt_task_attr_t attr;
  ffrt_task_attr_init(&attr);
  ffrt_task_attr_set_qos(&attr, qos);

  w->loop = loop;
  w->work = work;
  w->done = done;

  req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
  ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
  ffrt_task_attr_destroy(&attr);
}
#endif


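/* Public API: schedule work_cb to run on the threadpool (or on an FFRT
 * worker when USE_FFRT is defined) and after_work_cb to run on the loop
 * thread once the work has finished or been cancelled.
 *
 * Illustrative usage (names are the caller's own):
 *
 *   uv_work_t req;
 *   uv_queue_work(uv_default_loop(), &req, my_work_cb, my_after_cb);
 */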
int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;

#ifdef UV_STATISTIC
  struct uv_work_dump_info* info = (struct uv_work_dump_info*) malloc(sizeof(struct uv_work_dump_info));
  if (info == NULL) {
    abort();
  }
  uv_init_dump_info(info, &req->work_req);
  info->builtin_return_address[0] = __builtin_return_address(0);
  info->builtin_return_address[1] = __builtin_return_address(1);
  info->builtin_return_address[2] = __builtin_return_address(2);
  (req->work_req).info = info;
#endif
#ifdef ASYNC_STACKTRACE
  req->reserved[3] = (void*)LibuvCollectAsyncStack();
#endif
  uv__work_submit(loop,
#ifdef USE_FFRT
                  (uv_req_t*)req,
#endif
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done);
#ifdef UV_STATISTIC
  uv_queue_statics(info);
#endif
  return 0;
}


int uv_queue_work_with_qos(uv_loop_t* loop,
                           uv_work_t* req,
                           uv_work_cb work_cb,
                           uv_after_work_cb after_work_cb,
                           uv_qos_t qos) {
#ifdef USE_FFRT
  if (work_cb == NULL)
    return UV_EINVAL;

  STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
  STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
  STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
  STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
  if (qos < ffrt_qos_background || qos > ffrt_qos_user_initiated) {
    return UV_EINVAL;
  }

  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
#ifdef UV_STATISTIC
  struct uv_work_dump_info* info = (struct uv_work_dump_info*)malloc(sizeof(struct uv_work_dump_info));
  if (info == NULL) {
    abort();
  }
  uv_init_dump_info(info, &req->work_req);
  info->builtin_return_address[0] = __builtin_return_address(0);
  info->builtin_return_address[1] = __builtin_return_address(1);
  info->builtin_return_address[2] = __builtin_return_address(2);
  (req->work_req).info = info;
#endif
  uv__work_submit_with_qos(loop,
                           (uv_req_t*)req,
                           &req->work_req,
                           (ffrt_qos_t)qos,
                           uv__queue_work,
                           uv__queue_done);
#ifdef UV_STATISTIC
  uv_queue_statics(info);
#endif
  return 0;
#else
  return uv_queue_work(loop, req, work_cb, after_work_cb);
#endif
}


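/* Public API: cancel a pending threadpool-backed request (fs, getaddrinfo,
 * getnameinfo, random, work). Succeeds only if the underlying work has not
 * started executing; otherwise uv__work_cancel() reports UV_EBUSY. */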
int uv_cancel(uv_req_t* req) {
  struct uv__work* wreq;
  uv_loop_t* loop;

  switch (req->type) {
  case UV_FS:
    loop = ((uv_fs_t*) req)->loop;
    wreq = &((uv_fs_t*) req)->work_req;
    break;
  case UV_GETADDRINFO:
    loop = ((uv_getaddrinfo_t*) req)->loop;
    wreq = &((uv_getaddrinfo_t*) req)->work_req;
    break;
  case UV_GETNAMEINFO:
    loop = ((uv_getnameinfo_t*) req)->loop;
    wreq = &((uv_getnameinfo_t*) req)->work_req;
    break;
  case UV_RANDOM:
    loop = ((uv_random_t*) req)->loop;
    wreq = &((uv_random_t*) req)->work_req;
    break;
  case UV_WORK:
    loop = ((uv_work_t*) req)->loop;
    wreq = &((uv_work_t*) req)->work_req;
    break;
  default:
    return UV_EINVAL;
  }

  return uv__work_cancel(loop, req, wreq);
}