/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv-common.h"
#include "uv_log.h"
#include "uv_trace.h"

#if !defined(_WIN32)
# include "unix/internal.h"
#else
# include "win/internal.h"
#endif

#include <stdlib.h>
#ifdef USE_FFRT
#include <assert.h>
#include "ffrt_inner.h"
#endif
#include <stdio.h>
#ifdef ASYNC_STACKTRACE
#include "dfx/async_stack/libuv_async_stack.h"
#endif

#define MAX_THREADPOOL_SIZE 1024
#define UV_TRACE_NAME "UV_TRACE"

#ifdef USE_OHOS_DFX
#define MIN_REQS_THRESHOLD 100
#define MAX_REQS_THRESHOLD 300
#define CURSOR 5
#endif

static uv_rwlock_t g_closed_uv_loop_rwlock;
static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int idle_threads;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
static struct uv__queue exit_message;
static struct uv__queue wq;
static struct uv__queue run_slow_work_message;
static struct uv__queue slow_io_pending_wq;

#ifdef UV_STATISTIC
#define MAX_DUMP_QUEUE_SIZE 200
static uv_mutex_t dump_queue_mutex;
static struct uv__queue dump_queue;
static unsigned int dump_queue_size;

static int statistic_idle;
static uv_mutex_t statistic_mutex;
static struct uv__queue statistic_works;
static uv_cond_t dump_cond;
static uv_thread_t dump_thread;
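
/* With UV_STATISTIC enabled, per-request timing records live in dump_queue.
 * Work submission and state changes are recorded by posting small
 * uv__statistic_work items to statistic_works; a dedicated dump thread
 * (uv_dump_worker below) drains that queue and applies the updates to
 * dump_queue under dump_queue_mutex, so the loop and worker threads do not
 * block on the bookkeeping themselves.
 */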

static void uv_dump_worker(void* arg) {
  struct uv__statistic_work* w;
  struct uv__queue* q;
  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;
  uv_mutex_lock(&statistic_mutex);
  for (;;) {
    while (uv__queue_empty(&statistic_works)) {
      statistic_idle = 1;
      uv_cond_wait(&dump_cond, &statistic_mutex);
      statistic_idle = 0;
    }
    q = uv__queue_head(&statistic_works);
    if (q == &exit_message) {
      uv_cond_signal(&dump_cond);
      uv_mutex_unlock(&statistic_mutex);
      break;
    }
    uv__queue_remove(q);
    uv__queue_init(q);
    uv_mutex_unlock(&statistic_mutex);
    w = uv__queue_data(q, struct uv__statistic_work, wq);
    w->work(w);
    free(w);
    uv_mutex_lock(&statistic_mutex);
  }
}

static void post_statistic_work(struct uv__queue* q) {
  uv_mutex_lock(&statistic_mutex);
  uv__queue_insert_tail(&statistic_works, q);
  if (statistic_idle)
    uv_cond_signal(&dump_cond);
  uv_mutex_unlock(&statistic_mutex);
}

static void uv__queue_work_info(struct uv__statistic_work* work) {
  uv_mutex_lock(&dump_queue_mutex);
  if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) {
    /* Release records whose work is already done. Iterate with a saved
     * `next` pointer because the current node is freed as we go. */
    struct uv__queue* q;
    struct uv__queue* next;
    for (q = uv__queue_head(&dump_queue); q != &dump_queue; q = next) {
      next = uv__queue_next(q);
      struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
      if (info->state == DONE_END) {
        uv__queue_remove(q);
        free(info);
        dump_queue_size--;
      }
    }
    if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) {
      abort();  /* Too many works are still pending. */
    }
  }

  uv__queue_insert_head(&dump_queue, &work->info->wq);
  dump_queue_size++;
  uv_mutex_unlock(&dump_queue_mutex);
}

static void uv__update_work_info(struct uv__statistic_work* work) {
  uv_mutex_lock(&dump_queue_mutex);
  if (work != NULL && work->info != NULL) {
    work->info->state = work->state;
    switch (work->state) {
      case WAITING:
        work->info->queue_time = work->time;
        break;
      case WORK_EXECUTING:
        work->info->execute_start_time = work->time;
        break;
      case WORK_END:
        work->info->execute_end_time = work->time;
        break;
      case DONE_EXECUTING:
        work->info->done_start_time = work->time;
        break;
      case DONE_END:
        work->info->done_end_time = work->time;
        break;
      default:
        break;
    }
  }
  uv_mutex_unlock(&dump_queue_mutex);
}


/* Return the current timestamp in milliseconds. */
static uint64_t uv__now_timestamp(void) {
  uv_timeval64_t tv;
  int r = uv_gettimeofday(&tv);
  if (r != 0) {
    return 0;
  }
  return (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
}


static void uv__post_statistic_work(struct uv__work* w, enum uv_work_state state) {
  struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
  if (dump_work == NULL) {
    return;
  }
  dump_work->info = w->info;
  dump_work->work = uv__update_work_info;
  dump_work->time = uv__now_timestamp();
  dump_work->state = state;
  uv__queue_init(&dump_work->wq);
  post_statistic_work(&dump_work->wq);
}


static void init_work_dump_queue(void)
{
  if (uv_mutex_init(&dump_queue_mutex))
    abort();
  uv_mutex_lock(&dump_queue_mutex);
  uv__queue_init(&dump_queue);
  dump_queue_size = 0;
  uv_mutex_unlock(&dump_queue_mutex);

  /* init dump thread */
  statistic_idle = 1;
  if (uv_mutex_init(&statistic_mutex))
    abort();
  uv__queue_init(&statistic_works);
  uv_sem_t sem;
  if (uv_cond_init(&dump_cond))
    abort();
  if (uv_sem_init(&sem, 0))
    abort();
  if (uv_thread_create(&dump_thread, uv_dump_worker, &sem))
    abort();
  uv_sem_wait(&sem);
  uv_sem_destroy(&sem);
}


void uv_init_dump_info(struct uv_work_dump_info* info, struct uv__work* w) {
  if (info == NULL)
    return;
  info->queue_time = 0;
  info->state = WAITING;
  info->execute_start_time = 0;
  info->execute_end_time = 0;
  info->done_start_time = 0;
  info->done_end_time = 0;
  info->work = w;
  uv__queue_init(&info->wq);
}


void uv_queue_statics(struct uv_work_dump_info* info) {
  struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
  if (dump_work == NULL) {
    abort();
  }
  dump_work->info = info;
  dump_work->work = uv__queue_work_info;
  info->queue_time = uv__now_timestamp();
  dump_work->state = WAITING;
  uv__queue_init(&dump_work->wq);
  post_statistic_work(&dump_work->wq);
}


uv_worker_info_t* uv_dump_work_queue(int* size) {
#ifdef UV_STATISTIC
  uv_mutex_lock(&dump_queue_mutex);
  if (uv__queue_empty(&dump_queue)) {
    *size = 0;
    uv_mutex_unlock(&dump_queue_mutex);
    return NULL;
  }
  *size = dump_queue_size;
  uv_worker_info_t* dump_info = (uv_worker_info_t*) malloc(sizeof(uv_worker_info_t) * dump_queue_size);
  if (dump_info == NULL) {
    *size = 0;
    uv_mutex_unlock(&dump_queue_mutex);
    return NULL;
  }
  struct uv__queue* q;
  int i = 0;
  uv__queue_foreach(q, &dump_queue) {
    struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
    dump_info[i].queue_time = info->queue_time;
    dump_info[i].builtin_return_address[0] = info->builtin_return_address[0];
    dump_info[i].builtin_return_address[1] = info->builtin_return_address[1];
    dump_info[i].builtin_return_address[2] = info->builtin_return_address[2];
    switch (info->state) {
      case WAITING:
        strcpy(dump_info[i].state, "waiting");
        break;
      case WORK_EXECUTING:
        strcpy(dump_info[i].state, "work_executing");
        break;
      case WORK_END:
        strcpy(dump_info[i].state, "work_end");
        break;
      case DONE_EXECUTING:
        strcpy(dump_info[i].state, "done_executing");
        break;
      case DONE_END:
        strcpy(dump_info[i].state, "done_end");
        break;
      default:
        break;
    }
    dump_info[i].execute_start_time = info->execute_start_time;
    dump_info[i].execute_end_time = info->execute_end_time;
    dump_info[i].done_start_time = info->done_start_time;
    dump_info[i].done_end_time = info->done_end_time;
    ++i;
  }
  uv_mutex_unlock(&dump_queue_mutex);
  return dump_info;
#else
  *size = 0;
  return NULL;
#endif
}
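
/* Illustrative caller-side sketch (assumes a UV_STATISTIC build; the caller
 * is assumed to own the returned array and release it with free()):
 *
 *   int n = 0;
 *   uv_worker_info_t* snapshot = uv_dump_work_queue(&n);
 *   for (int i = 0; i < n; i++)
 *     printf("work %d: state=%s queued_at=%llu\n",
 *            i, snapshot[i].state,
 *            (unsigned long long)snapshot[i].queue_time);
 *   free(snapshot);
 */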
#endif


static void init_closed_uv_loop_rwlock_once(void) {
  uv_rwlock_init(&g_closed_uv_loop_rwlock);
}


void rdlock_closed_uv_loop_rwlock(void) {
  uv_rwlock_rdlock(&g_closed_uv_loop_rwlock);
}


void rdunlock_closed_uv_loop_rwlock(void) {
  uv_rwlock_rdunlock(&g_closed_uv_loop_rwlock);
}


int is_uv_loop_good_magic(const uv_loop_t* loop) {
  if (loop->magic == UV_LOOP_MAGIC) {
    return 1;
  }
  UV_LOGE("loop:(%{public}zu:%{public}#x) invalid", (size_t)loop, loop->magic);
  return 0;
}


void on_uv_loop_close(uv_loop_t* loop) {
  time_t t1, t2;
  time(&t1);
  uv_start_trace(UV_TRACE_TAG, "Get Write Lock");
  uv_rwlock_wrlock(&g_closed_uv_loop_rwlock);
  uv_end_trace(UV_TRACE_TAG);
  loop->magic = ~UV_LOOP_MAGIC;
  uv_start_trace(UV_TRACE_TAG, "Release Write Lock");
  uv_rwlock_wrunlock(&g_closed_uv_loop_rwlock);
  uv_end_trace(UV_TRACE_TAG);
  time(&t2);
  UV_LOGI("loop:(%{public}zu) closed in %{public}zds", (size_t)loop, (ssize_t)(t2 - t1));
}


static void uv__cancelled(struct uv__work* w) {
  abort();
}


#ifndef USE_FFRT
static unsigned int slow_io_work_running;

static unsigned int slow_work_thread_threshold(void) {
  return (nthreads + 1) / 2;
}


/* To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  struct uv__queue* q;
  int is_slow_work;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (uv__queue_empty(&wq) ||
           (uv__queue_head(&wq) == &run_slow_work_message &&
            uv__queue_next(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = uv__queue_head(&wq);
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    uv__queue_remove(q);
    uv__queue_init(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        uv__queue_insert_tail(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (uv__queue_empty(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      q = uv__queue_head(&slow_io_pending_wq);
      uv__queue_remove(q);
      uv__queue_init(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!uv__queue_empty(&slow_io_pending_wq)) {
        uv__queue_insert_tail(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }

    uv_mutex_unlock(&mutex);

    w = uv__queue_data(q, struct uv__work, wq);
#ifdef UV_STATISTIC
    uv__post_statistic_work(w, WORK_EXECUTING);
#endif
    w->work(w);
#ifdef UV_STATISTIC
    uv__post_statistic_work(w, WORK_END);
#endif
    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    uv__queue_insert_tail(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}
#endif

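/* Enqueue a work item for the worker threads. UV__WORK_SLOW_IO requests are
 * parked on slow_io_pending_wq and represented in the main queue by the
 * single run_slow_work_message marker, which is how the number of
 * concurrently running slow I/O jobs is capped at slow_work_thread_threshold().
 * An idle worker is woken if one is available.
 */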
static void post(struct uv__queue* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  if (kind == UV__WORK_SLOW_IO) {
    /* Insert into a separate queue. */
    uv__queue_insert_tail(&slow_io_pending_wq, q);
    if (!uv__queue_empty(&run_slow_work_message)) {
      /* Running slow I/O tasks is already scheduled => Nothing to do here.
         The worker that runs said other task will schedule this one as well. */
      uv_mutex_unlock(&mutex);
      return;
    }
    q = &run_slow_work_message;
  }

  uv__queue_insert_tail(&wq, q);
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}


#ifdef __MVS__
/* TODO(itodorov) - zos: revisit when Woz compiler is available. */
__attribute__((destructor))
#endif
void uv__threadpool_cleanup(void) {
  unsigned int i;

  if (nthreads == 0)
    return;

#ifndef __MVS__
  /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
  post(&exit_message, UV__WORK_CPU);
#endif

  for (i = 0; i < nthreads; i++)
    if (uv_thread_join(threads + i))
      abort();

  if (threads != default_threads)
    uv__free(threads);

  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cond);

  threads = NULL;
  nthreads = 0;
#ifdef UV_STATISTIC
  post_statistic_work(&exit_message);
  uv_thread_join(&dump_thread);
  uv_mutex_destroy(&statistic_mutex);
  uv_cond_destroy(&dump_cond);
#endif
}


#ifndef USE_FFRT
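/* The pool is sized once, on first use: UV_THREADPOOL_SIZE overrides the
 * default of ARRAY_SIZE(default_threads) (4) and the result is clamped to
 * [1, MAX_THREADPOOL_SIZE]. For example, launching a process with
 * UV_THREADPOOL_SIZE=8 in the environment yields eight worker threads.
 */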
static void init_threads(void) {
  uv_thread_options_t config;
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  uv__queue_init(&wq);
  uv__queue_init(&slow_io_pending_wq);
  uv__queue_init(&run_slow_work_message);

  if (uv_sem_init(&sem, 0))
    abort();

  config.flags = UV_THREAD_HAS_STACK_SIZE;
  config.stack_size = 8u << 20;  /* 8 MB */

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create_ex(threads + i, &config, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}


#ifndef _WIN32
static void reset_once(void) {
  uv_once_t child_once = UV_ONCE_INIT;
  memcpy(&once, &child_once, sizeof(child_once));
}
#endif


static void init_once(void) {
#ifndef _WIN32
  /* Re-initialize the threadpool after fork.
   * Note that this discards the global mutex and condition as well
   * as the work queue.
   */
  if (pthread_atfork(NULL, NULL, &reset_once))
    abort();
#endif
  init_closed_uv_loop_rwlock_once();
#ifdef UV_STATISTIC
  init_work_dump_queue();
#endif
  init_threads();
}


void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}
#endif


static void uv__print_active_reqs(uv_loop_t* loop, const char* flag) {
#ifdef USE_OHOS_DFX
  unsigned int count = loop->active_reqs.count;
  if (count == MIN_REQS_THRESHOLD || count == MIN_REQS_THRESHOLD + CURSOR ||
      count == MAX_REQS_THRESHOLD || count == MAX_REQS_THRESHOLD + CURSOR) {
    UV_LOGW("loop:%{public}zu, flag:%{public}s, active reqs:%{public}u", (size_t)loop, flag, count);
  }
#else
  return;
#endif
}


#ifdef USE_FFRT
static void uv__task_done_wrapper(void* work, int status) {
  struct uv__work* w = (struct uv__work*)work;
  uv__print_active_reqs(w->loop, "complete");
  w->done(w, status);
}
#endif


/* TODO(bnoordhuis) teach libuv how to cancel file operations
 * that go through io_uring instead of the thread pool.
 */
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

  rdlock_closed_uv_loop_rwlock();
  if (!is_uv_loop_good_magic(w->loop)) {
    rdunlock_closed_uv_loop_rwlock();
    return 0;
  }

#ifndef USE_FFRT
  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);

  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
  if (cancelled)
    uv__queue_remove(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);
#else
  uv_mutex_lock(&w->loop->wq_mutex);
  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL &&
      ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[0]);
  uv_mutex_unlock(&w->loop->wq_mutex);
#endif

  if (!cancelled) {
    rdunlock_closed_uv_loop_rwlock();
    return UV_EBUSY;
  }

  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
#ifndef USE_FFRT
  uv__queue_insert_tail(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
#else
  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(w->loop);
  int qos = (ffrt_qos_t)(intptr_t)req->reserved[0];

  if (uv_check_data_valid((struct uv_loop_data*)(w->loop->data)) == 0) {
    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)w->loop->data -
        (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
  } else {
    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
    uv_async_send(&loop->wq_async);
  }
#endif
  uv_mutex_unlock(&loop->wq_mutex);
  rdunlock_closed_uv_loop_rwlock();

  return 0;
}


void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  struct uv__queue* q;
  struct uv__queue wq;
  int err;
  int nevents;

  loop = container_of(handle, uv_loop_t, wq_async);
#ifdef USE_OHOS_DFX
  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) != 0)
    uv__print_active_reqs(loop, "complete");
#endif
  rdlock_closed_uv_loop_rwlock();
  if (!is_uv_loop_good_magic(loop)) {
    rdunlock_closed_uv_loop_rwlock();
    return;
  }
  rdunlock_closed_uv_loop_rwlock();

  uv_mutex_lock(&loop->wq_mutex);
#ifndef USE_FFRT
  uv__queue_move(&loop->wq, &wq);
#else
  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
  int i;
  uv__queue_init(&wq);
  /* Merge the per-QoS sub-queues into one local queue before draining. */
  for (i = 4; i >= 0; i--) {
    if (!uv__queue_empty(&lfields->wq_sub[i])) {
      uv__queue_append(&lfields->wq_sub[i], &wq);
    }
  }
#endif
  uv_mutex_unlock(&loop->wq_mutex);

  nevents = 0;
  uv_start_trace(UV_TRACE_TAG, UV_TRACE_NAME);
  while (!uv__queue_empty(&wq)) {
    q = uv__queue_head(&wq);
    uv__queue_remove(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
#ifdef UV_STATISTIC
    uv__post_statistic_work(w, DONE_EXECUTING);
    struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
    if (dump_work == NULL) {
      UV_LOGE("malloc(%{public}zu) failed: %{public}s", sizeof(struct uv__statistic_work), strerror(errno));
      break;
    }
    dump_work->info = w->info;
    dump_work->work = uv__update_work_info;
#endif
    w->done(w, err);
    nevents++;
#ifdef UV_STATISTIC
    dump_work->time = uv__now_timestamp();
    dump_work->state = DONE_END;
    uv__queue_init(&dump_work->wq);
    post_statistic_work(&dump_work->wq);
#endif
  }
  uv_end_trace(UV_TRACE_TAG);

  /* This check accomplishes 2 things:
   * 1. Even if the queue was empty, the call to uv__work_done() should count
   *    as an event. Which will have been added by the event loop when
   *    calling this callback.
   * 2. Prevents accidental wrap around in case nevents == 0 events == 0.
   */
  if (nevents > 1) {
    /* Subtract 1 to counter the call to uv__work_done(). */
    uv__metrics_inc_events(loop, nevents - 1);
    if (uv__get_internal_fields(loop)->current_timeout == 0)
      uv__metrics_inc_events_waiting(loop, nevents - 1);
  }
}


static void uv__queue_work(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);
#ifdef ASYNC_STACKTRACE
  LibuvSetStackId((uint64_t)req->reserved[3]);
#endif
  req->work_cb(req);
}


static void uv__queue_done(struct uv__work* w, int err) {
  uv_work_t* req;

  if (w == NULL) {
    UV_LOGE("uv_work_t is NULL");
    return;
  }

  req = container_of(w, uv_work_t, work_req);
#ifdef ASYNC_STACKTRACE
  LibuvSetStackId((uint64_t)req->reserved[3]);
#endif
  uv__req_unregister(req->loop, req);

  if (req->after_work_cb == NULL)
    return;

  req->after_work_cb(req, err);
}


#ifdef USE_FFRT
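/* FFRT build: instead of libuv's own worker threads, work items are handed to
 * the FFRT executor. uv__ffrt_work runs on an FFRT worker at the QoS the task
 * was submitted with, and completion is delivered back to the loop either
 * through the registered post_task_func (when loop->data carries the
 * event-handler magic) or by queueing onto the per-QoS wq_sub lists and
 * waking the loop with uv_async_send.
 */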
void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos)
{
  struct uv__work* w = (struct uv__work*)data;
  uv_loop_t* loop = w->loop;
#ifdef UV_STATISTIC
  uv__post_statistic_work(w, WORK_EXECUTING);
#endif
  w->work(w);
#ifdef UV_STATISTIC
  uv__post_statistic_work(w, WORK_END);
#endif
  rdlock_closed_uv_loop_rwlock();
  if (loop->magic != UV_LOOP_MAGIC) {
    rdunlock_closed_uv_loop_rwlock();
    UV_LOGE("uv_loop(%{public}zu:%{public}#x), task is invalid",
            (size_t)loop, loop->magic);
    return;
  }

  uv_mutex_lock(&loop->wq_mutex);
  w->work = NULL;  /* Signal uv_cancel() that the work req is done executing. */

  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) == 0) {
    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data -
        (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
  } else {
    uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
    uv_async_send(&loop->wq_async);
  }
  uv_mutex_unlock(&loop->wq_mutex);
  rdunlock_closed_uv_loop_rwlock();
}

static void init_once(void)
{
  init_closed_uv_loop_rwlock_once();
  /* init uv work statistics queue */
#ifdef UV_STATISTIC
  init_work_dump_queue();
#endif
  ffrt_executor_task_register_func(uv__ffrt_work, ffrt_uv_task);
}


/* ffrt uv__work_submit */
void uv__work_submit(uv_loop_t* loop,
                     uv_req_t* req,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  ffrt_task_attr_t attr;
  ffrt_task_attr_init(&attr);

  switch (kind) {
    case UV__WORK_CPU:
      ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
      break;
    case UV__WORK_FAST_IO:
      ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
      break;
    case UV__WORK_SLOW_IO:
      ffrt_task_attr_set_qos(&attr, ffrt_qos_background);
      break;
    default:
#ifdef USE_OHOS_DFX
      UV_LOGI("Unknown work kind");
#endif
      ffrt_task_attr_destroy(&attr);
      return;
  }

  w->loop = loop;
  w->work = work;
  w->done = done;

  req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
  ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
  ffrt_task_attr_destroy(&attr);
}


/* ffrt uv__work_submit_with_qos */
void uv__work_submit_with_qos(uv_loop_t* loop,
                              uv_req_t* req,
                              struct uv__work* w,
                              ffrt_qos_t qos,
                              void (*work)(struct uv__work* w),
                              void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  ffrt_task_attr_t attr;
  ffrt_task_attr_init(&attr);
  ffrt_task_attr_set_qos(&attr, qos);

  w->loop = loop;
  w->work = work;
  w->done = done;

  req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
  ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
  ffrt_task_attr_destroy(&attr);
}
#endif


int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;

#ifdef UV_STATISTIC
  struct uv_work_dump_info* info = (struct uv_work_dump_info*) malloc(sizeof(struct uv_work_dump_info));
  if (info == NULL) {
    abort();
  }
  uv_init_dump_info(info, &req->work_req);
  info->builtin_return_address[0] = __builtin_return_address(0);
  info->builtin_return_address[1] = __builtin_return_address(1);
  info->builtin_return_address[2] = __builtin_return_address(2);
  (req->work_req).info = info;
#endif
#ifdef ASYNC_STACKTRACE
  req->reserved[3] = (void*)LibuvCollectAsyncStack();
#endif
  uv__work_submit(loop,
#ifdef USE_FFRT
                  (uv_req_t*)req,
#endif
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done);
#ifdef UV_STATISTIC
  uv_queue_statics(info);
#endif
  return 0;
}
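
/* Typical call pattern (sketch; the request must stay alive until
 * after_work_cb has run):
 *
 *   static void work_cb(uv_work_t* req) {
 *     // runs on a pool (or FFRT) thread
 *   }
 *   static void after_cb(uv_work_t* req, int status) {
 *     // runs on the loop thread; status is UV_ECANCELED if cancelled
 *   }
 *   ...
 *   uv_work_t* req = malloc(sizeof(*req));
 *   uv_queue_work(uv_default_loop(), req, work_cb, after_cb);
 */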


int uv_queue_work_with_qos(uv_loop_t* loop,
                           uv_work_t* req,
                           uv_work_cb work_cb,
                           uv_after_work_cb after_work_cb,
                           uv_qos_t qos) {
#ifdef USE_FFRT
  if (work_cb == NULL)
    return UV_EINVAL;

  STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
  STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
  STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
  STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
  STATIC_ASSERT(uv_qos_user_interactive == ffrt_qos_deadline_request);
  if (qos < ffrt_qos_background || qos > ffrt_qos_deadline_request) {
    return UV_EINVAL;
  }

  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
#ifdef UV_STATISTIC
  struct uv_work_dump_info* info = (struct uv_work_dump_info*)malloc(sizeof(struct uv_work_dump_info));
  if (info == NULL) {
    abort();
  }
  uv_init_dump_info(info, &req->work_req);
  info->builtin_return_address[0] = __builtin_return_address(0);
  info->builtin_return_address[1] = __builtin_return_address(1);
  info->builtin_return_address[2] = __builtin_return_address(2);
  (req->work_req).info = info;
#endif
  uv__work_submit_with_qos(loop,
                           (uv_req_t*)req,
                           &req->work_req,
                           (ffrt_qos_t)qos,
                           uv__queue_work,
                           uv__queue_done);
#ifdef UV_STATISTIC
  uv_queue_statics(info);
#endif
  return 0;
#else
  return uv_queue_work(loop, req, work_cb, after_work_cb);
#endif
}
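
/* The qos argument maps one-to-one onto FFRT QoS levels (see the
 * STATIC_ASSERTs above), so passing e.g. uv_qos_user_initiated schedules the
 * work at ffrt_qos_user_initiated; on non-FFRT builds the hint is ignored and
 * the call degrades to plain uv_queue_work().
 */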


int uv_cancel(uv_req_t* req) {
  struct uv__work* wreq;
  uv_loop_t* loop;

  switch (req->type) {
    case UV_FS:
      loop = ((uv_fs_t*) req)->loop;
      wreq = &((uv_fs_t*) req)->work_req;
      break;
    case UV_GETADDRINFO:
      loop = ((uv_getaddrinfo_t*) req)->loop;
      wreq = &((uv_getaddrinfo_t*) req)->work_req;
      break;
    case UV_GETNAMEINFO:
      loop = ((uv_getnameinfo_t*) req)->loop;
      wreq = &((uv_getnameinfo_t*) req)->work_req;
      break;
    case UV_RANDOM:
      loop = ((uv_random_t*) req)->loop;
      wreq = &((uv_random_t*) req)->work_req;
      break;
    case UV_WORK:
      loop = ((uv_work_t*) req)->loop;
      wreq = &((uv_work_t*) req)->work_req;
      break;
    default:
      return UV_EINVAL;
  }

  return uv__work_cancel(loop, req, wreq);
}
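
/* Cancellation sketch: uv_cancel() succeeds (returns 0) only while the
 * request is still queued (or, on FFRT builds, while the executor agrees to
 * drop it); once the work callback has started it returns UV_EBUSY. A
 * successfully cancelled request still receives its after_work_cb, with
 * status UV_ECANCELED.
 */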