// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
/*! \file rx-observe_on.hpp

    \brief All values are queued and delivered using the scheduler from the supplied coordination.

    \tparam Coordination  the type of the coordination that supplies the scheduler.

    \param  cn  the coordination whose scheduler is used to notify observers.

    \return  The source observable modified so that its observers are notified on the specified scheduler.

    \sample
    \snippet observe_on.cpp observe_on sample
    \snippet output.txt observe_on sample

    Invoking the rxcpp::observable::subscribe_on operator, instead of observe_on, gives the following results:
    \snippet output.txt subscribe_on sample
*/
22
23 #if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP)
24 #define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP
25
26 #include "../rx-includes.hpp"
27
28 namespace rxcpp {
29
30 namespace operators {
31
32 namespace detail {
33
// Empty tag type that only records the argument pack AN... of an observe_on
// call. It is used as the value type of the placeholder observable named by
// observe_on_invalid<AN...>::type.
template<class... AN>
struct observe_on_invalid_arguments {};
36
37 template<class... AN>
38 struct observe_on_invalid : public rxo::operator_base<observe_on_invalid_arguments<AN...>> {
39 using type = observable<observe_on_invalid_arguments<AN...>, observe_on_invalid<AN...>>;
40 };
41 template<class... AN>
42 using observe_on_invalid_t = typename observe_on_invalid<AN...>::type;
43
44 template<class T, class Coordination>
45 struct observe_on
46 {
47 typedef rxu::decay_t<T> source_value_type;
48
49 typedef rxu::decay_t<Coordination> coordination_type;
50 typedef typename coordination_type::coordinator_type coordinator_type;
51
52 coordination_type coordination;
53
observe_onrxcpp::operators::detail::observe_on54 observe_on(coordination_type cn)
55 : coordination(std::move(cn))
56 {
57 }
58
59 template<class Subscriber>
60 struct observe_on_observer
61 {
62 typedef observe_on_observer<Subscriber> this_type;
63 typedef source_value_type value_type;
64 typedef rxu::decay_t<Subscriber> dest_type;
65 typedef observer<value_type, this_type> observer_type;
66
67 typedef rxn::notification<T> notification_type;
68 typedef typename notification_type::type base_notification_type;
69 typedef std::deque<base_notification_type> queue_type;
70
71 struct mode
72 {
73 enum type {
74 Invalid = 0,
75 Processing,
76 Empty,
77 Disposed,
78 Errored
79 };
80 };
81 struct observe_on_state : std::enable_shared_from_this<observe_on_state>
82 {
83 mutable std::mutex lock;
84 mutable queue_type fill_queue;
85 mutable queue_type drain_queue;
86 composite_subscription lifetime;
87 mutable typename mode::type current;
88 coordinator_type coordinator;
89 dest_type destination;
90
observe_on_staterxcpp::operators::detail::observe_on::observe_on_observer::observe_on_state91 observe_on_state(dest_type d, coordinator_type coor, composite_subscription cs)
92 : lifetime(std::move(cs))
93 , current(mode::Empty)
94 , coordinator(std::move(coor))
95 , destination(std::move(d))
96 {
97 }
98
finishrxcpp::operators::detail::observe_on::observe_on_observer::observe_on_state99 void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const {
100 if (!guard.owns_lock()) {
101 std::terminate();
102 }
103 if (current == mode::Errored || current == mode::Disposed) {return;}
104 current = end;
105 queue_type fill_expired;
106 swap(fill_expired, fill_queue);
107 queue_type drain_expired;
108 swap(drain_expired, drain_queue);
109 RXCPP_UNWIND_AUTO([&](){guard.lock();});
110 guard.unlock();
111 lifetime.unsubscribe();
112 destination.unsubscribe();
113 }
114
ensure_processingrxcpp::operators::detail::observe_on::observe_on_observer::observe_on_state115 void ensure_processing(std::unique_lock<std::mutex>& guard) const {
116 if (!guard.owns_lock()) {
117 std::terminate();
118 }
119 if (current == mode::Empty) {
120 current = mode::Processing;
121
122 if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) {
123 finish(guard, mode::Disposed);
124 }
125
126 auto keepAlive = this->shared_from_this();
127
128 auto drain = [keepAlive, this](const rxsc::schedulable& self){
129 using std::swap;
130 RXCPP_TRY {
131 for (;;) {
132 if (drain_queue.empty() || !destination.is_subscribed()) {
133 std::unique_lock<std::mutex> guard(lock);
134 if (!destination.is_subscribed() ||
135 (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
136 finish(guard, mode::Disposed);
137 return;
138 }
139 if (drain_queue.empty()) {
140 if (fill_queue.empty()) {
141 current = mode::Empty;
142 return;
143 }
144 swap(fill_queue, drain_queue);
145 }
146 }
147 auto notification = std::move(drain_queue.front());
148 drain_queue.pop_front();
149 notification->accept(destination);
150 std::unique_lock<std::mutex> guard(lock);
151 self();
152 if (lifetime.is_subscribed()) break;
153 }
154 }
155 RXCPP_CATCH(...) {
156 destination.on_error(rxu::current_exception());
157 std::unique_lock<std::mutex> guard(lock);
158 finish(guard, mode::Errored);
159 }
160 };
161
162 auto selectedDrain = on_exception(
163 [&](){return coordinator.act(drain);},
164 destination);
165 if (selectedDrain.empty()) {
166 finish(guard, mode::Errored);
167 return;
168 }
169
170 auto processor = coordinator.get_worker();
171
172 RXCPP_UNWIND_AUTO([&](){guard.lock();});
173 guard.unlock();
174
175 processor.schedule(selectedDrain.get());
176 }
177 }
178 };
179 std::shared_ptr<observe_on_state> state;
180
observe_on_observerrxcpp::operators::detail::observe_on::observe_on_observer181 observe_on_observer(dest_type d, coordinator_type coor, composite_subscription cs)
182 : state(std::make_shared<observe_on_state>(std::move(d), std::move(coor), std::move(cs)))
183 {
184 }
185
on_nextrxcpp::operators::detail::observe_on::observe_on_observer186 void on_next(source_value_type v) const {
187 std::unique_lock<std::mutex> guard(state->lock);
188 if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
189 state->fill_queue.push_back(notification_type::on_next(std::move(v)));
190 state->ensure_processing(guard);
191 }
on_errorrxcpp::operators::detail::observe_on::observe_on_observer192 void on_error(rxu::error_ptr e) const {
193 std::unique_lock<std::mutex> guard(state->lock);
194 if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
195 state->fill_queue.push_back(notification_type::on_error(e));
196 state->ensure_processing(guard);
197 }
on_completedrxcpp::operators::detail::observe_on::observe_on_observer198 void on_completed() const {
199 std::unique_lock<std::mutex> guard(state->lock);
200 if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
201 state->fill_queue.push_back(notification_type::on_completed());
202 state->ensure_processing(guard);
203 }
204
makerxcpp::operators::detail::observe_on::observe_on_observer205 static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, coordination_type cn, composite_subscription cs = composite_subscription()) {
206 auto coor = cn.create_coordinator(d.get_subscription());
207 d.add(cs);
208
209 this_type o(d, std::move(coor), cs);
210 auto keepAlive = o.state;
211 cs.add([=](){
212 std::unique_lock<std::mutex> guard(keepAlive->lock);
213 keepAlive->ensure_processing(guard);
214 });
215
216 return make_subscriber<value_type>(d, cs, make_observer<value_type>(std::move(o)));
217 }
218 };
219
220 template<class Subscriber>
operator ()rxcpp::operators::detail::observe_on221 auto operator()(Subscriber dest) const
222 -> decltype(observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination)) {
223 return observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination);
224 }
225 };
226
227 }
228
229 /*! @copydoc rx-observe_on.hpp
230 */
231 template<class... AN>
observe_on(AN &&...an)232 auto observe_on(AN&&... an)
233 -> operator_factory<observe_on_tag, AN...> {
234 return operator_factory<observe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
235 }
236
237 }
238
239 template<>
240 struct member_overload<observe_on_tag>
241 {
242 template<class Observable, class Coordination,
243 class Enabled = rxu::enable_if_all_true_type_t<
244 is_observable<Observable>,
245 is_coordination<Coordination>>,
246 class SourceValue = rxu::value_type_t<Observable>,
247 class ObserveOn = rxo::detail::observe_on<SourceValue, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload248 static auto member(Observable&& o, Coordination&& cn)
249 -> decltype(o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)))) {
250 return o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)));
251 }
252
253 template<class... AN>
memberrxcpp::member_overload254 static operators::detail::observe_on_invalid_t<AN...> member(AN...) {
255 std::terminate();
256 return {};
257 static_assert(sizeof...(AN) == 10000, "observe_on takes (Coordination)");
258 }
259 };
260
261 class observe_on_one_worker : public coordination_base
262 {
263 rxsc::scheduler factory;
264
265 class input_type
266 {
267 rxsc::worker controller;
268 rxsc::scheduler factory;
269 identity_one_worker coordination;
270 public:
input_type(rxsc::worker w)271 explicit input_type(rxsc::worker w)
272 : controller(w)
273 , factory(rxsc::make_same_worker(w))
274 , coordination(factory)
275 {
276 }
get_worker() const277 inline rxsc::worker get_worker() const {
278 return controller;
279 }
get_scheduler() const280 inline rxsc::scheduler get_scheduler() const {
281 return factory;
282 }
now() const283 inline rxsc::scheduler::clock_type::time_point now() const {
284 return factory.now();
285 }
286 template<class Observable>
in(Observable o) const287 auto in(Observable o) const
288 -> decltype(o.observe_on(coordination)) {
289 return o.observe_on(coordination);
290 }
291 template<class Subscriber>
out(Subscriber s) const292 auto out(Subscriber s) const
293 -> Subscriber {
294 return s;
295 }
296 template<class F>
act(F f) const297 auto act(F f) const
298 -> F {
299 return f;
300 }
301 };
302
303 public:
304
observe_on_one_worker(rxsc::scheduler sc)305 explicit observe_on_one_worker(rxsc::scheduler sc) : factory(sc) {}
306
307 typedef coordinator<input_type> coordinator_type;
308
now() const309 inline rxsc::scheduler::clock_type::time_point now() const {
310 return factory.now();
311 }
312
create_coordinator(composite_subscription cs=composite_subscription ()) const313 inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
314 auto w = factory.create_worker(std::move(cs));
315 return coordinator_type(input_type(std::move(w)));
316 }
317 };
318
observe_on_run_loop(const rxsc::run_loop & rl)319 inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) {
320 return observe_on_one_worker(rxsc::make_run_loop(rl));
321 }
322
observe_on_event_loop()323 inline observe_on_one_worker observe_on_event_loop() {
324 static observe_on_one_worker r(rxsc::make_event_loop());
325 return r;
326 }
327
observe_on_new_thread()328 inline observe_on_one_worker observe_on_new_thread() {
329 static observe_on_one_worker r(rxsc::make_new_thread());
330 return r;
331 }
332
333 }
334
335 #endif
336