• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv-common.h"
23 #include "uv_log.h"
24 #include "uv_trace.h"
25 
26 #if !defined(_WIN32)
27 # include "unix/internal.h"
28 #else
29 # include "win/internal.h"
30 #endif
31 
32 #include <stdlib.h>
33 #ifdef USE_FFRT
34 #include <assert.h>
35 #include "ffrt_inner.h"
36 #endif
37 #include <stdio.h>
38 #ifdef ASYNC_STACKTRACE
39 #include "dfx/async_stack/libuv_async_stack.h"
40 #endif
41 
42 #define MAX_THREADPOOL_SIZE 1024
43 #define UV_TRACE_NAME "UV_TRACE"
44 
45 #ifdef USE_OHOS_DFX
46 #define MIN_REQS_THRESHOLD 100
47 #define MAX_REQS_THRESHOLD 300
48 #define CURSOR 5
49 #endif
50 
51 static uv_rwlock_t g_closed_uv_loop_rwlock;
52 static uv_once_t once = UV_ONCE_INIT;
53 static uv_cond_t cond;
54 static uv_mutex_t mutex;
55 static unsigned int idle_threads;
56 static unsigned int nthreads;
57 static uv_thread_t* threads;
58 static uv_thread_t default_threads[4];
59 static struct uv__queue exit_message;
60 static struct uv__queue wq;
61 static struct uv__queue run_slow_work_message;
62 static struct uv__queue slow_io_pending_wq;
63 
64 
65 #ifdef UV_STATISTIC
66 #define MAX_DUMP_QUEUE_SIZE 200
67 static uv_mutex_t dump_queue_mutex;
68 static QUEUE dump_queue;
69 static unsigned int dump_queue_size;
70 
71 static int statistic_idle;
72 static uv_mutex_t statistic_mutex;
73 static QUEUE statistic_works;
74 static uv_cond_t dump_cond;
75 static uv_thread_t dump_thread;
76 
uv_dump_worker(void * arg)77 static void uv_dump_worker(void*  arg) {
78   struct uv__statistic_work* w;
79   struct uv__queue* q;
80   uv_sem_post((uv_sem_t*) arg);
81   arg = NULL;
82   uv_mutex_lock(&statistic_mutex);
83   for (;;) {
84     while (uv__queue_empty(&statistic_works)) {
85       statistic_idle = 1;
86       uv_cond_wait(&dump_cond, &statistic_mutex);
87       statistic_idle = 0;
88     }
89     q = uv__queue_head(&statistic_works);
90     if (q == &exit_message) {
91       uv_cond_signal(&dump_cond);
92       uv_mutex_unlock(&statistic_mutex);
93       break;
94     }
95     uv__queue_remove(q);
96     uv__queue_init(q);
97     uv_mutex_unlock(&statistic_mutex);
98     w = uv__queue_data(q, struct uv__statistic_work, wq);
99     w->work(w);
100     free(w);
101     uv_mutex_lock(&statistic_mutex);
102   }
103 }
104 
post_statistic_work(struct uv__queue * q)105 static void post_statistic_work(struct uv__queue* q) {
106   uv_mutex_lock(&statistic_mutex);
107   uv__queue_insert_tail(&statistic_works, q);
108   if (statistic_idle)
109     uv_cond_signal(&dump_cond);
110   uv_mutex_unlock(&statistic_mutex);
111 }
112 
uv__queue_work_info(struct uv__statistic_work * work)113 static void uv__queue_work_info(struct uv__statistic_work *work) {
114   uv_mutex_lock(&dump_queue_mutex);
115   if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) { /* release works already done */
116     struct uv__queue* q;
117     uv__queue_foreach(q, &dump_queue) {
118       struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
119       if (info->state == DONE_END) {
120         uv__queue_remove(q);
121         free(info);
122         dump_queue_size--;
123       }
124     }
125     if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) {
126       abort(); /* too many works not done. */
127     }
128   }
129 
130   uv__queue_insert_head(&dump_queue,  &work->info->wq);
131   dump_queue_size++;
132   uv_mutex_unlock(&dump_queue_mutex);
133 }
134 
uv__update_work_info(struct uv__statistic_work * work)135 static void uv__update_work_info(struct uv__statistic_work *work) {
136   uv_mutex_lock(&dump_queue_mutex);
137   if (work != NULL && work->info != NULL) {
138     work->info->state = work->state;
139     switch (work->state) {
140       case WAITING:
141         work->info->queue_time = work->time;
142         break;
143       case WORK_EXECUTING:
144         work->info->execute_start_time = work->time;
145         break;
146       case WORK_END:
147         work->info->execute_end_time = work->time;
148         break;
149       case DONE_EXECUTING:
150         work->info->done_start_time = work->time;
151         break;
152       case DONE_END:
153         work->info->done_end_time = work->time;
154         break;
155       default:
156         break;
157     }
158   }
159   uv_mutex_unlock(&dump_queue_mutex);
160 }
161 
162 
163 // return the timestamp in millisecond
uv__now_timestamp()164 static uint64_t uv__now_timestamp() {
165   uv_timeval64_t tv;
166   int r = uv_gettimeofday(&tv);
167   if (r != 0) {
168     return 0;
169   }
170   return (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
171 }
172 
173 
uv__post_statistic_work(struct uv__work * w,enum uv_work_state state)174 static void uv__post_statistic_work(struct uv__work *w, enum uv_work_state state) {
175   struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
176   if (dump_work == NULL) {
177     return;
178   }
179   dump_work->info = w->info;
180   dump_work->work = uv__update_work_info;
181   dump_work->time = uv__now_timestamp();
182   dump_work->state = state;
183   uv__queue_init(&dump_work->wq);
184   post_statistic_work(&dump_work->wq);
185 }
186 
187 
init_work_dump_queue()188 static void init_work_dump_queue()
189 {
190   if (uv_mutex_init(&dump_queue_mutex))
191     abort();
192   uv_mutex_lock(&dump_queue_mutex);
193   uv__queue_init(&dump_queue);
194   dump_queue_size = 0;
195   uv_mutex_unlock(&dump_queue_mutex);
196 
197   /* init dump thread */
198   statistic_idle = 1;
199   if (uv_mutex_init(&statistic_mutex))
200     abort();
201   uv__queue_init(&statistic_works);
202   uv_sem_t sem;
203   if (uv_cond_init(&dump_cond))
204     abort();
205   if (uv_sem_init(&sem, 0))
206     abort();
207   if (uv_thread_create(&dump_thread, uv_dump_worker, &sem))
208       abort();
209   uv_sem_wait(&sem);
210   uv_sem_destroy(&sem);
211 }
212 
213 
uv_init_dump_info(struct uv_work_dump_info * info,struct uv__work * w)214 void uv_init_dump_info(struct uv_work_dump_info* info, struct uv__work* w) {
215   if (info == NULL)
216     return;
217   info->queue_time = 0;
218   info->state = WAITING;
219   info->execute_start_time = 0;
220   info->execute_end_time = 0;
221   info->done_start_time = 0;
222   info->done_end_time = 0;
223   info->work = w;
224   uv__queue_init(&info->wq);
225 }
226 
227 
uv_queue_statics(struct uv_work_dump_info * info)228 void uv_queue_statics(struct uv_work_dump_info* info) {
229   struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
230   if (dump_work == NULL) {
231     abort();
232   }
233   dump_work->info = info;
234   dump_work->work = uv__queue_work_info;
235   info->queue_time = uv__now_timestamp();
236   dump_work->state = WAITING;
237   uv__queue_init(&dump_work->wq);
238   post_statistic_work(&dump_work->wq);
239 }
240 
241 
uv_dump_work_queue(int * size)242 uv_worker_info_t* uv_dump_work_queue(int* size) {
243 #ifdef UV_STATISTIC
244   uv_mutex_lock(&dump_queue_mutex);
245   if (uv__queue_empty(&dump_queue)) {
246     return NULL;
247   }
248   *size = dump_queue_size;
249   uv_worker_info_t* dump_info = (uv_worker_info_t*) malloc(sizeof(uv_worker_info_t) * dump_queue_size);
250   struct uv__queue* q;
251   int i = 0;
252   uv__queue_foreach(q, &dump_queue) {
253     struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
254     dump_info[i].queue_time = info->queue_time;
255     dump_info[i].builtin_return_address[0] = info->builtin_return_address[0];
256     dump_info[i].builtin_return_address[1] = info->builtin_return_address[1];
257     dump_info[i].builtin_return_address[2] = info->builtin_return_address[2];
258     switch (info->state) {
259       case WAITING:
260         strcpy(dump_info[i].state, "waiting");
261         break;
262       case WORK_EXECUTING:
263         strcpy(dump_info[i].state, "work_executing");
264         break;
265       case WORK_END:
266         strcpy(dump_info[i].state, "work_end");
267         break;
268       case DONE_EXECUTING:
269         strcpy(dump_info[i].state, "done_executing");
270         break;
271       case DONE_END:
272         strcpy(dump_info[i].state, "done_end");
273         break;
274       default:
275         break;
276     }
277     dump_info[i].execute_start_time = info->execute_start_time;
278     dump_info[i].execute_end_time = info->execute_end_time;
279     dump_info[i].done_start_time = info->done_start_time;
280     dump_info[i].done_end_time = info->done_end_time;
281     ++i;
282   }
283   uv_mutex_unlock(&dump_queue_mutex);
284   return dump_info;
285 #else
286   size = 0;
287   return NULL;
288 #endif
289 }
290 #endif
291 
292 
init_closed_uv_loop_rwlock_once(void)293 static void init_closed_uv_loop_rwlock_once(void) {
294   uv_rwlock_init(&g_closed_uv_loop_rwlock);
295 }
296 
297 
rdlock_closed_uv_loop_rwlock(void)298 void rdlock_closed_uv_loop_rwlock(void) {
299   uv_rwlock_rdlock(&g_closed_uv_loop_rwlock);
300 }
301 
302 
rdunlock_closed_uv_loop_rwlock(void)303 void rdunlock_closed_uv_loop_rwlock(void) {
304   uv_rwlock_rdunlock(&g_closed_uv_loop_rwlock);
305 }
306 
307 
is_uv_loop_good_magic(const uv_loop_t * loop)308 int is_uv_loop_good_magic(const uv_loop_t* loop) {
309   if (loop->magic == UV_LOOP_MAGIC) {
310     return 1;
311   }
312   UV_LOGE("loop:(%{public}zu:%{public}#x) invalid", (size_t)loop, loop->magic);
313   return 0;
314 }
315 
316 
/* Marks `loop` as closed by overwriting its magic with ~UV_LOOP_MAGIC while
 * holding the write side of `g_closed_uv_loop_rwlock`, so the change cannot
 * overlap with a reader holding the lock. Acquiring and releasing the lock
 * are traced, and the elapsed wall-clock seconds are logged.
 */
void on_uv_loop_close(uv_loop_t* loop) {
  time_t started;
  time_t finished;

  time(&started);

  uv_start_trace(UV_TRACE_TAG, "Get Write Lock");
  uv_rwlock_wrlock(&g_closed_uv_loop_rwlock);
  uv_end_trace(UV_TRACE_TAG);

  loop->magic = ~UV_LOOP_MAGIC;

  uv_start_trace(UV_TRACE_TAG, "Release Write Lock");
  uv_rwlock_wrunlock(&g_closed_uv_loop_rwlock);
  uv_end_trace(UV_TRACE_TAG);

  time(&finished);
  UV_LOGI("loop:(%{public}zu) closed in %{public}zds", (size_t)loop, (ssize_t)(finished - started));
}
330 
331 
/* Marker stored in `uv__work::work` for cancelled requests; completion code
 * in this file compares against its address to report UV_ECANCELED.
 * It aborts if it is ever actually called.
 */
static void uv__cancelled(struct uv__work* w) {
  (void) w;
  abort();
}
335 
336 
337 #ifndef USE_FFRT
338 static unsigned int slow_io_work_running;
339 
slow_work_thread_threshold(void)340 static unsigned int slow_work_thread_threshold(void) {
341   return (nthreads + 1) / 2;
342 }
343 
344 
345 /* To avoid deadlock with uv_cancel() it's crucial that the worker
346  * never holds the global mutex and the loop-local mutex at the same time.
347  */
worker(void * arg)348 static void worker(void* arg) {
349   struct uv__work* w;
350   struct uv__queue* q;
351   int is_slow_work;
352 
353   uv_sem_post((uv_sem_t*) arg);
354   arg = NULL;
355 
356   uv_mutex_lock(&mutex);
357   for (;;) {
358     /* `mutex` should always be locked at this point. */
359 
360     /* Keep waiting while either no work is present or only slow I/O
361        and we're at the threshold for that. */
362     while (uv__queue_empty(&wq) ||
363            (uv__queue_head(&wq) == &run_slow_work_message &&
364             uv__queue_next(&run_slow_work_message) == &wq &&
365             slow_io_work_running >= slow_work_thread_threshold())) {
366       idle_threads += 1;
367       uv_cond_wait(&cond, &mutex);
368       idle_threads -= 1;
369     }
370 
371     q = uv__queue_head(&wq);
372     if (q == &exit_message) {
373       uv_cond_signal(&cond);
374       uv_mutex_unlock(&mutex);
375       break;
376     }
377 
378     uv__queue_remove(q);
379     uv__queue_init(q);  /* Signal uv_cancel() that the work req is executing. */
380 
381     is_slow_work = 0;
382     if (q == &run_slow_work_message) {
383       /* If we're at the slow I/O threshold, re-schedule until after all
384          other work in the queue is done. */
385       if (slow_io_work_running >= slow_work_thread_threshold()) {
386         uv__queue_insert_tail(&wq, q);
387         continue;
388       }
389 
390       /* If we encountered a request to run slow I/O work but there is none
391          to run, that means it's cancelled => Start over. */
392       if (uv__queue_empty(&slow_io_pending_wq))
393         continue;
394 
395       is_slow_work = 1;
396       slow_io_work_running++;
397 
398       q = uv__queue_head(&slow_io_pending_wq);
399       uv__queue_remove(q);
400       uv__queue_init(q);
401 
402       /* If there is more slow I/O work, schedule it to be run as well. */
403       if (!uv__queue_empty(&slow_io_pending_wq)) {
404         uv__queue_insert_tail(&wq, &run_slow_work_message);
405         if (idle_threads > 0)
406           uv_cond_signal(&cond);
407       }
408     }
409 
410     uv_mutex_unlock(&mutex);
411 
412     w = uv__queue_data(q, struct uv__work, wq);
413 #ifdef UV_STATISTIC
414     uv__post_statistic_work(w, WORK_EXECUTING);
415 #endif
416     w->work(w);
417 #ifdef UV_STATISTIC
418     uv__post_statistic_work(w, WORK_END);
419 #endif
420     uv_mutex_lock(&w->loop->wq_mutex);
421     w->work = NULL;  /* Signal uv_cancel() that the work req is done
422                         executing. */
423     uv__queue_insert_tail(&w->loop->wq, &w->wq);
424     uv_async_send(&w->loop->wq_async);
425     uv_mutex_unlock(&w->loop->wq_mutex);
426 
427     /* Lock `mutex` since that is expected at the start of the next
428      * iteration. */
429     uv_mutex_lock(&mutex);
430     if (is_slow_work) {
431       /* `slow_io_work_running` is protected by `mutex`. */
432       slow_io_work_running--;
433     }
434   }
435 }
436 #endif
437 
438 
post(struct uv__queue * q,enum uv__work_kind kind)439 static void post(struct uv__queue* q, enum uv__work_kind kind) {
440   uv_mutex_lock(&mutex);
441   if (kind == UV__WORK_SLOW_IO) {
442     /* Insert into a separate queue. */
443     uv__queue_insert_tail(&slow_io_pending_wq, q);
444     if (!uv__queue_empty(&run_slow_work_message)) {
445       /* Running slow I/O tasks is already scheduled => Nothing to do here.
446          The worker that runs said other task will schedule this one as well. */
447       uv_mutex_unlock(&mutex);
448       return;
449     }
450     q = &run_slow_work_message;
451   }
452 
453   uv__queue_insert_tail(&wq, q);
454   if (idle_threads > 0)
455     uv_cond_signal(&cond);
456   uv_mutex_unlock(&mutex);
457 }
458 
459 
460 #ifdef __MVS__
461 /* TODO(itodorov) - zos: revisit when Woz compiler is available. */
462 __attribute__((destructor))
463 #endif
uv__threadpool_cleanup(void)464 void uv__threadpool_cleanup(void) {
465   unsigned int i;
466 
467   if (nthreads == 0)
468     return;
469 
470 #ifndef __MVS__
471   /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
472   post(&exit_message, UV__WORK_CPU);
473 #endif
474 
475   for (i = 0; i < nthreads; i++)
476     if (uv_thread_join(threads + i))
477       abort();
478 
479   if (threads != default_threads)
480     uv__free(threads);
481 
482   uv_mutex_destroy(&mutex);
483   uv_cond_destroy(&cond);
484 
485   threads = NULL;
486   nthreads = 0;
487 #ifdef UV_STATISTIC
488   post_statistic_work(&exit_message);
489   uv_thread_join(dump_thread);
490   uv_mutex_destroy(&statistic_mutex);
491   uv_cond_destroy(&dump_cond);
492 #endif
493 }
494 
495 
496 #ifndef USE_FFRT
init_threads(void)497 static void init_threads(void) {
498   uv_thread_options_t config;
499   unsigned int i;
500   const char* val;
501   uv_sem_t sem;
502 
503   nthreads = ARRAY_SIZE(default_threads);
504   val = getenv("UV_THREADPOOL_SIZE");
505   if (val != NULL)
506     nthreads = atoi(val);
507   if (nthreads == 0)
508     nthreads = 1;
509   if (nthreads > MAX_THREADPOOL_SIZE)
510     nthreads = MAX_THREADPOOL_SIZE;
511 
512   threads = default_threads;
513   if (nthreads > ARRAY_SIZE(default_threads)) {
514     threads = uv__malloc(nthreads * sizeof(threads[0]));
515     if (threads == NULL) {
516       nthreads = ARRAY_SIZE(default_threads);
517       threads = default_threads;
518     }
519   }
520 
521   if (uv_cond_init(&cond))
522     abort();
523 
524   if (uv_mutex_init(&mutex))
525     abort();
526 
527   uv__queue_init(&wq);
528   uv__queue_init(&slow_io_pending_wq);
529   uv__queue_init(&run_slow_work_message);
530 
531   if (uv_sem_init(&sem, 0))
532     abort();
533 
534   config.flags = UV_THREAD_HAS_STACK_SIZE;
535   config.stack_size = 8u << 20;  /* 8 MB */
536 
537   for (i = 0; i < nthreads; i++)
538     if (uv_thread_create_ex(threads + i, &config, worker, &sem))
539       abort();
540 
541   for (i = 0; i < nthreads; i++)
542     uv_sem_wait(&sem);
543 
544   uv_sem_destroy(&sem);
545 }
546 
547 
548 #ifndef _WIN32
reset_once(void)549 static void reset_once(void) {
550   uv_once_t child_once = UV_ONCE_INIT;
551   memcpy(&once, &child_once, sizeof(child_once));
552 }
553 #endif
554 
555 
init_once(void)556 static void init_once(void) {
557 #ifndef _WIN32
558   /* Re-initialize the threadpool after fork.
559    * Note that this discards the global mutex and condition as well
560    * as the work queue.
561    */
562   if (pthread_atfork(NULL, NULL, &reset_once))
563     abort();
564 #endif
565   init_closed_uv_loop_rwlock_once();
566 #ifdef UV_STATISTIC
567   init_work_dump_queue();
568 #endif
569   init_threads();
570 }
571 
572 
/* Thread-pool build: records `loop`, `work` and `done` in `w` and queues
 * `w` for the worker threads according to `kind`. The pool is initialised
 * on first use.
 */
void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);

  w->done = done;
  w->work = work;
  w->loop = loop;

  post(&w->wq, kind);
}
584 #endif
585 
586 
/* With USE_OHOS_DFX: logs a warning when the loop's active request count is
 * exactly one of MIN_REQS_THRESHOLD, MAX_REQS_THRESHOLD, or either of those
 * plus CURSOR. `flag` is included in the log line. Without USE_OHOS_DFX this
 * is a no-op.
 */
