1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-concat.hpp
6
7 \brief For each item from this observable subscribe to one at a time, in the order received.
8 For each item from all of the given observables deliver from the new observable that is returned.
9
10 There are 2 variants of the operator:
11 - The source observable emits nested observables, nested observables are concatenated.
12 - The source observable and the arguments v0...vn are used to provide the observables to concatenate.
13
14 \tparam Coordination the type of the scheduler (optional).
15 \tparam Value0 ... (optional).
16 \tparam ValueN types of source observables (optional).
17
18 \param cn the scheduler to synchronize sources from different contexts (optional).
19 \param v0 ... (optional).
20 \param vn source observables (optional).
21
22 \return Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them.
23
24 \sample
25 \snippet concat.cpp implicit concat sample
26 \snippet output.txt implicit concat sample
27
28 \sample
29 \snippet concat.cpp threaded implicit concat sample
30 \snippet output.txt threaded implicit concat sample
31
32 \sample
33 \snippet concat.cpp concat sample
34 \snippet output.txt concat sample
35
36 \sample
37 \snippet concat.cpp threaded concat sample
38 \snippet output.txt threaded concat sample
39 */
40
41 #if !defined(RXCPP_OPERATORS_RX_CONCAT_HPP)
42 #define RXCPP_OPERATORS_RX_CONCAT_HPP
43
44 #include "../rx-includes.hpp"
45
46 namespace rxcpp {
47
48 namespace operators {
49
50 namespace detail {
51
/*! Empty tag type that carries the argument types of a rejected concat call.
    It is used as the value type of the placeholder observable named by concat_invalid.
*/
template<typename... ArgN>
struct concat_invalid_arguments
{
};
54
55 template<class... AN>
56 struct concat_invalid : public rxo::operator_base<concat_invalid_arguments<AN...>> {
57 using type = observable<concat_invalid_arguments<AN...>, concat_invalid<AN...>>;
58 };
59 template<class... AN>
60 using concat_invalid_t = typename concat_invalid<AN...>::type;
61
62 template<class T, class Observable, class Coordination>
63 struct concat
64 : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
65 {
66 typedef concat<T, Observable, Coordination> this_type;
67
68 typedef rxu::decay_t<T> source_value_type;
69 typedef rxu::decay_t<Observable> source_type;
70 typedef rxu::decay_t<Coordination> coordination_type;
71
72 typedef typename coordination_type::coordinator_type coordinator_type;
73
74 typedef typename source_type::source_operator_type source_operator_type;
75 typedef source_value_type collection_type;
76 typedef typename collection_type::value_type value_type;
77
78 struct values
79 {
valuesrxcpp::operators::detail::concat::values80 values(source_operator_type o, coordination_type sf)
81 : source_operator(std::move(o))
82 , coordination(std::move(sf))
83 {
84 }
85 source_operator_type source_operator;
86 coordination_type coordination;
87 };
88 values initial;
89
concatrxcpp::operators::detail::concat90 concat(const source_type& o, coordination_type sf)
91 : initial(o.source_operator, std::move(sf))
92 {
93 }
94
95 template<class Subscriber>
on_subscriberxcpp::operators::detail::concat96 void on_subscribe(Subscriber scbr) const {
97 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
98
99 typedef Subscriber output_type;
100
101 struct concat_state_type
102 : public std::enable_shared_from_this<concat_state_type>
103 , public values
104 {
105 concat_state_type(values i, coordinator_type coor, output_type oarg)
106 : values(i)
107 , source(i.source_operator)
108 , sourceLifetime(composite_subscription::empty())
109 , collectionLifetime(composite_subscription::empty())
110 , coordinator(std::move(coor))
111 , out(std::move(oarg))
112 {
113 }
114
115 void subscribe_to(collection_type st)
116 {
117 auto state = this->shared_from_this();
118
119 collectionLifetime = composite_subscription();
120
121 // when the out observer is unsubscribed all the
122 // inner subscriptions are unsubscribed as well
123 auto innercstoken = state->out.add(collectionLifetime);
124
125 collectionLifetime.add(make_subscription([state, innercstoken](){
126 state->out.remove(innercstoken);
127 }));
128
129 auto selectedSource = on_exception(
130 [&](){return state->coordinator.in(std::move(st));},
131 state->out);
132 if (selectedSource.empty()) {
133 return;
134 }
135
136 // this subscribe does not share the out subscription
137 // so that when it is unsubscribed the out will continue
138 auto sinkInner = make_subscriber<value_type>(
139 state->out,
140 collectionLifetime,
141 // on_next
142 [state, st](value_type ct) {
143 state->out.on_next(ct);
144 },
145 // on_error
146 [state](rxu::error_ptr e) {
147 state->out.on_error(e);
148 },
149 //on_completed
150 [state](){
151 if (!state->selectedCollections.empty()) {
152 auto value = state->selectedCollections.front();
153 state->selectedCollections.pop_front();
154 state->collectionLifetime.unsubscribe();
155 state->subscribe_to(value);
156 } else if (!state->sourceLifetime.is_subscribed()) {
157 state->out.on_completed();
158 }
159 }
160 );
161 auto selectedSinkInner = on_exception(
162 [&](){return state->coordinator.out(sinkInner);},
163 state->out);
164 if (selectedSinkInner.empty()) {
165 return;
166 }
167 selectedSource->subscribe(std::move(selectedSinkInner.get()));
168 }
169 observable<source_value_type, source_operator_type> source;
170 composite_subscription sourceLifetime;
171 composite_subscription collectionLifetime;
172 std::deque<collection_type> selectedCollections;
173 coordinator_type coordinator;
174 output_type out;
175 };
176
177 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
178
179 // take a copy of the values for each subscription
180 auto state = std::make_shared<concat_state_type>(initial, std::move(coordinator), std::move(scbr));
181
182 state->sourceLifetime = composite_subscription();
183
184 // when the out observer is unsubscribed all the
185 // inner subscriptions are unsubscribed as well
186 state->out.add(state->sourceLifetime);
187
188 auto source = on_exception(
189 [&](){return state->coordinator.in(state->source);},
190 state->out);
191 if (source.empty()) {
192 return;
193 }
194
195 // this subscribe does not share the observer subscription
196 // so that when it is unsubscribed the observer can be called
197 // until the inner subscriptions have finished
198 auto sink = make_subscriber<collection_type>(
199 state->out,
200 state->sourceLifetime,
201 // on_next
202 [state](collection_type st) {
203 if (state->collectionLifetime.is_subscribed()) {
204 state->selectedCollections.push_back(st);
205 } else if (state->selectedCollections.empty()) {
206 state->subscribe_to(st);
207 }
208 },
209 // on_error
210 [state](rxu::error_ptr e) {
211 state->out.on_error(e);
212 },
213 // on_completed
214 [state]() {
215 if (!state->collectionLifetime.is_subscribed() && state->selectedCollections.empty()) {
216 state->out.on_completed();
217 }
218 }
219 );
220 auto selectedSink = on_exception(
221 [&](){return state->coordinator.out(sink);},
222 state->out);
223 if (selectedSink.empty()) {
224 return;
225 }
226 source->subscribe(std::move(selectedSink.get()));
227 }
228 };
229
230 }
231
232 /*! @copydoc rx-concat.hpp
233 */
234 template<class... AN>
concat(AN &&...an)235 auto concat(AN&&... an)
236 -> operator_factory<concat_tag, AN...> {
237 return operator_factory<concat_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
238 }
239
240 }
241
242 template<>
243 struct member_overload<concat_tag>
244 {
245 template<class Observable,
246 class Enabled = rxu::enable_if_all_true_type_t<
247 is_observable<Observable>>,
248 class SourceValue = rxu::value_type_t<Observable>,
249 class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
250 class Value = rxu::value_type_t<SourceValue>,
251 class Result = observable<Value, Concat>
252 >
memberrxcpp::member_overload253 static Result member(Observable&& o) {
254 return Result(Concat(std::forward<Observable>(o), identity_current_thread()));
255 }
256
257 template<class Observable, class Coordination,
258 class Enabled = rxu::enable_if_all_true_type_t<
259 is_observable<Observable>,
260 is_coordination<Coordination>>,
261 class SourceValue = rxu::value_type_t<Observable>,
262 class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
263 class Value = rxu::value_type_t<SourceValue>,
264 class Result = observable<Value, Concat>
265 >
memberrxcpp::member_overload266 static Result member(Observable&& o, Coordination&& cn) {
267 return Result(Concat(std::forward<Observable>(o), std::forward<Coordination>(cn)));
268 }
269
270 template<class Observable, class Value0, class... ValueN,
271 class Enabled = rxu::enable_if_all_true_type_t<
272 all_observables<Observable, Value0, ValueN...>>,
273 class EmittedValue = rxu::value_type_t<Observable>,
274 class SourceValue = observable<EmittedValue>,
275 class ObservableObservable = observable<SourceValue>,
276 class Concat = typename rxu::defer_type<rxo::detail::concat, SourceValue, ObservableObservable, identity_one_worker>::type,
277 class Value = rxu::value_type_t<Concat>,
278 class Result = observable<Value, Concat>
279 >
memberrxcpp::member_overload280 static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
281 return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
282 }
283
284 template<class Observable, class Coordination, class Value0, class... ValueN,
285 class Enabled = rxu::enable_if_all_true_type_t<
286 all_observables<Observable, Value0, ValueN...>,
287 is_coordination<Coordination>>,
288 class EmittedValue = rxu::value_type_t<Observable>,
289 class SourceValue = observable<EmittedValue>,
290 class ObservableObservable = observable<SourceValue>,
291 class Concat = typename rxu::defer_type<rxo::detail::concat, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
292 class Value = rxu::value_type_t<Concat>,
293 class Result = observable<Value, Concat>
294 >
memberrxcpp::member_overload295 static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
296 return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
297 }
298
299 template<class... AN>
memberrxcpp::member_overload300 static operators::detail::concat_invalid_t<AN...> member(AN...) {
301 std::terminate();
302 return {};
303 static_assert(sizeof...(AN) == 10000, "concat takes (optional Coordination, optional Value0, optional ValueN...)");
304 }
305 };
306
307 }
308
309 #endif
310