1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-amb.hpp
6
7 \brief For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler.
8
9 There are 2 variants of the operator:
10 - The source observable emits nested observables, one of the nested observables is selected.
11 - The source observable and the arguments v0...vn are used to provide the observables to select from.
12
13 \tparam Coordination the type of the scheduler (optional).
14 \tparam Value0 ... (optional).
15 \tparam ValueN types of source observables (optional).
16
17 \param cn the scheduler to synchronize sources from different contexts (optional).
18 \param v0 ... (optional).
19 \param vn source observables (optional).
20
21 \return Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification.
22
23 If scheduler is omitted, identity_current_thread is used.
24
25 \sample
26 \snippet amb.cpp threaded implicit amb sample
27 \snippet output.txt threaded implicit amb sample
28
29 \snippet amb.cpp implicit amb sample
30 \snippet output.txt implicit amb sample
31
32 \snippet amb.cpp amb sample
33 \snippet output.txt amb sample
34
35 \snippet amb.cpp threaded amb sample
36 \snippet output.txt threaded amb sample
37 */
38
39 #if !defined(RXCPP_OPERATORS_RX_AMB_HPP)
40 #define RXCPP_OPERATORS_RX_AMB_HPP
41
42 #include "../rx-includes.hpp"
43
44 namespace rxcpp {
45
46 namespace operators {
47
48 namespace detail {
49
50 template<class... AN>
51 struct amb_invalid_arguments {};
52
53 template<class... AN>
54 struct amb_invalid : public rxo::operator_base<amb_invalid_arguments<AN...>> {
55 using type = observable<amb_invalid_arguments<AN...>, amb_invalid<AN...>>;
56 };
57 template<class... AN>
58 using amb_invalid_t = typename amb_invalid<AN...>::type;
59
60 template<class T, class Observable, class Coordination>
61 struct amb
62 : public operator_base<rxu::value_type_t<T>>
63 {
64 //static_assert(is_observable<Observable>::value, "amb requires an observable");
65 //static_assert(is_observable<T>::value, "amb requires an observable that contains observables");
66
67 typedef amb<T, Observable, Coordination> this_type;
68
69 typedef rxu::decay_t<T> source_value_type;
70 typedef rxu::decay_t<Observable> source_type;
71
72 typedef typename source_type::source_operator_type source_operator_type;
73 typedef typename source_value_type::value_type value_type;
74
75 typedef rxu::decay_t<Coordination> coordination_type;
76 typedef typename coordination_type::coordinator_type coordinator_type;
77
78 struct values
79 {
valuesrxcpp::operators::detail::amb::values80 values(source_operator_type o, coordination_type sf)
81 : source_operator(std::move(o))
82 , coordination(std::move(sf))
83 {
84 }
85 source_operator_type source_operator;
86 coordination_type coordination;
87 };
88 values initial;
89
ambrxcpp::operators::detail::amb90 amb(const source_type& o, coordination_type sf)
91 : initial(o.source_operator, std::move(sf))
92 {
93 }
94
95 template<class Subscriber>
on_subscriberxcpp::operators::detail::amb96 void on_subscribe(Subscriber scbr) const {
97 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
98
99 typedef Subscriber output_type;
100
101 struct amb_state_type
102 : public std::enable_shared_from_this<amb_state_type>
103 , public values
104 {
105 amb_state_type(values i, coordinator_type coor, output_type oarg)
106 : values(i)
107 , source(i.source_operator)
108 , coordinator(std::move(coor))
109 , out(std::move(oarg))
110 , pendingObservables(0)
111 , firstEmitted(false)
112 {
113 }
114 observable<source_value_type, source_operator_type> source;
115 coordinator_type coordinator;
116 output_type out;
117 int pendingObservables;
118 bool firstEmitted;
119 std::vector<composite_subscription> innerSubscriptions;
120 };
121
122 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
123
124 // take a copy of the values for each subscription
125 auto state = std::make_shared<amb_state_type>(initial, std::move(coordinator), std::move(scbr));
126
127 composite_subscription outercs;
128
129 // when the out observer is unsubscribed all the
130 // inner subscriptions are unsubscribed as well
131 state->out.add(outercs);
132
133 auto source = on_exception(
134 [&](){return state->coordinator.in(state->source);},
135 state->out);
136 if (source.empty()) {
137 return;
138 }
139
140 // this subscribe does not share the observer subscription
141 // so that when it is unsubscribed the observer can be called
142 // until the inner subscriptions have finished
143 auto sink = make_subscriber<source_value_type>(
144 state->out,
145 outercs,
146 // on_next
147 [state](source_value_type st) {
148
149 if (state->firstEmitted)
150 return;
151
152 composite_subscription innercs;
153
154 state->innerSubscriptions.push_back(innercs);
155
156 // when the out observer is unsubscribed all the
157 // inner subscriptions are unsubscribed as well
158 auto innercstoken = state->out.add(innercs);
159
160 innercs.add(make_subscription([state, innercstoken](){
161 state->out.remove(innercstoken);
162 }));
163
164 auto selectedSource = state->coordinator.in(st);
165
166 auto current_id = state->pendingObservables++;
167
168 // this subscribe does not share the source subscription
169 // so that when it is unsubscribed the source will continue
170 auto sinkInner = make_subscriber<value_type>(
171 state->out,
172 innercs,
173 // on_next
174 [state, st, current_id](value_type ct) {
175 state->out.on_next(std::move(ct));
176 if (!state->firstEmitted) {
177 state->firstEmitted = true;
178 auto do_unsubscribe = [](composite_subscription cs) {
179 cs.unsubscribe();
180 };
181 std::for_each(state->innerSubscriptions.begin(), state->innerSubscriptions.begin() + current_id, do_unsubscribe);
182 std::for_each(state->innerSubscriptions.begin() + current_id + 1, state->innerSubscriptions.end(), do_unsubscribe);
183 }
184 },
185 // on_error
186 [state](rxu::error_ptr e) {
187 state->out.on_error(e);
188 },
189 //on_completed
190 [state](){
191 state->out.on_completed();
192 }
193 );
194
195 auto selectedSinkInner = state->coordinator.out(sinkInner);
196 selectedSource.subscribe(std::move(selectedSinkInner));
197 },
198 // on_error
199 [state](rxu::error_ptr e) {
200 state->out.on_error(e);
201 },
202 // on_completed
203 [state]() {
204 if (state->pendingObservables == 0) {
205 state->out.on_completed();
206 }
207 }
208 );
209 auto selectedSink = on_exception(
210 [&](){return state->coordinator.out(sink);},
211 state->out);
212 if (selectedSink.empty()) {
213 return;
214 }
215 source->subscribe(std::move(selectedSink.get()));
216 }
217 };
218
219 }
220
221 /*! @copydoc rx-amb.hpp
222 */
223 template<class... AN>
amb(AN &&...an)224 auto amb(AN&&... an)
225 -> operator_factory<amb_tag, AN...> {
226 return operator_factory<amb_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
227 }
228
229 }
230
231 template<>
232 struct member_overload<amb_tag>
233 {
234 template<class Observable,
235 class Enabled = rxu::enable_if_all_true_type_t<
236 is_observable<Observable>>,
237 class SourceValue = rxu::value_type_t<Observable>,
238 class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
239 class Value = rxu::value_type_t<SourceValue>,
240 class Result = observable<Value, Amb>
241 >
memberrxcpp::member_overload242 static Result member(Observable&& o) {
243 return Result(Amb(std::forward<Observable>(o), identity_current_thread()));
244 }
245
246 template<class Observable, class Coordination,
247 class Enabled = rxu::enable_if_all_true_type_t<
248 is_observable<Observable>,
249 is_coordination<Coordination>>,
250 class SourceValue = rxu::value_type_t<Observable>,
251 class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
252 class Value = rxu::value_type_t<SourceValue>,
253 class Result = observable<Value, Amb>
254 >
memberrxcpp::member_overload255 static Result member(Observable&& o, Coordination&& cn) {
256 return Result(Amb(std::forward<Observable>(o), std::forward<Coordination>(cn)));
257 }
258
259 template<class Observable, class Value0, class... ValueN,
260 class Enabled = rxu::enable_if_all_true_type_t<
261 all_observables<Observable, Value0, ValueN...>>,
262 class EmittedValue = rxu::value_type_t<Observable>,
263 class SourceValue = observable<EmittedValue>,
264 class ObservableObservable = observable<SourceValue>,
265 class Amb = typename rxu::defer_type<rxo::detail::amb, SourceValue, ObservableObservable, identity_one_worker>::type,
266 class Value = rxu::value_type_t<Amb>,
267 class Result = observable<Value, Amb>
268 >
memberrxcpp::member_overload269 static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
270 return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
271 }
272
273 template<class Observable, class Coordination, class Value0, class... ValueN,
274 class Enabled = rxu::enable_if_all_true_type_t<
275 all_observables<Observable, Value0, ValueN...>,
276 is_coordination<Coordination>>,
277 class EmittedValue = rxu::value_type_t<Observable>,
278 class SourceValue = observable<EmittedValue>,
279 class ObservableObservable = observable<SourceValue>,
280 class Amb = typename rxu::defer_type<rxo::detail::amb, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
281 class Value = rxu::value_type_t<Amb>,
282 class Result = observable<Value, Amb>
283 >
memberrxcpp::member_overload284 static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
285 return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
286 }
287
288 template<class... AN>
memberrxcpp::member_overload289 static operators::detail::amb_invalid_t<AN...> member(AN...) {
290 std::terminate();
291 return {};
292 static_assert(sizeof...(AN) == 10000, "amb takes (optional Coordination, optional Value0, optional ValueN...)");
293 }
294 };
295
296 }
297
298 #endif
299