1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include "uv-common.h"
23 #include "uv_log.h"
24 #include "uv_trace.h"
25
26 #if !defined(_WIN32)
27 # include "unix/internal.h"
28 #else
29 # include "win/internal.h"
30 #endif
31
32 #include <stdlib.h>
33 #ifdef USE_FFRT
34 #include <assert.h>
35 #include "ffrt_inner.h"
36 #endif
37 #include <stdio.h>
38 #ifdef ASYNC_STACKTRACE
39 #include "dfx/async_stack/libuv_async_stack.h"
40 #endif
41
42 #define MAX_THREADPOOL_SIZE 1024
43 #define UV_TRACE_NAME "UV_TRACE"
44
45 static uv_rwlock_t g_closed_uv_loop_rwlock;
46 static uv_once_t once = UV_ONCE_INIT;
47 static uv_cond_t cond;
48 static uv_mutex_t mutex;
49 static unsigned int idle_threads;
50 static unsigned int nthreads;
51 static uv_thread_t* threads;
52 static uv_thread_t default_threads[4];
53 static struct uv__queue exit_message;
54 static struct uv__queue wq;
55 static struct uv__queue run_slow_work_message;
56 static struct uv__queue slow_io_pending_wq;
57
58
59 #ifdef UV_STATISTIC
60 #define MAX_DUMP_QUEUE_SIZE 200
61 static uv_mutex_t dump_queue_mutex;
62 static QUEUE dump_queue;
63 static unsigned int dump_queue_size;
64
65 static int statistic_idle;
66 static uv_mutex_t statistic_mutex;
67 static QUEUE statistic_works;
68 static uv_cond_t dump_cond;
69 static uv_thread_t dump_thread;
70
uv_dump_worker(void * arg)71 static void uv_dump_worker(void* arg) {
72 struct uv__statistic_work* w;
73 struct uv__queue* q;
74 uv_sem_post((uv_sem_t*) arg);
75 arg = NULL;
76 uv_mutex_lock(&statistic_mutex);
77 for (;;) {
78 while (uv__queue_empty(&statistic_works)) {
79 statistic_idle = 1;
80 uv_cond_wait(&dump_cond, &statistic_mutex);
81 statistic_idle = 0;
82 }
83 q = uv__queue_head(&statistic_works);
84 if (q == &exit_message) {
85 uv_cond_signal(&dump_cond);
86 uv_mutex_unlock(&statistic_mutex);
87 break;
88 }
89 uv__queue_remove(q);
90 uv__queue_init(q);
91 uv_mutex_unlock(&statistic_mutex);
92 w = uv__queue_data(q, struct uv__statistic_work, wq);
93 w->work(w);
94 free(w);
95 uv_mutex_lock(&statistic_mutex);
96 }
97 }
98
post_statistic_work(struct uv__queue * q)99 static void post_statistic_work(struct uv__queue* q) {
100 uv_mutex_lock(&statistic_mutex);
101 uv__queue_insert_tail(&statistic_works, q);
102 if (statistic_idle)
103 uv_cond_signal(&dump_cond);
104 uv_mutex_unlock(&statistic_mutex);
105 }
106
uv__queue_work_info(struct uv__statistic_work * work)107 static void uv__queue_work_info(struct uv__statistic_work *work) {
108 uv_mutex_lock(&dump_queue_mutex);
109 if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) { /* release works already done */
110 struct uv__queue* q;
111 uv__queue_foreach(q, &dump_queue) {
112 struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
113 if (info->state == DONE_END) {
114 uv__queue_remove(q);
115 free(info);
116 dump_queue_size--;
117 }
118 }
119 if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) {
120 abort(); /* too many works not done. */
121 }
122 }
123
124 uv__queue_insert_head(&dump_queue, &work->info->wq);
125 dump_queue_size++;
126 uv_mutex_unlock(&dump_queue_mutex);
127 }
128
uv__update_work_info(struct uv__statistic_work * work)129 static void uv__update_work_info(struct uv__statistic_work *work) {
130 uv_mutex_lock(&dump_queue_mutex);
131 if (work != NULL && work->info != NULL) {
132 work->info->state = work->state;
133 switch (work->state) {
134 case WAITING:
135 work->info->queue_time = work->time;
136 break;
137 case WORK_EXECUTING:
138 work->info->execute_start_time = work->time;
139 break;
140 case WORK_END:
141 work->info->execute_end_time = work->time;
142 break;
143 case DONE_EXECUTING:
144 work->info->done_start_time = work->time;
145 break;
146 case DONE_END:
147 work->info->done_end_time = work->time;
148 break;
149 default:
150 break;
151 }
152 }
153 uv_mutex_unlock(&dump_queue_mutex);
154 }
155
156
157 // return the timestamp in millisecond
uv__now_timestamp()158 static uint64_t uv__now_timestamp() {
159 uv_timeval64_t tv;
160 int r = uv_gettimeofday(&tv);
161 if (r != 0) {
162 return 0;
163 }
164 return (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
165 }
166
167
uv__post_statistic_work(struct uv__work * w,enum uv_work_state state)168 static void uv__post_statistic_work(struct uv__work *w, enum uv_work_state state) {
169 struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
170 if (dump_work == NULL) {
171 return;
172 }
173 dump_work->info = w->info;
174 dump_work->work = uv__update_work_info;
175 dump_work->time = uv__now_timestamp();
176 dump_work->state = state;
177 uv__queue_init(&dump_work->wq);
178 post_statistic_work(&dump_work->wq);
179 }
180
181
init_work_dump_queue()182 static void init_work_dump_queue()
183 {
184 if (uv_mutex_init(&dump_queue_mutex))
185 abort();
186 uv_mutex_lock(&dump_queue_mutex);
187 uv__queue_init(&dump_queue);
188 dump_queue_size = 0;
189 uv_mutex_unlock(&dump_queue_mutex);
190
191 /* init dump thread */
192 statistic_idle = 1;
193 if (uv_mutex_init(&statistic_mutex))
194 abort();
195 uv__queue_init(&statistic_works);
196 uv_sem_t sem;
197 if (uv_cond_init(&dump_cond))
198 abort();
199 if (uv_sem_init(&sem, 0))
200 abort();
201 if (uv_thread_create(&dump_thread, uv_dump_worker, &sem))
202 abort();
203 uv_sem_wait(&sem);
204 uv_sem_destroy(&sem);
205 }
206
207
uv_init_dump_info(struct uv_work_dump_info * info,struct uv__work * w)208 void uv_init_dump_info(struct uv_work_dump_info* info, struct uv__work* w) {
209 if (info == NULL)
210 return;
211 info->queue_time = 0;
212 info->state = WAITING;
213 info->execute_start_time = 0;
214 info->execute_end_time = 0;
215 info->done_start_time = 0;
216 info->done_end_time = 0;
217 info->work = w;
218 uv__queue_init(&info->wq);
219 }
220
221
uv_queue_statics(struct uv_work_dump_info * info)222 void uv_queue_statics(struct uv_work_dump_info* info) {
223 struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
224 if (dump_work == NULL) {
225 abort();
226 }
227 dump_work->info = info;
228 dump_work->work = uv__queue_work_info;
229 info->queue_time = uv__now_timestamp();
230 dump_work->state = WAITING;
231 uv__queue_init(&dump_work->wq);
232 post_statistic_work(&dump_work->wq);
233 }
234
235
uv_dump_work_queue(int * size)236 uv_worker_info_t* uv_dump_work_queue(int* size) {
237 #ifdef UV_STATISTIC
238 uv_mutex_lock(&dump_queue_mutex);
239 if (uv__queue_empty(&dump_queue)) {
240 return NULL;
241 }
242 *size = dump_queue_size;
243 uv_worker_info_t* dump_info = (uv_worker_info_t*) malloc(sizeof(uv_worker_info_t) * dump_queue_size);
244 struct uv__queue* q;
245 int i = 0;
246 uv__queue_foreach(q, &dump_queue) {
247 struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
248 dump_info[i].queue_time = info->queue_time;
249 dump_info[i].builtin_return_address[0] = info->builtin_return_address[0];
250 dump_info[i].builtin_return_address[1] = info->builtin_return_address[1];
251 dump_info[i].builtin_return_address[2] = info->builtin_return_address[2];
252 switch (info->state) {
253 case WAITING:
254 strcpy(dump_info[i].state, "waiting");
255 break;
256 case WORK_EXECUTING:
257 strcpy(dump_info[i].state, "work_executing");
258 break;
259 case WORK_END:
260 strcpy(dump_info[i].state, "work_end");
261 break;
262 case DONE_EXECUTING:
263 strcpy(dump_info[i].state, "done_executing");
264 break;
265 case DONE_END:
266 strcpy(dump_info[i].state, "done_end");
267 break;
268 default:
269 break;
270 }
271 dump_info[i].execute_start_time = info->execute_start_time;
272 dump_info[i].execute_end_time = info->execute_end_time;
273 dump_info[i].done_start_time = info->done_start_time;
274 dump_info[i].done_end_time = info->done_end_time;
275 ++i;
276 }
277 uv_mutex_unlock(&dump_queue_mutex);
278 return dump_info;
279 #else
280 size = 0;
281 return NULL;
282 #endif
283 }
284 #endif
285
286
init_closed_uv_loop_rwlock_once(void)287 static void init_closed_uv_loop_rwlock_once(void) {
288 uv_rwlock_init(&g_closed_uv_loop_rwlock);
289 }
290
291
rdlock_closed_uv_loop_rwlock(void)292 void rdlock_closed_uv_loop_rwlock(void) {
293 uv_rwlock_rdlock(&g_closed_uv_loop_rwlock);
294 }
295
296
rdunlock_closed_uv_loop_rwlock(void)297 void rdunlock_closed_uv_loop_rwlock(void) {
298 uv_rwlock_rdunlock(&g_closed_uv_loop_rwlock);
299 }
300
301
is_uv_loop_good_magic(const uv_loop_t * loop)302 int is_uv_loop_good_magic(const uv_loop_t* loop) {
303 if (loop->magic == UV_LOOP_MAGIC) {
304 return 1;
305 }
306 UV_LOGE("loop:(%{public}zu:%{public}#x) invalid", (size_t)loop, loop->magic);
307 return 0;
308 }
309
310
on_uv_loop_close(uv_loop_t * loop)311 void on_uv_loop_close(uv_loop_t* loop) {
312 time_t t1, t2;
313 time(&t1);
314 uv_start_trace(UV_TRACE_TAG, "Get Write Lock");
315 uv_rwlock_wrlock(&g_closed_uv_loop_rwlock);
316 uv_end_trace(UV_TRACE_TAG);
317 loop->magic = ~UV_LOOP_MAGIC;
318 uv_start_trace(UV_TRACE_TAG, "Release Write Lock");
319 uv_rwlock_wrunlock(&g_closed_uv_loop_rwlock);
320 uv_end_trace(UV_TRACE_TAG);
321 time(&t2);
322 UV_LOGI("loop:(%{public}zu) closed in %{public}zds", (size_t)loop, (ssize_t)(t2 - t1));
323 }
324
325
uv__cancelled(struct uv__work * w)326 static void uv__cancelled(struct uv__work* w) {
327 abort();
328 }
329
330
331 #ifndef USE_FFRT
332 static unsigned int slow_io_work_running;
333
slow_work_thread_threshold(void)334 static unsigned int slow_work_thread_threshold(void) {
335 return (nthreads + 1) / 2;
336 }
337
338
339 /* To avoid deadlock with uv_cancel() it's crucial that the worker
340 * never holds the global mutex and the loop-local mutex at the same time.
341 */
worker(void * arg)342 static void worker(void* arg) {
343 struct uv__work* w;
344 struct uv__queue* q;
345 int is_slow_work;
346
347 uv_sem_post((uv_sem_t*) arg);
348 arg = NULL;
349
350 uv_mutex_lock(&mutex);
351 for (;;) {
352 /* `mutex` should always be locked at this point. */
353
354 /* Keep waiting while either no work is present or only slow I/O
355 and we're at the threshold for that. */
356 while (uv__queue_empty(&wq) ||
357 (uv__queue_head(&wq) == &run_slow_work_message &&
358 uv__queue_next(&run_slow_work_message) == &wq &&
359 slow_io_work_running >= slow_work_thread_threshold())) {
360 idle_threads += 1;
361 uv_cond_wait(&cond, &mutex);
362 idle_threads -= 1;
363 }
364
365 q = uv__queue_head(&wq);
366 if (q == &exit_message) {
367 uv_cond_signal(&cond);
368 uv_mutex_unlock(&mutex);
369 break;
370 }
371
372 uv__queue_remove(q);
373 uv__queue_init(q); /* Signal uv_cancel() that the work req is executing. */
374
375 is_slow_work = 0;
376 if (q == &run_slow_work_message) {
377 /* If we're at the slow I/O threshold, re-schedule until after all
378 other work in the queue is done. */
379 if (slow_io_work_running >= slow_work_thread_threshold()) {
380 uv__queue_insert_tail(&wq, q);
381 continue;
382 }
383
384 /* If we encountered a request to run slow I/O work but there is none
385 to run, that means it's cancelled => Start over. */
386 if (uv__queue_empty(&slow_io_pending_wq))
387 continue;
388
389 is_slow_work = 1;
390 slow_io_work_running++;
391
392 q = uv__queue_head(&slow_io_pending_wq);
393 uv__queue_remove(q);
394 uv__queue_init(q);
395
396 /* If there is more slow I/O work, schedule it to be run as well. */
397 if (!uv__queue_empty(&slow_io_pending_wq)) {
398 uv__queue_insert_tail(&wq, &run_slow_work_message);
399 if (idle_threads > 0)
400 uv_cond_signal(&cond);
401 }
402 }
403
404 uv_mutex_unlock(&mutex);
405
406 w = uv__queue_data(q, struct uv__work, wq);
407 #ifdef UV_STATISTIC
408 uv__post_statistic_work(w, WORK_EXECUTING);
409 #endif
410 #ifdef ASYNC_STACKTRACE
411 uv_work_t* req = container_of(w, uv_work_t, work_req);
412 LibuvSetStackId((uint64_t)req->reserved[3]);
413 #endif
414 w->work(w);
415 #ifdef UV_STATISTIC
416 uv__post_statistic_work(w, WORK_END);
417 #endif
418 uv_mutex_lock(&w->loop->wq_mutex);
419 w->work = NULL; /* Signal uv_cancel() that the work req is done
420 executing. */
421 uv__queue_insert_tail(&w->loop->wq, &w->wq);
422 uv_async_send(&w->loop->wq_async);
423 uv_mutex_unlock(&w->loop->wq_mutex);
424
425 /* Lock `mutex` since that is expected at the start of the next
426 * iteration. */
427 uv_mutex_lock(&mutex);
428 if (is_slow_work) {
429 /* `slow_io_work_running` is protected by `mutex`. */
430 slow_io_work_running--;
431 }
432 }
433 }
434 #endif
435
436
post(struct uv__queue * q,enum uv__work_kind kind)437 static void post(struct uv__queue* q, enum uv__work_kind kind) {
438 uv_mutex_lock(&mutex);
439 if (kind == UV__WORK_SLOW_IO) {
440 /* Insert into a separate queue. */
441 uv__queue_insert_tail(&slow_io_pending_wq, q);
442 if (!uv__queue_empty(&run_slow_work_message)) {
443 /* Running slow I/O tasks is already scheduled => Nothing to do here.
444 The worker that runs said other task will schedule this one as well. */
445 uv_mutex_unlock(&mutex);
446 return;
447 }
448 q = &run_slow_work_message;
449 }
450
451 uv__queue_insert_tail(&wq, q);
452 if (idle_threads > 0)
453 uv_cond_signal(&cond);
454 uv_mutex_unlock(&mutex);
455 }
456
457
458 #ifdef __MVS__
459 /* TODO(itodorov) - zos: revisit when Woz compiler is available. */
460 __attribute__((destructor))
461 #endif
uv__threadpool_cleanup(void)462 void uv__threadpool_cleanup(void) {
463 unsigned int i;
464
465 if (nthreads == 0)
466 return;
467
468 #ifndef __MVS__
469 /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
470 post(&exit_message, UV__WORK_CPU);
471 #endif
472
473 for (i = 0; i < nthreads; i++)
474 if (uv_thread_join(threads + i))
475 abort();
476
477 if (threads != default_threads)
478 uv__free(threads);
479
480 uv_mutex_destroy(&mutex);
481 uv_cond_destroy(&cond);
482
483 threads = NULL;
484 nthreads = 0;
485 #ifdef UV_STATISTIC
486 post_statistic_work(&exit_message);
487 uv_thread_join(dump_thread);
488 uv_mutex_destroy(&statistic_mutex);
489 uv_cond_destroy(&dump_cond);
490 #endif
491 }
492
493
494 #ifndef USE_FFRT
init_threads(void)495 static void init_threads(void) {
496 uv_thread_options_t config;
497 unsigned int i;
498 const char* val;
499 uv_sem_t sem;
500
501 nthreads = ARRAY_SIZE(default_threads);
502 val = getenv("UV_THREADPOOL_SIZE");
503 if (val != NULL)
504 nthreads = atoi(val);
505 if (nthreads == 0)
506 nthreads = 1;
507 if (nthreads > MAX_THREADPOOL_SIZE)
508 nthreads = MAX_THREADPOOL_SIZE;
509
510 threads = default_threads;
511 if (nthreads > ARRAY_SIZE(default_threads)) {
512 threads = uv__malloc(nthreads * sizeof(threads[0]));
513 if (threads == NULL) {
514 nthreads = ARRAY_SIZE(default_threads);
515 threads = default_threads;
516 }
517 }
518
519 if (uv_cond_init(&cond))
520 abort();
521
522 if (uv_mutex_init(&mutex))
523 abort();
524
525 uv__queue_init(&wq);
526 uv__queue_init(&slow_io_pending_wq);
527 uv__queue_init(&run_slow_work_message);
528
529 if (uv_sem_init(&sem, 0))
530 abort();
531
532 config.flags = UV_THREAD_HAS_STACK_SIZE;
533 config.stack_size = 8u << 20; /* 8 MB */
534
535 for (i = 0; i < nthreads; i++)
536 if (uv_thread_create_ex(threads + i, &config, worker, &sem))
537 abort();
538
539 for (i = 0; i < nthreads; i++)
540 uv_sem_wait(&sem);
541
542 uv_sem_destroy(&sem);
543 }
544
545
546 #ifndef _WIN32
reset_once(void)547 static void reset_once(void) {
548 uv_once_t child_once = UV_ONCE_INIT;
549 memcpy(&once, &child_once, sizeof(child_once));
550 }
551 #endif
552
553
init_once(void)554 static void init_once(void) {
555 #ifndef _WIN32
556 /* Re-initialize the threadpool after fork.
557 * Note that this discards the global mutex and condition as well
558 * as the work queue.
559 */
560 if (pthread_atfork(NULL, NULL, &reset_once))
561 abort();
562 #endif
563 init_closed_uv_loop_rwlock_once();
564 #ifdef UV_STATISTIC
565 init_work_dump_queue();
566 #endif
567 init_threads();
568 }
569
570
uv__work_submit(uv_loop_t * loop,struct uv__work * w,enum uv__work_kind kind,void (* work)(struct uv__work * w),void (* done)(struct uv__work * w,int status))571 void uv__work_submit(uv_loop_t* loop,
572 struct uv__work* w,
573 enum uv__work_kind kind,
574 void (*work)(struct uv__work* w),
575 void (*done)(struct uv__work* w, int status)) {
576 uv_once(&once, init_once);
577 w->loop = loop;
578 w->work = work;
579 w->done = done;
580 post(&w->wq, kind);
581 }
582 #endif
583
584
585 #ifdef USE_FFRT
uv__task_done_wrapper(void * work,int status)586 static void uv__task_done_wrapper(void* work, int status) {
587 struct uv__work* w = (struct uv__work*)work;
588 w->done(w, status);
589 }
590 #endif
591
592
593 /* TODO(bnoordhuis) teach libuv how to cancel file operations
594 * that go through io_uring instead of the thread pool.
595 */
uv__work_cancel(uv_loop_t * loop,uv_req_t * req,struct uv__work * w)596 static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
597 int cancelled;
598
599 rdlock_closed_uv_loop_rwlock();
600 if (!is_uv_loop_good_magic(w->loop)) {
601 rdunlock_closed_uv_loop_rwlock();
602 return 0;
603 }
604
605 #ifndef USE_FFRT
606 uv_mutex_lock(&mutex);
607 uv_mutex_lock(&w->loop->wq_mutex);
608
609 cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
610 if (cancelled)
611 uv__queue_remove(&w->wq);
612
613 uv_mutex_unlock(&w->loop->wq_mutex);
614 uv_mutex_unlock(&mutex);
615 #else
616 uv_mutex_lock(&w->loop->wq_mutex);
617 cancelled = !uv__queue_empty(&w->wq) && w->work != NULL
618 && ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[0]);
619 uv_mutex_unlock(&w->loop->wq_mutex);
620 #endif
621
622 if (!cancelled) {
623 rdunlock_closed_uv_loop_rwlock();
624 return UV_EBUSY;
625 }
626
627 w->work = uv__cancelled;
628 uv_mutex_lock(&loop->wq_mutex);
629 #ifndef USE_FFRT
630 uv__queue_insert_tail(&loop->wq, &w->wq);
631 uv_async_send(&loop->wq_async);
632 #else
633 uv__loop_internal_fields_t* lfields = uv__get_internal_fields(w->loop);
634 int qos = (ffrt_qos_t)(intptr_t)req->reserved[0];
635
636 if (uv_check_data_valid((struct uv_loop_data*)(w->loop->data)) == 0) {
637 int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
638 struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)w->loop->data -
639 (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
640 addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
641 } else {
642 uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
643 uv_async_send(&loop->wq_async);
644 }
645 #endif
646 uv_mutex_unlock(&loop->wq_mutex);
647 rdunlock_closed_uv_loop_rwlock();
648
649 return 0;
650 }
651
652
uv__work_done(uv_async_t * handle)653 void uv__work_done(uv_async_t* handle) {
654 struct uv__work* w;
655 uv_loop_t* loop;
656 struct uv__queue* q;
657 struct uv__queue wq;
658 int err;
659 int nevents;
660
661 loop = container_of(handle, uv_loop_t, wq_async);
662 rdlock_closed_uv_loop_rwlock();
663 if (!is_uv_loop_good_magic(loop)) {
664 rdunlock_closed_uv_loop_rwlock();
665 return;
666 }
667 rdunlock_closed_uv_loop_rwlock();
668
669 uv_mutex_lock(&loop->wq_mutex);
670 #ifndef USE_FFRT
671 uv__queue_move(&loop->wq, &wq);
672 #else
673 uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
674 int i;
675 uv__queue_init(&wq);
676 for (i = 3; i >= 0; i--) {
677 if (!uv__queue_empty(&lfields->wq_sub[i])) {
678 uv__queue_append(&lfields->wq_sub[i], &wq);
679 }
680 }
681 #endif
682 uv_mutex_unlock(&loop->wq_mutex);
683
684 nevents = 0;
685 uv_start_trace(UV_TRACE_TAG, UV_TRACE_NAME);
686 while (!uv__queue_empty(&wq)) {
687 q = uv__queue_head(&wq);
688 uv__queue_remove(q);
689
690 w = container_of(q, struct uv__work, wq);
691 err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
692 #ifdef UV_STATISTIC
693 uv__post_statistic_work(w, DONE_EXECUTING);
694 struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
695 if (dump_work == NULL) {
696 UV_LOGE("malloc(%{public}zu) failed: %{public}s", sizeof(struct uv__statistic_work), strerror(errno));
697 break;
698 }
699 dump_work->info = w->info;
700 dump_work->work = uv__update_work_info;
701 #endif
702 #ifdef ASYNC_STACKTRACE
703 uv_work_t* req = container_of(w, uv_work_t, work_req);
704 LibuvSetStackId((uint64_t)req->reserved[3]);
705 #endif
706 w->done(w, err);
707 nevents++;
708 #ifdef UV_STATISTIC
709 dump_work->time = uv__now_timestamp();
710 dump_work->state = DONE_END;
711 QUEUE_INIT(&dump_work->wq);
712 post_statistic_work(&dump_work->wq);
713 #endif
714 }
715 uv_end_trace(UV_TRACE_TAG);
716
717 /* This check accomplishes 2 things:
718 * 1. Even if the queue was empty, the call to uv__work_done() should count
719 * as an event. Which will have been added by the event loop when
720 * calling this callback.
721 * 2. Prevents accidental wrap around in case nevents == 0 events == 0.
722 */
723 if (nevents > 1) {
724 /* Subtract 1 to counter the call to uv__work_done(). */
725 uv__metrics_inc_events(loop, nevents - 1);
726 if (uv__get_internal_fields(loop)->current_timeout == 0)
727 uv__metrics_inc_events_waiting(loop, nevents - 1);
728 }
729 }
730
731
uv__queue_work(struct uv__work * w)732 static void uv__queue_work(struct uv__work* w) {
733 uv_work_t* req = container_of(w, uv_work_t, work_req);
734
735 req->work_cb(req);
736 }
737
738
uv__queue_done(struct uv__work * w,int err)739 static void uv__queue_done(struct uv__work* w, int err) {
740 uv_work_t* req;
741
742 if (w == NULL) {
743 UV_LOGE("uv_work_t is NULL");
744 return;
745 }
746
747 req = container_of(w, uv_work_t, work_req);
748 uv__req_unregister(req->loop, req);
749
750 if (req->after_work_cb == NULL)
751 return;
752
753 req->after_work_cb(req, err);
754 }
755
756
757 #ifdef USE_FFRT
uv__ffrt_work(ffrt_executor_task_t * data,ffrt_qos_t qos)758 void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos)
759 {
760 struct uv__work* w = (struct uv__work *)data;
761 uv_loop_t* loop = w->loop;
762 #ifdef UV_STATISTIC
763 uv__post_statistic_work(w, WORK_EXECUTING);
764 #endif
765 uv_work_t* req = container_of(w, uv_work_t, work_req);
766 #ifdef ASYNC_STACKTRACE
767 LibuvSetStackId((uint64_t)req->reserved[3]);
768 #endif
769 w->work(w);
770 #ifdef UV_STATISTIC
771 uv__post_statistic_work(w, WORK_END);
772 #endif
773 rdlock_closed_uv_loop_rwlock();
774 if (loop->magic != UV_LOOP_MAGIC) {
775 rdunlock_closed_uv_loop_rwlock();
776 UV_LOGE("uv_loop(%{public}zu:%{public}#x) in task(%p:%p) is invalid",
777 (size_t)loop, loop->magic, req->work_cb, req->after_work_cb);
778 return;
779 }
780
781 uv_mutex_lock(&loop->wq_mutex);
782 w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */
783
784 if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) == 0) {
785 int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
786 struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data -
787 (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
788 addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
789 } else {
790 uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
791 uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
792 uv_async_send(&loop->wq_async);
793 }
794 uv_mutex_unlock(&loop->wq_mutex);
795 rdunlock_closed_uv_loop_rwlock();
796 }
797
init_once(void)798 static void init_once(void)
799 {
800 init_closed_uv_loop_rwlock_once();
801 /* init uv work statics queue */
802 #ifdef UV_STATISTIC
803 init_work_dump_queue();
804 #endif
805 ffrt_executor_task_register_func(uv__ffrt_work, ffrt_uv_task);
806 }
807
808
809 /* ffrt uv__work_submit */
uv__work_submit(uv_loop_t * loop,uv_req_t * req,struct uv__work * w,enum uv__work_kind kind,void (* work)(struct uv__work * w),void (* done)(struct uv__work * w,int status))810 void uv__work_submit(uv_loop_t* loop,
811 uv_req_t* req,
812 struct uv__work* w,
813 enum uv__work_kind kind,
814 void (*work)(struct uv__work *w),
815 void (*done)(struct uv__work *w, int status)) {
816 uv_once(&once, init_once);
817 ffrt_task_attr_t attr;
818 ffrt_task_attr_init(&attr);
819
820 switch(kind) {
821 case UV__WORK_CPU:
822 ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
823 break;
824 case UV__WORK_FAST_IO:
825 ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
826 break;
827 case UV__WORK_SLOW_IO:
828 ffrt_task_attr_set_qos(&attr, ffrt_qos_background);
829 break;
830 default:
831 #ifdef USE_OHOS_DFX
832 UV_LOGI("Unknown work kind");
833 #endif
834 return;
835 }
836
837 w->loop = loop;
838 w->work = work;
839 w->done = done;
840
841 req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
842 ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
843 ffrt_task_attr_destroy(&attr);
844 }
845
846
847 /* ffrt uv__work_submit */
uv__work_submit_with_qos(uv_loop_t * loop,uv_req_t * req,struct uv__work * w,ffrt_qos_t qos,void (* work)(struct uv__work * w),void (* done)(struct uv__work * w,int status))848 void uv__work_submit_with_qos(uv_loop_t* loop,
849 uv_req_t* req,
850 struct uv__work* w,
851 ffrt_qos_t qos,
852 void (*work)(struct uv__work *w),
853 void (*done)(struct uv__work *w, int status)) {
854 uv_once(&once, init_once);
855 ffrt_task_attr_t attr;
856 ffrt_task_attr_init(&attr);
857 ffrt_task_attr_set_qos(&attr, qos);
858
859 w->loop = loop;
860 w->work = work;
861 w->done = done;
862
863 req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
864 ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
865 ffrt_task_attr_destroy(&attr);
866 }
867 #endif
868
869
uv_queue_work(uv_loop_t * loop,uv_work_t * req,uv_work_cb work_cb,uv_after_work_cb after_work_cb)870 int uv_queue_work(uv_loop_t* loop,
871 uv_work_t* req,
872 uv_work_cb work_cb,
873 uv_after_work_cb after_work_cb) {
874 if (work_cb == NULL)
875 return UV_EINVAL;
876
877 uv__req_init(loop, req, UV_WORK);
878 req->loop = loop;
879 req->work_cb = work_cb;
880 req->after_work_cb = after_work_cb;
881
882 #ifdef UV_STATISTIC
883 struct uv_work_dump_info* info = (struct uv_work_dump_info*) malloc(sizeof(struct uv_work_dump_info));
884 if (info == NULL) {
885 abort();
886 }
887 uv_init_dump_info(info, &req->work_req);
888 info->builtin_return_address[0] = __builtin_return_address(0);
889 info->builtin_return_address[1] = __builtin_return_address(1);
890 info->builtin_return_address[2] = __builtin_return_address(2);
891 (req->work_req).info = info;
892 #endif
893 #ifdef ASYNC_STACKTRACE
894 req->reserved[3] = (void*)LibuvCollectAsyncStack();
895 #endif
896 uv__work_submit(loop,
897 #ifdef USE_FFRT
898 (uv_req_t*)req,
899 #endif
900 &req->work_req,
901 UV__WORK_CPU,
902 uv__queue_work,
903 uv__queue_done
904 );
905 #ifdef UV_STATISTIC
906 uv_queue_statics(info);
907 #endif
908 return 0;
909 }
910
911
uv_queue_work_with_qos(uv_loop_t * loop,uv_work_t * req,uv_work_cb work_cb,uv_after_work_cb after_work_cb,uv_qos_t qos)912 int uv_queue_work_with_qos(uv_loop_t* loop,
913 uv_work_t* req,
914 uv_work_cb work_cb,
915 uv_after_work_cb after_work_cb,
916 uv_qos_t qos) {
917 #ifdef USE_FFRT
918 if (work_cb == NULL)
919 return UV_EINVAL;
920
921 STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
922 STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
923 STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
924 STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
925 if (qos < ffrt_qos_background || qos > ffrt_qos_user_initiated) {
926 return UV_EINVAL;
927 }
928
929 uv__req_init(loop, req, UV_WORK);
930 req->loop = loop;
931 req->work_cb = work_cb;
932 req->after_work_cb = after_work_cb;
933 #ifdef UV_STATISTIC
934 struct uv_work_dump_info* info = (struct uv_work_dump_info*)malloc(sizeof(struct uv_work_dump_info));
935 if (info == NULL) {
936 abort();
937 }
938 uv_init_dump_info(info, &req->work_req);
939 info->builtin_return_address[0] = __builtin_return_address(0);
940 info->builtin_return_address[1] = __builtin_return_address(1);
941 info->builtin_return_address[2] = __builtin_return_address(2);
942 (req->work_req).info = info;
943 #endif
944 uv__work_submit_with_qos(loop,
945 (uv_req_t*)req,
946 &req->work_req,
947 (ffrt_qos_t)qos,
948 uv__queue_work,
949 uv__queue_done);
950 #ifdef UV_STATISTIC
951 uv_queue_statics(info);
952 #endif
953 return 0;
954 #else
955 return uv_queue_work(loop, req, work_cb, after_work_cb);
956 #endif
957 }
958
959
uv_cancel(uv_req_t * req)960 int uv_cancel(uv_req_t* req) {
961 struct uv__work* wreq;
962 uv_loop_t* loop;
963
964 switch (req->type) {
965 case UV_FS:
966 loop = ((uv_fs_t*) req)->loop;
967 wreq = &((uv_fs_t*) req)->work_req;
968 break;
969 case UV_GETADDRINFO:
970 loop = ((uv_getaddrinfo_t*) req)->loop;
971 wreq = &((uv_getaddrinfo_t*) req)->work_req;
972 break;
973 case UV_GETNAMEINFO:
974 loop = ((uv_getnameinfo_t*) req)->loop;
975 wreq = &((uv_getnameinfo_t*) req)->work_req;
976 break;
977 case UV_RANDOM:
978 loop = ((uv_random_t*) req)->loop;
979 wreq = &((uv_random_t*) req)->work_req;
980 break;
981 case UV_WORK:
982 loop = ((uv_work_t*) req)->loop;
983 wreq = &((uv_work_t*) req)->work_req;
984 break;
985 default:
986 return UV_EINVAL;
987 }
988
989 return uv__work_cancel(loop, req, wreq);
990 }
991