• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP)
6 #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
// Callable that receives the thread's entry function and returns the
// std::thread that was created for it.
using thread_factory = std::function<std::thread(std::function<void()>)>;
15 
16 struct new_thread : public scheduler_interface
17 {
18 private:
19     typedef new_thread this_type;
20     new_thread(const this_type&);
21 
22     struct new_worker : public worker_interface
23     {
24     private:
25         typedef new_worker this_type;
26 
27         typedef detail::action_queue queue_type;
28 
29         new_worker(const this_type&);
30 
31         struct new_worker_state : public std::enable_shared_from_this<new_worker_state>
32         {
33             typedef detail::schedulable_queue<
34                 typename clock_type::time_point> queue_item_time;
35 
36             typedef queue_item_time::item_type item_type;
37 
~new_worker_staterxcpp::schedulers::new_thread::new_worker::new_worker_state38             virtual ~new_worker_state()
39             {
40             }
41 
new_worker_staterxcpp::schedulers::new_thread::new_worker::new_worker_state42             explicit new_worker_state(composite_subscription cs)
43                 : lifetime(cs)
44             {
45             }
46 
47             composite_subscription lifetime;
48             mutable std::mutex lock;
49             mutable std::condition_variable wake;
50             mutable queue_item_time q;
51             std::thread worker;
52             recursion r;
53         };
54 
55         std::shared_ptr<new_worker_state> state;
56 
57     public:
~new_workerrxcpp::schedulers::new_thread::new_worker58         virtual ~new_worker()
59         {
60         }
61 
new_workerrxcpp::schedulers::new_thread::new_worker62         explicit new_worker(std::shared_ptr<new_worker_state> ws)
63             : state(ws)
64         {
65         }
66 
new_workerrxcpp::schedulers::new_thread::new_worker67         new_worker(composite_subscription cs, thread_factory& tf)
68             : state(std::make_shared<new_worker_state>(cs))
69         {
70             auto keepAlive = state;
71 
72             state->lifetime.add([keepAlive](){
73                 std::unique_lock<std::mutex> guard(keepAlive->lock);
74                 auto expired = std::move(keepAlive->q);
75                 keepAlive->q = new_worker_state::queue_item_time{};
76                 if (!keepAlive->q.empty()) std::terminate();
77                 keepAlive->wake.notify_one();
78 
79                 if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) {
80                     guard.unlock();
81                     keepAlive->worker.join();
82                 }
83                 else {
84                     keepAlive->worker.detach();
85                 }
86             });
87 
88             state->worker = tf([keepAlive](){
89 
90                 // take ownership
91                 queue_type::ensure(std::make_shared<new_worker>(keepAlive));
92                 // release ownership
93                 RXCPP_UNWIND_AUTO([]{
94                     queue_type::destroy();
95                 });
96 
97                 for(;;) {
98                     std::unique_lock<std::mutex> guard(keepAlive->lock);
99                     if (keepAlive->q.empty()) {
100                         keepAlive->wake.wait(guard, [keepAlive](){
101                             return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
102                         });
103                     }
104                     if (!keepAlive->lifetime.is_subscribed()) {
105                         break;
106                     }
107                     auto& peek = keepAlive->q.top();
108                     if (!peek.what.is_subscribed()) {
109                         keepAlive->q.pop();
110                         continue;
111                     }
112                     if (clock_type::now() < peek.when) {
113                         keepAlive->wake.wait_until(guard, peek.when);
114                         continue;
115                     }
116                     auto what = peek.what;
117                     keepAlive->q.pop();
118                     keepAlive->r.reset(keepAlive->q.empty());
119                     guard.unlock();
120                     what(keepAlive->r.get_recurse());
121                 }
122             });
123         }
124 
nowrxcpp::schedulers::new_thread::new_worker125         virtual clock_type::time_point now() const {
126             return clock_type::now();
127         }
128 
schedulerxcpp::schedulers::new_thread::new_worker129         virtual void schedule(const schedulable& scbl) const {
130             schedule(now(), scbl);
131         }
132 
schedulerxcpp::schedulers::new_thread::new_worker133         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
134             if (scbl.is_subscribed()) {
135                 std::unique_lock<std::mutex> guard(state->lock);
136                 state->q.push(new_worker_state::item_type(when, scbl));
137                 state->r.reset(false);
138             }
139             state->wake.notify_one();
140         }
141     };
142 
143     mutable thread_factory factory;
144 
145 public:
new_threadrxcpp::schedulers::new_thread146     new_thread()
147         : factory([](std::function<void()> start){
148             return std::thread(std::move(start));
149         })
150     {
151     }
new_threadrxcpp::schedulers::new_thread152     explicit new_thread(thread_factory tf)
153         : factory(tf)
154     {
155     }
~new_threadrxcpp::schedulers::new_thread156     virtual ~new_thread()
157     {
158     }
159 
nowrxcpp::schedulers::new_thread160     virtual clock_type::time_point now() const {
161         return clock_type::now();
162     }
163 
create_workerrxcpp::schedulers::new_thread164     virtual worker create_worker(composite_subscription cs) const {
165         return worker(cs, std::make_shared<new_worker>(cs, factory));
166     }
167 };
168 
make_new_thread()169 inline scheduler make_new_thread() {
170     static scheduler instance = make_scheduler<new_thread>();
171     return instance;
172 }
make_new_thread(thread_factory tf)173 inline scheduler make_new_thread(thread_factory tf) {
174     return make_scheduler<new_thread>(tf);
175 }
176 
177 }
178 
179 }
180 
181 #endif
182