1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include "uv-common.h"
23 #include "uv_log.h"
24 #include "uv_trace.h"
25
26 #if !defined(_WIN32)
27 # include "unix/internal.h"
28 #else
29 # include "win/internal.h"
30 #endif
31
32 #include <stdlib.h>
33 #ifdef USE_FFRT
34 #include <assert.h>
35 #include "ffrt_inner.h"
36 #endif
37 #include <stdio.h>
38 #ifdef ASYNC_STACKTRACE
39 #include "dfx/async_stack/libuv_async_stack.h"
40 #endif
41
42 #define MAX_THREADPOOL_SIZE 1024
43 #define UV_TRACE_NAME "UV_TRACE"
44
45 #ifdef USE_OHOS_DFX
46 #define MIN_REQS_THRESHOLD 100
47 #define MAX_REQS_THRESHOLD 300
48 #define CURSOR 5
49 #endif
50
51 static uv_rwlock_t g_closed_uv_loop_rwlock;
52 static uv_once_t once = UV_ONCE_INIT;
53 static uv_cond_t cond;
54 static uv_mutex_t mutex;
55 static unsigned int idle_threads;
56 static unsigned int nthreads;
57 static uv_thread_t* threads;
58 static uv_thread_t default_threads[4];
59 static struct uv__queue exit_message;
60 static struct uv__queue wq;
61 static struct uv__queue run_slow_work_message;
62 static struct uv__queue slow_io_pending_wq;
63
64
65 #ifdef UV_STATISTIC
66 #define MAX_DUMP_QUEUE_SIZE 200
67 static uv_mutex_t dump_queue_mutex;
68 static QUEUE dump_queue;
69 static unsigned int dump_queue_size;
70
71 static int statistic_idle;
72 static uv_mutex_t statistic_mutex;
73 static QUEUE statistic_works;
74 static uv_cond_t dump_cond;
75 static uv_thread_t dump_thread;
76
uv_dump_worker(void * arg)77 static void uv_dump_worker(void* arg) {
78 struct uv__statistic_work* w;
79 struct uv__queue* q;
80 uv_sem_post((uv_sem_t*) arg);
81 arg = NULL;
82 uv_mutex_lock(&statistic_mutex);
83 for (;;) {
84 while (uv__queue_empty(&statistic_works)) {
85 statistic_idle = 1;
86 uv_cond_wait(&dump_cond, &statistic_mutex);
87 statistic_idle = 0;
88 }
89 q = uv__queue_head(&statistic_works);
90 if (q == &exit_message) {
91 uv_cond_signal(&dump_cond);
92 uv_mutex_unlock(&statistic_mutex);
93 break;
94 }
95 uv__queue_remove(q);
96 uv__queue_init(q);
97 uv_mutex_unlock(&statistic_mutex);
98 w = uv__queue_data(q, struct uv__statistic_work, wq);
99 w->work(w);
100 free(w);
101 uv_mutex_lock(&statistic_mutex);
102 }
103 }
104
post_statistic_work(struct uv__queue * q)105 static void post_statistic_work(struct uv__queue* q) {
106 uv_mutex_lock(&statistic_mutex);
107 uv__queue_insert_tail(&statistic_works, q);
108 if (statistic_idle)
109 uv_cond_signal(&dump_cond);
110 uv_mutex_unlock(&statistic_mutex);
111 }
112
uv__queue_work_info(struct uv__statistic_work * work)113 static void uv__queue_work_info(struct uv__statistic_work *work) {
114 uv_mutex_lock(&dump_queue_mutex);
115 if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) { /* release works already done */
116 struct uv__queue* q;
117 uv__queue_foreach(q, &dump_queue) {
118 struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
119 if (info->state == DONE_END) {
120 uv__queue_remove(q);
121 free(info);
122 dump_queue_size--;
123 }
124 }
125 if (dump_queue_size + 1 > MAX_DUMP_QUEUE_SIZE) {
126 abort(); /* too many works not done. */
127 }
128 }
129
130 uv__queue_insert_head(&dump_queue, &work->info->wq);
131 dump_queue_size++;
132 uv_mutex_unlock(&dump_queue_mutex);
133 }
134
uv__update_work_info(struct uv__statistic_work * work)135 static void uv__update_work_info(struct uv__statistic_work *work) {
136 uv_mutex_lock(&dump_queue_mutex);
137 if (work != NULL && work->info != NULL) {
138 work->info->state = work->state;
139 switch (work->state) {
140 case WAITING:
141 work->info->queue_time = work->time;
142 break;
143 case WORK_EXECUTING:
144 work->info->execute_start_time = work->time;
145 break;
146 case WORK_END:
147 work->info->execute_end_time = work->time;
148 break;
149 case DONE_EXECUTING:
150 work->info->done_start_time = work->time;
151 break;
152 case DONE_END:
153 work->info->done_end_time = work->time;
154 break;
155 default:
156 break;
157 }
158 }
159 uv_mutex_unlock(&dump_queue_mutex);
160 }
161
162
163 // return the timestamp in millisecond
uv__now_timestamp()164 static uint64_t uv__now_timestamp() {
165 uv_timeval64_t tv;
166 int r = uv_gettimeofday(&tv);
167 if (r != 0) {
168 return 0;
169 }
170 return (uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
171 }
172
173
uv__post_statistic_work(struct uv__work * w,enum uv_work_state state)174 static void uv__post_statistic_work(struct uv__work *w, enum uv_work_state state) {
175 struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
176 if (dump_work == NULL) {
177 return;
178 }
179 dump_work->info = w->info;
180 dump_work->work = uv__update_work_info;
181 dump_work->time = uv__now_timestamp();
182 dump_work->state = state;
183 uv__queue_init(&dump_work->wq);
184 post_statistic_work(&dump_work->wq);
185 }
186
187
init_work_dump_queue()188 static void init_work_dump_queue()
189 {
190 if (uv_mutex_init(&dump_queue_mutex))
191 abort();
192 uv_mutex_lock(&dump_queue_mutex);
193 uv__queue_init(&dump_queue);
194 dump_queue_size = 0;
195 uv_mutex_unlock(&dump_queue_mutex);
196
197 /* init dump thread */
198 statistic_idle = 1;
199 if (uv_mutex_init(&statistic_mutex))
200 abort();
201 uv__queue_init(&statistic_works);
202 uv_sem_t sem;
203 if (uv_cond_init(&dump_cond))
204 abort();
205 if (uv_sem_init(&sem, 0))
206 abort();
207 if (uv_thread_create(&dump_thread, uv_dump_worker, &sem))
208 abort();
209 uv_sem_wait(&sem);
210 uv_sem_destroy(&sem);
211 }
212
213
uv_init_dump_info(struct uv_work_dump_info * info,struct uv__work * w)214 void uv_init_dump_info(struct uv_work_dump_info* info, struct uv__work* w) {
215 if (info == NULL)
216 return;
217 info->queue_time = 0;
218 info->state = WAITING;
219 info->execute_start_time = 0;
220 info->execute_end_time = 0;
221 info->done_start_time = 0;
222 info->done_end_time = 0;
223 info->work = w;
224 uv__queue_init(&info->wq);
225 }
226
227
uv_queue_statics(struct uv_work_dump_info * info)228 void uv_queue_statics(struct uv_work_dump_info* info) {
229 struct uv__statistic_work* dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
230 if (dump_work == NULL) {
231 abort();
232 }
233 dump_work->info = info;
234 dump_work->work = uv__queue_work_info;
235 info->queue_time = uv__now_timestamp();
236 dump_work->state = WAITING;
237 uv__queue_init(&dump_work->wq);
238 post_statistic_work(&dump_work->wq);
239 }
240
241
uv_dump_work_queue(int * size)242 uv_worker_info_t* uv_dump_work_queue(int* size) {
243 #ifdef UV_STATISTIC
244 uv_mutex_lock(&dump_queue_mutex);
245 if (uv__queue_empty(&dump_queue)) {
246 return NULL;
247 }
248 *size = dump_queue_size;
249 uv_worker_info_t* dump_info = (uv_worker_info_t*) malloc(sizeof(uv_worker_info_t) * dump_queue_size);
250 struct uv__queue* q;
251 int i = 0;
252 uv__queue_foreach(q, &dump_queue) {
253 struct uv_work_dump_info* info = uv__queue_data(q, struct uv_work_dump_info, wq);
254 dump_info[i].queue_time = info->queue_time;
255 dump_info[i].builtin_return_address[0] = info->builtin_return_address[0];
256 dump_info[i].builtin_return_address[1] = info->builtin_return_address[1];
257 dump_info[i].builtin_return_address[2] = info->builtin_return_address[2];
258 switch (info->state) {
259 case WAITING:
260 strcpy(dump_info[i].state, "waiting");
261 break;
262 case WORK_EXECUTING:
263 strcpy(dump_info[i].state, "work_executing");
264 break;
265 case WORK_END:
266 strcpy(dump_info[i].state, "work_end");
267 break;
268 case DONE_EXECUTING:
269 strcpy(dump_info[i].state, "done_executing");
270 break;
271 case DONE_END:
272 strcpy(dump_info[i].state, "done_end");
273 break;
274 default:
275 break;
276 }
277 dump_info[i].execute_start_time = info->execute_start_time;
278 dump_info[i].execute_end_time = info->execute_end_time;
279 dump_info[i].done_start_time = info->done_start_time;
280 dump_info[i].done_end_time = info->done_end_time;
281 ++i;
282 }
283 uv_mutex_unlock(&dump_queue_mutex);
284 return dump_info;
285 #else
286 size = 0;
287 return NULL;
288 #endif
289 }
290 #endif
291
292
init_closed_uv_loop_rwlock_once(void)293 static void init_closed_uv_loop_rwlock_once(void) {
294 uv_rwlock_init(&g_closed_uv_loop_rwlock);
295 }
296
297
rdlock_closed_uv_loop_rwlock(void)298 void rdlock_closed_uv_loop_rwlock(void) {
299 uv_rwlock_rdlock(&g_closed_uv_loop_rwlock);
300 }
301
302
rdunlock_closed_uv_loop_rwlock(void)303 void rdunlock_closed_uv_loop_rwlock(void) {
304 uv_rwlock_rdunlock(&g_closed_uv_loop_rwlock);
305 }
306
307
is_uv_loop_good_magic(const uv_loop_t * loop)308 int is_uv_loop_good_magic(const uv_loop_t* loop) {
309 if (loop->magic == UV_LOOP_MAGIC) {
310 return 1;
311 }
312 UV_LOGE("loop:(%{public}zu:%{public}#x) invalid", (size_t)loop, loop->magic);
313 return 0;
314 }
315
316
/* Marks `loop` as closed by overwriting its magic with ~UV_LOOP_MAGIC while
 * holding the global rwlock for writing. Acquiring and releasing the lock
 * are each wrapped in a trace section, and the elapsed wall-clock time in
 * whole seconds is logged at the end.
 */
void on_uv_loop_close(uv_loop_t* loop) {
  time_t started;
  time_t finished;

  started = time(NULL);

  uv_start_trace(UV_TRACE_TAG, "Get Write Lock");
  uv_rwlock_wrlock(&g_closed_uv_loop_rwlock);
  uv_end_trace(UV_TRACE_TAG);

  loop->magic = ~UV_LOOP_MAGIC;

  uv_start_trace(UV_TRACE_TAG, "Release Write Lock");
  uv_rwlock_wrunlock(&g_closed_uv_loop_rwlock);
  uv_end_trace(UV_TRACE_TAG);

  finished = time(NULL);
  UV_LOGI("loop:(%{public}zu) closed in %{public}zds", (size_t)loop, (ssize_t)(finished - started));
}
330
331
/* Marker stored in `w->work` for cancelled requests. Other code only
 * compares against its address; actually calling it aborts the process.
 */
static void uv__cancelled(struct uv__work* w) {
  (void) w;
  abort();
}
335
336
337 #ifndef USE_FFRT
338 static unsigned int slow_io_work_running;
339
slow_work_thread_threshold(void)340 static unsigned int slow_work_thread_threshold(void) {
341 return (nthreads + 1) / 2;
342 }
343
344
345 /* To avoid deadlock with uv_cancel() it's crucial that the worker
346 * never holds the global mutex and the loop-local mutex at the same time.
347 */
worker(void * arg)348 static void worker(void* arg) {
349 struct uv__work* w;
350 struct uv__queue* q;
351 int is_slow_work;
352
353 uv_sem_post((uv_sem_t*) arg);
354 arg = NULL;
355
356 uv_mutex_lock(&mutex);
357 for (;;) {
358 /* `mutex` should always be locked at this point. */
359
360 /* Keep waiting while either no work is present or only slow I/O
361 and we're at the threshold for that. */
362 while (uv__queue_empty(&wq) ||
363 (uv__queue_head(&wq) == &run_slow_work_message &&
364 uv__queue_next(&run_slow_work_message) == &wq &&
365 slow_io_work_running >= slow_work_thread_threshold())) {
366 idle_threads += 1;
367 uv_cond_wait(&cond, &mutex);
368 idle_threads -= 1;
369 }
370
371 q = uv__queue_head(&wq);
372 if (q == &exit_message) {
373 uv_cond_signal(&cond);
374 uv_mutex_unlock(&mutex);
375 break;
376 }
377
378 uv__queue_remove(q);
379 uv__queue_init(q); /* Signal uv_cancel() that the work req is executing. */
380
381 is_slow_work = 0;
382 if (q == &run_slow_work_message) {
383 /* If we're at the slow I/O threshold, re-schedule until after all
384 other work in the queue is done. */
385 if (slow_io_work_running >= slow_work_thread_threshold()) {
386 uv__queue_insert_tail(&wq, q);
387 continue;
388 }
389
390 /* If we encountered a request to run slow I/O work but there is none
391 to run, that means it's cancelled => Start over. */
392 if (uv__queue_empty(&slow_io_pending_wq))
393 continue;
394
395 is_slow_work = 1;
396 slow_io_work_running++;
397
398 q = uv__queue_head(&slow_io_pending_wq);
399 uv__queue_remove(q);
400 uv__queue_init(q);
401
402 /* If there is more slow I/O work, schedule it to be run as well. */
403 if (!uv__queue_empty(&slow_io_pending_wq)) {
404 uv__queue_insert_tail(&wq, &run_slow_work_message);
405 if (idle_threads > 0)
406 uv_cond_signal(&cond);
407 }
408 }
409
410 uv_mutex_unlock(&mutex);
411
412 w = uv__queue_data(q, struct uv__work, wq);
413 #ifdef UV_STATISTIC
414 uv__post_statistic_work(w, WORK_EXECUTING);
415 #endif
416 #ifdef ASYNC_STACKTRACE
417 uv_work_t* req = container_of(w, uv_work_t, work_req);
418 LibuvSetStackId((uint64_t)req->reserved[3]);
419 #endif
420 w->work(w);
421 #ifdef UV_STATISTIC
422 uv__post_statistic_work(w, WORK_END);
423 #endif
424 uv_mutex_lock(&w->loop->wq_mutex);
425 w->work = NULL; /* Signal uv_cancel() that the work req is done
426 executing. */
427 uv__queue_insert_tail(&w->loop->wq, &w->wq);
428 uv_async_send(&w->loop->wq_async);
429 uv_mutex_unlock(&w->loop->wq_mutex);
430
431 /* Lock `mutex` since that is expected at the start of the next
432 * iteration. */
433 uv_mutex_lock(&mutex);
434 if (is_slow_work) {
435 /* `slow_io_work_running` is protected by `mutex`. */
436 slow_io_work_running--;
437 }
438 }
439 }
440 #endif
441
442
post(struct uv__queue * q,enum uv__work_kind kind)443 static void post(struct uv__queue* q, enum uv__work_kind kind) {
444 uv_mutex_lock(&mutex);
445 if (kind == UV__WORK_SLOW_IO) {
446 /* Insert into a separate queue. */
447 uv__queue_insert_tail(&slow_io_pending_wq, q);
448 if (!uv__queue_empty(&run_slow_work_message)) {
449 /* Running slow I/O tasks is already scheduled => Nothing to do here.
450 The worker that runs said other task will schedule this one as well. */
451 uv_mutex_unlock(&mutex);
452 return;
453 }
454 q = &run_slow_work_message;
455 }
456
457 uv__queue_insert_tail(&wq, q);
458 if (idle_threads > 0)
459 uv_cond_signal(&cond);
460 uv_mutex_unlock(&mutex);
461 }
462
463
464 #ifdef __MVS__
465 /* TODO(itodorov) - zos: revisit when Woz compiler is available. */
466 __attribute__((destructor))
467 #endif
uv__threadpool_cleanup(void)468 void uv__threadpool_cleanup(void) {
469 unsigned int i;
470
471 if (nthreads == 0)
472 return;
473
474 #ifndef __MVS__
475 /* TODO(gabylb) - zos: revisit when Woz compiler is available. */
476 post(&exit_message, UV__WORK_CPU);
477 #endif
478
479 for (i = 0; i < nthreads; i++)
480 if (uv_thread_join(threads + i))
481 abort();
482
483 if (threads != default_threads)
484 uv__free(threads);
485
486 uv_mutex_destroy(&mutex);
487 uv_cond_destroy(&cond);
488
489 threads = NULL;
490 nthreads = 0;
491 #ifdef UV_STATISTIC
492 post_statistic_work(&exit_message);
493 uv_thread_join(dump_thread);
494 uv_mutex_destroy(&statistic_mutex);
495 uv_cond_destroy(&dump_cond);
496 #endif
497 }
498
499
500 #ifndef USE_FFRT
init_threads(void)501 static void init_threads(void) {
502 uv_thread_options_t config;
503 unsigned int i;
504 const char* val;
505 uv_sem_t sem;
506
507 nthreads = ARRAY_SIZE(default_threads);
508 val = getenv("UV_THREADPOOL_SIZE");
509 if (val != NULL)
510 nthreads = atoi(val);
511 if (nthreads == 0)
512 nthreads = 1;
513 if (nthreads > MAX_THREADPOOL_SIZE)
514 nthreads = MAX_THREADPOOL_SIZE;
515
516 threads = default_threads;
517 if (nthreads > ARRAY_SIZE(default_threads)) {
518 threads = uv__malloc(nthreads * sizeof(threads[0]));
519 if (threads == NULL) {
520 nthreads = ARRAY_SIZE(default_threads);
521 threads = default_threads;
522 }
523 }
524
525 if (uv_cond_init(&cond))
526 abort();
527
528 if (uv_mutex_init(&mutex))
529 abort();
530
531 uv__queue_init(&wq);
532 uv__queue_init(&slow_io_pending_wq);
533 uv__queue_init(&run_slow_work_message);
534
535 if (uv_sem_init(&sem, 0))
536 abort();
537
538 config.flags = UV_THREAD_HAS_STACK_SIZE;
539 config.stack_size = 8u << 20; /* 8 MB */
540
541 for (i = 0; i < nthreads; i++)
542 if (uv_thread_create_ex(threads + i, &config, worker, &sem))
543 abort();
544
545 for (i = 0; i < nthreads; i++)
546 uv_sem_wait(&sem);
547
548 uv_sem_destroy(&sem);
549 }
550
551
552 #ifndef _WIN32
reset_once(void)553 static void reset_once(void) {
554 uv_once_t child_once = UV_ONCE_INIT;
555 memcpy(&once, &child_once, sizeof(child_once));
556 }
557 #endif
558
559
init_once(void)560 static void init_once(void) {
561 #ifndef _WIN32
562 /* Re-initialize the threadpool after fork.
563 * Note that this discards the global mutex and condition as well
564 * as the work queue.
565 */
566 if (pthread_atfork(NULL, NULL, &reset_once))
567 abort();
568 #endif
569 init_closed_uv_loop_rwlock_once();
570 #ifdef UV_STATISTIC
571 init_work_dump_queue();
572 #endif
573 init_threads();
574 }
575
576
/* Submits a work item to the threadpool.
 *
 * Makes sure the pool is initialized, records the owning loop and the two
 * callbacks in `w`, and posts it to the queue selected by `kind`. `work` is
 * invoked by a worker thread; `done` is invoked later from uv__work_done().
 */
void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);

  w->done = done;
  w->work = work;
  w->loop = loop;

  post(&w->wq, kind);
}
588 #endif
589
590
/* With USE_OHOS_DFX: logs a warning when the loop's active request count is
 * exactly MIN_REQS_THRESHOLD or MAX_REQS_THRESHOLD, or either of them plus
 * CURSOR. `flag` is included in the message. Without USE_OHOS_DFX this is a
 * no-op.
 */
