1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_RX_SCHEDULER_RUN_LOOP_HPP) 6 #define RXCPP_RX_SCHEDULER_RUN_LOOP_HPP 7 8 #include "../rx-includes.hpp" 9 10 namespace rxcpp { 11 12 namespace schedulers { 13 14 namespace detail { 15 16 struct run_loop_state : public std::enable_shared_from_this<run_loop_state> 17 { 18 typedef scheduler::clock_type clock_type; 19 20 typedef detail::schedulable_queue< 21 clock_type::time_point> queue_item_time; 22 23 typedef queue_item_time::item_type item_type; 24 typedef queue_item_time::const_reference const_reference_item_type; 25 ~run_loop_staterxcpp::schedulers::detail::run_loop_state26 virtual ~run_loop_state() 27 { 28 } 29 run_loop_staterxcpp::schedulers::detail::run_loop_state30 run_loop_state() 31 { 32 } 33 34 composite_subscription lifetime; 35 mutable std::mutex lock; 36 mutable queue_item_time q; 37 recursion r; 38 std::function<void(clock_type::time_point)> notify_earlier_wakeup; 39 }; 40 41 } 42 43 44 struct run_loop_scheduler : public scheduler_interface 45 { 46 private: 47 typedef run_loop_scheduler this_type; 48 run_loop_scheduler(const this_type&); 49 50 struct run_loop_worker : public worker_interface 51 { 52 private: 53 typedef run_loop_worker this_type; 54 55 run_loop_worker(const this_type&); 56 57 public: 58 std::weak_ptr<detail::run_loop_state> state; 59 ~run_loop_workerrxcpp::schedulers::run_loop_scheduler::run_loop_worker60 virtual ~run_loop_worker() 61 { 62 } 63 run_loop_workerrxcpp::schedulers::run_loop_scheduler::run_loop_worker64 explicit run_loop_worker(std::weak_ptr<detail::run_loop_state> ws) 65 : state(ws) 66 { 67 } 68 nowrxcpp::schedulers::run_loop_scheduler::run_loop_worker69 virtual clock_type::time_point now() const { 70 return clock_type::now(); 71 } 72 schedulerxcpp::schedulers::run_loop_scheduler::run_loop_worker73 virtual void schedule(const schedulable& scbl) const { 74 schedule(now(), scbl); 75 } 76 schedulerxcpp::schedulers::run_loop_scheduler::run_loop_worker77 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { 78 if (scbl.is_subscribed()) { 79 auto st = state.lock(); 80 std::unique_lock<std::mutex> guard(st->lock); 81 const bool need_earlier_wakeup_notification = st->notify_earlier_wakeup && 82 (st->q.empty() || when < st->q.top().when); 83 st->q.push(detail::run_loop_state::item_type(when, scbl)); 84 st->r.reset(false); 85 if (need_earlier_wakeup_notification) st->notify_earlier_wakeup(when); 86 guard.unlock(); // So we can't get attempt to recursively lock the state 87 } 88 } 89 }; 90 91 std::weak_ptr<detail::run_loop_state> state; 92 93 public: run_loop_schedulerrxcpp::schedulers::run_loop_scheduler94 explicit run_loop_scheduler(std::weak_ptr<detail::run_loop_state> ws) 95 : state(ws) 96 { 97 } ~run_loop_schedulerrxcpp::schedulers::run_loop_scheduler98 virtual ~run_loop_scheduler() 99 { 100 } 101 nowrxcpp::schedulers::run_loop_scheduler102 virtual clock_type::time_point now() const { 103 return clock_type::now(); 104 } 105 create_workerrxcpp::schedulers::run_loop_scheduler106 virtual worker create_worker(composite_subscription cs) const { 107 auto lifetime = state.lock()->lifetime; 108 auto token = lifetime.add(cs); 109 cs.add([=](){lifetime.remove(token);}); 110 return worker(cs, create_worker_interface()); 111 } 112 create_worker_interfacerxcpp::schedulers::run_loop_scheduler113 std::shared_ptr<worker_interface> create_worker_interface() const { 114 return std::make_shared<run_loop_worker>(state); 115 } 116 }; 117 118 class run_loop 119 { 120 private: 121 typedef run_loop this_type; 122 // don't allow this instance to copy/move since it owns current_thread queue 123 // for the thread it is constructed on. 124 run_loop(const this_type&); 125 run_loop(this_type&&); 126 127 typedef detail::action_queue queue_type; 128 129 typedef detail::run_loop_state::item_type item_type; 130 typedef detail::run_loop_state::const_reference_item_type const_reference_item_type; 131 132 std::shared_ptr<detail::run_loop_state> state; 133 std::shared_ptr<run_loop_scheduler> sc; 134 135 public: 136 typedef scheduler::clock_type clock_type; run_loop()137 run_loop() 138 : state(std::make_shared<detail::run_loop_state>()) 139 , sc(std::make_shared<run_loop_scheduler>(state)) 140 { 141 // take ownership so that the current_thread scheduler 142 // uses the same queue on this thread 143 queue_type::ensure(sc->create_worker_interface()); 144 } ~run_loop()145 ~run_loop() 146 { 147 state->lifetime.unsubscribe(); 148 149 std::unique_lock<std::mutex> guard(state->lock); 150 151 // release ownership 152 queue_type::destroy(); 153 154 auto expired = std::move(state->q); 155 if (!state->q.empty()) std::terminate(); 156 } 157 now() const158 clock_type::time_point now() const { 159 return clock_type::now(); 160 } 161 get_subscription() const162 composite_subscription get_subscription() const { 163 return state->lifetime; 164 } 165 empty() const166 bool empty() const { 167 return state->q.empty(); 168 } 169 peek() const170 const_reference_item_type peek() const { 171 return state->q.top(); 172 } 173 dispatch() const174 void dispatch() const { 175 std::unique_lock<std::mutex> guard(state->lock); 176 if (state->q.empty()) { 177 return; 178 } 179 auto& peek = state->q.top(); 180 if (!peek.what.is_subscribed()) { 181 state->q.pop(); 182 return; 183 } 184 if (clock_type::now() < peek.when) { 185 return; 186 } 187 auto what = peek.what; 188 state->q.pop(); 189 state->r.reset(state->q.empty()); 190 guard.unlock(); 191 what(state->r.get_recurse()); 192 } 193 get_scheduler() const194 scheduler get_scheduler() const { 195 return make_scheduler(sc); 196 } 197 set_notify_earlier_wakeup(std::function<void (clock_type::time_point)> const & f)198 void set_notify_earlier_wakeup(std::function<void(clock_type::time_point)> const& f) { 199 std::unique_lock<std::mutex> guard(state->lock); 200 state->notify_earlier_wakeup = f; 201 } 202 }; 203 make_run_loop(const run_loop & r)204inline scheduler make_run_loop(const run_loop& r) { 205 return r.get_scheduler(); 206 } 207 208 } 209 210 } 211 212 #endif 213