1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_SCHEDULER_HPP)
6 #define RXCPP_RX_SCHEDULER_HPP
7
8 #include "rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace schedulers {
13
14 class worker_interface;
15 class scheduler_interface;
16
17 namespace detail {
18
19 class action_type;
20 typedef std::shared_ptr<action_type> action_ptr;
21
22 typedef std::shared_ptr<worker_interface> worker_interface_ptr;
23 typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr;
24
25 typedef std::weak_ptr<worker_interface> worker_interface_weak_ptr;
26 typedef std::weak_ptr<const worker_interface> const_worker_interface_weak_ptr;
27
28 typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr;
29 typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr;
30
shared_empty()31 inline action_ptr shared_empty() {
32 static action_ptr shared_empty = std::make_shared<detail::action_type>();
33 return shared_empty;
34 }
35
36 }
37
38 // It is essential to keep virtual function calls out of an inner loop.
39 // To make tail-recursion work efficiently the recursion objects create
40 // a space on the stack inside the virtual function call in the actor that
41 // allows the callback and the scheduler to share stack space that records
42 // the request and the allowance without any virtual calls in the loop.
43
/// recursed is set on a schedulable by the action to allow the called
/// function to request to be rescheduled.
///
/// It only stores a reference to a flag; the flag is owned by whoever
/// constructed the recursed and must outlive it.
class recursed
{
    bool& isrequested;
    // not assignable: the reference member cannot be rebound
    recursed& operator=(const recursed&) = delete;
public:
    /// bind to the flag that records the reschedule request
    explicit recursed(bool& r)
        : isrequested(r)
    {
    }
    /// request to be rescheduled (sets the bound flag to true)
    inline void operator()() const {
        isrequested = true;
    }
};
60
61 /// recurse is passed to the action by the scheduler.
62 /// the action uses recurse to coordinate the scheduler and the function.
63 class recurse
64 {
65 bool& isallowed;
66 mutable bool isrequested;
67 recursed requestor;
68 recurse operator=(const recurse&);
69 public:
recurse(bool & a)70 explicit recurse(bool& a)
71 : isallowed(a)
72 , isrequested(true)
73 , requestor(isrequested)
74 {
75 }
76 /// does the scheduler allow tail-recursion now?
is_allowed() const77 inline bool is_allowed() const {
78 return isallowed;
79 }
80 /// did the function request to be recursed?
is_requested() const81 inline bool is_requested() const {
82 return isrequested;
83 }
84 /// reset the function request. call before each call to the function.
reset() const85 inline void reset() const {
86 isrequested = false;
87 }
88 /// get the recursed to set into the schedulable for the function to use to request recursion
get_recursed() const89 inline const recursed& get_recursed() const {
90 return requestor;
91 }
92 };
93
94 /// recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
95 class recursion
96 {
97 mutable bool isallowed;
98 recurse recursor;
99 recursion operator=(const recursion&);
100 public:
recursion()101 recursion()
102 : isallowed(true)
103 , recursor(isallowed)
104 {
105 }
recursion(bool b)106 explicit recursion(bool b)
107 : isallowed(b)
108 , recursor(isallowed)
109 {
110 }
111 /// set whether tail-recursion is allowed
reset(bool b=true) const112 inline void reset(bool b = true) const {
113 isallowed = b;
114 }
115 /// get the recurse to pass into each action being called
get_recurse() const116 inline const recurse& get_recurse() const {
117 return recursor;
118 }
119 };
120
121
122 struct action_base
123 {
124 typedef tag_action action_tag;
125 };
126
127 class schedulable;
128
129 /// action provides type-forgetting for a potentially recursive set of calls to a function that takes a schedulable
130 class action : public action_base
131 {
132 typedef action this_type;
133 detail::action_ptr inner;
134 public:
action()135 action()
136 {
137 }
action(detail::action_ptr i)138 explicit action(detail::action_ptr i)
139 : inner(std::move(i))
140 {
141 }
142
143 /// return the empty action
empty()144 inline static action empty() {
145 return action(detail::shared_empty());
146 }
147
148 /// call the function
149 inline void operator()(const schedulable& s, const recurse& r) const;
150 };
151
152 struct scheduler_base
153 {
154 typedef std::chrono::steady_clock clock_type;
155 typedef tag_scheduler scheduler_tag;
156 };
157
158 struct worker_base : public subscription_base
159 {
160 typedef tag_worker worker_tag;
161 };
162
163 class worker_interface
164 : public std::enable_shared_from_this<worker_interface>
165 {
166 typedef worker_interface this_type;
167
168 public:
169 typedef scheduler_base::clock_type clock_type;
170
~worker_interface()171 virtual ~worker_interface() {}
172
173 virtual clock_type::time_point now() const = 0;
174
175 virtual void schedule(const schedulable& scbl) const = 0;
176 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const = 0;
177 };
178
179 namespace detail {
180
181 template<class F>
182 struct is_action_function
183 {
184 struct not_void {};
185 template<class CF>
186 static auto check(int) -> decltype((*(CF*)nullptr)(*(schedulable*)nullptr));
187 template<class CF>
188 static not_void check(...);
189
190 static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
191 };
192
193 }
194
195 class weak_worker;
196
197 /// a worker ensures that all scheduled actions on the same instance are executed in-order with no overlap
198 /// a worker ensures that all scheduled actions are unsubscribed when it is unsubscribed
199 /// some inner implementations will impose additional constraints on the execution of items.
200 class worker : public worker_base
201 {
202 typedef worker this_type;
203 detail::worker_interface_ptr inner;
204 composite_subscription lifetime;
205 friend bool operator==(const worker&, const worker&);
206 friend class weak_worker;
207 public:
208 typedef scheduler_base::clock_type clock_type;
209 typedef composite_subscription::weak_subscription weak_subscription;
210
worker()211 worker()
212 {
213 }
worker(composite_subscription cs,detail::const_worker_interface_ptr i)214 worker(composite_subscription cs, detail::const_worker_interface_ptr i)
215 : inner(std::const_pointer_cast<worker_interface>(i))
216 , lifetime(std::move(cs))
217 {
218 }
worker(composite_subscription cs,worker o)219 worker(composite_subscription cs, worker o)
220 : inner(o.inner)
221 , lifetime(std::move(cs))
222 {
223 }
224
get_subscription() const225 inline const composite_subscription& get_subscription() const {
226 return lifetime;
227 }
get_subscription()228 inline composite_subscription& get_subscription() {
229 return lifetime;
230 }
231
232 // composite_subscription
233 //
is_subscribed() const234 inline bool is_subscribed() const {
235 return lifetime.is_subscribed();
236 }
add(subscription s) const237 inline weak_subscription add(subscription s) const {
238 return lifetime.add(std::move(s));
239 }
remove(weak_subscription w) const240 inline void remove(weak_subscription w) const {
241 return lifetime.remove(std::move(w));
242 }
clear() const243 inline void clear() const {
244 return lifetime.clear();
245 }
unsubscribe() const246 inline void unsubscribe() const {
247 return lifetime.unsubscribe();
248 }
249
250 // worker_interface
251 //
252 /// return the current time for this worker
now() const253 inline clock_type::time_point now() const {
254 return inner->now();
255 }
256
257 /// insert the supplied schedulable to be run as soon as possible
schedule(const schedulable & scbl) const258 inline void schedule(const schedulable& scbl) const {
259 // force rebinding scbl to this worker
260 schedule_rebind(scbl);
261 }
262
263 /// insert the supplied schedulable to be run at the time specified
schedule(clock_type::time_point when,const schedulable & scbl) const264 inline void schedule(clock_type::time_point when, const schedulable& scbl) const {
265 // force rebinding scbl to this worker
266 schedule_rebind(when, scbl);
267 }
268
269 // helpers
270 //
271
272 /// insert the supplied schedulable to be run at now() + the delay specified
schedule(clock_type::duration when,const schedulable & scbl) const273 inline void schedule(clock_type::duration when, const schedulable& scbl) const {
274 // force rebinding scbl to this worker
275 schedule_rebind(now() + when, scbl);
276 }
277
278 /// insert the supplied schedulable to be run at the initial time specified and then again at initial + (N * period)
279 /// this will continue until the worker or schedulable is unsubscribed.
schedule_periodically(clock_type::time_point initial,clock_type::duration period,const schedulable & scbl) const280 inline void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl) const {
281 // force rebinding scbl to this worker
282 schedule_periodically_rebind(initial, period, scbl);
283 }
284
285 /// insert the supplied schedulable to be run at now() + the initial delay specified and then again at now() + initial + (N * period)
286 /// this will continue until the worker or schedulable is unsubscribed.
schedule_periodically(clock_type::duration initial,clock_type::duration period,const schedulable & scbl) const287 inline void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable& scbl) const {
288 // force rebinding scbl to this worker
289 schedule_periodically_rebind(now() + initial, period, scbl);
290 }
291
292 /// use the supplied arguments to make a schedulable and then insert it to be run
293 template<class Arg0, class... ArgN>
294 auto schedule(Arg0&& a0, ArgN&&... an) const
295 -> typename std::enable_if<
296 (detail::is_action_function<Arg0>::value ||
297 is_subscription<Arg0>::value) &&
298 !is_schedulable<Arg0>::value>::type;
299 template<class... ArgN>
300 /// use the supplied arguments to make a schedulable and then insert it to be run
301 void schedule_rebind(const schedulable& scbl, ArgN&&... an) const;
302
303 /// use the supplied arguments to make a schedulable and then insert it to be run
304 template<class Arg0, class... ArgN>
305 auto schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
306 -> typename std::enable_if<
307 (detail::is_action_function<Arg0>::value ||
308 is_subscription<Arg0>::value) &&
309 !is_schedulable<Arg0>::value>::type;
310 /// use the supplied arguments to make a schedulable and then insert it to be run
311 template<class... ArgN>
312 void schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const;
313
314 /// use the supplied arguments to make a schedulable and then insert it to be run
315 template<class Arg0, class... ArgN>
316 auto schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
317 -> typename std::enable_if<
318 (detail::is_action_function<Arg0>::value ||
319 is_subscription<Arg0>::value) &&
320 !is_schedulable<Arg0>::value>::type;
321 /// use the supplied arguments to make a schedulable and then insert it to be run
322 template<class... ArgN>
323 void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const;
324 };
325
operator ==(const worker & lhs,const worker & rhs)326 inline bool operator==(const worker& lhs, const worker& rhs) {
327 return lhs.inner == rhs.inner && lhs.lifetime == rhs.lifetime;
328 }
operator !=(const worker & lhs,const worker & rhs)329 inline bool operator!=(const worker& lhs, const worker& rhs) {
330 return !(lhs == rhs);
331 }
332
333 class weak_worker
334 {
335 detail::worker_interface_weak_ptr inner;
336 composite_subscription lifetime;
337
338 public:
weak_worker()339 weak_worker()
340 {
341 }
weak_worker(worker & owner)342 explicit weak_worker(worker& owner)
343 : inner(owner.inner)
344 , lifetime(owner.lifetime)
345 {
346 }
347
lock() const348 worker lock() const {
349 return worker(lifetime, inner.lock());
350 }
351 };
352
353 class scheduler_interface
354 : public std::enable_shared_from_this<scheduler_interface>
355 {
356 typedef scheduler_interface this_type;
357
358 public:
359 typedef scheduler_base::clock_type clock_type;
360
~scheduler_interface()361 virtual ~scheduler_interface() {}
362
363 virtual clock_type::time_point now() const = 0;
364
365 virtual worker create_worker(composite_subscription cs) const = 0;
366 };
367
368
369 struct schedulable_base :
370 // public subscription_base, <- already in worker base
371 public worker_base,
372 public action_base
373 {
374 typedef tag_schedulable schedulable_tag;
375 };
376
377 /*!
378 \brief allows functions to be called at specified times and possibly in other contexts.
379
380 \ingroup group-core
381
382 */
383 class scheduler : public scheduler_base
384 {
385 typedef scheduler this_type;
386 detail::scheduler_interface_ptr inner;
387 friend bool operator==(const scheduler&, const scheduler&);
388 public:
389 typedef scheduler_base::clock_type clock_type;
390
scheduler()391 scheduler()
392 {
393 }
scheduler(detail::scheduler_interface_ptr i)394 explicit scheduler(detail::scheduler_interface_ptr i)
395 : inner(std::move(i))
396 {
397 }
scheduler(detail::const_scheduler_interface_ptr i)398 explicit scheduler(detail::const_scheduler_interface_ptr i)
399 : inner(std::const_pointer_cast<scheduler_interface>(i))
400 {
401 }
402
403 /// return the current time for this scheduler
now() const404 inline clock_type::time_point now() const {
405 return inner->now();
406 }
407 /// create a worker with a lifetime.
408 /// when the worker is unsubscribed all scheduled items will be unsubscribed.
409 /// items scheduled to a worker will be run one at a time.
410 /// scheduling order is preserved: when more than one item is scheduled for
411 /// time T then at time T they will be run in the order that they were scheduled.
create_worker(composite_subscription cs=composite_subscription ()) const412 inline worker create_worker(composite_subscription cs = composite_subscription()) const {
413 return inner->create_worker(cs);
414 }
415 };
416
417 template<class Scheduler, class... ArgN>
make_scheduler(ArgN &&...an)418 inline scheduler make_scheduler(ArgN&&... an) {
419 return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
420 }
421
make_scheduler(std::shared_ptr<scheduler_interface> si)422 inline scheduler make_scheduler(std::shared_ptr<scheduler_interface> si) {
423 return scheduler(si);
424 }
425
426 class schedulable : public schedulable_base
427 {
428 typedef schedulable this_type;
429
430 composite_subscription lifetime;
431 weak_worker controller;
432 action activity;
433 bool scoped;
434 composite_subscription::weak_subscription action_scope;
435
436 struct detacher
437 {
~detacherrxcpp::schedulers::schedulable::detacher438 ~detacher()
439 {
440 if (that) {
441 that->unsubscribe();
442 }
443 }
detacherrxcpp::schedulers::schedulable::detacher444 detacher(const this_type* that)
445 : that(that)
446 {
447 }
448 const this_type* that;
449 };
450
451 class recursed_scope_type
452 {
453 mutable const recursed* requestor;
454
455 class exit_recursed_scope_type
456 {
457 const recursed_scope_type* that;
458 public:
~exit_recursed_scope_type()459 ~exit_recursed_scope_type()
460 {
461 if (that != nullptr) {
462 that->requestor = nullptr;
463 }
464 }
exit_recursed_scope_type(const recursed_scope_type * that)465 exit_recursed_scope_type(const recursed_scope_type* that)
466 : that(that)
467 {
468 }
exit_recursed_scope_type(exit_recursed_scope_type && other)469 exit_recursed_scope_type(exit_recursed_scope_type && other) RXCPP_NOEXCEPT
470 : that(other.that)
471 {
472 other.that = nullptr;
473 }
474 };
475 public:
recursed_scope_type()476 recursed_scope_type()
477 : requestor(nullptr)
478 {
479 }
recursed_scope_type(const recursed_scope_type &)480 recursed_scope_type(const recursed_scope_type&)
481 : requestor(nullptr)
482 {
483 // does not aquire recursion scope
484 }
operator =(const recursed_scope_type &)485 recursed_scope_type& operator=(const recursed_scope_type& )
486 {
487 // no change in recursion scope
488 return *this;
489 }
reset(const recurse & r) const490 exit_recursed_scope_type reset(const recurse& r) const {
491 requestor = std::addressof(r.get_recursed());
492 return exit_recursed_scope_type(this);
493 }
is_recursed() const494 bool is_recursed() const {
495 return !!requestor;
496 }
operator ()() const497 void operator()() const {
498 (*requestor)();
499 }
500 };
501 recursed_scope_type recursed_scope;
502
503 public:
504 typedef composite_subscription::weak_subscription weak_subscription;
505 typedef scheduler_base::clock_type clock_type;
506
~schedulable()507 ~schedulable()
508 {
509 if (scoped) {
510 controller.lock().remove(action_scope);
511 }
512 }
schedulable()513 schedulable()
514 : scoped(false)
515 {
516 }
517
518 /// action and worker share lifetime
schedulable(worker q,action a)519 schedulable(worker q, action a)
520 : lifetime(q.get_subscription())
521 , controller(q)
522 , activity(std::move(a))
523 , scoped(false)
524 {
525 }
526 /// action and worker have independent lifetimes
schedulable(composite_subscription cs,worker q,action a)527 schedulable(composite_subscription cs, worker q, action a)
528 : lifetime(std::move(cs))
529 , controller(q)
530 , activity(std::move(a))
531 , scoped(true)
532 , action_scope(controller.lock().add(lifetime))
533 {
534 }
535 /// inherit lifetimes
schedulable(schedulable scbl,worker q,action a)536 schedulable(schedulable scbl, worker q, action a)
537 : lifetime(scbl.get_subscription())
538 , controller(q)
539 , activity(std::move(a))
540 , scoped(scbl.scoped)
541 , action_scope(scbl.scoped ? controller.lock().add(lifetime) : weak_subscription())
542 {
543 }
544
get_subscription() const545 inline const composite_subscription& get_subscription() const {
546 return lifetime;
547 }
get_subscription()548 inline composite_subscription& get_subscription() {
549 return lifetime;
550 }
get_worker() const551 inline const worker get_worker() const {
552 return controller.lock();
553 }
get_worker()554 inline worker get_worker() {
555 return controller.lock();
556 }
get_action() const557 inline const action& get_action() const {
558 return activity;
559 }
get_action()560 inline action& get_action() {
561 return activity;
562 }
563
empty(worker sc)564 inline static schedulable empty(worker sc) {
565 return schedulable(composite_subscription::empty(), sc, action::empty());
566 }
567
set_recursed(const recurse & r) const568 inline auto set_recursed(const recurse& r) const
569 -> decltype(recursed_scope.reset(r)) {
570 return recursed_scope.reset(r);
571 }
572
573 // recursed
574 //
is_recursed() const575 bool is_recursed() const {
576 return recursed_scope.is_recursed();
577 }
578 /// requests tail-recursion of the same action
579 /// this will exit the process if called when
580 /// is_recursed() is false.
581 /// Note: to improve perf it is not required
582 /// to call is_recursed() before calling this
583 /// operator. Context is sufficient. The schedulable
584 /// passed to the action by the scheduler will return
585 /// true from is_recursed()
operator ()() const586 inline void operator()() const {
587 recursed_scope();
588 }
589
590 // composite_subscription
591 //
is_subscribed() const592 inline bool is_subscribed() const {
593 return lifetime.is_subscribed();
594 }
add(subscription s) const595 inline weak_subscription add(subscription s) const {
596 return lifetime.add(std::move(s));
597 }
598 template<class F>
add(F f) const599 auto add(F f) const
600 -> typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
601 return lifetime.add(make_subscription(std::move(f)));
602 }
remove(weak_subscription w) const603 inline void remove(weak_subscription w) const {
604 return lifetime.remove(std::move(w));
605 }
clear() const606 inline void clear() const {
607 return lifetime.clear();
608 }
unsubscribe() const609 inline void unsubscribe() const {
610 return lifetime.unsubscribe();
611 }
612
613 // scheduler
614 //
now() const615 inline clock_type::time_point now() const {
616 return controller.lock().now();
617 }
618 /// put this on the queue of the stored scheduler to run asap
schedule() const619 inline void schedule() const {
620 if (is_subscribed()) {
621 get_worker().schedule(*this);
622 }
623 }
624 /// put this on the queue of the stored scheduler to run at the specified time
schedule(clock_type::time_point when) const625 inline void schedule(clock_type::time_point when) const {
626 if (is_subscribed()) {
627 get_worker().schedule(when, *this);
628 }
629 }
630 /// put this on the queue of the stored scheduler to run after a delay from now
schedule(clock_type::duration when) const631 inline void schedule(clock_type::duration when) const {
632 if (is_subscribed()) {
633 get_worker().schedule(when, *this);
634 }
635 }
636
637 // action
638 //
639 /// invokes the action
operator ()(const recurse & r) const640 inline void operator()(const recurse& r) const {
641 if (!is_subscribed()) {
642 return;
643 }
644 detacher protect(this);
645 activity(*this, r);
646 protect.that = nullptr;
647 }
648 };
649
650 struct current_thread;
651
652 namespace detail {
653
654 class action_type
655 : public std::enable_shared_from_this<action_type>
656 {
657 typedef action_type this_type;
658
659 public:
660 typedef std::function<void(const schedulable&, const recurse&)> function_type;
661
662 private:
663 function_type f;
664
665 public:
action_type()666 action_type()
667 {
668 }
669
action_type(function_type f)670 action_type(function_type f)
671 : f(std::move(f))
672 {
673 }
674
operator ()(const schedulable & s,const recurse & r)675 inline void operator()(const schedulable& s, const recurse& r) {
676 if (!f) {
677 std::terminate();
678 }
679 f(s, r);
680 }
681 };
682
683 class action_tailrecurser
684 : public std::enable_shared_from_this<action_type>
685 {
686 typedef action_type this_type;
687
688 public:
689 typedef std::function<void(const schedulable&)> function_type;
690
691 private:
692 function_type f;
693
694 public:
action_tailrecurser()695 action_tailrecurser()
696 {
697 }
698
action_tailrecurser(function_type f)699 action_tailrecurser(function_type f)
700 : f(std::move(f))
701 {
702 }
703
operator ()(const schedulable & s,const recurse & r)704 inline void operator()(const schedulable& s, const recurse& r) {
705 if (!f) {
706 std::terminate();
707 }
708 trace_activity().action_enter(s);
709 auto scope = s.set_recursed(r);
710 while (s.is_subscribed()) {
711 r.reset();
712 f(s);
713 if (!r.is_allowed() || !r.is_requested()) {
714 if (r.is_requested()) {
715 s.schedule();
716 }
717 break;
718 }
719 trace_activity().action_recurse(s);
720 }
721 trace_activity().action_return(s);
722 }
723 };
724 }
725
operator ()(const schedulable & s,const recurse & r) const726 inline void action::operator()(const schedulable& s, const recurse& r) const {
727 (*inner)(s, r);
728 }
729
make_action_empty()730 inline action make_action_empty() {
731 return action::empty();
732 }
733
734 template<class F>
make_action(F && f)735 inline action make_action(F&& f) {
736 static_assert(detail::is_action_function<F>::value, "action function must be void(schedulable)");
737 auto fn = std::forward<F>(f);
738 return action(std::make_shared<detail::action_type>(detail::action_tailrecurser(fn)));
739 }
740
741 // copy
make_schedulable(const schedulable & scbl)742 inline auto make_schedulable(
743 const schedulable& scbl)
744 -> schedulable {
745 return schedulable(scbl);
746 }
747 // move
make_schedulable(schedulable && scbl)748 inline auto make_schedulable(
749 schedulable&& scbl)
750 -> schedulable {
751 return schedulable(std::move(scbl));
752 }
753
make_schedulable(worker sc,action a)754 inline schedulable make_schedulable(worker sc, action a) {
755 return schedulable(sc, a);
756 }
make_schedulable(worker sc,composite_subscription cs,action a)757 inline schedulable make_schedulable(worker sc, composite_subscription cs, action a) {
758 return schedulable(cs, sc, a);
759 }
760
761 template<class F>
make_schedulable(worker sc,F && f)762 auto make_schedulable(worker sc, F&& f)
763 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
764 return schedulable(sc, make_action(std::forward<F>(f)));
765 }
766 template<class F>
make_schedulable(worker sc,composite_subscription cs,F && f)767 auto make_schedulable(worker sc, composite_subscription cs, F&& f)
768 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
769 return schedulable(cs, sc, make_action(std::forward<F>(f)));
770 }
771 template<class F>
make_schedulable(schedulable scbl,composite_subscription cs,F && f)772 auto make_schedulable(schedulable scbl, composite_subscription cs, F&& f)
773 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
774 return schedulable(cs, scbl.get_worker(), make_action(std::forward<F>(f)));
775 }
776 template<class F>
make_schedulable(schedulable scbl,worker sc,F && f)777 auto make_schedulable(schedulable scbl, worker sc, F&& f)
778 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
779 return schedulable(scbl, sc, make_action(std::forward<F>(f)));
780 }
781 template<class F>
make_schedulable(schedulable scbl,F && f)782 auto make_schedulable(schedulable scbl, F&& f)
783 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
784 return schedulable(scbl, scbl.get_worker(), make_action(std::forward<F>(f)));
785 }
786
make_schedulable(schedulable scbl,composite_subscription cs)787 inline auto make_schedulable(schedulable scbl, composite_subscription cs)
788 -> schedulable {
789 return schedulable(cs, scbl.get_worker(), scbl.get_action());
790 }
make_schedulable(schedulable scbl,worker sc,composite_subscription cs)791 inline auto make_schedulable(schedulable scbl, worker sc, composite_subscription cs)
792 -> schedulable {
793 return schedulable(cs, sc, scbl.get_action());
794 }
make_schedulable(schedulable scbl,worker sc)795 inline auto make_schedulable(schedulable scbl, worker sc)
796 -> schedulable {
797 return schedulable(scbl, sc, scbl.get_action());
798 }
799
800 template<class Arg0, class... ArgN>
schedule(Arg0 && a0,ArgN &&...an) const801 auto worker::schedule(Arg0&& a0, ArgN&&... an) const
802 -> typename std::enable_if<
803 (detail::is_action_function<Arg0>::value ||
804 is_subscription<Arg0>::value) &&
805 !is_schedulable<Arg0>::value>::type {
806 auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
807 trace_activity().schedule_enter(*inner.get(), scbl);
808 inner->schedule(std::move(scbl));
809 trace_activity().schedule_return(*inner.get());
810 }
811 template<class... ArgN>
schedule_rebind(const schedulable & scbl,ArgN &&...an) const812 void worker::schedule_rebind(const schedulable& scbl, ArgN&&... an) const {
813 auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
814 trace_activity().schedule_enter(*inner.get(), rescbl);
815 inner->schedule(std::move(rescbl));
816 trace_activity().schedule_return(*inner.get());
817 }
818
819 template<class Arg0, class... ArgN>
schedule(clock_type::time_point when,Arg0 && a0,ArgN &&...an) const820 auto worker::schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
821 -> typename std::enable_if<
822 (detail::is_action_function<Arg0>::value ||
823 is_subscription<Arg0>::value) &&
824 !is_schedulable<Arg0>::value>::type {
825 auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
826 trace_activity().schedule_when_enter(*inner.get(), when, scbl);
827 inner->schedule(when, std::move(scbl));
828 trace_activity().schedule_when_return(*inner.get());
829 }
830 template<class... ArgN>
schedule_rebind(clock_type::time_point when,const schedulable & scbl,ArgN &&...an) const831 void worker::schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const {
832 auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
833 trace_activity().schedule_when_enter(*inner.get(), when, rescbl);
834 inner->schedule(when, std::move(rescbl));
835 trace_activity().schedule_when_return(*inner.get());
836 }
837
838 template<class Arg0, class... ArgN>
schedule_periodically(clock_type::time_point initial,clock_type::duration period,Arg0 && a0,ArgN &&...an) const839 auto worker::schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
840 -> typename std::enable_if<
841 (detail::is_action_function<Arg0>::value ||
842 is_subscription<Arg0>::value) &&
843 !is_schedulable<Arg0>::value>::type {
844 schedule_periodically_rebind(initial, period, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
845 }
846 template<class... ArgN>
schedule_periodically_rebind(clock_type::time_point initial,clock_type::duration period,const schedulable & scbl,ArgN &&...an) const847 void worker::schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const {
848 auto keepAlive = *this;
849 auto target = std::make_shared<clock_type::time_point>(initial);
850 auto activity = make_schedulable(scbl, keepAlive, std::forward<ArgN>(an)...);
851 auto periodic = make_schedulable(
852 activity,
853 [keepAlive, target, period, activity](schedulable self) {
854 // any recursion requests will be pushed to the scheduler queue
855 recursion r(false);
856 // call action
857 activity(r.get_recurse());
858
859 // schedule next occurance (if the action took longer than 'period' target will be in the past)
860 *target += period;
861 self.schedule(*target);
862 });
863 trace_activity().schedule_when_enter(*inner.get(), *target, periodic);
864 inner->schedule(*target, periodic);
865 trace_activity().schedule_when_return(*inner.get());
866 }
867
868 namespace detail {
869
870 template<class TimePoint>
871 struct time_schedulable
872 {
873 typedef TimePoint time_point_type;
874
time_schedulablerxcpp::schedulers::detail::time_schedulable875 time_schedulable(TimePoint when, schedulable a)
876 : when(when)
877 , what(std::move(a))
878 {
879 }
880 TimePoint when;
881 schedulable what;
882 };
883
884
885 // Sorts time_schedulable items in priority order sorted
886 // on value of time_schedulable.when. Items with equal
887 // values for when are sorted in fifo order.
888 template<class TimePoint>
889 class schedulable_queue {
890 public:
891 typedef time_schedulable<TimePoint> item_type;
892 typedef std::pair<item_type, int64_t> elem_type;
893 typedef std::vector<elem_type> container_type;
894 typedef const item_type& const_reference;
895
896 private:
897 struct compare_elem
898 {
operator ()rxcpp::schedulers::detail::schedulable_queue::compare_elem899 bool operator()(const elem_type& lhs, const elem_type& rhs) const {
900 if (lhs.first.when == rhs.first.when) {
901 return lhs.second > rhs.second;
902 }
903 else {
904 return lhs.first.when > rhs.first.when;
905 }
906 }
907 };
908
909 typedef std::priority_queue<
910 elem_type,
911 container_type,
912 compare_elem
913 > queue_type;
914
915 queue_type q;
916
917 int64_t ordinal;
918 public:
919
schedulable_queue()920 schedulable_queue()
921 : ordinal(0)
922 {
923 }
924
top() const925 const_reference top() const {
926 return q.top().first;
927 }
928
pop()929 void pop() {
930 q.pop();
931 }
932
empty() const933 bool empty() const {
934 return q.empty();
935 }
936
push(const item_type & value)937 void push(const item_type& value) {
938 q.push(elem_type(value, ordinal++));
939 }
940
push(item_type && value)941 void push(item_type&& value) {
942 q.push(elem_type(std::move(value), ordinal++));
943 }
944 };
945
946 }
947
948 }
949 namespace rxsc=schedulers;
950
951 }
952
953 #include "schedulers/rx-currentthread.hpp"
954 #include "schedulers/rx-runloop.hpp"
955 #include "schedulers/rx-newthread.hpp"
956 #include "schedulers/rx-eventloop.hpp"
957 #include "schedulers/rx-immediate.hpp"
958 #include "schedulers/rx-virtualtime.hpp"
959 #include "schedulers/rx-sameworker.hpp"
960
961 #endif
962