static void uv__print_active_reqs(uv_loop_t* loop, const char* flag) {
#ifdef USE_OHOS_DFX
  unsigned int count = loop->active_reqs.count;

  switch (count) {
    case MIN_REQS_THRESHOLD:
    case MIN_REQS_THRESHOLD + CURSOR:
    case MAX_REQS_THRESHOLD:
    case MAX_REQS_THRESHOLD + CURSOR:
      UV_LOGW("loop:%{public}zu, flag:%{public}s, active reqs:%{public}u", (size_t)loop, flag, count);
      break;
    default:
      break;
  }
#else
  (void) loop;
  (void) flag;
#endif
}
602
603
604 #ifdef USE_FFRT
uv__task_done_wrapper(void * work,int status)605 static void uv__task_done_wrapper(void* work, int status) {
606 struct uv__work* w = (struct uv__work*)work;
607 uv__print_active_reqs(w->loop, "complete");
608 w->done(w, status);
609 }
610 #endif
611
612
613 /* TODO(bnoordhuis) teach libuv how to cancel file operations
614 * that go through io_uring instead of the thread pool.
615 */
uv__work_cancel(uv_loop_t * loop,uv_req_t * req,struct uv__work * w)616 static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
617 int cancelled;
618
619 rdlock_closed_uv_loop_rwlock();
620 if (!is_uv_loop_good_magic(w->loop)) {
621 rdunlock_closed_uv_loop_rwlock();
622 return 0;
623 }
624
625 #ifndef USE_FFRT
626 uv_mutex_lock(&mutex);
627 uv_mutex_lock(&w->loop->wq_mutex);
628
629 cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
630 if (cancelled)
631 uv__queue_remove(&w->wq);
632
633 uv_mutex_unlock(&w->loop->wq_mutex);
634 uv_mutex_unlock(&mutex);
635 #else
636 uv_mutex_lock(&w->loop->wq_mutex);
637 cancelled = !uv__queue_empty(&w->wq) && w->work != NULL
638 && ffrt_executor_task_cancel(w, (ffrt_qos_t)(intptr_t)req->reserved[0]);
639 uv_mutex_unlock(&w->loop->wq_mutex);
640 #endif
641
642 if (!cancelled) {
643 rdunlock_closed_uv_loop_rwlock();
644 return UV_EBUSY;
645 }
646
647 w->work = uv__cancelled;
648 uv_mutex_lock(&loop->wq_mutex);
649 #ifndef USE_FFRT
650 uv__queue_insert_tail(&loop->wq, &w->wq);
651 uv_async_send(&loop->wq_async);
652 #else
653 uv__loop_internal_fields_t* lfields = uv__get_internal_fields(w->loop);
654 int qos = (ffrt_qos_t)(intptr_t)req->reserved[0];
655
656 if (uv_check_data_valid((struct uv_loop_data*)(w->loop->data)) == 0) {
657 int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
658 struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)w->loop->data -
659 (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
660 addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
661 } else {
662 uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
663 uv_async_send(&loop->wq_async);
664 }
665 #endif
666 uv_mutex_unlock(&loop->wq_mutex);
667 rdunlock_closed_uv_loop_rwlock();
668
669 return 0;
670 }
671
672
/* Async callback run on the loop thread: drains the loop's completed-work
 * queue(s) and invokes each item's `done` callback, passing UV_ECANCELED for
 * cancelled items and 0 otherwise.
 *
 * Returns early without touching the queues when the loop's magic is no
 * longer valid. With UV_STATISTIC, DONE_EXECUTING / DONE_END notifications
 * are posted around each callback; failing to allocate a notification only
 * skips the statistics for that item and never skips the callback.
 */