static void uv__print_active_reqs(uv_loop_t* loop, const char* flag) {
#ifdef USE_OHOS_DFX
  unsigned int pending = loop->active_reqs.count;

  switch (pending) {
    case MIN_REQS_THRESHOLD:
    case MIN_REQS_THRESHOLD + CURSOR:
    case MAX_REQS_THRESHOLD:
    case MAX_REQS_THRESHOLD + CURSOR:
      UV_LOGW("loop:%{public}zu, flag:%{public}s, active reqs:%{public}u", (size_t)loop, flag, pending);
      break;
    default:
      break;
  }
#else
  (void) loop;
  (void) flag;
#endif
}
598 
599 
600 #ifdef USE_FFRT
uv__task_done_wrapper(void * work,int status)601 static void uv__task_done_wrapper(void* work, int status) {
602   struct uv__work* w = (struct uv__work*)work;
603   uv__print_active_reqs(w->loop, "complete");
604   w->done(w, status);
605 }
606 #endif
607 
608 
609 /* TODO(bnoordhuis) teach libuv how to cancel file operations
610  * that go through io_uring instead of the thread pool.
611  */
uv__work_cancel(uv_loop_t * loop,uv_req_t * req,struct uv__work * w)612 static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
613   int cancelled;
614 
615   rdlock_closed_uv_loop_rwlock();
616   if (!is_uv_loop_good_magic(w->loop)) {
617     rdunlock_closed_uv_loop_rwlock();
618     return 0;
619   }
620 
621 #ifndef USE_FFRT
622   uv_mutex_lock(&mutex);
623   uv_mutex_lock(&w->loop->wq_mutex);
624 
625   cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
626   if (cancelled)
627     uv__queue_remove(&w->wq);
628 
629   uv_mutex_unlock(&w->loop->wq_mutex);
630   uv_mutex_unlock(&mutex);
631 #else
632   uv_mutex_lock(&w->loop->wq_mutex);
633   cancelled = !uv__queue_empty(&w->wq) && w->work != NULL
634     && ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[0]);
635   uv_mutex_unlock(&w->loop->wq_mutex);
636 #endif
637 
638   if (!cancelled) {
639     rdunlock_closed_uv_loop_rwlock();
640     return UV_EBUSY;
641   }
642 
643   w->work = uv__cancelled;
644   uv_mutex_lock(&loop->wq_mutex);
645 #ifndef USE_FFRT
646   uv__queue_insert_tail(&loop->wq, &w->wq);
647   uv_async_send(&loop->wq_async);
648 #else
649   uv__loop_internal_fields_t* lfields = uv__get_internal_fields(w->loop);
650   int qos = (ffrt_qos_t)(intptr_t)req->reserved[0];
651 
652   if (uv_check_data_valid((struct uv_loop_data*)(w->loop->data)) == 0) {
653     int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
654     struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)w->loop->data -
655       (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
656     addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
657   } else {
658     uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
659     uv_async_send(&loop->wq_async);
660   }
661 #endif
662   uv_mutex_unlock(&loop->wq_mutex);
663   rdunlock_closed_uv_loop_rwlock();
664 
665   return 0;
666 }
667 
668 
/* Async callback for `loop->wq_async`: runs the `done` callback of every
 * finished or cancelled work request queued on the loop.
 *
 * The pending requests are moved to a local queue under `wq_mutex` and then
 * completed without the lock. Cancelled requests (whose `work` is
 * `uv__cancelled`) are completed with UV_ECANCELED, all others with 0.
 * Returns early when the loop's magic is no longer valid.
 */
