1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP)
6 #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP
7
8 #include "../rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace schedulers {
13
14 typedef std::function<std::thread(std::function<void()>)> thread_factory;
15
16 struct new_thread : public scheduler_interface
17 {
18 private:
19 typedef new_thread this_type;
20 new_thread(const this_type&);
21
22 struct new_worker : public worker_interface
23 {
24 private:
25 typedef new_worker this_type;
26
27 typedef detail::action_queue queue_type;
28
29 new_worker(const this_type&);
30
31 struct new_worker_state : public std::enable_shared_from_this<new_worker_state>
32 {
33 typedef detail::schedulable_queue<
34 typename clock_type::time_point> queue_item_time;
35
36 typedef queue_item_time::item_type item_type;
37
~new_worker_staterxcpp::schedulers::new_thread::new_worker::new_worker_state38 virtual ~new_worker_state()
39 {
40 // Ensure that std::thread is no longer joinable,
41 // otherwise the destructor will call std::terminate.
42 if (!worker.joinable()) {
43 return;
44 }
45 if (worker.get_id() != std::this_thread::get_id()) {
46 worker.join();
47 } else {
48 worker.detach();
49 }
50 }
51
new_worker_staterxcpp::schedulers::new_thread::new_worker::new_worker_state52 explicit new_worker_state(composite_subscription cs)
53 : lifetime(cs)
54 {
55 }
56
57 composite_subscription lifetime;
58 mutable std::mutex lock;
59 mutable std::condition_variable wake;
60 mutable queue_item_time q;
61 std::thread worker;
62 recursion r;
63 };
64
65 std::shared_ptr<new_worker_state> state;
66
67 public:
~new_workerrxcpp::schedulers::new_thread::new_worker68 virtual ~new_worker()
69 {
70 }
71
new_workerrxcpp::schedulers::new_thread::new_worker72 explicit new_worker(std::shared_ptr<new_worker_state> ws)
73 : state(ws)
74 {
75 }
76
new_workerrxcpp::schedulers::new_thread::new_worker77 new_worker(composite_subscription cs, thread_factory& tf)
78 : state(std::make_shared<new_worker_state>(cs))
79 {
80 auto keepAlive = state;
81
82 state->lifetime.add([keepAlive](){
83 std::unique_lock<std::mutex> guard(keepAlive->lock);
84 auto expired = std::move(keepAlive->q);
85 keepAlive->q = new_worker_state::queue_item_time{};
86 if (!keepAlive->q.empty()) std::terminate();
87 keepAlive->wake.notify_one();
88
89 // ~new_worker_state cleans up the std::thread
90 });
91
92 state->worker = tf([keepAlive](){
93
94 // take ownership
95 queue_type::ensure(std::make_shared<new_worker>(keepAlive));
96 // release ownership
97 RXCPP_UNWIND_AUTO([]{
98 queue_type::destroy();
99 });
100
101 for(;;) {
102 std::unique_lock<std::mutex> guard(keepAlive->lock);
103 if (keepAlive->q.empty()) {
104 keepAlive->wake.wait(guard, [keepAlive](){
105 return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
106 });
107 }
108 if (!keepAlive->lifetime.is_subscribed()) {
109 break;
110 }
111 auto& peek = keepAlive->q.top();
112 if (!peek.what.is_subscribed()) {
113 keepAlive->q.pop();
114 continue;
115 }
116 if (clock_type::now() < peek.when) {
117 keepAlive->wake.wait_until(guard, peek.when);
118 continue;
119 }
120 auto what = peek.what;
121 keepAlive->q.pop();
122 keepAlive->r.reset(keepAlive->q.empty());
123 guard.unlock();
124 what(keepAlive->r.get_recurse());
125 }
126 });
127 }
128
nowrxcpp::schedulers::new_thread::new_worker129 virtual clock_type::time_point now() const {
130 return clock_type::now();
131 }
132
schedulerxcpp::schedulers::new_thread::new_worker133 virtual void schedule(const schedulable& scbl) const {
134 schedule(now(), scbl);
135 }
136
schedulerxcpp::schedulers::new_thread::new_worker137 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
138 if (scbl.is_subscribed()) {
139 std::unique_lock<std::mutex> guard(state->lock);
140 state->q.push(new_worker_state::item_type(when, scbl));
141 state->r.reset(false);
142 }
143 state->wake.notify_one();
144 }
145 };
146
147 mutable thread_factory factory;
148
149 public:
new_threadrxcpp::schedulers::new_thread150 new_thread()
151 : factory([](std::function<void()> start){
152 return std::thread(std::move(start));
153 })
154 {
155 }
new_threadrxcpp::schedulers::new_thread156 explicit new_thread(thread_factory tf)
157 : factory(tf)
158 {
159 }
~new_threadrxcpp::schedulers::new_thread160 virtual ~new_thread()
161 {
162 }
163
nowrxcpp::schedulers::new_thread164 virtual clock_type::time_point now() const {
165 return clock_type::now();
166 }
167
create_workerrxcpp::schedulers::new_thread168 virtual worker create_worker(composite_subscription cs) const {
169 return worker(cs, std::make_shared<new_worker>(cs, factory));
170 }
171 };
172
make_new_thread()173 inline scheduler make_new_thread() {
174 static scheduler instance = make_scheduler<new_thread>();
175 return instance;
176 }
make_new_thread(thread_factory tf)177 inline scheduler make_new_thread(thread_factory tf) {
178 return make_scheduler<new_thread>(tf);
179 }
180
181 }
182
183 }
184
185 #endif
186