void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  struct uv__queue* q;
  struct uv__queue wq;
  int err;
  int nevents;

  loop = container_of(handle, uv_loop_t, wq_async);
#ifdef USE_OHOS_DFX
  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) != 0)
    uv__print_active_reqs(loop, "complete");
#endif
  rdlock_closed_uv_loop_rwlock();
  if (!is_uv_loop_good_magic(loop)) {
    rdunlock_closed_uv_loop_rwlock();
    return;
  }
  rdunlock_closed_uv_loop_rwlock();

  /* Move everything that is ready into the local list `wq` so callbacks run
   * without holding the loop's queue mutex.
   */
  uv_mutex_lock(&loop->wq_mutex);
#ifndef USE_FFRT
  uv__queue_move(&loop->wq, &wq);
#else
  uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
  int i;
  uv__queue_init(&wq);
  /* Sub-queues are collected from index 3 down to 0. */
  for (i = 3; i >= 0; i--) {
    if (!uv__queue_empty(&lfields->wq_sub[i])) {
      uv__queue_append(&lfields->wq_sub[i], &wq);
    }
  }
#endif
  uv_mutex_unlock(&loop->wq_mutex);

  nevents = 0;
  uv_start_trace(UV_TRACE_TAG, UV_TRACE_NAME);
  while (!uv__queue_empty(&wq)) {
    q = uv__queue_head(&wq);
    uv__queue_remove(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
#ifdef UV_STATISTIC
    struct uv__statistic_work* dump_work;

    uv__post_statistic_work(w, DONE_EXECUTING);
    /* Allocated and bound to `w->info` before the callback runs, because
     * `w` is not touched again afterwards.
     */
    dump_work = (struct uv__statistic_work*)malloc(sizeof(struct uv__statistic_work));
    if (dump_work == NULL) {
      UV_LOGE("malloc(%{public}zu) failed: %{public}s", sizeof(struct uv__statistic_work), strerror(errno));
    } else {
      dump_work->info = w->info;
      dump_work->work = uv__update_work_info;
    }
#endif
#ifdef ASYNC_STACKTRACE
    uv_work_t* req = container_of(w, uv_work_t, work_req);
    LibuvSetStackId((uint64_t)req->reserved[3]);
#endif
    w->done(w, err);
    nevents++;
#ifdef UV_STATISTIC
    if (dump_work != NULL) {
      dump_work->time = uv__now_timestamp();
      dump_work->state = DONE_END;
      uv__queue_init(&dump_work->wq);
      post_statistic_work(&dump_work->wq);
    }
#endif
  }
  uv_end_trace(UV_TRACE_TAG);

  /* This check accomplishes 2 things:
   * 1. Even if the queue was empty, the call to uv__work_done() should count
   *    as an event. Which will have been added by the event loop when
   *    calling this callback.
   * 2. Prevents accidental wrap around in case nevents == 0 events == 0.
   */
  if (nevents > 1) {
    /* Subtract 1 to counter the call to uv__work_done(). */
    uv__metrics_inc_events(loop, nevents - 1);
    if (uv__get_internal_fields(loop)->current_timeout == 0)
      uv__metrics_inc_events_waiting(loop, nevents - 1);
  }
}
754
755
uv__queue_work(struct uv__work * w)756 static void uv__queue_work(struct uv__work* w) {
757 uv_work_t* req = container_of(w, uv_work_t, work_req);
758
759 req->work_cb(req);
760 }
761
762
uv__queue_done(struct uv__work * w,int err)763 static void uv__queue_done(struct uv__work* w, int err) {
764 uv_work_t* req;
765
766 if (w == NULL) {
767 UV_LOGE("uv_work_t is NULL");
768 return;
769 }
770
771 req = container_of(w, uv_work_t, work_req);
772 uv__req_unregister(req->loop, req);
773
774 if (req->after_work_cb == NULL)
775 return;
776
777 req->after_work_cb(req, err);
778 }
779
780
781 #ifdef USE_FFRT
/* Executor callback registered with FFRT: runs one work item and then
 * delivers its completion to the owning loop.
 *
 * `data` is the `struct uv__work` that was submitted and `qos` is the level
 * it ran at. After the `work` callback returns, the loop's magic is checked
 * under the closed-loop read lock; if it is no longer valid the item is
 * dropped with an error log. Otherwise completion goes either through the
 * loop data's `post_task_func` hook or onto the loop's per-qos sub-queue,
 * followed by an async wake-up.
 */