void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  struct uv__queue* q;
  struct uv__queue done_wq;  /* Local; deliberately not named like the global `wq`. */
  int err;
  int nevents;
#ifdef UV_STATISTIC
  struct uv__statistic_work* dump_work;
#endif

  loop = container_of(handle, uv_loop_t, wq_async);
#ifdef USE_OHOS_DFX
  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) != 0)
    uv__print_active_reqs(loop, "complete");
#endif
  rdlock_closed_uv_loop_rwlock();
  if (!is_uv_loop_good_magic(loop)) {
    rdunlock_closed_uv_loop_rwlock();
    return;
  }
  rdunlock_closed_uv_loop_rwlock();

  uv_mutex_lock(&loop->wq_mutex);
#ifndef USE_FFRT
  uv__queue_move(&loop->wq, &done_wq);
#else
  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
  int i;
  uv__queue_init(&done_wq);
  /* Collect the per-QoS sub-queues, highest index first. */
  for (i = 4; i >= 0; i--) {
    if (!uv__queue_empty(&lfields->wq_sub[i])) {
      uv__queue_append(&lfields->wq_sub[i], &done_wq);
    }
  }
#endif
  uv_mutex_unlock(&loop->wq_mutex);

  nevents = 0;
  uv_start_trace(UV_TRACE_TAG, UV_TRACE_NAME);
  while (!uv__queue_empty(&done_wq)) {
    q = uv__queue_head(&done_wq);
    uv__queue_remove(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
#ifdef UV_STATISTIC
    uv__post_statistic_work(w, DONE_EXECUTING);
    /* `w` must not be touched after `done` has run, so the DONE_END update
     * captures `w->info` up front. A failed allocation only costs the
     * statistic: the request itself must still be completed. */
    dump_work = malloc(sizeof(*dump_work));
    if (dump_work == NULL) {
      UV_LOGE("malloc(%{public}zu) failed: %{public}s", sizeof(struct uv__statistic_work), strerror(errno));
    } else {
      dump_work->info = w->info;
      dump_work->work = uv__update_work_info;
    }
#endif
    w->done(w, err);
    nevents++;
#ifdef UV_STATISTIC
    if (dump_work != NULL) {
      dump_work->time = uv__now_timestamp();
      dump_work->state = DONE_END;
      uv__queue_init(&dump_work->wq);
      post_statistic_work(&dump_work->wq);
    }
#endif
  }
  uv_end_trace(UV_TRACE_TAG);

  /* This check accomplishes 2 things:
   * 1. Even if the queue was empty, the call to uv__work_done() should count
   *    as an event. Which will have been added by the event loop when
   *    calling this callback.
   * 2. Prevents accidental wrap around in case nevents == 0 events == 0.
   */
  if (nevents > 1) {
    /* Subtract 1 to counter the call to uv__work_done(). */
    uv__metrics_inc_events(loop, nevents - 1);
    if (uv__get_internal_fields(loop)->current_timeout == 0)
      uv__metrics_inc_events_waiting(loop, nevents - 1);
  }
}
746 
747 
uv__queue_work(struct uv__work * w)748 static void uv__queue_work(struct uv__work* w) {
749   uv_work_t* req = container_of(w, uv_work_t, work_req);
750 #ifdef ASYNC_STACKTRACE
751   LibuvSetStackId((uint64_t)req->reserved[3]);
752 #endif
753   req->work_cb(req);
754 }
755 
756 
uv__queue_done(struct uv__work * w,int err)757 static void uv__queue_done(struct uv__work* w, int err) {
758   uv_work_t* req;
759 
760   if (w == NULL) {
761     UV_LOGE("uv_work_t is NULL");
762     return;
763   }
764 
765   req = container_of(w, uv_work_t, work_req);
766 #ifdef ASYNC_STACKTRACE
767   LibuvSetStackId((uint64_t)req->reserved[3]);
768 #endif
769   uv__req_unregister(req->loop, req);
770 
771   if (req->after_work_cb == NULL)
772     return;
773 
774   req->after_work_cb(req, err);
775 }
776 
777 
778 #ifdef USE_FFRT
/* FFRT executor entry point for a work request.
 *
 * `data` is the struct uv__work that was submitted; `qos` is the QoS level
 * the task ran at. The request's `work` callback is run, after which the
 * request is handed back to its loop: through the loop data's
 * post_task_func when uv_check_data_valid() returns 0, otherwise via the
 * per-QoS sub-queue and `wq_async`. If the loop's magic is no longer valid
 * the completion is dropped and an error is logged.
 */
