• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-flat_map.hpp
6 
7     \brief For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
8            For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
9 
10     \tparam CollectionSelector  the type of the observable producing function. CollectionSelector must be a function with the signature observable(flat_map::source_value_type)
11     \tparam ResultSelector      the type of the aggregation function (optional). ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type).
12     \tparam Coordination        the type of the scheduler (optional).
13 
14     \param  s   a function that returns an observable for each item emitted by the source observable.
15     \param  rs  a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
16     \param  cn  the scheduler to synchronize sources from different contexts (optional).
17 
18     \return  Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
19 
20     Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable<T,SourceType>::concat_map that works similarly but concatenates the observables.
21 
22     \sample
23     \snippet flat_map.cpp flat_map sample
24     \snippet output.txt flat_map sample
25 
26     \sample
27     \snippet flat_map.cpp threaded flat_map sample
28     \snippet output.txt threaded flat_map sample
29 */
30 
31 #if !defined(RXCPP_OPERATORS_RX_FLATMAP_HPP)
32 #define RXCPP_OPERATORS_RX_FLATMAP_HPP
33 
34 #include "../rx-includes.hpp"
35 
36 namespace rxcpp {
37 
38 namespace operators {
39 
40 namespace detail {
41 
42 template<class... AN>
43 struct flat_map_invalid_arguments {};
44 
45 template<class... AN>
46 struct flat_map_invalid : public rxo::operator_base<flat_map_invalid_arguments<AN...>> {
47     using type = observable<flat_map_invalid_arguments<AN...>, flat_map_invalid<AN...>>;
48 };
49 template<class... AN>
50 using flat_map_invalid_t = typename flat_map_invalid<AN...>::type;
51 
52 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
53 struct flat_map_traits {
54     typedef rxu::decay_t<Observable> source_type;
55     typedef rxu::decay_t<CollectionSelector> collection_selector_type;
56     typedef rxu::decay_t<ResultSelector> result_selector_type;
57     typedef rxu::decay_t<Coordination> coordination_type;
58 
59     typedef typename source_type::value_type source_value_type;
60 
61     struct tag_not_valid {};
62     template<class CV, class CCS>
63     static auto collection_check(int) -> decltype((*(CCS*)nullptr)(*(CV*)nullptr));
64     template<class CV, class CCS>
65     static tag_not_valid collection_check(...);
66 
67     static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)");
68 
69     typedef rxu::decay_t<decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr)))> collection_type;
70 
71     static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable");
72 
73     typedef typename collection_type::value_type collection_value_type;
74 
75     template<class CV, class CCV, class CRS>
76     static auto result_check(int) -> decltype((*(CRS*)nullptr)(*(CV*)nullptr, *(CCV*)nullptr));
77     template<class CV, class CCV, class CRS>
78     static tag_not_valid result_check(...);
79 
80     static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)");
81 
82     typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
83 };
84 
85 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
86 struct flat_map
87     : public operator_base<rxu::value_type_t<flat_map_traits<Observable, CollectionSelector, ResultSelector, Coordination>>>
88 {
89     typedef flat_map<Observable, CollectionSelector, ResultSelector, Coordination> this_type;
90     typedef flat_map_traits<Observable, CollectionSelector, ResultSelector, Coordination> traits;
91 
92     typedef typename traits::source_type source_type;
93     typedef typename traits::collection_selector_type collection_selector_type;
94     typedef typename traits::result_selector_type result_selector_type;
95 
96     typedef typename traits::source_value_type source_value_type;
97     typedef typename traits::collection_type collection_type;
98     typedef typename traits::collection_value_type collection_value_type;
99 
100     typedef typename traits::coordination_type coordination_type;
101     typedef typename coordination_type::coordinator_type coordinator_type;
102 
103     struct values
104     {
valuesrxcpp::operators::detail::flat_map::values105         values(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
106             : source(std::move(o))
107             , selectCollection(std::move(s))
108             , selectResult(std::move(rs))
109             , coordination(std::move(sf))
110         {
111         }
112         source_type source;
113         collection_selector_type selectCollection;
114         result_selector_type selectResult;
115         coordination_type coordination;
116     };
117     values initial;
118 
flat_maprxcpp::operators::detail::flat_map119     flat_map(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
120         : initial(std::move(o), std::move(s), std::move(rs), std::move(sf))
121     {
122     }
123 
124     template<class Subscriber>
on_subscriberxcpp::operators::detail::flat_map125     void on_subscribe(Subscriber scbr) const {
126         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
127 
128         typedef Subscriber output_type;
129 
130         struct state_type
131             : public std::enable_shared_from_this<state_type>
132             , public values
133         {
134             state_type(values i, coordinator_type coor, output_type oarg)
135                 : values(std::move(i))
136                 , pendingCompletions(0)
137                 , coordinator(std::move(coor))
138                 , out(std::move(oarg))
139             {
140             }
141             // on_completed on the output must wait until all the
142             // subscriptions have received on_completed
143             int pendingCompletions;
144             coordinator_type coordinator;
145             output_type out;
146         };
147 
148         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
149 
150         // take a copy of the values for each subscription
151         auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(scbr));
152 
153         composite_subscription outercs;
154 
155         // when the out observer is unsubscribed all the
156         // inner subscriptions are unsubscribed as well
157         state->out.add(outercs);
158 
159         auto source = on_exception(
160             [&](){return state->coordinator.in(state->source);},
161             state->out);
162         if (source.empty()) {
163             return;
164         }
165 
166         ++state->pendingCompletions;
167         // this subscribe does not share the observer subscription
168         // so that when it is unsubscribed the observer can be called
169         // until the inner subscriptions have finished
170         auto sink = make_subscriber<source_value_type>(
171             state->out,
172             outercs,
173         // on_next
174             [state](source_value_type st) {
175 
176                 composite_subscription innercs;
177 
178                 // when the out observer is unsubscribed all the
179                 // inner subscriptions are unsubscribed as well
180                 auto innercstoken = state->out.add(innercs);
181 
182                 innercs.add(make_subscription([state, innercstoken](){
183                     state->out.remove(innercstoken);
184                 }));
185 
186                 auto selectedCollection = state->selectCollection(st);
187                 auto selectedSource = state->coordinator.in(selectedCollection);
188 
189                 ++state->pendingCompletions;
190                 // this subscribe does not share the source subscription
191                 // so that when it is unsubscribed the source will continue
192                 auto sinkInner = make_subscriber<collection_value_type>(
193                     state->out,
194                     innercs,
195                 // on_next
196                     [state, st](collection_value_type ct) {
197                         auto selectedResult = state->selectResult(st, std::move(ct));
198                         state->out.on_next(std::move(selectedResult));
199                     },
200                 // on_error
201                     [state](rxu::error_ptr e) {
202                         state->out.on_error(e);
203                     },
204                 //on_completed
205                     [state](){
206                         if (--state->pendingCompletions == 0) {
207                             state->out.on_completed();
208                         }
209                     }
210                 );
211 
212                 auto selectedSinkInner = state->coordinator.out(sinkInner);
213                 selectedSource.subscribe(std::move(selectedSinkInner));
214             },
215         // on_error
216             [state](rxu::error_ptr e) {
217                 state->out.on_error(e);
218             },
219         // on_completed
220             [state]() {
221                 if (--state->pendingCompletions == 0) {
222                     state->out.on_completed();
223                 }
224             }
225         );
226 
227         auto selectedSink = on_exception(
228             [&](){return state->coordinator.out(sink);},
229             state->out);
230         if (selectedSink.empty()) {
231             return;
232         }
233 
234         source->subscribe(std::move(selectedSink.get()));
235 
236     }
237 };
238 
239 }
240 
241 /*! @copydoc rx-flat_map.hpp
242 */
243 template<class... AN>
flat_map(AN &&...an)244 auto flat_map(AN&&... an)
245 ->     operator_factory<flat_map_tag, AN...> {
246     return operator_factory<flat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
247 }
248 
249 /*! @copydoc rx-flat_map.hpp
250 */
251 template<class... AN>
merge_transform(AN &&...an)252 auto merge_transform(AN&&... an)
253 ->     operator_factory<flat_map_tag, AN...> {
254     return operator_factory<flat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
255 }
256 
257 }
258 
259 template<>
260 struct member_overload<flat_map_tag>
261 {
262     template<class Observable, class CollectionSelector,
263         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
264         class SourceValue = rxu::value_type_t<Observable>,
265         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
266         class ResultSelectorType = rxu::detail::take_at<1>,
267         class Enabled = rxu::enable_if_all_true_type_t<
268             all_observables<Observable, CollectionType>>,
269         class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, identity_one_worker>,
270         class CollectionValueType = rxu::value_type_t<CollectionType>,
271         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
272         class Result = observable<Value, FlatMap>
273     >
memberrxcpp::member_overload274     static Result member(Observable&& o, CollectionSelector&& s) {
275         return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), identity_current_thread()));
276     }
277 
278     template<class Observable, class CollectionSelector, class Coordination,
279         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
280         class SourceValue = rxu::value_type_t<Observable>,
281         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
282         class ResultSelectorType = rxu::detail::take_at<1>,
283         class Enabled = rxu::enable_if_all_true_type_t<
284             all_observables<Observable, CollectionType>,
285             is_coordination<Coordination>>,
286         class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, rxu::decay_t<Coordination>>,
287         class CollectionValueType = rxu::value_type_t<CollectionType>,
288         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
289         class Result = observable<Value, FlatMap>
290     >
memberrxcpp::member_overload291     static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) {
292         return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn)));
293     }
294 
295     template<class Observable, class CollectionSelector, class ResultSelector,
296         class IsCoordination = is_coordination<ResultSelector>,
297         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
298         class SourceValue = rxu::value_type_t<Observable>,
299         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
300         class Enabled = rxu::enable_if_all_true_type_t<
301             all_observables<Observable, CollectionType>,
302             rxu::negation<IsCoordination>>,
303         class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, identity_one_worker>,
304         class CollectionValueType = rxu::value_type_t<CollectionType>,
305         class ResultSelectorType = rxu::decay_t<ResultSelector>,
306         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
307         class Result = observable<Value, FlatMap>
308     >
memberrxcpp::member_overload309     static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) {
310         return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
311     }
312 
313     template<class Observable, class CollectionSelector, class ResultSelector, class Coordination,
314         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
315         class SourceValue = rxu::value_type_t<Observable>,
316         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
317         class Enabled = rxu::enable_if_all_true_type_t<
318             all_observables<Observable, CollectionType>,
319             is_coordination<Coordination>>,
320         class FlatMap = rxo::detail::flat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>,
321         class CollectionValueType = rxu::value_type_t<CollectionType>,
322         class ResultSelectorType = rxu::decay_t<ResultSelector>,
323         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
324         class Result = observable<Value, FlatMap>
325     >
memberrxcpp::member_overload326     static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) {
327         return Result(FlatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
328     }
329 
330     template<class... AN>
memberrxcpp::member_overload331     static operators::detail::flat_map_invalid_t<AN...> member(AN...) {
332         std::terminate();
333         return {};
334         static_assert(sizeof...(AN) == 10000, "flat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)");
335     }
336 };
337 
338 }
339 
340 #endif
341