void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos)
{
  struct uv__work* item = (struct uv__work *)data;
  uv_loop_t* loop = item->loop;

#ifdef UV_STATISTIC
  uv__post_statistic_work(item, WORK_EXECUTING);
#endif
#ifdef ASYNC_STACKTRACE
  uv_work_t* req = container_of(item, uv_work_t, work_req);
  LibuvSetStackId((uint64_t)req->reserved[3]);
#endif
  item->work(item);
#ifdef UV_STATISTIC
  uv__post_statistic_work(item, WORK_END);
#endif

  rdlock_closed_uv_loop_rwlock();
  if (loop->magic != UV_LOOP_MAGIC) {
    rdunlock_closed_uv_loop_rwlock();
    UV_LOGE("uv_loop(%{public}zu:%{public}#x), task is invalid",
            (size_t)loop, loop->magic);
    return;
  }

  uv_mutex_lock(&loop->wq_mutex);
  item->work = NULL;  /* Signal uv_cancel() that the work req is done executing. */

  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) != 0) {
    /* No usable loop data: queue on the sub-queue for this qos. */
    uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &item->wq);
    uv_async_send(&loop->wq_async);
  } else {
    int status = (item->work == uv__cancelled) ? UV_ECANCELED : 0;
    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data -
      (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)item, status, qos);
  }
  uv_mutex_unlock(&loop->wq_mutex);
  rdunlock_closed_uv_loop_rwlock();
}
821
init_once(void)822 static void init_once(void)
823 {
824 init_closed_uv_loop_rwlock_once();
825 /* init uv work statics queue */
826 #ifdef UV_STATISTIC
827 init_work_dump_queue();
828 #endif
829 ffrt_executor_task_register_func(uv__ffrt_work, ffrt_uv_task);
830 }
831
832
833 /* ffrt uv__work_submit */
uv__work_submit(uv_loop_t * loop,uv_req_t * req,struct uv__work * w,enum uv__work_kind kind,void (* work)(struct uv__work * w),void (* done)(struct uv__work * w,int status))834 void uv__work_submit(uv_loop_t* loop,
835 uv_req_t* req,
836 struct uv__work* w,
837 enum uv__work_kind kind,
838 void (*work)(struct uv__work *w),
839 void (*done)(struct uv__work *w, int status)) {
840 uv_once(&once, init_once);
841 ffrt_task_attr_t attr;
842 ffrt_task_attr_init(&attr);
843
844 switch(kind) {
845 case UV__WORK_CPU:
846 ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
847 break;
848 case UV__WORK_FAST_IO:
849 ffrt_task_attr_set_qos(&attr, ffrt_qos_default);
850 break;
851 case UV__WORK_SLOW_IO:
852 ffrt_task_attr_set_qos(&attr, ffrt_qos_background);
853 break;
854 default:
855 #ifdef USE_OHOS_DFX
856 UV_LOGI("Unknown work kind");
857 #endif
858 return;
859 }
860
861 w->loop = loop;
862 w->work = work;
863 w->done = done;
864
865 req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
866 ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
867 ffrt_task_attr_destroy(&attr);
868 }
869
870
871 /* ffrt uv__work_submit */
uv__work_submit_with_qos(uv_loop_t * loop,uv_req_t * req,struct uv__work * w,ffrt_qos_t qos,void (* work)(struct uv__work * w),void (* done)(struct uv__work * w,int status))872 void uv__work_submit_with_qos(uv_loop_t* loop,
873 uv_req_t* req,
874 struct uv__work* w,
875 ffrt_qos_t qos,
876 void (*work)(struct uv__work *w),
877 void (*done)(struct uv__work *w, int status)) {
878 uv_once(&once, init_once);
879 ffrt_task_attr_t attr;
880 ffrt_task_attr_init(&attr);
881 ffrt_task_attr_set_qos(&attr, qos);
882
883 w->loop = loop;
884 w->work = work;
885 w->done = done;
886
887 req->reserved[0] = (void *)(intptr_t)ffrt_task_attr_get_qos(&attr);
888 ffrt_executor_task_submit((ffrt_executor_task_t *)w, &attr);
889 ffrt_task_attr_destroy(&attr);
890 }
891 #endif
892
893
/* Queues `work_cb` to run on the threadpool; `after_work_cb` (may be NULL)
 * runs on the loop thread once the work has finished or was cancelled.
 *
 * Returns UV_EINVAL when `work_cb` is NULL, 0 otherwise. With UV_STATISTIC a
 * dump record is allocated for the request (the process aborts if that
 * allocation fails) and registered after submission.
 */
