1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-merge_delay_error.hpp
6
7 \brief For each given observable subscribe.
8 For each item emitted from all of the given observables, deliver from the new observable that is returned.
9 The first error to occure is hold off until all of the given non-error-emitting observables have finished their emission.
10
11 There are 2 variants of the operator:
12 - The source observable emits nested observables, nested observables are merged.
13 - The source observable and the arguments v0...vn are used to provide the observables to merge.
14
15 \tparam Coordination the type of the scheduler (optional).
16 \tparam Value0 ... (optional).
17 \tparam ValueN types of source observables (optional).
18
19 \param cn the scheduler to synchronize sources from different contexts (optional).
20 \param v0 ... (optional).
21 \param vn source observables (optional).
22
23 \return Observable that emits items that are the result of flattening the observables emitted by the source observable.
24
25 If scheduler is omitted, identity_current_thread is used.
26
27 \sample
28 \snippet merge_delay_error.cpp threaded implicit merge sample
29 \snippet output.txt threaded implicit merge sample
30
31 \sample
32 \snippet merge_delay_error.cpp implicit merge sample
33 \snippet output.txt implicit merge sample
34
35 \sample
36 \snippet merge_delay_error.cpp merge sample
37 \snippet output.txt merge sample
38
39 \sample
40 \snippet merge_delay_error.cpp threaded merge sample
41 \snippet output.txt threaded merge sample
42 */
43
44 #if !defined(RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP)
45 #define RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP
46
47 #include "rx-merge.hpp"
48
49 #include "../rx-composite_exception.hpp"
50
51 namespace rxcpp {
52
53 namespace operators {
54
55 namespace detail {
56
57 template<class T, class Observable, class Coordination>
58 struct merge_delay_error
59 : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
60 {
61 //static_assert(is_observable<Observable>::value, "merge requires an observable");
62 //static_assert(is_observable<T>::value, "merge requires an observable that contains observables");
63
64 typedef merge_delay_error<T, Observable, Coordination> this_type;
65
66 typedef rxu::decay_t<T> source_value_type;
67 typedef rxu::decay_t<Observable> source_type;
68
69 typedef typename source_type::source_operator_type source_operator_type;
70 typedef typename source_value_type::value_type value_type;
71
72 typedef rxu::decay_t<Coordination> coordination_type;
73 typedef typename coordination_type::coordinator_type coordinator_type;
74
75 struct values
76 {
valuesrxcpp::operators::detail::merge_delay_error::values77 values(source_operator_type o, coordination_type sf)
78 : source_operator(std::move(o))
79 , coordination(std::move(sf))
80 {
81 }
82 source_operator_type source_operator;
83 coordination_type coordination;
84 };
85 values initial;
86
merge_delay_errorrxcpp::operators::detail::merge_delay_error87 merge_delay_error(const source_type& o, coordination_type sf)
88 : initial(o.source_operator, std::move(sf))
89 {
90 }
91
92 template<class Subscriber>
on_subscriberxcpp::operators::detail::merge_delay_error93 void on_subscribe(Subscriber scbr) const {
94 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
95
96 typedef Subscriber output_type;
97
98 struct merge_state_type
99 : public std::enable_shared_from_this<merge_state_type>
100 , public values
101 {
102 merge_state_type(values i, coordinator_type coor, output_type oarg)
103 : values(i)
104 , source(i.source_operator)
105 , pendingCompletions(0)
106 , coordinator(std::move(coor))
107 , out(std::move(oarg))
108 {
109 }
110 observable<source_value_type, source_operator_type> source;
111 // on_completed on the output must wait until all the
112 // subscriptions have received on_completed
113 int pendingCompletions;
114 composite_exception exception;;
115 coordinator_type coordinator;
116 output_type out;
117 };
118
119 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
120
121 // take a copy of the values for each subscription
122 auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr));
123
124 composite_subscription outercs;
125
126 // when the out observer is unsubscribed all the
127 // inner subscriptions are unsubscribed as well
128 state->out.add(outercs);
129
130 auto source = on_exception(
131 [&](){return state->coordinator.in(state->source);},
132 state->out);
133 if (source.empty()) {
134 return;
135 }
136
137 ++state->pendingCompletions;
138 // this subscribe does not share the observer subscription
139 // so that when it is unsubscribed the observer can be called
140 // until the inner subscriptions have finished
141 auto sink = make_subscriber<source_value_type>(
142 state->out,
143 outercs,
144 // on_next
145 [state](source_value_type st) {
146
147 composite_subscription innercs;
148
149 // when the out observer is unsubscribed all the
150 // inner subscriptions are unsubscribed as well
151 auto innercstoken = state->out.add(innercs);
152
153 innercs.add(make_subscription([state, innercstoken](){
154 state->out.remove(innercstoken);
155 }));
156
157 auto selectedSource = state->coordinator.in(st);
158
159 ++state->pendingCompletions;
160 // this subscribe does not share the source subscription
161 // so that when it is unsubscribed the source will continue
162 auto sinkInner = make_subscriber<value_type>(
163 state->out,
164 innercs,
165 // on_next
166 [state, st](value_type ct) {
167 state->out.on_next(std::move(ct));
168 },
169 // on_error
170 [state](rxu::error_ptr e) {
171 if(--state->pendingCompletions == 0) {
172 state->out.on_error(
173 rxu::make_error_ptr(std::move(state->exception.add(e))));
174 } else {
175 state->exception.add(e);
176 }
177 },
178 //on_completed
179 [state](){
180 if (--state->pendingCompletions == 0) {
181 if(!state->exception.empty()) {
182 state->out.on_error(
183 rxu::make_error_ptr(std::move(state->exception)));
184 } else {
185 state->out.on_completed();
186 }
187 }
188 }
189 );
190
191 auto selectedSinkInner = state->coordinator.out(sinkInner);
192 selectedSource.subscribe(std::move(selectedSinkInner));
193 },
194 // on_error
195 [state](rxu::error_ptr e) {
196 if(--state->pendingCompletions == 0) {
197 state->out.on_error(
198 rxu::make_error_ptr(std::move(state->exception.add(e))));
199 } else {
200 state->exception.add(e);
201 }
202 },
203 // on_completed
204 [state]() {
205 if (--state->pendingCompletions == 0) {
206 if(!state->exception.empty()) {
207 state->out.on_error(
208 rxu::make_error_ptr(std::move(state->exception)));
209 } else {
210 state->out.on_completed();
211 }
212 }
213 }
214 );
215 auto selectedSink = on_exception(
216 [&](){return state->coordinator.out(sink);},
217 state->out);
218 if (selectedSink.empty()) {
219 return;
220 }
221 source->subscribe(std::move(selectedSink.get()));
222 }
223 };
224
225 }
226
227 /*! @copydoc rx-merge-delay-error.hpp
228 */
229 template<class... AN>
merge_delay_error(AN &&...an)230 auto merge_delay_error(AN&&... an)
231 -> operator_factory<merge_delay_error_tag, AN...> {
232 return operator_factory<merge_delay_error_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
233 }
234
235 }
236
237 template<>
238 struct member_overload<merge_delay_error_tag>
239 {
240 template<class Observable,
241 class Enabled = rxu::enable_if_all_true_type_t<
242 is_observable<Observable>>,
243 class SourceValue = rxu::value_type_t<Observable>,
244 class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
245 class Value = rxu::value_type_t<SourceValue>,
246 class Result = observable<Value, Merge>
247 >
memberrxcpp::member_overload248 static Result member(Observable&& o) {
249 return Result(Merge(std::forward<Observable>(o), identity_current_thread()));
250 }
251
252 template<class Observable, class Coordination,
253 class Enabled = rxu::enable_if_all_true_type_t<
254 is_observable<Observable>,
255 is_coordination<Coordination>>,
256 class SourceValue = rxu::value_type_t<Observable>,
257 class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
258 class Value = rxu::value_type_t<SourceValue>,
259 class Result = observable<Value, Merge>
260 >
memberrxcpp::member_overload261 static Result member(Observable&& o, Coordination&& cn) {
262 return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn)));
263 }
264
265 template<class Observable, class Value0, class... ValueN,
266 class Enabled = rxu::enable_if_all_true_type_t<
267 all_observables<Observable, Value0, ValueN...>>,
268 class EmittedValue = rxu::value_type_t<Observable>,
269 class SourceValue = observable<EmittedValue>,
270 class ObservableObservable = observable<SourceValue>,
271 class Merge = typename rxu::defer_type<rxo::detail::merge_delay_error, SourceValue, ObservableObservable, identity_one_worker>::type,
272 class Value = rxu::value_type_t<Merge>,
273 class Result = observable<Value, Merge>
274 >
memberrxcpp::member_overload275 static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
276 return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
277 }
278
279 template<class Observable, class Coordination, class Value0, class... ValueN,
280 class Enabled = rxu::enable_if_all_true_type_t<
281 all_observables<Observable, Value0, ValueN...>,
282 is_coordination<Coordination>>,
283 class EmittedValue = rxu::value_type_t<Observable>,
284 class SourceValue = observable<EmittedValue>,
285 class ObservableObservable = observable<SourceValue>,
286 class Merge = typename rxu::defer_type<rxo::detail::merge_delay_error, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
287 class Value = rxu::value_type_t<Merge>,
288 class Result = observable<Value, Merge>
289 >
memberrxcpp::member_overload290 static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
291 return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
292 }
293
294 template<class... AN>
memberrxcpp::member_overload295 static operators::detail::merge_invalid_t<AN...> member(AN...) {
296 std::terminate();
297 return {};
298 static_assert(sizeof...(AN) == 10000, "merge_delay_error takes (optional Coordination, optional Value0, optional ValueN...)");
299 }
300 };
301
302 }
303
304 #endif
305