1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include "uv-common.h"
23 #include "uv_log.h"
24 #include "uv_trace.h"
25
26 #if !defined(_WIN32)
27 # include "unix/internal.h"
28 #else
29 # include "win/internal.h"
30 #endif
31
32 #include <stdlib.h>
33 #ifdef USE_FFRT
34 #include <assert.h>
35 #include "ffrt_inner.h"
36 #endif
37 #include <stdio.h>
38 #ifdef ASYNC_STACKTRACE
39 #include "dfx/async_stack/libuv_async_stack.h"
40 #endif
41
42 #define MAX_THREADPOOL_SIZE 1024
43 #define UV_TRACE_NAME "UV_TRACE"
44
45 #ifdef USE_OHOS_DFX
46 #define MIN_REQS_THRESHOLD 100
47 #define MAX_REQS_THRESHOLD 300
48 #define CURSOR 5
49 #endif
50
51 typedef enum {
52 /* ffrt qos */
53 FFRT_QOS = 0,
54 /* record task name */
55 DFX_TASK_NAME,
56 /* ffrt task handle */
57 FFRT_TASK_DEPENDENCE,
58 /* collect asynchronous task stack */
59 DFX_ASYNC_STACK,
60 } req_reversed;
61
62 #ifdef USE_FFRT
63 static uv_rwlock_t g_closed_uv_loop_rwlock;
64 #endif
65 static uv_once_t once = UV_ONCE_INIT;
66 static uv_cond_t cond;
67 static uv_mutex_t mutex;
68 static unsigned int idle_threads;
69 static unsigned int nthreads;
70 static uv_thread_t* threads;
71 static uv_thread_t default_threads[4];
72 static struct uv__queue exit_message;
73 static struct uv__queue wq;
74 static struct uv__queue run_slow_work_message;
75 static struct uv__queue slow_io_pending_wq;
76
77
78 #ifdef USE_FFRT
init_closed_uv_loop_rwlock_once(void)79 static void init_closed_uv_loop_rwlock_once(void) {
80 uv_rwlock_init(&g_closed_uv_loop_rwlock);
81 }
82
83
rdlock_closed_uv_loop_rwlock(void)84 void rdlock_closed_uv_loop_rwlock(void) {
85 uv_rwlock_rdlock(&g_closed_uv_loop_rwlock);
86 }
87
88
rdunlock_closed_uv_loop_rwlock(void)89 void rdunlock_closed_uv_loop_rwlock(void) {
90 uv_rwlock_rdunlock(&g_closed_uv_loop_rwlock);
91 }
92
93
is_uv_loop_good_magic(const uv_loop_t * loop)94 int is_uv_loop_good_magic(const uv_loop_t* loop) {
95 if (loop->magic == UV_LOOP_MAGIC) {
96 return 1;
97 }
98 UV_LOGE("loop:(%{public}zu:%{public}#x) invalid", (size_t)loop % UV_ADDR_MOD, loop->magic);
99 return 0;
100 }
101 #endif
102
103
on_uv_loop_close(uv_loop_t * loop)104 void on_uv_loop_close(uv_loop_t* loop) {
105 time_t t1, t2;
106 time(&t1);
107 #ifdef USE_FFRT
108 uv_rwlock_wrlock(&g_closed_uv_loop_rwlock);
109 loop->magic = ~UV_LOOP_MAGIC;
110 uv_rwlock_wrunlock(&g_closed_uv_loop_rwlock);
111 #endif
112 time(&t2);
113 UV_LOGI("loop:(%{public}zu) closed in %{public}zds", (size_t)loop % UV_ADDR_MOD, (ssize_t)(t2 - t1));
114 }
115
116
117 #ifdef USE_FFRT
uv__cancelled(struct uv__work * w,int qos)118 static void uv__cancelled(struct uv__work* w, int qos) {
119 #else
120 static void uv__cancelled(struct uv__work* w) {
121 #endif
122 abort();
123 }
124
125
126 #ifndef USE_FFRT
127 static unsigned int slow_io_work_running;
128
129 static unsigned int slow_work_thread_threshold(void) {
130 return (nthreads + 1) / 2;
131 }
132
133
134 /* To avoid deadlock with uv_cancel() it's crucial that the worker
135 * never holds the global mutex and the loop-local mutex at the same time.
136 */
137 static void worker(void* arg) {
138 struct uv__work* w;
139 struct uv__queue* q;
140 int is_slow_work;
141
142 uv_sem_post((uv_sem_t*) arg);
143 arg = NULL;
144
145 uv_mutex_lock(&mutex);
146 for (;;) {
147 /* `mutex` should always be locked at this point. */
148
149 /* Keep waiting while either no work is present or only slow I/O
150 and we're at the threshold for that. */
151 while (uv__queue_empty(&wq) ||
152 (uv__queue_head(&wq) == &run_slow_work_message &&
153 uv__queue_next(&run_slow_work_message) == &wq &&
154 slow_io_work_running >= slow_work_thread_threshold())) {
155 idle_threads += 1;
156 uv_cond_wait(&cond, &mutex);
157 idle_threads -= 1;
158 }
159
160 q = uv__queue_head(&wq);
161 if (q == &exit_message) {
162 uv_cond_signal(&cond);
163 uv_mutex_unlock(&mutex);
164 break;
165 }
166
167 uv__queue_remove(q);
168 uv__queue_init(q); /* Signal uv_cancel() that the work req is executing. */
169
170 is_slow_work = 0;
171 if (q == &run_slow_work_message) {
172 /* If we're at the slow I/O threshold, re-schedule until after all
173 other work in the queue is done. */
174 if (slow_io_work_running >= slow_work_thread_threshold()) {
175 uv__queue_insert_tail(&wq, q);
176 continue;
177 }
178
179 /* If we encountered a request to run slow I/O work but there is none
180 to run, that means it's cancelled => Start over. */
181 if (uv__queue_empty(&slow_io_pending_wq))
182 continue;
183
184 is_slow_work = 1;
185 slow_io_work_running++;
186
187 q = uv__queue_head(&slow_io_pending_wq);
188 uv__queue_remove(q);
189 uv__queue_init(q);
190
191 /* If there is more slow I/O work, schedule it to be run as well. */
192 if (!uv__queue_empty(&slow_io_pending_wq)) {
193 uv__queue_insert_tail(&wq, &run_slow_work_message);
194 if (idle_threads > 0)
195 uv_cond_signal(&cond);
196 }
197 }
198
199 uv_mutex_unlock(&mutex);
200
201 w = uv__queue_data(q, struct uv__work, wq);
202 w->work(w);
203 uv_mutex_lock(&w->loop->wq_mutex);
204 w->work = NULL; /* Signal uv_cancel() that the work req is done
205 executing. */
206 uv__queue_insert_tail(&w->loop->wq, &w->wq);
207 uv_async_send(&w->loop->wq_async);
208 uv_mutex_unlock(&w->loop->wq_mutex);
209
210 /* Lock `mutex` since that is expected at the start of the next
211 * iteration. */
212 uv_mutex_lock(&mutex);
213 if (is_slow_work) {
214 /* `slow_io_work_running` is protected by `mutex`. */
215 slow_io_work_running--;
216 }
217 }
218 }
219 #endif
220
221
222 static void post(struct uv__queue* q, enum uv__work_kind kind) {
223 uv_mutex_lock(&mutex);
224 if (kind == UV__WORK_SLOW_IO) {
225 /* Insert into a separate queue. */
226 uv__queue_insert_tail(&slow_io_pending_wq, q);
227 if (!uv__queue_empty(&run_slow_work_message)) {
228 /* Running slow I/O tasks is already scheduled => Nothing to do here.
229 The worker that runs said other task will schedule this one as well. */
230 uv_mutex_unlock(&mutex);
231 return;
232 }
233 q = &run_slow_work_message;
234 }
235
236 uv__queue_insert_tail(&wq, q);
237 if (idle_threads > 0)
238 uv_cond_signal(&cond);
239 uv_mutex_unlock(&mutex);
240 }
241
242
243 #ifdef __MVS__
244 /* TODO(itodorov) - zos: revisit when Woz compiler is available. */
245 __attribute__((destructor))
246 #endif
247 void uv__threadpool_cleanup(void) {
248 unsigned int i;
249
250 if (nthreads == 0)
251 return;
252
253 #ifndef __MVS__
254 /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
255 post(&exit_message, UV__WORK_CPU);
256 #endif
257
258 for (i = 0; i < nthreads; i++)
259 if (uv_thread_join(threads + i))
260 abort();
261
262 if (threads != default_threads)
263 uv__free(threads);
264
265 uv_mutex_destroy(&mutex);
266 uv_cond_destroy(&cond);
267
268 threads = NULL;
269 nthreads = 0;
270 }
271
272
273 #ifndef USE_FFRT
274 static void init_threads(void) {
275 uv_thread_options_t config;
276 unsigned int i;
277 const char* val;
278 uv_sem_t sem;
279
280 nthreads = ARRAY_SIZE(default_threads);
281 val = getenv("UV_THREADPOOL_SIZE");
282 if (val != NULL)
283 nthreads = atoi(val);
284 if (nthreads == 0)
285 nthreads = 1;
286 if (nthreads > MAX_THREADPOOL_SIZE)
287 nthreads = MAX_THREADPOOL_SIZE;
288
289 threads = default_threads;
290 if (nthreads > ARRAY_SIZE(default_threads)) {
291 threads = uv__malloc(nthreads * sizeof(threads[0]));
292 if (threads == NULL) {
293 nthreads = ARRAY_SIZE(default_threads);
294 threads = default_threads;
295 }
296 }
297
298 if (uv_cond_init(&cond))
299 abort();
300
301 if (uv_mutex_init(&mutex))
302 abort();
303
304 uv__queue_init(&wq);
305 uv__queue_init(&slow_io_pending_wq);
306 uv__queue_init(&run_slow_work_message);
307
308 if (uv_sem_init(&sem, 0))
309 abort();
310
311 config.flags = UV_THREAD_HAS_STACK_SIZE;
312 config.stack_size = 8u << 20; /* 8 MB */
313
314 for (i = 0; i < nthreads; i++)
315 if (uv_thread_create_ex(threads + i, &config, worker, &sem))
316 abort();
317
318 for (i = 0; i < nthreads; i++)
319 uv_sem_wait(&sem);
320
321 uv_sem_destroy(&sem);
322 }
323
324
325 #ifndef _WIN32
326 static void reset_once(void) {
327 uv_once_t child_once = UV_ONCE_INIT;
328 memcpy(&once, &child_once, sizeof(child_once));
329 }
330 #endif
331
332
333 static void init_once(void) {
334 #ifndef _WIN32
335 /* Re-initialize the threadpool after fork.
336 * Note that this discards the global mutex and condition as well
337 * as the work queue.
338 */
339 if (pthread_atfork(NULL, NULL, &reset_once))
340 abort();
341 #endif
342 #ifdef USE_FFRT
343 init_closed_uv_loop_rwlock_once();
344 #endif
345 init_threads();
346 }
347
348
349 void uv__work_submit(uv_loop_t* loop,
350 struct uv__work* w,
351 enum uv__work_kind kind,
352 void (*work)(struct uv__work* w),
353 void (*done)(struct uv__work* w, int status)) {
354 uv_once(&once, init_once);
355 w->loop = loop;
356 w->work = work;
357 w->done = done;
358 post(&w->wq, kind);
359 }
360 #endif
361
362
363 static void uv__print_active_reqs(uv_loop_t* loop, const char* flag) {
364 #ifdef USE_OHOS_DFX
365 unsigned int count = loop->active_reqs.count;
366 if (count == MIN_REQS_THRESHOLD || count == MIN_REQS_THRESHOLD + CURSOR ||
367 count == MAX_REQS_THRESHOLD || count == MAX_REQS_THRESHOLD + CURSOR) {
368 UV_LOGW("loop:%{public}zu, flag:%{public}s, active reqs:%{public}u", (size_t)loop % UV_ADDR_MOD, flag, count);
369 }
370 #else
371 return;
372 #endif
373 }
374
375
376 #ifdef USE_FFRT
377 static void uv__req_reserved_init(uv_work_t* req) {
378 for (int i = 0; i < sizeof(req->reserved) / sizeof(req->reserved[0]); i++) {
379 req->reserved[i] = NULL;
380 }
381 }
382
383
384 static void uv__task_done_wrapper(void* work, int status) {
385 struct uv__work* w = (struct uv__work*)work;
386 uv__print_active_reqs(w->loop, "complete");
387 w->done(w, status);
388 }
389
390
391 void uv__work_submit_to_eventloop(uv_req_t* req, struct uv__work* w, int qos) {
392 uv_loop_t* loop = w->loop;
393 rdlock_closed_uv_loop_rwlock();
394 if (!is_uv_loop_good_magic(loop)) {
395 rdunlock_closed_uv_loop_rwlock();
396 UV_LOGE("uv_loop(%{public}zu:%{public}#x), task is invalid",
397 (size_t)loop % UV_ADDR_MOD, loop->magic);
398 return;
399 }
400
401 uv_mutex_lock(&loop->wq_mutex);
402 w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */
403
404 if (uv_check_data_valid(loop) == 0) {
405 int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
406 struct uv_loop_data* data = (struct uv_loop_data*)loop->data;
407 uv_mutex_unlock(&loop->wq_mutex);
408 if (req->type == UV_WORK) {
409 data->post_task_func((char*)req->reserved[DFX_TASK_NAME], uv__task_done_wrapper, (void*)w, status, qos);
410 } else {
411 data->post_task_func(NULL, uv__task_done_wrapper, (void*)w, status, qos);
412 }
413 } else {
414 uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
415 uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
416 uv_mutex_unlock(&loop->wq_mutex);
417 uv_async_send(&loop->wq_async);
418 }
419 rdunlock_closed_uv_loop_rwlock();
420 }
421 #endif
422
423
424 /* TODO(bnoordhuis) teach libuv how to cancel file operations
425 * that go through io_uring instead of the thread pool.
426 */
427 static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
428 int cancelled;
429
430 #ifdef USE_FFRT
431 rdlock_closed_uv_loop_rwlock();
432 if (!is_uv_loop_good_magic(w->loop)) {
433 rdunlock_closed_uv_loop_rwlock();
434 return 0;
435 }
436 #endif
437
438 #ifndef USE_FFRT
439 uv_mutex_lock(&mutex);
440 uv_mutex_lock(&w->loop->wq_mutex);
441
442 cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
443 if (cancelled)
444 uv__queue_remove(&w->wq);
445
446 uv_mutex_unlock(&w->loop->wq_mutex);
447 uv_mutex_unlock(&mutex);
448 #else
449 uv_mutex_lock(&w->loop->wq_mutex);
450 if (req->type == UV_WORK && req->reserved[FFRT_TASK_DEPENDENCE] != NULL) {
451 cancelled = w->work != NULL && (ffrt_skip((ffrt_task_handle_t)req->reserved[FFRT_TASK_DEPENDENCE]) == 0);
452 } else {
453 cancelled = !uv__queue_empty(&w->wq) && w->work != NULL
454 && ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[FFRT_QOS]);
455 }
456 uv_mutex_unlock(&w->loop->wq_mutex);
457 #endif
458
459 if (!cancelled) {
460 #ifdef USE_FFRT
461 rdunlock_closed_uv_loop_rwlock();
462 #endif
463 return UV_EBUSY;
464 }
465
466 w->work = uv__cancelled;
467 uv_mutex_lock(&loop->wq_mutex);
468 #ifndef USE_FFRT
469 uv__queue_insert_tail(&loop->wq, &w->wq);
470 uv_async_send(&loop->wq_async);
471 #else
472 uv__loop_internal_fields_t* lfields = uv__get_internal_fields(w->loop);
473 int qos = (ffrt_qos_t)(intptr_t)req->reserved[FFRT_QOS];
474
475 if (uv_check_data_valid(w->loop) == 0) {
476 int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
477 struct uv_loop_data* data = (struct uv_loop_data*)w->loop->data;
478 if (req->type == UV_WORK) {
479 data->post_task_func((char*)req->reserved[DFX_TASK_NAME], uv__task_done_wrapper, (void*)w, status, qos);
480 } else {
481 data->post_task_func(NULL, uv__task_done_wrapper, (void*)w, status, qos);
482 }
483 } else {
484 uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
485 uv_async_send(&loop->wq_async);
486 }
487 #endif
488 uv_mutex_unlock(&loop->wq_mutex);
489 #ifdef USE_FFRT
490 rdunlock_closed_uv_loop_rwlock();
491 #endif
492
493 return 0;
494 }
495
496
497 void uv__work_done(uv_async_t* handle) {
498 struct uv__work* w;
499 uv_loop_t* loop;
500 struct uv__queue* q;
501 struct uv__queue wq;
502 int err;
503 int nevents;
504
505 loop = container_of(handle, uv_loop_t, wq_async);
506 #ifdef USE_FFRT
507 rdlock_closed_uv_loop_rwlock();
508 if (!is_uv_loop_good_magic(loop)) {
509 rdunlock_closed_uv_loop_rwlock();
510 return;
511 }
512 rdunlock_closed_uv_loop_rwlock();
513 #endif
514
515 #ifdef USE_OHOS_DFX
516 if (uv_check_data_valid(loop) == 0) {
517 return;
518 }
519 uv__print_active_reqs(loop, "complete");
520 #endif
521
522 uv_mutex_lock(&loop->wq_mutex);
523 #ifndef USE_FFRT
524 uv__queue_move(&loop->wq, &wq);
525 #else
526 uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
527 int i;
528 uv__queue_init(&wq);
529 for (i = 5; i >= 0; i--) {
530 // No task in 4-th lfields->wq_sub queue.
531 if (i == 4) {
532 continue;
533 }
534 if (!uv__queue_empty(&lfields->wq_sub[i])) {
535 uv__queue_append(&lfields->wq_sub[i], &wq);
536 }
537 }
538 #endif
539 uv_mutex_unlock(&loop->wq_mutex);
540
541 nevents = 0;
542 uv_start_trace(UV_TRACE_TAG, UV_TRACE_NAME);
543 while (!uv__queue_empty(&wq)) {
544 q = uv__queue_head(&wq);
545 uv__queue_remove(q);
546
547 w = container_of(q, struct uv__work, wq);
548 err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
549 w->done(w, err);
550 nevents++;
551 }
552 uv_end_trace(UV_TRACE_TAG);
553
554 /* This check accomplishes 2 things:
555 * 1. Even if the queue was empty, the call to uv__work_done() should count
556 * as an event. Which will have been added by the event loop when
557 * calling this callback.
558 * 2. Prevents accidental wrap around in case nevents == 0 events == 0.
559 */
560 if (nevents > 1) {
561 /* Subtract 1 to counter the call to uv__work_done(). */
562 uv__metrics_inc_events(loop, nevents - 1);
563 if (uv__get_internal_fields(loop)->current_timeout == 0)
564 uv__metrics_inc_events_waiting(loop, nevents - 1);
565 }
566 }
567
568
569 #ifdef USE_FFRT
570 static void uv__queue_work(struct uv__work* w, int qos) {
571 #else
572 static void uv__queue_work(struct uv__work* w) {
573 #endif
574 uv_work_t* req = container_of(w, uv_work_t, work_req);
575 #ifdef ASYNC_STACKTRACE
576 LibuvSetStackId((uint64_t)req->reserved[DFX_ASYNC_STACK]);
577 #endif
578 req->work_cb(req);
579 #ifdef USE_FFRT
580 uv__work_submit_to_eventloop(req, w, qos);
581 #endif
582 }
583
584
585 static void uv__queue_done(struct uv__work* w, int err) {
586 uv_work_t* req;
587
588 if (w == NULL) {
589 UV_LOGE("uv_work_t is NULL");
590 return;
591 }
592
593 req = container_of(w, uv_work_t, work_req);
594 #ifdef ASYNC_STACKTRACE
595 LibuvSetStackId((uint64_t)req->reserved[DFX_ASYNC_STACK]);
596 #endif
597 uv__req_unregister(req->loop, req);
598
599 if (req->after_work_cb == NULL)
600 return;
601 #ifdef USE_FFRT
602 if (req->reserved[DFX_TASK_NAME] != NULL) {
603 free((char*)req->reserved[DFX_TASK_NAME]);
604 req->reserved[DFX_TASK_NAME] = NULL;
605 }
606 if (req->reserved[FFRT_TASK_DEPENDENCE] != NULL) {
607 ffrt_task_handle_destroy((ffrt_task_handle_t)req->reserved[FFRT_TASK_DEPENDENCE]);
608 req->reserved[FFRT_TASK_DEPENDENCE] = NULL;
609 }
610 #endif
611 req->after_work_cb(req, err);
612 }
613
614
615 #ifdef USE_FFRT
616 struct ffrt_function {
617 ffrt_function_header_t header;
618 struct uv__work* w;
619 int qos;
620 };
621
622 void uv__ffrt_work_ordered(void* t) {
623 ffrt_this_task_set_legacy_mode(true);
624 struct ffrt_function* f = (struct ffrt_function*)t;
625 if (f == NULL || f->w == NULL || f->w->work == NULL) {
626 UV_LOGE("uv work is invalid");
627 ffrt_this_task_set_legacy_mode(false);
628 return;
629 }
630 f->w->work(f->w, f->qos);
631 ffrt_this_task_set_legacy_mode(false);
632 }
633
634
635 void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos)
636 {
637 struct uv__work* w = (struct uv__work *)data;
638 if (w == NULL || w->work == NULL) {
639 UV_LOGE("uv work is invalid");
640 return;
641 }
642 w->work(w, (int)qos);
643 }
644
645 static void init_once(void)
646 {
647 init_closed_uv_loop_rwlock_once();
648 /* init uv work statics queue */
649 ffrt_executor_task_register_func(uv__ffrt_work, ffrt_uv_task);
650 }
651
652
653 /* ffrt uv__work_submit */
654 void uv__work_submit(uv_loop_t* loop,
655 uv_req_t* req,
656 struct uv__work* w,
657 enum uv__work_kind kind,
658 void (*work)(struct uv__work *w, int qos),
659 void (*done)(struct uv__work *w, int status)) {
660 uv_once(&once, init_once);
661 ffrt_task_attr_t attr;
662 ffrt_task_attr_init(&attr);
663
664 switch(kind) {
665 case UV__WORK_CPU:
666 ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
667 break;
668 case UV__WORK_FAST_IO:
669 ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
670 break;
671 case UV__WORK_SLOW_IO:
672 ffrt_task_attr_set_qos(&attr, ffrt_qos_background);
673 break;
674 default:
675 #ifdef USE_OHOS_DFX
676 UV_LOGI("Unknown work kind");
677 #endif
678 return;
679 }
680
681 w->loop = loop;
682 w->work = work;
683 w->done = done;
684
685 req->reserved[FFRT_QOS] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
686 ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
687 ffrt_task_attr_destroy(&attr);
688 }
689
690
691 /* ffrt uv__work_submit */
692 void uv__work_submit_with_qos(uv_loop_t* loop,
693 uv_req_t* req,
694 struct uv__work* w,
695 ffrt_qos_t qos,
696 void (*work)(struct uv__work *w, int qos),
697 void (*done)(struct uv__work *w, int status)) {
698 uv_once(&once, init_once);
699 ffrt_task_attr_t attr;
700 ffrt_task_attr_init(&attr);
701 ffrt_task_attr_set_qos(&attr, qos);
702
703 w->loop = loop;
704 w->work = work;
705 w->done = done;
706
707 req->reserved[FFRT_QOS] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
708 ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
709 ffrt_task_attr_destroy(&attr);
710 }
711
712
713 /* ffrt uv__work_submit_ordered */
714 void uv__work_submit_ordered(uv_loop_t* loop,
715 uv_req_t* req,
716 struct uv__work* w,
717 ffrt_qos_t qos,
718 void (*work)(struct uv__work *w, int qos),
719 void (*done)(struct uv__work *w, int status),
720 uintptr_t taskId) {
721 uv_once(&once, init_once);
722 ffrt_task_attr_t attr;
723 ffrt_task_attr_init(&attr);
724 ffrt_task_attr_set_qos(&attr, qos);
725
726 w->loop = loop;
727 w->work = work;
728 w->done = done;
729
730 req->reserved[FFRT_QOS] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
731 struct ffrt_function* f =
732 (struct ffrt_function*)ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_general);
733 f->header.exec = uv__ffrt_work_ordered;
734 f->header.destroy = NULL;
735 f->w = w;
736 f->qos = qos;
737 ffrt_dependence_t dependence;
738 dependence.type = ffrt_dependence_data;
739 dependence.ptr = (void*)taskId;
740 ffrt_deps_t out_deps;
741 out_deps.len = 1;
742 out_deps.items = &dependence;
743 ffrt_task_handle_t handle = ffrt_submit_h_base((ffrt_function_header_t*)f, NULL, &out_deps, &attr);
744 if (handle == NULL) {
745 UV_LOGE("submit task failed");
746 }
747 req->reserved[FFRT_TASK_DEPENDENCE] = (void*)handle;
748 ffrt_task_attr_destroy(&attr);
749 }
750 #endif
751
752
753 int uv_queue_work(uv_loop_t* loop,
754 uv_work_t* req,
755 uv_work_cb work_cb,
756 uv_after_work_cb after_work_cb) {
757 if (work_cb == NULL)
758 return UV_EINVAL;
759 #ifdef USE_FFRT
760 uv__req_reserved_init(req);
761 #endif
762 uv__print_active_reqs(loop, "execute");
763 uv__req_init(loop, req, UV_WORK);
764 req->loop = loop;
765 req->work_cb = work_cb;
766 req->after_work_cb = after_work_cb;
767
768 #ifdef ASYNC_STACKTRACE
769 /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */
770 req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack();
771 #endif
772 uv__work_submit(loop,
773 #ifdef USE_FFRT
774 (uv_req_t*)req,
775 #endif
776 &req->work_req,
777 UV__WORK_CPU,
778 uv__queue_work,
779 uv__queue_done
780 );
781 return 0;
782 }
783
784
785 int uv_queue_work_internal(uv_loop_t* loop,
786 uv_work_t* req,
787 uv_work_cb work_cb,
788 uv_after_work_cb after_work_cb,
789 const char* task_name) {
790 #ifdef USE_FFRT
791 if (work_cb == NULL)
792 return UV_EINVAL;
793
794 uv__req_reserved_init(req);
795 uv__copy_taskname((uv_req_t*)req, task_name);
796
797 uv__print_active_reqs(loop, "execute");
798 uv__req_init(loop, req, UV_WORK);
799 req->loop = loop;
800 req->work_cb = work_cb;
801 req->after_work_cb = after_work_cb;
802
803 #ifdef ASYNC_STACKTRACE
804 /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */
805 req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack();
806 #endif
807 uv__work_submit(loop,
808 (uv_req_t*)req,
809 &req->work_req,
810 UV__WORK_CPU,
811 uv__queue_work,
812 uv__queue_done);
813 return 0;
814 #else
815 return uv_queue_work(loop, req, work_cb, after_work_cb);
816 #endif
817 }
818
819
820 int uv_queue_work_with_qos(uv_loop_t* loop,
821 uv_work_t* req,
822 uv_work_cb work_cb,
823 uv_after_work_cb after_work_cb,
824 uv_qos_t qos) {
825 #ifdef USE_FFRT
826 if (work_cb == NULL)
827 return UV_EINVAL;
828
829 uv__req_reserved_init(req);
830 STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
831 STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
832 STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
833 STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
834 STATIC_ASSERT(uv_qos_user_interactive == ffrt_qos_user_interactive);
835 if (qos == uv_qos_reserved) {
836 UV_LOGW("Invalid qos %{public}d", (int)qos);
837 return UV_EINVAL;
838 }
839 if (qos < ffrt_qos_background || qos > ffrt_qos_user_interactive) {
840 return UV_EINVAL;
841 }
842
843 uv__print_active_reqs(loop, "execute");
844 uv__req_init(loop, req, UV_WORK);
845 req->loop = loop;
846 req->work_cb = work_cb;
847 req->after_work_cb = after_work_cb;
848
849 #ifdef ASYNC_STACKTRACE
850 /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */
851 req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack();
852 #endif
853 uv__work_submit_with_qos(loop,
854 (uv_req_t*)req,
855 &req->work_req,
856 (ffrt_qos_t)qos,
857 uv__queue_work,
858 uv__queue_done);
859 return 0;
860 #else
861 return uv_queue_work(loop, req, work_cb, after_work_cb);
862 #endif
863 }
864
865
866 int uv_queue_work_with_qos_internal(uv_loop_t* loop,
867 uv_work_t* req,
868 uv_work_cb work_cb,
869 uv_after_work_cb after_work_cb,
870 uv_qos_t qos,
871 const char* task_name) {
872 #ifdef USE_FFRT
873 if (work_cb == NULL)
874 return UV_EINVAL;
875
876 uv__req_reserved_init(req);
877 uv__copy_taskname((uv_req_t*)req, task_name);
878
879 STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
880 STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
881 STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
882 STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
883 STATIC_ASSERT(uv_qos_user_interactive == ffrt_qos_user_interactive);
884 if (qos == uv_qos_reserved) {
885 UV_LOGW("Invalid qos %{public}d", (int)qos);
886 return UV_EINVAL;
887 }
888 if (qos < ffrt_qos_background || qos > ffrt_qos_user_interactive) {
889 return UV_EINVAL;
890 }
891
892 uv__print_active_reqs(loop, "execute");
893 uv__req_init(loop, req, UV_WORK);
894 req->loop = loop;
895 req->work_cb = work_cb;
896 req->after_work_cb = after_work_cb;
897
898 #ifdef ASYNC_STACKTRACE
899 /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */
900 req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack();
901 #endif
902 uv__work_submit_with_qos(loop,
903 (uv_req_t*)req,
904 &req->work_req,
905 (ffrt_qos_t)qos,
906 uv__queue_work,
907 uv__queue_done);
908 return 0;
909 #else
910 return uv_queue_work_with_qos(loop, req, work_cb, after_work_cb, qos);
911 #endif
912 }
913
914
915 int uv_queue_work_ordered(uv_loop_t* loop,
916 uv_work_t* req,
917 uv_work_cb work_cb,
918 uv_after_work_cb after_work_cb,
919 uv_qos_t qos,
920 uintptr_t taskId) {
921 #ifdef USE_FFRT
922 if (work_cb == NULL)
923 return UV_EINVAL;
924
925 uv__req_reserved_init(req);
926
927 STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
928 STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
929 STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
930 STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
931 STATIC_ASSERT(uv_qos_user_interactive == ffrt_qos_user_interactive);
932 if (qos == uv_qos_reserved) {
933 UV_LOGW("Invalid qos %{public}d", (int)qos);
934 return UV_EINVAL;
935 }
936 if (qos < ffrt_qos_background || qos > ffrt_qos_user_interactive) {
937 return UV_EINVAL;
938 }
939
940 uv__print_active_reqs(loop, "execute");
941 uv__req_init(loop, req, UV_WORK);
942 req->loop = loop;
943 req->work_cb = work_cb;
944 req->after_work_cb = after_work_cb;
945
946 #ifdef ASYNC_STACKTRACE
947 /* The req->reserved[DFX_ASYNC_STACK] is used for DFX only. */
948 req->reserved[DFX_ASYNC_STACK] = (void*)LibuvCollectAsyncStack();
949 #endif
950 uv__work_submit_ordered(loop,
951 (uv_req_t*)req,
952 &req->work_req,
953 (ffrt_qos_t)qos,
954 uv__queue_work,
955 uv__queue_done,
956 taskId);
957 return 0;
958 #else
959 return uv_queue_work_with_qos(loop, req, work_cb, after_work_cb, qos);
960 #endif
961 }
962
963
964 int uv_cancel(uv_req_t* req) {
965 struct uv__work* wreq;
966 uv_loop_t* loop;
967
968 switch (req->type) {
969 case UV_FS:
970 loop = ((uv_fs_t*) req)->loop;
971 wreq = &((uv_fs_t*) req)->work_req;
972 break;
973 case UV_GETADDRINFO:
974 loop = ((uv_getaddrinfo_t*) req)->loop;
975 wreq = &((uv_getaddrinfo_t*) req)->work_req;
976 break;
977 case UV_GETNAMEINFO:
978 loop = ((uv_getnameinfo_t*) req)->loop;
979 wreq = &((uv_getnameinfo_t*) req)->work_req;
980 break;
981 case UV_RANDOM:
982 loop = ((uv_random_t*) req)->loop;
983 wreq = &((uv_random_t*) req)->work_req;
984 break;
985 case UV_WORK:
986 loop = ((uv_work_t*) req)->loop;
987 wreq = &((uv_work_t*) req)->work_req;
988 break;
989 default:
990 return UV_EINVAL;
991 }
992
993 return uv__work_cancel(loop, req, wreq);
994 }
995