int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL) {
    return UV_EINVAL;
  }

  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->after_work_cb = after_work_cb;
  req->work_cb = work_cb;
  req->loop = loop;

#ifdef UV_STATISTIC
  struct uv_work_dump_info* info;

  info = (struct uv_work_dump_info*) malloc(sizeof(*info));
  if (info == NULL)
    abort();

  uv_init_dump_info(info, &req->work_req);
  /* Record the three innermost return addresses of the caller chain. */
  info->builtin_return_address[0] = __builtin_return_address(0);
  info->builtin_return_address[1] = __builtin_return_address(1);
  info->builtin_return_address[2] = __builtin_return_address(2);
  req->work_req.info = info;
#endif
#ifdef ASYNC_STACKTRACE
  req->reserved[3] = (void*)LibuvCollectAsyncStack();
#endif
  /* The FFRT build's uv__work_submit() takes the request as an extra
   * argument. */
  uv__work_submit(loop,
#ifdef USE_FFRT
                  (uv_req_t*)req,
#endif
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done);
#ifdef UV_STATISTIC
  uv_queue_statics(info);
#endif
  return 0;
}
935
936
/* Like uv_queue_work(), but with an explicit qos level.
 *
 * With USE_FFRT: returns UV_EINVAL when `work_cb` is NULL or `qos` lies
 * outside [ffrt_qos_background, ffrt_qos_user_initiated]; otherwise submits
 * the request at that qos and returns 0. Without USE_FFRT `qos` is ignored
 * and the call is forwarded to uv_queue_work().
 */
