/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv-common.h"
#include "uv_log.h"
#include "uv_trace.h"

#if !defined(_WIN32)
# include "unix/internal.h"
#else
# include "win/internal.h"
#endif

#include <stdlib.h>
#ifdef USE_FFRT
#include <assert.h>
#include "ffrt_inner.h"
#endif
#include <stdio.h>
#ifdef ASYNC_STACKTRACE
#include "dfx/async_stack/libuv_async_stack.h"
#endif

#define MAX_THREADPOOL_SIZE 1024
#define UV_TRACE_NAME "UV_TRACE"

#ifdef USE_OHOS_DFX
#define MIN_REQS_THRESHOLD 100
#define MAX_REQS_THRESHOLD 300
#define CURSOR 5
#endif

typedef enum {
  /* ffrt qos */
  FFRT_QOS = 0,
  /* record task name */
  DFX_TASK_NAME,
  /* ffrt task handle */
  FFRT_TASK_DEPENDENCE,
  /* collect asynchronous task stack */
  DFX_ASYNC_STACK,
} req_reserved;

#ifdef USE_FFRT
static uv_rwlock_t g_closed_uv_loop_rwlock;
#endif
static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int idle_threads;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
static struct uv__queue exit_message;
static struct uv__queue wq;
static struct uv__queue run_slow_work_message;
static struct uv__queue slow_io_pending_wq;


#ifdef USE_FFRT
static void init_closed_uv_loop_rwlock_once(void) {
  uv_rwlock_init(&g_closed_uv_loop_rwlock);
}


void rdlock_closed_uv_loop_rwlock(void) {
  uv_rwlock_rdlock(&g_closed_uv_loop_rwlock);
}


void rdunlock_closed_uv_loop_rwlock(void) {
  uv_rwlock_rdunlock(&g_closed_uv_loop_rwlock);
}


int is_uv_loop_good_magic(const uv_loop_t* loop) {
  if (loop->magic == UV_LOOP_MAGIC) {
    return 1;
  }
  UV_LOGE("loop:(%{public}zu:%{public}#x) invalid", (size_t)loop % UV_ADDR_MOD, loop->magic);
  return 0;
}
#endif


void on_uv_loop_close(uv_loop_t* loop) {
  time_t t1, t2;
  time(&t1);
#ifdef USE_FFRT
  uv_rwlock_wrlock(&g_closed_uv_loop_rwlock);
  loop->magic = ~UV_LOOP_MAGIC;
  uv_rwlock_wrunlock(&g_closed_uv_loop_rwlock);
#endif
  time(&t2);
  UV_LOGI("loop:(%{public}zu) closed in %{public}zds", (size_t)loop % UV_ADDR_MOD, (ssize_t)(t2 - t1));
}


#ifdef USE_FFRT
static void uv__cancelled(struct uv__work* w, int qos) {
#else
static void uv__cancelled(struct uv__work* w) {
#endif
  abort();
}


#ifndef USE_FFRT
static unsigned int slow_io_work_running;

static unsigned int slow_work_thread_threshold(void) {
  return (nthreads + 1) / 2;
}


/* To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  struct uv__queue* q;
  int is_slow_work;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (uv__queue_empty(&wq) ||
           (uv__queue_head(&wq) == &run_slow_work_message &&
            uv__queue_next(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = uv__queue_head(&wq);
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    uv__queue_remove(q);
    uv__queue_init(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        uv__queue_insert_tail(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (uv__queue_empty(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      q = uv__queue_head(&slow_io_pending_wq);
      uv__queue_remove(q);
      uv__queue_init(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!uv__queue_empty(&slow_io_pending_wq)) {
        uv__queue_insert_tail(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }

    uv_mutex_unlock(&mutex);

    w = uv__queue_data(q, struct uv__work, wq);
    w->work(w);
    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    uv__queue_insert_tail(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}
#endif


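/* Enqueue a work request on the global queue and wake an idle worker, if any.
 * Slow I/O requests are parked on slow_io_pending_wq and are represented in
 * the main queue by the single run_slow_work_message sentinel.
 */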
static void post(struct uv__queue* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  if (kind == UV__WORK_SLOW_IO) {
    /* Insert into a separate queue. */
    uv__queue_insert_tail(&slow_io_pending_wq, q);
    if (!uv__queue_empty(&run_slow_work_message)) {
      /* Running slow I/O tasks is already scheduled => Nothing to do here.
         The worker that runs said other task will schedule this one as well. */
      uv_mutex_unlock(&mutex);
      return;
    }
    q = &run_slow_work_message;
  }

  uv__queue_insert_tail(&wq, q);
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}


#ifdef __MVS__
/* TODO(itodorov) - zos: revisit when Woz compiler is available. */
__attribute__((destructor))
#endif
void uv__threadpool_cleanup(void) {
  unsigned int i;

  if (nthreads == 0)
    return;

#ifndef __MVS__
  /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
  post(&exit_message, UV__WORK_CPU);
#endif

  for (i = 0; i < nthreads; i++)
    if (uv_thread_join(threads + i))
      abort();

  if (threads != default_threads)
    uv__free(threads);

  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cond);

  threads = NULL;
  nthreads = 0;
}


#ifndef USE_FFRT
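/* Spin up the default worker pool. The pool size defaults to
 * ARRAY_SIZE(default_threads) and can be overridden with the
 * UV_THREADPOOL_SIZE environment variable, clamped to [1, MAX_THREADPOOL_SIZE].
 * Only compiled when FFRT is not in use (see the surrounding #ifndef).
 */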
static void init_threads(void) {
  uv_thread_options_t config;
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  uv__queue_init(&wq);
  uv__queue_init(&slow_io_pending_wq);
  uv__queue_init(&run_slow_work_message);

  if (uv_sem_init(&sem, 0))
    abort();

  config.flags = UV_THREAD_HAS_STACK_SIZE;
  config.stack_size = 8u << 20;  /* 8 MB */

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create_ex(threads + i, &config, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}


#ifndef _WIN32
static void reset_once(void) {
  uv_once_t child_once = UV_ONCE_INIT;
  memcpy(&once, &child_once, sizeof(child_once));
}
#endif


static void init_once(void) {
#ifndef _WIN32
  /* Re-initialize the threadpool after fork.
   * Note that this discards the global mutex and condition as well
   * as the work queue.
   */
  if (pthread_atfork(NULL, NULL, &reset_once))
    abort();
#endif
#ifdef USE_FFRT
  init_closed_uv_loop_rwlock_once();
#endif
  init_threads();
}


void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}
#endif


static void uv__print_active_reqs(uv_loop_t* loop, const char* flag) {
#ifdef USE_OHOS_DFX
  unsigned int count = loop->active_reqs.count;
  if (count == MIN_REQS_THRESHOLD || count == MIN_REQS_THRESHOLD + CURSOR ||
      count == MAX_REQS_THRESHOLD || count == MAX_REQS_THRESHOLD + CURSOR) {
    UV_LOGW("loop:%{public}zu, flag:%{public}s, active reqs:%{public}u", (size_t)loop % UV_ADDR_MOD, flag, count);
  }
#else
  return;
#endif
}


#ifdef USE_FFRT
static void uv__req_reserved_init(uv_work_t* req) {
  for (int i = 0; i < sizeof(req->reserved) / sizeof(req->reserved[0]); i++) {
    req->reserved[i] = NULL;
  }
}


static void uv__task_done_wrapper(void* work, int status) {
  struct uv__work* w = (struct uv__work*)work;
  uv__print_active_reqs(w->loop, "complete");
  w->done(w, status);
}


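/* Hand a finished FFRT task back to its owning event loop. If the loop owner
 * has installed a post-task hook (uv_check_data_valid(loop) == 0), the done
 * callback is dispatched through loop->data->post_task_func; otherwise the
 * work item is queued on the per-qos wq_sub queue and the loop is woken via
 * uv_async_send().
 */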
void uv__work_submit_to_eventloop(uv_req_t* req, struct uv__work* w, int qos) {
  uv_loop_t* loop = w->loop;
  rdlock_closed_uv_loop_rwlock();
  if (!is_uv_loop_good_magic(loop)) {
    rdunlock_closed_uv_loop_rwlock();
    UV_LOGE("uv_loop(%{public}zu:%{public}#x), task is invalid",
            (size_t)loop % UV_ADDR_MOD, loop->magic);
    return;
  }

  uv_mutex_lock(&loop->wq_mutex);
  w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */

  if (uv_check_data_valid(loop) == 0) {
    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    struct uv_loop_data* data = (struct uv_loop_data*)loop->data;
    uv_mutex_unlock(&loop->wq_mutex);
    if (req->type == UV_WORK) {
      data->post_task_func((char*)req->reserved[DFX_TASK_NAME], uv__task_done_wrapper, (void*)w, status, qos);
    } else {
      data->post_task_func(NULL, uv__task_done_wrapper, (void*)w, status, qos);
    }
  } else {
    uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
    uv_mutex_unlock(&loop->wq_mutex);
    uv_async_send(&loop->wq_async);
  }
  rdunlock_closed_uv_loop_rwlock();
}
#endif


/* TODO(bnoordhuis) teach libuv how to cancel file operations
 * that go through io_uring instead of the thread pool.
 */
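/* Attempt to cancel a queued work request. Without FFRT this only succeeds
 * while the request is still sitting in the global queue; with FFRT the
 * request is cancelled through ffrt_skip() or ffrt_executor_task_cancel().
 * On success the work callback is replaced with uv__cancelled so the done
 * callback runs with UV_ECANCELED.
 */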
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

#ifdef USE_FFRT
  rdlock_closed_uv_loop_rwlock();
  if (!is_uv_loop_good_magic(w->loop)) {
    rdunlock_closed_uv_loop_rwlock();
    return 0;
  }
#endif

#ifndef USE_FFRT
  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);

  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
  if (cancelled)
    uv__queue_remove(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);
#else
  uv_mutex_lock(&w->loop->wq_mutex);
  if (req->type == UV_WORK && req->reserved[FFRT_TASK_DEPENDENCE] != NULL) {
    cancelled = w->work != NULL && (ffrt_skip((ffrt_task_handle_t)req->reserved[FFRT_TASK_DEPENDENCE]) == 0);
  } else {
    cancelled = !uv__queue_empty(&w->wq) && w->work != NULL
      && ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[FFRT_QOS]);
  }
  uv_mutex_unlock(&w->loop->wq_mutex);
#endif

  if (!cancelled) {
#ifdef USE_FFRT
    rdunlock_closed_uv_loop_rwlock();
#endif
    return UV_EBUSY;
  }

  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
#ifndef USE_FFRT
  uv__queue_insert_tail(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
#else
  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(w->loop);
  int qos = (ffrt_qos_t)(intptr_t)req->reserved[FFRT_QOS];

  if (uv_check_data_valid(w->loop) == 0) {
    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    struct uv_loop_data* data = (struct uv_loop_data*)w->loop->data;
    if (req->type == UV_WORK) {
      data->post_task_func((char*)req->reserved[DFX_TASK_NAME], uv__task_done_wrapper, (void*)w, status, qos);
    } else {
      data->post_task_func(NULL, uv__task_done_wrapper, (void*)w, status, qos);
    }
  } else {
    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
    uv_async_send(&loop->wq_async);
  }
#endif
  uv_mutex_unlock(&loop->wq_mutex);
#ifdef USE_FFRT
  rdunlock_closed_uv_loop_rwlock();
#endif

  return 0;
}


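/* Runs on the event loop thread in response to wq_async. Moves completed work
 * items off the loop's queue (with FFRT, off the per-qos wq_sub queues) and
 * invokes each item's done callback, passing UV_ECANCELED for cancelled work.
 */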
void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  struct uv__queue* q;
  struct uv__queue wq;
  int err;
  int nevents;

  loop = container_of(handle, uv_loop_t, wq_async);
#ifdef USE_FFRT
  rdlock_closed_uv_loop_rwlock();
  if (!is_uv_loop_good_magic(loop)) {
    rdunlock_closed_uv_loop_rwlock();
    return;
  }
  rdunlock_closed_uv_loop_rwlock();
#endif

#ifdef USE_OHOS_DFX
  if (uv_check_data_valid(loop) == 0) {
    return;
  }
  uv__print_active_reqs(loop, "complete");
#endif

  uv_mutex_lock(&loop->wq_mutex);
#ifndef USE_FFRT
  uv__queue_move(&loop->wq, &wq);
#else
  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
  int i;
  uv__queue_init(&wq);
  for (i = 5; i >= 0; i--) {
    // No tasks are placed in the 4th lfields->wq_sub queue; skip it.
    if (i == 4) {
      continue;
    }
    if (!uv__queue_empty(&lfields->wq_sub[i])) {
      uv__queue_append(&lfields->wq_sub[i], &wq);
    }
  }
#endif
  uv_mutex_unlock(&loop->wq_mutex);

  nevents = 0;
  uv_start_trace(UV_TRACE_TAG, UV_TRACE_NAME);
  while (!uv__queue_empty(&wq)) {
    q = uv__queue_head(&wq);
    uv__queue_remove(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
    nevents++;
  }
  uv_end_trace(UV_TRACE_TAG);

  /* This check accomplishes 2 things:
   * 1. Even if the queue was empty, the call to uv__work_done() should count
   *    as an event, which will have been added by the event loop when
   *    calling this callback.
   * 2. Prevents accidental wrap-around in case nevents == 0.
   */
  if (nevents > 1) {
    /* Subtract 1 to counter the call to uv__work_done(). */
    uv__metrics_inc_events(loop, nevents - 1);
    if (uv__get_internal_fields(loop)->current_timeout == 0)
      uv__metrics_inc_events_waiting(loop, nevents - 1);
  }
}


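/* Thread/FFRT side of uv_queue_work(): runs the user's work_cb. With FFRT the
 * completion is then posted back to the event loop explicitly; without FFRT
 * the generic worker() loop takes care of that.
 */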
#ifdef USE_FFRT
static void uv__queue_work(struct uv__work* w, int qos) {
#else
static void uv__queue_work(struct uv__work* w) {
#endif
  uv_work_t* req = container_of(w, uv_work_t, work_req);
#ifdef ASYNC_STACKTRACE
  LibuvSetStackId((uint64_t)req->reserved[DFX_ASYNC_STACK]);
#endif
  req->work_cb(req);
#ifdef USE_FFRT
  uv__work_submit_to_eventloop(req, w, qos);
#endif
}


static void uv__queue_done(struct uv__work* w, int err) {
  uv_work_t* req;

  if (w == NULL) {
    UV_LOGE("uv_work_t is NULL");
    return;
  }

  req = container_of(w, uv_work_t, work_req);
#ifdef ASYNC_STACKTRACE
  LibuvSetStackId((uint64_t)req->reserved[DFX_ASYNC_STACK]);
#endif
  uv__req_unregister(req->loop, req);

  if (req->after_work_cb == NULL)
    return;
#ifdef USE_FFRT
  if (req->reserved[DFX_TASK_NAME] != NULL) {
    free((char*)req->reserved[DFX_TASK_NAME]);
    req->reserved[DFX_TASK_NAME] = NULL;
  }
  if (req->reserved[FFRT_TASK_DEPENDENCE] != NULL) {
    ffrt_task_handle_destroy((ffrt_task_handle_t)req->reserved[FFRT_TASK_DEPENDENCE]);
    req->reserved[FFRT_TASK_DEPENDENCE] = NULL;
  }
#endif
  req->after_work_cb(req, err);
}


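/* FFRT glue: uv__ffrt_work is the executor entry point registered for
 * ffrt_uv_task tasks, and uv__ffrt_work_ordered wraps a work item so it can
 * be submitted as an ordinary FFRT task carrying a data dependence (see
 * uv__work_submit_ordered below).
 */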
#ifdef USE_FFRT
struct ffrt_function {
  ffrt_function_header_t header;
  struct uv__work* w;
  int qos;
};

void uv__ffrt_work_ordered(void* t) {
  ffrt_this_task_set_legacy_mode(true);
  struct ffrt_function* f = (struct ffrt_function*)t;
  if (f == NULL || f->w == NULL || f->w->work == NULL) {
    UV_LOGE("uv work is invalid");
    ffrt_this_task_set_legacy_mode(false);
    return;
  }
  f->w->work(f->w, f->qos);
  ffrt_this_task_set_legacy_mode(false);
}


void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos)
{
  struct uv__work* w = (struct uv__work *)data;
  if (w == NULL || w->work == NULL) {
    UV_LOGE("uv work is invalid");
    return;
  }
  w->work(w, (int)qos);
}

static void init_once(void)
{
  init_closed_uv_loop_rwlock_once();
  /* Register the executor callback that runs libuv work tasks on FFRT. */
  ffrt_executor_task_register_func(uv__ffrt_work, ffrt_uv_task);
}


/* ffrt uv__work_submit */
void uv__work_submit(uv_loop_t* loop,
                     uv_req_t* req,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work *w, int qos),
                     void (*done)(struct uv__work *w, int status)) {
  uv_once(&once, init_once);
  ffrt_task_attr_t attr;
  ffrt_task_attr_init(&attr);

  switch (kind) {
    case UV__WORK_CPU:
      ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
      break;
    case UV__WORK_FAST_IO:
      ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
      break;
    case UV__WORK_SLOW_IO:
      ffrt_task_attr_set_qos(&attr, ffrt_qos_background);
      break;
    default:
#ifdef USE_OHOS_DFX
      UV_LOGI("Unknown work kind");
#endif
      ffrt_task_attr_destroy(&attr);  /* attr was initialized above; release it before bailing out. */
      return;
  }

  w->loop = loop;
  w->work = work;
  w->done = done;

  req->reserved[FFRT_QOS] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
  ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
  ffrt_task_attr_destroy(&attr);
}


/* ffrt uv__work_submit_with_qos */
void uv__work_submit_with_qos(uv_loop_t* loop,
                              uv_req_t* req,
                              struct uv__work* w,
                              ffrt_qos_t qos,
                              void (*work)(struct uv__work *w, int qos),
                              void (*done)(struct uv__work *w, int status)) {
  uv_once(&once, init_once);
  ffrt_task_attr_t attr;
  ffrt_task_attr_init(&attr);
  ffrt_task_attr_set_qos(&attr, qos);

  w->loop = loop;
  w->work = work;
  w->done = done;

  req->reserved[FFRT_QOS] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
  ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
  ffrt_task_attr_destroy(&attr);
}


/* ffrt uv__work_submit_ordered */
void uv__work_submit_ordered(uv_loop_t* loop,
                             uv_req_t* req,
                             struct uv__work* w,
                             ffrt_qos_t qos,
                             void (*work)(struct uv__work *w, int qos),
                             void (*done)(struct uv__work *w, int status),
                             uintptr_t taskId) {
  uv_once(&once, init_once);
  ffrt_task_attr_t attr;
  ffrt_task_attr_init(&attr);
  ffrt_task_attr_set_qos(&attr, qos);

  w->loop = loop;
  w->work = work;
  w->done = done;

  req->reserved[FFRT_QOS] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
  struct ffrt_function* f =
                      (struct ffrt_function*)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_general);
  f->header.exec = uv__ffrt_work_ordered;
  f->header.destroy = NULL;
  f->w = w;
  f->qos = qos;
  ffrt_dependence_t dependence;
  dependence.type = ffrt_dependence_data;
  dependence.ptr = (void*)taskId;
  ffrt_deps_t out_deps;
  out_deps.len = 1;
  out_deps.items = &dependence;
  ffrt_task_handle_t handle = ffrt_submit_h_base((ffrt_function_header_t*)f, NULL, &out_deps, &attr);
  if (handle == NULL) {
    UV_LOGE("submit task failed");
  }
  req->reserved[FFRT_TASK_DEPENDENCE] = (void*)handle;
  ffrt_task_attr_destroy(&attr);
}
#endif


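/* Public API: queue work_cb to run off the loop thread and after_work_cb to
 * run on the loop thread once the work finishes. A minimal usage sketch
 * (the names and callbacks below are illustrative, not part of this file):
 *
 *   static void work_cb(uv_work_t* req) {       // runs on a pool/FFRT thread
 *     // ... blocking or CPU-heavy work ...
 *   }
 *   static void after_cb(uv_work_t* req, int status) {  // runs on the loop thread
 *     // status is UV_ECANCELED if uv_cancel() succeeded
 *     free(req);
 *   }
 *
 *   uv_work_t* req = malloc(sizeof(*req));
 *   uv_queue_work(uv_default_loop(), req, work_cb, after_cb);
 */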
int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL)
    return UV_EINVAL;
#ifdef USE_FFRT
  uv__req_reserved_init(req);
#endif
  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;

#ifdef ASYNC_STACKTRACE
  /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */
  req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack();
#endif
  uv__work_submit(loop,
#ifdef USE_FFRT
                  (uv_req_t*)req,
#endif
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done
);
  return 0;
}


int uv_queue_work_internal(uv_loop_t* loop,
                           uv_work_t* req,
                           uv_work_cb work_cb,
                           uv_after_work_cb after_work_cb,
                           const char* task_name) {
#ifdef USE_FFRT
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__req_reserved_init(req);
  uv__copy_taskname((uv_req_t*)req, task_name);

  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;

#ifdef ASYNC_STACKTRACE
  /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */
  req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack();
#endif
  uv__work_submit(loop,
                  (uv_req_t*)req,
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done);
  return 0;
#else
  return uv_queue_work(loop, req, work_cb, after_work_cb);
#endif
}


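/* Same as uv_queue_work() but lets the caller pick the FFRT QoS class. The
 * STATIC_ASSERTs below check that each uv_qos_t value matches its ffrt_qos_t
 * counterpart so the value can be passed through unchanged; uv_qos_reserved
 * and out-of-range values are rejected with UV_EINVAL.
 */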
int uv_queue_work_with_qos(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb,
                  uv_qos_t qos) {
#ifdef USE_FFRT
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__req_reserved_init(req);
  STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
  STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
  STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
  STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
  STATIC_ASSERT(uv_qos_user_interactive == ffrt_qos_user_interactive);
  if (qos == uv_qos_reserved) {
    UV_LOGW("Invalid qos %{public}d", (int)qos);
    return UV_EINVAL;
  }
  if (qos < ffrt_qos_background || qos > ffrt_qos_user_interactive) {
    return UV_EINVAL;
  }

  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;

#ifdef ASYNC_STACKTRACE
  /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */
  req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack();
#endif
  uv__work_submit_with_qos(loop,
                  (uv_req_t*)req,
                  &req->work_req,
                  (ffrt_qos_t)qos,
                  uv__queue_work,
                  uv__queue_done);
  return 0;
#else
  return uv_queue_work(loop, req, work_cb, after_work_cb);
#endif
}


int uv_queue_work_with_qos_internal(uv_loop_t* loop,
                           uv_work_t* req,
                           uv_work_cb work_cb,
                           uv_after_work_cb after_work_cb,
                           uv_qos_t qos,
                           const char* task_name) {
#ifdef USE_FFRT
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__req_reserved_init(req);
  uv__copy_taskname((uv_req_t*)req, task_name);

  STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
  STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
  STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
  STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
  STATIC_ASSERT(uv_qos_user_interactive == ffrt_qos_user_interactive);
  if (qos == uv_qos_reserved) {
    UV_LOGW("Invalid qos %{public}d", (int)qos);
    return UV_EINVAL;
  }
  if (qos < ffrt_qos_background || qos > ffrt_qos_user_interactive) {
    return UV_EINVAL;
  }

  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;

#ifdef ASYNC_STACKTRACE
  /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */
  req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack();
#endif
  uv__work_submit_with_qos(loop,
                  (uv_req_t*)req,
                  &req->work_req,
                  (ffrt_qos_t)qos,
                  uv__queue_work,
                  uv__queue_done);
  return 0;
#else
  return uv_queue_work_with_qos(loop, req, work_cb, after_work_cb, qos);
#endif
}


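/* Like uv_queue_work_with_qos(), but the request is submitted through
 * uv__work_submit_ordered() with an FFRT data dependence on taskId, which is
 * intended to serialize tasks that share the same taskId.
 */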
int uv_queue_work_ordered(uv_loop_t* loop,
                          uv_work_t* req,
                          uv_work_cb work_cb,
                          uv_after_work_cb after_work_cb,
                          uv_qos_t qos,
                          uintptr_t taskId) {
#ifdef USE_FFRT
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__req_reserved_init(req);

  STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
  STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
  STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
  STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
  STATIC_ASSERT(uv_qos_user_interactive == ffrt_qos_user_interactive);
  if (qos == uv_qos_reserved) {
    UV_LOGW("Invalid qos %{public}d", (int)qos);
    return UV_EINVAL;
  }
  if (qos < ffrt_qos_background || qos > ffrt_qos_user_interactive) {
    return UV_EINVAL;
  }

  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;

#ifdef ASYNC_STACKTRACE
  /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */
  req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack();
#endif
  uv__work_submit_ordered(loop,
                  (uv_req_t*)req,
                  &req->work_req,
                  (ffrt_qos_t)qos,
                  uv__queue_work,
                  uv__queue_done,
                  taskId);
  return 0;
#else
  return uv_queue_work_with_qos(loop, req, work_cb, after_work_cb, qos);
#endif
}


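/* Cancel a pending request. Only request types that are backed by the
 * threadpool (fs, getaddrinfo, getnameinfo, random, work) can be cancelled;
 * everything else is rejected with UV_EINVAL.
 */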
int uv_cancel(uv_req_t* req) {
  struct uv__work* wreq;
  uv_loop_t* loop;

  switch (req->type) {
  case UV_FS:
    loop =  ((uv_fs_t*) req)->loop;
    wreq = &((uv_fs_t*) req)->work_req;
    break;
  case UV_GETADDRINFO:
    loop =  ((uv_getaddrinfo_t*) req)->loop;
    wreq = &((uv_getaddrinfo_t*) req)->work_req;
    break;
  case UV_GETNAMEINFO:
    loop = ((uv_getnameinfo_t*) req)->loop;
    wreq = &((uv_getnameinfo_t*) req)->work_req;
    break;
  case UV_RANDOM:
    loop = ((uv_random_t*) req)->loop;
    wreq = &((uv_random_t*) req)->work_req;
    break;
  case UV_WORK:
    loop =  ((uv_work_t*) req)->loop;
    wreq = &((uv_work_t*) req)->work_req;
    break;
  default:
    return UV_EINVAL;
  }

  return uv__work_cancel(loop, req, wreq);
}