void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos)
{
  struct uv__work* w = (struct uv__work *)data;
  uv_loop_t* loop = w->loop;
#ifdef UV_STATISTIC
  uv__post_statistic_work(w, WORK_EXECUTING);
#endif
  w->work(w);
#ifdef UV_STATISTIC
  uv__post_statistic_work(w, WORK_END);
#endif
  rdlock_closed_uv_loop_rwlock();
  if (loop->magic != UV_LOOP_MAGIC) {
    /* Log while the read lock is still held so `loop` is not read again
     * after the lock has been released. */
    UV_LOGE("uv_loop(%{public}zu:%{public}#x), task is invalid",
            (size_t)loop, loop->magic);
    rdunlock_closed_uv_loop_rwlock();
    return;
  }

  uv_mutex_lock(&loop->wq_mutex);
  w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */

  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) == 0) {
    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data -
      (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
    /* `w->work` was just cleared, so this request cannot be reported as
     * cancelled: the status is always 0. */
    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, 0, qos);
  } else {
    uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
    uv_async_send(&loop->wq_async);
  }
  uv_mutex_unlock(&loop->wq_mutex);
  rdunlock_closed_uv_loop_rwlock();
}
814 
init_once(void)815 static void init_once(void)
816 {
817   init_closed_uv_loop_rwlock_once();
818   /* init uv work statics queue */
819 #ifdef UV_STATISTIC
820   init_work_dump_queue();
821 #endif
822   ffrt_executor_task_register_func(uv__ffrt_work, ffrt_uv_task);
823 }
824 
825 
826 /* ffrt uv__work_submit */
uv__work_submit(uv_loop_t * loop,uv_req_t * req,struct uv__work * w,enum uv__work_kind kind,void (* work)(struct uv__work * w),void (* done)(struct uv__work * w,int status))827 void uv__work_submit(uv_loop_t* loop,
828                      uv_req_t* req,
829                      struct uv__work* w,
830                      enum uv__work_kind kind,
831                      void (*work)(struct uv__work *w),
832                      void (*done)(struct uv__work *w, int status)) {
833   uv_once(&once, init_once);
834   ffrt_task_attr_t attr;
835   ffrt_task_attr_init(&attr);
836 
837   switch(kind) {
838     case UV__WORK_CPU:
839       ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
840       break;
841     case UV__WORK_FAST_IO:
842       ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
843       break;
844     case UV__WORK_SLOW_IO:
845       ffrt_task_attr_set_qos(&attr, ffrt_qos_background);
846       break;
847     default:
848 #ifdef USE_OHOS_DFX
849       UV_LOGI("Unknown work kind");
850 #endif
851       return;
852   }
853 
854   w->loop = loop;
855   w->work = work;
856   w->done = done;
857 
858   req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
859   ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
860   ffrt_task_attr_destroy(&attr);
861 }
862 
863 
864 /* ffrt uv__work_submit */
uv__work_submit_with_qos(uv_loop_t * loop,uv_req_t * req,struct uv__work * w,ffrt_qos_t qos,void (* work)(struct uv__work * w),void (* done)(struct uv__work * w,int status))865 void uv__work_submit_with_qos(uv_loop_t* loop,
866                      uv_req_t* req,
867                      struct uv__work* w,
868                      ffrt_qos_t qos,
869                      void (*work)(struct uv__work *w),
870                      void (*done)(struct uv__work *w, int status)) {
871     uv_once(&once, init_once);
872     ffrt_task_attr_t attr;
873     ffrt_task_attr_init(&attr);
874     ffrt_task_attr_set_qos(&attr, qos);
875 
876     w->loop = loop;
877     w->work = work;
878     w->done = done;
879 
880     req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
881     ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
882     ffrt_task_attr_destroy(&attr);
883 }
884 #endif
885 
886 
/* Queues `work_cb` to run off the loop thread; `after_work_cb` (may be
 * NULL) is invoked through the loop once the work has finished.
 *
 * Returns UV_EINVAL when `work_cb` is NULL, 0 otherwise. With UV_STATISTIC
 * a dump info record is allocated for the request (aborting on allocation
 * failure) and registered after submission.
 */