int uv_queue_work_with_qos(uv_loop_t* loop,
                           uv_work_t* req,
                           uv_work_cb work_cb,
                           uv_after_work_cb after_work_cb,
                           uv_qos_t qos) {
#ifdef USE_FFRT
  if (work_cb == NULL) {
    return UV_EINVAL;
  }

  /* The uv_qos_* values are passed to FFRT unchanged, so they must match. */
  STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
  STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
  STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
  STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
  if (qos < ffrt_qos_background || qos > ffrt_qos_user_initiated)
    return UV_EINVAL;

  uv__print_active_reqs(loop, "execute");
  uv__req_init(loop, req, UV_WORK);
  req->after_work_cb = after_work_cb;
  req->work_cb = work_cb;
  req->loop = loop;
#ifdef UV_STATISTIC
  struct uv_work_dump_info* info;

  info = (struct uv_work_dump_info*)malloc(sizeof(*info));
  if (info == NULL)
    abort();

  uv_init_dump_info(info, &req->work_req);
  info->builtin_return_address[0] = __builtin_return_address(0);
  info->builtin_return_address[1] = __builtin_return_address(1);
  info->builtin_return_address[2] = __builtin_return_address(2);
  req->work_req.info = info;
#endif
  uv__work_submit_with_qos(loop,
                           (uv_req_t*)req,
                           &req->work_req,
                           (ffrt_qos_t)qos,
                           uv__queue_work,
                           uv__queue_done);
#ifdef UV_STATISTIC
  uv_queue_statics(info);
#endif
  return 0;
#else
  return uv_queue_work(loop, req, work_cb, after_work_cb);
#endif
}
984
985
/* Attempts to cancel a pending threadpool request.
 *
 * Supported request types are UV_FS, UV_GETADDRINFO, UV_GETNAMEINFO,
 * UV_RANDOM and UV_WORK; any other type yields UV_EINVAL. For supported
 * types the result of uv__work_cancel() is returned.
 */
int uv_cancel(uv_req_t* req) {
  switch (req->type) {
    case UV_FS: {
      uv_fs_t* fs = (uv_fs_t*) req;
      return uv__work_cancel(fs->loop, req, &fs->work_req);
    }
    case UV_GETADDRINFO: {
      uv_getaddrinfo_t* gai = (uv_getaddrinfo_t*) req;
      return uv__work_cancel(gai->loop, req, &gai->work_req);
    }
    case UV_GETNAMEINFO: {
      uv_getnameinfo_t* gni = (uv_getnameinfo_t*) req;
      return uv__work_cancel(gni->loop, req, &gni->work_req);
    }
    case UV_RANDOM: {
      uv_random_t* rnd = (uv_random_t*) req;
      return uv__work_cancel(rnd->loop, req, &rnd->work_req);
    }
    case UV_WORK: {
      uv_work_t* work = (uv_work_t*) req;
      return uv__work_cancel(work->loop, req, &work->work_req);
    }
    default:
      return UV_EINVAL;
  }
}
1017