• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Marl Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "osfiber.h"  // Must come first. See osfiber_ucontext.h.
16 
17 #include "marl/scheduler.h"
18 
19 #include "marl/debug.h"
20 #include "marl/sanitizers.h"
21 #include "marl/thread.h"
22 #include "marl/trace.h"
23 
24 #if defined(_WIN32)
25 #include <intrin.h>  // __nop()
26 #endif
27 
// Enable to trace scheduler events.
#define ENABLE_TRACE_EVENTS 0

// Enable to print verbose debug logging.
#define ENABLE_DEBUG_LOGGING 0

#if ENABLE_TRACE_EVENTS
// TRACE(...) forwards to MARL_SCOPED_EVENT(...); it expands to nothing when
// ENABLE_TRACE_EVENTS is 0.
#define TRACE(...) MARL_SCOPED_EVENT(__VA_ARGS__)
#else
#define TRACE(...)
#endif

#if ENABLE_DEBUG_LOGGING
// DBG_LOG(msg, ...) prints msg (a printf format string literal) followed by a
// newline, prefixed with the low 12 bits of threadID() in hex. At least one
// argument must follow msg, because __VA_ARGS__ is preceded by a comma.
#define DBG_LOG(msg, ...) \
  printf("%.3x " msg "\n", (int)threadID() & 0xfff, __VA_ARGS__)
#else
#define DBG_LOG(msg, ...)
#endif

// ASSERT_FIBER_STATE(FIBER, STATE) asserts that the Fiber* FIBER is currently
// in STATE. On failure it reports the fiber id and both state names. Every use
// of a macro argument is parenthesized so that callers may pass arbitrary
// expressions (e.g. `a ? b : c`) without operator-precedence surprises.
#define ASSERT_FIBER_STATE(FIBER, STATE)                                     \
  MARL_ASSERT((FIBER)->state == (STATE),                                     \
              "fiber %d was in state %s, but expected %s", (int)(FIBER)->id, \
              Fiber::toString((FIBER)->state), Fiber::toString(STATE))
