1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-timeout.hpp
6
7 \brief Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable.
8
9 \tparam Duration the type of time interval.
10 \tparam Coordination the type of the scheduler (optional).
11
12 \param period the period of time wait for another item from the source observable.
13 \param coordination the scheduler to manage timeout for each event (optional).
14
15 \return Observable that terminates with an error if a particular timespan has passed without emitting another item from the source observable.
16
17 \sample
18 \snippet timeout.cpp timeout sample
19 \snippet output.txt timeout sample
20 */
21
22 #if !defined(RXCPP_OPERATORS_RX_TIMEOUT_HPP)
23 #define RXCPP_OPERATORS_RX_TIMEOUT_HPP
24
25 #include "../rx-includes.hpp"
26
27 namespace rxcpp {
28
29 class timeout_error: public std::runtime_error
30 {
31 public:
timeout_error(const std::string & msg)32 explicit timeout_error(const std::string& msg):
33 std::runtime_error(msg)
34 {}
35 };
36
37 namespace operators {
38
39 namespace detail {
40
41 template<class... AN>
42 struct timeout_invalid_arguments {};
43
44 template<class... AN>
45 struct timeout_invalid : public rxo::operator_base<timeout_invalid_arguments<AN...>> {
46 using type = observable<timeout_invalid_arguments<AN...>, timeout_invalid<AN...>>;
47 };
48 template<class... AN>
49 using timeout_invalid_t = typename timeout_invalid<AN...>::type;
50
51 template<class T, class Duration, class Coordination>
52 struct timeout
53 {
54 typedef rxu::decay_t<T> source_value_type;
55 typedef rxu::decay_t<Coordination> coordination_type;
56 typedef typename coordination_type::coordinator_type coordinator_type;
57 typedef rxu::decay_t<Duration> duration_type;
58
59 struct timeout_values
60 {
timeout_valuesrxcpp::operators::detail::timeout::timeout_values61 timeout_values(duration_type p, coordination_type c)
62 : period(p)
63 , coordination(c)
64 {
65 }
66
67 duration_type period;
68 coordination_type coordination;
69 };
70 timeout_values initial;
71
timeoutrxcpp::operators::detail::timeout72 timeout(duration_type period, coordination_type coordination)
73 : initial(period, coordination)
74 {
75 }
76
77 template<class Subscriber>
78 struct timeout_observer
79 {
80 typedef timeout_observer<Subscriber> this_type;
81 typedef rxu::decay_t<T> value_type;
82 typedef rxu::decay_t<Subscriber> dest_type;
83 typedef observer<T, this_type> observer_type;
84
85 struct timeout_subscriber_values : public timeout_values
86 {
timeout_subscriber_valuesrxcpp::operators::detail::timeout::timeout_observer::timeout_subscriber_values87 timeout_subscriber_values(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
88 : timeout_values(v)
89 , cs(std::move(cs))
90 , dest(std::move(d))
91 , coordinator(std::move(c))
92 , worker(coordinator.get_worker())
93 , index(0)
94 {
95 }
96
97 composite_subscription cs;
98 dest_type dest;
99 coordinator_type coordinator;
100 rxsc::worker worker;
101 mutable std::size_t index;
102 };
103 typedef std::shared_ptr<timeout_subscriber_values> state_type;
104 state_type state;
105
timeout_observerrxcpp::operators::detail::timeout::timeout_observer106 timeout_observer(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
107 : state(std::make_shared<timeout_subscriber_values>(timeout_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
108 {
109 auto localState = state;
110
111 auto disposer = [=](const rxsc::schedulable&){
112 localState->cs.unsubscribe();
113 localState->dest.unsubscribe();
114 localState->worker.unsubscribe();
115 };
116 auto selectedDisposer = on_exception(
117 [&](){ return localState->coordinator.act(disposer); },
118 localState->dest);
119 if (selectedDisposer.empty()) {
120 return;
121 }
122
123 localState->dest.add([=](){
124 localState->worker.schedule(selectedDisposer.get());
125 });
126 localState->cs.add([=](){
127 localState->worker.schedule(selectedDisposer.get());
128 });
129
130 auto work = [v, localState](const rxsc::schedulable&) {
131 auto new_id = ++localState->index;
132 auto produce_time = localState->worker.now() + localState->period;
133
134 localState->worker.schedule(produce_time, produce_timeout(new_id, localState));
135 };
136 auto selectedWork = on_exception(
137 [&](){return localState->coordinator.act(work);},
138 localState->dest);
139 if (selectedWork.empty()) {
140 return;
141 }
142 localState->worker.schedule(selectedWork.get());
143 }
144
produce_timeoutrxcpp::operators::detail::timeout::timeout_observer145 static std::function<void(const rxsc::schedulable&)> produce_timeout(std::size_t id, state_type state) {
146 auto produce = [id, state](const rxsc::schedulable&) {
147 if(id != state->index)
148 return;
149
150 state->dest.on_error(rxu::make_error_ptr(rxcpp::timeout_error("timeout has occurred")));
151 };
152
153 auto selectedProduce = on_exception(
154 [&](){ return state->coordinator.act(produce); },
155 state->dest);
156 if (selectedProduce.empty()) {
157 return std::function<void(const rxsc::schedulable&)>();
158 }
159
160 return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
161 }
162
on_nextrxcpp::operators::detail::timeout::timeout_observer163 void on_next(T v) const {
164 auto localState = state;
165 auto work = [v, localState](const rxsc::schedulable&) {
166 auto new_id = ++localState->index;
167 auto produce_time = localState->worker.now() + localState->period;
168
169 localState->dest.on_next(v);
170 localState->worker.schedule(produce_time, produce_timeout(new_id, localState));
171 };
172 auto selectedWork = on_exception(
173 [&](){return localState->coordinator.act(work);},
174 localState->dest);
175 if (selectedWork.empty()) {
176 return;
177 }
178 localState->worker.schedule(selectedWork.get());
179 }
180
on_errorrxcpp::operators::detail::timeout::timeout_observer181 void on_error(rxu::error_ptr e) const {
182 auto localState = state;
183 auto work = [e, localState](const rxsc::schedulable&) {
184 localState->dest.on_error(e);
185 };
186 auto selectedWork = on_exception(
187 [&](){ return localState->coordinator.act(work); },
188 localState->dest);
189 if (selectedWork.empty()) {
190 return;
191 }
192 localState->worker.schedule(selectedWork.get());
193 }
194
on_completedrxcpp::operators::detail::timeout::timeout_observer195 void on_completed() const {
196 auto localState = state;
197 auto work = [localState](const rxsc::schedulable&) {
198 localState->dest.on_completed();
199 };
200 auto selectedWork = on_exception(
201 [&](){ return localState->coordinator.act(work); },
202 localState->dest);
203 if (selectedWork.empty()) {
204 return;
205 }
206 localState->worker.schedule(selectedWork.get());
207 }
208
makerxcpp::operators::detail::timeout::timeout_observer209 static subscriber<T, observer_type> make(dest_type d, timeout_values v) {
210 auto cs = composite_subscription();
211 auto coordinator = v.coordination.create_coordinator();
212
213 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
214 }
215 };
216
217 template<class Subscriber>
operator ()rxcpp::operators::detail::timeout218 auto operator()(Subscriber dest) const
219 -> decltype(timeout_observer<Subscriber>::make(std::move(dest), initial)) {
220 return timeout_observer<Subscriber>::make(std::move(dest), initial);
221 }
222 };
223
224 }
225
226 /*! @copydoc rx-timeout.hpp
227 */
228 template<class... AN>
timeout(AN &&...an)229 auto timeout(AN&&... an)
230 -> operator_factory<timeout_tag, AN...> {
231 return operator_factory<timeout_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
232 }
233
234 }
235
236 template<>
237 struct member_overload<timeout_tag>
238 {
239 template<class Observable, class Duration,
240 class Enabled = rxu::enable_if_all_true_type_t<
241 is_observable<Observable>,
242 rxu::is_duration<Duration>>,
243 class SourceValue = rxu::value_type_t<Observable>,
244 class Timeout = rxo::detail::timeout<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
memberrxcpp::member_overload245 static auto member(Observable&& o, Duration&& d)
246 -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), identity_current_thread()))) {
247 return o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), identity_current_thread()));
248 }
249
250 template<class Observable, class Coordination, class Duration,
251 class Enabled = rxu::enable_if_all_true_type_t<
252 is_observable<Observable>,
253 is_coordination<Coordination>,
254 rxu::is_duration<Duration>>,
255 class SourceValue = rxu::value_type_t<Observable>,
256 class Timeout = rxo::detail::timeout<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload257 static auto member(Observable&& o, Coordination&& cn, Duration&& d)
258 -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
259 return o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)));
260 }
261
262 template<class Observable, class Coordination, class Duration,
263 class Enabled = rxu::enable_if_all_true_type_t<
264 is_observable<Observable>,
265 is_coordination<Coordination>,
266 rxu::is_duration<Duration>>,
267 class SourceValue = rxu::value_type_t<Observable>,
268 class Timeout = rxo::detail::timeout<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload269 static auto member(Observable&& o, Duration&& d, Coordination&& cn)
270 -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
271 return o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)));
272 }
273
274 template<class... AN>
memberrxcpp::member_overload275 static operators::detail::timeout_invalid_t<AN...> member(const AN&...) {
276 std::terminate();
277 return {};
278 static_assert(sizeof...(AN) == 10000, "timeout takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
279 }
280 };
281
282 }
283
284 #endif
285