1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_COORDINATION_HPP)
6 #define RXCPP_RX_COORDINATION_HPP
7
8 #include "rx-includes.hpp"
9
10 namespace rxcpp {
11
12 struct tag_coordinator {};
13 struct coordinator_base {typedef tag_coordinator coordinator_tag;};
14
15 template<class T, class C = rxu::types_checked>
16 struct is_coordinator : public std::false_type {};
17
18 template<class T>
19 struct is_coordinator<T, typename rxu::types_checked_from<typename T::coordinator_tag>::type>
20 : public std::is_convertible<typename T::coordinator_tag*, tag_coordinator*> {};
21
22 struct tag_coordination {};
23 struct coordination_base {typedef tag_coordination coordination_tag;};
24
25 namespace detail {
26
27 template<class T, class C = rxu::types_checked>
28 struct is_coordination : public std::false_type {};
29
30 template<class T>
31 struct is_coordination<T, typename rxu::types_checked_from<typename T::coordination_tag>::type>
32 : public std::is_convertible<typename T::coordination_tag*, tag_coordination*> {};
33
34 }
35
36 template<class T, class Decayed = rxu::decay_t<T>>
37 struct is_coordination : detail::is_coordination<Decayed>
38 {
39 };
40
41 template<class Coordination, class DecayedCoordination = rxu::decay_t<Coordination>>
42 using coordination_tag_t = typename DecayedCoordination::coordination_tag;
43
44 template<class Input>
45 class coordinator : public coordinator_base
46 {
47 public:
48 typedef Input input_type;
49
50 private:
51 struct not_supported {typedef not_supported type;};
52
53 template<class Observable>
54 struct get_observable
55 {
56 typedef decltype((*(input_type*)nullptr).in((*(Observable*)nullptr))) type;
57 };
58
59 template<class Subscriber>
60 struct get_subscriber
61 {
62 typedef decltype((*(input_type*)nullptr).out((*(Subscriber*)nullptr))) type;
63 };
64
65 template<class F>
66 struct get_action_function
67 {
68 typedef decltype((*(input_type*)nullptr).act((*(F*)nullptr))) type;
69 };
70
71 public:
72 input_type input;
73
74 template<class T>
75 struct get
76 {
77 typedef typename std::conditional<
78 rxsc::detail::is_action_function<T>::value, get_action_function<T>, typename std::conditional<
79 is_observable<T>::value, get_observable<T>, typename std::conditional<
80 is_subscriber<T>::value, get_subscriber<T>, not_supported>::type>::type>::type::type type;
81 };
82
coordinator(Input i)83 coordinator(Input i) : input(i) {}
84
get_worker() const85 rxsc::worker get_worker() const {
86 return input.get_worker();
87 }
get_scheduler() const88 rxsc::scheduler get_scheduler() const {
89 return input.get_scheduler();
90 }
91
92 template<class Observable>
in(Observable o) const93 auto in(Observable o) const
94 -> typename get_observable<Observable>::type {
95 return input.in(std::move(o));
96 static_assert(is_observable<Observable>::value, "can only synchronize observables");
97 }
98
99 template<class Subscriber>
out(Subscriber s) const100 auto out(Subscriber s) const
101 -> typename get_subscriber<Subscriber>::type {
102 return input.out(std::move(s));
103 static_assert(is_subscriber<Subscriber>::value, "can only synchronize subscribers");
104 }
105
106 template<class F>
act(F f) const107 auto act(F f) const
108 -> typename get_action_function<F>::type {
109 return input.act(std::move(f));
110 static_assert(rxsc::detail::is_action_function<F>::value, "can only synchronize action functions");
111 }
112 };
113
114 class identity_one_worker : public coordination_base
115 {
116 rxsc::scheduler factory;
117
118 class input_type
119 {
120 rxsc::worker controller;
121 rxsc::scheduler factory;
122 public:
input_type(rxsc::worker w)123 explicit input_type(rxsc::worker w)
124 : controller(w)
125 , factory(rxsc::make_same_worker(w))
126 {
127 }
get_worker() const128 inline rxsc::worker get_worker() const {
129 return controller;
130 }
get_scheduler() const131 inline rxsc::scheduler get_scheduler() const {
132 return factory;
133 }
now() const134 inline rxsc::scheduler::clock_type::time_point now() const {
135 return factory.now();
136 }
137 template<class Observable>
in(Observable o) const138 auto in(Observable o) const
139 -> Observable {
140 return o;
141 }
142 template<class Subscriber>
out(Subscriber s) const143 auto out(Subscriber s) const
144 -> Subscriber {
145 return s;
146 }
147 template<class F>
act(F f) const148 auto act(F f) const
149 -> F {
150 return f;
151 }
152 };
153
154 public:
155
identity_one_worker(rxsc::scheduler sc)156 explicit identity_one_worker(rxsc::scheduler sc) : factory(sc) {}
157
158 typedef coordinator<input_type> coordinator_type;
159
now() const160 inline rxsc::scheduler::clock_type::time_point now() const {
161 return factory.now();
162 }
163
create_coordinator(composite_subscription cs=composite_subscription ()) const164 inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
165 auto w = factory.create_worker(std::move(cs));
166 return coordinator_type(input_type(std::move(w)));
167 }
168 };
169
identity_immediate()170 inline identity_one_worker identity_immediate() {
171 static identity_one_worker r(rxsc::make_immediate());
172 return r;
173 }
174
identity_current_thread()175 inline identity_one_worker identity_current_thread() {
176 static identity_one_worker r(rxsc::make_current_thread());
177 return r;
178 }
179
identity_same_worker(rxsc::worker w)180 inline identity_one_worker identity_same_worker(rxsc::worker w) {
181 return identity_one_worker(rxsc::make_same_worker(w));
182 }
183
184 class serialize_one_worker : public coordination_base
185 {
186 rxsc::scheduler factory;
187
188 template<class F>
189 struct serialize_action
190 {
191 F dest;
192 std::shared_ptr<std::mutex> lock;
serialize_actionrxcpp::serialize_one_worker::serialize_action193 serialize_action(F d, std::shared_ptr<std::mutex> m)
194 : dest(std::move(d))
195 , lock(std::move(m))
196 {
197 if (!lock) {
198 std::terminate();
199 }
200 }
operator ()rxcpp::serialize_one_worker::serialize_action201 auto operator()(const rxsc::schedulable& scbl) const
202 -> decltype(dest(scbl)) {
203 std::unique_lock<std::mutex> guard(*lock);
204 return dest(scbl);
205 }
206 };
207
208 template<class Observer>
209 struct serialize_observer
210 {
211 typedef serialize_observer<Observer> this_type;
212 typedef rxu::decay_t<Observer> dest_type;
213 typedef typename dest_type::value_type value_type;
214 typedef observer<value_type, this_type> observer_type;
215 dest_type dest;
216 std::shared_ptr<std::mutex> lock;
217
serialize_observerrxcpp::serialize_one_worker::serialize_observer218 serialize_observer(dest_type d, std::shared_ptr<std::mutex> m)
219 : dest(std::move(d))
220 , lock(std::move(m))
221 {
222 if (!lock) {
223 std::terminate();
224 }
225 }
on_nextrxcpp::serialize_one_worker::serialize_observer226 void on_next(value_type v) const {
227 std::unique_lock<std::mutex> guard(*lock);
228 dest.on_next(v);
229 }
on_errorrxcpp::serialize_one_worker::serialize_observer230 void on_error(rxu::error_ptr e) const {
231 std::unique_lock<std::mutex> guard(*lock);
232 dest.on_error(e);
233 }
on_completedrxcpp::serialize_one_worker::serialize_observer234 void on_completed() const {
235 std::unique_lock<std::mutex> guard(*lock);
236 dest.on_completed();
237 }
238
239 template<class Subscriber>
makerxcpp::serialize_one_worker::serialize_observer240 static subscriber<value_type, observer_type> make(const Subscriber& s, std::shared_ptr<std::mutex> m) {
241 return make_subscriber<value_type>(s, observer_type(this_type(s.get_observer(), std::move(m))));
242 }
243 };
244
245 class input_type
246 {
247 rxsc::worker controller;
248 rxsc::scheduler factory;
249 std::shared_ptr<std::mutex> lock;
250 public:
input_type(rxsc::worker w,std::shared_ptr<std::mutex> m)251 explicit input_type(rxsc::worker w, std::shared_ptr<std::mutex> m)
252 : controller(w)
253 , factory(rxsc::make_same_worker(w))
254 , lock(std::move(m))
255 {
256 }
get_worker() const257 inline rxsc::worker get_worker() const {
258 return controller;
259 }
get_scheduler() const260 inline rxsc::scheduler get_scheduler() const {
261 return factory;
262 }
now() const263 inline rxsc::scheduler::clock_type::time_point now() const {
264 return factory.now();
265 }
266 template<class Observable>
in(Observable o) const267 auto in(Observable o) const
268 -> Observable {
269 return o;
270 }
271 template<class Subscriber>
out(const Subscriber & s) const272 auto out(const Subscriber& s) const
273 -> decltype(serialize_observer<decltype(s.get_observer())>::make(s, lock)) {
274 return serialize_observer<decltype(s.get_observer())>::make(s, lock);
275 }
276 template<class F>
act(F f) const277 auto act(F f) const
278 -> serialize_action<F> {
279 return serialize_action<F>(std::move(f), lock);
280 }
281 };
282
283 public:
284
serialize_one_worker(rxsc::scheduler sc)285 explicit serialize_one_worker(rxsc::scheduler sc) : factory(sc) {}
286
287 typedef coordinator<input_type> coordinator_type;
288
now() const289 inline rxsc::scheduler::clock_type::time_point now() const {
290 return factory.now();
291 }
292
create_coordinator(composite_subscription cs=composite_subscription ()) const293 inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
294 auto w = factory.create_worker(std::move(cs));
295 std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
296 return coordinator_type(input_type(std::move(w), std::move(lock)));
297 }
298 };
299
serialize_event_loop()300 inline serialize_one_worker serialize_event_loop() {
301 static serialize_one_worker r(rxsc::make_event_loop());
302 return r;
303 }
304
serialize_new_thread()305 inline serialize_one_worker serialize_new_thread() {
306 static serialize_one_worker r(rxsc::make_new_thread());
307 return r;
308 }
309
serialize_same_worker(rxsc::worker w)310 inline serialize_one_worker serialize_same_worker(rxsc::worker w) {
311 return serialize_one_worker(rxsc::make_same_worker(w));
312 }
313
314 }
315
316 #endif
317