1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-debounce.hpp
6
7 \brief Return an observable that emits an item if a particular timespan has passed without emitting another item from the source observable.
8
9 \tparam Duration the type of the time interval
10 \tparam Coordination the type of the scheduler
11
12 \param period the period of time to suppress any emitted items
13 \param coordination the scheduler to manage timeout for each event
14
15 \return Observable that emits an item if a particular timespan has passed without emitting another item from the source observable.
16
17 \sample
18 \snippet debounce.cpp debounce sample
19 \snippet output.txt debounce sample
20 */
21
22 #if !defined(RXCPP_OPERATORS_RX_DEBOUNCE_HPP)
23 #define RXCPP_OPERATORS_RX_DEBOUNCE_HPP
24
25 #include "../rx-includes.hpp"
26
27 namespace rxcpp {
28
29 namespace operators {
30
31 namespace detail {
32
33 template<class... AN>
34 struct debounce_invalid_arguments {};
35
36 template<class... AN>
37 struct debounce_invalid : public rxo::operator_base<debounce_invalid_arguments<AN...>> {
38 using type = observable<debounce_invalid_arguments<AN...>, debounce_invalid<AN...>>;
39 };
40 template<class... AN>
41 using debounce_invalid_t = typename debounce_invalid<AN...>::type;
42
43 template<class T, class Duration, class Coordination>
44 struct debounce
45 {
46 typedef rxu::decay_t<T> source_value_type;
47 typedef rxu::decay_t<Coordination> coordination_type;
48 typedef typename coordination_type::coordinator_type coordinator_type;
49 typedef rxu::decay_t<Duration> duration_type;
50
51 struct debounce_values
52 {
debounce_valuesrxcpp::operators::detail::debounce::debounce_values53 debounce_values(duration_type p, coordination_type c)
54 : period(p)
55 , coordination(c)
56 {
57 }
58
59 duration_type period;
60 coordination_type coordination;
61 };
62 debounce_values initial;
63
debouncerxcpp::operators::detail::debounce64 debounce(duration_type period, coordination_type coordination)
65 : initial(period, coordination)
66 {
67 }
68
69 template<class Subscriber>
70 struct debounce_observer
71 {
72 typedef debounce_observer<Subscriber> this_type;
73 typedef rxu::decay_t<T> value_type;
74 typedef rxu::decay_t<Subscriber> dest_type;
75 typedef observer<T, this_type> observer_type;
76
77 struct debounce_subscriber_values : public debounce_values
78 {
debounce_subscriber_valuesrxcpp::operators::detail::debounce::debounce_observer::debounce_subscriber_values79 debounce_subscriber_values(composite_subscription cs, dest_type d, debounce_values v, coordinator_type c)
80 : debounce_values(v)
81 , cs(std::move(cs))
82 , dest(std::move(d))
83 , coordinator(std::move(c))
84 , worker(coordinator.get_worker())
85 , index(0)
86 {
87 }
88
89 composite_subscription cs;
90 dest_type dest;
91 coordinator_type coordinator;
92 rxsc::worker worker;
93 mutable std::size_t index;
94 mutable rxu::maybe<value_type> value;
95 };
96 typedef std::shared_ptr<debounce_subscriber_values> state_type;
97 state_type state;
98
debounce_observerrxcpp::operators::detail::debounce::debounce_observer99 debounce_observer(composite_subscription cs, dest_type d, debounce_values v, coordinator_type c)
100 : state(std::make_shared<debounce_subscriber_values>(debounce_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
101 {
102 auto localState = state;
103
104 auto disposer = [=](const rxsc::schedulable&){
105 localState->cs.unsubscribe();
106 localState->dest.unsubscribe();
107 localState->worker.unsubscribe();
108 };
109 auto selectedDisposer = on_exception(
110 [&](){ return localState->coordinator.act(disposer); },
111 localState->dest);
112 if (selectedDisposer.empty()) {
113 return;
114 }
115
116 localState->dest.add([=](){
117 localState->worker.schedule(selectedDisposer.get());
118 });
119 localState->cs.add([=](){
120 localState->worker.schedule(selectedDisposer.get());
121 });
122 }
123
produce_itemrxcpp::operators::detail::debounce::debounce_observer124 static std::function<void(const rxsc::schedulable&)> produce_item(std::size_t id, state_type state) {
125 auto produce = [id, state](const rxsc::schedulable&) {
126 if(id != state->index)
127 return;
128
129 state->dest.on_next(*state->value);
130 state->value.reset();
131 };
132
133 auto selectedProduce = on_exception(
134 [&](){ return state->coordinator.act(produce); },
135 state->dest);
136 if (selectedProduce.empty()) {
137 return std::function<void(const rxsc::schedulable&)>();
138 }
139
140 return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
141 }
142
on_nextrxcpp::operators::detail::debounce::debounce_observer143 void on_next(T v) const {
144 auto localState = state;
145 auto work = [v, localState](const rxsc::schedulable&) {
146 auto new_id = ++localState->index;
147 auto produce_time = localState->worker.now() + localState->period;
148
149 localState->value.reset(v);
150 localState->worker.schedule(produce_time, produce_item(new_id, localState));
151 };
152 auto selectedWork = on_exception(
153 [&](){return localState->coordinator.act(work);},
154 localState->dest);
155 if (selectedWork.empty()) {
156 return;
157 }
158 localState->worker.schedule(selectedWork.get());
159 }
160
on_errorrxcpp::operators::detail::debounce::debounce_observer161 void on_error(rxu::error_ptr e) const {
162 auto localState = state;
163 auto work = [e, localState](const rxsc::schedulable&) {
164 localState->dest.on_error(e);
165 localState->value.reset();
166 };
167 auto selectedWork = on_exception(
168 [&](){ return localState->coordinator.act(work); },
169 localState->dest);
170 if (selectedWork.empty()) {
171 return;
172 }
173 localState->worker.schedule(selectedWork.get());
174 }
175
on_completedrxcpp::operators::detail::debounce::debounce_observer176 void on_completed() const {
177 auto localState = state;
178 auto work = [localState](const rxsc::schedulable&) {
179 if(!localState->value.empty()) {
180 localState->dest.on_next(*localState->value);
181 }
182 localState->dest.on_completed();
183 };
184 auto selectedWork = on_exception(
185 [&](){ return localState->coordinator.act(work); },
186 localState->dest);
187 if (selectedWork.empty()) {
188 return;
189 }
190 localState->worker.schedule(selectedWork.get());
191 }
192
makerxcpp::operators::detail::debounce::debounce_observer193 static subscriber<T, observer_type> make(dest_type d, debounce_values v) {
194 auto cs = composite_subscription();
195 auto coordinator = v.coordination.create_coordinator();
196
197 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
198 }
199 };
200
201 template<class Subscriber>
operator ()rxcpp::operators::detail::debounce202 auto operator()(Subscriber dest) const
203 -> decltype(debounce_observer<Subscriber>::make(std::move(dest), initial)) {
204 return debounce_observer<Subscriber>::make(std::move(dest), initial);
205 }
206 };
207
208 }
209
210 /*! @copydoc rx-debounce.hpp
211 */
212 template<class... AN>
debounce(AN &&...an)213 auto debounce(AN&&... an)
214 -> operator_factory<debounce_tag, AN...> {
215 return operator_factory<debounce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
216 }
217
218 }
219
220 template<>
221 struct member_overload<debounce_tag>
222 {
223 template<class Observable, class Duration,
224 class Enabled = rxu::enable_if_all_true_type_t<
225 is_observable<Observable>,
226 rxu::is_duration<Duration>>,
227 class SourceValue = rxu::value_type_t<Observable>,
228 class Debounce = rxo::detail::debounce<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
memberrxcpp::member_overload229 static auto member(Observable&& o, Duration&& d)
230 -> decltype(o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), identity_current_thread()))) {
231 return o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), identity_current_thread()));
232 }
233
234 template<class Observable, class Coordination, class Duration,
235 class Enabled = rxu::enable_if_all_true_type_t<
236 is_observable<Observable>,
237 is_coordination<Coordination>,
238 rxu::is_duration<Duration>>,
239 class SourceValue = rxu::value_type_t<Observable>,
240 class Debounce = rxo::detail::debounce<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload241 static auto member(Observable&& o, Coordination&& cn, Duration&& d)
242 -> decltype(o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
243 return o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)));
244 }
245
246 template<class Observable, class Coordination, class Duration,
247 class Enabled = rxu::enable_if_all_true_type_t<
248 is_observable<Observable>,
249 is_coordination<Coordination>,
250 rxu::is_duration<Duration>>,
251 class SourceValue = rxu::value_type_t<Observable>,
252 class Debounce = rxo::detail::debounce<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload253 static auto member(Observable&& o, Duration&& d, Coordination&& cn)
254 -> decltype(o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
255 return o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)));
256 }
257
258 template<class... AN>
memberrxcpp::member_overload259 static operators::detail::debounce_invalid_t<AN...> member(const AN&...) {
260 std::terminate();
261 return {};
262 static_assert(sizeof...(AN) == 10000, "debounce takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
263 }
264 };
265
266 }
267
268 #endif
269