1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP)
6 #define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP
7
8 #include "../rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace schedulers {
13
14 struct event_loop : public scheduler_interface
15 {
16 private:
17 typedef event_loop this_type;
18 event_loop(const this_type&);
19
20 struct loop_worker : public worker_interface
21 {
22 private:
23 typedef loop_worker this_type;
24 loop_worker(const this_type&);
25
26 typedef detail::schedulable_queue<
27 typename clock_type::time_point> queue_item_time;
28
29 typedef queue_item_time::item_type item_type;
30
31 composite_subscription lifetime;
32 worker controller;
33 std::shared_ptr<const scheduler_interface> alive;
34
35 public:
~loop_workerrxcpp::schedulers::event_loop::loop_worker36 virtual ~loop_worker()
37 {
38 }
loop_workerrxcpp::schedulers::event_loop::loop_worker39 loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive)
40 : lifetime(cs)
41 , controller(w)
42 , alive(alive)
43 {
44 auto token = controller.add(cs);
45 cs.add([token, w](){
46 w.remove(token);
47 });
48 }
49
nowrxcpp::schedulers::event_loop::loop_worker50 virtual clock_type::time_point now() const {
51 return clock_type::now();
52 }
53
schedulerxcpp::schedulers::event_loop::loop_worker54 virtual void schedule(const schedulable& scbl) const {
55 controller.schedule(lifetime, scbl.get_action());
56 }
57
schedulerxcpp::schedulers::event_loop::loop_worker58 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
59 controller.schedule(when, lifetime, scbl.get_action());
60 }
61 };
62
63 mutable thread_factory factory;
64 scheduler newthread;
65 mutable std::atomic<std::size_t> count;
66 composite_subscription loops_lifetime;
67 std::vector<worker> loops;
68
69 public:
event_looprxcpp::schedulers::event_loop70 event_loop()
71 : factory([](std::function<void()> start){
72 return std::thread(std::move(start));
73 })
74 , newthread(make_new_thread())
75 , count(0)
76 {
77 auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
78 while (remaining--) {
79 loops.push_back(newthread.create_worker(loops_lifetime));
80 }
81 }
event_looprxcpp::schedulers::event_loop82 explicit event_loop(thread_factory tf)
83 : factory(tf)
84 , newthread(make_new_thread(tf))
85 , count(0)
86 {
87 auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
88 while (remaining--) {
89 loops.push_back(newthread.create_worker(loops_lifetime));
90 }
91 }
~event_looprxcpp::schedulers::event_loop92 virtual ~event_loop()
93 {
94 loops_lifetime.unsubscribe();
95 }
96
nowrxcpp::schedulers::event_loop97 virtual clock_type::time_point now() const {
98 return clock_type::now();
99 }
100
create_workerrxcpp::schedulers::event_loop101 virtual worker create_worker(composite_subscription cs) const {
102 return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this()));
103 }
104 };
105
make_event_loop()106 inline scheduler make_event_loop() {
107 static scheduler instance = make_scheduler<event_loop>();
108 return instance;
109 }
make_event_loop(thread_factory tf)110 inline scheduler make_event_loop(thread_factory tf) {
111 return make_scheduler<event_loop>(tf);
112 }
113
114 }
115
116 }
117
118 #endif
119