1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-sample_time.hpp
6
7 \brief Return an Observable that emits the most recent items emitted by the source Observable within periodic time intervals.
8
9 \tparam Duration the type of time interval.
10 \tparam Coordination the type of the scheduler (optional).
11
12 \param period the period of time to sample the source observable.
13 \param coordination the scheduler for the items (optional).
14
15 \return Observable that emits the most recently emitted item since the previous sampling.
16
17 \sample
18 \snippet sample.cpp sample period sample
19 \snippet output.txt sample period sample
20 */
21
22 #if !defined(RXCPP_OPERATORS_RX_SAMPLE_WITH_TIME_HPP)
23 #define RXCPP_OPERATORS_RX_SAMPLE_WITH_TIME_HPP
24
25 #include "../rx-includes.hpp"
26
27 namespace rxcpp {
28
29 namespace operators {
30
31 namespace detail {
32
// Empty tag type parameterized on an argument list. It serves as the value
// type of the observable named by sample_with_time_invalid<...>::type.
template<class... ArgN>
struct sample_with_time_invalid_arguments
{
};
35
36 template<class... AN>
37 struct sample_with_time_invalid : public rxo::operator_base<sample_with_time_invalid_arguments<AN...>> {
38 using type = observable<sample_with_time_invalid_arguments<AN...>, sample_with_time_invalid<AN...>>;
39 };
40 template<class... AN>
41 using sample_with_time_invalid_t = typename sample_with_time_invalid<AN...>::type;
42
43 template<class T, class Duration, class Coordination>
44 struct sample_with_time
45 {
46 typedef rxu::decay_t<T> source_value_type;
47 typedef rxu::decay_t<Coordination> coordination_type;
48 typedef typename coordination_type::coordinator_type coordinator_type;
49 typedef rxu::decay_t<Duration> duration_type;
50
51 struct sample_with_time_value
52 {
sample_with_time_valuerxcpp::operators::detail::sample_with_time::sample_with_time_value53 sample_with_time_value(duration_type p, coordination_type c)
54 : period(p)
55 , coordination(c)
56 {
57 }
58 duration_type period;
59 coordination_type coordination;
60 };
61 sample_with_time_value initial;
62
sample_with_timerxcpp::operators::detail::sample_with_time63 sample_with_time(duration_type period, coordination_type coordination)
64 : initial(period, coordination)
65 {
66 }
67
68 template<class Subscriber>
69 struct sample_with_time_observer
70 {
71 typedef sample_with_time_observer<Subscriber> this_type;
72 typedef T value_type;
73 typedef rxu::decay_t<Subscriber> dest_type;
74 typedef observer<value_type, this_type> observer_type;
75
76 struct sample_with_time_subscriber_value : public sample_with_time_value
77 {
sample_with_time_subscriber_valuerxcpp::operators::detail::sample_with_time::sample_with_time_observer::sample_with_time_subscriber_value78 sample_with_time_subscriber_value(composite_subscription cs, dest_type d, sample_with_time_value v, coordinator_type c)
79 : sample_with_time_value(v)
80 , cs(std::move(cs))
81 , dest(std::move(d))
82 , coordinator(std::move(c))
83 , worker(coordinator.get_worker())
84 {
85 }
86 composite_subscription cs;
87 dest_type dest;
88 coordinator_type coordinator;
89 rxsc::worker worker;
90 mutable rxu::maybe<value_type> value;
91 };
92 std::shared_ptr<sample_with_time_subscriber_value> state;
93
sample_with_time_observerrxcpp::operators::detail::sample_with_time::sample_with_time_observer94 sample_with_time_observer(composite_subscription cs, dest_type d, sample_with_time_value v, coordinator_type c)
95 : state(std::make_shared<sample_with_time_subscriber_value>(sample_with_time_subscriber_value(std::move(cs), std::move(d), v, std::move(c))))
96 {
97 auto localState = state;
98
99 auto disposer = [=](const rxsc::schedulable&){
100 localState->cs.unsubscribe();
101 localState->dest.unsubscribe();
102 localState->worker.unsubscribe();
103 };
104 auto selectedDisposer = on_exception(
105 [&](){ return localState->coordinator.act(disposer); },
106 localState->dest);
107 if (selectedDisposer.empty()) {
108 return;
109 }
110
111 localState->dest.add([=](){
112 localState->worker.schedule(selectedDisposer.get());
113 });
114 localState->cs.add([=](){
115 localState->worker.schedule(selectedDisposer.get());
116 });
117
118 auto produce_sample = [localState](const rxsc::schedulable&) {
119 if(!localState->value.empty()) {
120 localState->dest.on_next(*localState->value);
121 localState->value.reset();
122 }
123 };
124 auto selectedProduce = on_exception(
125 [&](){ return localState->coordinator.act(produce_sample); },
126 localState->dest);
127 if (selectedProduce.empty()) {
128 return;
129 }
130
131 state->worker.schedule_periodically(
132 localState->worker.now(),
133 localState->period,
134 [localState, selectedProduce](const rxsc::schedulable&) {
135 localState->worker.schedule(selectedProduce.get());
136 });
137 }
138
on_nextrxcpp::operators::detail::sample_with_time::sample_with_time_observer139 void on_next(T v) const {
140 auto localState = state;
141 auto work = [v, localState](const rxsc::schedulable&) {
142 localState->value.reset(v);
143 };
144 auto selectedWork = on_exception(
145 [&](){ return localState->coordinator.act(work); },
146 localState->dest);
147 if (selectedWork.empty()) {
148 return;
149 }
150 localState->worker.schedule(selectedWork.get());
151 }
152
on_errorrxcpp::operators::detail::sample_with_time::sample_with_time_observer153 void on_error(rxu::error_ptr e) const {
154 auto localState = state;
155 auto work = [e, localState](const rxsc::schedulable&) {
156 localState->dest.on_error(e);
157 };
158 auto selectedWork = on_exception(
159 [&](){ return localState->coordinator.act(work); },
160 localState->dest);
161 if (selectedWork.empty()) {
162 return;
163 }
164 localState->worker.schedule(selectedWork.get());
165 }
166
on_completedrxcpp::operators::detail::sample_with_time::sample_with_time_observer167 void on_completed() const {
168 auto localState = state;
169 auto work = [localState](const rxsc::schedulable&) {
170 localState->dest.on_completed();
171 };
172 auto selectedWork = on_exception(
173 [&](){ return localState->coordinator.act(work); },
174 localState->dest);
175 if (selectedWork.empty()) {
176 return;
177 }
178 localState->worker.schedule(selectedWork.get());
179 }
180
makerxcpp::operators::detail::sample_with_time::sample_with_time_observer181 static subscriber<T, observer<T, this_type>> make(dest_type d, sample_with_time_value v) {
182 auto cs = composite_subscription();
183 auto coordinator = v.coordination.create_coordinator();
184
185 return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
186 }
187 };
188
189 template<class Subscriber>
operator ()rxcpp::operators::detail::sample_with_time190 auto operator()(Subscriber dest) const
191 -> decltype(sample_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
192 return sample_with_time_observer<Subscriber>::make(std::move(dest), initial);
193 }
194 };
195
196 }
197
198 /*! @copydoc rx-sample_time.hpp
199 */
200 template<class... AN>
sample_with_time(AN &&...an)201 auto sample_with_time(AN&&... an)
202 -> operator_factory<sample_with_time_tag, AN...> {
203 return operator_factory<sample_with_time_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
204 }
205
206 }
207
208 template<>
209 struct member_overload<sample_with_time_tag>
210 {
211 template<class Observable, class Duration,
212 class Enabled = rxu::enable_if_all_true_type_t<
213 is_observable<Observable>,
214 rxu::is_duration<Duration>>,
215 class SourceValue = rxu::value_type_t<Observable>,
216 class SampleWithTime = rxo::detail::sample_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
memberrxcpp::member_overload217 static auto member(Observable&& o, Duration&& d)
218 -> decltype(o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), identity_current_thread()))) {
219 return o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), identity_current_thread()));
220 }
221
222 template<class Observable, class Coordination, class Duration,
223 class Enabled = rxu::enable_if_all_true_type_t<
224 is_observable<Observable>,
225 is_coordination<Coordination>,
226 rxu::is_duration<Duration>>,
227 class SourceValue = rxu::value_type_t<Observable>,
228 class SampleWithTime = rxo::detail::sample_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload229 static auto member(Observable&& o, Coordination&& cn, Duration&& d)
230 -> decltype(o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
231 return o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)));
232 }
233
234 template<class Observable, class Coordination, class Duration,
235 class Enabled = rxu::enable_if_all_true_type_t<
236 is_observable<Observable>,
237 is_coordination<Coordination>,
238 rxu::is_duration<Duration>>,
239 class SourceValue = rxu::value_type_t<Observable>,
240 class SampleWithTime = rxo::detail::sample_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload241 static auto member(Observable&& o, Duration&& d, Coordination&& cn)
242 -> decltype(o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
243 return o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)));
244 }
245
246 template<class... AN>
memberrxcpp::member_overload247 static operators::detail::sample_with_time_invalid_t<AN...> member(const AN&...) {
248 std::terminate();
249 return {};
250 static_assert(sizeof...(AN) == 10000, "sample_with_time takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
251 }
252 };
253
254 }
255
256 #endif
257