// Copyright 2019 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "osfiber.h"  // Must come first. See osfiber_ucontext.h.

#include "marl/scheduler.h"

#include "marl/debug.h"
#include "marl/sanitizers.h"
#include "marl/thread.h"
#include "marl/trace.h"

#if defined(_WIN32)
#include <intrin.h>  // __nop()
#endif

// Enable to trace scheduler events.
#define ENABLE_TRACE_EVENTS 0

// Enable to print verbose debug logging.
#define ENABLE_DEBUG_LOGGING 0

#if ENABLE_TRACE_EVENTS
#define TRACE(...) MARL_SCOPED_EVENT(__VA_ARGS__)
#else
#define TRACE(...)
#endif

#if ENABLE_DEBUG_LOGGING
#define DBG_LOG(msg, ...) \
  printf("%.3x " msg "\n", (int)threadID() & 0xfff, __VA_ARGS__)
#else
#define DBG_LOG(msg, ...)
#endif

#define ASSERT_FIBER_STATE(FIBER, STATE)                                   \
  MARL_ASSERT(FIBER->state == STATE,                                       \
              "fiber %d was in state %s, but expected %s", (int)FIBER->id, \
              Fiber::toString(FIBER->state), Fiber::toString(STATE))

namespace {

#if ENABLE_DEBUG_LOGGING
// threadID() returns a uint64_t representing the currently executing thread.
// threadID() is only intended to be used for debugging purposes.
inline uint64_t threadID() {
  auto id = std::this_thread::get_id();
  return std::hash<std::thread::id>()(id);
}
#endif

inline void nop() {
#if defined(_WIN32)
  __nop();
#else
  __asm__ __volatile__("nop");
#endif
}

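// setConfigDefaults() returns a copy of cfgIn with unset fields populated
// with defaults. Currently this only fills in the worker thread affinity
// policy when worker threads are requested but no policy was provided.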
inline marl::Scheduler::Config setConfigDefaults(
    const marl::Scheduler::Config& cfgIn) {
  marl::Scheduler::Config cfg{cfgIn};
  if (cfg.workerThread.count > 0 && !cfg.workerThread.affinityPolicy) {
    cfg.workerThread.affinityPolicy = marl::Thread::Affinity::Policy::anyOf(
        marl::Thread::Affinity::all(cfg.allocator), cfg.allocator);
  }
  return cfg;
}

}  // anonymous namespace

namespace marl {

////////////////////////////////////////////////////////////////////////////////
// Scheduler
////////////////////////////////////////////////////////////////////////////////
thread_local Scheduler* Scheduler::bound = nullptr;

Scheduler* Scheduler::get() {
  return bound;
}

void Scheduler::bind() {
#if !MEMORY_SANITIZER_ENABLED
  // thread_local variables in shared libraries are initialized at load-time,
  // but this is not observed by MemorySanitizer if the loader itself was not
  // instrumented, leading to false-positive uninitialized variable errors.
  // See https://github.com/google/marl/issues/184
  MARL_ASSERT(bound == nullptr, "Scheduler already bound");
#endif
  bound = this;
  {
    marl::lock lock(singleThreadedWorkers.mutex);
    auto worker = cfg.allocator->make_unique<Worker>(
        this, Worker::Mode::SingleThreaded, -1);
    worker->start();
    auto tid = std::this_thread::get_id();
    singleThreadedWorkers.byTid.emplace(tid, std::move(worker));
  }
}
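
// A minimal usage sketch of bind() / unbind(), assuming marl/defer.h is
// included for scope-exit cleanup:
//
//   marl::Scheduler scheduler(marl::Scheduler::Config::allCores());
//   scheduler.bind();
//   defer(scheduler.unbind());  // unbind() must run on this same thread.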

void Scheduler::unbind() {
  MARL_ASSERT(bound != nullptr, "No scheduler bound");
  auto worker = Worker::getCurrent();
  worker->stop();
  {
    marl::lock lock(bound->singleThreadedWorkers.mutex);
    auto tid = std::this_thread::get_id();
    auto it = bound->singleThreadedWorkers.byTid.find(tid);
    MARL_ASSERT(it != bound->singleThreadedWorkers.byTid.end(),
                "singleThreadedWorker not found");
    MARL_ASSERT(it->second.get() == worker, "worker is not bound?");
    bound->singleThreadedWorkers.byTid.erase(it);
    if (bound->singleThreadedWorkers.byTid.empty()) {
      bound->singleThreadedWorkers.unbind.notify_one();
    }
  }
  bound = nullptr;
}

Scheduler::Scheduler(const Config& config)
    : cfg(setConfigDefaults(config)),
      workerThreads{},
      singleThreadedWorkers(config.allocator) {
  for (size_t i = 0; i < spinningWorkers.size(); i++) {
    spinningWorkers[i] = -1;
  }
  for (int i = 0; i < cfg.workerThread.count; i++) {
    workerThreads[i] =
        cfg.allocator->create<Worker>(this, Worker::Mode::MultiThreaded, i);
  }
  for (int i = 0; i < cfg.workerThread.count; i++) {
    workerThreads[i]->start();
  }
}

Scheduler::~Scheduler() {
  {
    // Wait until all the single-threaded workers have been unbound.
    marl::lock lock(singleThreadedWorkers.mutex);
    lock.wait(singleThreadedWorkers.unbind,
              [this]() REQUIRES(singleThreadedWorkers.mutex) {
                return singleThreadedWorkers.byTid.empty();
              });
  }

  // Release all worker threads.
  // This will wait for all in-flight tasks to complete before returning.
  for (int i = cfg.workerThread.count - 1; i >= 0; i--) {
    workerThreads[i]->stop();
  }
  for (int i = cfg.workerThread.count - 1; i >= 0; i--) {
    cfg.allocator->destroy(workerThreads[i]);
  }
}

void Scheduler::enqueue(Task&& task) {
  if (task.is(Task::Flags::SameThread)) {
    Worker::getCurrent()->enqueue(std::move(task));
    return;
  }
  if (cfg.workerThread.count > 0) {
    while (true) {
      // Prioritize workers that have recently started spinning.
      auto i = --nextSpinningWorkerIdx % spinningWorkers.size();
      auto idx = spinningWorkers[i].exchange(-1);
      if (idx < 0) {
        // If a spinning worker couldn't be found, round-robin the
        // workers.
        idx = nextEnqueueIndex++ % cfg.workerThread.count;
      }

      auto worker = workerThreads[idx];
      if (worker->tryLock()) {
        worker->enqueueAndUnlock(std::move(task));
        return;
      }
    }
  } else {
    if (auto worker = Worker::getCurrent()) {
      worker->enqueue(std::move(task));
    } else {
      MARL_FATAL(
          "singleThreadedWorker not found. Did you forget to call "
          "marl::Scheduler::bind()?");
    }
  }
}
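
// Note: tasks typically reach enqueue() via the marl::schedule() helpers
// declared in marl/scheduler.h, e.g.:
//
//   marl::schedule([] { /* do work */ });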

const Scheduler::Config& Scheduler::config() const {
  return cfg;
}

bool Scheduler::stealWork(Worker* thief, uint64_t from, Task& out) {
  if (cfg.workerThread.count > 0) {
    auto thread = workerThreads[from % cfg.workerThread.count];
    if (thread != thief) {
      if (thread->steal(out)) {
        return true;
      }
    }
  }
  return false;
}

void Scheduler::onBeginSpinning(int workerId) {
  auto idx = nextSpinningWorkerIdx++ % spinningWorkers.size();
  spinningWorkers[idx] = workerId;
}

////////////////////////////////////////////////////////////////////////////////
// Scheduler::Config
////////////////////////////////////////////////////////////////////////////////
Scheduler::Config Scheduler::Config::allCores() {
  return Config().setWorkerThreadCount(Thread::numLogicalCPUs());
}
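
// For reference, Config::allCores() is shorthand for:
//
//   marl::Scheduler::Config cfg;
//   cfg.setWorkerThreadCount(marl::Thread::numLogicalCPUs());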

////////////////////////////////////////////////////////////////////////////////
// Scheduler::Fiber
////////////////////////////////////////////////////////////////////////////////
Scheduler::Fiber::Fiber(Allocator::unique_ptr<OSFiber>&& impl, uint32_t id)
    : id(id), impl(std::move(impl)), worker(Worker::getCurrent()) {
  MARL_ASSERT(worker != nullptr, "No Scheduler::Worker bound");
}

// TODO(chromium:1211047): Testing the static thread_local Worker::current for
// null causes a MemorySanitizer false positive.
CLANG_NO_SANITIZE_MEMORY
Scheduler::Fiber* Scheduler::Fiber::current() {
  auto worker = Worker::getCurrent();
  return worker != nullptr ? worker->getCurrentFiber() : nullptr;
}

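// notify() re-enqueues this fiber on the worker that owns it, waking it from
// a wait() call.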
void Scheduler::Fiber::notify() {
  worker->enqueue(this);
}

void Scheduler::Fiber::wait(marl::lock& lock, const Predicate& pred) {
  MARL_ASSERT(worker == Worker::getCurrent(),
              "Scheduler::Fiber::wait() must only be called on the currently "
              "executing fiber");
  worker->wait(lock, nullptr, pred);
}
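
// wait() and notify() form the fiber-blocking primitive on which marl's
// synchronization types (e.g. marl::ConditionVariable) are built. A sketch of
// the pattern, assuming a marl::mutex `mu` guarding a bool `signalled`:
//
//   marl::lock lock(mu);
//   marl::Scheduler::Fiber::current()->wait(lock, [&] { return signalled; });
//
// Another fiber sets `signalled` under the lock and calls notify() to wake
// the waiter.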

void Scheduler::Fiber::switchTo(Fiber* to) {
  MARL_ASSERT(worker == Worker::getCurrent(),
              "Scheduler::Fiber::switchTo() must only be called on the "
              "currently executing fiber");
  if (to != this) {
    impl->switchTo(to->impl.get());
  }
}

Allocator::unique_ptr<Scheduler::Fiber> Scheduler::Fiber::create(
    Allocator* allocator,
    uint32_t id,
    size_t stackSize,
    const std::function<void()>& func) {
  return allocator->make_unique<Fiber>(
      OSFiber::createFiber(allocator, stackSize, func), id);
}

Allocator::unique_ptr<Scheduler::Fiber>
Scheduler::Fiber::createFromCurrentThread(Allocator* allocator, uint32_t id) {
  return allocator->make_unique<Fiber>(
      OSFiber::createFiberFromCurrentThread(allocator), id);
}

const char* Scheduler::Fiber::toString(State state) {
  switch (state) {
    case State::Idle:
      return "Idle";
    case State::Yielded:
      return "Yielded";
    case State::Queued:
      return "Queued";
    case State::Running:
      return "Running";
    case State::Waiting:
      return "Waiting";
  }
  MARL_ASSERT(false, "bad fiber state");
  return "<unknown>";
}

////////////////////////////////////////////////////////////////////////////////
// Scheduler::WaitingFibers
////////////////////////////////////////////////////////////////////////////////
Scheduler::WaitingFibers::WaitingFibers(Allocator* allocator)
    : timeouts(allocator), fibers(allocator) {}

Scheduler::WaitingFibers::operator bool() const {
  return !fibers.empty();
}

Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timeout) {
  if (!*this) {
    return nullptr;
  }
  auto it = timeouts.begin();
  if (timeout < it->timepoint) {
    return nullptr;
  }
  auto fiber = it->fiber;
  timeouts.erase(it);
  auto deleted = fibers.erase(fiber) != 0;
  (void)deleted;
  MARL_ASSERT(deleted, "WaitingFibers::take() maps out of sync");
  return fiber;
}

Scheduler::TimePoint Scheduler::WaitingFibers::next() const {
  MARL_ASSERT(*this,
              "WaitingFibers::next() called when there are no waiting fibers");
  return timeouts.begin()->timepoint;
}

void Scheduler::WaitingFibers::add(const TimePoint& timeout, Fiber* fiber) {
  timeouts.emplace(Timeout{timeout, fiber});
  bool added = fibers.emplace(fiber, timeout).second;
  (void)added;
  MARL_ASSERT(added, "WaitingFibers::add() fiber already waiting");
}

void Scheduler::WaitingFibers::erase(Fiber* fiber) {
  auto it = fibers.find(fiber);
  if (it != fibers.end()) {
    auto timeout = it->second;
    auto erased = timeouts.erase(Timeout{timeout, fiber}) != 0;
    (void)erased;
    MARL_ASSERT(erased, "WaitingFibers::erase() maps out of sync");
    fibers.erase(it);
  }
}

bool Scheduler::WaitingFibers::contains(Fiber* fiber) const {
  return fibers.count(fiber) != 0;
}

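// Timeouts are ordered by timepoint, with the fiber pointer as a tie-breaker
// so that distinct fibers sharing the same timepoint remain distinct entries
// in the timeouts set.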
bool Scheduler::WaitingFibers::Timeout::operator<(const Timeout& o) const {
  if (timepoint != o.timepoint) {
    return timepoint < o.timepoint;
  }
  return fiber < o.fiber;
}

////////////////////////////////////////////////////////////////////////////////
// Scheduler::Worker
////////////////////////////////////////////////////////////////////////////////
thread_local Scheduler::Worker* Scheduler::Worker::current = nullptr;

Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id)
    : id(id),
      mode(mode),
      scheduler(scheduler),
      work(scheduler->cfg.allocator),
      idleFibers(scheduler->cfg.allocator) {}

void Scheduler::Worker::start() {
  switch (mode) {
    case Mode::MultiThreaded: {
      auto allocator = scheduler->cfg.allocator;
      auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy;
      auto affinity = affinityPolicy->get(id, allocator);
      thread = Thread(std::move(affinity), [=] {
        Thread::setName("Thread<%.2d>", int(id));

        if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
          initFunc(id);
        }

        Scheduler::bound = scheduler;
        Worker::current = this;
        mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
        currentFiber = mainFiber.get();
        {
          marl::lock lock(work.mutex);
          run();
        }
        mainFiber.reset();
        Worker::current = nullptr;
      });
      break;
    }
    case Mode::SingleThreaded: {
      Worker::current = this;
      mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
      currentFiber = mainFiber.get();
      break;
    }
    default:
      MARL_ASSERT(false, "Unknown mode: %d", int(mode));
  }
}

void Scheduler::Worker::stop() {
  switch (mode) {
    case Mode::MultiThreaded: {
      enqueue(Task([this] { shutdown = true; }, Task::Flags::SameThread));
      thread.join();
      break;
    }
    case Mode::SingleThreaded: {
      marl::lock lock(work.mutex);
      shutdown = true;
      runUntilShutdown();
      Worker::current = nullptr;
      break;
    }
    default:
      MARL_ASSERT(false, "Unknown mode: %d", int(mode));
  }
}

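// wait() suspends the current fiber until it is re-enqueued via
// Fiber::notify(), or until the optional timeout expires. Returns true if
// the fiber was resumed before the timeout expired, otherwise false.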
bool Scheduler::Worker::wait(const TimePoint* timeout) {
  DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
  {
    marl::lock lock(work.mutex);
    suspend(timeout);
  }
  return timeout == nullptr || std::chrono::system_clock::now() < *timeout;
}

bool Scheduler::Worker::wait(lock& waitLock,
                             const TimePoint* timeout,
                             const Predicate& pred) {
  DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
  while (!pred()) {
    // Lock the work mutex to call suspend().
    work.mutex.lock();

    // Unlock the wait mutex with the work mutex lock held.
    // Order is important here as we need to ensure that the fiber is not
    // enqueued (via Fiber::notify()) between the waitLock.unlock() and the
    // fiber switch, otherwise the Fiber::notify() call may be ignored and the
    // fiber is never woken.
    waitLock.unlock_no_tsa();

    // Suspend the fiber.
    suspend(timeout);

    // Fiber resumed. We don't need the work mutex locked any more.
    work.mutex.unlock();

    // Re-lock to either return due to timeout, or call pred().
    waitLock.lock_no_tsa();

    // Check timeout.
    if (timeout != nullptr && std::chrono::system_clock::now() >= *timeout) {
      return false;
    }

    // Spurious wake up. Spin again.
  }
  return true;
}

void Scheduler::Worker::suspend(
    const std::chrono::system_clock::time_point* timeout) {
  // The current fiber is yielding as it is blocked.
  if (timeout != nullptr) {
    changeFiberState(currentFiber, Fiber::State::Running,
                     Fiber::State::Waiting);
    work.waiting.add(*timeout, currentFiber);
  } else {
    changeFiberState(currentFiber, Fiber::State::Running,
                     Fiber::State::Yielded);
  }

  // First wait until there's something else this worker can do.
  waitForWork();

  work.numBlockedFibers++;

  if (!work.fibers.empty()) {
    // There's another fiber that has become unblocked, resume that.
    work.num--;
    auto to = containers::take(work.fibers);
    ASSERT_FIBER_STATE(to, Fiber::State::Queued);
    switchToFiber(to);
  } else if (!idleFibers.empty()) {
    // There's an old fiber we can reuse, resume that.
    auto to = containers::take(idleFibers);
    ASSERT_FIBER_STATE(to, Fiber::State::Idle);
    switchToFiber(to);
  } else {
    // There are tasks to process and no existing fibers to resume.
    // Spawn a new fiber.
    switchToFiber(createWorkerFiber());
  }

  work.numBlockedFibers--;

  setFiberState(currentFiber, Fiber::State::Running);
}

bool Scheduler::Worker::tryLock() {
  return work.mutex.try_lock();
}

void Scheduler::Worker::enqueue(Fiber* fiber) {
  bool notify = false;
  {
    marl::lock lock(work.mutex);
    DBG_LOG("%d: ENQUEUE(%d %s)", (int)id, (int)fiber->id,
            Fiber::toString(fiber->state));
    switch (fiber->state) {
      case Fiber::State::Running:
      case Fiber::State::Queued:
        return;  // Nothing to do here - the fiber is already queued or running.
      case Fiber::State::Waiting:
        work.waiting.erase(fiber);
        break;
      case Fiber::State::Idle:
      case Fiber::State::Yielded:
        break;
    }
    notify = work.notifyAdded;
    work.fibers.push_back(fiber);
    MARL_ASSERT(!work.waiting.contains(fiber),
                "fiber is unexpectedly in the waiting list");
    setFiberState(fiber, Fiber::State::Queued);
    work.num++;
  }

  if (notify) {
    work.added.notify_one();
  }
}

void Scheduler::Worker::enqueue(Task&& task) {
  work.mutex.lock();
  enqueueAndUnlock(std::move(task));
}

void Scheduler::Worker::enqueueAndUnlock(Task&& task) {
  auto notify = work.notifyAdded;
  work.tasks.push_back(std::move(task));
  work.num++;
  work.mutex.unlock();
  if (notify) {
    work.added.notify_one();
  }
}
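
// Note: in both enqueue() overloads above, the condition variable is
// notified after the work mutex is released, so a woken worker does not
// immediately block trying to acquire a mutex still held by the notifier.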
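
// steal() attempts to take a task from this worker so it can be run by
// another. The unlocked read of work.num and the try_lock() keep the fast
// path cheap: a thief never blocks on a busy victim, it just moves on.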
bool Scheduler::Worker::steal(Task& out) {
  if (work.num.load() == 0) {
    return false;
  }
  if (!work.mutex.try_lock()) {
    return false;
  }
  if (work.tasks.empty() || work.tasks.front().is(Task::Flags::SameThread)) {
    work.mutex.unlock();
    return false;
  }
  work.num--;
  out = containers::take(work.tasks);
  work.mutex.unlock();
  return true;
}

void Scheduler::Worker::run() {
  if (mode == Mode::MultiThreaded) {
    MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id), Fiber::current()->id);
    // This is the entry point for a multi-threaded worker.
    // Start with a regular condition-variable wait for work. This avoids
    // starting the thread with a spinForWork().
    work.wait([this]() REQUIRES(work.mutex) {
      return work.num > 0 || work.waiting || shutdown;
    });
  }
  ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
  runUntilShutdown();
  switchToFiber(mainFiber.get());
}

void Scheduler::Worker::runUntilShutdown() {
  while (!shutdown || work.num > 0 || work.numBlockedFibers > 0U) {
    waitForWork();
    runUntilIdle();
  }
}

void Scheduler::Worker::waitForWork() {
  MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
              "work.num out of sync");
  if (work.num > 0) {
    return;
  }

  if (mode == Mode::MultiThreaded) {
    scheduler->onBeginSpinning(id);
    work.mutex.unlock();
    spinForWork();
    work.mutex.lock();
  }

  work.wait([this]() REQUIRES(work.mutex) {
    return work.num > 0 || (shutdown && work.numBlockedFibers == 0U);
  });
  if (work.waiting) {
    enqueueFiberTimeouts();
  }
}

void Scheduler::Worker::enqueueFiberTimeouts() {
  auto now = std::chrono::system_clock::now();
  while (auto fiber = work.waiting.take(now)) {
    changeFiberState(fiber, Fiber::State::Waiting, Fiber::State::Queued);
    DBG_LOG("%d: TIMEOUT(%d)", (int)id, (int)fiber->id);
    work.fibers.push_back(fiber);
    work.num++;
  }
}

void Scheduler::Worker::changeFiberState(Fiber* fiber,
                                         Fiber::State from,
                                         Fiber::State to) const {
  (void)from;  // Unused parameter when ENABLE_DEBUG_LOGGING is disabled.
  DBG_LOG("%d: CHANGE_FIBER_STATE(%d %s -> %s)", (int)id, (int)fiber->id,
          Fiber::toString(from), Fiber::toString(to));
  ASSERT_FIBER_STATE(fiber, from);
  fiber->state = to;
}

void Scheduler::Worker::setFiberState(Fiber* fiber, Fiber::State to) const {
  DBG_LOG("%d: SET_FIBER_STATE(%d %s -> %s)", (int)id, (int)fiber->id,
          Fiber::toString(fiber->state), Fiber::toString(to));
  fiber->state = to;
}

void Scheduler::Worker::spinForWork() {
  TRACE("SPIN");
  Task stolen;

  constexpr auto duration = std::chrono::milliseconds(1);
  auto start = std::chrono::high_resolution_clock::now();
  while (std::chrono::high_resolution_clock::now() - start < duration) {
    for (int i = 0; i < 256; i++)  // Empirically picked magic number!
    {
      // clang-format off
      nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
      nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
      nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
      nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
      // clang-format on
      if (work.num > 0) {
        return;
      }
    }

    if (scheduler->stealWork(this, rng(), stolen)) {
      marl::lock lock(work.mutex);
      work.tasks.emplace_back(std::move(stolen));
      work.num++;
      return;
    }

    std::this_thread::yield();
  }
}

void Scheduler::Worker::runUntilIdle() {
  ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
  MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
              "work.num out of sync");
  while (!work.fibers.empty() || !work.tasks.empty()) {
    // Note: we cannot take and store on the stack more than a single fiber
    // or task at a time, as the Fiber may yield and these items may get
    // held on a suspended fiber's stack.

    while (!work.fibers.empty()) {
      work.num--;
      auto fiber = containers::take(work.fibers);
      // Sanity checks.
      MARL_ASSERT(idleFibers.count(fiber) == 0, "dequeued fiber is idle");
      MARL_ASSERT(fiber != currentFiber, "dequeued fiber is currently running");
      ASSERT_FIBER_STATE(fiber, Fiber::State::Queued);

      changeFiberState(currentFiber, Fiber::State::Running, Fiber::State::Idle);
      auto added = idleFibers.emplace(currentFiber).second;
      (void)added;
      MARL_ASSERT(added, "fiber already idle");

      switchToFiber(fiber);
      changeFiberState(currentFiber, Fiber::State::Idle, Fiber::State::Running);
    }

    if (!work.tasks.empty()) {
      work.num--;
      auto task = containers::take(work.tasks);
      work.mutex.unlock();

      // Run the task.
      task();

      // std::function<> can carry arguments with complex destructors.
      // Ensure these are destructed outside of the lock.
      task = Task();

      work.mutex.lock();
    }
  }
}

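// createWorkerFiber() creates and registers a new fiber that runs the worker
// loop. Fiber IDs start at 1; ID 0 is reserved for the main fiber created in
// start().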
Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
  auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1);
  DBG_LOG("%d: CREATE(%d)", (int)id, (int)fiberId);
  auto fiber = Fiber::create(scheduler->cfg.allocator, fiberId,
                             scheduler->cfg.fiberStackSize,
                             [&]() REQUIRES(work.mutex) { run(); });
  auto ptr = fiber.get();
  workerFibers.emplace_back(std::move(fiber));
  return ptr;
}

void Scheduler::Worker::switchToFiber(Fiber* to) {
  DBG_LOG("%d: SWITCH(%d -> %d)", (int)id, (int)currentFiber->id, (int)to->id);
  MARL_ASSERT(to == mainFiber.get() || idleFibers.count(to) == 0,
              "switching to idle fiber");
  auto from = currentFiber;
  currentFiber = to;
  from->switchTo(to);
}

////////////////////////////////////////////////////////////////////////////////
// Scheduler::Worker::Work
////////////////////////////////////////////////////////////////////////////////
Scheduler::Worker::Work::Work(Allocator* allocator)
    : tasks(allocator), fibers(allocator), waiting(allocator) {}

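// wait() blocks until f() returns true, waking early to handle fiber
// timeouts. notifyAdded is set for the duration of the wait so that
// producers (see the enqueue() methods) only signal the `added` condition
// variable when a worker is actually waiting on it.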
template <typename F>
void Scheduler::Worker::Work::wait(F&& f) {
  notifyAdded = true;
  if (waiting) {
    mutex.wait_until_locked(added, waiting.next(), f);
  } else {
    mutex.wait_locked(added, f);
  }
  notifyAdded = false;
}

////////////////////////////////////////////////////////////////////////////////
// Scheduler::SingleThreadedWorkers
////////////////////////////////////////////////////////////////////////////////
Scheduler::SingleThreadedWorkers::SingleThreadedWorkers(Allocator* allocator)
    : byTid(allocator) {}

}  // namespace marl