1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_RX_SCHEDULER_CURRENT_THREAD_HPP) 6 #define RXCPP_RX_SCHEDULER_CURRENT_THREAD_HPP 7 8 #include "../rx-includes.hpp" 9 10 namespace rxcpp { 11 12 namespace schedulers { 13 14 namespace detail { 15 16 struct action_queue 17 { 18 typedef action_queue this_type; 19 20 typedef scheduler_base::clock_type clock; 21 typedef time_schedulable<clock::time_point> item_type; 22 23 private: 24 typedef schedulable_queue<item_type::time_point_type> queue_item_time; 25 26 public: 27 struct current_thread_queue_type { 28 std::shared_ptr<worker_interface> w; 29 recursion r; 30 queue_item_time q; 31 }; 32 33 private: 34 #if defined(RXCPP_THREAD_LOCAL) current_thread_queuerxcpp::schedulers::detail::action_queue35 static current_thread_queue_type*& current_thread_queue() { 36 static RXCPP_THREAD_LOCAL current_thread_queue_type* q; 37 return q; 38 } 39 #else 40 static rxu::thread_local_storage<current_thread_queue_type>& current_thread_queue() { 41 static rxu::thread_local_storage<current_thread_queue_type> q; 42 return q; 43 } 44 #endif 45 46 public: 47 ownedrxcpp::schedulers::detail::action_queue48 static bool owned() { 49 return !!current_thread_queue(); 50 } get_worker_interfacerxcpp::schedulers::detail::action_queue51 static const std::shared_ptr<worker_interface>& get_worker_interface() { 52 return current_thread_queue()->w; 53 } get_recursionrxcpp::schedulers::detail::action_queue54 static recursion& get_recursion() { 55 return current_thread_queue()->r; 56 } emptyrxcpp::schedulers::detail::action_queue57 static bool empty() { 58 if (!current_thread_queue()) { 59 std::terminate(); 60 } 61 return current_thread_queue()->q.empty(); 62 } toprxcpp::schedulers::detail::action_queue63 static queue_item_time::const_reference top() { 64 if (!current_thread_queue()) { 65 std::terminate(); 66 } 67 return current_thread_queue()->q.top(); 68 } poprxcpp::schedulers::detail::action_queue69 static void pop() { 70 auto& state = current_thread_queue(); 71 if (!state) { 72 std::terminate(); 73 } 74 state->q.pop(); 75 if (state->q.empty()) { 76 // allow recursion 77 state->r.reset(true); 78 } 79 } pushrxcpp::schedulers::detail::action_queue80 static void push(item_type item) { 81 auto& state = current_thread_queue(); 82 if (!state) { 83 std::terminate(); 84 } 85 if (!item.what.is_subscribed()) { 86 return; 87 } 88 state->q.push(std::move(item)); 89 // disallow recursion 90 state->r.reset(false); 91 } ensurerxcpp::schedulers::detail::action_queue92 static std::shared_ptr<worker_interface> ensure(std::shared_ptr<worker_interface> w) { 93 if (!!current_thread_queue()) { 94 std::terminate(); 95 } 96 // create and publish new queue 97 current_thread_queue() = new current_thread_queue_type(); 98 current_thread_queue()->w = w; 99 return w; 100 } createrxcpp::schedulers::detail::action_queue101 static std::unique_ptr<current_thread_queue_type> create(std::shared_ptr<worker_interface> w) { 102 std::unique_ptr<current_thread_queue_type> result(new current_thread_queue_type()); 103 result->w = std::move(w); 104 return result; 105 } setrxcpp::schedulers::detail::action_queue106 static void set(current_thread_queue_type* q) { 107 if (!!current_thread_queue()) { 108 std::terminate(); 109 } 110 // publish new queue 111 current_thread_queue() = q; 112 } destroyrxcpp::schedulers::detail::action_queue113 static void destroy(current_thread_queue_type* q) { 114 delete q; 115 } destroyrxcpp::schedulers::detail::action_queue116 static void destroy() { 117 if (!current_thread_queue()) { 118 std::terminate(); 119 } 120 #if defined(RXCPP_THREAD_LOCAL) 121 destroy(current_thread_queue()); 122 #else 123 destroy(current_thread_queue().get()); 124 #endif 125 current_thread_queue() = nullptr; 126 } 127 }; 128 129 } 130 131 struct current_thread : public scheduler_interface 132 { 133 private: 134 typedef current_thread this_type; 135 current_thread(const this_type&); 136 137 typedef detail::action_queue queue_type; 138 139 struct derecurser : public worker_interface 140 { 141 private: 142 typedef current_thread this_type; 143 derecurser(const this_type&); 144 public: derecurserrxcpp::schedulers::current_thread::derecurser145 derecurser() 146 { 147 } ~derecurserrxcpp::schedulers::current_thread::derecurser148 virtual ~derecurser() 149 { 150 } 151 nowrxcpp::schedulers::current_thread::derecurser152 virtual clock_type::time_point now() const { 153 return clock_type::now(); 154 } 155 schedulerxcpp::schedulers::current_thread::derecurser156 virtual void schedule(const schedulable& scbl) const { 157 queue_type::push(queue_type::item_type(now(), scbl)); 158 } 159 schedulerxcpp::schedulers::current_thread::derecurser160 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { 161 queue_type::push(queue_type::item_type(when, scbl)); 162 } 163 }; 164 165 struct current_worker : public worker_interface 166 { 167 private: 168 typedef current_thread this_type; 169 current_worker(const this_type&); 170 public: current_workerrxcpp::schedulers::current_thread::current_worker171 current_worker() 172 { 173 } ~current_workerrxcpp::schedulers::current_thread::current_worker174 virtual ~current_worker() 175 { 176 } 177 nowrxcpp::schedulers::current_thread::current_worker178 virtual clock_type::time_point now() const { 179 return clock_type::now(); 180 } 181 schedulerxcpp::schedulers::current_thread::current_worker182 virtual void schedule(const schedulable& scbl) const { 183 schedule(now(), scbl); 184 } 185 schedulerxcpp::schedulers::current_thread::current_worker186 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { 187 if (!scbl.is_subscribed()) { 188 return; 189 } 190 191 { 192 // check ownership 193 if (queue_type::owned()) { 194 // already has an owner - delegate 195 queue_type::get_worker_interface()->schedule(when, scbl); 196 return; 197 } 198 199 // take ownership 200 queue_type::ensure(std::make_shared<derecurser>()); 201 } 202 // release ownership 203 RXCPP_UNWIND_AUTO([]{ 204 queue_type::destroy(); 205 }); 206 207 const auto& recursor = queue_type::get_recursion().get_recurse(); 208 std::this_thread::sleep_until(when); 209 if (scbl.is_subscribed()) { 210 scbl(recursor); 211 } 212 if (queue_type::empty()) { 213 return; 214 } 215 216 // loop until queue is empty 217 for ( 218 auto next = queue_type::top().when; 219 (std::this_thread::sleep_until(next), true); 220 next = queue_type::top().when 221 ) { 222 auto what = queue_type::top().what; 223 224 queue_type::pop(); 225 226 if (what.is_subscribed()) { 227 what(recursor); 228 } 229 230 if (queue_type::empty()) { 231 break; 232 } 233 } 234 } 235 }; 236 237 std::shared_ptr<current_worker> wi; 238 239 public: current_threadrxcpp::schedulers::current_thread240 current_thread() 241 : wi(std::make_shared<current_worker>()) 242 { 243 } ~current_threadrxcpp::schedulers::current_thread244 virtual ~current_thread() 245 { 246 } 247 is_schedule_requiredrxcpp::schedulers::current_thread248 static bool is_schedule_required() { return !queue_type::owned(); } 249 is_tail_recursion_allowedrxcpp::schedulers::current_thread250 inline bool is_tail_recursion_allowed() const { 251 return queue_type::empty(); 252 } 253 nowrxcpp::schedulers::current_thread254 virtual clock_type::time_point now() const { 255 return clock_type::now(); 256 } 257 create_workerrxcpp::schedulers::current_thread258 virtual worker create_worker(composite_subscription cs) const { 259 return worker(std::move(cs), wi); 260 } 261 }; 262 make_current_thread()263inline const scheduler& make_current_thread() { 264 static scheduler instance = make_scheduler<current_thread>(); 265 return instance; 266 } 267 268 } 269 270 } 271 272 #endif 273