1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-take_until.hpp
6
7 \brief For each item from this observable until on_next occurs on the trigger observable or until the specified time, emit them from the new observable that is returned.
8 take_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)
9
10 \tparam TriggerSource the type of the trigger observable.
11 \tparam TimePoint the type of the time interval.
12 \tparam Coordination the type of the scheduler (optional).
13
14 \param t an observable whose first emitted item will stop emitting items from the source observable.
15 \param when a time point when the returned observable will stop emitting items.
16 \param cn the scheduler to use for scheduling the items (optional).
17
18 \return An observable that emits the items emitted by the source observable until trigger observable emitted or the time runs out.
19
20 \sample
21 \snippet take_until.cpp take_until sample
22 \snippet output.txt take_until sample
23
24 \sample
25 \snippet take_until.cpp threaded take_until sample
26 \snippet output.txt threaded take_until sample
27
28 \sample
29 \snippet take_until.cpp take_until time sample
30 \snippet output.txt take_until time sample
31
32 \sample
33 \snippet take_until.cpp threaded take_until time sample
34 \snippet output.txt threaded take_until time sample
35 */
36
37 #if !defined(RXCPP_OPERATORS_RX_TAKE_UNTIL_HPP)
38 #define RXCPP_OPERATORS_RX_TAKE_UNTIL_HPP
39
40 #include "../rx-includes.hpp"
41
42 namespace rxcpp {
43
44 namespace operators {
45
46 namespace detail {
47
48 template<class... AN>
49 struct take_until_invalid_arguments {};
50
51 template<class... AN>
52 struct take_until_invalid : public rxo::operator_base<take_until_invalid_arguments<AN...>> {
53 using type = observable<take_until_invalid_arguments<AN...>, take_until_invalid<AN...>>;
54 };
55 template<class... AN>
56 using take_until_invalid_t = typename take_until_invalid<AN...>::type;
57
58 template<class T, class Observable, class TriggerObservable, class Coordination>
59 struct take_until : public operator_base<T>
60 {
61 typedef rxu::decay_t<Observable> source_type;
62 typedef rxu::decay_t<TriggerObservable> trigger_source_type;
63 typedef rxu::decay_t<Coordination> coordination_type;
64 typedef typename coordination_type::coordinator_type coordinator_type;
65 struct values
66 {
valuesrxcpp::operators::detail::take_until::values67 values(source_type s, trigger_source_type t, coordination_type sf)
68 : source(std::move(s))
69 , trigger(std::move(t))
70 , coordination(std::move(sf))
71 {
72 }
73 source_type source;
74 trigger_source_type trigger;
75 coordination_type coordination;
76 };
77 values initial;
78
take_untilrxcpp::operators::detail::take_until79 take_until(source_type s, trigger_source_type t, coordination_type sf)
80 : initial(std::move(s), std::move(t), std::move(sf))
81 {
82 }
83
84 struct mode
85 {
86 enum type {
87 taking, // no messages from trigger
88 clear, // trigger completed
89 triggered, // trigger sent on_next
90 errored, // error either on trigger or on observable
91 stopped // observable completed
92 };
93 };
94
95 template<class Subscriber>
on_subscriberxcpp::operators::detail::take_until96 void on_subscribe(Subscriber s) const {
97
98 typedef Subscriber output_type;
99 struct take_until_state_type
100 : public std::enable_shared_from_this<take_until_state_type>
101 , public values
102 {
103 take_until_state_type(const values& i, coordinator_type coor, const output_type& oarg)
104 : values(i)
105 , mode_value(mode::taking)
106 , coordinator(std::move(coor))
107 , out(oarg)
108 {
109 out.add(trigger_lifetime);
110 out.add(source_lifetime);
111 }
112 typename mode::type mode_value;
113 composite_subscription trigger_lifetime;
114 composite_subscription source_lifetime;
115 coordinator_type coordinator;
116 output_type out;
117 };
118
119 auto coordinator = initial.coordination.create_coordinator(s.get_subscription());
120
121 // take a copy of the values for each subscription
122 auto state = std::make_shared<take_until_state_type>(initial, std::move(coordinator), std::move(s));
123
124 auto trigger = on_exception(
125 [&](){return state->coordinator.in(state->trigger);},
126 state->out);
127 if (trigger.empty()) {
128 return;
129 }
130
131 auto source = on_exception(
132 [&](){return state->coordinator.in(state->source);},
133 state->out);
134 if (source.empty()) {
135 return;
136 }
137
138 auto sinkTrigger = make_subscriber<typename trigger_source_type::value_type>(
139 // share parts of subscription
140 state->out,
141 // new lifetime
142 state->trigger_lifetime,
143 // on_next
144 [state](const typename trigger_source_type::value_type&) {
145 if (state->mode_value != mode::taking) {return;}
146 state->mode_value = mode::triggered;
147 state->out.on_completed();
148 },
149 // on_error
150 [state](rxu::error_ptr e) {
151 if (state->mode_value != mode::taking) {return;}
152 state->mode_value = mode::errored;
153 state->out.on_error(e);
154 },
155 // on_completed
156 [state]() {
157 if (state->mode_value != mode::taking) {return;}
158 state->mode_value = mode::clear;
159 }
160 );
161 auto selectedSinkTrigger = on_exception(
162 [&](){return state->coordinator.out(sinkTrigger);},
163 state->out);
164 if (selectedSinkTrigger.empty()) {
165 return;
166 }
167 trigger->subscribe(std::move(selectedSinkTrigger.get()));
168
169 auto sinkSource = make_subscriber<T>(
170 // split subscription lifetime
171 state->source_lifetime,
172 // on_next
173 [state](T t) {
174 //
175 // everything is crafted to minimize the overhead of this function.
176 //
177 if (state->mode_value < mode::triggered) {
178 state->out.on_next(t);
179 }
180 },
181 // on_error
182 [state](rxu::error_ptr e) {
183 if (state->mode_value > mode::clear) {return;}
184 state->mode_value = mode::errored;
185 state->out.on_error(e);
186 },
187 // on_completed
188 [state]() {
189 if (state->mode_value > mode::clear) {return;}
190 state->mode_value = mode::stopped;
191 state->out.on_completed();
192 }
193 );
194 auto selectedSinkSource = on_exception(
195 [&](){return state->coordinator.out(sinkSource);},
196 state->out);
197 if (selectedSinkSource.empty()) {
198 return;
199 }
200 source->subscribe(std::move(selectedSinkSource.get()));
201 }
202 };
203
204 }
205
206 /*! @copydoc rx-take_until.hpp
207 */
208 template<class... AN>
take_until(AN &&...an)209 auto take_until(AN&&... an)
210 -> operator_factory<take_until_tag, AN...> {
211 return operator_factory<take_until_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
212 }
213
214 }
215
216 template<>
217 struct member_overload<take_until_tag>
218 {
219 template<class Observable, class TimePoint,
220 class Enabled = rxu::enable_if_all_true_type_t<
221 is_observable<Observable>,
222 std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
223 class SourceValue = rxu::value_type_t<Observable>,
224 class Timer = typename rxu::defer_type<rxs::detail::timer, identity_one_worker>::type,
225 class TimerValue = rxu::value_type_t<Timer>,
226 class TriggerObservable = observable<TimerValue, Timer>,
227 class TakeUntil = rxo::detail::take_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable, identity_one_worker>,
228 class Value = rxu::value_type_t<TakeUntil>,
229 class Result = observable<Value, TakeUntil>>
memberrxcpp::member_overload230 static Result member(Observable&& o, TimePoint&& when) {
231 auto cn = identity_current_thread();
232 return Result(TakeUntil(std::forward<Observable>(o), rxs::timer(std::forward<TimePoint>(when), cn), cn));
233 }
234
235 template<class Observable, class TimePoint, class Coordination,
236 class Enabled = rxu::enable_if_all_true_type_t<
237 is_observable<Observable>,
238 is_coordination<Coordination>,
239 std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
240 class SourceValue = rxu::value_type_t<Observable>,
241 class Timer = typename rxu::defer_type<rxs::detail::timer, rxu::decay_t<Coordination>>::type,
242 class TimerValue = rxu::value_type_t<Timer>,
243 class TriggerObservable = observable<TimerValue, Timer>,
244 class TakeUntil = rxo::detail::take_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable, rxu::decay_t<Coordination>>,
245 class Value = rxu::value_type_t<TakeUntil>,
246 class Result = observable<Value, TakeUntil>>
memberrxcpp::member_overload247 static Result member(Observable&& o, TimePoint&& when, Coordination cn) {
248 return Result(TakeUntil(std::forward<Observable>(o), rxs::timer(std::forward<TimePoint>(when), cn), cn));
249 }
250
251 template<class Observable, class TriggerObservable,
252 class Enabled = rxu::enable_if_all_true_type_t<
253 all_observables<Observable, TriggerObservable>>,
254 class SourceValue = rxu::value_type_t<Observable>,
255 class TakeUntil = rxo::detail::take_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, identity_one_worker>,
256 class Value = rxu::value_type_t<TakeUntil>,
257 class Result = observable<Value, TakeUntil>>
memberrxcpp::member_overload258 static Result member(Observable&& o, TriggerObservable&& t) {
259 return Result(TakeUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), identity_current_thread()));
260 }
261
262 template<class Observable, class TriggerObservable, class Coordination,
263 class Enabled = rxu::enable_if_all_true_type_t<
264 all_observables<Observable, TriggerObservable>,
265 is_coordination<Coordination>>,
266 class SourceValue = rxu::value_type_t<Observable>,
267 class TakeUntil = rxo::detail::take_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, rxu::decay_t<Coordination>>,
268 class Value = rxu::value_type_t<TakeUntil>,
269 class Result = observable<Value, TakeUntil>>
memberrxcpp::member_overload270 static Result member(Observable&& o, TriggerObservable&& t, Coordination&& cn) {
271 return Result(TakeUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), std::forward<Coordination>(cn)));
272 }
273
274 template<class... AN>
memberrxcpp::member_overload275 static operators::detail::take_until_invalid_t<AN...> member(AN...) {
276 std::terminate();
277 return {};
278 static_assert(sizeof...(AN) == 10000, "take_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)");
279 }
280 };
281
282 }
283
284 #endif
285