1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-merge.hpp
6
7 \brief For each given observable subscribe.
8 For each item emitted from all of the given observables, deliver from the new observable that is returned.
9
10 There are 2 variants of the operator:
11 - The source observable emits nested observables, nested observables are merged.
12 - The source observable and the arguments v0...vn are used to provide the observables to merge.
13
14 \tparam Coordination the type of the scheduler (optional).
15 \tparam Value0 ... (optional).
16 \tparam ValueN types of source observables (optional).
17
18 \param cn the scheduler to synchronize sources from different contexts (optional).
19 \param v0 ... (optional).
20 \param vn source observables (optional).
21
22 \return Observable that emits items that are the result of flattening the observables emitted by the source observable.
23
24 If scheduler is omitted, identity_current_thread is used.
25
26 \sample
27 \snippet merge.cpp threaded implicit merge sample
28 \snippet output.txt threaded implicit merge sample
29
30 \sample
31 \snippet merge.cpp implicit merge sample
32 \snippet output.txt implicit merge sample
33
34 \sample
35 \snippet merge.cpp merge sample
36 \snippet output.txt merge sample
37
38 \sample
39 \snippet merge.cpp threaded merge sample
40 \snippet output.txt threaded merge sample
41 */
42
43 #if !defined(RXCPP_OPERATORS_RX_MERGE_HPP)
44 #define RXCPP_OPERATORS_RX_MERGE_HPP
45
46 #include "../rx-includes.hpp"
47
48 namespace rxcpp {
49
50 namespace operators {
51
52 namespace detail {
53
/// Empty tag type that records the argument types of a merge() call that
/// matched none of the supported overloads.
template<class... ArgN>
struct merge_invalid_arguments
{
};
56
57 template<class... AN>
58 struct merge_invalid : public rxo::operator_base<merge_invalid_arguments<AN...>> {
59 using type = observable<merge_invalid_arguments<AN...>, merge_invalid<AN...>>;
60 };
61 template<class... AN>
62 using merge_invalid_t = typename merge_invalid<AN...>::type;
63
64 template<class T, class Observable, class Coordination>
65 struct merge
66 : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
67 {
68 //static_assert(is_observable<Observable>::value, "merge requires an observable");
69 //static_assert(is_observable<T>::value, "merge requires an observable that contains observables");
70
71 typedef merge<T, Observable, Coordination> this_type;
72
73 typedef rxu::decay_t<T> source_value_type;
74 typedef rxu::decay_t<Observable> source_type;
75
76 typedef typename source_type::source_operator_type source_operator_type;
77 typedef typename source_value_type::value_type value_type;
78
79 typedef rxu::decay_t<Coordination> coordination_type;
80 typedef typename coordination_type::coordinator_type coordinator_type;
81
82 struct values
83 {
valuesrxcpp::operators::detail::merge::values84 values(source_operator_type o, coordination_type sf)
85 : source_operator(std::move(o))
86 , coordination(std::move(sf))
87 {
88 }
89 source_operator_type source_operator;
90 coordination_type coordination;
91 };
92 values initial;
93
mergerxcpp::operators::detail::merge94 merge(const source_type& o, coordination_type sf)
95 : initial(o.source_operator, std::move(sf))
96 {
97 }
98
99 template<class Subscriber>
on_subscriberxcpp::operators::detail::merge100 void on_subscribe(Subscriber scbr) const {
101 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
102
103 typedef Subscriber output_type;
104
105 struct merge_state_type
106 : public std::enable_shared_from_this<merge_state_type>
107 , public values
108 {
109 merge_state_type(values i, coordinator_type coor, output_type oarg)
110 : values(i)
111 , source(i.source_operator)
112 , pendingCompletions(0)
113 , coordinator(std::move(coor))
114 , out(std::move(oarg))
115 {
116 }
117 observable<source_value_type, source_operator_type> source;
118 // on_completed on the output must wait until all the
119 // subscriptions have received on_completed
120 int pendingCompletions;
121 coordinator_type coordinator;
122 output_type out;
123 };
124
125 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
126
127 // take a copy of the values for each subscription
128 auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr));
129
130 composite_subscription outercs;
131
132 // when the out observer is unsubscribed all the
133 // inner subscriptions are unsubscribed as well
134 state->out.add(outercs);
135
136 auto source = on_exception(
137 [&](){return state->coordinator.in(state->source);},
138 state->out);
139 if (source.empty()) {
140 return;
141 }
142
143 ++state->pendingCompletions;
144 // this subscribe does not share the observer subscription
145 // so that when it is unsubscribed the observer can be called
146 // until the inner subscriptions have finished
147 auto sink = make_subscriber<source_value_type>(
148 state->out,
149 outercs,
150 // on_next
151 [state](source_value_type st) {
152
153 composite_subscription innercs;
154
155 // when the out observer is unsubscribed all the
156 // inner subscriptions are unsubscribed as well
157 auto innercstoken = state->out.add(innercs);
158
159 innercs.add(make_subscription([state, innercstoken](){
160 state->out.remove(innercstoken);
161 }));
162
163 auto selectedSource = state->coordinator.in(st);
164
165 ++state->pendingCompletions;
166 // this subscribe does not share the source subscription
167 // so that when it is unsubscribed the source will continue
168 auto sinkInner = make_subscriber<value_type>(
169 state->out,
170 innercs,
171 // on_next
172 [state, st](value_type ct) {
173 state->out.on_next(std::move(ct));
174 },
175 // on_error
176 [state](rxu::error_ptr e) {
177 state->out.on_error(e);
178 },
179 //on_completed
180 [state](){
181 if (--state->pendingCompletions == 0) {
182 state->out.on_completed();
183 }
184 }
185 );
186
187 auto selectedSinkInner = state->coordinator.out(sinkInner);
188 selectedSource.subscribe(std::move(selectedSinkInner));
189 },
190 // on_error
191 [state](rxu::error_ptr e) {
192 state->out.on_error(e);
193 },
194 // on_completed
195 [state]() {
196 if (--state->pendingCompletions == 0) {
197 state->out.on_completed();
198 }
199 }
200 );
201 auto selectedSink = on_exception(
202 [&](){return state->coordinator.out(sink);},
203 state->out);
204 if (selectedSink.empty()) {
205 return;
206 }
207 source->subscribe(std::move(selectedSink.get()));
208 }
209 };
210
211 }
212
213 /*! @copydoc rx-merge.hpp
214 */
215 template<class... AN>
merge(AN &&...an)216 auto merge(AN&&... an)
217 -> operator_factory<merge_tag, AN...> {
218 return operator_factory<merge_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
219 }
220
221 }
222
223 template<>
224 struct member_overload<merge_tag>
225 {
226 template<class Observable,
227 class Enabled = rxu::enable_if_all_true_type_t<
228 is_observable<Observable>>,
229 class SourceValue = rxu::value_type_t<Observable>,
230 class Merge = rxo::detail::merge<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
231 class Value = rxu::value_type_t<SourceValue>,
232 class Result = observable<Value, Merge>
233 >
memberrxcpp::member_overload234 static Result member(Observable&& o) {
235 return Result(Merge(std::forward<Observable>(o), identity_current_thread()));
236 }
237
238 template<class Observable, class Coordination,
239 class Enabled = rxu::enable_if_all_true_type_t<
240 is_observable<Observable>,
241 is_coordination<Coordination>>,
242 class SourceValue = rxu::value_type_t<Observable>,
243 class Merge = rxo::detail::merge<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
244 class Value = rxu::value_type_t<SourceValue>,
245 class Result = observable<Value, Merge>
246 >
memberrxcpp::member_overload247 static Result member(Observable&& o, Coordination&& cn) {
248 return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn)));
249 }
250
251 template<class Observable, class Value0, class... ValueN,
252 class Enabled = rxu::enable_if_all_true_type_t<
253 all_observables<Observable, Value0, ValueN...>>,
254 class EmittedValue = rxu::value_type_t<Observable>,
255 class SourceValue = observable<EmittedValue>,
256 class ObservableObservable = observable<SourceValue>,
257 class Merge = typename rxu::defer_type<rxo::detail::merge, SourceValue, ObservableObservable, identity_one_worker>::type,
258 class Value = rxu::value_type_t<Merge>,
259 class Result = observable<Value, Merge>
260 >
memberrxcpp::member_overload261 static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
262 return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
263 }
264
265 template<class Observable, class Coordination, class Value0, class... ValueN,
266 class Enabled = rxu::enable_if_all_true_type_t<
267 all_observables<Observable, Value0, ValueN...>,
268 is_coordination<Coordination>>,
269 class EmittedValue = rxu::value_type_t<Observable>,
270 class SourceValue = observable<EmittedValue>,
271 class ObservableObservable = observable<SourceValue>,
272 class Merge = typename rxu::defer_type<rxo::detail::merge, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
273 class Value = rxu::value_type_t<Merge>,
274 class Result = observable<Value, Merge>
275 >
memberrxcpp::member_overload276 static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
277 return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
278 }
279
280 template<class... AN>
memberrxcpp::member_overload281 static operators::detail::merge_invalid_t<AN...> member(AN...) {
282 std::terminate();
283 return {};
284 static_assert(sizeof...(AN) == 10000, "merge takes (optional Coordination, optional Value0, optional ValueN...)");
285 }
286 };
287
288 }
289
290 #endif
291