1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_OPERATORS_RX_ZIP_HPP)
6 #define RXCPP_OPERATORS_RX_ZIP_HPP
7
8 #include "../rx-includes.hpp"
9
10 /*! \file rx-zip.hpp
11
12 \brief Bring by one item from all given observables and select a value to emit from the new observable that is returned.
13
14 \tparam AN types of scheduler (optional), aggregate function (optional), and source observables
15
16 \param an scheduler (optional), aggregation function (optional), and source observables
17
18 \return Observable that emits the result of combining the items emitted and brought by one from each of the source observables.
19
20 If scheduler is omitted, identity_current_thread is used.
21
22 If aggregation function is omitted, the resulting observable returns tuples of emitted items.
23
24 \sample
25
26 Neither scheduler nor aggregation function are present:
27 \snippet zip.cpp zip sample
28 \snippet output.txt zip sample
29
30 Only scheduler is present:
31 \snippet zip.cpp Coordination zip sample
32 \snippet output.txt Coordination zip sample
33
34 Only aggregation function is present:
35 \snippet zip.cpp Selector zip sample
36 \snippet output.txt Selector zip sample
37
38 Both scheduler and aggregation function are present:
39 \snippet zip.cpp Coordination+Selector zip sample
40 \snippet output.txt Coordination+Selector zip sample
41 */
42
43 namespace rxcpp {
44
45 namespace operators {
46
47 namespace detail {
48
49 template<class Observable>
50 struct zip_source_state
51 {
52 using value_type = rxu::value_type_t<Observable>;
zip_source_staterxcpp::operators::detail::zip_source_state53 zip_source_state()
54 : completed(false)
55 {
56 }
57 std::list<value_type> values;
58 bool completed;
59 };
60
61 struct values_not_empty {
62 template<class Observable>
operator ()rxcpp::operators::detail::values_not_empty63 bool operator()(zip_source_state<Observable>& source) const {
64 return !source.values.empty();
65 }
66 };
67
68 struct source_completed_values_empty {
69 template<class Observable>
operator ()rxcpp::operators::detail::source_completed_values_empty70 bool operator()(zip_source_state<Observable>& source) const {
71 return source.completed && source.values.empty();
72 }
73 };
74
75 struct extract_value_front {
76 template<class Observable, class Value = rxu::value_type_t<Observable>>
operator ()rxcpp::operators::detail::extract_value_front77 Value operator()(zip_source_state<Observable>& source) const {
78 auto val = std::move(source.values.front());
79 source.values.pop_front();
80 return val;
81 }
82 };
83
84 template<class... AN>
85 struct zip_invalid_arguments {};
86
87 template<class... AN>
88 struct zip_invalid : public rxo::operator_base<zip_invalid_arguments<AN...>> {
89 using type = observable<zip_invalid_arguments<AN...>, zip_invalid<AN...>>;
90 };
91 template<class... AN>
92 using zip_invalid_t = typename zip_invalid<AN...>::type;
93
94 template<class Selector, class... ObservableN>
95 struct is_zip_selector_check {
96 typedef rxu::decay_t<Selector> selector_type;
97
98 struct tag_not_valid;
99 template<class CS, class... CON>
100 static auto check(int) -> decltype((*(CS*)nullptr)((*(typename CON::value_type*)nullptr)...));
101 template<class CS, class... CON>
102 static tag_not_valid check(...);
103
104 using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0));
105
106 static const bool value = !std::is_same<type, tag_not_valid>::value;
107 };
108
109 template<class Selector, class... ObservableN>
110 struct invalid_zip_selector {
111 static const bool value = false;
112 };
113
114 template<class Selector, class... ObservableN>
115 struct is_zip_selector : public std::conditional<
116 is_zip_selector_check<Selector, ObservableN...>::value,
117 is_zip_selector_check<Selector, ObservableN...>,
118 invalid_zip_selector<Selector, ObservableN...>>::type {
119 };
120
121 template<class Selector, class... ON>
122 using result_zip_selector_t = typename is_zip_selector<Selector, ON...>::type;
123
124 template<class Coordination, class Selector, class... ObservableN>
125 struct zip_traits {
126 typedef std::tuple<rxu::decay_t<ObservableN>...> tuple_source_type;
127 typedef std::tuple<zip_source_state<ObservableN>...> tuple_source_values_type;
128
129 typedef rxu::decay_t<Selector> selector_type;
130 typedef rxu::decay_t<Coordination> coordination_type;
131
132 typedef typename is_zip_selector<selector_type, ObservableN...>::type value_type;
133 };
134
135 template<class Coordination, class Selector, class... ObservableN>
136 struct zip : public operator_base<rxu::value_type_t<zip_traits<Coordination, Selector, ObservableN...>>>
137 {
138 typedef zip<Coordination, Selector, ObservableN...> this_type;
139
140 typedef zip_traits<Coordination, Selector, ObservableN...> traits;
141
142 typedef typename traits::tuple_source_type tuple_source_type;
143 typedef typename traits::tuple_source_values_type tuple_source_values_type;
144
145 typedef typename traits::selector_type selector_type;
146
147 typedef typename traits::coordination_type coordination_type;
148 typedef typename coordination_type::coordinator_type coordinator_type;
149
150 struct values
151 {
valuesrxcpp::operators::detail::zip::values152 values(tuple_source_type o, selector_type s, coordination_type sf)
153 : source(std::move(o))
154 , selector(std::move(s))
155 , coordination(std::move(sf))
156 {
157 }
158 tuple_source_type source;
159 selector_type selector;
160 coordination_type coordination;
161 };
162 values initial;
163
ziprxcpp::operators::detail::zip164 zip(coordination_type sf, selector_type s, tuple_source_type ts)
165 : initial(std::move(ts), std::move(s), std::move(sf))
166 {
167 }
168
169 template<int Index, class State>
subscribe_onerxcpp::operators::detail::zip170 void subscribe_one(std::shared_ptr<State> state) const {
171
172 typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type;
173
174 composite_subscription innercs;
175
176 // when the out observer is unsubscribed all the
177 // inner subscriptions are unsubscribed as well
178 state->out.add(innercs);
179
180 auto source = on_exception(
181 [&](){return state->coordinator.in(std::get<Index>(state->source));},
182 state->out);
183 if (source.empty()) {
184 return;
185 }
186
187 // this subscribe does not share the observer subscription
188 // so that when it is unsubscribed the observer can be called
189 // until the inner subscriptions have finished
190 auto sink = make_subscriber<source_value_type>(
191 state->out,
192 innercs,
193 // on_next
194 [state](source_value_type st) {
195 auto& values = std::get<Index>(state->pending).values;
196 values.push_back(st);
197 if (rxu::apply_to_each(state->pending, values_not_empty(), rxu::all_values_true())) {
198 auto selectedResult = rxu::apply_to_each(state->pending, extract_value_front(), state->selector);
199 state->out.on_next(selectedResult);
200 }
201 if (rxu::apply_to_each(state->pending, source_completed_values_empty(), rxu::any_value_true())) {
202 state->out.on_completed();
203 }
204 },
205 // on_error
206 [state](rxu::error_ptr e) {
207 state->out.on_error(e);
208 },
209 // on_completed
210 [state]() {
211 auto& completed = std::get<Index>(state->pending).completed;
212 completed = true;
213 if (--state->pendingCompletions == 0) {
214 state->out.on_completed();
215 }
216 }
217 );
218 auto selectedSink = on_exception(
219 [&](){return state->coordinator.out(sink);},
220 state->out);
221 if (selectedSink.empty()) {
222 return;
223 }
224 source->subscribe(std::move(selectedSink.get()));
225 }
226 template<class State, int... IndexN>
subscribe_allrxcpp::operators::detail::zip227 void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>) const {
228 bool subscribed[] = {(subscribe_one<IndexN>(state), true)...};
229 subscribed[0] = (*subscribed); // silence warning
230 }
231
232 template<class Subscriber>
on_subscriberxcpp::operators::detail::zip233 void on_subscribe(Subscriber scbr) const {
234 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
235
236 typedef Subscriber output_type;
237
238 struct zip_state_type
239 : public std::enable_shared_from_this<zip_state_type>
240 , public values
241 {
242 zip_state_type(values i, coordinator_type coor, output_type oarg)
243 : values(std::move(i))
244 , pendingCompletions(sizeof... (ObservableN))
245 , valuesSet(0)
246 , coordinator(std::move(coor))
247 , out(std::move(oarg))
248 {
249 }
250
251 // on_completed on the output must wait until all the
252 // subscriptions have received on_completed
253 mutable int pendingCompletions;
254 mutable int valuesSet;
255 mutable tuple_source_values_type pending;
256 coordinator_type coordinator;
257 output_type out;
258 };
259
260 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
261
262 // take a copy of the values for each subscription
263 auto state = std::make_shared<zip_state_type>(initial, std::move(coordinator), std::move(scbr));
264
265 subscribe_all(state, typename rxu::values_from<int, sizeof...(ObservableN)>::type());
266 }
267 };
268
269 }
270
271 /*! @copydoc rx-zip.hpp
272 */
273 template<class... AN>
zip(AN &&...an)274 auto zip(AN&&... an)
275 -> operator_factory<zip_tag, AN...> {
276 return operator_factory<zip_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
277 }
278
279 }
280
281 template<>
282 struct member_overload<zip_tag>
283 {
284 template<class Observable, class... ObservableN,
285 class Enabled = rxu::enable_if_all_true_type_t<
286 all_observables<Observable, ObservableN...>>,
287 class Zip = rxo::detail::zip<identity_one_worker, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
288 class Value = rxu::value_type_t<Zip>,
289 class Result = observable<Value, Zip>>
memberrxcpp::member_overload290 static Result member(Observable&& o, ObservableN&&... on)
291 {
292 return Result(Zip(identity_current_thread(), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
293 }
294
295 template<class Observable, class Selector, class... ObservableN,
296 class Enabled = rxu::enable_if_all_true_type_t<
297 operators::detail::is_zip_selector<Selector, Observable, ObservableN...>,
298 all_observables<Observable, ObservableN...>>,
299 class ResolvedSelector = rxu::decay_t<Selector>,
300 class Zip = rxo::detail::zip<identity_one_worker, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
301 class Value = rxu::value_type_t<Zip>,
302 class Result = observable<Value, Zip>>
memberrxcpp::member_overload303 static Result member(Observable&& o, Selector&& s, ObservableN&&... on)
304 {
305 return Result(Zip(identity_current_thread(), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
306 }
307
308 template<class Coordination, class Observable, class... ObservableN,
309 class Enabled = rxu::enable_if_all_true_type_t<
310 is_coordination<Coordination>,
311 all_observables<Observable, ObservableN...>>,
312 class Zip = rxo::detail::zip<Coordination, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
313 class Value = rxu::value_type_t<Zip>,
314 class Result = observable<Value, Zip>>
memberrxcpp::member_overload315 static Result member(Observable&& o, Coordination&& cn, ObservableN&&... on)
316 {
317 return Result(Zip(std::forward<Coordination>(cn), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
318 }
319
320 template<class Coordination, class Selector, class Observable, class... ObservableN,
321 class Enabled = rxu::enable_if_all_true_type_t<
322 is_coordination<Coordination>,
323 operators::detail::is_zip_selector<Selector, Observable, ObservableN...>,
324 all_observables<Observable, ObservableN...>>,
325 class ResolvedSelector = rxu::decay_t<Selector>,
326 class Zip = rxo::detail::zip<Coordination, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
327 class Value = rxu::value_type_t<Zip>,
328 class Result = observable<Value, Zip>>
memberrxcpp::member_overload329 static Result member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on)
330 {
331 return Result(Zip(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
332 }
333
334 template<class... AN>
memberrxcpp::member_overload335 static operators::detail::zip_invalid_t<AN...> member(const AN&...) {
336 std::terminate();
337 return {};
338 static_assert(sizeof...(AN) == 10000, "zip takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)");
339 }
340 };
341
342 }
343
344 #endif
345