1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-delay.hpp
6
7 \brief Return an observable that emits each item emitted by the source observable after the specified delay.
8
9 \tparam Duration the type of time interval
10 \tparam Coordination the type of the scheduler
11
12 \param period the period of time each item is delayed
13 \param coordination the scheduler for the delays
14
15 \return Observable that emits each item emitted by the source observable after the specified delay.
16
17 \sample
18 \snippet delay.cpp delay period+coordination sample
19 \snippet output.txt delay period+coordination sample
20 */
21
22 #if !defined(RXCPP_OPERATORS_RX_DELAY_HPP)
23 #define RXCPP_OPERATORS_RX_DELAY_HPP
24
25 #include "../rx-includes.hpp"
26
27 namespace rxcpp {
28
29 namespace operators {
30
31 namespace detail {
32
33 template<class... AN>
34 struct delay_invalid_arguments {};
35
36 template<class... AN>
37 struct delay_invalid : public rxo::operator_base<delay_invalid_arguments<AN...>> {
38 using type = observable<delay_invalid_arguments<AN...>, delay_invalid<AN...>>;
39 };
40 template<class... AN>
41 using delay_invalid_t = typename delay_invalid<AN...>::type;
42
43 template<class T, class Duration, class Coordination>
44 struct delay
45 {
46 typedef rxu::decay_t<T> source_value_type;
47 typedef rxu::decay_t<Coordination> coordination_type;
48 typedef typename coordination_type::coordinator_type coordinator_type;
49 typedef rxu::decay_t<Duration> duration_type;
50
51 struct delay_values
52 {
delay_valuesrxcpp::operators::detail::delay::delay_values53 delay_values(duration_type p, coordination_type c)
54 : period(p)
55 , coordination(c)
56 {
57 }
58 duration_type period;
59 coordination_type coordination;
60 };
61 delay_values initial;
62
delayrxcpp::operators::detail::delay63 delay(duration_type period, coordination_type coordination)
64 : initial(period, coordination)
65 {
66 }
67
68 template<class Subscriber>
69 struct delay_observer
70 {
71 typedef delay_observer<Subscriber> this_type;
72 typedef rxu::decay_t<T> value_type;
73 typedef rxu::decay_t<Subscriber> dest_type;
74 typedef observer<T, this_type> observer_type;
75
76 struct delay_subscriber_values : public delay_values
77 {
delay_subscriber_valuesrxcpp::operators::detail::delay::delay_observer::delay_subscriber_values78 delay_subscriber_values(composite_subscription cs, dest_type d, delay_values v, coordinator_type c)
79 : delay_values(v)
80 , cs(std::move(cs))
81 , dest(std::move(d))
82 , coordinator(std::move(c))
83 , worker(coordinator.get_worker())
84 , expected(worker.now())
85 {
86 }
87 composite_subscription cs;
88 dest_type dest;
89 coordinator_type coordinator;
90 rxsc::worker worker;
91 rxsc::scheduler::clock_type::time_point expected;
92 };
93 std::shared_ptr<delay_subscriber_values> state;
94
delay_observerrxcpp::operators::detail::delay::delay_observer95 delay_observer(composite_subscription cs, dest_type d, delay_values v, coordinator_type c)
96 : state(std::make_shared<delay_subscriber_values>(delay_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
97 {
98 auto localState = state;
99
100 auto disposer = [=](const rxsc::schedulable&){
101 localState->cs.unsubscribe();
102 localState->dest.unsubscribe();
103 localState->worker.unsubscribe();
104 };
105 auto selectedDisposer = on_exception(
106 [&](){return localState->coordinator.act(disposer);},
107 localState->dest);
108 if (selectedDisposer.empty()) {
109 return;
110 }
111
112 localState->dest.add([=](){
113 localState->worker.schedule(selectedDisposer.get());
114 });
115 localState->cs.add([=](){
116 localState->worker.schedule(localState->worker.now() + localState->period, selectedDisposer.get());
117 });
118 }
119
on_nextrxcpp::operators::detail::delay::delay_observer120 void on_next(T v) const {
121 auto localState = state;
122 auto work = [v, localState](const rxsc::schedulable&){
123 localState->dest.on_next(v);
124 };
125 auto selectedWork = on_exception(
126 [&](){return localState->coordinator.act(work);},
127 localState->dest);
128 if (selectedWork.empty()) {
129 return;
130 }
131 localState->worker.schedule(localState->worker.now() + localState->period, selectedWork.get());
132 }
133
on_errorrxcpp::operators::detail::delay::delay_observer134 void on_error(rxu::error_ptr e) const {
135 auto localState = state;
136 auto work = [e, localState](const rxsc::schedulable&){
137 localState->dest.on_error(e);
138 };
139 auto selectedWork = on_exception(
140 [&](){return localState->coordinator.act(work);},
141 localState->dest);
142 if (selectedWork.empty()) {
143 return;
144 }
145 localState->worker.schedule(selectedWork.get());
146 }
147
on_completedrxcpp::operators::detail::delay::delay_observer148 void on_completed() const {
149 auto localState = state;
150 auto work = [localState](const rxsc::schedulable&){
151 localState->dest.on_completed();
152 };
153 auto selectedWork = on_exception(
154 [&](){return localState->coordinator.act(work);},
155 localState->dest);
156 if (selectedWork.empty()) {
157 return;
158 }
159 localState->worker.schedule(localState->worker.now() + localState->period, selectedWork.get());
160 }
161
makerxcpp::operators::detail::delay::delay_observer162 static subscriber<T, observer_type> make(dest_type d, delay_values v) {
163 auto cs = composite_subscription();
164 auto coordinator = v.coordination.create_coordinator();
165
166 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
167 }
168 };
169
170 template<class Subscriber>
operator ()rxcpp::operators::detail::delay171 auto operator()(Subscriber dest) const
172 -> decltype(delay_observer<Subscriber>::make(std::move(dest), initial)) {
173 return delay_observer<Subscriber>::make(std::move(dest), initial);
174 }
175 };
176
177 }
178
179 /*! @copydoc rx-delay.hpp
180 */
181 template<class... AN>
delay(AN &&...an)182 auto delay(AN&&... an)
183 -> operator_factory<delay_tag, AN...> {
184 return operator_factory<delay_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
185 }
186
187 }
188
189 template<>
190 struct member_overload<delay_tag>
191 {
192 template<class Observable, class Duration,
193 class Enabled = rxu::enable_if_all_true_type_t<
194 is_observable<Observable>,
195 rxu::is_duration<Duration>>,
196 class SourceValue = rxu::value_type_t<Observable>,
197 class delay = rxo::detail::delay<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
memberrxcpp::member_overload198 static auto member(Observable&& o, Duration&& d)
199 -> decltype(o.template lift<SourceValue>(delay(std::forward<Duration>(d), identity_current_thread()))) {
200 return o.template lift<SourceValue>(delay(std::forward<Duration>(d), identity_current_thread()));
201 }
202
203 template<class Observable, class Coordination, class Duration,
204 class Enabled = rxu::enable_if_all_true_type_t<
205 is_observable<Observable>,
206 is_coordination<Coordination>,
207 rxu::is_duration<Duration>>,
208 class SourceValue = rxu::value_type_t<Observable>,
209 class delay = rxo::detail::delay<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload210 static auto member(Observable&& o, Coordination&& cn, Duration&& d)
211 -> decltype(o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
212 return o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)));
213 }
214
215 template<class Observable, class Coordination, class Duration,
216 class Enabled = rxu::enable_if_all_true_type_t<
217 is_observable<Observable>,
218 is_coordination<Coordination>,
219 rxu::is_duration<Duration>>,
220 class SourceValue = rxu::value_type_t<Observable>,
221 class delay = rxo::detail::delay<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload222 static auto member(Observable&& o, Duration&& d, Coordination&& cn)
223 -> decltype(o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
224 return o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)));
225 }
226
227 template<class... AN>
memberrxcpp::member_overload228 static operators::detail::delay_invalid_t<AN...> member(const AN&...) {
229 std::terminate();
230 return {};
231 static_assert(sizeof...(AN) == 10000, "delay takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
232 }
233 };
234
235 }
236
237 #endif
238