51 
52 namespace {
53 
#if ENABLE_DEBUG_LOGGING
// Returns a 64-bit value identifying the calling thread: the std::hash of its
// std::thread::id. Intended only as a compact tag for debug log lines.
inline uint64_t threadID() {
  const std::thread::id self = std::this_thread::get_id();
  return std::hash<std::thread::id>{}(self);
}
#endif
62 
// Emits exactly one no-op instruction. The spin loop in
// Worker::spinForWork() calls this to burn a few cycles between checks.
inline void nop() {
#ifdef _WIN32
  __nop();
#else
  __asm__ __volatile__("nop");
#endif
}
70 
// Returns a copy of cfgIn with defaults filled in. If worker threads are
// requested (count > 0) but no affinity policy was given, the policy defaults
// to "any of all cores" (Policy::anyOf(Affinity::all())).
inline marl::Scheduler::Config setConfigDefaults(
    const marl::Scheduler::Config& cfgIn) {
  marl::Scheduler::Config out{cfgIn};
  if (out.workerThread.count <= 0 || out.workerThread.affinityPolicy) {
    return out;  // No workers, or the caller already chose a policy.
  }
  out.workerThread.affinityPolicy = marl::Thread::Affinity::Policy::anyOf(
      marl::Thread::Affinity::all(out.allocator), out.allocator);
  return out;
}
80 
81 }  // anonymous namespace
82 
83 namespace marl {
84 
85 ////////////////////////////////////////////////////////////////////////////////
86 // Scheduler
87 ////////////////////////////////////////////////////////////////////////////////
88 thread_local Scheduler* Scheduler::bound = nullptr;
89 
get()90 Scheduler* Scheduler::get() {
91   return bound;
92 }
93 
bind()94 void Scheduler::bind() {
95 #if !MEMORY_SANITIZER_ENABLED
96   // thread_local variables in shared libraries are initialized at load-time,
97   // but this is not observed by MemorySanitizer if the loader itself was not
98   // instrumented, leading to false-positive unitialized variable errors.
99   // See https://github.com/google/marl/issues/184
100   MARL_ASSERT(bound == nullptr, "Scheduler already bound");
101 #endif
102   bound = this;
103   {
104     marl::lock lock(singleThreadedWorkers.mutex);
105     auto worker = cfg.allocator->make_unique<Worker>(
106         this, Worker::Mode::SingleThreaded, -1);
107     worker->start();
108     auto tid = std::this_thread::get_id();
109     singleThreadedWorkers.byTid.emplace(tid, std::move(worker));
110   }
111 }
112 
unbind()113 void Scheduler::unbind() {
114   MARL_ASSERT(bound != nullptr, "No scheduler bound");
115   auto worker = Worker::getCurrent();
116   worker->stop();
117   {
118     marl::lock lock(bound->singleThreadedWorkers.mutex);
119     auto tid = std::this_thread::get_id();
120     auto it = bound->singleThreadedWorkers.byTid.find(tid);
121     MARL_ASSERT(it != bound->singleThreadedWorkers.byTid.end(),
122                 "singleThreadedWorker not found");
123     MARL_ASSERT(it->second.get() == worker, "worker is not bound?");
124     bound->singleThreadedWorkers.byTid.erase(it);
125     if (bound->singleThreadedWorkers.byTid.empty()) {
126       bound->singleThreadedWorkers.unbind.notify_one();
127     }
128   }
129   bound = nullptr;
130 }
131 
Scheduler(const Config & config)132 Scheduler::Scheduler(const Config& config)
133     : cfg(setConfigDefaults(config)),
134       workerThreads{},
135       singleThreadedWorkers(config.allocator) {
136   for (size_t i = 0; i < spinningWorkers.size(); i++) {
137     spinningWorkers[i] = -1;
138   }
139   for (int i = 0; i < cfg.workerThread.count; i++) {
140     workerThreads[i] =
141         cfg.allocator->create<Worker>(this, Worker::Mode::MultiThreaded, i);
142   }
143   for (int i = 0; i < cfg.workerThread.count; i++) {
144     workerThreads[i]->start();
145   }
146 }
147 
~Scheduler()148 Scheduler::~Scheduler() {
149   {
150     // Wait until all the single threaded workers have been unbound.
151     marl::lock lock(singleThreadedWorkers.mutex);
152     lock.wait(singleThreadedWorkers.unbind,
153               [this]() REQUIRES(singleThreadedWorkers.mutex) {
154                 return singleThreadedWorkers.byTid.empty();
155               });
156   }
157 
158   // Release all worker threads.
159   // This will wait for all in-flight tasks to complete before returning.
160   for (int i = cfg.workerThread.count - 1; i >= 0; i--) {
161     workerThreads[i]->stop();
162   }
163   for (int i = cfg.workerThread.count - 1; i >= 0; i--) {
164     cfg.allocator->destroy(workerThreads[i]);
165   }
166 }
167 
enqueue(Task && task)168 void Scheduler::enqueue(Task&& task) {
169   if (task.is(Task::Flags::SameThread)) {
170     Worker::getCurrent()->enqueue(std::move(task));
171     return;
172   }
173   if (cfg.workerThread.count > 0) {
174     while (true) {
175       // Prioritize workers that have recently started spinning.
176       auto i = --nextSpinningWorkerIdx % spinningWorkers.size();
177       auto idx = spinningWorkers[i].exchange(-1);
178       if (idx < 0) {
179         // If a spinning worker couldn't be found, round-robin the
180         // workers.
181         idx = nextEnqueueIndex++ % cfg.workerThread.count;
182       }
183 
184       auto worker = workerThreads[idx];
185       if (worker->tryLock()) {
186         worker->enqueueAndUnlock(std::move(task));
187         return;
188       }
189     }
190   } else {
191     if (auto worker = Worker::getCurrent()) {
192       worker->enqueue(std::move(task));
193     } else {
194       MARL_FATAL(
195           "singleThreadedWorker not found. Did you forget to call "
196           "marl::Scheduler::bind()?");
197     }
198   }
199 }
200 
config() const201 const Scheduler::Config& Scheduler::config() const {
202   return cfg;
203 }
204 
stealWork(Worker * thief,uint64_t from,Task & out)205 bool Scheduler::stealWork(Worker* thief, uint64_t from, Task& out) {
206   if (cfg.workerThread.count > 0) {
207     auto thread = workerThreads[from % cfg.workerThread.count];
208     if (thread != thief) {
209       if (thread->steal(out)) {
210         return true;
211       }
212     }
213   }
214   return false;
215 }
216 
onBeginSpinning(int workerId)217 void Scheduler::onBeginSpinning(int workerId) {
218   auto idx = nextSpinningWorkerIdx++ % spinningWorkers.size();
219   spinningWorkers[idx] = workerId;
220 }
221 
222 ////////////////////////////////////////////////////////////////////////////////
223 // Scheduler::Config
224 ////////////////////////////////////////////////////////////////////////////////
allCores()225 Scheduler::Config Scheduler::Config::allCores() {
226   return Config().setWorkerThreadCount(Thread::numLogicalCPUs());
227 }
228 
229 ////////////////////////////////////////////////////////////////////////////////
230 // Scheduler::Fiber
231 ////////////////////////////////////////////////////////////////////////////////
Fiber(Allocator::unique_ptr<OSFiber> && impl,uint32_t id)232 Scheduler::Fiber::Fiber(Allocator::unique_ptr<OSFiber>&& impl, uint32_t id)
233     : id(id), impl(std::move(impl)), worker(Worker::getCurrent()) {
234   MARL_ASSERT(worker != nullptr, "No Scheduler::Worker bound");
235 }
236 
237 // TODO(chromium:1211047): Testing the static thread_local Worker::current for
238 // null causes a MemorySantizer false positive.
239 CLANG_NO_SANITIZE_MEMORY
current()240 Scheduler::Fiber* Scheduler::Fiber::current() {
241   auto worker = Worker::getCurrent();
242   return worker != nullptr ? worker->getCurrentFiber() : nullptr;
243 }
244 
notify()245 void Scheduler::Fiber::notify() {
246   worker->enqueue(this);
247 }
248 
wait(marl::lock & lock,const Predicate & pred)249 void Scheduler::Fiber::wait(marl::lock& lock, const Predicate& pred) {
250   MARL_ASSERT(worker == Worker::getCurrent(),
251               "Scheduler::Fiber::wait() must only be called on the currently "
252               "executing fiber");
253   worker->wait(lock, nullptr, pred);
254 }
255 
switchTo(Fiber * to)256 void Scheduler::Fiber::switchTo(Fiber* to) {
257   MARL_ASSERT(worker == Worker::getCurrent(),
258               "Scheduler::Fiber::switchTo() must only be called on the "
259               "currently executing fiber");
260   if (to != this) {
261     impl->switchTo(to->impl.get());
262   }
263 }
264 
create(Allocator * allocator,uint32_t id,size_t stackSize,const std::function<void ()> & func)265 Allocator::unique_ptr<Scheduler::Fiber> Scheduler::Fiber::create(
266     Allocator* allocator,
267     uint32_t id,
268     size_t stackSize,
269     const std::function<void()>& func) {
270   return allocator->make_unique<Fiber>(
271       OSFiber::createFiber(allocator, stackSize, func), id);
272 }
273 
274 Allocator::unique_ptr<Scheduler::Fiber>
createFromCurrentThread(Allocator * allocator,uint32_t id)275 Scheduler::Fiber::createFromCurrentThread(Allocator* allocator, uint32_t id) {
276   return allocator->make_unique<Fiber>(
277       OSFiber::createFiberFromCurrentThread(allocator), id);
278 }
279 
toString(State state)280 const char* Scheduler::Fiber::toString(State state) {
281   switch (state) {
282     case State::Idle:
283       return "Idle";
284     case State::Yielded:
285       return "Yielded";
286     case State::Queued:
287       return "Queued";
288     case State::Running:
289       return "Running";
290     case State::Waiting:
291       return "Waiting";
292   }
293   MARL_ASSERT(false, "bad fiber state");
294   return "<unknown>";
295 }
296 
297 ////////////////////////////////////////////////////////////////////////////////
298 // Scheduler::WaitingFibers
299 ////////////////////////////////////////////////////////////////////////////////
WaitingFibers(Allocator * allocator)300 Scheduler::WaitingFibers::WaitingFibers(Allocator* allocator)
301     : timeouts(allocator), fibers(allocator) {}
302 
operator bool() const303 Scheduler::WaitingFibers::operator bool() const {
304   return !fibers.empty();
305 }
306 
take(const TimePoint & timeout)307 Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timeout) {
308   if (!*this) {
309     return nullptr;
310   }
311   auto it = timeouts.begin();
312   if (timeout < it->timepoint) {
313     return nullptr;
314   }
315   auto fiber = it->fiber;
316   timeouts.erase(it);
317   auto deleted = fibers.erase(fiber) != 0;
318   (void)deleted;
319   MARL_ASSERT(deleted, "WaitingFibers::take() maps out of sync");
320   return fiber;
321 }
322 
next() const323 Scheduler::TimePoint Scheduler::WaitingFibers::next() const {
324   MARL_ASSERT(*this,
325               "WaitingFibers::next() called when there' no waiting fibers");
326   return timeouts.begin()->timepoint;
327 }
328 
add(const TimePoint & timeout,Fiber * fiber)329 void Scheduler::WaitingFibers::add(const TimePoint& timeout, Fiber* fiber) {
330   timeouts.emplace(Timeout{timeout, fiber});
331   bool added = fibers.emplace(fiber, timeout).second;
332   (void)added;
333   MARL_ASSERT(added, "WaitingFibers::add() fiber already waiting");
334 }
335 
erase(Fiber * fiber)336 void Scheduler::WaitingFibers::erase(Fiber* fiber) {
337   auto it = fibers.find(fiber);
338   if (it != fibers.end()) {
339     auto timeout = it->second;
340     auto erased = timeouts.erase(Timeout{timeout, fiber}) != 0;
341     (void)erased;
342     MARL_ASSERT(erased, "WaitingFibers::erase() maps out of sync");
343     fibers.erase(it);
344   }
345 }
346 
contains(Fiber * fiber) const347 bool Scheduler::WaitingFibers::contains(Fiber* fiber) const {
348   return fibers.count(fiber) != 0;
349 }
350 
operator <(const Timeout & o) const351 bool Scheduler::WaitingFibers::Timeout::operator<(const Timeout& o) const {
352   if (timepoint != o.timepoint) {
353     return timepoint < o.timepoint;
354   }
355   return fiber < o.fiber;
356 }
357 
358 ////////////////////////////////////////////////////////////////////////////////
359 // Scheduler::Worker
360 ////////////////////////////////////////////////////////////////////////////////
361 thread_local Scheduler::Worker* Scheduler::Worker::current = nullptr;
362 
Worker(Scheduler * scheduler,Mode mode,uint32_t id)363 Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id)
364     : id(id),
365       mode(mode),
366       scheduler(scheduler),
367       work(scheduler->cfg.allocator),
368       idleFibers(scheduler->cfg.allocator) {}
369 
start()370 void Scheduler::Worker::start() {
371   switch (mode) {
372     case Mode::MultiThreaded: {
373       auto allocator = scheduler->cfg.allocator;
374       auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy;
375       auto affinity = affinityPolicy->get(id, allocator);
376       thread = Thread(std::move(affinity), [=] {
377         Thread::setName("Thread<%.2d>", int(id));
378 
379         if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
380           initFunc(id);
381         }
382 
383         Scheduler::bound = scheduler;
384         Worker::current = this;
385         mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
386         currentFiber = mainFiber.get();
387         {
388           marl::lock lock(work.mutex);
389           run();
390         }
391         mainFiber.reset();
392         Worker::current = nullptr;
393       });
394       break;
395     }
396     case Mode::SingleThreaded: {
397       Worker::current = this;
398       mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
399       currentFiber = mainFiber.get();
400       break;
401     }
402     default:
403       MARL_ASSERT(false, "Unknown mode: %d", int(mode));
404   }
405 }
406 
stop()407 void Scheduler::Worker::stop() {
408   switch (mode) {
409     case Mode::MultiThreaded: {
410       enqueue(Task([this] { shutdown = true; }, Task::Flags::SameThread));
411       thread.join();
412       break;
413     }
414     case Mode::SingleThreaded: {
415       marl::lock lock(work.mutex);
416       shutdown = true;
417       runUntilShutdown();
418       Worker::current = nullptr;
419       break;
420     }
421     default:
422       MARL_ASSERT(false, "Unknown mode: %d", int(mode));
423   }
424 }
425 
wait(const TimePoint * timeout)426 bool Scheduler::Worker::wait(const TimePoint* timeout) {
427   DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
428   {
429     marl::lock lock(work.mutex);
430     suspend(timeout);
431   }
432   return timeout == nullptr || std::chrono::system_clock::now() < *timeout;
433 }
434 
wait(lock & waitLock,const TimePoint * timeout,const Predicate & pred)435 bool Scheduler::Worker::wait(lock& waitLock,
436                              const TimePoint* timeout,
437                              const Predicate& pred) {
438   DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
439   while (!pred()) {
440     // Lock the work mutex to call suspend().
441     work.mutex.lock();
442 
443     // Unlock the wait mutex with the work mutex lock held.
444     // Order is important here as we need to ensure that the fiber is not
445     // enqueued (via Fiber::notify()) between the waitLock.unlock() and fiber
446     // switch, otherwise the Fiber::notify() call may be ignored and the fiber
447     // is never woken.
448     waitLock.unlock_no_tsa();
449 
450     // suspend the fiber.
451     suspend(timeout);
452 
453     // Fiber resumed. We don't need the work mutex locked any more.
454     work.mutex.unlock();
455 
456     // Re-lock to either return due to timeout, or call pred().
457     waitLock.lock_no_tsa();
458 
459     // Check timeout.
460     if (timeout != nullptr && std::chrono::system_clock::now() >= *timeout) {
461       return false;
462     }
463 
464     // Spurious wake up. Spin again.
465   }
466   return true;
467 }
468 
suspend(const std::chrono::system_clock::time_point * timeout)469 void Scheduler::Worker::suspend(
470     const std::chrono::system_clock::time_point* timeout) {
471   // Current fiber is yielding as it is blocked.
472   if (timeout != nullptr) {
473     changeFiberState(currentFiber, Fiber::State::Running,
474                      Fiber::State::Waiting);
475     work.waiting.add(*timeout, currentFiber);
476   } else {
477     changeFiberState(currentFiber, Fiber::State::Running,
478                      Fiber::State::Yielded);
479   }
480 
481   // First wait until there's something else this worker can do.
482   waitForWork();
483 
484   work.numBlockedFibers++;
485 
486   if (!work.fibers.empty()) {
487     // There's another fiber that has become unblocked, resume that.
488     work.num--;
489     auto to = containers::take(work.fibers);
490     ASSERT_FIBER_STATE(to, Fiber::State::Queued);
491     switchToFiber(to);
492   } else if (!idleFibers.empty()) {
493     // There's an old fiber we can reuse, resume that.
494     auto to = containers::take(idleFibers);
495     ASSERT_FIBER_STATE(to, Fiber::State::Idle);
496     switchToFiber(to);
497   } else {
498     // Tasks to process and no existing fibers to resume.
499     // Spawn a new fiber.
500     switchToFiber(createWorkerFiber());
501   }
502 
503   work.numBlockedFibers--;
504 
505   setFiberState(currentFiber, Fiber::State::Running);
506 }
507 
tryLock()508 bool Scheduler::Worker::tryLock() {
509   return work.mutex.try_lock();
510 }
511 
enqueue(Fiber * fiber)512 void Scheduler::Worker::enqueue(Fiber* fiber) {
513   bool notify = false;
514   {
515     marl::lock lock(work.mutex);
516     DBG_LOG("%d: ENQUEUE(%d %s)", (int)id, (int)fiber->id,
517             Fiber::toString(fiber->state));
518     switch (fiber->state) {
519       case Fiber::State::Running:
520       case Fiber::State::Queued:
521         return;  // Nothing to do here - task is already queued or running.
522       case Fiber::State::Waiting:
523         work.waiting.erase(fiber);
524         break;
525       case Fiber::State::Idle:
526       case Fiber::State::Yielded:
527         break;
528     }
529     notify = work.notifyAdded;
530     work.fibers.push_back(fiber);
531     MARL_ASSERT(!work.waiting.contains(fiber),
532                 "fiber is unexpectedly in the waiting list");
533     setFiberState(fiber, Fiber::State::Queued);
534     work.num++;
535   }
536 
537   if (notify) {
538     work.added.notify_one();
539   }
540 }
541 
enqueue(Task && task)542 void Scheduler::Worker::enqueue(Task&& task) {
543   work.mutex.lock();
544   enqueueAndUnlock(std::move(task));
545 }
546 
enqueueAndUnlock(Task && task)547 void Scheduler::Worker::enqueueAndUnlock(Task&& task) {
548   auto notify = work.notifyAdded;
549   work.tasks.push_back(std::move(task));
550   work.num++;
551   work.mutex.unlock();
552   if (notify) {
553     work.added.notify_one();
554   }
555 }
556 
steal(Task & out)557 bool Scheduler::Worker::steal(Task& out) {
558   if (work.num.load() == 0) {
559     return false;
560   }
561   if (!work.mutex.try_lock()) {
562     return false;
563   }
564   if (work.tasks.empty() || work.tasks.front().is(Task::Flags::SameThread)) {
565     work.mutex.unlock();
566     return false;
567   }
568   work.num--;
569   out = containers::take(work.tasks);
570   work.mutex.unlock();
571   return true;
572 }
573 
run()574 void Scheduler::Worker::run() {
575   if (mode == Mode::MultiThreaded) {
576     MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id), Fiber::current()->id);
577     // This is the entry point for a multi-threaded worker.
578     // Start with a regular condition-variable wait for work. This avoids
579     // starting the thread with a spinForWork().
580     work.wait([this]() REQUIRES(work.mutex) {
581       return work.num > 0 || work.waiting || shutdown;
582     });
583   }
584   ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
585   runUntilShutdown();
586   switchToFiber(mainFiber.get());
587 }
588 
runUntilShutdown()589 void Scheduler::Worker::runUntilShutdown() {
590   while (!shutdown || work.num > 0 || work.numBlockedFibers > 0U) {
591     waitForWork();
592     runUntilIdle();
593   }
594 }
595 
waitForWork()596 void Scheduler::Worker::waitForWork() {
597   MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
598               "work.num out of sync");
599   if (work.num > 0) {
600     return;
601   }
602 
603   if (mode == Mode::MultiThreaded) {
604     scheduler->onBeginSpinning(id);
605     work.mutex.unlock();
606     spinForWork();
607     work.mutex.lock();
608   }
609 
610   work.wait([this]() REQUIRES(work.mutex) {
611     return work.num > 0 || (shutdown && work.numBlockedFibers == 0U);
612   });
613   if (work.waiting) {
614     enqueueFiberTimeouts();
615   }
616 }
617 
enqueueFiberTimeouts()618 void Scheduler::Worker::enqueueFiberTimeouts() {
619   auto now = std::chrono::system_clock::now();
620   while (auto fiber = work.waiting.take(now)) {
621     changeFiberState(fiber, Fiber::State::Waiting, Fiber::State::Queued);
622     DBG_LOG("%d: TIMEOUT(%d)", (int)id, (int)fiber->id);
623     work.fibers.push_back(fiber);
624     work.num++;
625   }
626 }
627 
changeFiberState(Fiber * fiber,Fiber::State from,Fiber::State to) const628 void Scheduler::Worker::changeFiberState(Fiber* fiber,
629                                          Fiber::State from,
630                                          Fiber::State to) const {
631   (void)from;  // Unusued parameter when ENABLE_DEBUG_LOGGING is disabled.
632   DBG_LOG("%d: CHANGE_FIBER_STATE(%d %s -> %s)", (int)id, (int)fiber->id,
633           Fiber::toString(from), Fiber::toString(to));
634   ASSERT_FIBER_STATE(fiber, from);
635   fiber->state = to;
636 }
637 
setFiberState(Fiber * fiber,Fiber::State to) const638 void Scheduler::Worker::setFiberState(Fiber* fiber, Fiber::State to) const {
639   DBG_LOG("%d: SET_FIBER_STATE(%d %s -> %s)", (int)id, (int)fiber->id,
640           Fiber::toString(fiber->state), Fiber::toString(to));
641   fiber->state = to;
642 }
643 
spinForWork()644 void Scheduler::Worker::spinForWork() {
645   TRACE("SPIN");
646   Task stolen;
647 
648   constexpr auto duration = std::chrono::milliseconds(1);
649   auto start = std::chrono::high_resolution_clock::now();
650   while (std::chrono::high_resolution_clock::now() - start < duration) {
651     for (int i = 0; i < 256; i++)  // Empirically picked magic number!
652     {
653       // clang-format off
654       nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
655       nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
656       nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
657       nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
658       // clang-format on
659       if (work.num > 0) {
660         return;
661       }
662     }
663 
664     if (scheduler->stealWork(this, rng(), stolen)) {
665       marl::lock lock(work.mutex);
666       work.tasks.emplace_back(std::move(stolen));
667       work.num++;
668       return;
669     }
670 
671     std::this_thread::yield();
672   }
673 }
674 
runUntilIdle()675 void Scheduler::Worker::runUntilIdle() {
676   ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
677   MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
678               "work.num out of sync");
679   while (!work.fibers.empty() || !work.tasks.empty()) {
680     // Note: we cannot take and store on the stack more than a single fiber
681     // or task at a time, as the Fiber may yield and these items may get
682     // held on suspended fiber stack.
683 
684     while (!work.fibers.empty()) {
685       work.num--;
686       auto fiber = containers::take(work.fibers);
687       // Sanity checks,
688       MARL_ASSERT(idleFibers.count(fiber) == 0, "dequeued fiber is idle");
689       MARL_ASSERT(fiber != currentFiber, "dequeued fiber is currently running");
690       ASSERT_FIBER_STATE(fiber, Fiber::State::Queued);
691 
692       changeFiberState(currentFiber, Fiber::State::Running, Fiber::State::Idle);
693       auto added = idleFibers.emplace(currentFiber).second;
694       (void)added;
695       MARL_ASSERT(added, "fiber already idle");
696 
697       switchToFiber(fiber);
698       changeFiberState(currentFiber, Fiber::State::Idle, Fiber::State::Running);
699     }
700 
701     if (!work.tasks.empty()) {
702       work.num--;
703       auto task = containers::take(work.tasks);
704       work.mutex.unlock();
705 
706       // Run the task.
707       task();
708 
709       // std::function<> can carry arguments with complex destructors.
710       // Ensure these are destructed outside of the lock.
711       task = Task();
712 
713       work.mutex.lock();
714     }
715   }
716 }
717 
createWorkerFiber()718 Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
719   auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1);
720   DBG_LOG("%d: CREATE(%d)", (int)id, (int)fiberId);
721   auto fiber = Fiber::create(scheduler->cfg.allocator, fiberId,
722                              scheduler->cfg.fiberStackSize,
723                              [&]() REQUIRES(work.mutex) { run(); });
724   auto ptr = fiber.get();
725   workerFibers.emplace_back(std::move(fiber));
726   return ptr;
727 }
728 
switchToFiber(Fiber * to)729 void Scheduler::Worker::switchToFiber(Fiber* to) {
730   DBG_LOG("%d: SWITCH(%d -> %d)", (int)id, (int)currentFiber->id, (int)to->id);
731   MARL_ASSERT(to == mainFiber.get() || idleFibers.count(to) == 0,
732               "switching to idle fiber");
733   auto from = currentFiber;
734   currentFiber = to;
735   from->switchTo(to);
736 }
737 
738 ////////////////////////////////////////////////////////////////////////////////
739 // Scheduler::Worker::Work
740 ////////////////////////////////////////////////////////////////////////////////
Work(Allocator * allocator)741 Scheduler::Worker::Work::Work(Allocator* allocator)
742     : tasks(allocator), fibers(allocator), waiting(allocator) {}
743 
744 template <typename F>
wait(F && f)745 void Scheduler::Worker::Work::wait(F&& f) {
746   notifyAdded = true;
747   if (waiting) {
748     mutex.wait_until_locked(added, waiting.next(), f);
749   } else {
750     mutex.wait_locked(added, f);
751   }
752   notifyAdded = false;
753 }
754 
755 ////////////////////////////////////////////////////////////////////////////////
756 // Scheduler::SingleThreadedWorkers
757 ////////////////////////////////////////////////////////////////////////////////
SingleThreadedWorkers(Allocator * allocator)758 Scheduler::SingleThreadedWorkers::SingleThreadedWorkers(Allocator* allocator)
759     : byTid(allocator) {}
760 
761 }  // namespace marl
762