1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-flat_map.hpp
6
7 \brief For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
8 For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
9
10 \tparam CollectionSelector the type of the observable producing function. CollectionSelector must be a function with the signature observable(flat_map::source_value_type)
11 \tparam ResultSelector the type of the aggregation function (optional). ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type).
12 \tparam Coordination the type of the scheduler (optional).
13
14 \param s a function that returns an observable for each item emitted by the source observable.
15 \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
16 \param cn the scheduler to synchronize sources from different contexts (optional).
17
18 \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
19
Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::concat_map that works similarly but concatenates the observables.
21
22 \sample
23 \snippet flat_map.cpp flat_map sample
24 \snippet output.txt flat_map sample
25
26 \sample
27 \snippet flat_map.cpp threaded flat_map sample
28 \snippet output.txt threaded flat_map sample
29 */
30
31 #if !defined(RXCPP_OPERATORS_RX_FLATMAP_HPP)
32 #define RXCPP_OPERATORS_RX_FLATMAP_HPP
33
34 #include "../rx-includes.hpp"
35
36 namespace rxcpp {
37
38 namespace operators {
39
40 namespace detail {
41
/*! \brief Empty tag type that records the argument types of a rejected flat_map call.

    It carries no data; flat_map_invalid uses it as the value type of its
    placeholder observable.

    \tparam Args the argument types that were passed to flat_map.
*/
template<class... Args>
struct flat_map_invalid_arguments
{
};
44
45 template<class... AN>
46 struct flat_map_invalid : public rxo::operator_base<flat_map_invalid_arguments<AN...>> {
47 using type = observable<flat_map_invalid_arguments<AN...>, flat_map_invalid<AN...>>;
48 };
49 template<class... AN>
50 using flat_map_invalid_t = typename flat_map_invalid<AN...>::type;
51
52 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
53 struct flat_map_traits {
54 typedef rxu::decay_t<Observable> source_type;
55 typedef rxu::decay_t<CollectionSelector> collection_selector_type;
56 typedef rxu::decay_t<ResultSelector> result_selector_type;
57 typedef rxu::decay_t<Coordination> coordination_type;
58
59 typedef typename source_type::value_type source_value_type;
60
61 struct tag_not_valid {};
62 template<class CV, class CCS>
63 static auto collection_check(int) -> decltype((*(CCS*)nullptr)(*(CV*)nullptr));
64 template<class CV, class CCS>
65 static tag_not_valid collection_check(...);
66
67 static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)");
68
69 typedef rxu::decay_t<decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr)))> collection_type;
70
71 static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable");
72
73 typedef typename collection_type::value_type collection_value_type;
74
75 template<class CV, class CCV, class CRS>
76 static auto result_check(int) -> decltype((*(CRS*)nullptr)(*(CV*)nullptr, *(CCV*)nullptr));
77 template<class CV, class CCV, class CRS>
78 static tag_not_valid result_check(...);
79
80 static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)");
81
82 typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
83 };
84
85 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
86 struct flat_map
87 : public operator_base<rxu::value_type_t<flat_map_traits<Observable, CollectionSelector, ResultSelector, Coordination>>>
88 {
89 typedef flat_map<Observable, CollectionSelector, ResultSelector, Coordination> this_type;
90 typedef flat_map_traits<Observable, CollectionSelector, ResultSelector, Coordination> traits;
91
92 typedef typename traits::source_type source_type;
93 typedef typename traits::collection_selector_type collection_selector_type;
94 typedef typename traits::result_selector_type result_selector_type;
95
96 typedef typename traits::source_value_type source_value_type;
97 typedef typename traits::collection_type collection_type;
98 typedef typename traits::collection_value_type collection_value_type;
99
100 typedef typename traits::coordination_type coordination_type;
101 typedef typename coordination_type::coordinator_type coordinator_type;
102
103 struct values
104 {
valuesrxcpp::operators::detail::flat_map::values105 values(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
106 : source(std::move(o))
107 , selectCollection(std::move(s))
108 , selectResult(std::move(rs))
109 , coordination(std::move(sf))
110 {
111 }
112 source_type source;
113 collection_selector_type selectCollection;
114 result_selector_type selectResult;
115 coordination_type coordination;
116 };
117 values initial;
118
flat_maprxcpp::operators::detail::flat_map119 flat_map(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
120 : initial(std::move(o), std::move(s), std::move(rs), std::move(sf))
121 {
122 }
123
124 template<class Subscriber>
on_subscriberxcpp::operators::detail::flat_map125 void on_subscribe(Subscriber scbr) const {
126 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
127
128 typedef Subscriber output_type;
129
130 struct state_type
131 : public std::enable_shared_from_this<state_type>
132 , public values
133 {
134 state_type(values i, coordinator_type coor, output_type oarg)
135 : values(std::move(i))
136 , pendingCompletions(0)
137 , coordinator(std::move(coor))
138 , out(std::move(oarg))
139 {
140 }
141 // on_completed on the output must wait until all the
142 // subscriptions have received on_completed
143 int pendingCompletions;
144 coordinator_type coordinator;
145 output_type out;
146 };
147
148 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
149
150 // take a copy of the values for each subscription
151 auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(scbr));
152
153 composite_subscription outercs;
154
155 // when the out observer is unsubscribed all the
156 // inner subscriptions are unsubscribed as well
157 state->out.add(outercs);
158
159 auto source = on_exception(
160 [&](){return state->coordinator.in(state->source);},
161 state->out);
162 if (source.empty()) {
163 return;
164 }
165
166 ++state->pendingCompletions;
167 // this subscribe does not share the observer subscription
168 // so that when it is unsubscribed the observer can be called
169 // until the inner subscriptions have finished
170 auto sink = make_subscriber<source_value_type>(
171 state->out,
172 outercs,
173 // on_next
174 [state](source_value_type st) {
175
176 composite_subscription innercs;
177
178 // when the out observer is unsubscribed all the
179 // inner subscriptions are unsubscribed as well
180 auto innercstoken = state->out.add(innercs);
181
182 innercs.add(make_subscription([state, innercstoken](){
183 state->out.remove(innercstoken);
184 }));
185
186 auto selectedCollection = state->selectCollection(st);
187 auto selectedSource = state->coordinator.in(selectedCollection);
188
189 ++state->pendingCompletions;
190 // this subscribe does not share the source subscription
191 // so that when it is unsubscribed the source will continue
192 auto sinkInner = make_subscriber<collection_value_type>(
193 state->out,
194 innercs,
195 // on_next
196 [state, st](collection_value_type ct) {
197 auto selectedResult = state->selectResult(st, std::move(ct));
198 state->out.on_next(std::move(selectedResult));
199 },
200 // on_error
201 [state](rxu::error_ptr e) {
202 state->out.on_error(e);
203 },
204 //on_completed
205 [state](){
206 if (--state->pendingCompletions == 0) {
207 state->out.on_completed();
208 }
209 }
210 );
211
212 auto selectedSinkInner = state->coordinator.out(sinkInner);
213 selectedSource.subscribe(std::move(selectedSinkInner));
214 },
215 // on_error
216 [state](rxu::error_ptr e) {
217 state->out.on_error(e);
218 },
219 // on_completed
220 [state]() {
221 if (--state->pendingCompletions == 0) {
222 state->out.on_completed();
223 }
224 }
225 );
226
227 auto selectedSink = on_exception(
228 [&](){return state->coordinator.out(sink);},
229 state->out);
230 if (selectedSink.empty()) {
231 return;
232 }
233
234 source->subscribe(std::move(selectedSink.get()));
235
236 }
237 };
238
239 }
240
241 /*! @copydoc rx-flat_map.hpp
242 */
243 template<class... AN>
flat_map(AN &&...an)244 auto flat_map(AN&&... an)
245 -> operator_factory<flat_map_tag, AN...> {
246 return operator_factory<flat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
247 }
248
249 /*! @copydoc rx-flat_map.hpp
250 */
251 template<class... AN>
merge_transform(AN &&...an)252 auto merge_transform(AN&&... an)
253 -> operator_factory<flat_map_tag, AN...> {
254 return operator_factory<flat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
255 }
256
257 }
258
259 template<>
260 struct member_overload<flat_map_tag>
261 {
262 template<class Observable, class CollectionSelector,
263 class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
264 class SourceValue = rxu::value_type_t<Observable>,
265 class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
266 class ResultSelectorType = rxu::detail::take_at<1>,
267 class Enabled = rxu::enable_if_all_true_type_t<
268 all_observables<Observable, CollectionType>>,
269 class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, identity_one_worker>,
270 class CollectionValueType = rxu::value_type_t<CollectionType>,
271 class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
272 class Result = observable<Value, FlatMap>
273 >
memberrxcpp::member_overload274 static Result member(Observable&& o, CollectionSelector&& s) {
275 return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), identity_current_thread()));
276 }
277
278 template<class Observable, class CollectionSelector, class Coordination,
279 class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
280 class SourceValue = rxu::value_type_t<Observable>,
281 class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
282 class ResultSelectorType = rxu::detail::take_at<1>,
283 class Enabled = rxu::enable_if_all_true_type_t<
284 all_observables<Observable, CollectionType>,
285 is_coordination<Coordination>>,
286 class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, rxu::decay_t<Coordination>>,
287 class CollectionValueType = rxu::value_type_t<CollectionType>,
288 class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
289 class Result = observable<Value, FlatMap>
290 >
memberrxcpp::member_overload291 static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) {
292 return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn)));
293 }
294
295 template<class Observable, class CollectionSelector, class ResultSelector,
296 class IsCoordination = is_coordination<ResultSelector>,
297 class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
298 class SourceValue = rxu::value_type_t<Observable>,
299 class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
300 class Enabled = rxu::enable_if_all_true_type_t<
301 all_observables<Observable, CollectionType>,
302 rxu::negation<IsCoordination>>,
303 class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, identity_one_worker>,
304 class CollectionValueType = rxu::value_type_t<CollectionType>,
305 class ResultSelectorType = rxu::decay_t<ResultSelector>,
306 class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
307 class Result = observable<Value, FlatMap>
308 >
memberrxcpp::member_overload309 static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) {
310 return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
311 }
312
313 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination,
314 class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
315 class SourceValue = rxu::value_type_t<Observable>,
316 class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
317 class Enabled = rxu::enable_if_all_true_type_t<
318 all_observables<Observable, CollectionType>,
319 is_coordination<Coordination>>,
320 class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>,
321 class CollectionValueType = rxu::value_type_t<CollectionType>,
322 class ResultSelectorType = rxu::decay_t<ResultSelector>,
323 class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
324 class Result = observable<Value, FlatMap>
325 >
memberrxcpp::member_overload326 static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) {
327 return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
328 }
329
330 template<class... AN>
memberrxcpp::member_overload331 static operators::detail::flat_map_invalid_t<AN...> member(AN...) {
332 std::terminate();
333 return {};
334 static_assert(sizeof...(AN) == 10000, "flat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)");
335 }
336 };
337
338 }
339
340 #endif
341