int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
#ifdef UV_STATISTIC
  struct uv_work_dump_info* info;
#endif

  if (work_cb == NULL) {
    return UV_EINVAL;
  }

  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;

#ifdef UV_STATISTIC
  info = malloc(sizeof(*info));
  if (info == NULL) {
    abort();
  }
  uv_init_dump_info(info, &req->work_req);
  /* Record the first three return addresses of the call chain. */
  info->builtin_return_address[0] = __builtin_return_address(0);
  info->builtin_return_address[1] = __builtin_return_address(1);
  info->builtin_return_address[2] = __builtin_return_address(2);
  req->work_req.info = info;
#endif

#ifdef ASYNC_STACKTRACE
  req->reserved[3] = (void*)LibuvCollectAsyncStack();
#endif

  /* The FFRT variant of uv__work_submit() takes the request as well. */
#ifdef USE_FFRT
  uv__work_submit(loop, (uv_req_t*)req, &req->work_req, UV__WORK_CPU,
                  uv__queue_work, uv__queue_done);
#else
  uv__work_submit(loop, &req->work_req, UV__WORK_CPU,
                  uv__queue_work, uv__queue_done);
#endif

#ifdef UV_STATISTIC
  uv_queue_statics(info);
#endif
  return 0;
}
928 
929 
/* Like uv_queue_work(), but with an explicit QoS level.
 *
 * Without USE_FFRT the QoS is ignored and the call is forwarded to
 * uv_queue_work(). With USE_FFRT it returns UV_EINVAL when `work_cb` is
 * NULL or `qos` lies outside [ffrt_qos_background,
 * ffrt_qos_deadline_request], and 0 after the request has been submitted.
 */
