1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-subscribe_on.hpp
6
7 \brief Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination.
8
9 \tparam Coordination the type of the scheduler.
10
11 \param cn the scheduler to perform subscription actions on.
12
13 \return The source observable modified so that its subscriptions happen on the specified scheduler.
14
15 \sample
16 \snippet subscribe_on.cpp subscribe_on sample
17 \snippet output.txt subscribe_on sample
18
19 Invoking rxcpp::observable::observe_on operator, instead of subscribe_on, gives following results:
20 \snippet output.txt observe_on sample
21 */
22
23 #if !defined(RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP)
24 #define RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP
25
26 #include "../rx-includes.hpp"
27
28 namespace rxcpp {
29
30 namespace operators {
31
32 namespace detail {
33
34 template<class... AN>
35 struct subscribe_on_invalid_arguments {};
36
37 template<class... AN>
38 struct subscribe_on_invalid : public rxo::operator_base<subscribe_on_invalid_arguments<AN...>> {
39 using type = observable<subscribe_on_invalid_arguments<AN...>, subscribe_on_invalid<AN...>>;
40 };
41 template<class... AN>
42 using subscribe_on_invalid_t = typename subscribe_on_invalid<AN...>::type;
43
44 template<class T, class Observable, class Coordination>
45 struct subscribe_on : public operator_base<T>
46 {
47 typedef rxu::decay_t<Observable> source_type;
48 typedef rxu::decay_t<Coordination> coordination_type;
49 typedef typename coordination_type::coordinator_type coordinator_type;
50 struct subscribe_on_values
51 {
~subscribe_on_valuesrxcpp::operators::detail::subscribe_on::subscribe_on_values52 ~subscribe_on_values()
53 {
54 }
subscribe_on_valuesrxcpp::operators::detail::subscribe_on::subscribe_on_values55 subscribe_on_values(source_type s, coordination_type sf)
56 : source(std::move(s))
57 , coordination(std::move(sf))
58 {
59 }
60 source_type source;
61 coordination_type coordination;
62 private:
63 subscribe_on_values& operator=(subscribe_on_values o) RXCPP_DELETE;
64 };
65 const subscribe_on_values initial;
66
~subscribe_onrxcpp::operators::detail::subscribe_on67 ~subscribe_on()
68 {
69 }
subscribe_onrxcpp::operators::detail::subscribe_on70 subscribe_on(source_type s, coordination_type sf)
71 : initial(std::move(s), std::move(sf))
72 {
73 }
74
75 template<class Subscriber>
on_subscriberxcpp::operators::detail::subscribe_on76 void on_subscribe(Subscriber s) const {
77
78 typedef Subscriber output_type;
79 struct subscribe_on_state_type
80 : public std::enable_shared_from_this<subscribe_on_state_type>
81 , public subscribe_on_values
82 {
83 subscribe_on_state_type(const subscribe_on_values& i, const output_type& oarg)
84 : subscribe_on_values(i)
85 , out(oarg)
86 {
87 }
88 composite_subscription source_lifetime;
89 output_type out;
90 private:
91 subscribe_on_state_type& operator=(subscribe_on_state_type o) RXCPP_DELETE;
92 };
93
94 composite_subscription coordinator_lifetime;
95
96 auto coordinator = initial.coordination.create_coordinator(coordinator_lifetime);
97
98 auto controller = coordinator.get_worker();
99
100 // take a copy of the values for each subscription
101 auto state = std::make_shared<subscribe_on_state_type>(initial, std::move(s));
102
103 auto sl = state->source_lifetime;
104 auto ol = state->out.get_subscription();
105
106 auto disposer = [=](const rxsc::schedulable&){
107 sl.unsubscribe();
108 ol.unsubscribe();
109 coordinator_lifetime.unsubscribe();
110 };
111 auto selectedDisposer = on_exception(
112 [&](){return coordinator.act(disposer);},
113 state->out);
114 if (selectedDisposer.empty()) {
115 return;
116 }
117
118 state->source_lifetime.add([=](){
119 controller.schedule(selectedDisposer.get());
120 });
121
122 state->out.add([=](){
123 sl.unsubscribe();
124 ol.unsubscribe();
125 coordinator_lifetime.unsubscribe();
126 });
127
128 auto producer = [=](const rxsc::schedulable&){
129 state->source.subscribe(state->source_lifetime, state->out);
130 };
131
132 auto selectedProducer = on_exception(
133 [&](){return coordinator.act(producer);},
134 state->out);
135 if (selectedProducer.empty()) {
136 return;
137 }
138
139 controller.schedule(selectedProducer.get());
140 }
141 private:
142 subscribe_on& operator=(subscribe_on o) RXCPP_DELETE;
143 };
144
145 }
146
147 /*! @copydoc rx-subscribe_on.hpp
148 */
149 template<class... AN>
subscribe_on(AN &&...an)150 auto subscribe_on(AN&&... an)
151 -> operator_factory<subscribe_on_tag, AN...> {
152 return operator_factory<subscribe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
153 }
154
155 }
156
157 template<>
158 struct member_overload<subscribe_on_tag>
159 {
160 template<class Observable, class Coordination,
161 class Enabled = rxu::enable_if_all_true_type_t<
162 is_observable<Observable>,
163 is_coordination<Coordination>>,
164 class SourceValue = rxu::value_type_t<Observable>,
165 class SubscribeOn = rxo::detail::subscribe_on<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
166 class Value = rxu::value_type_t<SubscribeOn>,
167 class Result = observable<Value, SubscribeOn>>
memberrxcpp::member_overload168 static Result member(Observable&& o, Coordination&& cn) {
169 return Result(SubscribeOn(std::forward<Observable>(o), std::forward<Coordination>(cn)));
170 }
171
172 template<class... AN>
memberrxcpp::member_overload173 static operators::detail::subscribe_on_invalid_t<AN...> member(AN...) {
174 std::terminate();
175 return {};
176 static_assert(sizeof...(AN) == 10000, "subscribe_on takes (Coordination)");
177 }
178 };
179
180 }
181
182 #endif
183