1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-switch_on_next.hpp
6
7 \brief Return observable that emits the items emitted by the observable most recently emitted by the source observable.
8
9 \tparam Coordination the type of the scheduler (optional).
10
11 \param cn the scheduler to synchronize sources from different contexts (optional).
12
13 \return Observable that emits the items emitted by the observable most recently emitted by the source observable.
14
15 \sample
16 \snippet switch_on_next.cpp switch_on_next sample
17 \snippet output.txt switch_on_next sample
18 */
19
20 #if !defined(RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP)
21 #define RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP
22
23 #include "../rx-includes.hpp"
24
25 namespace rxcpp {
26
27 namespace operators {
28
29 namespace detail {
30
31 template<class... AN>
32 struct switch_on_next_invalid_arguments {};
33
34 template<class... AN>
35 struct switch_on_next_invalid : public rxo::operator_base<switch_on_next_invalid_arguments<AN...>> {
36 using type = observable<switch_on_next_invalid_arguments<AN...>, switch_on_next_invalid<AN...>>;
37 };
38 template<class... AN>
39 using switch_on_next_invalid_t = typename switch_on_next_invalid<AN...>::type;
40
41 template<class T, class Observable, class Coordination>
42 struct switch_on_next
43 : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
44 {
45 //static_assert(is_observable<Observable>::value, "switch_on_next requires an observable");
46 //static_assert(is_observable<T>::value, "switch_on_next requires an observable that contains observables");
47
48 typedef switch_on_next<T, Observable, Coordination> this_type;
49
50 typedef rxu::decay_t<T> source_value_type;
51 typedef rxu::decay_t<Observable> source_type;
52
53 typedef typename source_type::source_operator_type source_operator_type;
54
55 typedef source_value_type collection_type;
56 typedef typename collection_type::value_type collection_value_type;
57
58 typedef rxu::decay_t<Coordination> coordination_type;
59 typedef typename coordination_type::coordinator_type coordinator_type;
60
61 struct values
62 {
valuesrxcpp::operators::detail::switch_on_next::values63 values(source_operator_type o, coordination_type sf)
64 : source_operator(std::move(o))
65 , coordination(std::move(sf))
66 {
67 }
68 source_operator_type source_operator;
69 coordination_type coordination;
70 };
71 values initial;
72
switch_on_nextrxcpp::operators::detail::switch_on_next73 switch_on_next(const source_type& o, coordination_type sf)
74 : initial(o.source_operator, std::move(sf))
75 {
76 }
77
78 template<class Subscriber>
on_subscriberxcpp::operators::detail::switch_on_next79 void on_subscribe(Subscriber scbr) const {
80 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
81
82 typedef Subscriber output_type;
83
84 struct switch_state_type
85 : public std::enable_shared_from_this<switch_state_type>
86 , public values
87 {
88 switch_state_type(values i, coordinator_type coor, output_type oarg)
89 : values(i)
90 , source(i.source_operator)
91 , pendingCompletions(0)
92 , coordinator(std::move(coor))
93 , out(std::move(oarg))
94 {
95 }
96 observable<source_value_type, source_operator_type> source;
97 // on_completed on the output must wait until all the
98 // subscriptions have received on_completed
99 int pendingCompletions;
100 coordinator_type coordinator;
101 composite_subscription inner_lifetime;
102 output_type out;
103 };
104
105 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
106
107 // take a copy of the values for each subscription
108 auto state = std::make_shared<switch_state_type>(initial, std::move(coordinator), std::move(scbr));
109
110 composite_subscription outercs;
111
112 // when the out observer is unsubscribed all the
113 // inner subscriptions are unsubscribed as well
114 state->out.add(outercs);
115
116 auto source = on_exception(
117 [&](){return state->coordinator.in(state->source);},
118 state->out);
119 if (source.empty()) {
120 return;
121 }
122
123 ++state->pendingCompletions;
124 // this subscribe does not share the observer subscription
125 // so that when it is unsubscribed the observer can be called
126 // until the inner subscriptions have finished
127 auto sink = make_subscriber<collection_type>(
128 state->out,
129 outercs,
130 // on_next
131 [state](collection_type st) {
132
133 state->inner_lifetime.unsubscribe();
134
135 state->inner_lifetime = composite_subscription();
136
137 // when the out observer is unsubscribed all the
138 // inner subscriptions are unsubscribed as well
139 auto innerlifetimetoken = state->out.add(state->inner_lifetime);
140
141 state->inner_lifetime.add(make_subscription([state, innerlifetimetoken](){
142 state->out.remove(innerlifetimetoken);
143 --state->pendingCompletions;
144 }));
145
146 auto selectedSource = state->coordinator.in(st);
147
148 // this subscribe does not share the source subscription
149 // so that when it is unsubscribed the source will continue
150 auto sinkInner = make_subscriber<collection_value_type>(
151 state->out,
152 state->inner_lifetime,
153 // on_next
154 [state, st](collection_value_type ct) {
155 state->out.on_next(std::move(ct));
156 },
157 // on_error
158 [state](rxu::error_ptr e) {
159 state->out.on_error(e);
160 },
161 //on_completed
162 [state](){
163 if (state->pendingCompletions == 1) {
164 state->out.on_completed();
165 }
166 }
167 );
168
169 auto selectedSinkInner = state->coordinator.out(sinkInner);
170 ++state->pendingCompletions;
171 selectedSource.subscribe(std::move(selectedSinkInner));
172 },
173 // on_error
174 [state](rxu::error_ptr e) {
175 state->out.on_error(e);
176 },
177 // on_completed
178 [state]() {
179 if (--state->pendingCompletions == 0) {
180 state->out.on_completed();
181 }
182 }
183 );
184
185 auto selectedSink = on_exception(
186 [&](){return state->coordinator.out(sink);},
187 state->out);
188 if (selectedSink.empty()) {
189 return;
190 }
191
192 source->subscribe(std::move(selectedSink.get()));
193
194 }
195 };
196
197 }
198
199 /*! @copydoc rx-switch_on_next.hpp
200 */
201 template<class... AN>
switch_on_next(AN &&...an)202 auto switch_on_next(AN&&... an)
203 -> operator_factory<switch_on_next_tag, AN...> {
204 return operator_factory<switch_on_next_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
205 }
206
207 }
208
209 template<>
210 struct member_overload<switch_on_next_tag>
211 {
212 template<class Observable,
213 class Enabled = rxu::enable_if_all_true_type_t<
214 is_observable<Observable>>,
215 class SourceValue = rxu::value_type_t<Observable>,
216 class SwitchOnNext = rxo::detail::switch_on_next<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
217 class Value = rxu::value_type_t<SourceValue>,
218 class Result = observable<Value, SwitchOnNext>
219 >
memberrxcpp::member_overload220 static Result member(Observable&& o) {
221 return Result(SwitchOnNext(std::forward<Observable>(o), identity_current_thread()));
222 }
223
224 template<class Observable, class Coordination,
225 class Enabled = rxu::enable_if_all_true_type_t<
226 is_observable<Observable>,
227 is_coordination<Coordination>>,
228 class SourceValue = rxu::value_type_t<Observable>,
229 class SwitchOnNext = rxo::detail::switch_on_next<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
230 class Value = rxu::value_type_t<SourceValue>,
231 class Result = observable<Value, SwitchOnNext>
232 >
memberrxcpp::member_overload233 static Result member(Observable&& o, Coordination&& cn) {
234 return Result(SwitchOnNext(std::forward<Observable>(o), std::forward<Coordination>(cn)));
235 }
236
237 template<class... AN>
memberrxcpp::member_overload238 static operators::detail::switch_on_next_invalid_t<AN...> member(AN...) {
239 std::terminate();
240 return {};
241 static_assert(sizeof...(AN) == 10000, "switch_on_next takes (optional Coordination)");
242 }
243 };
244
245 }
246
247 #endif
248