int uv_queue_work_with_qos(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb,
                  uv_qos_t qos) {
#ifndef USE_FFRT
  return uv_queue_work(loop, req, work_cb, after_work_cb);
#else
#ifdef UV_STATISTIC
  struct uv_work_dump_info* info;
#endif

  if (work_cb == NULL) {
    return UV_EINVAL;
  }

  /* The uv_qos_* values are cast straight to ffrt_qos_t below, so the two
   * enumerations must agree. */
  STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
  STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
  STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
  STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
  STATIC_ASSERT(uv_qos_user_interactive == ffrt_qos_deadline_request);
  if (qos < ffrt_qos_background || qos > ffrt_qos_deadline_request) {
    return UV_EINVAL;
  }

  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;

#ifdef UV_STATISTIC
  info = malloc(sizeof(*info));
  if (info == NULL) {
    abort();
  }
  uv_init_dump_info(info, &req->work_req);
  info->builtin_return_address[0] = __builtin_return_address(0);
  info->builtin_return_address[1] = __builtin_return_address(1);
  info->builtin_return_address[2] = __builtin_return_address(2);
  req->work_req.info = info;
#endif

  uv__work_submit_with_qos(loop,
                           (uv_req_t*)req,
                           &req->work_req,
                           (ffrt_qos_t)qos,
                           uv__queue_work,
                           uv__queue_done);

#ifdef UV_STATISTIC
  uv_queue_statics(info);
#endif
  return 0;
#endif
}
978 
979 
/* Attempts to cancel a pending request.
 *
 * Supported request types are UV_FS, UV_GETADDRINFO, UV_GETNAMEINFO,
 * UV_RANDOM and UV_WORK; any other type yields UV_EINVAL. Otherwise the
 * result of uv__work_cancel() is returned.
 */
int uv_cancel(uv_req_t* req) {
  struct uv__work* work_req;
  uv_loop_t* owner;

  if (req->type == UV_FS) {
    owner = ((uv_fs_t*) req)->loop;
    work_req = &((uv_fs_t*) req)->work_req;
  } else if (req->type == UV_GETADDRINFO) {
    owner = ((uv_getaddrinfo_t*) req)->loop;
    work_req = &((uv_getaddrinfo_t*) req)->work_req;
  } else if (req->type == UV_GETNAMEINFO) {
    owner = ((uv_getnameinfo_t*) req)->loop;
    work_req = &((uv_getnameinfo_t*) req)->work_req;
  } else if (req->type == UV_RANDOM) {
    owner = ((uv_random_t*) req)->loop;
    work_req = &((uv_random_t*) req)->work_req;
  } else if (req->type == UV_WORK) {
    owner = ((uv_work_t*) req)->loop;
    work_req = &((uv_work_t*) req)->work_req;
  } else {
    return UV_EINVAL;
  }

  return uv__work_cancel(owner, req, work_req);
}
1011